· PathShield Team · Tutorials  · 9 min read

AWS Security Group Rules Audit Script - Find Your Open Ports in Minutes

Free Python script to audit all your AWS security groups. Find overly permissive rules, unused groups, and compliance violations before attackers do.

That security group rule you added “temporarily” six months ago? It’s still there, exposing port 22 to the entire internet. This Python script finds all your risky security group rules across all AWS regions in minutes, not hours.

The Security Group Problem Nobody Talks About

Every startup has them: security groups that look like Swiss cheese. Rules accumulate over time, nobody remembers why port 8080 is open to 0.0.0.0/0, and manual audits take forever. Meanwhile, attackers scan for these exact misconfigurations.

What this script finds:

  • Rules allowing access from 0.0.0.0/0 (the entire internet)
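  • Overly broad port ranges and CIDR blocks
  • Potentially unused security groups cluttering your account
  • Rules that typically fail SOC 2/ISO 27001 reviews, such as database or admin ports open to the internet
  • Forgotten security groups in regions nobody checks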
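To make the first bullet concrete, here is a minimal sketch of what an "open to the internet" rule looks like in the `describe_security_groups` response. The helper name `open_sources` and the sample rule are made up for illustration. Note that the IPv6 equivalent of 0.0.0.0/0 is `::/0`, which lives under `Ipv6Ranges`; the script in this post only inspects the IPv4 `IpRanges` list, so treat the IPv6 branch as a possible extension.

```python
# One entry of IpPermissions, shaped like the EC2 API response (sample data).
sample_rule = {
    "IpProtocol": "tcp",
    "FromPort": 22,
    "ToPort": 22,
    "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": "temp debug"}],
    "Ipv6Ranges": [],
}


def open_sources(rule):
    """Return the 'anywhere' CIDRs (IPv4 or IPv6) that a rule allows."""
    ipv4 = [
        r["CidrIp"]
        for r in rule.get("IpRanges", [])
        if r.get("CidrIp") == "0.0.0.0/0"
    ]
    ipv6 = [
        r["CidrIpv6"]
        for r in rule.get("Ipv6Ranges", [])
        if r.get("CidrIpv6") == "::/0"
    ]
    return ipv4 + ipv6


print(open_sources(sample_rule))
```

An empty list means the rule is not open to the whole internet, although it may still allow a very wide range.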

The Complete Security Group Audit Script

Save this as sg_audit.py and run it:

#!/usr/bin/env python3
"""
AWS Security Group Auditor
Finds risky rules across all regions in minutes
"""

import boto3
import json
import csv
from datetime import datetime
from collections import defaultdict
import argparse
import sys

class SecurityGroupAuditor:
    def __init__(self, profile=None):
        self.profile = profile
        self.risky_ports = {
            22: 'SSH',
            3389: 'RDP',
            3306: 'MySQL',
            5432: 'PostgreSQL',
            27017: 'MongoDB',
            6379: 'Redis',
            9200: 'Elasticsearch',
            5984: 'CouchDB',
            11211: 'Memcached',
            135: 'Windows RPC',
            445: 'SMB',
            1433: 'SQL Server',
            23: 'Telnet'
        }
        self.findings = []
        
    def get_all_regions(self):
        """Get all available AWS regions"""
        if self.profile:
            session = boto3.Session(profile_name=self.profile)
            ec2 = session.client('ec2', region_name='us-east-1')
        else:
            ec2 = boto3.client('ec2', region_name='us-east-1')
            
        regions = ec2.describe_regions()['Regions']
        return [region['RegionName'] for region in regions]
    
    def analyze_security_group(self, sg, region):
        """Analyze a single security group for risks"""
        sg_findings = []
        
        # Check for overly permissive rules
        for rule in sg.get('IpPermissions', []):
            for ip_range in rule.get('IpRanges', []):
                if ip_range.get('CidrIp') == '0.0.0.0/0':
                    from_port = rule.get('FromPort', 0)
                    to_port = rule.get('ToPort', 65535)
                    protocol = rule.get('IpProtocol', 'all')
                    
                    # Determine severity
                    severity = self.calculate_severity(from_port, to_port, protocol)
                    
                    finding = {
                        'Region': region,
                        'SecurityGroupId': sg['GroupId'],
                        'SecurityGroupName': sg.get('GroupName', 'N/A'),
                        'VPCId': sg.get('VpcId', 'N/A'),
                        'Type': 'Open to Internet',
                        'Protocol': protocol,
                        'PortRange': 'All' if protocol == '-1' else f"{from_port}-{to_port}",
                        'Source': '0.0.0.0/0',
                        'Severity': severity,
                        'Description': self.get_risk_description(from_port, to_port, protocol),
                        'Recommendation': self.get_recommendation(from_port, to_port, protocol)
                    }
                    
                    sg_findings.append(finding)
                    
                # Check for large CIDR blocks
                elif self.is_overly_broad_cidr(ip_range.get('CidrIp', '')):
                    finding = {
                        'Region': region,
                        'SecurityGroupId': sg['GroupId'],
                        'SecurityGroupName': sg.get('GroupName', 'N/A'),
                        'VPCId': sg.get('VpcId', 'N/A'),
                        'Type': 'Overly Broad CIDR',
                        'Protocol': rule.get('IpProtocol', 'all'),
                        'PortRange': f"{rule.get('FromPort', 0)}-{rule.get('ToPort', 65535)}",
                        'Source': ip_range.get('CidrIp'),
                        'Severity': 'MEDIUM',
                        'Description': 'Large IP range has access',
                        'Recommendation': 'Restrict to specific IPs or smaller ranges'
                    }
                    sg_findings.append(finding)
        
        # Check for unused security groups
        if self.is_potentially_unused(sg):
            finding = {
                'Region': region,
                'SecurityGroupId': sg['GroupId'],
                'SecurityGroupName': sg.get('GroupName', 'N/A'),
                'VPCId': sg.get('VpcId', 'N/A'),
                'Type': 'Potentially Unused',
                'Protocol': 'N/A',
                'PortRange': 'N/A',
                'Source': 'N/A',
                'Severity': 'LOW',
                'Description': 'No inbound rules defined (attachments not checked)',
                'Recommendation': 'Review and delete if unused'
            }
            sg_findings.append(finding)
            
        return sg_findings
    
    def calculate_severity(self, from_port, to_port, protocol):
        """Calculate severity based on exposed ports"""
        if protocol == '-1':  # All protocols
            return 'CRITICAL'
            
        # Treat a missing bound as the widest possible value
        low = 0 if from_port is None else from_port
        high = 65535 if to_port is None else to_port
        
        if low == 0 and high == 65535:  # All ports
            return 'CRITICAL'
            
        # Any risky port inside the range makes the rule HIGH
        if any(low <= port <= high for port in self.risky_ports):
            return 'HIGH'
                
        # Web ports are medium risk
        if any(low <= port <= high for port in (80, 443, 8080, 8443)):
            return 'MEDIUM'
            
        return 'LOW'
    
    def get_risk_description(self, from_port, to_port, protocol):
        """Get human-readable risk description"""
        if protocol == '-1':
            return 'All traffic allowed from internet!'
            
        low = 0 if from_port is None else from_port
        high = 65535 if to_port is None else to_port
        
        # Risky services inside the exposed range, lowest port first
        risky_services = [
            f"{self.risky_ports[port]} ({port})"
            for port in sorted(self.risky_ports)
            if low <= port <= high
        ]
                
        if risky_services:
            return f"Exposes: {', '.join(risky_services[:3])}"
            
        return f"Ports {from_port}-{to_port} exposed"
    
    def get_recommendation(self, from_port, to_port, protocol):
        """Get specific recommendations"""
        if from_port == 22:
            return "Use Systems Manager Session Manager instead"
        elif from_port == 3389:
            return "Use AWS Systems Manager for Windows access"
        elif from_port in [3306, 5432, 27017, 6379]:
            return "Move database to private subnet"
        elif protocol == '-1':
            return "Restrict to specific ports and IPs"
        else:
            return "Limit to known IP ranges or use VPN"
    
    def is_overly_broad_cidr(self, cidr):
        """Check if CIDR block is too large"""
        if not cidr or '/' not in cidr:
            return False
            
        try:
            prefix = int(cidr.split('/')[1])
            return prefix < 16 and cidr != '0.0.0.0/0'
        except ValueError:
            return False
    
    def is_potentially_unused(self, sg):
        """Check if security group might be unused"""
        # Skip default security groups
        if sg.get('GroupName') == 'default':
            return False
            
        # Placeholder heuristic: a group with no inbound rules is probably idle.
        # A real check would look for attachments (EC2, RDS, load balancers, etc.)
        return len(sg.get('IpPermissions', [])) == 0
    
    def audit_region(self, region):
        """Audit all security groups in a region"""
        print(f"Scanning {region}...")
        
        try:
            if self.profile:
                session = boto3.Session(profile_name=self.profile)
                ec2 = session.client('ec2', region_name=region)
            else:
                ec2 = boto3.client('ec2', region_name=region)
                
            # Get all security groups
            paginator = ec2.get_paginator('describe_security_groups')
            
            for page in paginator.paginate():
                for sg in page['SecurityGroups']:
                    findings = self.analyze_security_group(sg, region)
                    self.findings.extend(findings)
                    
        except Exception as e:
            print(f"Error scanning {region}: {str(e)}")
    
    def generate_report(self):
        """Generate audit report"""
        if not self.findings:
            print("\n✅ No security group issues found!")
            return
            
        # Summary statistics
        print(f"\n🔍 Security Group Audit Complete")
        print(f"{'='*50}")
        print(f"Total findings: {len(self.findings)}")
        
        severity_counts = defaultdict(int)
        type_counts = defaultdict(int)
        
        for finding in self.findings:
            severity_counts[finding['Severity']] += 1
            type_counts[finding['Type']] += 1
        
        print(f"\nBy Severity:")
        for severity in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']:
            if severity in severity_counts:
                print(f"  {severity}: {severity_counts[severity]}")
        
        print(f"\nBy Type:")
        for issue_type, count in type_counts.items():
            print(f"  {issue_type}: {count}")
        
        # Critical findings
        critical = [f for f in self.findings if f['Severity'] == 'CRITICAL']
        if critical:
            print(f"\n❌ CRITICAL FINDINGS (Fix immediately!):")
            for finding in critical[:5]:  # Show first 5
                print(f"\n  Region: {finding['Region']}")
                print(f"  Security Group: {finding['SecurityGroupName']} ({finding['SecurityGroupId']})")
                print(f"  Issue: {finding['Description']}")
                print(f"  Fix: {finding['Recommendation']}")
    
    def export_findings(self, format='csv'):
        """Export findings to file"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        
        if format == 'csv':
            filename = f'sg_audit_{timestamp}.csv'
            with open(filename, 'w', newline='') as f:
                if self.findings:
                    writer = csv.DictWriter(f, fieldnames=self.findings[0].keys())
                    writer.writeheader()
                    writer.writerows(self.findings)
            print(f"\n📄 Full report saved to: {filename}")
            
        elif format == 'json':
            filename = f'sg_audit_{timestamp}.json'
            with open(filename, 'w') as f:
                json.dump(self.findings, f, indent=2)
            print(f"\n📄 Full report saved to: {filename}")
    
    def get_remediation_script(self):
        """Generate remediation commands"""
        print("\n🔧 Quick Fix Commands:")
        print("# WARNING: Review before running!")
        
        seen_rules = set()
        for finding in self.findings:
            if finding['Severity'] in ['CRITICAL', 'HIGH'] and finding['Type'] == 'Open to Internet':
                sg_id = finding['SecurityGroupId']
                region = finding['Region']
                port_range = finding['PortRange']
                
                # The CLI accepts "all" in place of -1
                if finding['Protocol'] == '-1':
                    protocol = 'all'
                else:
                    protocol = finding['Protocol']
                
                # One command per rule, so every risky rule in a group is covered
                rule_key = (sg_id, region, protocol, port_range)
                if rule_key in seen_rules:
                    continue
                seen_rules.add(rule_key)
                
                print(f"\n# Fix {finding['SecurityGroupName']} in {region}")
                print(f"# Current: {finding['Description']}")
                
                if port_range == 'All' and protocol != 'all':
                    # "All" is a display label, not a value --port accepts
                    print("# Port range not recorded for this rule; look it up and revoke manually")
                    continue
                
                print(f"aws ec2 revoke-security-group-ingress \\")
                print(f"  --region {region} \\")
                print(f"  --group-id {sg_id} \\")
                print(f"  --protocol {protocol} \\")
                if port_range != 'All':
                    print(f"  --port {port_range} \\")
                print(f"  --cidr 0.0.0.0/0")

def main():
    parser = argparse.ArgumentParser(description='Audit AWS Security Groups')
    parser.add_argument('--profile', help='AWS profile to use')
    parser.add_argument('--region', help='Specific region to audit')
    parser.add_argument('--export', choices=['csv', 'json'], default='csv', 
                       help='Export format')
    parser.add_argument('--fix-commands', action='store_true',
                       help='Generate remediation commands')
    
    args = parser.parse_args()
    
    # Initialize auditor
    auditor = SecurityGroupAuditor(profile=args.profile)
    
    # Get regions to scan
    if args.region:
        regions = [args.region]
    else:
        print("🌍 Discovering all AWS regions...")
        regions = auditor.get_all_regions()
        print(f"Found {len(regions)} regions to scan")
    
    # Run audit
    for region in regions:
        auditor.audit_region(region)
    
    # Generate report
    auditor.generate_report()
    
    # Export findings
    if auditor.findings:
        auditor.export_findings(args.export)
        
        # Generate fix commands if requested
        if args.fix_commands:
            auditor.get_remediation_script()

if __name__ == '__main__':
    main()
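The script's unused-group check is a placeholder, as its own comment admits. One way to tighten it, sketched here under assumptions, is to compare security groups against the groups referenced by network interfaces, since EC2 instances, RDS, load balancers and most other attachments show up as ENIs. The function names `find_unattached_groups` and `load_region` are hypothetical, and this sketch ignores groups that are only referenced from other groups' rules.

```python
def find_unattached_groups(security_groups, network_interfaces):
    """Return IDs of non-default groups that no network interface references.

    Both arguments are lists shaped like the 'SecurityGroups' and
    'NetworkInterfaces' entries returned by the EC2 describe_* calls.
    """
    in_use = {
        group["GroupId"]
        for eni in network_interfaces
        for group in eni.get("Groups", [])
    }
    return [
        sg["GroupId"]
        for sg in security_groups
        if sg.get("GroupName") != "default" and sg["GroupId"] not in in_use
    ]


def load_region(ec2):
    """Fetch both lists from a boto3 EC2 client (needs AWS credentials)."""
    groups = [
        sg
        for page in ec2.get_paginator("describe_security_groups").paginate()
        for sg in page["SecurityGroups"]
    ]
    enis = [
        eni
        for page in ec2.get_paginator("describe_network_interfaces").paginate()
        for eni in page["NetworkInterfaces"]
    ]
    return groups, enis


# Offline demo with made-up data
demo_groups = [
    {"GroupId": "sg-1", "GroupName": "default"},
    {"GroupId": "sg-2", "GroupName": "web"},
    {"GroupId": "sg-3", "GroupName": "old-debug"},
]
demo_enis = [{"NetworkInterfaceId": "eni-1", "Groups": [{"GroupId": "sg-2"}]}]
print(find_unattached_groups(demo_groups, demo_enis))
```

In the auditor, `load_region(ec2)` could be called once per region inside `audit_region`, with the result used in place of the empty-rules heuristic.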

How to Use the Script

Basic Usage

# Scan all regions with default credentials
python sg_audit.py

# Use specific AWS profile
python sg_audit.py --profile production

# Scan specific region only
python sg_audit.py --region us-east-1

# Generate remediation commands
python sg_audit.py --fix-commands

# Export as JSON
python sg_audit.py --export json

Understanding the Output

🔍 Security Group Audit Complete
==================================================
Total findings: 47

By Severity:
  CRITICAL: 3
  HIGH: 12
  MEDIUM: 25
  LOW: 7

By Type:
  Open to Internet: 35
  Overly Broad CIDR: 8
  Potentially Unused: 4

❌ CRITICAL FINDINGS (Fix immediately!):

  Region: us-east-1
  Security Group: production-db-sg (sg-0123456789abcdef0)
  Issue: All traffic allowed from internet!
  Fix: Restrict to specific ports and IPs
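The console summary only shows the first five critical findings, so the exported CSV is where triage really happens. As a rough sketch, the snippet below counts CRITICAL and HIGH findings per group so you know where to start. The function name `worst_groups` is made up, and the sample data uses only three of the export's columns to stay short.

```python
import csv
import io
from collections import Counter


def worst_groups(csv_text, severities=("CRITICAL", "HIGH"), top=5):
    """Count findings of the given severities per (region, group)."""
    counts = Counter()
    for row in csv.DictReader(io.StringIO(csv_text)):
        if row["Severity"] in severities:
            counts[(row["Region"], row["SecurityGroupId"])] += 1
    return counts.most_common(top)


# Made-up rows; a real export has more columns, which DictReader ignores here
sample_csv = "\n".join([
    "Region,SecurityGroupId,Severity",
    "us-east-1,sg-aaa,CRITICAL",
    "us-east-1,sg-aaa,HIGH",
    "eu-west-1,sg-bbb,HIGH",
    "us-east-1,sg-ccc,LOW",
])

for (region, group_id), count in worst_groups(sample_csv):
    print(f"{region} {group_id}: {count} urgent findings")
```

To run it against a real report, read the file first, for example `worst_groups(open(filename).read())` with the filename the script printed.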

Common Security Group Mistakes This Finds

1. The “Temporary” Debug Rule

Port 22 (SSH) open to 0.0.0.0/0
Added: "Just for debugging"
Reality: Still there 6 months later
Risk: Brute force attacks, unauthorized access

2. The Copy-Paste Disaster

Copied security group from tutorial
Contains: Port 9200 (Elasticsearch) open
Impact: Data breach waiting to happen
Fix: Review all rules before using

3. The “It Was Working” Rule

All ports (0-65535) open to 0.0.0.0/0
Reason: "Couldn't figure out which port"
Impact: Complete exposure
Fix: Use AWS VPC Flow Logs to find actual ports
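If you go the Flow Logs route, the tallying itself is simple. This is a minimal sketch that assumes the default (version 2) flow log format, where the destination address is the fifth field, the destination port the seventh and the action the thirteenth. The function name `accepted_ports` and the sample records are illustrative only.

```python
from collections import Counter


def accepted_ports(lines, dstaddr=None):
    """Tally destination ports of ACCEPT records in default-format flow logs.

    Pass dstaddr (the instance's private IP) to ignore return traffic that
    flows back out to clients' ephemeral ports.
    """
    ports = Counter()
    for line in lines:
        fields = line.split()
        # Skip headers, NODATA/SKIPDATA rows, and rejected traffic
        if len(fields) != 14 or fields[12] != "ACCEPT":
            continue
        if dstaddr is not None and fields[4] != dstaddr:
            continue
        ports[int(fields[6])] += 1
    return ports


# Made-up records in the default format
sample_lines = [
    "2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.21 49761 443 6 20 4249 1418530010 1418530070 ACCEPT OK",
    "2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.21 49762 443 6 10 2100 1418530010 1418530070 ACCEPT OK",
    "2 123456789010 eni-1235b8ca123456789 198.51.100.7 172.31.16.21 40000 22 6 1 40 1418530010 1418530070 REJECT OK",
    "2 123456789010 eni-1235b8ca123456789 172.31.16.21 203.0.113.12 443 49761 6 20 4249 1418530010 1418530070 ACCEPT OK",
    "2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA",
]

print(accepted_ports(sample_lines, dstaddr="172.31.16.21"))
```

Ports that show up with real traffic are the ones to keep; everything else in the 0-65535 rule is a candidate for removal.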

Remediation Strategies

Quick Wins (Do Now)

  1. Remove 0.0.0.0/0 rules for databases

# Find security groups that expose database ports to the internet
aws ec2 describe-security-groups \
  --filters "Name=ip-permission.from-port,Values=3306,5432,27017" \
  --query 'SecurityGroups[?IpPermissions[?IpRanges[?CidrIp==`0.0.0.0/0`]]]'

  2. Replace SSH access with Systems Manager

# No more port 22!
aws ssm start-session --target i-1234567890abcdef0

  3. Use security group references

# Instead of IP ranges, reference other security groups
aws ec2 authorize-security-group-ingress \
  --group-id sg-target \
  --source-group sg-source \
  --protocol tcp \
  --port 443
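If you would rather make the same change from Python, the equivalent of a security group reference in boto3 is a `UserIdGroupPairs` entry inside `IpPermissions`. The helper name `sg_reference_rule` is made up, and the group IDs are the same placeholders as in the CLI example; this is a sketch, not a drop-in tool.

```python
def sg_reference_rule(source_group_id, port, protocol="tcp"):
    """Build an IpPermissions entry that allows traffic from another group."""
    return {
        "IpProtocol": protocol,
        "FromPort": port,
        "ToPort": port,
        "UserIdGroupPairs": [{"GroupId": source_group_id}],
    }


rule = sg_reference_rule("sg-source", 443)
print(rule)

# With credentials configured, the rule could be applied like this:
#   import boto3
#   ec2 = boto3.client("ec2", region_name="us-east-1")
#   ec2.authorize_security_group_ingress(
#       GroupId="sg-target", IpPermissions=[rule]
#   )
```

Because the rule names a group instead of a CIDR, it keeps working when instances behind `sg-source` change IP addresses.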

Medium-term Fixes

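  1. Implement least-privilege groups

# security_group_builder.py
# Declarative sketch: each group lists only the ports and sources it needs.
# Turning these specs into real rules is left to your own tooling.
def create_web_server_sg():
    return {
        'GroupName': 'web-servers',
        'Rules': [
            {'Port': 443, 'Source': 'cloudfront'},
            {'Port': 80, 'Source': 'cloudfront', 'Redirect': 443}
        ]
    }

def create_app_server_sg():
    return {
        'GroupName': 'app-servers',
        'Rules': [
            {'Port': 8080, 'Source': 'web-servers-sg'}
        ]
    }

  2. Automate security group reviews

# .github/workflows/sg-audit.yml
name: Weekly Security Group Audit
on:
  schedule:
    - cron: '0 9 * * 1'  # Mondays at 09:00 UTC

jobs:
  audit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.x'
      - name: Install dependencies
        run: pip install boto3
      - name: Run SG Audit
        # Assumes AWS credentials are already available to the job.
        # send_report.py is your own script for delivering the CSV.
        run: |
          python sg_audit.py --export csv
          python send_report.py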
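A scheduled audit is more useful when critical findings fail the job instead of sitting in a CSV. Here is one possible gate, sketched under the assumption that it runs in the same directory right after `sg_audit.py`. The file name `ci_gate.py` and all function names are hypothetical, and this is not the `send_report.py` from the workflow.

```python
# ci_gate.py (hypothetical): fail the CI job when CRITICAL findings exist
import csv
import glob


def count_by_severity(rows):
    """Count findings per severity from an iterable of CSV row dicts."""
    counts = {}
    for row in rows:
        counts[row["Severity"]] = counts.get(row["Severity"], 0) + 1
    return counts


def latest_report(pattern="sg_audit_*.csv"):
    """Return the newest report; timestamped names sort chronologically."""
    reports = sorted(glob.glob(pattern))
    return reports[-1] if reports else None


def exit_code(counts):
    """Non-zero when at least one CRITICAL finding is present."""
    return 1 if counts.get("CRITICAL", 0) else 0


def main():
    report = latest_report()
    if report is None:
        # sg_audit.py only writes a file when it has findings
        print("No report found")
        return 0
    with open(report, newline="") as f:
        counts = count_by_severity(csv.DictReader(f))
    print(f"{report}: {counts}")
    return exit_code(counts)
```

To use it as a script, end the file with `raise SystemExit(main())` and add `python ci_gate.py` as the last command of the workflow step.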

Extending the Script

Add Custom Checks

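def check_compliance_rules(self, sg, region):
    """Add your compliance-specific checks"""
    findings = []
    
    # Shared fields, so these rows match the columns of the other findings
    base = {
        'Region': region,
        'SecurityGroupId': sg['GroupId'],
        'SecurityGroupName': sg.get('GroupName', 'N/A'),
        'VPCId': sg.get('VpcId', 'N/A'),
        'Protocol': 'N/A',
        'PortRange': 'N/A',
        'Source': 'N/A',
    }
    
    # SOC 2: No direct database access
    # (has_database_ports_exposed is a helper you need to write yourself)
    if self.has_database_ports_exposed(sg):
        findings.append({
            **base,
            'Type': 'SOC2 Violation',
            'Severity': 'HIGH',
            'Description': 'Database ports exposed',
            'Recommendation': 'Move database to private subnet'
        })
    
    # PCI: Encryption required
    # (has_unencrypted_protocols is also a helper you need to write yourself)
    if self.has_unencrypted_protocols(sg):
        findings.append({
            **base,
            'Type': 'PCI Violation',
            'Severity': 'HIGH',
            'Description': 'Unencrypted protocols allowed',
            'Recommendation': 'Allow only encrypted protocols'
        })
    
    return findings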
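The custom check above calls helpers that are not defined anywhere in this post. As a starting point, here is one way `has_database_ports_exposed` might look, written as a plain function so it can be tried without AWS access. The port list is an assumption based on the script's `risky_ports`; adjust it to the databases you actually run.

```python
# Assumed set of database ports, taken from the auditor's risky_ports table
DATABASE_PORTS = {3306, 5432, 27017, 6379, 1433}


def has_database_ports_exposed(sg):
    """True if any inbound rule opens a database port to 0.0.0.0/0."""
    for rule in sg.get("IpPermissions", []):
        open_to_world = any(
            r.get("CidrIp") == "0.0.0.0/0" for r in rule.get("IpRanges", [])
        )
        if not open_to_world:
            continue
        if rule.get("IpProtocol") == "-1":
            return True  # all protocols, so every port is reachable
        low = rule.get("FromPort", 0)
        high = rule.get("ToPort", 65535)
        if any(low <= port <= high for port in DATABASE_PORTS):
            return True
    return False


# Made-up group with PostgreSQL open to the internet
exposed_sg = {
    "GroupId": "sg-0123456789abcdef0",
    "IpPermissions": [{
        "IpProtocol": "tcp",
        "FromPort": 5432,
        "ToPort": 5432,
        "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
    }],
}
print(has_database_ports_exposed(exposed_sg))
```

To use it inside the class, add `self` as the first parameter and move the port set into `__init__`.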

Add Slack Notifications

import os
import requests  # third-party: pip install requests

def send_slack_alert(findings):
    critical = [f for f in findings if f['Severity'] == 'CRITICAL']
    if critical:
        # Webhook URL comes from the environment, never from source control
        webhook_url = os.environ['SLACK_WEBHOOK']
        message = {
            'text': f'🚨 {len(critical)} critical security group issues found!',
            'attachments': [{
                'color': 'danger',
                'fields': [{
                    'title': f['SecurityGroupName'],
                    'value': f['Description']
                } for f in critical[:3]]
            }]
        }
        requests.post(webhook_url, json=message, timeout=10)

Prevention: Security Groups as Code

Stop security group drift with Infrastructure as Code:

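# security_groups.tf
# AWS-managed prefix list of CloudFront origin-facing IP ranges
data "aws_ec2_managed_prefix_list" "cloudfront" {
  name = "com.amazonaws.global.cloudfront.origin-facing"
}

resource "aws_security_group" "web" {
  name_prefix = "web-"
  description = "Security group for web servers"
  vpc_id      = aws_vpc.main.id

  ingress {
    description = "HTTPS from CloudFront"
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    prefix_list_ids = [data.aws_ec2_managed_prefix_list.cloudfront.id]
  }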

  egress {
    description = "HTTPS to anywhere (for updates)"
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name = "web-servers"
    Compliance = "SOC2"
  }
}

# Detect (not prevent) risky rules added outside Terraform.
# Requires an AWS Config configuration recorder in the account.
resource "aws_config_config_rule" "restricted_ssh" {
  name = "restricted-ssh"
  
  source {
    owner             = "AWS"
    source_identifier = "INCOMING_SSH_DISABLED"
  }
}

Conclusion

Security groups are your first line of defense in AWS. This script helps you find the holes before attackers do. Run it weekly, fix the critical issues immediately, and gradually improve your security posture.

Remember: The best security group is one that’s regularly audited and only allows what’s absolutely necessary.

Next steps:

  1. Run the script now (seriously, it takes 2 minutes)
  2. Fix any CRITICAL findings today
  3. Schedule weekly audits
  4. Move to Infrastructure as Code

Want continuous security group monitoring without running scripts? Tools like PathShield automatically detect risky security group changes in real-time and alert you before they become breaches.
