Building a Security Log Correlator: Fast Incident Detection
3:47 AM - When Log Correlation Stopped a Ransomware Attack
The Slack alert woke up Sarah Chen, Senior Security Engineer at FinSecure Technologies. "CRITICAL: 47 failed login attempts on admin_svc_backup account" - her automated correlation tool had flagged something.
Most analysts would have dismissed it. Failed logins happen constantly. But Sarah’s log correlator had done something crucial: it had correlated authentication failures with a successful login 90 seconds later - and that success came from an IP address in Belarus, not their Virginia data center.
She pulled up the full incident timeline:
03:42:15 - admin_svc_backup: login FAILURE (IP: 178.248.xxx.xxx)
03:42:18 - admin_svc_backup: login FAILURE (IP: 178.248.xxx.xxx)
[... 45 more failures in 3 minutes ...]
03:45:47 - admin_svc_backup: login SUCCESS (IP: 178.248.xxx.xxx) ⚠️
03:46:12 - admin_svc_backup: privilege_change to "domain_admin" ⚠️
03:46:34 - admin_svc_backup: file_access /etc/shadow ⚠️
03:47:01 - admin_svc_backup: file_access /backup/customer_data/* ⚠️
The attacker had gotten in. They'd brute-forced a backup service account (weak password: "Backup2023!"), immediately escalated to domain admin, and started accessing customer financial data. Sarah's correlation tool had caught them 14 minutes into what would have become a $200M ransomware attack.
By 4:15 AM, the compromised account was locked, the attacker’s session terminated, and the security team was analyzing how they’d obtained the credential list. The breach was contained before a single customer record was exfiltrated.
Without automated log correlation, this attack would have been discovered during Monday’s routine log review - 72 hours and millions of stolen records later.
Note: This scenario combines elements from documented breach patterns including credential stuffing attacks against service accounts. The attack timeline and detection methods reflect real SOC incident response procedures.
Introduction
This is the power of effective log correlation. As Security Engineers, we spend significant time analyzing logs to detect security incidents. But when you’re dealing with thousands of authentication attempts and security events per minute, efficiency matters. A lot.
In this post, I'll walk you through building a production-grade log correlator - the exact type of tool that caught Sarah's attack. This isn't a theoretical exercise: it's the same pattern-detection logic SOC teams use to stop real attacks in progress, and it's exactly the type of challenge you'll face in Security Engineering interviews.
What you’ll learn:
- Parsing heterogeneous log formats (CSV and JSON) securely
- Building efficient data structures for log correlation (the key to real-time detection)
- Detecting common attack patterns (brute force, privilege escalation, anomalous access)
- Applying secure coding principles to systems programming
- Why this matters: These are the exact patterns that security teams use to catch real breaches before they cause catastrophic damage
Prerequisites:
- Intermediate Python knowledge
- Basic understanding of security concepts
- Familiarity with time complexity analysis
🎯 How This Exercise Relates to Production Security Work
"Will I actually write Python log parsers in my Security Engineering job?"
The honest answer: Probably not. In production, you’ll use SIEM platforms (Splunk, Azure Sentinel, ELK Stack) that handle log ingestion and parsing automatically. You’ll write detection rules in query languages like SPL or KQL, not Python parsers.
So why build this from scratch?
What You’re Actually Learning (The Real Value)
1. Detection Logic & Pattern Recognition ✅ 100% Transferable
- The brute force detection logic (5+ failures → success within window) is exactly how SIEM rules work
- Privilege escalation patterns (login → immediate sudo) are standard SOC detections
- These concepts transfer directly to writing Splunk/Sentinel queries
2. Understanding What's Happening Under the Hood ✅ Critical for Interviews

When a company asks: "How would you detect credential stuffing in authentication logs?"
- Weak answer: "I’d use Splunk"
- Strong answer: "I’d correlate failed login attempts by source IP and user, flagging when we see 5+ failures within a 5-minute window followed by success. The key challenge is the time window calculation and handling distributed attacks from botnets..."
You need to understand the logic to explain it, optimize it, and debug it.
3. Foundation for SIEM Work ✅ Career-Critical

When you start writing detection rules in production:
# Real Splunk query for brute force detection
index=auth action=login status=failure
| stats count by user src_ip
| where count > 5
| join user [search index=auth action=login status=success]
You’ll understand why this query works because you implemented the logic yourself. You’ll know:
- Why the time window matters
- What makes queries slow
- How to optimize detection rules
- When correlation is feasible vs. too expensive
4. Interview Preparation ✅ Directly Tested

Security Engineering interviews commonly include:
- "Build a tool to detect X pattern in these logs" ← This exercise
- "How would you correlate authentication and security events?" ← You can explain it
- "What’s the time complexity of your approach?" ← You understand the trade-offs
According to the Team Blind Security Engineering guide, log correlation challenges appear frequently in technical interviews.
What’s Different in Production
| This Exercise | Production Reality |
|---|---|
| Parse CSV/JSON manually | Logs auto-ingested by forwarders (Filebeat, Splunk Universal Forwarder) |
| Process files in memory | SIEM indexes billions of events in distributed databases |
| Write Python detection logic | Write SPL/KQL/YARA-L detection rules |
| Return structured results | Trigger alerts → tickets → incident response workflows |
| Test with 50K events | Process millions of events per day across clusters |
Where You WILL Write Python in Security Engineering
Even though you won’t write log parsers, Python is essential for:
- Security automation: Scripts for repetitive SOC tasks
- API integrations: Pulling threat intel, updating firewalls
- Custom tooling: Gaps your SIEM can’t fill
- Threat hunting: Processing forensic dumps, analyzing malware
- Detection engineering: Testing and validating SIEM rules
The Bottom Line
This exercise teaches you:
- ✅ How log correlation fundamentally works
- ✅ Attack pattern recognition (brute force, privilege escalation)
- ✅ Why time windows and thresholds matter
- ✅ The signal-to-noise challenge (5% attacks in 95% legitimate traffic)
You won’t use this exact code in production, but you’ll use these concepts every single day.
Think of it like learning to drive: You practice fundamentals in an empty parking lot before driving on highways with traffic. This exercise is your parking lot - learning correlation fundamentals before working with enterprise SIEM platforms.
What These Logs Represent in the Real World
Before we dive in, let’s be clear: these aren’t toy examples. The log formats and attack patterns you’ll work with mirror actual production security logs that SOC analysts investigate daily.
auth.log - Authentication Events
In production, this data comes from:
- Linux/Unix systems: /var/log/auth.log, /var/log/secure (SSH, sudo attempts)
- Windows: Security Event Log (Event IDs 4624/4625)
- Cloud platforms: AWS CloudTrail, Azure Activity Logs, GCP Audit Logs
- Applications: Web app logins, VPN gateways, corporate SSO systems
Example real scenario: An attacker in Eastern Europe attempts 10,000 password combinations against admin accounts. Each attempt generates an auth.log entry. When they succeed with "Password123!", that’s your signal to investigate.
security.log - Post-Authentication Activity
In production, this data comes from:
- SIEM systems: Log aggregation and correlation platforms
- EDR tools: Endpoint detection and response platforms
- File integrity monitoring: OSSEC, Tripwire, Linux auditd
- Cloud APIs: AWS API calls, Azure Resource Manager operations
Example real scenario: After successful login, an attacker immediately accesses /etc/shadow (password hashes), escalates to root, and creates backdoor accounts. Each action generates a security.log entry. Your correlator detects this suspicious sequence.
Attack Patterns You’re Detecting
The test cases simulate real security incidents:
- Brute Force (Test 031+): Similar to documented attacks where attackers try thousands of password combinations
- Privilege Escalation (Test 046+): Mirrors real breaches where compromised credentials led to immediate admin access from foreign IPs
- Distributed Attacks (Test 043): Reflects ongoing account takeover attempts from 500+ bot IPs
These aren’t hypotheticals - they’re based on published incident reports and MITRE ATT&CK patterns.
📊 ABOUT THE TEST DATA:
The test files for this exercise contain production-realistic volumes:
- 53,556 total log entries across 101 test cases
- Real signal-to-noise ratios: 5-20% attacks hidden in 80-95% legitimate traffic
- Actual breach patterns: Based on documented real-world incidents
- Scalable testing: From 50 entries (small) to 25,000 entries (enterprise scale)
Why this matters: Testing with 3 log entries proves nothing. Testing with 10,000 entries where attacks are buried in normal traffic? That proves your code works in production.
The Problem
You’re investigating a potential security incident. You have two log sources:
- Authentication Log (auth.log) - CSV format tracking all login attempts
- Security Events Log (security.log) - JSON format tracking file access, privilege changes, and alerts
You need to:
- Correlate events by user efficiently
- Detect brute force attacks
- Identify suspicious privilege escalations
- Flag anomalous access patterns
Sounds straightforward, right? Let’s dig into the details.
Log Formats
Authentication Log (CSV):
timestamp,user_id,action,ip_address,status,session_id
2024-01-15T10:23:45Z,user123,login,192.168.1.50,success,sess_abc123
2024-01-15T10:24:01Z,user456,login,203.0.113.42,failure,sess_def456
Security Events Log (JSON):
{"timestamp": "2024-01-15T10:24:12Z", "user_id": "user123", "event_type": "file_access", "resource": "/etc/passwd", "session_id": "sess_abc123", "ip_address": "192.168.1.50"}
{"timestamp": "2024-01-15T10:25:30Z", "user_id": "user456", "event_type": "privilege_change", "resource": "sudo_access", "session_id": "sess_def456", "ip_address": "203.0.113.99"}
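Since the two formats differ, each log gets its own parser: `csv.DictReader` for the auth log, and one JSON object per line for the security log. Here is a minimal sketch of the `parse_auth_log()` / `parse_security_log()` functions used later, with validation and error handling deliberately omitted:

```python
import csv
import json

def parse_auth_log(path):
    """Parse the CSV auth log into a list of event dicts keyed by the header row."""
    with open(path, newline="") as f:
        return list(csv.DictReader(f))

def parse_security_log(path):
    """Parse the JSON-lines security log into a list of event dicts."""
    events = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines defensively
            events.append(json.loads(line))
    return events
```

In production code you would also validate required fields and handle malformed lines instead of letting `json.loads` raise on bad input.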
Input & Output Examples
Sample Input Files
auth.log (50 entries showing brute force attack):
timestamp,user_id,action,ip_address,status,session_id
2024-01-15T10:00:00Z,attacker01,login,203.0.113.42,failure,sess_fail_01
2024-01-15T10:00:48Z,attacker01,login,203.0.113.42,failure,sess_fail_02
2024-01-15T10:01:36Z,attacker01,login,203.0.113.42,failure,sess_fail_03
2024-01-15T10:02:24Z,attacker01,login,203.0.113.42,failure,sess_fail_04
2024-01-15T10:03:12Z,attacker01,login,203.0.113.42,failure,sess_fail_05
2024-01-15T10:04:00Z,attacker01,login,203.0.113.42,success,sess_success_01
2024-01-15T10:05:00Z,normal_user01,login,192.168.1.100,success,sess_norm_01
2024-01-15T10:06:00Z,normal_user02,login,192.168.1.101,success,sess_norm_02
...
(42 more normal user logins)
security.log (75 entries showing post-compromise activity):
{"timestamp": "2024-01-15T10:04:30Z", "user_id": "attacker01", "event_type": "file_access", "resource": "/etc/shadow", "session_id": "sess_success_01", "ip_address": "203.0.113.42"}
{"timestamp": "2024-01-15T10:04:45Z", "user_id": "attacker01", "event_type": "privilege_change", "resource": "sudo_access", "session_id": "sess_success_01", "ip_address": "203.0.113.42"}
{"timestamp": "2024-01-15T10:05:00Z", "user_id": "attacker01", "event_type": "file_access", "resource": "/root/.ssh/authorized_keys", "session_id": "sess_success_01", "ip_address": "203.0.113.42"}
{"timestamp": "2024-01-15T10:05:30Z", "user_id": "normal_user01", "event_type": "file_access", "resource": "/home/normal_user01/report.pdf", "session_id": "sess_norm_01", "ip_address": "192.168.1.100"}
{"timestamp": "2024-01-15T10:06:15Z", "user_id": "normal_user02", "event_type": "file_access", "resource": "/home/normal_user02/data.csv", "session_id": "sess_norm_02", "ip_address": "192.168.1.101"}
...
(70 more normal user activities)
Expected Output
1. After Parsing:
# parse_auth_log() returns:
[
    {"timestamp": "2024-01-15T10:00:00Z", "user_id": "attacker01", "action": "login",
     "ip_address": "203.0.113.42", "status": "failure", "session_id": "sess_fail_01"},
    {"timestamp": "2024-01-15T10:00:48Z", "user_id": "attacker01", "action": "login",
     "ip_address": "203.0.113.42", "status": "failure", "session_id": "sess_fail_02"},
    # ... more events
]

# parse_security_log() returns:
[
    {"timestamp": "2024-01-15T10:04:30Z", "user_id": "attacker01", "event_type": "file_access",
     "resource": "/etc/shadow", "session_id": "sess_success_01", "ip_address": "203.0.113.42"},
    # ... more events
]
2. After Correlation:
# correlate_events() returns:
{
    "attacker01": {
        "auth_events": [
            {"timestamp": "2024-01-15T10:00:00Z", "status": "failure", ...},
            {"timestamp": "2024-01-15T10:00:48Z", "status": "failure", ...},
            {"timestamp": "2024-01-15T10:01:36Z", "status": "failure", ...},
            {"timestamp": "2024-01-15T10:02:24Z", "status": "failure", ...},
            {"timestamp": "2024-01-15T10:03:12Z", "status": "failure", ...},
            {"timestamp": "2024-01-15T10:04:00Z", "status": "success", ...}
        ],
        "security_events": [
            {"timestamp": "2024-01-15T10:04:30Z", "resource": "/etc/shadow", ...},
            {"timestamp": "2024-01-15T10:04:45Z", "resource": "sudo_access", ...},
            {"timestamp": "2024-01-15T10:05:00Z", "resource": "/root/.ssh/authorized_keys", ...}
        ]
    },
    "normal_user01": {
        "auth_events": [
            {"timestamp": "2024-01-15T10:05:00Z", "status": "success", ...}
        ],
        "security_events": [
            {"timestamp": "2024-01-15T10:05:30Z", "resource": "/home/normal_user01/report.pdf", ...}
        ]
    },
    # ... more users
}
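The correlation step is where efficiency matters most: one pass over each list, grouping by user_id, gives O(n) total work instead of the O(n²) you'd get by searching one log for every entry of the other. A sketch of `correlate_events()` under that design (assuming both parsers return lists of dicts as shown above):

```python
from collections import defaultdict

def correlate_events(auth_events, security_events):
    """Group both event streams by user_id in a single O(n) pass each."""
    user_events = defaultdict(lambda: {"auth_events": [], "security_events": []})
    for event in auth_events:
        user_events[event["user_id"]]["auth_events"].append(event)
    for event in security_events:
        user_events[event["user_id"]]["security_events"].append(event)
    return dict(user_events)
```

The `defaultdict` means a user who appears in only one of the two logs still gets both lists, with the missing one empty - which keeps the detection functions simple.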
3. After Detection:
# detect_brute_force("attacker01", user_events) returns:
True # 5 failures within 5 minutes, then success
# detect_brute_force("normal_user01", user_events) returns:
False # No attack pattern detected
# detect_anomalous_access("attacker01", user_events) returns:
True # Accessed /etc/shadow (sensitive file)
# detect_anomalous_access("normal_user01", user_events) returns:
False # Only accessed normal files
4. Final Incident Report:
# generate_incident_report("attacker01", user_events) returns:
{
    "user_id": "attacker01",
    "auth_events": [6 events],      # 5 failures + 1 success
    "security_events": [3 events],  # 3 suspicious file accesses
    "incident_flags": ["brute_force", "anomalous_access"]
}
# When printed:
"""
User: attacker01
Auth Events: 6
Security Events: 3
Incident Flags: ['brute_force', 'anomalous_access']
Timeline:
2024-01-15T10:00:00Z - Login attempt from 203.0.113.42 - FAILURE
2024-01-15T10:00:48Z - Login attempt from 203.0.113.42 - FAILURE
2024-01-15T10:01:36Z - Login attempt from 203.0.113.42 - FAILURE
2024-01-15T10:02:24Z - Login attempt from 203.0.113.42 - FAILURE
2024-01-15T10:03:12Z - Login attempt from 203.0.113.42 - FAILURE
2024-01-15T10:04:00Z - Login attempt from 203.0.113.42 - SUCCESS ⚠️
2024-01-15T10:04:30Z - File access: /etc/shadow ⚠️
2024-01-15T10:04:45Z - Privilege change: sudo_access ⚠️
2024-01-15T10:05:00Z - File access: /root/.ssh/authorized_keys ⚠️
⚠️ SECURITY INCIDENT DETECTED ⚠️
- Brute force attack: 5 failed logins followed by success
- Anomalous access: Accessed sensitive system files
- Privilege escalation: Gained sudo access immediately after login
"""
Running Your Implementation
$ python3 log_correlator.py
Parsing logs...
Parsed 50 auth events
Parsed 75 security events
Correlating events...
Correlated events for 43 users
Analyzing users...
⚠️ INCIDENT: attacker01 - brute_force, anomalous_access, privilege_escalation
✅ NORMAL: normal_user01 - no incidents detected
✅ NORMAL: normal_user02 - no incidents detected
...
Incident Summary:
- Total users: 43
- Users with incidents: 1
- Brute force attacks detected: 1
- Privilege escalations detected: 1
- Anomalous file access detected: 1
This is exactly what SOC analysts see when investigating alerts in SIEM systems!
Detection Criteria - What Counts as an Attack?
Before you start coding, you need to know exactly what your detection functions should flag. Here are the precise criteria:
🚨 Brute Force Attack Detection
Where to look in the data structure:
user_events = {
    "user123": {
        "auth_events": [  # ← CHECK THIS for login patterns
            {"timestamp": "10:00:00Z", "status": "failure", "ip_address": "203.0.113.42", ...},
            {"timestamp": "10:00:48Z", "status": "failure", "ip_address": "203.0.113.42", ...},
            {"timestamp": "10:01:36Z", "status": "failure", "ip_address": "203.0.113.42", ...},
            {"timestamp": "10:02:24Z", "status": "failure", "ip_address": "203.0.113.42", ...},
            {"timestamp": "10:03:12Z", "status": "failure", "ip_address": "203.0.113.42", ...},
            {"timestamp": "10:04:00Z", "status": "success", "ip_address": "203.0.113.42", ...}
        ],
        "security_events": [...]  # ← NOT used for brute force detection
    }
}
When to flag as brute force:
- 5 or more failed login attempts for the same user
- All failures must occur within a 5-minute time window
- A successful login must occur within the SAME 5-minute window
- Record the EXACT successful login event within that window
CRITICAL IMPLEMENTATION NOTE:
- The 5-minute window is measured from the first failure to the last failure in the sequence
- Success must be within this same window to be detected as brute force
- If success occurs OUTSIDE the window → Return None (don't detect)
- If success occurs WITHIN the window → Return attack details with that exact success event
- This keeps the logic clean: either detect a complete attack (with success), or don’t detect at all
Example 1: Attack DETECTED (Success Within Window)
10:00:00 - user123 login FAILURE (IP: 203.0.113.42) ← Window starts
10:00:48 - user123 login FAILURE (IP: 203.0.113.42)
10:01:36 - user123 login FAILURE (IP: 203.0.113.42)
10:02:24 - user123 login FAILURE (IP: 203.0.113.42)
10:03:12 - user123 login FAILURE (IP: 203.0.113.42) ← 5th failure
10:04:00 - user123 login SUCCESS (IP: 203.0.113.42) ← Within 5-min window!
(4 min from first failure)
Time window: 10:00:00 to 10:05:00 (5 minutes)
Detection result: ✅ DETECTED - 5 failures + success within window
Return: Attack dict with success_event = the 10:04:00 login event
Example 2: Attack NOT DETECTED (Success Outside Window)
10:00:00 - user123 login FAILURE (IP: 203.0.113.42) ← Window starts
10:01:00 - user123 login FAILURE (IP: 203.0.113.42)
10:02:00 - user123 login FAILURE (IP: 203.0.113.42)
10:03:00 - user123 login FAILURE (IP: 203.0.113.42)
10:04:00 - user123 login FAILURE (IP: 203.0.113.42) ← 5th failure
← Window ends at 10:05:00
10:07:00 - user123 login SUCCESS (IP: 203.0.113.42) ← Outside window!
(7 min from first failure)
Time window: 10:00:00 to 10:05:00 (5 minutes)
Detection result: ❌ NOT DETECTED - Success outside window
Return: None (attack attempt blocked or unrelated success)
When NOT to flag (false positive examples):
# Only 4 failures (below threshold)
10:00:00 - user123 login FAILURE
10:00:30 - user123 login FAILURE
10:01:00 - user123 login FAILURE
10:01:30 - user123 login FAILURE ← Only 4 failures
10:02:00 - user123 login SUCCESS
Detection result: ❌ Do NOT flag (below 5-failure threshold)
# Failures outside time window
10:00:00 - user123 login FAILURE
10:07:00 - user123 login FAILURE ← More than 5 minutes apart
10:14:00 - user123 login FAILURE
10:21:00 - user123 login FAILURE
10:28:00 - user123 login FAILURE
10:35:00 - user123 login SUCCESS
Detection result: ❌ Do NOT flag (failures too spread out)
Handling Multiple Attacks on Same User (detect_brute_force)
What if a user experiences multiple separate brute force attacks?
In production, attackers often try multiple times - they might brute force an account in the morning, get locked out, then try again in the afternoon with a different password list. Your detect_brute_force() function should detect ALL attacks.
Example: Two separate attacks on same user:
# Morning attack (Attack 1)
08:00:00 - admin login FAILURE (IP: 203.0.113.42)
08:01:00 - admin login FAILURE (IP: 203.0.113.42)
08:02:00 - admin login FAILURE (IP: 203.0.113.42)
08:03:00 - admin login FAILURE (IP: 203.0.113.42)
08:04:00 - admin login FAILURE (IP: 203.0.113.42)
08:05:00 - admin login SUCCESS (IP: 203.0.113.42) ← Attack 1 succeeded
# Normal activity for several hours...
# Afternoon attack (Attack 2) - more aggressive
14:00:00 - admin login FAILURE (IP: 45.134.142.XX)
14:00:30 - admin login FAILURE (IP: 45.134.142.XX)
14:01:00 - admin login FAILURE (IP: 45.134.142.XX)
14:01:30 - admin login FAILURE (IP: 45.134.142.XX)
14:02:00 - admin login FAILURE (IP: 45.134.142.XX)
14:02:30 - admin login FAILURE (IP: 45.134.142.XX)
14:03:00 - admin login FAILURE (IP: 45.134.142.XX)
14:03:30 - admin login FAILURE (IP: 45.134.142.XX)
14:04:00 - admin login SUCCESS (IP: 45.134.142.XX) ← Attack 2 succeeded
Your detect_brute_force() function should return:
{
    "user_id": "admin",
    "attacks": [
        {
            "failure_count": 5,
            "failure_chain": [/* 5 morning failures */],
            "success_event": {/* 08:05:00 success */},
            "attack_duration_seconds": 300
        },
        {
            "failure_count": 8,
            "failure_chain": [/* 8 afternoon failures */],
            "success_event": {/* 14:04:00 success */},
            "attack_duration_seconds": 240
        }
    ],
    "total_attacks": 2
}

This is the return value from calling:

result = detect_brute_force("admin", user_events)
# result contains the structure shown above
Why this matters for SOC: A single attack might be an opportunistic attacker. Multiple attacks in one day? That’s a targeted, persistent threat that needs immediate investigation. Your correlator provides the complete attack timeline.
Implementation note for detect_brute_force(): If you detect only ONE attack, you can return either the single attack structure OR the multiple attacks structure with one item in the array - both are valid. But if you detect multiple attacks, you MUST use the multiple attacks structure shown above.
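Putting the criteria together, here is one possible shape for `detect_brute_force()`. This is a sketch of one interpretation of the spec: the window is measured from the first failure in the current chain, any success ends that chain, and the threshold and window are parameters rather than hard-coded:

```python
from datetime import datetime, timedelta

def _ts(event):
    # Timestamps are ISO 8601 UTC with a trailing "Z"
    return datetime.strptime(event["timestamp"], "%Y-%m-%dT%H:%M:%SZ")

def detect_brute_force(user_id, user_events, threshold=5, window=timedelta(minutes=5)):
    if user_id not in user_events or "auth_events" not in user_events[user_id]:
        return {"user_id": user_id}  # defensive return (see Core Requirements)
    attacks, chain = [], []
    for event in sorted(user_events[user_id]["auth_events"], key=_ts):
        if event["status"] == "failure":
            chain.append(event)
            # drop failures that push the chain past the window
            while _ts(chain[-1]) - _ts(chain[0]) > window:
                chain.pop(0)
        elif event["status"] == "success":
            if len(chain) >= threshold and _ts(event) - _ts(chain[0]) <= window:
                attacks.append({
                    "failure_count": len(chain),
                    "failure_chain": list(chain),
                    "success_event": event,
                    "attack_duration_seconds": int((_ts(event) - _ts(chain[0])).total_seconds()),
                })
            chain = []  # any success ends the current failure run
    if not attacks:
        return None
    return {"user_id": user_id, "attacks": attacks, "total_attacks": len(attacks)}
```

Run against the two-attack timeline above, this returns total_attacks = 2 with durations of 300 and 240 seconds; for the "success outside window" example it returns None.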
🔓 Privilege Escalation Detection
Where to look in the data structure:
user_events = {
    "user456": {
        "auth_events": [  # ← CHECK THIS for login events
            {"timestamp": "...", "status": "success", "ip_address": "...", ...}
        ],
        "security_events": [  # ← CHECK THIS for privilege_change events
            {"timestamp": "...", "event_type": "privilege_change", "resource": "sudo_access", ...}
        ]
    }
}

# Your code needs to check BOTH lists:
auth_events = user_events["user456"]["auth_events"]          # Step 1: Find login
security_events = user_events["user456"]["security_events"]  # Step 2: Find privilege change
When to flag as privilege escalation:
Core detection criteria (ALWAYS check these):
- User has a successful login (any IP address)
- Within 10 minutes of that login, there's a security event with event_type: "privilege_change"
- The resource field indicates elevated access: "sudo_access", "admin_role", "root_access", "elevated_privileges", "administrator"
Why this is suspicious: Normal users login and work for hours before needing elevated privileges. Attackers login and immediately escalate to maximize their access time.
Optional enhancement (BONUS, not required): You can make detection MORE sophisticated by also checking for:
- Failed login attempts before the successful login (indicates credential guessing)
- Login from different IP than usual (indicates account compromise)
- Multiple privilege escalations in short time (indicates automated attack)
But for the basic exercise, just detect: Login → Privilege change within 10 minutes
Example attack pattern:
# auth.log
10:02:00 - user456 login SUCCESS (IP: 203.0.113.99) ← User logs in
# security.log
10:03:30 - user456 privilege_change: "sudo_access" (IP: 203.0.113.99) ← Escalates 90 seconds later!
Detection result: ✅ Flag as privilege escalation (privilege gained within 10 min of login)
Why suspicious: Normal users work for hours before needing sudo. Immediate escalation indicates attacker maximizing their access window.
Real-world breach pattern:
- Attacker obtained credentials
- Used credentials to login via VPN
- Immediately escalated to admin privileges ← This pattern!
- This is what your code should detect
Additional example with failed attempts (more suspicious):
# auth.log
10:00:00 - user456 login FAILURE (IP: 192.168.1.100) ← Failed from corporate
10:02:00 - user456 login SUCCESS (IP: 203.0.113.99) ← Success from external
# security.log
10:03:30 - user456 privilege_change: "sudo_access" ← Immediate escalation
Detection result: ✅ Flag as privilege escalation (even more suspicious with failed attempts first)
When NOT to flag:
# Normal admin workflow
10:00:00 - admin_user login SUCCESS (IP: 192.168.1.100)
14:30:00 - admin_user privilege_change: "sudo_access" ← 4.5 hours later (outside 10-min window)
Detection result: ❌ Do NOT flag (privilege change happened hours later = normal work pattern)
# User already logged in, no privilege change
10:00:00 - user456 login SUCCESS
10:05:00 - (no privilege_change event)
Detection result: ❌ Do NOT flag (no privilege escalation occurred)
🚨 CRITICAL IMPLEMENTATION REQUIREMENT - Filtering Login Sessions
YOU MUST ONLY RETURN LOGIN SESSIONS THAT HAVE PRIVILEGE ESCALATIONS.
This is a common mistake that causes test failures. Read carefully:
The Common Mistake:
Students often return ALL login sessions, even those with zero privilege escalations.
Example of the problem:
- User logs in 5 times during normal work
- Only 1 of those logins has a privilege escalation within 10 minutes
- Wrong approach: Returns all 5 login sessions in the result
- Correct approach: Returns only the 1 session that had an escalation
Why this matters:
- login_sessions should contain ONLY sessions where privilege escalation occurred
- total_login_sessions represents "logins that led to escalation" (not total logins)
- If user logged in 10 times but only 2 had escalations → total_login_sessions = 2
What You Need to Implement:
Before adding a login session to your results, check if it actually has any privilege escalations. Only include sessions where escalations occurred within the time window.
Example Timeline:
User's login timeline:
08:00 - login SUCCESS → no privilege change → ❌ Don't include
09:00 - login SUCCESS → no privilege change → ❌ Don't include
10:00 - login SUCCESS → privilege_change at 10:02 → ✅ Include this session!
11:00 - login SUCCESS → no privilege change → ❌ Don't include
12:00 - login SUCCESS → privilege_change at 12:01 → ✅ Include this session!
Your return value should have:
- login_sessions array with 2 items (10:00 and 12:00 sessions)
- total_login_sessions = 2
- total_escalations = 2
Key principle: Filter your results to include only login sessions that have associated privilege escalations. Empty sessions don’t belong in a security report.
ONLY include login sessions that have privilege escalations. Do NOT include login sessions with zero escalations.
This means:
- If a user logs in 5 times during the day
- Only 1 of those logins has a privilege escalation within 10 minutes
- Your login_sessions array should contain ONLY that 1 session
- total_login_sessions should be 1 (not 5)
Example of CORRECT filtering:
# WRONG - includes ALL login sessions:
for login in auth_events:
if login['status'] == 'success':
session = find_escalations_after_login(login)
session['escalation_count'] = len(session['privilege_escalations'])
login_sessions.append(session) # ❌ Adds session even if escalation_count is 0
# CORRECT - only includes sessions WITH escalations:
for login in auth_events:
if login['status'] == 'success':
session = find_escalations_after_login(login)
session['escalation_count'] = len(session['privilege_escalations'])
if session['escalation_count'] > 0: # ✅ Only add if there ARE escalations
login_sessions.append(session)
Why this matters:
total_login_sessionsrepresents "logins that led to privilege escalation"- NOT "total number of logins the user had"
- Only suspicious sessions should be in the report
What counts as "privilege_change": Look for security events where:
- event_type field equals "privilege_change"
- resource field contains indicators like:
  - "sudo_access"
  - "admin_role"
  - "root_access"
  - "elevated_privileges"
  - "administrator"
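With the criteria and the filtering rule in hand, here is a sketch of `detect_privilege_escalation()`. The four top-level fields match the required structure; the per-session keys (`login_event`, `privilege_escalations`, `escalation_count`) follow the filtering example above but should be treated as assumptions about the exact schema:

```python
from datetime import datetime, timedelta

PRIV_INDICATORS = ["sudo_access", "admin_role", "root_access",
                   "elevated_privileges", "administrator"]

def _ts(event):
    return datetime.strptime(event["timestamp"], "%Y-%m-%dT%H:%M:%SZ")

def detect_privilege_escalation(user_id, user_events, window=timedelta(minutes=10)):
    user = user_events.get(user_id, {})
    if "auth_events" not in user or "security_events" not in user:
        # defensive: full empty structure (see Core Requirements)
        return {"user_id": user_id, "login_sessions": [],
                "total_login_sessions": 0, "total_escalations": 0}
    sessions = []
    for login in user["auth_events"]:
        if login.get("status") != "success":
            continue
        # privilege_change events with an elevated-access resource,
        # occurring within `window` AFTER this login
        escalations = [
            e for e in user["security_events"]
            if e.get("event_type") == "privilege_change"
            and any(ind in e.get("resource", "") for ind in PRIV_INDICATORS)
            and timedelta(0) <= _ts(e) - _ts(login) <= window
        ]
        if escalations:  # only sessions that actually escalated make the report
            sessions.append({"login_event": login,
                             "privilege_escalations": escalations,
                             "escalation_count": len(escalations)})
    return {"user_id": user_id,
            "login_sessions": sessions,
            "total_login_sessions": len(sessions),
            "total_escalations": sum(s["escalation_count"] for s in sessions)}
```

Note how the `if escalations:` guard implements the critical filtering requirement: logins with zero escalations never reach the report.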
📁 Anomalous File Access Detection
Where to look in the data structure:
user_events = {
"user123": {
"auth_events": [...], # ← NOT used for anomalous access
"security_events": [...] # ← CHECK THIS for file_access events
}
}
When to flag as anomalous access: User accessed ANY of these sensitive files in their security events:
CRITICAL SYSTEM FILES:
- /etc/passwd - User account database (world-readable but attackers enumerate it)
- /etc/shadow - Encrypted passwords (should NEVER be accessed except by root)
- /etc/sudoers - Sudo permissions configuration
- /etc/group - Group definitions
SSH & AUTHENTICATION:
- /root/.ssh/authorized_keys - Root SSH keys (backdoor installation)
- /home/*/.ssh/authorized_keys - User SSH keys (persistence mechanism)
- /home/*/.ssh/id_rsa - Private SSH keys (credential theft)
- /home/*/.ssh/id_ed25519 - Private SSH keys (newer format)
SYSTEM CONFIGURATION:
- /boot/grub/grub.cfg - Bootloader configuration
- /etc/crontab - System-wide scheduled tasks (persistence)
- /var/spool/cron/* - User cron jobs (persistence)
LOGS (tampering attempts):
- /var/log/auth.log - Authentication logs (covering tracks)
- /var/log/secure - Security logs (covering tracks)
- /var/log/audit/audit.log - Audit logs (covering tracks)
Example attack pattern:
# security.log
{"timestamp": "10:04:30Z", "user_id": "attacker01",
"event_type": "file_access", "resource": "/etc/shadow", ...}
Detection result: ✅ Flag as anomalous (/etc/shadow is sensitive)
{"timestamp": "10:05:00Z", "user_id": "attacker01",
"event_type": "file_access", "resource": "/root/.ssh/authorized_keys", ...}
Detection result: ✅ Flag as anomalous (installing backdoor)
When NOT to flag (normal file access):
# User's own documents
{"event_type": "file_access", "resource": "/home/alice/documents/report.pdf"}
Detection result: ❌ Do NOT flag (normal file)
# Temporary files
{"event_type": "file_access", "resource": "/tmp/upload_12345.tmp"}
Detection result: ❌ Do NOT flag (temp files are normal)
# Web content (if user is web admin)
{"event_type": "file_access", "resource": "/var/www/html/index.html"}
Detection result: ❌ Do NOT flag (web admin accessing web files)
# Shared system files
{"event_type": "file_access", "resource": "/usr/share/icons/theme.png"}
Detection result: ❌ Do NOT flag (normal system file)
Implementation tip:
Check if the resource field in security events contains any of the sensitive file paths. Use string matching:
def contains_sensitive_access(security_events):
    # Extend this list with the full set of sensitive paths listed above
    sensitive_files = ["/etc/passwd", "/etc/shadow", "/root/.ssh/authorized_keys"]
    for event in security_events:
        resource = event.get("resource", "")
        for sensitive_path in sensitive_files:
            if sensitive_path in resource:
                return True  # Found sensitive file access!
    return False
Core Requirements
⚠️ CRITICAL: Defensive Programming for Detect Functions
All three detect functions MUST handle missing data gracefully. Here’s what to return:
| Function | Missing user_id | Missing auth_events | Missing security_events |
|---|---|---|---|
| detect_brute_force | {"user_id": user_id} | {"user_id": user_id} | N/A (doesn’t need it) |
| detect_privilege_escalation | Full empty structure* | Full empty structure* | Full empty structure* |
| detect_anomalous_access | None | N/A (doesn’t need it) | None |
*Full empty structure for detect_privilege_escalation:
{
    "user_id": user_id,
    "login_sessions": [],
    "total_login_sessions": 0,
    "total_escalations": 0
}
Why this matters:
- Grader checks these fields without try/except
- Returning wrong type (None vs dict) causes test failures
- Each function has different requirements based on what data it needs
Example defensive code pattern:
def detect_brute_force(user_id, user_events, ...):
    # ✅ ALWAYS check these first
    if user_id not in user_events:
        return {"user_id": user_id}
    if "auth_events" not in user_events[user_id]:
        return {"user_id": user_id}

    # ✅ Now safe to process
    auth_events = user_events[user_id]["auth_events"]
    # ... rest of implementation
Your implementation must include these functions (no classes needed):
1. Data Representation (Use Dictionaries)
# Authentication event (dictionary)
auth_event = {
    "timestamp": "2024-01-15T10:23:45Z",
    "user_id": "user123",
    "action": "login",
    "ip_address": "192.168.1.50",
    "status": "success",
    "session_id": "sess_abc123"
}

# Security event (dictionary)
security_event = {
    "timestamp": "2024-01-15T10:24:12Z",
    "user_id": "user123",
    "event_type": "file_access",
    "resource": "/etc/passwd",
    "session_id": "sess_abc123",
    "ip_address": "192.168.1.50"
}

# Correlated events (dictionary with lists)
user_events = {
    "user123": {
        "auth_events": [auth_event1, auth_event2, ...],
        "security_events": [security_event1, security_event2, ...],
        "incident_flags": ["brute_force", "privilege_escalation"]
    }
}
2. Parsing Functions
NOTE: There are two valid implementation approaches:
Approach A: Parse functions return lists, then correlate separately.
Approach B: Parse functions build the correlated structure directly (recommended for the grader).
# Approach A (returns lists)
def parse_auth_log(filepath):
    """
    Parse CSV authentication log.

    Returns: List of dictionaries (auth events)

    Must handle:
    - Malformed CSV lines
    - Invalid timestamps
    - Invalid IP addresses
    - Missing fields
    """
    # TODO: Implement
    pass

def parse_security_log(filepath):
    """
    Parse JSON security events log.

    Returns: List of dictionaries (security events)

    Must handle:
    - Malformed JSON
    - Missing required fields
    - Encoding issues
    """
    # TODO: Implement
    pass
# Approach B (builds user_events directly - used by grader)
def parse_auth_log(filepath, user_events):
    """
    Parse CSV authentication log and populate user_events in-place.

    Args:
        filepath: Path to auth.log
        user_events: Dictionary to populate with structure:
            {user_id: {"auth_events": [...], "security_events": [...]}}

    Modifies user_events in-place, building:
        user_events[user_id]["auth_events"] = [list of auth event dicts]

    Must handle:
    - Malformed CSV lines
    - Invalid timestamps
    - Invalid IP addresses
    - Missing fields
    """
    # TODO: Implement
    pass

def parse_security_log(filepath, user_events):
    """
    Parse JSON security events log and populate user_events in-place.

    Args:
        filepath: Path to security.log
        user_events: Dictionary to populate (same dict from parse_auth_log)

    Modifies user_events in-place, building:
        user_events[user_id]["security_events"] = [list of security event dicts]

    Must handle:
    - Malformed JSON
    - Missing required fields
    - Encoding issues
    """
    # TODO: Implement
    pass
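To make the "must handle" bullets concrete, here is one possible sketch of parse_auth_log in the Approach A style. The column order in FIELDS is an assumption (the real auth.log header may differ), and validation simply skips malformed rows rather than raising:

```python
import csv
import ipaddress
from datetime import datetime

# Assumed CSV column order - adjust to match the actual auth.log format.
FIELDS = ["timestamp", "user_id", "action", "ip_address", "status", "session_id"]

def parse_auth_log(filepath):
    """Approach A sketch: return a list of auth event dicts, skipping bad rows."""
    events = []
    with open(filepath, newline="", encoding="utf-8", errors="replace") as f:
        for row in csv.reader(f):
            if len(row) != len(FIELDS):
                continue  # malformed line: wrong field count
            event = dict(zip(FIELDS, row))
            try:
                # Validate ISO 8601 timestamp (trailing Z = UTC) and IP address.
                datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
                ipaddress.ip_address(event["ip_address"])
            except ValueError:
                continue  # invalid timestamp or IP address
            events.append(event)
    return events
```

Skipping bad rows silently keeps the parser robust against attacker-corrupted logs; a production version might also count or log the rejects.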
3. Correlation Function
def correlate_events(auth_events, security_events):
    """
    Correlate events by user_id for efficient lookup.

    NOTE: This function is OPTIONAL. You can instead build the correlated structure
    directly during parsing by having parse_auth_log() and parse_security_log() accept
    a user_events dictionary parameter and populate it in-place. Both approaches are valid:

    Approach A (three-function):
        auth_events = parse_auth_log(filepath)          # Returns list
        security_events = parse_security_log(filepath)  # Returns list
        user_events = correlate_events(auth_events, security_events)  # Returns dict

    Approach B (two-function, correlation during parsing):
        user_events = {}
        parse_auth_log(filepath, user_events)      # Modifies dict in-place
        parse_security_log(filepath, user_events)  # Modifies dict in-place

    Args:
        auth_events: List of auth event dictionaries
        security_events: List of security event dictionaries

    Returns:
        Dictionary mapping user_id -> {"auth_events": [...], "security_events": [...]}

    Example:
        {
            "user123": {
                "auth_events": [{...}, {...}],
                "security_events": [{...}, {...}, {...}]
            },
            "user456": {
                "auth_events": [{...}],
                "security_events": [{...}]
            }
        }
    """
    # TODO: Implement correlation logic
    pass
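A minimal sketch of the correlation step, grouping both lists by user_id with dict.setdefault. Skipping events that lack a user_id is an assumption for robustness, not a spec requirement:

```python
def correlate_events(auth_events, security_events):
    """Sketch: group both event lists by user_id for O(1) per-user lookup."""
    user_events = {}
    for event in auth_events:
        user_id = event.get("user_id")
        if user_id is None:
            continue  # assumption: drop events with no user_id
        bucket = user_events.setdefault(
            user_id, {"auth_events": [], "security_events": []}
        )
        bucket["auth_events"].append(event)
    for event in security_events:
        user_id = event.get("user_id")
        if user_id is None:
            continue
        bucket = user_events.setdefault(
            user_id, {"auth_events": [], "security_events": []}
        )
        bucket["security_events"].append(event)
    return user_events
```

Because setdefault creates both lists up front, every user present in either log gets both keys, which keeps the detect functions' defensive checks simple.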
4. Detection Functions
def detect_brute_force(user_id, user_events, time_window_minutes=5, failure_threshold=5):
    """
    Detect brute force attacks: 5+ failed logins within time window + success within same window.

    Only detects COMPLETED attacks where the attacker succeeded within the time window.
    Does NOT detect failed attempts where success occurred outside the window.
    See "Detection Criteria - Brute Force Attack Detection" section above for examples.

    Args:
        user_id: User to check
        user_events: Dictionary of correlated events (from correlate_events)
        time_window_minutes: Time window for failures AND success (default 5)
        failure_threshold: Number of failures to trigger detection (default 5)

    Returns:
        dict or None: Attack details if detected, None otherwise

    DEFENSIVE PROGRAMMING - CRITICAL:
    If user_id doesn't exist in user_events OR user has no auth_events:
        → Return {"user_id": user_id} (NOT None!)
    This empty dict signals "no attack detected" without crashing.

    Example:
        if user_id not in user_events:
            return {"user_id": user_id}  # ✅ Safe empty response
        if "auth_events" not in user_events[user_id]:
            return {"user_id": user_id}  # ✅ Safe empty response

    This function returns one of three options:
    1. None - if no brute force is detected
    2. Single attack dict - if ONE brute-force attack is detected (see structure below)
    3. Multiple attacks dict - if TWO OR MORE brute-force attacks are detected (see structure below)

    Attack details structure (SINGLE attack):
        {
            "user_id": "user123",
            "failure_count": 7,
            "failure_chain": [
                {"timestamp": "...", "status": "failure", "ip_address": "...", ...},
                {"timestamp": "...", "status": "failure", "ip_address": "...", ...}
            ],
            "success_event": {"timestamp": "...", "status": "success", ...},
            # ^ This must be the EXACT successful login event within the time window.
            #   It proves the brute force attack succeeded.
            #   If success is OUTSIDE the window, don't detect at all (return None).
            "attack_duration_seconds": 245.5
        }

    Attack details structure (MULTIPLE attacks detected):
        {
            "user_id": "user123",
            "attacks": [
                {
                    "failure_count": 5,
                    "failure_chain": [...],   # First attack failures
                    "success_event": {...},   # Success within window
                    "attack_duration_seconds": 300
                },
                {
                    "failure_count": 8,
                    "failure_chain": [...],   # Second attack failures
                    "success_event": {...},   # Success within window
                    "attack_duration_seconds": 240
                }
            ],
            "total_attacks": 2
        }

    NOTE: Only attacks where success occurs WITHIN the 5-minute window are detected.
    If 5+ failures occur but success is outside the window, return None for that sequence.
    If multiple separate attacks are detected (each with success in window), return ALL
    in the "attacks" array.

    Example (SINGLE attack):
        attack = detect_brute_force("user123", user_events)
        if attack:
            print(f"🚨 Brute force detected: {attack['failure_count']} failures")
            print(f"   Success at: {attack['success_event']['timestamp']}")
            for event in attack['failure_chain']:
                print(f"   Failed: {event['timestamp']} from {event['ip_address']}")

    Example (MULTIPLE attacks):
        attack = detect_brute_force("user456", user_events)
        if attack and "attacks" in attack:
            print(f"🚨 Multiple brute force attacks: {attack['total_attacks']} detected")
            for i, single_attack in enumerate(attack['attacks'], 1):
                print(f"   Attack {i}: {single_attack['failure_count']} failures")
                print(f"   Success: {single_attack['success_event']['timestamp']}")
                print(f"   Duration: {single_attack['attack_duration_seconds']}s")
    """
    # TODO: Implement
    # - Get user's auth events: auth_events = user_events[user_id]["auth_events"]
    # - Loop through auth_events, check status field ("failure" or "success")
    # - Build failure_chain list with full event details
    # - Count failures within time window (first failure to last failure <= 5 minutes)
    # - When 5+ failures found within window:
    #   * Check if there's a SUCCESS within the SAME 5-minute window
    #   * If YES and within window: Create attack dict with that success_event
    #   * If NO or outside window: Skip this sequence (return None for it)
    #   Example: First failure at 10:00:00, last failure at 10:04:00
    #            Window is 10:00:00 to 10:05:00
    #            Success at 10:04:30 → DETECT (within window)
    #            Success at 10:07:00 → DON'T DETECT (outside window)
    # - Keep scanning for MORE brute force attacks (user might be attacked multiple times)
    # - If ONE attack found (with success in window), return single attack structure
    # - If MULTIPLE attacks found (each with success in window), return "attacks" array
    # - If no complete attacks found, return None
    pass
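As a starting point, the rolling-window scan described in the TODO comments might look like this simplified sketch. It assumes ISO 8601 timestamps with a trailing Z, sorts events by time first, and returns only the FIRST completed attack; extending it to collect every attack into the multi-attack "attacks" structure is left as an exercise:

```python
from datetime import datetime, timedelta

def _parse_ts(event):
    # Assumes ISO 8601 timestamps with a trailing "Z" (UTC), as in the examples.
    return datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))

def detect_brute_force(user_id, user_events, time_window_minutes=5, failure_threshold=5):
    """Simplified sketch: returns the FIRST completed brute-force attack only."""
    # Defensive checks required by the spec.
    if user_id not in user_events or "auth_events" not in user_events[user_id]:
        return {"user_id": user_id}
    events = sorted(user_events[user_id]["auth_events"], key=_parse_ts)
    window = timedelta(minutes=time_window_minutes)
    failures = []
    for event in events:
        if event.get("status") == "failure":
            failures.append(event)
            # Keep only failures inside the rolling window ending at this event.
            while failures and _parse_ts(event) - _parse_ts(failures[0]) > window:
                failures.pop(0)
        elif (event.get("status") == "success"
              and len(failures) >= failure_threshold
              and _parse_ts(event) - _parse_ts(failures[0]) <= window):
            # Success landed inside the window anchored at the first counted failure.
            duration = (_parse_ts(event) - _parse_ts(failures[0])).total_seconds()
            return {
                "user_id": user_id,
                "failure_count": len(failures),
                "failure_chain": list(failures),
                "success_event": event,
                "attack_duration_seconds": duration,
            }
    return None  # no completed attack found
```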
def detect_privilege_escalation(user_id, user_events, time_window_minutes=10):
    """
    Detect suspicious privilege escalation:
    Successful login followed by privilege_change within 10 minutes.

    See "Detection Criteria - Privilege Escalation Detection" section above for:
    - Complete specification
    - Real-world breach pattern example
    - What counts as "privilege_change"

    Args:
        user_id: User to check
        user_events: Dictionary of correlated events
        time_window_minutes: Time window (default 10)

    Returns:
        dict or None: Attack details if detected, None otherwise

    DEFENSIVE PROGRAMMING - CRITICAL:
    If user_id doesn't exist, has no auth_events, OR has no security_events:
        → Return full empty structure (NOT None!)

        if user_id not in user_events:
            return {"user_id": user_id, "login_sessions": [],
                    "total_login_sessions": 0, "total_escalations": 0}
        if "auth_events" not in user_events[user_id]:
            return {"user_id": user_id, "login_sessions": [],
                    "total_login_sessions": 0, "total_escalations": 0}
        if "security_events" not in user_events[user_id]:
            return {"user_id": user_id, "login_sessions": [],
                    "total_login_sessions": 0, "total_escalations": 0}

    This ensures the grader can safely check fields without crashes.

    Always returns a structure grouped by login sessions (even for a single escalation).

    Attack details structure (ALWAYS uses login_sessions):
        {
            "user_id": "user456",
            "login_sessions": [
                {
                    "login_event": {  # Full auth event from auth.log
                        "timestamp": "2024-01-15T10:00:00Z",
                        "user_id": "user456",
                        "action": "login",
                        "ip_address": "192.168.1.100",
                        "status": "success",
                        "session_id": "sess_123"
                    },
                    "privilege_escalations": [
                        {
                            "privilege_event": {  # Full security event from security.log
                                "timestamp": "2024-01-15T10:02:00Z",
                                "user_id": "user456",
                                "event_type": "privilege_change",
                                "resource": "sudo_access",  # or "admin_role", "root_access", etc.
                                "session_id": "sess_123",
                                "ip_address": "192.168.1.100",
                                # ... any other fields from security.log
                            },
                            "time_to_escalation_seconds": 90.5
                        }
                        # ... more escalations after this login
                    ],
                    "escalation_count": 1  # Number of escalations after this login
                }
                # ... more login sessions
            ],
            "total_login_sessions": 1,  # Total logins with escalations
            "total_escalations": 1      # Total escalations across all logins
        }

    Example scenarios:
    - Single login, single escalation:
      total_login_sessions=1, total_escalations=1, escalation_count=1
    - Single login, multiple escalations:
      total_login_sessions=1, total_escalations=3, escalation_count=3
    - Multiple logins, one escalation each:
      total_login_sessions=2, total_escalations=2, escalation_count=1 (per session)
    - Multiple logins, multiple escalations:
      total_login_sessions=2, total_escalations=5, escalation_count varies

    NOTE: Multiple privilege escalations can occur after the SAME login (e.g., escalate
    to sudo, then root, then admin in sequence). They are grouped by login_event.

    Example (always use login_sessions structure):
        attack = detect_privilege_escalation("admin", user_events)
        if attack:
            print(f"🚨 Detected {attack['total_login_sessions']} login session(s) with escalations")
            print(f"   Total escalations: {attack['total_escalations']}")
            for i, session in enumerate(attack['login_sessions'], 1):
                print(f"\n   Session {i} - Login: {session['login_event']['timestamp']}")
                print(f"   Escalations in this session: {session['escalation_count']}")
                for j, esc in enumerate(session['privilege_escalations'], 1):
                    print(f"      {j}. {esc['privilege_event']['resource']} "
                          f"({esc['time_to_escalation_seconds']}s after login)")
    """
    # TODO: Implement
    # Step 1: Get auth events: auth_events = user_events[user_id]["auth_events"]
    # Step 2: Find all successful logins (status == "success"), store with timestamps
    # Step 3: Get security events: security_events = user_events[user_id]["security_events"]
    # Step 4: Look for event_type == "privilege_change" events
    # Step 5: For EACH login, find ALL privilege_change events within 10 minutes
    # Step 6: GROUP escalations by their associated login (same login can have multiple escalations)
    # Step 7: Verify resource field contains "sudo_access", "admin_role", "root_access", etc.
    # Step 8: ALWAYS return "login_sessions" structure (even for single login/escalation)
    # Step 9: CRITICAL - Only include login sessions that HAVE escalations (escalation_count > 0)
    #         Do NOT include sessions with zero escalations in login_sessions array
    # Step 10: If no escalations found, return None
    pass
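One possible sketch of the session-grouping logic described in Steps 1-10. It assumes ISO 8601 timestamps with a trailing Z and, for brevity, omits the Step 7 check that the resource field names a known privilege (sudo_access, admin_role, root_access, etc.):

```python
from datetime import datetime, timedelta

def _parse_ts(event):
    # Assumes ISO 8601 timestamps with a trailing "Z" (UTC).
    return datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))

def detect_privilege_escalation(user_id, user_events, time_window_minutes=10):
    """Sketch: group privilege_change events under the login they followed."""
    def empty():
        # Full empty structure required by the spec's defensive-programming rule.
        return {"user_id": user_id, "login_sessions": [],
                "total_login_sessions": 0, "total_escalations": 0}
    data = user_events.get(user_id)
    if not data or "auth_events" not in data or "security_events" not in data:
        return empty()
    window = timedelta(minutes=time_window_minutes)
    sessions, total = [], 0
    for login in data["auth_events"]:
        if login.get("status") != "success":
            continue
        escalations = []
        for sec in data["security_events"]:
            if sec.get("event_type") != "privilege_change":
                continue
            delta = _parse_ts(sec) - _parse_ts(login)
            if timedelta(0) <= delta <= window:
                escalations.append({
                    "privilege_event": sec,
                    "time_to_escalation_seconds": delta.total_seconds(),
                })
        if escalations:  # Step 9: only sessions that actually escalated
            sessions.append({"login_event": login,
                             "privilege_escalations": escalations,
                             "escalation_count": len(escalations)})
            total += len(escalations)
    if not sessions:
        return None  # Step 10
    return {"user_id": user_id, "login_sessions": sessions,
            "total_login_sessions": len(sessions), "total_escalations": total}
```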
def detect_anomalous_access(user_id, user_events):
    """
    Detect anomalous file access patterns.

    See "Detection Criteria - Anomalous File Access Detection" section above for:
    - Complete list of sensitive files to detect
    - Examples of what to flag vs. ignore
    - Implementation tips

    Args:
        user_id: User to check
        user_events: Dictionary of correlated events

    Returns:
        dict or None: Attack details if detected, None otherwise

    DEFENSIVE PROGRAMMING - CRITICAL:
    If user_id doesn't exist OR has no security_events:
        → Return None (different from the other detect functions!)

        if user_id not in user_events:
            return None  # ✅ User doesn't exist
        if "security_events" not in user_events[user_id]:
            return None  # ✅ User has no security events

    NOTE: This function returns None for missing data (unlike the other
    two detect functions, which return empty dicts). This is intentional -
    anomalous access detection requires security_events to work.

    Attack details structure:
        {
            "user_id": "user789",
            "sensitive_files_accessed": [
                {
                    "file": "/etc/shadow",
                    "timestamp": "...",
                    "event": {...}  # Full security event
                },
                {
                    "file": "/root/.ssh/authorized_keys",
                    "timestamp": "...",
                    "event": {...}
                }
            ],
            "access_count": 2
        }

    Example:
        attack = detect_anomalous_access("user789", user_events)
        if attack:
            print(f"🚨 Anomalous access: {attack['access_count']} sensitive files")
            for access in attack['sensitive_files_accessed']:
                print(f"   {access['file']} at {access['timestamp']}")

    Flag as anomalous if security_events contain file_access to ANY of:
    - /etc/passwd, /etc/shadow, /etc/sudoers, /etc/group
    - /root/.ssh/authorized_keys, /home/*/.ssh/authorized_keys, /home/*/.ssh/id_rsa
    - /etc/crontab, /var/spool/cron/*
    - /var/log/auth.log, /var/log/secure, /var/log/audit/audit.log

    Do NOT flag normal files like:
    - /home/username/documents/* (user's own files)
    - /tmp/* (temporary files)
    - /usr/share/* (shared system files)
    """
    # TODO: Implement
    # Step 1: Get security events: security_events = user_events[user_id]["security_events"]
    # Step 2: Loop through security_events
    # Step 3: Check if event["event_type"] == "file_access"
    # Step 4: Get the file path: resource = event["resource"]
    # Step 5: Check if resource contains any sensitive file path
    # Step 6: Build list of sensitive_files_accessed with full event details
    # Step 7: If found, return dict with all access details
    # Step 8: If not found, return None
    pass
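Putting the steps together, here is a hedged sketch of the anomalous-access detector. fnmatch is used so wildcard watchlist entries like /home/*/.ssh/id_rsa match concrete paths (plain substring matching would miss them); the pattern list mirrors the watchlist in the docstring:

```python
from fnmatch import fnmatch

# Watchlist from the spec; glob entries are matched with fnmatch.
SENSITIVE_PATTERNS = [
    "/etc/passwd", "/etc/shadow", "/etc/sudoers", "/etc/group",
    "/root/.ssh/authorized_keys", "/home/*/.ssh/authorized_keys",
    "/home/*/.ssh/id_rsa", "/etc/crontab", "/var/spool/cron/*",
    "/var/log/auth.log", "/var/log/secure", "/var/log/audit/audit.log",
]

def detect_anomalous_access(user_id, user_events):
    """Sketch: flag file_access events whose resource matches the watchlist."""
    data = user_events.get(user_id)
    if not data or "security_events" not in data:
        return None  # this detector returns None on missing data, per the spec
    hits = []
    for event in data["security_events"]:
        if event.get("event_type") != "file_access":
            continue  # only file accesses count as anomalous here
        resource = event.get("resource", "")
        if any(fnmatch(resource, pattern) for pattern in SENSITIVE_PATTERNS):
            hits.append({"file": resource,
                         "timestamp": event.get("timestamp"),
                         "event": event})
    if not hits:
        return None
    return {"user_id": user_id,
            "sensitive_files_accessed": hits,
            "access_count": len(hits)}
```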