· PathShield Team · Tutorials · 9 min read
AWS Security Group Rules Audit Script - Find Your Open Ports in Minutes
Free Python script to audit all your AWS security groups. Find overly permissive rules, unused groups, and compliance violations before attackers do.
AWS Security Group Rules Audit Script - Find Your Open Ports in Minutes
That security group rule you added “temporarily” six months ago? It’s still there, exposing port 22 to the entire internet. This Python script finds all your risky security group rules across all AWS regions in minutes, not hours.
The Security Group Problem Nobody Talks About
Every startup has them: security groups that look like Swiss cheese. Rules accumulate over time, nobody remembers why port 8080 is open to 0.0.0.0/0, and manual audits take forever. Meanwhile, attackers scan for these exact misconfigurations.
What this script finds:
- Rules allowing access from 0.0.0.0/0 (the entire internet)
- Overly broad port ranges
- Unused security groups cluttering your account (they cost nothing, but they hide real risk and invite accidental reuse)
- Non-compliant rules for SOC 2/ISO 27001
- Shadow IT security groups nobody knows about
The Complete Security Group Audit Script
Save this as sg_audit.py and run it:
#!/usr/bin/env python3
"""
AWS Security Group Auditor
Finds risky rules across all regions in minutes
"""
import boto3
import json
import csv
from datetime import datetime
from collections import defaultdict
import argparse
import sys
class SecurityGroupAuditor:
    """Audit AWS security group ingress rules for risky exposure.

    Findings accumulate in ``self.findings`` as flat dicts that all share the
    same keys (``Region``, ``SecurityGroupId``, ``SecurityGroupName``,
    ``VPCId``, ``Type``, ``Protocol``, ``PortRange``, ``Source``,
    ``Severity``, ``Description``, ``Recommendation``), so they can be
    printed, exported, or turned into remediation commands.

    Args:
        profile: Optional AWS profile name. When omitted, boto3's default
            credential resolution is used.
    """

    # Sources that mean "anyone on the internet".
    OPEN_IPV4_CIDR = '0.0.0.0/0'
    OPEN_IPV6_CIDR = '::/0'
    # ICMP rules reuse FromPort/ToPort for type/code, so TCP/UDP port
    # checks must not be applied to them.
    ICMP_PROTOCOLS = ('icmp', 'icmpv6', '1', '58')
    WEB_PORTS = (80, 443, 8080, 8443)
    DATABASE_PORTS = (3306, 5432, 27017, 6379)

    def __init__(self, profile=None):
        self.profile = profile
        self.risky_ports = {
            22: 'SSH',
            3389: 'RDP',
            3306: 'MySQL',
            5432: 'PostgreSQL',
            27017: 'MongoDB',
            6379: 'Redis',
            9200: 'Elasticsearch',
            5984: 'CouchDB',
            11211: 'Memcached',
            135: 'Windows RPC',
            445: 'SMB',
            1433: 'SQL Server',
            23: 'Telnet',
        }
        self.findings = []
        # Regions whose scan raised; reported so an all-clear is never
        # printed when nothing could actually be scanned.
        self.scan_errors = []

    # ------------------------------------------------------------------
    # AWS access
    # ------------------------------------------------------------------
    def _ec2_client(self, region):
        """Return an EC2 client for ``region`` honouring ``self.profile``."""
        if self.profile:
            session = boto3.Session(profile_name=self.profile)
            return session.client('ec2', region_name=region)
        return boto3.client('ec2', region_name=region)

    def get_all_regions(self):
        """Return the region names reported by ``describe_regions``."""
        regions = self._ec2_client('us-east-1').describe_regions()['Regions']
        return [region['RegionName'] for region in regions]

    # ------------------------------------------------------------------
    # Rule analysis
    # ------------------------------------------------------------------
    def _exposed_risky_ports(self, from_port, to_port, protocol):
        """Return the known-risky ports inside ``from_port..to_port``, ascending.

        ``None`` bounds are treated as the widest range (0 / 65535). ICMP
        rules never match because their "ports" are type/code values.
        """
        if protocol in self.ICMP_PROTOCOLS:
            return []
        low = 0 if from_port is None else from_port
        high = 65535 if to_port is None else to_port
        return sorted(port for port in self.risky_ports if low <= port <= high)

    @staticmethod
    def _format_port_range(rule):
        """Render a rule's port range; ``'All'`` when the rule has no port limit."""
        from_port = rule.get('FromPort')
        # FromPort is absent for all-protocol rules and -1 for "all ICMP types".
        if rule.get('IpProtocol') == '-1' or from_port is None or from_port == -1:
            return 'All'
        return f"{from_port}-{rule.get('ToPort', 65535)}"

    @staticmethod
    def _make_finding(sg, region, finding_type, protocol, port_range, source,
                      severity, description, recommendation):
        """Build one finding dict with the canonical key order."""
        return {
            'Region': region,
            'SecurityGroupId': sg['GroupId'],
            'SecurityGroupName': sg.get('GroupName', 'N/A'),
            'VPCId': sg.get('VpcId', 'N/A'),
            'Type': finding_type,
            'Protocol': protocol,
            'PortRange': port_range,
            'Source': source,
            'Severity': severity,
            'Description': description,
            'Recommendation': recommendation,
        }

    def _open_to_internet_finding(self, sg, region, rule, source):
        """Build the finding for a rule that admits ``source`` (any address)."""
        protocol = rule.get('IpProtocol', 'all')
        from_port = rule.get('FromPort', 0)
        to_port = rule.get('ToPort', 65535)
        return self._make_finding(
            sg, region, 'Open to Internet', protocol,
            self._format_port_range(rule), source,
            self.calculate_severity(from_port, to_port, protocol),
            self.get_risk_description(from_port, to_port, protocol),
            self.get_recommendation(from_port, to_port, protocol),
        )

    def analyze_security_group(self, sg, region):
        """Analyze a single security group and return its findings.

        Args:
            sg: One entry of ``describe_security_groups()['SecurityGroups']``.
            region: Region name recorded on every finding.

        Returns:
            A list of finding dicts (possibly empty). ``self.findings`` is
            not modified.
        """
        sg_findings = []

        for rule in sg.get('IpPermissions', []):
            for ip_range in rule.get('IpRanges', []):
                cidr = ip_range.get('CidrIp')
                if cidr == self.OPEN_IPV4_CIDR:
                    sg_findings.append(
                        self._open_to_internet_finding(sg, region, rule, cidr))
                elif self.is_overly_broad_cidr(cidr):
                    sg_findings.append(self._make_finding(
                        sg, region, 'Overly Broad CIDR',
                        rule.get('IpProtocol', 'all'),
                        self._format_port_range(rule), cidr, 'MEDIUM',
                        'Large IP range has access',
                        'Restrict to specific IPs or smaller ranges',
                    ))

            # IPv6 "anywhere" rules are just as exposed as 0.0.0.0/0.
            for ip_range in rule.get('Ipv6Ranges', []):
                cidr = ip_range.get('CidrIpv6')
                if cidr == self.OPEN_IPV6_CIDR:
                    sg_findings.append(
                        self._open_to_internet_finding(sg, region, rule, cidr))

        if self.is_potentially_unused(sg):
            sg_findings.append(self._make_finding(
                sg, region, 'Potentially Unused', 'N/A', 'N/A', 'N/A', 'LOW',
                # Only rule emptiness is checked, so do not claim more.
                'No inbound rules defined; attachments not verified',
                'Review and delete if unused',
            ))

        return sg_findings

    def calculate_severity(self, from_port, to_port, protocol):
        """Return CRITICAL/HIGH/MEDIUM/LOW for an internet-facing rule."""
        if protocol == '-1':  # All protocols
            return 'CRITICAL'
        if from_port == 0 and to_port == 65535:  # All ports
            return 'CRITICAL'
        if self._exposed_risky_ports(from_port, to_port, protocol):
            return 'HIGH'
        if from_port in self.WEB_PORTS:
            return 'MEDIUM'
        return 'LOW'

    def get_risk_description(self, from_port, to_port, protocol):
        """Return a human-readable description of what the rule exposes."""
        if protocol == '-1':
            return 'All traffic allowed from internet!'
        risky_services = [
            f"{self.risky_ports[port]} ({port})"
            for port in self._exposed_risky_ports(from_port, to_port, protocol)
        ]
        if risky_services:
            return f"Exposes: {', '.join(risky_services[:3])}"
        return f"Ports {from_port}-{to_port} exposed"

    def get_recommendation(self, from_port, to_port, protocol):
        """Return remediation advice based on every port the rule covers."""
        if protocol == '-1' or (from_port == 0 and to_port == 65535):
            return "Restrict to specific ports and IPs"
        exposed = self._exposed_risky_ports(from_port, to_port, protocol)
        if 22 in exposed:
            return "Use Systems Manager Session Manager instead"
        if 3389 in exposed:
            return "Use AWS Systems Manager for Windows access"
        if any(port in self.DATABASE_PORTS for port in exposed):
            return "Move database to private subnet"
        return "Limit to known IP ranges or use VPN"

    def is_overly_broad_cidr(self, cidr):
        """Return True for CIDRs wider than /16, excluding ``0.0.0.0/0``.

        ``0.0.0.0/0`` is excluded because it is reported separately as
        "Open to Internet". Malformed or empty input returns False.
        """
        if not cidr or '/' not in cidr:
            return False
        try:
            prefix = int(cidr.split('/')[1])
        except ValueError:
            return False
        return prefix < 16 and cidr != self.OPEN_IPV4_CIDR

    def is_potentially_unused(self, sg):
        """Heuristic: a non-default group with no inbound rules.

        This is a placeholder; it does not look at network interfaces,
        instances, RDS, or any other attachment.
        """
        if sg.get('GroupName') == 'default':
            return False
        return len(sg.get('IpPermissions', [])) == 0

    def audit_region(self, region):
        """Scan every security group in ``region`` and extend ``self.findings``.

        Failures are printed and recorded in ``self.scan_errors`` instead of
        being raised, so one inaccessible region does not stop the audit.
        """
        print(f"Scanning {region}...")
        try:
            ec2 = self._ec2_client(region)
            paginator = ec2.get_paginator('describe_security_groups')
            for page in paginator.paginate():
                for sg in page['SecurityGroups']:
                    self.findings.extend(self.analyze_security_group(sg, region))
        except Exception as e:  # best-effort boundary: keep scanning other regions
            self.scan_errors.append(region)
            print(f"Error scanning {region}: {str(e)}")

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def generate_report(self):
        """Print summary counts and the first five CRITICAL findings."""
        if not self.findings:
            if self.scan_errors:
                # Never report an all-clear when regions could not be scanned.
                print(f"\n⚠️ No findings, but {len(self.scan_errors)} region(s) "
                      f"could not be scanned: {', '.join(self.scan_errors)}")
            else:
                print("\n✅ No security group issues found!")
            return

        print(f"\n🔍 Security Group Audit Complete")
        print(f"{'='*50}")
        print(f"Total findings: {len(self.findings)}")

        severity_counts = defaultdict(int)
        type_counts = defaultdict(int)
        for finding in self.findings:
            severity_counts[finding['Severity']] += 1
            type_counts[finding['Type']] += 1

        print(f"\nBy Severity:")
        for severity in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']:
            if severity in severity_counts:
                print(f" {severity}: {severity_counts[severity]}")

        print(f"\nBy Type:")
        for issue_type, count in type_counts.items():
            print(f" {issue_type}: {count}")

        critical = [f for f in self.findings if f['Severity'] == 'CRITICAL']
        if critical:
            print(f"\n❌ CRITICAL FINDINGS (Fix immediately!):")
            for finding in critical[:5]:  # Show first 5
                print(f"\n Region: {finding['Region']}")
                print(f" Security Group: {finding['SecurityGroupName']} ({finding['SecurityGroupId']})")
                print(f" Issue: {finding['Description']}")
                print(f" Fix: {finding['Recommendation']}")

        if self.scan_errors:
            print(f"\n⚠️ Regions not scanned due to errors: {', '.join(self.scan_errors)}")

    def export_findings(self, format='csv'):
        """Write findings to a timestamped file in the working directory.

        Args:
            format: ``'csv'`` or ``'json'``.

        Returns:
            The name of the file that was written.

        Raises:
            ValueError: If ``format`` is not a supported value.
        """
        if format not in ('csv', 'json'):
            raise ValueError(f"Unsupported export format: {format!r}")

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f'sg_audit_{timestamp}.{format}'

        if format == 'csv':
            with open(filename, 'w', newline='', encoding='utf-8') as f:
                if self.findings:
                    # Union of keys (first-seen order) so findings added by
                    # custom checks with different keys still export.
                    columns = list(dict.fromkeys(
                        key for finding in self.findings for key in finding))
                    writer = csv.DictWriter(f, fieldnames=columns)
                    writer.writeheader()
                    writer.writerows(self.findings)
        else:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.findings, f, indent=2)

        print(f"\n📄 Full report saved to: {filename}")
        return filename

    # ------------------------------------------------------------------
    # Remediation
    # ------------------------------------------------------------------
    def _remediable_findings(self):
        """Return CRITICAL/HIGH internet-open findings, one per distinct rule."""
        seen_rules = set()
        selected = []
        for finding in self.findings:
            if finding.get('Type') != 'Open to Internet':
                continue
            if finding.get('Severity') not in ('CRITICAL', 'HIGH'):
                continue
            # Key on the whole rule so every risky rule in a group gets a fix.
            rule_key = (finding['SecurityGroupId'], finding['Region'],
                        finding['Protocol'], finding['PortRange'],
                        finding['Source'])
            if rule_key not in seen_rules:
                seen_rules.add(rule_key)
                selected.append(finding)
        return selected

    def _build_revoke_command(self, finding):
        """Return the lines of an ``aws ec2 revoke-security-group-ingress`` call."""
        protocol = finding['Protocol']
        port_range = finding['PortRange']
        lines = [
            "aws ec2 revoke-security-group-ingress \\",
            f" --region {finding['Region']} \\",
            f" --group-id {finding['SecurityGroupId']} \\",
        ]

        if finding['Source'] == self.OPEN_IPV6_CIDR:
            # --cidr only accepts IPv4; IPv6 needs the --ip-permissions form.
            parts = [f"IpProtocol={protocol}"]
            if port_range != 'All':
                from_port, _, to_port = port_range.partition('-')
                parts += [f"FromPort={from_port}", f"ToPort={to_port}"]
            elif protocol in self.ICMP_PROTOCOLS:
                parts += ["FromPort=-1", "ToPort=-1"]
            parts.append("Ipv6Ranges=[{CidrIpv6=::/0}]")
            permission = ','.join(parts)
            lines.append(f" --ip-permissions '{permission}'")
            return lines

        cli_protocol = 'all' if protocol == '-1' else protocol
        lines.append(f" --protocol {cli_protocol} \\")
        if port_range != 'All':
            # Omit --port when the rule has no port limit.
            lines.append(f" --port {port_range} \\")
        lines.append(f" --cidr {finding['Source']}")
        return lines

    def get_remediation_script(self):
        """Print revoke commands for every CRITICAL/HIGH internet-open rule."""
        print("\n🔧 Quick Fix Commands:")
        print("# WARNING: Review before running!")
        for finding in self._remediable_findings():
            print(f"\n# Fix {finding['SecurityGroupName']} in {finding['Region']}")
            print(f"# Current: {finding['Description']}")
            for line in self._build_revoke_command(finding):
                print(line)
