Β· AWS Security Β· 23 min read
Complete Guide to AWS VPC Security Groups vs NACLs
Master AWS network security by understanding the differences between Security Groups and NACLs. Complete guide with real-world examples, automation scripts, and security best practices for startups.
AWS provides two primary layers of network security for your VPC: Security Groups and Network Access Control Lists (NACLs). Understanding when and how to use each is crucial for building secure, scalable infrastructure. Many teams struggle with this distinction, leading to either over-complex configurations or security gaps.
In this comprehensive guide, weβll explore the fundamental differences between Security Groups and NACLs, provide real-world implementation examples, and show you how to build layered network security that scales with your startupβs growth.
Understanding the Security Layers
AWS network security follows a defense-in-depth approach with multiple layers of protection:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Internet β
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββ
β
βββββββββββββββββββββββΌββββββββββββββββββββββββββββββββ
β Internet Gateway β
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββ
β
βββββββββββββββββββββββΌββββββββββββββββββββββββββββββββ
β NACL β ββββ Subnet Level
β (Network Access Control List) β Stateless
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββ Allow/Deny
β
βββββββββββββββββββββββΌββββββββββββββββββββββββββββββββ
β Subnet β
β βββββββββββββββββββββββββββββββββββββββββββββββ β
β β Security Group β β ββββ Instance Level
β β (Stateful) β β Stateful
β β βββββββββββββββββββββββββββββββββββββββ β β Allow Only
β β β EC2 Instance β β β
β β β β β β
β β βββββββββββββββββββββββββββββββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Security Groups operate at the instance level as virtual firewalls, while NACLs operate at the subnet level as an additional layer of protection.
Security Groups Deep Dive
Security Groups act as stateful firewalls for your EC2 instances. Hereβs what makes them unique:
Key Characteristics
Stateful: When you allow inbound traffic, the corresponding outbound response traffic is automatically allowed, regardless of outbound rules.
Allow Rules Only: You can only create allow rules; everything else is implicitly denied.
Instance Level: Each EC2 instance can have multiple security groups, and rules are evaluated collectively.
Dynamic: Changes take effect immediately without instance restart.
Security Group Implementation
Letβs build a comprehensive security group setup for a typical web application:
import boto3
import json
from datetime import datetime
class SecurityGroupManager:
def __init__(self, region='us-east-1'):
self.ec2 = boto3.client('ec2', region_name=region)
self.region = region
def create_layered_security_groups(self, vpc_id, environment='production'):
"""Create a complete set of security groups for a web application"""
security_groups = {}
# 1. Web Server Security Group
web_sg = self.create_web_server_sg(vpc_id, environment)
security_groups['web'] = web_sg
# 2. Application Server Security Group
app_sg = self.create_app_server_sg(vpc_id, web_sg['GroupId'], environment)
security_groups['app'] = app_sg
# 3. Database Security Group
db_sg = self.create_database_sg(vpc_id, app_sg['GroupId'], environment)
security_groups['database'] = db_sg
# 4. Load Balancer Security Group
lb_sg = self.create_load_balancer_sg(vpc_id, environment)
security_groups['load_balancer'] = lb_sg
# 5. Bastion Host Security Group
bastion_sg = self.create_bastion_sg(vpc_id, environment)
security_groups['bastion'] = bastion_sg
return security_groups
def create_web_server_sg(self, vpc_id, environment):
"""Create security group for web servers"""
sg_name = f'{environment}-web-servers'
try:
response = self.ec2.create_security_group(
GroupName=sg_name,
Description=f'Security group for {environment} web servers',
VpcId=vpc_id
)
sg_id = response['GroupId']
# Add tags
self.ec2.create_tags(
Resources=[sg_id],
Tags=[
{'Key': 'Name', 'Value': sg_name},
{'Key': 'Environment', 'Value': environment},
{'Key': 'Tier', 'Value': 'web'},
{'Key': 'CreatedBy', 'Value': 'SecurityGroupManager'},
{'Key': 'CreatedAt', 'Value': datetime.now().isoformat()}
]
)
# Inbound rules for web servers
inbound_rules = [
{
'IpProtocol': 'tcp',
'FromPort': 80,
'ToPort': 80,
'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'HTTP from anywhere'}]
},
{
'IpProtocol': 'tcp',
'FromPort': 443,
'ToPort': 443,
'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'HTTPS from anywhere'}]
}
]
# Add inbound rules
self.ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=inbound_rules
)
# Outbound rules (web servers need to access app servers and external APIs)
outbound_rules = [
{
'IpProtocol': 'tcp',
'FromPort': 80,
'ToPort': 80,
'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'HTTP to anywhere'}]
},
{
'IpProtocol': 'tcp',
'FromPort': 443,
'ToPort': 443,
'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'HTTPS to anywhere'}]
},
{
'IpProtocol': 'tcp',
'FromPort': 8080,
'ToPort': 8080,
'IpRanges': [{'CidrIp': '10.0.0.0/8', 'Description': 'Internal app server access'}]
}
]
# Remove default outbound rule first
self.ec2.revoke_security_group_egress(
GroupId=sg_id,
IpPermissions=[
{
'IpProtocol': '-1',
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
}
]
)
# Add specific outbound rules
self.ec2.authorize_security_group_egress(
GroupId=sg_id,
IpPermissions=outbound_rules
)
print(f"β
Created web server security group: {sg_id}")
return {'GroupId': sg_id, 'GroupName': sg_name}
except Exception as e:
print(f"β Error creating web server security group: {e}")
return None
def create_app_server_sg(self, vpc_id, web_sg_id, environment):
"""Create security group for application servers"""
sg_name = f'{environment}-app-servers'
try:
response = self.ec2.create_security_group(
GroupName=sg_name,
Description=f'Security group for {environment} application servers',
VpcId=vpc_id
)
sg_id = response['GroupId']
# Add tags
self.ec2.create_tags(
Resources=[sg_id],
Tags=[
{'Key': 'Name', 'Value': sg_name},
{'Key': 'Environment', 'Value': environment},
{'Key': 'Tier', 'Value': 'application'},
{'Key': 'CreatedBy', 'Value': 'SecurityGroupManager'}
]
)
# Inbound rules - only from web servers
inbound_rules = [
{
'IpProtocol': 'tcp',
'FromPort': 8080,
'ToPort': 8080,
'UserIdGroupPairs': [
{
'GroupId': web_sg_id,
'Description': 'HTTP from web servers'
}
]
},
{
'IpProtocol': 'tcp',
'FromPort': 8443,
'ToPort': 8443,
'UserIdGroupPairs': [
{
'GroupId': web_sg_id,
'Description': 'HTTPS from web servers'
}
]
}
]
self.ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=inbound_rules
)
print(f"β
Created app server security group: {sg_id}")
return {'GroupId': sg_id, 'GroupName': sg_name}
except Exception as e:
print(f"β Error creating app server security group: {e}")
return None
def create_database_sg(self, vpc_id, app_sg_id, environment):
"""Create security group for database servers"""
sg_name = f'{environment}-database-servers'
try:
response = self.ec2.create_security_group(
GroupName=sg_name,
Description=f'Security group for {environment} database servers',
VpcId=vpc_id
)
sg_id = response['GroupId']
# Add tags
self.ec2.create_tags(
Resources=[sg_id],
Tags=[
{'Key': 'Name', 'Value': sg_name},
{'Key': 'Environment', 'Value': environment},
{'Key': 'Tier', 'Value': 'database'},
{'Key': 'CreatedBy', 'Value': 'SecurityGroupManager'}
]
)
# Inbound rules - only from application servers
inbound_rules = [
{
'IpProtocol': 'tcp',
'FromPort': 3306, # MySQL
'ToPort': 3306,
'UserIdGroupPairs': [
{
'GroupId': app_sg_id,
'Description': 'MySQL from app servers'
}
]
},
{
'IpProtocol': 'tcp',
'FromPort': 5432, # PostgreSQL
'ToPort': 5432,
'UserIdGroupPairs': [
{
'GroupId': app_sg_id,
'Description': 'PostgreSQL from app servers'
}
]
}
]
self.ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=inbound_rules
)
# Remove default outbound rule - databases shouldn't initiate connections
self.ec2.revoke_security_group_egress(
GroupId=sg_id,
IpPermissions=[
{
'IpProtocol': '-1',
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
}
]
)
print(f"β
Created database security group: {sg_id}")
return {'GroupId': sg_id, 'GroupName': sg_name}
except Exception as e:
print(f"β Error creating database security group: {e}")
return None
def create_load_balancer_sg(self, vpc_id, environment):
"""Create security group for load balancers"""
sg_name = f'{environment}-load-balancer'
try:
response = self.ec2.create_security_group(
GroupName=sg_name,
Description=f'Security group for {environment} load balancers',
VpcId=vpc_id
)
sg_id = response['GroupId']
# Add tags
self.ec2.create_tags(
Resources=[sg_id],
Tags=[
{'Key': 'Name', 'Value': sg_name},
{'Key': 'Environment', 'Value': environment},
{'Key': 'Tier', 'Value': 'load-balancer'},
{'Key': 'CreatedBy', 'Value': 'SecurityGroupManager'}
]
)
# Inbound rules - open to internet
inbound_rules = [
{
'IpProtocol': 'tcp',
'FromPort': 80,
'ToPort': 80,
'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'HTTP from anywhere'}]
},
{
'IpProtocol': 'tcp',
'FromPort': 443,
'ToPort': 443,
'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'HTTPS from anywhere'}]
}
]
self.ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=inbound_rules
)
print(f"β
Created load balancer security group: {sg_id}")
return {'GroupId': sg_id, 'GroupName': sg_name}
except Exception as e:
print(f"β Error creating load balancer security group: {e}")
return None
def create_bastion_sg(self, vpc_id, environment):
"""Create security group for bastion hosts"""
sg_name = f'{environment}-bastion-host'
try:
response = self.ec2.create_security_group(
GroupName=sg_name,
Description=f'Security group for {environment} bastion hosts',
VpcId=vpc_id
)
sg_id = response['GroupId']
# Add tags
self.ec2.create_tags(
Resources=[sg_id],
Tags=[
{'Key': 'Name', 'Value': sg_name},
{'Key': 'Environment', 'Value': environment},
{'Key': 'Tier', 'Value': 'bastion'},
{'Key': 'CreatedBy', 'Value': 'SecurityGroupManager'}
]
)
# Inbound rules - SSH from specific IP ranges only
office_ip = '203.0.113.0/24' # Replace with your office IP
inbound_rules = [
{
'IpProtocol': 'tcp',
'FromPort': 22,
'ToPort': 22,
'IpRanges': [
{
'CidrIp': office_ip,
'Description': 'SSH from office network'
}
]
}
]
self.ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=inbound_rules
)
print(f"β
Created bastion security group: {sg_id}")
return {'GroupId': sg_id, 'GroupName': sg_name}
except Exception as e:
print(f"β Error creating bastion security group: {e}")
return None
Network ACLs (NACLs) Deep Dive
NACLs provide subnet-level network filtering with different characteristics:
Key Characteristics
Stateless: Inbound and outbound rules are evaluated independently. You must explicitly allow both directions.
Allow and Deny Rules: You can create both allow and deny rules, processed in rule number order.
Subnet Level: Applied to all instances in a subnet automatically.
Rule Numbers: Rules are processed in order from lowest to highest number.
NACL Implementation
Letβs implement NACLs for a multi-tier architecture:
class NetworkAclManager:
def __init__(self, region='us-east-1'):
self.ec2 = boto3.client('ec2', region_name=region)
self.region = region
def create_layered_nacls(self, vpc_id, environment='production'):
"""Create NACLs for different subnet tiers"""
nacls = {}
# 1. Public Subnet NACL
public_nacl = self.create_public_subnet_nacl(vpc_id, environment)
nacls['public'] = public_nacl
# 2. Private Application Subnet NACL
private_app_nacl = self.create_private_app_nacl(vpc_id, environment)
nacls['private_app'] = private_app_nacl
# 3. Database Subnet NACL
db_nacl = self.create_database_nacl(vpc_id, environment)
nacls['database'] = db_nacl
return nacls
def create_public_subnet_nacl(self, vpc_id, environment):
"""Create NACL for public subnets (web tier)"""
nacl_name = f'{environment}-public-subnet-nacl'
try:
# Create NACL
response = self.ec2.create_network_acl(VpcId=vpc_id)
nacl_id = response['NetworkAcl']['NetworkAclId']
# Add tags
self.ec2.create_tags(
Resources=[nacl_id],
Tags=[
{'Key': 'Name', 'Value': nacl_name},
{'Key': 'Environment', 'Value': environment},
{'Key': 'Tier', 'Value': 'public'},
{'Key': 'CreatedBy', 'Value': 'NetworkAclManager'}
]
)
# Inbound rules
inbound_rules = [
# HTTP from anywhere
{
'RuleNumber': 100,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 80, 'To': 80},
'CidrBlock': '0.0.0.0/0'
},
# HTTPS from anywhere
{
'RuleNumber': 110,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 443, 'To': 443},
'CidrBlock': '0.0.0.0/0'
},
# SSH from office network only
{
'RuleNumber': 120,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 22, 'To': 22},
'CidrBlock': '203.0.113.0/24' # Office IP range
},
# Ephemeral ports for return traffic
{
'RuleNumber': 130,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 1024, 'To': 65535},
'CidrBlock': '0.0.0.0/0'
},
# Block known malicious networks
{
'RuleNumber': 50,
'Protocol': '-1', # All protocols
'RuleAction': 'DENY',
'CidrBlock': '192.0.2.0/24' # Example malicious network
}
]
# Add inbound rules
for rule in inbound_rules:
self.ec2.create_network_acl_entry(
NetworkAclId=nacl_id,
**rule
)
# Outbound rules
outbound_rules = [
# HTTP to anywhere
{
'RuleNumber': 100,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 80, 'To': 80},
'CidrBlock': '0.0.0.0/0',
'Egress': True
},
# HTTPS to anywhere
{
'RuleNumber': 110,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 443, 'To': 443},
'CidrBlock': '0.0.0.0/0',
'Egress': True
},
# Internal communication to app tier
{
'RuleNumber': 120,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 8080, 'To': 8080},
'CidrBlock': '10.0.2.0/24', # Private app subnet
'Egress': True
},
# Ephemeral ports for responses
{
'RuleNumber': 130,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 1024, 'To': 65535},
'CidrBlock': '0.0.0.0/0',
'Egress': True
}
]
# Add outbound rules
for rule in outbound_rules:
self.ec2.create_network_acl_entry(
NetworkAclId=nacl_id,
**rule
)
print(f"β
Created public subnet NACL: {nacl_id}")
return {'NetworkAclId': nacl_id, 'Name': nacl_name}
except Exception as e:
print(f"β Error creating public subnet NACL: {e}")
return None
def create_private_app_nacl(self, vpc_id, environment):
"""Create NACL for private application subnets"""
nacl_name = f'{environment}-private-app-nacl'
try:
response = self.ec2.create_network_acl(VpcId=vpc_id)
nacl_id = response['NetworkAcl']['NetworkAclId']
# Add tags
self.ec2.create_tags(
Resources=[nacl_id],
Tags=[
{'Key': 'Name', 'Value': nacl_name},
{'Key': 'Environment', 'Value': environment},
{'Key': 'Tier', 'Value': 'private-app'}
]
)
# Inbound rules - only from public subnet
inbound_rules = [
# HTTP from public subnet
{
'RuleNumber': 100,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 8080, 'To': 8080},
'CidrBlock': '10.0.1.0/24' # Public subnet CIDR
},
# HTTPS from public subnet
{
'RuleNumber': 110,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 8443, 'To': 8443},
'CidrBlock': '10.0.1.0/24' # Public subnet CIDR
},
# SSH from bastion host
{
'RuleNumber': 120,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 22, 'To': 22},
'CidrBlock': '10.0.1.0/24' # Public subnet with bastion
},
# Ephemeral ports
{
'RuleNumber': 130,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 1024, 'To': 65535},
'CidrBlock': '0.0.0.0/0'
}
]
for rule in inbound_rules:
self.ec2.create_network_acl_entry(
NetworkAclId=nacl_id,
**rule
)
# Outbound rules
outbound_rules = [
# HTTPS to internet (for API calls, updates)
{
'RuleNumber': 100,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 443, 'To': 443},
'CidrBlock': '0.0.0.0/0',
'Egress': True
},
# Database access
{
'RuleNumber': 110,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 3306, 'To': 3306},
'CidrBlock': '10.0.3.0/24', # Database subnet
'Egress': True
},
# PostgreSQL access
{
'RuleNumber': 115,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 5432, 'To': 5432},
'CidrBlock': '10.0.3.0/24', # Database subnet
'Egress': True
},
# Ephemeral ports
{
'RuleNumber': 120,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 1024, 'To': 65535},
'CidrBlock': '0.0.0.0/0',
'Egress': True
},
# Block outbound to known malicious IPs
{
'RuleNumber': 50,
'Protocol': '-1',
'RuleAction': 'DENY',
'CidrBlock': '192.0.2.0/24', # Malicious network
'Egress': True
}
]
for rule in outbound_rules:
self.ec2.create_network_acl_entry(
NetworkAclId=nacl_id,
**rule
)
print(f"β
Created private app NACL: {nacl_id}")
return {'NetworkAclId': nacl_id, 'Name': nacl_name}
except Exception as e:
print(f"β Error creating private app NACL: {e}")
return None
def create_database_nacl(self, vpc_id, environment):
"""Create NACL for database subnets"""
nacl_name = f'{environment}-database-nacl'
try:
response = self.ec2.create_network_acl(VpcId=vpc_id)
nacl_id = response['NetworkAcl']['NetworkAclId']
# Add tags
self.ec2.create_tags(
Resources=[nacl_id],
Tags=[
{'Key': 'Name', 'Value': nacl_name},
{'Key': 'Environment', 'Value': environment},
{'Key': 'Tier', 'Value': 'database'}
]
)
# Inbound rules - only from app tier
inbound_rules = [
# MySQL from app tier
{
'RuleNumber': 100,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 3306, 'To': 3306},
'CidrBlock': '10.0.2.0/24' # App subnet CIDR
},
# PostgreSQL from app tier
{
'RuleNumber': 110,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 5432, 'To': 5432},
'CidrBlock': '10.0.2.0/24' # App subnet CIDR
},
# SSH from bastion (for admin access)
{
'RuleNumber': 120,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 22, 'To': 22},
'CidrBlock': '10.0.1.0/24' # Public subnet with bastion
}
]
for rule in inbound_rules:
self.ec2.create_network_acl_entry(
NetworkAclId=nacl_id,
**rule
)
# Outbound rules - minimal outbound access
outbound_rules = [
# Ephemeral ports for return traffic
{
'RuleNumber': 100,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 1024, 'To': 65535},
'CidrBlock': '10.0.2.0/24', # App subnet
'Egress': True
},
# HTTPS for software updates (restricted)
{
'RuleNumber': 110,
'Protocol': '6', # TCP
'RuleAction': 'ALLOW',
'PortRange': {'From': 443, 'To': 443},
'CidrBlock': '0.0.0.0/0',
'Egress': True
}
]
for rule in outbound_rules:
self.ec2.create_network_acl_entry(
NetworkAclId=nacl_id,
**rule
)
print(f"β
Created database NACL: {nacl_id}")
return {'NetworkAclId': nacl_id, 'Name': nacl_name}
except Exception as e:
print(f"β Error creating database NACL: {e}")
return None
def associate_nacl_with_subnet(self, nacl_id, subnet_id):
"""Associate NACL with subnet"""
try:
# Get current association
response = self.ec2.describe_network_acls(
Filters=[
{'Name': 'association.subnet-id', 'Values': [subnet_id]}
]
)
# Find current association
current_association_id = None
for nacl in response['NetworkAcls']:
for association in nacl['Associations']:
if association['SubnetId'] == subnet_id:
current_association_id = association['NetworkAclAssociationId']
break
if current_association_id:
# Replace association
self.ec2.replace_network_acl_association(
AssociationId=current_association_id,
NetworkAclId=nacl_id
)
print(f"β
Associated NACL {nacl_id} with subnet {subnet_id}")
else:
print(f"β Could not find current association for subnet {subnet_id}")
except Exception as e:
print(f"β Error associating NACL: {e}")
Security Groups vs NACLs: Key Differences
Feature | Security Groups | NACLs |
---|---|---|
Level | Instance | Subnet |
State | Stateful | Stateless |
Rules | Allow only | Allow & Deny |
Processing | All rules evaluated | Rules processed in order |
Application | Selected per instance | Applied to all instances in subnet |
Default Action | Deny all | Allow all (default NACL) |
Advanced Configuration Patterns
1. Multi-Environment Setup
Create environment-specific configurations:
def create_multi_environment_setup(self):
"""Create security groups and NACLs for multiple environments"""
environments = ['development', 'staging', 'production']
vpc_configs = {
'development': {'vpc_id': 'vpc-dev123', 'cidr': '10.0.0.0/16'},
'staging': {'vpc_id': 'vpc-staging456', 'cidr': '10.1.0.0/16'},
'production': {'vpc_id': 'vpc-prod789', 'cidr': '10.2.0.0/16'}
}
sg_manager = SecurityGroupManager()
nacl_manager = NetworkAclManager()
all_configs = {}
for env in environments:
print(f"π§ Setting up {env} environment...")
vpc_id = vpc_configs[env]['vpc_id']
# Create security groups
security_groups = sg_manager.create_layered_security_groups(vpc_id, env)
# Create NACLs
nacls = nacl_manager.create_layered_nacls(vpc_id, env)
# Environment-specific rules
if env == 'production':
# More restrictive rules for production
sg_manager.add_production_restrictions(security_groups)
nacl_manager.add_production_restrictions(nacls)
elif env == 'development':
# More permissive rules for development
sg_manager.add_development_permissions(security_groups)
all_configs[env] = {
'security_groups': security_groups,
'nacls': nacls
}
return all_configs
def add_production_restrictions(self, security_groups):
"""Add additional restrictions for production environment"""
# Add time-based access restrictions
current_hour = datetime.now().hour
# Disable SSH access outside business hours
if current_hour < 9 or current_hour > 17:
for sg_name, sg_info in security_groups.items():
if 'bastion' in sg_name:
# Temporarily remove SSH access
self.temporarily_revoke_ssh_access(sg_info['GroupId'])
# Add additional monitoring
self.enable_enhanced_monitoring(security_groups)
def temporarily_revoke_ssh_access(self, sg_id):
"""Temporarily revoke SSH access"""
try:
# Get current rules
response = self.ec2.describe_security_groups(GroupIds=[sg_id])
sg = response['SecurityGroups'][0]
# Find SSH rules
ssh_rules = []
for rule in sg['IpPermissions']:
if rule.get('FromPort') == 22 and rule.get('ToPort') == 22:
ssh_rules.append(rule)
# Revoke SSH rules
if ssh_rules:
self.ec2.revoke_security_group_ingress(
GroupId=sg_id,
IpPermissions=ssh_rules
)
print(f"β° Temporarily revoked SSH access for {sg_id}")
# Schedule re-enabling (in production, use CloudWatch Events)
# This is a simplified example
except Exception as e:
print(f"β Error revoking SSH access: {e}")
2. Dynamic Security Group Management
Implement dynamic security group updates based on threat intelligence:
class DynamicSecurityManager:
def __init__(self, region='us-east-1'):
self.ec2 = boto3.client('ec2', region_name=region)
self.s3 = boto3.client('s3', region_name=region)
def update_security_groups_with_threat_intel(self):
"""Update security groups based on threat intelligence"""
# Get latest threat intelligence
malicious_ips = self.get_threat_intelligence()
# Get all security groups
response = self.ec2.describe_security_groups()
for sg in response['SecurityGroups']:
if self.should_update_sg(sg):
self.block_malicious_ips(sg['GroupId'], malicious_ips)
def get_threat_intelligence(self):
"""Fetch latest threat intelligence"""
# In production, integrate with threat intelligence feeds
malicious_ips = [
'192.0.2.1',
'198.51.100.1',
'203.0.113.1'
]
return malicious_ips
def should_update_sg(self, sg):
"""Determine if security group should be updated"""
# Skip default VPC security groups
if sg['GroupName'] == 'default':
return False
# Only update tagged security groups
for tag in sg.get('Tags', []):
if tag['Key'] == 'AutoUpdate' and tag['Value'] == 'true':
return True
return False
def block_malicious_ips(self, sg_id, malicious_ips):
"""Add rules to block malicious IPs"""
try:
# Get current rules to avoid duplicates
response = self.ec2.describe_security_groups(GroupIds=[sg_id])
current_rules = response['SecurityGroups'][0]['IpPermissions']
# Check which IPs are already blocked
blocked_ips = set()
for rule in current_rules:
for ip_range in rule.get('IpRanges', []):
blocked_ips.add(ip_range['CidrIp'].split('/')[0])
# Add rules for new malicious IPs
new_rules = []
for ip in malicious_ips:
if ip not in blocked_ips:
# This approach blocks by denying in NACL,
# Security Groups can't deny, so we'd need a different approach
# For now, we'll log these for NACL updates
print(f"β οΈ New malicious IP detected: {ip}")
# In production, you'd update associated NACLs or use AWS WAF
except Exception as e:
print(f"β Error updating security group {sg_id}: {e}")
Monitoring and Compliance
1. Security Group and NACL Auditing
Create comprehensive auditing for network security:
class NetworkSecurityAuditor:
def __init__(self, region='us-east-1'):
self.ec2 = boto3.client('ec2', region_name=region)
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
def audit_security_groups(self):
"""Comprehensive security group audit"""
print("π Starting security group audit...")
response = self.ec2.describe_security_groups()
audit_results = {
'overly_permissive': [],
'unused': [],
'untagged': [],
'compliance_issues': []
}
for sg in response['SecurityGroups']:
sg_id = sg['GroupId']
sg_name = sg['GroupName']
# Check for overly permissive rules
if self.is_overly_permissive(sg):
audit_results['overly_permissive'].append({
'id': sg_id,
'name': sg_name,
'issues': self.get_permissive_issues(sg)
})
# Check if security group is unused
if self.is_unused_security_group(sg_id):
audit_results['unused'].append({
'id': sg_id,
'name': sg_name
})
# Check for proper tagging
if not self.is_properly_tagged(sg):
audit_results['untagged'].append({
'id': sg_id,
'name': sg_name
})
# Check compliance requirements
compliance_issues = self.check_compliance(sg)
if compliance_issues:
audit_results['compliance_issues'].append({
'id': sg_id,
'name': sg_name,
'issues': compliance_issues
})
# Generate audit report
self.generate_audit_report(audit_results)
return audit_results
def is_overly_permissive(self, sg):
"""Check if security group has overly permissive rules"""
for rule in sg['IpPermissions']:
# Check for 0.0.0.0/0 access on sensitive ports
for ip_range in rule.get('IpRanges', []):
if ip_range['CidrIp'] == '0.0.0.0/0':
# SSH, RDP, Database ports should not be open to the world
sensitive_ports = [22, 3389, 3306, 5432, 1433, 27017]
from_port = rule.get('FromPort', 0)
to_port = rule.get('ToPort', 65535)
for sensitive_port in sensitive_ports:
if from_port <= sensitive_port <= to_port:
return True
return False
def get_permissive_issues(self, sg):
"""Get specific permissive issues"""
issues = []
for rule in sg['IpPermissions']:
for ip_range in rule.get('IpRanges', []):
if ip_range['CidrIp'] == '0.0.0.0/0':
from_port = rule.get('FromPort', 0)
to_port = rule.get('ToPort', 65535)
if from_port == 22:
issues.append('SSH (port 22) open to 0.0.0.0/0')
elif from_port == 3389:
issues.append('RDP (port 3389) open to 0.0.0.0/0')
elif from_port == 3306:
issues.append('MySQL (port 3306) open to 0.0.0.0/0')
elif from_port <= 22 <= to_port:
issues.append(f'Port range {from_port}-{to_port} includes SSH (22)')
return issues
def is_unused_security_group(self, sg_id):
"""Check if security group is unused"""
try:
# Check EC2 instances
instances = self.ec2.describe_instances(
Filters=[
{'Name': 'instance.group-id', 'Values': [sg_id]}
]
)
if instances['Reservations']:
return False
# Check ELBs
# Note: You'd need to check ELBs, RDS, etc. in production
# Check if referenced by other security groups
all_sgs = self.ec2.describe_security_groups()
for sg in all_sgs['SecurityGroups']:
for rule in sg['IpPermissions']:
for group_pair in rule.get('UserIdGroupPairs', []):
if group_pair['GroupId'] == sg_id:
return False
return True
except Exception as e:
print(f"Error checking if SG {sg_id} is unused: {e}")
return False
def is_properly_tagged(self, sg):
"""Check if security group has required tags"""
required_tags = ['Environment', 'Owner', 'Project']
sg_tags = {tag['Key']: tag['Value'] for tag in sg.get('Tags', [])}
for required_tag in required_tags:
if required_tag not in sg_tags:
return False
return True
def check_compliance(self, sg):
"""Check compliance with security standards"""
issues = []
# PCI DSS compliance check
if self.has_pci_compliance_issues(sg):
issues.append('PCI DSS compliance violation')
# SOC 2 compliance check
if self.has_soc2_compliance_issues(sg):
issues.append('SOC 2 compliance violation')
return issues
def has_pci_compliance_issues(self, sg):
"""Check for PCI DSS compliance issues"""
# PCI DSS requires strict network segmentation
for rule in sg['IpPermissions']:
for ip_range in rule.get('IpRanges', []):
if ip_range['CidrIp'] == '0.0.0.0/0':
# Any port open to internet is a potential issue
return True
return False
def has_soc2_compliance_issues(self, sg):
"""Check for SOC 2 compliance issues"""
# SOC 2 requires proper access controls
for rule in sg['IpPermissions']:
# Check for administrative ports open to wide ranges
admin_ports = [22, 3389, 5985, 5986] # SSH, RDP, WinRM
from_port = rule.get('FromPort', 0)
to_port = rule.get('ToPort', 65535)
for admin_port in admin_ports:
if from_port <= admin_port <= to_port:
for ip_range in rule.get('IpRanges', []):
# Administrative access should be restricted
cidr = ip_range['CidrIp']
if cidr == '0.0.0.0/0' or '/8' in cidr or '/16' in cidr:
return True
return False
def generate_audit_report(self, audit_results):
"""Generate comprehensive audit report"""
report = {
'audit_timestamp': datetime.now().isoformat(),
'summary': {
'total_overly_permissive': len(audit_results['overly_permissive']),
'total_unused': len(audit_results['unused']),
'total_untagged': len(audit_results['untagged']),
'total_compliance_issues': len(audit_results['compliance_issues'])
},
'details': audit_results,
'recommendations': self.get_recommendations(audit_results)
}
# Store report
report_key = f"security-audit/{datetime.now().strftime('%Y/%m/%d')}/network-audit.json"
try:
self.s3.put_object(
Bucket='security-audit-reports',
Key=report_key,
Body=json.dumps(report, indent=2),
ContentType='application/json'
)
print(f"π Audit report saved to s3://security-audit-reports/{report_key}")
except Exception as e:
print(f"Error saving audit report: {e}")
# Send metrics to CloudWatch
self.send_audit_metrics(audit_results)
return report
def get_recommendations(self, audit_results):
"""Get recommendations based on audit results"""
recommendations = []
if audit_results['overly_permissive']:
recommendations.append({
'priority': 'HIGH',
'action': 'Restrict overly permissive security groups',
'description': 'Remove 0.0.0.0/0 access from sensitive ports'
})
if audit_results['unused']:
recommendations.append({
'priority': 'MEDIUM',
'action': 'Remove unused security groups',
'description': 'Clean up unused security groups to reduce attack surface'
})
if audit_results['untagged']:
recommendations.append({
'priority': 'LOW',
'action': 'Add proper tags to security groups',
'description': 'Ensure all security groups have required tags for governance'
})
return recommendations
def send_audit_metrics(self, audit_results):
"""Send audit metrics to CloudWatch"""
metrics = [
{
'MetricName': 'OverlyPermissiveSecurityGroups',
'Value': len(audit_results['overly_permissive']),
'Unit': 'Count'
},
{
'MetricName': 'UnusedSecurityGroups',
'Value': len(audit_results['unused']),
'Unit': 'Count'
},
{
'MetricName': 'ComplianceIssues',
'Value': len(audit_results['compliance_issues']),
'Unit': 'Count'
}
]
try:
self.cloudwatch.put_metric_data(
Namespace='Security/NetworkAudit',
MetricData=metrics
)
print("π Audit metrics sent to CloudWatch")
except Exception as e:
print(f"Error sending metrics: {e}")
Automated Remediation
1. Automatic Security Group Remediation
Create automated remediation for common security issues:
class SecurityGroupRemediator:
def __init__(self, region='us-east-1'):
self.ec2 = boto3.client('ec2', region_name=region)
self.sns = boto3.client('sns', region_name=region)
def remediate_security_issues(self, audit_results):
"""Automatically remediate security issues"""
remediation_results = {
'remediated': [],
'failed': [],
'manual_review_required': []
}
# Remediate overly permissive security groups
for sg_issue in audit_results['overly_permissive']:
try:
if self.can_auto_remediate(sg_issue):
self.remediate_permissive_sg(sg_issue)
remediation_results['remediated'].append(sg_issue)
else:
remediation_results['manual_review_required'].append(sg_issue)
except Exception as e:
remediation_results['failed'].append({
'sg': sg_issue,
'error': str(e)
})
# Remove unused security groups
for unused_sg in audit_results['unused']:
try:
self.remove_unused_sg(unused_sg)
remediation_results['remediated'].append(unused_sg)
except Exception as e:
remediation_results['failed'].append({
'sg': unused_sg,
'error': str(e)
})
# Send remediation report
self.send_remediation_report(remediation_results)
return remediation_results
def can_auto_remediate(self, sg_issue):
"""Check if security group issue can be auto-remediated"""
# Only auto-remediate if it's a known safe operation
safe_remediations = [
'SSH (port 22) open to 0.0.0.0/0',
'RDP (port 3389) open to 0.0.0.0/0',
'MySQL (port 3306) open to 0.0.0.0/0',
'PostgreSQL (port 5432) open to 0.0.0.0/0'
]
for issue in sg_issue['issues']:
if issue not in safe_remediations:
return False
return True
def remediate_permissive_sg(self, sg_issue):
"""Remediate overly permissive security group"""
sg_id = sg_issue['id']
# Get current rules
response = self.ec2.describe_security_groups(GroupIds=[sg_id])
sg = response['SecurityGroups'][0]
rules_to_revoke = []
rules_to_add = []
for rule in sg['IpPermissions']:
for ip_range in rule.get('IpRanges', []):
if ip_range['CidrIp'] == '0.0.0.0/0':
from_port = rule.get('FromPort')
# Replace with office IP range
if from_port in [22, 3389]: # SSH, RDP
rules_to_revoke.append(rule)
# Create replacement rule with office IP
new_rule = rule.copy()
new_rule['IpRanges'] = [
{
'CidrIp': '203.0.113.0/24', # Office IP range
'Description': 'Access from office network'
}
]
rules_to_add.append(new_rule)
# Remove database access from internet
elif from_port in [3306, 5432, 1433]: # Database ports
rules_to_revoke.append(rule)
# Apply changes
if rules_to_revoke:
self.ec2.revoke_security_group_ingress(
GroupId=sg_id,
IpPermissions=rules_to_revoke
)
if rules_to_add:
self.ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=rules_to_add
)
print(f"β
Remediated security group {sg_id}")
def remove_unused_sg(self, unused_sg):
"""Remove unused security group"""
sg_id = unused_sg['id']
# Double-check it's really unused
if self.is_truly_unused(sg_id):
self.ec2.delete_security_group(GroupId=sg_id)
print(f"ποΈ Removed unused security group {sg_id}")
else:
raise Exception(f"Security group {sg_id} appears to be in use")
def is_truly_unused(self, sg_id):
"""Double-check if security group is truly unused"""
# This would include comprehensive checks across all AWS services
# EC2, ELB, RDS, Lambda, etc.
return True # Simplified for example
def send_remediation_report(self, results):
"""Send remediation report to security team"""
message = {
'remediation_timestamp': datetime.now().isoformat(),
'summary': {
'total_remediated': len(results['remediated']),
'total_failed': len(results['failed']),
'manual_review_required': len(results['manual_review_required'])
},
'details': results
}
try:
self.sns.publish(
TopicArn='arn:aws:sns:us-east-1:*:security-remediation',
Message=json.dumps(message, indent=2),
Subject='Security Group Remediation Report'
)
print("π§ Remediation report sent to security team")
except Exception as e:
print(f"Error sending remediation report: {e}")
Complete Implementation Script
Hereβs a comprehensive script that brings everything together:
#!/usr/bin/env python3
import boto3
import json
import argparse
from datetime import datetime
class ComprehensiveVPCSecurityManager:
def __init__(self, region='us-east-1'):
self.region = region
self.sg_manager = SecurityGroupManager(region)
self.nacl_manager = NetworkAclManager(region)
self.auditor = NetworkSecurityAuditor(region)
self.remediator = SecurityGroupRemediator(region)
def setup_complete_vpc_security(self, vpc_id, environment='production'):
"""Set up complete VPC security with Security Groups and NACLs"""
print(f"π§ Setting up complete VPC security for {environment}...")
# Create security groups
print("1οΈβ£ Creating security groups...")
security_groups = self.sg_manager.create_layered_security_groups(vpc_id, environment)
# Create NACLs
print("2οΈβ£ Creating NACLs...")
nacls = self.nacl_manager.create_layered_nacls(vpc_id, environment)
# Associate NACLs with subnets (you'd specify actual subnet IDs)
print("3οΈβ£ Associating NACLs with subnets...")
# self.associate_nacls_with_subnets(nacls, subnet_mapping)
# Audit the configuration
print("4οΈβ£ Running security audit...")
audit_results = self.auditor.audit_security_groups()
# Auto-remediate issues
print("5οΈβ£ Running auto-remediation...")
remediation_results = self.remediator.remediate_security_issues(audit_results)
return {
'security_groups': security_groups,
'nacls': nacls,
'audit_results': audit_results,
'remediation_results': remediation_results
}
def run_security_maintenance(self):
"""Run regular security maintenance tasks"""
print("π Running security maintenance...")
# Audit current configuration
audit_results = self.auditor.audit_security_groups()
# Auto-remediate issues
remediation_results = self.remediator.remediate_security_issues(audit_results)
# Update threat intelligence
# This would update NACLs with latest threat intel
print("β
Security maintenance completed")
return {
'audit_results': audit_results,
'remediation_results': remediation_results
}
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='VPC Security Management')
parser.add_argument('--setup', action='store_true', help='Set up complete VPC security')
parser.add_argument('--audit', action='store_true', help='Run security audit')
parser.add_argument('--maintain', action='store_true', help='Run maintenance tasks')
parser.add_argument('--vpc-id', required=True, help='VPC ID')
parser.add_argument('--environment', default='production', help='Environment name')
parser.add_argument('--region', default='us-east-1', help='AWS region')
args = parser.parse_args()
manager = ComprehensiveVPCSecurityManager(region=args.region)
if args.setup:
results = manager.setup_complete_vpc_security(args.vpc_id, args.environment)
print(f"Setup completed. Results saved to security-config-{datetime.now().strftime('%Y%m%d%H%M%S')}.json")
elif args.audit:
audit_results = manager.auditor.audit_security_groups()
print(f"Audit completed. Found {len(audit_results['overly_permissive'])} overly permissive SGs")
elif args.maintain:
maintenance_results = manager.run_security_maintenance()
print("Maintenance completed")
else:
print("Use --setup, --audit, or --maintain")
Beyond Manual Network Security: The PathShield Advantage
While building comprehensive Security Groups and NACLs provides excellent network security control, managing them at scale presents significant challenges:
Configuration Complexity: As your infrastructure grows, managing dozens of Security Groups and NACLs across multiple environments becomes increasingly complex and error-prone.
Audit Overhead: Regular security audits require substantial engineering time and expertise to identify misconfigurations, unused resources, and compliance violations.
Remediation Delays: Manual remediation of security issues can take days or weeks, leaving your infrastructure vulnerable during that time.
Compliance Management: Maintaining compliance with standards like SOC 2, PCI DSS, and HIPAA requires constant monitoring and documentation of network security controls.
Threat Intelligence Integration: Keeping Security Groups and NACLs updated with the latest threat intelligence requires continuous monitoring and manual updates.
This is where PathShield transforms your network security approach. Instead of manually managing complex Security Group and NACL configurations, PathShield provides:
- Automated Network Security: Intelligent Security Group and NACL management that adapts to your infrastructure changes automatically
- Continuous Compliance Monitoring: Real-time compliance checking against SOC 2, PCI DSS, and other security frameworks
- Instant Threat Response: Automatic blocking of malicious IPs and network patterns based on real-time threat intelligence
- Zero-Configuration Auditing: Comprehensive security audits and remediation recommendations without custom scripts
- Expert-Built Rules: Network security rules created and maintained by AWS security experts
Ready to move beyond manual network security management? Start your free PathShield trial and get intelligent, automated VPC security that scales with your startupβs growth.