· PathShield Team · Tutorials · 16 min read
AWS Cost Optimization Through Security - Save Money While Improving Security
Security improvements that actually reduce your AWS bill. Learn how fixing misconfigurations, cleaning up unused resources, and right-sizing can cut costs 30-50%.
Most startups think security costs money. The reality? Good security practices often reduce your AWS bill. Unused IAM roles, over-provisioned instances, forgotten test environments, and misconfigured storage can waste thousands monthly. This guide shows you how to cut costs while improving security posture.
The Security-Cost Connection Most Teams Miss
Traditional thinking: security = additional overhead and cost.
Reality: bad security practices waste massive amounts of money.
Common cost-wasting security issues:
- Unused IAM roles and policies (complexity overhead)
- Over-privileged instances running 24/7
- Test environments left running indefinitely
- Unoptimized storage from poor lifecycle policies
- Shadow IT resources nobody knows about
The 6 Security Practices That Reduce AWS Costs
1. Clean Up Unused IAM Resources (Save 10-20% on complexity costs)
The Problem: Unused IAM resources slow down policy evaluation, increase audit time, and mask real security issues.
#!/usr/bin/env python3
"""
IAM Cleanup Script - Find unused resources and calculate savings
"""
import boto3
import json
from datetime import datetime, timedelta, timezone
from collections import defaultdict
class IAMCostOptimizer:
def __init__(self):
self.iam = boto3.client('iam')
self.ec2 = boto3.client('ec2')
self.lambda_client = boto3.client('lambda')
self.savings_potential = 0
def find_unused_roles(self):
"""Find unused IAM roles and calculate complexity savings"""
unused_roles = []
roles = self.iam.list_roles()['Roles']
print(f"Analyzing {len(roles)} IAM roles...")
for role in roles:
# Skip AWS service-linked roles
if role['Path'].startswith('/aws-service-role/'):
continue
role_name = role['RoleName']
# Check last used
try:
role_details = self.iam.get_role(RoleName=role_name)
last_used = role_details['Role'].get('RoleLastUsed', {}).get('LastUsedDate')
if not last_used or (datetime.now(timezone.utc) - last_used).days > 90:
# Check if role is actually attached to resources
if not self.is_role_in_use(role_name):
unused_roles.append({
'name': role_name,
'created': role['CreateDate'],
'last_used': last_used or 'Never',
'attached_policies': len(self.iam.list_attached_role_policies(RoleName=role_name)['AttachedPolicies'])
})
except Exception as e:
print(f"Error checking role {role_name}: {e}")
# Calculate savings
# Each unused role adds ~$5/month in management overhead
monthly_savings = len(unused_roles) * 5
self.savings_potential += monthly_savings
print(f"\n💰 Unused IAM Roles: {len(unused_roles)} roles")
print(f" Estimated monthly savings: ${monthly_savings}")
print(f" Annual savings: ${monthly_savings * 12}")
return unused_roles
def is_role_in_use(self, role_name):
"""Check if role is actually used by resources"""
role_arn = f"arn:aws:iam::{self.get_account_id()}:role/{role_name}"
# Check EC2 instances
try:
instances = self.ec2.describe_instances()
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
if instance.get('IamInstanceProfile', {}).get('Arn', '').endswith(role_name):
return True
except:
pass
# Check Lambda functions
try:
functions = self.lambda_client.list_functions()
for function in functions['Functions']:
if function.get('Role') == role_arn:
return True
except:
pass
return False
def find_unused_policies(self):
"""Find unused customer-managed policies"""
unused_policies = []
policies = self.iam.list_policies(Scope='Local')['Policies']
for policy in policies:
# Check if policy is attached to any users, groups, or roles
entities = self.iam.list_entities_for_policy(PolicyArn=policy['Arn'])
if (not entities['PolicyUsers'] and
not entities['PolicyGroups'] and
not entities['PolicyRoles']):
unused_policies.append({
'name': policy['PolicyName'],
'arn': policy['Arn'],
'created': policy['CreateDate']
})
# Each unused policy adds ~$2/month in management overhead
monthly_savings = len(unused_policies) * 2
self.savings_potential += monthly_savings
print(f"\n💰 Unused IAM Policies: {len(unused_policies)} policies")
print(f" Estimated monthly savings: ${monthly_savings}")
return unused_policies
def get_account_id(self):
"""Get AWS account ID"""
return boto3.client('sts').get_caller_identity()['Account']
# Usage
optimizer = IAMCostOptimizer()
unused_roles = optimizer.find_unused_roles()
unused_policies = optimizer.find_unused_policies()
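Once you have reviewed the report, removing a role takes a few extra calls: managed policies must be detached, inline policies deleted, and the role removed from any instance profiles before it can be deleted. A minimal sketch that builds on the unused_roles list above (deletion is irreversible, so the destructive call is left commented out):

def delete_unused_role(iam, role_name):
    """Detach and delete everything attached to a role, then delete the role itself"""
    # Detach managed policies
    for policy in iam.list_attached_role_policies(RoleName=role_name)['AttachedPolicies']:
        iam.detach_role_policy(RoleName=role_name, PolicyArn=policy['PolicyArn'])
    # Delete inline policies
    for policy_name in iam.list_role_policies(RoleName=role_name)['PolicyNames']:
        iam.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
    # Remove the role from any instance profiles
    for profile in iam.list_instance_profiles_for_role(RoleName=role_name)['InstanceProfiles']:
        iam.remove_role_from_instance_profile(
            InstanceProfileName=profile['InstanceProfileName'], RoleName=role_name)
    iam.delete_role(RoleName=role_name)

# Dry run: print what would be deleted, then uncomment after review
for role in unused_roles:
    print(f"Would delete unused role: {role['name']}")
    # delete_unused_role(optimizer.iam, role['name'])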
2. Right-Size Over-Provisioned Resources (Save 20-40% on compute costs)
The Problem: Developers often provision large instances “to be safe” instead of right-sizing based on actual needs.
#!/usr/bin/env python3
"""
Right-sizing recommendations based on actual usage
"""
import boto3
from datetime import datetime, timedelta
class RightSizingAnalyzer:
def __init__(self):
self.ec2 = boto3.client('ec2')
self.cloudwatch = boto3.client('cloudwatch')
self.pricing = boto3.client('pricing', region_name='us-east-1')
def analyze_instance_utilization(self, days=30):
"""Analyze EC2 instances for right-sizing opportunities"""
instances = self.ec2.describe_instances()
recommendations = []
total_savings = 0
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
if instance['State']['Name'] != 'running':
continue
instance_id = instance['InstanceId']
instance_type = instance['InstanceType']
# Get CPU utilization
cpu_usage = self.get_average_cpu_utilization(instance_id, days)
# Get memory utilization (requires CloudWatch agent)
memory_usage = self.get_average_memory_utilization(instance_id, days)
# Recommend smaller instance type if underutilized
if cpu_usage < 20 and memory_usage < 40:
recommended_type = self.get_smaller_instance_type(instance_type)
if recommended_type:
current_cost = self.get_monthly_cost(instance_type)
new_cost = self.get_monthly_cost(recommended_type)
monthly_savings = current_cost - new_cost
recommendations.append({
'instance_id': instance_id,
'current_type': instance_type,
'recommended_type': recommended_type,
'cpu_usage': cpu_usage,
'memory_usage': memory_usage,
'monthly_savings': monthly_savings
})
total_savings += monthly_savings
print(f"\n💰 Right-sizing Opportunities: {len(recommendations)} instances")
print(f" Estimated monthly savings: ${total_savings:.2f}")
print(f" Annual savings: ${total_savings * 12:.2f}")
return recommendations
def get_average_cpu_utilization(self, instance_id, days):
"""Get average CPU utilization over specified days"""
try:
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
StartTime=datetime.utcnow() - timedelta(days=days),
EndTime=datetime.utcnow(),
Period=3600, # 1 hour
Statistics=['Average']
)
if response['Datapoints']:
avg_cpu = sum(dp['Average'] for dp in response['Datapoints']) / len(response['Datapoints'])
return round(avg_cpu, 2)
        except Exception:
            pass
        return 0

    def get_average_memory_utilization(self, instance_id, days):
        """Average memory utilization - assumes the CloudWatch agent publishes mem_used_percent
        in the CWAgent namespace with an InstanceId dimension (this is configuration-dependent)"""
        try:
            response = self.cloudwatch.get_metric_statistics(
                Namespace='CWAgent',
                MetricName='mem_used_percent',
                Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
                StartTime=datetime.utcnow() - timedelta(days=days),
                EndTime=datetime.utcnow(),
                Period=3600,
                Statistics=['Average']
            )
            if response['Datapoints']:
                return round(sum(dp['Average'] for dp in response['Datapoints']) / len(response['Datapoints']), 2)
        except Exception:
            pass
        return 0

    def get_monthly_cost(self, instance_type):
        """Rough on-demand monthly cost estimates - use the AWS Pricing API for real numbers"""
        base_costs = {
            't3.medium': 30.00, 't3.large': 60.00, 't3.xlarge': 120.00, 't3.2xlarge': 240.00,
            'm5.large': 70.00, 'm5.xlarge': 140.00, 'm5.2xlarge': 280.00,
            'c5.large': 65.00, 'c5.xlarge': 130.00, 'c5.2xlarge': 260.00
        }
        return base_costs.get(instance_type, 50.00)
def get_smaller_instance_type(self, current_type):
"""Recommend smaller instance type"""
# Simplified mapping - in production, use AWS Compute Optimizer
size_map = {
't3.large': 't3.medium',
't3.xlarge': 't3.large',
't3.2xlarge': 't3.xlarge',
            'm5.large': 't3.large',    # m5.medium does not exist; t3.large matches 2 vCPU / 8 GiB
            'm5.xlarge': 'm5.large',
            'm5.2xlarge': 'm5.xlarge',
            'c5.large': 't3.medium',   # c5.medium does not exist; t3.medium matches 2 vCPU / 4 GiB
'c5.xlarge': 'c5.large',
}
return size_map.get(current_type)
def find_idle_instances(self, days=7):
"""Find instances with very low utilization"""
instances = self.ec2.describe_instances()
idle_instances = []
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
if instance['State']['Name'] != 'running':
continue
instance_id = instance['InstanceId']
cpu_usage = self.get_average_cpu_utilization(instance_id, days)
if cpu_usage < 5: # Less than 5% CPU
monthly_cost = self.get_monthly_cost(instance['InstanceType'])
idle_instances.append({
'instance_id': instance_id,
'type': instance['InstanceType'],
'cpu_usage': cpu_usage,
'monthly_cost': monthly_cost,
'launch_time': instance['LaunchTime']
})
total_waste = sum(i['monthly_cost'] for i in idle_instances)
print(f"\n💰 Idle Instances: {len(idle_instances)} instances")
print(f" Monthly waste: ${total_waste:.2f}")
print(f" Annual waste: ${total_waste * 12:.2f}")
return idle_instances
# Usage
analyzer = RightSizingAnalyzer()
rightsizing_recommendations = analyzer.analyze_instance_utilization()
idle_instances = analyzer.find_idle_instances()
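Acting on a right-sizing recommendation means a short maintenance window: the instance has to be stopped before its type can be changed. A hedged sketch that applies the recommendations gathered above (instance IDs and target types come from rightsizing_recommendations; the resize call is commented out so nothing changes until you have reviewed it):

import boto3

def resize_instance(instance_id, new_type):
    """Stop an instance, change its type, and start it again (brief downtime)"""
    ec2 = boto3.client('ec2')
    ec2.stop_instances(InstanceIds=[instance_id])
    ec2.get_waiter('instance_stopped').wait(InstanceIds=[instance_id])
    # The instance type can only be changed while the instance is stopped
    ec2.modify_instance_attribute(InstanceId=instance_id, InstanceType={'Value': new_type})
    ec2.start_instances(InstanceIds=[instance_id])

for rec in rightsizing_recommendations:
    print(f"{rec['instance_id']}: {rec['current_type']} -> {rec['recommended_type']} "
          f"(~${rec['monthly_savings']:.2f}/month)")
    # resize_instance(rec['instance_id'], rec['recommended_type'])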
3. Discover and Clean Up Shadow IT (Save 15-30% on unknown resources)
The Problem: Developers spin up resources for testing and forget about them. These “shadow IT” resources can represent 30% of your AWS bill.
#!/usr/bin/env python3
"""
Shadow IT Discovery - Find forgotten and untagged resources
"""
import boto3
from datetime import datetime, timedelta, timezone
from collections import defaultdict
class ShadowITFinder:
def __init__(self):
self.ec2 = boto3.client('ec2')
self.rds = boto3.client('rds')
self.s3 = boto3.client('s3')
self.elb = boto3.client('elbv2')
def find_untagged_resources(self):
"""Find resources without proper tags"""
untagged_resources = []
total_cost = 0
# Required tags for governance
required_tags = ['Environment', 'Owner', 'Project']
# Check EC2 instances
instances = self.ec2.describe_instances()
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
if instance['State']['Name'] == 'terminated':
continue
tags = {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
missing_tags = [tag for tag in required_tags if tag not in tags]
if missing_tags:
monthly_cost = self.estimate_instance_cost(instance['InstanceType'])
untagged_resources.append({
'type': 'EC2',
'id': instance['InstanceId'],
'missing_tags': missing_tags,
'monthly_cost': monthly_cost,
'launch_time': instance['LaunchTime']
})
total_cost += monthly_cost
# Check RDS instances
db_instances = self.rds.describe_db_instances()
for db in db_instances['DBInstances']:
tags = self.rds.list_tags_for_resource(ResourceName=db['DBInstanceArn'])['TagList']
tag_dict = {tag['Key']: tag['Value'] for tag in tags}
missing_tags = [tag for tag in required_tags if tag not in tag_dict]
if missing_tags:
monthly_cost = self.estimate_rds_cost(db['DBInstanceClass'])
untagged_resources.append({
'type': 'RDS',
'id': db['DBInstanceIdentifier'],
'missing_tags': missing_tags,
'monthly_cost': monthly_cost,
'created': db['InstanceCreateTime']
})
total_cost += monthly_cost
print(f"\n💰 Untagged Resources: {len(untagged_resources)} resources")
print(f" Monthly cost of untagged resources: ${total_cost:.2f}")
print(f" Annual cost: ${total_cost * 12:.2f}")
return untagged_resources
def find_test_environments_left_running(self):
"""Find test/dev environments that might be forgotten"""
test_resources = []
total_cost = 0
test_indicators = ['test', 'dev', 'staging', 'experiment', 'poc', 'demo']
# Check EC2 instances
instances = self.ec2.describe_instances()
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
if instance['State']['Name'] != 'running':
continue
# Check instance name and tags for test indicators
name = ''
tags = {}
for tag in instance.get('Tags', []):
if tag['Key'] == 'Name':
name = tag['Value'].lower()
tags[tag['Key'].lower()] = tag['Value'].lower()
if any(indicator in name for indicator in test_indicators) or \
any(indicator in str(tags.values()) for indicator in test_indicators):
# Check if running for more than 24 hours (typical test duration)
launch_time = instance['LaunchTime']
hours_running = (datetime.now(timezone.utc) - launch_time).total_seconds() / 3600
if hours_running > 24:
monthly_cost = self.estimate_instance_cost(instance['InstanceType'])
test_resources.append({
'type': 'EC2',
'id': instance['InstanceId'],
'name': name,
'hours_running': hours_running,
'monthly_cost': monthly_cost,
'environment': self.detect_environment(tags)
})
total_cost += monthly_cost
print(f"\n💰 Long-running Test Resources: {len(test_resources)} resources")
print(f" Monthly cost: ${total_cost:.2f}")
print(f" Potential monthly savings if stopped: ${total_cost:.2f}")
return test_resources
def find_orphaned_resources(self):
"""Find orphaned resources (no longer connected to active infrastructure)"""
orphaned = []
# Find EBS volumes not attached to instances
volumes = self.ec2.describe_volumes()['Volumes']
for volume in volumes:
if volume['State'] == 'available': # Not attached
monthly_cost = self.estimate_ebs_cost(volume['Size'], volume['VolumeType'])
orphaned.append({
'type': 'EBS Volume',
'id': volume['VolumeId'],
'size': volume['Size'],
'monthly_cost': monthly_cost,
'created': volume['CreateTime']
})
# Find elastic IPs not associated with instances
addresses = self.ec2.describe_addresses()['Addresses']
for address in addresses:
if 'InstanceId' not in address: # Not associated
orphaned.append({
'type': 'Elastic IP',
'id': address['PublicIp'],
'monthly_cost': 3.65, # $0.005/hour for unattached EIP
'domain': address.get('Domain', 'vpc')
})
total_cost = sum(item['monthly_cost'] for item in orphaned)
print(f"\n💰 Orphaned Resources: {len(orphaned)} resources")
print(f" Monthly waste: ${total_cost:.2f}")
print(f" Annual waste: ${total_cost * 12:.2f}")
return orphaned
def estimate_instance_cost(self, instance_type):
"""Estimate monthly cost for EC2 instance type"""
# Simplified pricing - use AWS Pricing API in production
base_costs = {
't3.micro': 7.50,
't3.small': 15.00,
't3.medium': 30.00,
't3.large': 60.00,
't3.xlarge': 120.00,
'm5.large': 70.00,
'm5.xlarge': 140.00,
'c5.large': 65.00,
'c5.xlarge': 130.00
}
return base_costs.get(instance_type, 50.00) # Default estimate
# Usage
finder = ShadowITFinder()
untagged = finder.find_untagged_resources()
test_resources = finder.find_test_environments_left_running()
orphaned = finder.find_orphaned_resources()
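For the untagged resources found above, a quick win is to backfill placeholder tags so the spend at least shows up in cost allocation reports while you track down owners. A minimal sketch for the EC2 entries (the placeholder values are examples, not a convention from this guide):

ec2 = boto3.client('ec2')
placeholder_tags = [
    {'Key': 'Environment', 'Value': 'unknown'},
    {'Key': 'Owner', 'Value': 'unassigned'},
    {'Key': 'Project', 'Value': 'needs-review'}
]

for resource in untagged:
    if resource['type'] == 'EC2':
        # Only add the tags that are actually missing on this instance
        tags_to_add = [t for t in placeholder_tags if t['Key'] in resource['missing_tags']]
        ec2.create_tags(Resources=[resource['id']], Tags=tags_to_add)
        print(f"Tagged {resource['id']}: {[t['Key'] for t in tags_to_add]}")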
4. Optimize Storage Through Security Best Practices (Save 25-40% on storage costs)
The Problem: Poor data lifecycle management and security practices lead to massive storage waste.
#!/usr/bin/env python3
"""
Storage optimization through security best practices
"""
import boto3
from datetime import datetime, timedelta
from botocore.exceptions import ClientError
class StorageOptimizer:
def __init__(self):
self.s3 = boto3.client('s3')
self.cloudwatch = boto3.client('cloudwatch')
def analyze_s3_storage_waste(self):
"""Find S3 storage optimization opportunities"""
buckets = self.s3.list_buckets()['Buckets']
total_savings = 0
recommendations = []
for bucket in buckets:
bucket_name = bucket['Name']
try:
# Get bucket size and cost
bucket_size = self.get_bucket_size(bucket_name)
# Check lifecycle configuration
lifecycle_savings = self.check_lifecycle_policy(bucket_name, bucket_size)
# Check versioning optimization
versioning_savings = self.check_versioning_optimization(bucket_name)
# Check for old multipart uploads
multipart_savings = self.check_incomplete_multipart_uploads(bucket_name)
bucket_total_savings = lifecycle_savings + versioning_savings + multipart_savings
if bucket_total_savings > 0:
recommendations.append({
'bucket': bucket_name,
'size_gb': bucket_size / (1024**3), # Convert to GB
'lifecycle_savings': lifecycle_savings,
'versioning_savings': versioning_savings,
'multipart_savings': multipart_savings,
'total_monthly_savings': bucket_total_savings
})
total_savings += bucket_total_savings
except Exception as e:
print(f"Error analyzing bucket {bucket_name}: {e}")
print(f"\n💰 S3 Storage Optimization: {len(recommendations)} buckets")
print(f" Monthly savings potential: ${total_savings:.2f}")
print(f" Annual savings: ${total_savings * 12:.2f}")
return recommendations
def check_lifecycle_policy(self, bucket_name, bucket_size):
"""Check if bucket has proper lifecycle policies"""
try:
lifecycle = self.s3.get_bucket_lifecycle_configuration(Bucket=bucket_name)
# Bucket has lifecycle policy
return 0
        except ClientError as e:
            # boto3 raises a generic ClientError here rather than a modeled exception
            if e.response['Error']['Code'] != 'NoSuchLifecycleConfiguration':
                return 0
            # No lifecycle policy - estimate savings
# Assume 30% of data could be moved to IA after 30 days
# and 50% could be moved to Glacier after 90 days
standard_cost_per_gb = 0.023 # $0.023/GB/month for S3 Standard
ia_cost_per_gb = 0.0125 # $0.0125/GB/month for S3 IA
glacier_cost_per_gb = 0.004 # $0.004/GB/month for Glacier
bucket_size_gb = bucket_size / (1024**3)
# Estimate current cost (all in Standard)
current_cost = bucket_size_gb * standard_cost_per_gb
# Estimate optimized cost
standard_portion = bucket_size_gb * 0.2 # 20% stays in Standard
ia_portion = bucket_size_gb * 0.3 # 30% moves to IA
glacier_portion = bucket_size_gb * 0.5 # 50% moves to Glacier
optimized_cost = (standard_portion * standard_cost_per_gb +
ia_portion * ia_cost_per_gb +
glacier_portion * glacier_cost_per_gb)
return max(0, current_cost - optimized_cost)
def check_versioning_optimization(self, bucket_name):
"""Check for excessive object versions"""
try:
versioning = self.s3.get_bucket_versioning(Bucket=bucket_name)
if versioning.get('Status') == 'Enabled':
# Estimate savings from limiting versions
                # Assume roughly 60% savings from expiring old versions (matches the 0.4 factor below)
bucket_size = self.get_bucket_size(bucket_name)
bucket_size_gb = bucket_size / (1024**3)
cost_per_gb = 0.023
# Rough estimate: versioned buckets use 3x storage on average
current_cost = bucket_size_gb * cost_per_gb
optimized_cost = current_cost * 0.4 # 60% reduction
return current_cost - optimized_cost
except:
pass
return 0
def check_incomplete_multipart_uploads(self, bucket_name):
"""Check for incomplete multipart uploads wasting storage"""
try:
uploads = self.s3.list_multipart_uploads(Bucket=bucket_name)
incomplete_uploads = uploads.get('Uploads', [])
# Estimate size and cost of incomplete uploads
# Each incomplete upload typically wastes 5MB on average
if incomplete_uploads:
wasted_size_gb = len(incomplete_uploads) * 5 / 1024 # Convert MB to GB
monthly_waste = wasted_size_gb * 0.023 # S3 Standard pricing
return monthly_waste
except:
pass
return 0
def get_bucket_size(self, bucket_name):
"""Get bucket size in bytes using CloudWatch metrics"""
try:
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='BucketSizeBytes',
Dimensions=[
{'Name': 'BucketName', 'Value': bucket_name},
{'Name': 'StorageType', 'Value': 'StandardStorage'}
],
StartTime=datetime.utcnow() - timedelta(days=2),
EndTime=datetime.utcnow(),
Period=86400, # 1 day
Statistics=['Average']
)
if response['Datapoints']:
return response['Datapoints'][0]['Average']
except:
pass
return 0
def generate_lifecycle_policy(self, bucket_name):
"""Generate optimal lifecycle policy for a bucket"""
policy = {
"Rules": [
{
"ID": "OptimizeStorage",
"Status": "Enabled",
"Filter": {"Prefix": ""},
"Transitions": [
{
"Days": 30,
"StorageClass": "STANDARD_IA"
},
{
"Days": 90,
"StorageClass": "GLACIER"
},
{
"Days": 365,
"StorageClass": "DEEP_ARCHIVE"
}
]
},
{
"ID": "CleanupIncompleteUploads",
"Status": "Enabled",
"Filter": {"Prefix": ""},
"AbortIncompleteMultipartUpload": {
"DaysAfterInitiation": 7
}
},
{
"ID": "LimitVersions",
"Status": "Enabled",
"Filter": {"Prefix": ""},
"NoncurrentVersionTransitions": [
{
"NoncurrentDays": 30,
"StorageClass": "STANDARD_IA"
}
],
"NoncurrentVersionExpiration": {
"NoncurrentDays": 90
}
}
]
}
return policy
# Usage
optimizer = StorageOptimizer()
s3_recommendations = optimizer.analyze_s3_storage_waste()
5. Implement Automated Scheduling (Save 60-80% on dev/test environments)
The Problem: Development and testing environments run 24/7 when they’re only needed during business hours.
#!/usr/bin/env python3
"""
Automated resource scheduling for cost optimization
"""
import boto3
import json
from datetime import datetime, time
import io
import zipfile
class ResourceScheduler:
def __init__(self):
self.ec2 = boto3.client('ec2')
self.rds = boto3.client('rds')
self.events = boto3.client('events')
self.lambda_client = boto3.client('lambda')
def identify_schedulable_resources(self):
"""Identify resources that can be scheduled"""
schedulable = []
total_savings = 0
# Find dev/test EC2 instances
instances = self.ec2.describe_instances()
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
if instance['State']['Name'] != 'running':
continue
tags = {tag['Key'].lower(): tag['Value'].lower()
for tag in instance.get('Tags', [])}
environment = tags.get('environment', '')
if environment in ['dev', 'test', 'staging', 'development']:
# Calculate savings from stopping nights/weekends
# Assume 12 hours/day, 5 days/week = 60 hours/week
# vs 168 hours/week = 64% savings
monthly_cost = self.estimate_instance_cost(instance['InstanceType'])
monthly_savings = monthly_cost * 0.64
schedulable.append({
'type': 'EC2',
'id': instance['InstanceId'],
'name': tags.get('name', 'unnamed'),
'environment': environment,
'instance_type': instance['InstanceType'],
'monthly_cost': monthly_cost,
'monthly_savings': monthly_savings
})
total_savings += monthly_savings
# Find dev/test RDS instances
db_instances = self.rds.describe_db_instances()
for db in db_instances['DBInstances']:
if db['DBInstanceStatus'] != 'available':
continue
tags = self.rds.list_tags_for_resource(ResourceName=db['DBInstanceArn'])['TagList']
tag_dict = {tag['Key'].lower(): tag['Value'].lower() for tag in tags}
environment = tag_dict.get('environment', '')
if environment in ['dev', 'test', 'staging', 'development']:
monthly_cost = self.estimate_rds_cost(db['DBInstanceClass'])
monthly_savings = monthly_cost * 0.64
schedulable.append({
'type': 'RDS',
'id': db['DBInstanceIdentifier'],
'environment': environment,
'instance_class': db['DBInstanceClass'],
'monthly_cost': monthly_cost,
'monthly_savings': monthly_savings
})
total_savings += monthly_savings
print(f"\n💰 Schedulable Resources: {len(schedulable)} resources")
print(f" Monthly savings potential: ${total_savings:.2f}")
print(f" Annual savings: ${total_savings * 12:.2f}")
return schedulable
def create_scheduling_lambda(self):
"""Create Lambda function for resource scheduling"""
lambda_code = '''
import json
import boto3
from datetime import datetime, time
def lambda_handler(event, context):
ec2 = boto3.client('ec2')
rds = boto3.client('rds')
action = event.get('action') # 'start' or 'stop'
resource_type = event.get('resource_type') # 'ec2' or 'rds'
resource_ids = event.get('resource_ids', [])
results = []
for resource_id in resource_ids:
try:
if resource_type == 'ec2':
if action == 'stop':
ec2.stop_instances(InstanceIds=[resource_id])
elif action == 'start':
ec2.start_instances(InstanceIds=[resource_id])
elif resource_type == 'rds':
if action == 'stop':
rds.stop_db_instance(DBInstanceIdentifier=resource_id)
elif action == 'start':
rds.start_db_instance(DBInstanceIdentifier=resource_id)
results.append({'resource_id': resource_id, 'status': 'success'})
except Exception as e:
results.append({
'resource_id': resource_id,
'status': 'error',
'error': str(e)
})
return {
'statusCode': 200,
'body': json.dumps(results)
}
'''
        # Lambda expects a zip archive, not raw source text - package the code in memory
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as zf:
            zf.writestr('lambda_function.py', lambda_code)
        # Create Lambda function
        try:
            response = self.lambda_client.create_function(
                FunctionName='resource-scheduler',
                Runtime='python3.9',
                Role='arn:aws:iam::ACCOUNT:role/lambda-execution-role', # Replace with actual role
                Handler='lambda_function.lambda_handler',
                Code={'ZipFile': zip_buffer.getvalue()},
Description='Automated resource scheduling for cost optimization',
Timeout=300,
Tags={
'Purpose': 'CostOptimization',
'CreatedBy': 'SecurityTeam'
}
)
return response['FunctionArn']
except Exception as e:
print(f"Error creating Lambda function: {e}")
return None
def create_schedule_rules(self, schedulable_resources):
"""Create CloudWatch Events rules for scheduling"""
# Group resources by environment
by_environment = {}
for resource in schedulable_resources:
env = resource['environment']
if env not in by_environment:
by_environment[env] = {'ec2': [], 'rds': []}
by_environment[env][resource['type'].lower()].append(resource['id'])
rules_created = []
for environment, resources in by_environment.items():
# Create stop rule (evening)
stop_rule_name = f'stop-{environment}-resources'
self.events.put_rule(
Name=stop_rule_name,
                ScheduleExpression='cron(0 19 ? * MON-FRI *)', # 7 PM weekdays (UTC); EventBridge requires '?' in the day-of-month field
Description=f'Stop {environment} resources in the evening',
State='ENABLED'
)
# Create start rule (morning)
start_rule_name = f'start-{environment}-resources'
self.events.put_rule(
Name=start_rule_name,
                ScheduleExpression='cron(0 8 ? * MON-FRI *)', # 8 AM weekdays (UTC)
Description=f'Start {environment} resources in the morning',
State='ENABLED'
)
rules_created.extend([stop_rule_name, start_rule_name])
        return rules_created

    def estimate_instance_cost(self, instance_type):
        """Rough on-demand monthly cost estimate - use the AWS Pricing API for real numbers"""
        base_costs = {'t3.medium': 30.00, 't3.large': 60.00, 't3.xlarge': 120.00,
                      'm5.large': 70.00, 'm5.xlarge': 140.00, 'c5.large': 65.00, 'c5.xlarge': 130.00}
        return base_costs.get(instance_type, 50.00)

    def estimate_rds_cost(self, instance_class):
        """Rough monthly estimate for common RDS instance classes"""
        base_costs = {'db.t3.micro': 15.00, 'db.t3.medium': 60.00, 'db.m5.large': 130.00}
        return base_costs.get(instance_class, 100.00)
# Usage
scheduler = ResourceScheduler()
schedulable = scheduler.identify_schedulable_resources()
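The rules created by create_schedule_rules only define when to fire; they still need the scheduler Lambda attached as a target, and EventBridge needs permission to invoke it. A sketch of that wiring (function_arn is the value returned by create_scheduling_lambda, and the payload keys mirror what the Lambda handler expects):

import json
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

def attach_lambda_to_rule(rule_name, function_arn, payload):
    """Point an EventBridge rule at the scheduler Lambda and allow it to invoke the function"""
    rule_arn = events.describe_rule(Name=rule_name)['Arn']
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId=f'{rule_name}-invoke',
        Action='lambda:InvokeFunction',
        Principal='events.amazonaws.com',
        SourceArn=rule_arn
    )
    events.put_targets(
        Rule=rule_name,
        Targets=[{'Id': '1', 'Arn': function_arn, 'Input': json.dumps(payload)}]
    )

# Example: stop dev EC2 instances in the evening
# attach_lambda_to_rule('stop-dev-resources', function_arn,
#                       {'action': 'stop', 'resource_type': 'ec2',
#                        'resource_ids': [r['id'] for r in schedulable if r['type'] == 'EC2']})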
6. Reserved Instance and Savings Plan Optimization
#!/usr/bin/env python3
"""
Reserved Instance and Savings Plan recommendations
"""
import boto3
from datetime import datetime, timedelta
from collections import defaultdict
class ReservationOptimizer:
def __init__(self):
self.ec2 = boto3.client('ec2')
self.ce = boto3.client('ce') # Cost Explorer
def analyze_ri_opportunities(self):
"""Analyze Reserved Instance opportunities"""
        # Ask Cost Explorer for RI purchase recommendations based on the last 30 days of usage.
        # Note: the exact Service string is an assumption - verify it against the Cost Explorer docs.
        ri_response = self.ce.get_reservation_purchase_recommendation(
            Service='Amazon Elastic Compute Cloud - Compute',
            LookbackPeriodInDays='THIRTY_DAYS',
            PaymentOption='PARTIAL_UPFRONT',
            TermInYears='ONE_YEAR'
        )
        recommendations = []
        total_savings = 0
        # Each recommendation contains a list of per-instance-type purchase details
        for rec in ri_response.get('Recommendations', []):
            for detail in rec.get('RecommendationDetails', []):
                instance_details = detail['InstanceDetails']['EC2InstanceDetails']
                monthly_savings = float(detail.get('EstimatedMonthlySavingsAmount', 0))
                recommendations.append({
                    'instance_type': instance_details.get('InstanceType', ''),
                    'availability_zone': instance_details.get('AvailabilityZone', ''),
                    'platform': instance_details.get('Platform', ''),
                    'monthly_savings': monthly_savings,
                    'upfront_cost': float(detail.get('UpfrontCost', 0))
                })
                total_savings += monthly_savings
print(f"\n💰 Reserved Instance Opportunities: {len(recommendations)} recommendations")
print(f" Monthly savings potential: ${total_savings:.2f}")
print(f" Annual savings: ${total_savings * 12:.2f}")
return recommendations
# Usage
ri_optimizer = ReservationOptimizer()
ri_opportunities = ri_optimizer.analyze_ri_opportunities()
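The same Cost Explorer client can also recommend Compute Savings Plans, which cover Fargate and Lambda usage in addition to EC2. A hedged sketch (the response field names follow the GetSavingsPlansPurchaseRecommendation API; verify them against your boto3 version):

def analyze_savings_plan_opportunities(ce):
    """Summarize Compute Savings Plan recommendations from Cost Explorer"""
    response = ce.get_savings_plans_purchase_recommendation(
        SavingsPlansType='COMPUTE_SP',
        TermInYears='ONE_YEAR',
        PaymentOption='NO_UPFRONT',
        LookbackPeriodInDays='THIRTY_DAYS'
    )
    summary = response.get('SavingsPlansPurchaseRecommendation', {}).get(
        'SavingsPlansPurchaseRecommendationSummary', {})
    print("\n💰 Compute Savings Plan Opportunity")
    print(f"   Hourly commitment: ${summary.get('HourlyCommitmentToPurchase', '0')}")
    print(f"   Estimated monthly savings: ${summary.get('EstimatedMonthlySavingsAmount', '0')}")
    print(f"   Estimated savings percentage: {summary.get('EstimatedSavingsPercentage', '0')}%")
    return summary

sp_summary = analyze_savings_plan_opportunities(ri_optimizer.ce)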
Comprehensive Cost-Security Dashboard
#!/usr/bin/env python3
"""
Master script that runs all cost optimization analyses (assumes the optimizer classes above are defined in, or imported into, the same module)
"""
def main():
print("🔍 AWS Cost Optimization Through Security Analysis")
print("=" * 60)
total_monthly_savings = 0
# 1. IAM cleanup
print("\n1. IAM Resource Cleanup")
iam_optimizer = IAMCostOptimizer()
unused_roles = iam_optimizer.find_unused_roles()
unused_policies = iam_optimizer.find_unused_policies()
total_monthly_savings += iam_optimizer.savings_potential
# 2. Right-sizing
print("\n2. Right-sizing Analysis")
sizing_analyzer = RightSizingAnalyzer()
rightsizing_recs = sizing_analyzer.analyze_instance_utilization()
idle_instances = sizing_analyzer.find_idle_instances()
# 3. Shadow IT discovery
print("\n3. Shadow IT Discovery")
shadow_finder = ShadowITFinder()
untagged = shadow_finder.find_untagged_resources()
test_resources = shadow_finder.find_test_environments_left_running()
orphaned = shadow_finder.find_orphaned_resources()
# 4. Storage optimization
print("\n4. Storage Optimization")
storage_optimizer = StorageOptimizer()
s3_recommendations = storage_optimizer.analyze_s3_storage_waste()
# 5. Resource scheduling
print("\n5. Resource Scheduling Opportunities")
scheduler = ResourceScheduler()
    schedulable = scheduler.identify_schedulable_resources()

    # Aggregate the savings estimates from each analysis
    # (categories can overlap, so treat the total as an upper bound)
    total_monthly_savings += sum(r['monthly_savings'] for r in rightsizing_recs)
    total_monthly_savings += sum(i['monthly_cost'] for i in idle_instances)
    total_monthly_savings += sum(r['monthly_cost'] for r in test_resources)
    total_monthly_savings += sum(r['monthly_cost'] for r in orphaned)
    total_monthly_savings += sum(r['total_monthly_savings'] for r in s3_recommendations)
    total_monthly_savings += sum(r['monthly_savings'] for r in schedulable)

    # Summary
print(f"\n{'='*60}")
print("💰 TOTAL COST OPTIMIZATION SUMMARY")
print(f"{'='*60}")
print(f"Estimated total monthly savings: ${total_monthly_savings:.2f}")
print(f"Estimated annual savings: ${total_monthly_savings * 12:.2f}")
# Priority recommendations
print(f"\n🎯 TOP PRIORITY ACTIONS:")
print(f"1. Clean up {len(unused_roles)} unused IAM roles")
print(f"2. Right-size {len(rightsizing_recs)} underutilized instances")
print(f"3. Stop/schedule {len(test_resources)} long-running test resources")
print(f"4. Clean up {len(orphaned)} orphaned resources")
print(f"5. Implement lifecycle policies on {len(s3_recommendations)} S3 buckets")
if __name__ == '__main__':
main()
Implementation Roadmap
Week 1: Quick Wins (Immediate 15-25% savings)
- Run the unused resource cleanup scripts
- Stop obviously idle test instances
- Delete orphaned EBS volumes and unattached EIPs (cleanup sketch after this list)
- Tag untagged resources or delete if unnecessary
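The orphaned-resource cleanup above can be scripted in a few lines; a minimal sketch that deletes unattached EBS volumes and releases unassociated Elastic IPs (destructive, so the actual calls are commented out until you have reviewed the discovery output):

import boto3

ec2 = boto3.client('ec2')

# EBS volumes in the 'available' state are not attached to any instance
for volume in ec2.describe_volumes(Filters=[{'Name': 'status', 'Values': ['available']}])['Volumes']:
    print(f"Would delete unattached volume {volume['VolumeId']} ({volume['Size']} GiB)")
    # ec2.delete_volume(VolumeId=volume['VolumeId'])

# Elastic IPs with no association are billed while sitting idle
for address in ec2.describe_addresses()['Addresses']:
    if 'AssociationId' not in address and 'InstanceId' not in address:
        print(f"Would release unassociated EIP {address['PublicIp']}")
        # ec2.release_address(AllocationId=address['AllocationId'])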
Week 2: Right-sizing (Additional 10-20% savings)
- Analyze instance utilization with CloudWatch
- Downsize overprovisioned instances
- Implement automated scheduling for dev/test environments
Week 3: Storage Optimization (Additional 15-30% savings)
- Implement S3 lifecycle policies (see the sketch after this list)
- Clean up incomplete multipart uploads
- Optimize object versioning settings
- Move old data to cheaper storage classes
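Applying a lifecycle policy is a single API call once you have the configuration. A sketch that pairs the StorageOptimizer from earlier with put_bucket_lifecycle_configuration:

import boto3

s3 = boto3.client('s3')
optimizer = StorageOptimizer()

for rec in optimizer.analyze_s3_storage_waste():
    policy = optimizer.generate_lifecycle_policy(rec['bucket'])
    s3.put_bucket_lifecycle_configuration(
        Bucket=rec['bucket'],
        LifecycleConfiguration=policy
    )
    print(f"Applied lifecycle policy to {rec['bucket']} "
          f"(~${rec['total_monthly_savings']:.2f}/month estimated savings)")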
Week 4: Long-term Optimization (Additional 20-40% savings)
- Purchase Reserved Instances for stable workloads
- Implement Savings Plans
- Set up automated cost monitoring and alerting (budget-alert sketch after this list)
- Create monthly cost optimization reviews
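For monitoring and alerting, an AWS Budgets alert is the simplest guardrail to start with; a minimal sketch (the budget amount and notification address are placeholders):

import boto3

budgets = boto3.client('budgets')
account_id = boto3.client('sts').get_caller_identity()['Account']

budgets.create_budget(
    AccountId=account_id,
    Budget={
        'BudgetName': 'monthly-cost-guardrail',
        'BudgetLimit': {'Amount': '10000', 'Unit': 'USD'},  # placeholder amount
        'TimeUnit': 'MONTHLY',
        'BudgetType': 'COST'
    },
    NotificationsWithSubscribers=[{
        'Notification': {
            'NotificationType': 'ACTUAL',
            'ComparisonOperator': 'GREATER_THAN',
            'Threshold': 80,  # alert at 80% of the budget
            'ThresholdType': 'PERCENTAGE'
        },
        'Subscribers': [{'SubscriptionType': 'EMAIL', 'Address': 'finops@example.com'}]  # placeholder
    }]
)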
Conclusion
Security-driven cost optimization isn’t just about saving money—it’s about building sustainable, manageable AWS environments. When you clean up unused resources, implement proper tagging, and optimize storage, you’re not just reducing costs; you’re reducing security risks and operational complexity.
The best part? These optimizations compound over time. A 30% reduction in your AWS bill this month becomes $36,000 saved per year on a $10,000/month bill. That’s enough to fund a security engineer’s salary.
Start with the highest-impact items:
- Stop idle test resources (immediate savings)
- Clean up unused IAM roles and policies
- Implement S3 lifecycle policies
- Schedule dev/test environments
Want continuous cost optimization without running scripts monthly? Modern platforms like PathShield can automatically detect cost-wasting security misconfigurations and send you prioritized recommendations to optimize both cost and security simultaneously.