def main():
    """Command-line entry point: parse options, scan, report, export."""
    cli = argparse.ArgumentParser(description='Audit AWS Security Groups')
    cli.add_argument('--profile', help='AWS profile to use')
    cli.add_argument('--region', help='Specific region to audit')
    cli.add_argument(
        '--export',
        choices=['csv', 'json'],
        default='csv',
        help='Export format',
    )
    cli.add_argument(
        '--fix-commands',
        action='store_true',
        help='Generate remediation commands',
    )
    options = cli.parse_args()

    auditor = SecurityGroupAuditor(profile=options.profile)

    # A single --region short-circuits region discovery.
    if not options.region:
        print("🌍 Discovering all AWS regions...")
        targets = auditor.get_all_regions()
        print(f"Found {len(targets)} regions to scan")
    else:
        targets = [options.region]

    for target in targets:
        auditor.audit_region(target)

    auditor.generate_report()

    # Nothing to write when the scan produced no findings.
    if auditor.findings:
        auditor.export_findings(options.export)

    if options.fix_commands:
        auditor.get_remediation_script()


if __name__ == '__main__':
    main()
How to Use the Script
Basic Usage
# Scan all regions with default credentials
python sg_audit.py
# Use specific AWS profile
python sg_audit.py --profile production
# Scan specific region only
python sg_audit.py --region us-east-1
# Generate remediation commands
python sg_audit.py --fix-commands
# Export as JSON
python sg_audit.py --export json
Understanding the Output
🔍 Security Group Audit Complete
==================================================
Total findings: 47
By Severity:
CRITICAL: 3
HIGH: 12
MEDIUM: 25
LOW: 7
By Type:
Open to Internet: 35
Overly Broad CIDR: 8
Potentially Unused: 4
❌ CRITICAL FINDINGS (Fix immediately!):
Region: us-east-1
Security Group: production-db-sg (sg-0123456789abcdef0)
Issue: All traffic allowed from internet!
Fix: Restrict to specific ports and IPs
Common Security Group Mistakes This Finds
1. The “Temporary” Debug Rule
Port 22 (SSH) open to 0.0.0.0/0
Added: "Just for debugging"
Reality: Still there 6 months later
Risk: Brute force attacks, unauthorized access
2. The Copy-Paste Disaster
Copied security group from tutorial
Contains: Port 9200 (Elasticsearch) open
Impact: Data breach waiting to happen
Fix: Review all rules before using
3. The “It Was Working” Rule
All ports (0-65535) open to 0.0.0.0/0
Reason: "Couldn't figure out which port"
Impact: Complete exposure
Fix: Use AWS VPC Flow Logs to find actual ports
Remediation Strategies
Quick Wins (Do Now)
- Remove 0.0.0.0/0 rules for databases
# Find and fix database exposures
aws ec2 describe-security-groups \
--filters "Name=ip-permission.from-port,Values=3306,5432,27017" \
--query 'SecurityGroups[?IpPermissions[?IpRanges[?CidrIp==`0.0.0.0/0`]]]'
- Replace SSH access with Systems Manager
# No more port 22!
aws ssm start-session --target i-1234567890abcdef0
- Use security group references
# Instead of IP ranges, reference other security groups
aws ec2 authorize-security-group-ingress \
--group-id sg-target \
--source-group sg-source \
--protocol tcp \
--port 443
Medium-term Fixes
- Implement least-privilege groups
# security_group_builder.py
def create_web_server_sg():
    """Describe the web-server group: HTTPS, plus HTTP that redirects to it.

    Both rules accept traffic from the ``cloudfront`` source only.
    """
    https_rule = {'Port': 443, 'Source': 'cloudfront'}
    http_redirect_rule = {'Port': 80, 'Source': 'cloudfront', 'Redirect': 443}

    spec = {'GroupName': 'web-servers'}
    spec['Rules'] = [https_rule, http_redirect_rule]
    return spec
def create_app_server_sg():
    """Describe the app-server group: port 8080 from the web tier only."""
    from_web_tier = {'Port': 8080, 'Source': 'web-servers-sg'}

    spec = {'GroupName': 'app-servers'}
    spec['Rules'] = [from_web_tier]
    return spec
- Automate security group reviews
# .github/workflows/sg-audit.yml
name: Weekly Security Group Audit
on:
  schedule:
    # GitHub evaluates cron schedules in UTC.
    - cron: '0 9 * * 1' # Mondays at 9 AM
jobs:
  audit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      # Prerequisites not shown here: sg_audit.py imports boto3 and needs AWS
      # credentials in the job environment; send_report.py must exist in the repo.
      - name: Run SG Audit
        run: |
          python sg_audit.py --export csv
          python send_report.py
Extending the Script
Add Custom Checks
def check_compliance_rules(self, sg, region):
    """Run compliance-specific checks against one security group.

    Findings carry the same keys as the auditor's other findings so they
    identify the offending group and export cleanly alongside them.

    Args:
        sg: Security group dict as returned by ``describe_security_groups``.
        region: Region name recorded on every finding.

    Returns:
        A list of finding dicts (possibly empty).

    Note:
        Relies on ``self.has_database_ports_exposed`` and
        ``self.has_unencrypted_protocols``, which you must implement.
    """
    def violation(finding_type, description, recommendation):
        return {
            'Region': region,
            'SecurityGroupId': sg['GroupId'],
            'SecurityGroupName': sg.get('GroupName', 'N/A'),
            'VPCId': sg.get('VpcId', 'N/A'),
            'Type': finding_type,
            'Protocol': 'N/A',
            'PortRange': 'N/A',
            'Source': 'N/A',
            'Severity': 'HIGH',
            'Description': description,
            'Recommendation': recommendation,
        }

    findings = []

    # SOC 2: No direct database access
    if self.has_database_ports_exposed(sg):
        findings.append(violation(
            'SOC2 Violation',
            'Database ports exposed',
            'Move database to private subnet',
        ))

    # PCI: Encryption required
    if self.has_unencrypted_protocols(sg):
        findings.append(violation(
            'PCI Violation',
            'Unencrypted protocols allowed',
            'Allow only encrypted protocols',
        ))

    return findings
Add Slack Notifications
def send_slack_alert(findings):
    """Post a Slack message summarising CRITICAL findings, if there are any.

    Does nothing when ``findings`` contains no CRITICAL entries. The webhook
    URL is read from the ``SLACK_WEBHOOK`` environment variable.

    Args:
        findings: Iterable of finding dicts with at least a ``Severity`` key.

    Raises:
        KeyError: If an alert is needed but ``SLACK_WEBHOOK`` is not set.
    """
    critical = [finding for finding in findings if finding['Severity'] == 'CRITICAL']
    if not critical:
        return

    # Imported here because the audit script does not import these at module
    # level; `requests` is a third-party package that must be installed.
    import os
    import requests

    webhook_url = os.environ['SLACK_WEBHOOK']
    message = {
        'text': f'🚨 {len(critical)} critical security group issues found!',
        'attachments': [{
            'color': 'danger',
            # Only the first three groups, to keep the message short.
            'fields': [{
                'title': finding.get('SecurityGroupName', 'N/A'),
                'value': finding.get('Description', 'N/A'),
            } for finding in critical[:3]],
        }],
    }
    # Without a timeout a stalled connection would hang the audit forever.
    requests.post(webhook_url, json=message, timeout=10)
Prevention: Security Groups as Code
Stop security group drift with Infrastructure as Code:
# security_groups.tf
# Web-tier security group: one HTTPS ingress rule sourced from a prefix list,
# and one HTTPS egress rule to any IPv4 address.
resource "aws_security_group" "web" {
name_prefix = "web-"
description = "Security group for web servers"
# NOTE(review): aws_vpc.main is not declared in this snippet.
vpc_id = aws_vpc.main.id
ingress {
description = "HTTPS from CloudFront"
from_port = 443
to_port = 443
protocol = "tcp"
# NOTE(review): data.aws_prefix_list.cloudfront is not declared in this
# snippet — confirm the data source type and name resolve to the CloudFront
# origin-facing prefix list.
prefix_list_ids = [data.aws_prefix_list.cloudfront.id]
}
egress {
description = "HTTPS to anywhere (for updates)"
from_port = 443
to_port = 443
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "web-servers"
Compliance = "SOC2"
}
}
# Detect changes made outside Terraform
# AWS Config rule named "detect-sg-changes", using an AWS-owned (managed) rule.
resource "aws_config_config_rule" "sg_changes" {
name = "detect-sg-changes"
source {
owner = "AWS"
# NOTE(review): confirm "SECURITY_GROUP_CHANGES" is a valid AWS managed rule
# identifier before applying; verify against the AWS Config managed rules list.
source_identifier = "SECURITY_GROUP_CHANGES"
}
}
Conclusion
Security groups are your first line of defense in AWS. This script helps you find the holes before attackers do. Run it weekly, fix the critical issues immediately, and gradually improve your security posture.
Remember: The best security group is one that’s regularly audited and only allows what’s absolutely necessary.
Next steps:
- Run the script now (seriously, it takes 2 minutes)
- Fix any CRITICAL findings today
- Schedule weekly audits
- Move to Infrastructure as Code
Want continuous security group monitoring without running scripts? Tools like PathShield automatically detect risky security group changes in real-time and alert you before they become breaches.