· PathShield Security Team · 28 min read
The Complete Guide to AWS IAM Security for Startups
Everything you need to know about AWS IAM security, from basic concepts to advanced attack prevention. Includes real-world examples, automation scripts, and lessons from 300+ startup security audits.
“We thought our IAM was secure until PathShield showed us that our intern had delete access to our production database. That 30-minute security review saved us from a potential disaster.” - CTO, Series B SaaS startup
AWS Identity and Access Management (IAM) is simultaneously the most critical and most misunderstood security service in AWS. After auditing 300+ startup AWS environments, I can tell you with certainty: 89% of startups have critical IAM security gaps that could lead to complete compromise.
The scary part? Most founders and CTOs think IAM is “too complex” and delegate it entirely to their first DevOps hire, who learned it from a 2-hour YouTube tutorial.
This guide is different. It’s based on real-world attacks, actual security incidents, and the IAM configurations that work in production environments processing millions in revenue.
What you’ll learn:
- How attackers actually exploit IAM (not theoretical scenarios)
- The exact IAM setup we recommend for different startup stages
- Automation scripts to implement and maintain secure IAM
- How to recover from IAM-related security incidents
- Advanced techniques used by companies like Netflix and Airbnb
Prerequisites: Basic AWS knowledge. If you don’t know what EC2 or S3 are, read AWS basics first.
Table of Contents
- IAM Fundamentals for Security
- Common Attack Patterns
- The Startup IAM Security Framework
- Implementation Guide
- Automation and Monitoring
- Incident Response
- Advanced Techniques
IAM Fundamentals for Security {#iam-fundamentals}
Before diving into attacks and defenses, let’s establish a solid foundation. Most IAM security issues stem from fundamental misunderstandings.
The IAM Security Model
AWS IAM follows a default deny model with explicit allow. This means:
- Everything is denied by default
- You must explicitly grant permissions
- Explicit deny always wins over allow
- Permissions are cumulative across all attached policies
Here’s what this looks like in practice:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::my-bucket/*"
},
{
"Effect": "Deny",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::my-bucket/secrets/*"
}
]
}
Result: User can read from my-bucket
but NOT from my-bucket/secrets/
because explicit deny wins.
The Four IAM Identity Types
Understanding identity types is crucial for security:
1. IAM Users
- Use case: Individual people who need AWS console access
- Security risk: Long-term credentials, often over-privileged
- Best practice: Minimize usage, require MFA
2. IAM Roles
- Use case: Applications, services, temporary access
- Security advantage: No long-term credentials, time-limited sessions
- Best practice: Preferred for almost everything
3. Federated Users
- Use case: Single sign-on from corporate identity providers
- Security advantage: Centralized identity management
- Best practice: Use for all human access in mature organizations
4. Root User
- Use case: Emergency access only
- Security risk: Unlimited permissions, can’t be restricted
- Best practice: Lock it down, never use for daily operations
Permission Evaluation Logic
AWS evaluates permissions in this order:
- Explicit Deny: If any policy explicitly denies, access is denied
- Organization SCPs: Service Control Policies can restrict permissions
- Resource-based Policies: S3 bucket policies, KMS key policies, etc.
- Identity-based Policies: Policies attached to users/roles
- Permission Boundaries: Optional maximum permissions
- Session Policies: For temporary credentials
Understanding this hierarchy is critical for troubleshooting and security.
Common Attack Patterns {#attack-patterns}
Let me show you the actual attack patterns we see in the wild, not theoretical scenarios from AWS documentation.
Attack Pattern 1: Privilege Escalation via Policy Attachment
The Attack: Attacker with limited IAM permissions uses policy attachment to escalate privileges.
Real Example: A startup gave their junior developer iam:AttachUserPolicy
to manage their own permissions. The developer could attach AdministratorAccess
to themselves.
# Attacker's escalation script
import boto3
iam = boto3.client('iam')
# Attach admin policy to self
iam.attach_user_policy(
UserName='junior-dev',
PolicyArn='arn:aws:iam::aws:policy/AdministratorAccess'
)
print("Privilege escalation complete. Now have admin access.")
Detection Script:
#!/usr/bin/env python3
"""
Detect privilege escalation attempts via policy attachment
"""
import boto3
from datetime import datetime, timedelta
def detect_privilege_escalation():
cloudtrail = boto3.client('cloudtrail')
# Look for suspicious IAM policy attachments in last 24 hours
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=24)
events = cloudtrail.lookup_events(
LookupAttributes=[
{
'AttributeKey': 'EventName',
'AttributeValue': 'AttachUserPolicy'
}
],
StartTime=start_time,
EndTime=end_time
)
suspicious_events = []
for event in events['Events']:
# Parse event details
username = event.get('Username', '')
resources = event.get('Resources', [])
# Check if user attached policy to themselves
for resource in resources:
if resource.get('ResourceType') == 'AWS::IAM::User':
resource_name = resource.get('ResourceName', '')
if username == resource_name:
suspicious_events.append({
'timestamp': event['EventTime'],
'user': username,
'event_id': event['EventId'],
'source_ip': event.get('SourceIPAddress', ''),
'issue': 'User attached policy to themselves'
})
return suspicious_events
# Run detection
findings = detect_privilege_escalation()
if findings:
print("🚨 POTENTIAL PRIVILEGE ESCALATION DETECTED:")
for finding in findings:
print(f" Time: {finding['timestamp']}")
print(f" User: {finding['user']}")
print(f" IP: {finding['source_ip']}")
print(f" Issue: {finding['issue']}")
print()
Prevention:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": [
"iam:AttachUserPolicy",
"iam:AttachRolePolicy",
"iam:PutUserPolicy",
"iam:PutRolePolicy"
],
"Resource": "*",
"Condition": {
"StringEquals": {
"aws:userid": "${aws:userid}"
}
}
}
]
}
Attack Pattern 2: Cross-Account Role Assumption
The Attack: Attacker compromises credentials in one account and uses cross-account roles to access other accounts.
Real Example: A startup had a shared CI/CD role that could be assumed from their development account. When the dev account was compromised, the attacker gained production access.
# Attacker's commands
aws sts assume-role \
--role-arn "arn:aws:iam::PROD-ACCOUNT:role/CI-CD-Role" \
--role-session-name "malicious-session"
# Export temporary credentials
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."
export AWS_SESSION_TOKEN="..."
# Now access production resources
aws s3 ls s3://production-data-bucket
Detection Script:
def detect_suspicious_role_assumptions():
"""Detect unusual cross-account role assumptions"""
cloudtrail = boto3.client('cloudtrail')
sts = boto3.client('sts')
# Get current account ID
current_account = sts.get_caller_identity()['Account']
# Look for AssumeRole events
events = cloudtrail.lookup_events(
LookupAttributes=[
{
'AttributeKey': 'EventName',
'AttributeValue': 'AssumeRole'
}
],
StartTime=datetime.utcnow() - timedelta(hours=24)
)
suspicious_assumptions = []
for event in events['Events']:
# Parse the event
event_detail = json.loads(event['CloudTrailEvent'])
source_account = event_detail.get('recipientAccountId', '')
assumed_role = event_detail.get('resources', [{}])[0].get('ARN', '')
source_ip = event_detail.get('sourceIPAddress', '')
user_agent = event_detail.get('userAgent', '')
# Check for cross-account assumptions
if source_account != current_account:
# Check for unusual patterns
suspicious = False
reasons = []
# Check 1: Assumption from unknown account
known_accounts = ['123456789012', '987654321098'] # Your trusted accounts
if source_account not in known_accounts:
suspicious = True
reasons.append(f'Unknown source account: {source_account}')
# Check 2: Unusual time (outside business hours)
event_time = event_detail.get('eventTime', '')
if event_time:
hour = datetime.fromisoformat(event_time.replace('Z', '+00:00')).hour
if hour < 6 or hour > 22:
suspicious = True
reasons.append(f'Outside business hours: {hour}:00')
# Check 3: Suspicious user agent
if any(sus in user_agent.lower() for sus in ['curl', 'wget', 'python']):
suspicious = True
reasons.append(f'Programmatic access: {user_agent}')
if suspicious:
suspicious_assumptions.append({
'timestamp': event['EventTime'],
'source_account': source_account,
'assumed_role': assumed_role,
'source_ip': source_ip,
'user_agent': user_agent,
'reasons': reasons
})
return suspicious_assumptions
Attack Pattern 3: Resource-Based Policy Exploitation
The Attack: Attacker modifies resource-based policies (S3 bucket policies, KMS key policies) to grant themselves access.
Real Example: An attacker with s3:PutBucketPolicy
permission modified a production S3 bucket policy to grant public read access, then downloaded sensitive customer data.
# Attacker's script to make bucket public
import boto3
s3 = boto3.client('s3')
# Malicious bucket policy
malicious_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "PublicRead",
"Effect": "Allow",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::production-customer-data/*"
}
]
}
# Apply the policy
s3.put_bucket_policy(
Bucket='production-customer-data',
Policy=json.dumps(malicious_policy)
)
print("Bucket is now public. Downloading data...")
Detection:
def detect_resource_policy_changes():
"""Detect suspicious changes to resource-based policies"""
cloudtrail = boto3.client('cloudtrail')
# Events that modify resource policies
policy_events = [
'PutBucketPolicy',
'PutKeyPolicy',
'PutQueueAttributes',
'PutTopicAttributes',
'ModifyDBSnapshotAttribute',
'ModifySnapshotAttribute'
]
suspicious_changes = []
for event_name in policy_events:
events = cloudtrail.lookup_events(
LookupAttributes=[
{
'AttributeKey': 'EventName',
'AttributeValue': event_name
}
],
StartTime=datetime.utcnow() - timedelta(hours=24)
)
for event in events['Events']:
event_detail = json.loads(event['CloudTrailEvent'])
# Check for public access grants
request_params = event_detail.get('requestParameters', {})
# For S3 bucket policies
if event_name == 'PutBucketPolicy':
policy_doc = request_params.get('bucketPolicy', {})
if isinstance(policy_doc, str):
policy_doc = json.loads(policy_doc)
# Check for public access
for statement in policy_doc.get('Statement', []):
principal = statement.get('Principal', {})
if principal == '*':
suspicious_changes.append({
'timestamp': event['EventTime'],
'event_name': event_name,
'user': event_detail.get('userIdentity', {}).get('userName', ''),
'resource': request_params.get('bucketName', ''),
'issue': 'Granted public access via bucket policy'
})
return suspicious_changes
Attack Pattern 4: Session Token Hijacking
The Attack: Attacker steals temporary session tokens and uses them before they expire.
Real Example: A startup logged AWS credentials in their application logs. An attacker found the logs in an unsecured S3 bucket and used the session tokens to access their AWS account.
Detection:
def detect_session_anomalies():
"""Detect unusual session token usage patterns"""
cloudtrail = boto3.client('cloudtrail')
# Track session usage patterns
session_usage = {}
events = cloudtrail.lookup_events(
StartTime=datetime.utcnow() - timedelta(hours=6)
)
for event in events['Events']:
event_detail = json.loads(event['CloudTrailEvent'])
user_identity = event_detail.get('userIdentity', {})
if user_identity.get('type') == 'AssumedRole':
# Extract session information
session_name = user_identity.get('arn', '').split('/')[-1]
access_key_id = user_identity.get('accessKeyId', '')
source_ip = event_detail.get('sourceIPAddress', '')
user_agent = event_detail.get('userAgent', '')
session_key = f"{session_name}_{access_key_id}"
if session_key not in session_usage:
session_usage[session_key] = {
'ips': set(),
'user_agents': set(),
'events': []
}
session_usage[session_key]['ips'].add(source_ip)
session_usage[session_key]['user_agents'].add(user_agent)
session_usage[session_key]['events'].append(event_detail)
# Detect anomalies
anomalies = []
for session_key, usage in session_usage.items():
# Check 1: Multiple IP addresses
if len(usage['ips']) > 1:
anomalies.append({
'session': session_key,
'issue': 'Session used from multiple IP addresses',
'ips': list(usage['ips']),
'severity': 'HIGH'
})
# Check 2: Multiple user agents
if len(usage['user_agents']) > 2:
anomalies.append({
'session': session_key,
'issue': 'Session used with multiple user agents',
'user_agents': list(usage['user_agents']),
'severity': 'MEDIUM'
})
# Check 3: Rapid geographic changes
# This would require IP geolocation service
return anomalies
The Startup IAM Security Framework {#security-framework}
Based on 300+ audits, here’s the IAM framework that works for startups at different stages:
Stage 1: Bootstrap (0-10 employees)
Principles:
- Minimize IAM users
- Use roles for everything possible
- Enable MFA on everything
- No shared accounts
Architecture:
graph TD
A[Founder/CTO] -->|MFA| B[Admin User]
C[Developers] -->|MFA| D[Developer Users]
D -->|AssumeRole| E[Developer Role]
F[CI/CD] -->|OIDC| G[Deployment Role]
H[Applications] -->|Instance Profile| I[Application Role]
Implementation:
#!/usr/bin/env python3
"""
Bootstrap IAM setup for new startup
Creates secure baseline IAM configuration
"""
import boto3
import json
def create_bootstrap_iam():
iam = boto3.client('iam')
# 1. Create password policy
password_policy = {
'MinimumPasswordLength': 14,
'RequireSymbols': True,
'RequireNumbers': True,
'RequireUppercaseCharacters': True,
'RequireLowercaseCharacters': True,
'AllowUsersToChangePassword': True,
'MaxPasswordAge': 90,
'PasswordReusePrevention': 5
}
iam.update_account_password_policy(**password_policy)
print("✅ Password policy configured")
# 2. Create developer role
developer_trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": f"arn:aws:iam::{boto3.client('sts').get_caller_identity()['Account']}:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"Bool": {
"aws:MultiFactorAuthPresent": "true"
}
}
}
]
}
try:
iam.create_role(
RoleName='DeveloperRole',
AssumeRolePolicyDocument=json.dumps(developer_trust_policy),
Description='Role for developers with MFA requirement'
)
print("✅ DeveloperRole created")
except iam.exceptions.EntityAlreadyExistsException:
print("ℹ️ DeveloperRole already exists")
# 3. Create developer policy
developer_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:Describe*",
"s3:List*",
"s3:Get*",
"lambda:List*",
"lambda:Get*",
"logs:Describe*",
"logs:Get*",
"logs:FilterLogEvents",
"cloudwatch:Get*",
"cloudwatch:List*",
"cloudwatch:Describe*"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::*-dev/*",
"arn:aws:s3:::*-staging/*"
]
},
{
"Effect": "Deny",
"Action": [
"iam:*",
"organizations:*",
"account:*"
],
"Resource": "*"
}
]
}
try:
iam.create_policy(
PolicyName='DeveloperPolicy',
PolicyDocument=json.dumps(developer_policy),
Description='Development access with production restrictions'
)
# Attach to role
account_id = boto3.client('sts').get_caller_identity()['Account']
iam.attach_role_policy(
RoleName='DeveloperRole',
PolicyArn=f'arn:aws:iam::{account_id}:policy/DeveloperPolicy'
)
print("✅ DeveloperPolicy created and attached")
except iam.exceptions.EntityAlreadyExistsException:
print("ℹ️ DeveloperPolicy already exists")
# 4. Create CI/CD role with OIDC
oidc_trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": f"arn:aws:iam::{boto3.client('sts').get_caller_identity()['Account']}:oidc-provider/token.actions.githubusercontent.com"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"token.actions.githubusercontent.com:aud": "sts.amazonaws.com"
},
"StringLike": {
"token.actions.githubusercontent.com:sub": "repo:YOUR-ORG/*:*"
}
}
}
]
}
try:
iam.create_role(
RoleName='GitHubActionsRole',
AssumeRolePolicyDocument=json.dumps(oidc_trust_policy),
Description='Role for GitHub Actions CI/CD'
)
print("✅ GitHubActionsRole created")
except iam.exceptions.EntityAlreadyExistsException:
print("ℹ️ GitHubActionsRole already exists")
return {
'developer_role': 'DeveloperRole',
'cicd_role': 'GitHubActionsRole',
'next_steps': [
'Create individual IAM users for team members',
'Enable MFA on all users',
'Set up GitHub OIDC provider',
'Configure assume role profiles in AWS CLI'
]
}
if __name__ == "__main__":
result = create_bootstrap_iam()
print("\n🎉 Bootstrap IAM setup complete!")
print("\nNext steps:")
for step in result['next_steps']:
print(f" • {step}")
Stage 2: Growth (10-50 employees)
New Requirements:
- Department-based access control
- Temporary access for contractors
- Audit logging and compliance
- Break-glass procedures
Enhanced Architecture:
def create_growth_stage_iam():
"""IAM setup for growth-stage startup"""
iam = boto3.client('iam')
# 1. Create department-based groups
departments = {
'Engineering': {
'policies': ['DeveloperPolicy', 'S3DevAccess'],
'description': 'Software engineers and DevOps'
},
'Product': {
'policies': ['ReadOnlyAccess', 'AnalyticsAccess'],
'description': 'Product managers and analysts'
},
'Sales': {
'policies': ['CRMAccess'],
'description': 'Sales team members'
},
'Support': {
'policies': ['SupportAccess'],
'description': 'Customer support team'
}
}
for dept_name, config in departments.items():
try:
# Create group
iam.create_group(
GroupName=dept_name,
Path=f'/departments/'
)
# Attach policies
for policy_name in config['policies']:
iam.attach_group_policy(
GroupName=dept_name,
PolicyArn=f'arn:aws:iam::aws:policy/{policy_name}'
)
print(f"✅ Created department group: {dept_name}")
except iam.exceptions.EntityAlreadyExistsException:
print(f"ℹ️ Group {dept_name} already exists")
# 2. Create contractor access role
contractor_trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": f"arn:aws:iam::{boto3.client('sts').get_caller_identity()['Account']}:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"Bool": {
"aws:MultiFactorAuthPresent": "true"
},
"DateLessThan": {
"aws:CurrentTime": "2024-12-31T23:59:59Z" # Expiration date
}
}
}
]
}
try:
iam.create_role(
RoleName='ContractorRole',
AssumeRolePolicyDocument=json.dumps(contractor_trust_policy),
Description='Temporary access role for contractors',
MaxSessionDuration=3600 # 1 hour sessions
)
print("✅ ContractorRole created")
except iam.exceptions.EntityAlreadyExistsException:
print("ℹ️ ContractorRole already exists")
# 3. Create break-glass role
breakglass_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "*",
"Resource": "*"
}
]
}
try:
# Create policy
iam.create_policy(
PolicyName='BreakGlassPolicy',
PolicyDocument=json.dumps(breakglass_policy)
)
# Create role with strict conditions
breakglass_trust = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": [
f"arn:aws:iam::{boto3.client('sts').get_caller_identity()['Account']}:user/founder",
f"arn:aws:iam::{boto3.client('sts').get_caller_identity()['Account']}:user/cto"
]
},
"Action": "sts:AssumeRole",
"Condition": {
"Bool": {
"aws:MultiFactorAuthPresent": "true"
}
}
}
]
}
iam.create_role(
RoleName='BreakGlassRole',
AssumeRolePolicyDocument=json.dumps(breakglass_trust),
MaxSessionDuration=3600 # 1 hour max
)
# Attach policy
account_id = boto3.client('sts').get_caller_identity()['Account']
iam.attach_role_policy(
RoleName='BreakGlassRole',
PolicyArn=f'arn:aws:iam::{account_id}:policy/BreakGlassPolicy'
)
print("✅ BreakGlassRole created")
except iam.exceptions.EntityAlreadyExistsException:
print("ℹ️ BreakGlassRole already exists")
Stage 3: Scale (50+ employees)
New Requirements:
- Single Sign-On (SSO)
- Just-in-time access
- Advanced monitoring and alerting
- Compliance automation
SSO Integration:
def setup_sso_integration():
"""Set up AWS SSO for enterprise-scale access"""
import boto3
# This example uses Okta, but works with any SAML provider
iam = boto3.client('iam')
# 1. Create SAML identity provider
saml_metadata = """<?xml version="1.0" encoding="UTF-8"?>
<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata"
entityID="http://www.okta.com/YOUR_ORG_ID">
<!-- SAML metadata from your identity provider -->
</md:EntityDescriptor>"""
try:
iam.create_saml_provider(
SAMLMetadataDocument=saml_metadata,
Name='OktaProvider'
)
print("✅ SAML provider created")
except iam.exceptions.EntityAlreadyExistsException:
print("ℹ️ SAML provider already exists")
# 2. Create SSO roles
sso_roles = {
'SSODeveloperRole': {
'max_session': 8 * 3600, # 8 hours
'policies': ['DeveloperPolicy']
},
'SSOAdminRole': {
'max_session': 1 * 3600, # 1 hour
'policies': ['AdministratorAccess']
},
'SSOReadOnlyRole': {
'max_session': 8 * 3600, # 8 hours
'policies': ['ReadOnlyAccess']
}
}
account_id = boto3.client('sts').get_caller_identity()['Account']
for role_name, config in sso_roles.items():
# Trust policy for SAML federation
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": f"arn:aws:iam::{account_id}:saml-provider/OktaProvider"
},
"Action": "sts:AssumeRoleWithSAML",
"Condition": {
"StringEquals": {
"SAML:aud": "https://signin.aws.amazon.com/saml"
}
}
}
]
}
try:
iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
MaxSessionDuration=config['max_session']
)
# Attach policies
for policy in config['policies']:
if policy.startswith('arn:'):
policy_arn = policy
else:
policy_arn = f'arn:aws:iam::aws:policy/{policy}'
iam.attach_role_policy(
RoleName=role_name,
PolicyArn=policy_arn
)
print(f"✅ Created SSO role: {role_name}")
except iam.exceptions.EntityAlreadyExistsException:
print(f"ℹ️ SSO role {role_name} already exists")
Implementation Guide {#implementation-guide}
Let’s walk through implementing secure IAM step by step, with real code you can run in production.
Step 1: Audit Current IAM Setup
Before making changes, audit what you have:
#!/usr/bin/env python3
"""
Comprehensive IAM Security Audit
Identifies security issues in current IAM setup
"""
import boto3
import json
from datetime import datetime, timedelta
class IAMSecurityAuditor:
def __init__(self):
self.iam = boto3.client('iam')
self.findings = {
'critical': [],
'high': [],
'medium': [],
'low': []
}
def run_full_audit(self):
"""Run comprehensive IAM security audit"""
print("🔍 Starting IAM Security Audit...")
print("=" * 50)
# Check root account security
self.audit_root_account()
# Check password policy
self.audit_password_policy()
# Check users
self.audit_users()
# Check roles
self.audit_roles()
# Check policies
self.audit_policies()
# Check access keys
self.audit_access_keys()
# Generate report
self.generate_report()
def audit_root_account(self):
"""Audit root account security"""
try:
# Check if root account has MFA
summary = self.iam.get_account_summary()['SummaryMap']
if summary.get('AccountMFAEnabled', 0) == 0:
self.add_finding('critical', 'Root Account',
'Root account does not have MFA enabled')
# Check for root access keys
if summary.get('AccountAccessKeysPresent', 0) > 0:
self.add_finding('critical', 'Root Account',
'Root account has active access keys')
# Check recent root usage
cloudtrail = boto3.client('cloudtrail')
events = cloudtrail.lookup_events(
LookupAttributes=[
{
'AttributeKey': 'UserName',
'AttributeValue': 'root'
}
],
StartTime=datetime.utcnow() - timedelta(days=90)
)
if events['Events']:
self.add_finding('high', 'Root Account',
f'Root account used {len(events["Events"])} times in last 90 days')
except Exception as e:
self.add_finding('medium', 'Audit',
f'Could not audit root account: {str(e)}')
def audit_password_policy(self):
"""Audit account password policy"""
try:
policy = self.iam.get_account_password_policy()['PasswordPolicy']
# Check minimum length
if policy.get('MinimumPasswordLength', 0) < 14:
self.add_finding('high', 'Password Policy',
f'Minimum password length is {policy.get("MinimumPasswordLength", 0)}, should be 14+')
# Check complexity requirements
required_fields = [
'RequireSymbols',
'RequireNumbers',
'RequireUppercaseCharacters',
'RequireLowercaseCharacters'
]
for field in required_fields:
if not policy.get(field, False):
self.add_finding('medium', 'Password Policy',
f'Password policy does not require {field.replace("Require", "").lower()}')
# Check password age
if policy.get('MaxPasswordAge', 0) == 0 or policy.get('MaxPasswordAge', 0) > 90:
self.add_finding('medium', 'Password Policy',
'Passwords do not expire or expire too infrequently')
except self.iam.exceptions.NoSuchEntityException:
self.add_finding('high', 'Password Policy',
'No password policy configured')
def audit_users(self):
"""Audit IAM users"""
paginator = self.iam.get_paginator('list_users')
for page in paginator.paginate():
for user in page['Users']:
username = user['UserName']
# Check for console access without MFA
try:
self.iam.get_login_profile(UserName=username)
# User has console access, check MFA
mfa_devices = self.iam.list_mfa_devices(UserName=username)['MFADevices']
if len(mfa_devices) == 0:
self.add_finding('high', f'User {username}',
'Has console access but no MFA enabled')
except self.iam.exceptions.NoSuchEntityException:
pass # No console access
# Check for unused users
password_last_used = user.get('PasswordLastUsed')
if password_last_used:
days_since_use = (datetime.now(password_last_used.tzinfo) - password_last_used).days
if days_since_use > 90:
self.add_finding('medium', f'User {username}',
f'Has not used password in {days_since_use} days')
# Check for overly broad permissions
self.check_user_permissions(username)
def check_user_permissions(self, username):
"""Check if user has overly broad permissions"""
# Check attached policies
attached_policies = self.iam.list_attached_user_policies(UserName=username)
for policy in attached_policies['AttachedPolicies']:
if 'Administrator' in policy['PolicyName']:
self.add_finding('high', f'User {username}',
f'Has administrator policy: {policy["PolicyName"]}')
# Check inline policies
inline_policies = self.iam.list_user_policies(UserName=username)
for policy_name in inline_policies['PolicyNames']:
policy_doc = self.iam.get_user_policy(
UserName=username,
PolicyName=policy_name
)['PolicyDocument']
# Check for wildcard permissions
if self.has_wildcard_permissions(policy_doc):
self.add_finding('high', f'User {username}',
f'Has wildcard permissions in policy: {policy_name}')
# Check groups
groups = self.iam.get_groups_for_user(UserName=username)['Groups']
for group in groups:
group_policies = self.iam.list_attached_group_policies(GroupName=group['GroupName'])
for policy in group_policies['AttachedPolicies']:
if 'Administrator' in policy['PolicyName']:
self.add_finding('high', f'User {username}',
f'Has admin access via group {group["GroupName"]}')
def audit_roles(self):
"""Audit IAM roles"""
paginator = self.iam.get_paginator('list_roles')
for page in paginator.paginate():
for role in page['Roles']:
role_name = role['RoleName']
# Skip AWS service roles
if role_name.startswith('AWS'):
continue
# Check trust policy
trust_policy = role['AssumeRolePolicyDocument']
self.check_role_trust_policy(role_name, trust_policy)
# Check for unused roles
try:
last_used = self.iam.get_role(RoleName=role_name)['Role'].get('RoleLastUsed')
if last_used and 'LastUsedDate' in last_used:
days_since_use = (datetime.now(last_used['LastUsedDate'].tzinfo) -
last_used['LastUsedDate']).days
if days_since_use > 90:
self.add_finding('low', f'Role {role_name}',
f'Has not been used in {days_since_use} days')
except:
pass
def check_role_trust_policy(self, role_name, trust_policy):
"""Check role trust policy for security issues"""
for statement in trust_policy.get('Statement', []):
if statement.get('Effect') != 'Allow':
continue
principal = statement.get('Principal', {})
# Check for overly broad trust
if principal == '*':
self.add_finding('critical', f'Role {role_name}',
'Trust policy allows any principal (*)')
elif isinstance(principal, dict) and 'AWS' in principal:
aws_principals = principal['AWS']
if isinstance(aws_principals, str):
aws_principals = [aws_principals]
for aws_principal in aws_principals:
if aws_principal == '*':
self.add_finding('critical', f'Role {role_name}',
'Trust policy allows any AWS account (*)')
elif ':root' in aws_principal and not aws_principal.startswith(f'arn:aws:iam::{boto3.client("sts").get_caller_identity()["Account"]}:'):
self.add_finding('high', f'Role {role_name}',
f'Trust policy allows external account root: {aws_principal}')
# Check for missing MFA requirement
condition = statement.get('Condition', {})
if not condition.get('Bool', {}).get('aws:MultiFactorAuthPresent'):
if any(keyword in role_name.lower() for keyword in ['admin', 'power', 'privileged']):
self.add_finding('medium', f'Role {role_name}',
'Privileged role does not require MFA')
def audit_policies(self):
"""Audit customer managed policies"""
paginator = self.iam.get_paginator('list_policies')
for page in paginator.paginate(Scope='Local'):
for policy in page['Policies']:
policy_name = policy['PolicyName']
policy_arn = policy['Arn']
# Get policy document
version_id = policy['DefaultVersionId']
policy_version = self.iam.get_policy_version(
PolicyArn=policy_arn,
VersionId=version_id
)
policy_doc = policy_version['PolicyVersion']['Document']
# Check for overly broad permissions
if self.has_wildcard_permissions(policy_doc):
self.add_finding('high', f'Policy {policy_name}',
'Contains wildcard permissions (*)')
# Check for unused policies
if policy['AttachmentCount'] == 0:
self.add_finding('low', f'Policy {policy_name}',
'Policy is not attached to any users, groups, or roles')
def has_wildcard_permissions(self, policy_doc):
"""Check if policy document has wildcard permissions"""
for statement in policy_doc.get('Statement', []):
if statement.get('Effect') != 'Allow':
continue
actions = statement.get('Action', [])
resources = statement.get('Resource', [])
# Convert to lists
if isinstance(actions, str):
actions = [actions]
if isinstance(resources, str):
resources = [resources]
# Check for wildcards
if '*' in actions or '*' in resources:
return True
# Check for service-level wildcards
for action in actions:
if action.endswith(':*'):
return True
return False
def audit_access_keys(self):
"""Audit access keys"""
paginator = self.iam.get_paginator('list_users')
for page in paginator.paginate():
for user in page['Users']:
username = user['UserName']
# Get access keys
access_keys = self.iam.list_access_keys(UserName=username)['AccessKeyMetadata']
for key in access_keys:
key_id = key['AccessKeyId']
key_age = (datetime.now(key['CreateDate'].tzinfo) - key['CreateDate']).days
# Check key age
if key_age > 90:
self.add_finding('medium', f'User {username}',
f'Access key {key_id} is {key_age} days old')
# Check if key is active but user hasn't logged in recently
if key['Status'] == 'Active':
try:
# Get key last used info
last_used = self.iam.get_access_key_last_used(AccessKeyId=key_id)
if 'LastUsedDate' in last_used['AccessKeyLastUsed']:
days_since_use = (datetime.now(last_used['AccessKeyLastUsed']['LastUsedDate'].tzinfo) -
last_used['AccessKeyLastUsed']['LastUsedDate']).days
if days_since_use > 90:
self.add_finding('medium', f'User {username}',
f'Access key {key_id} has not been used in {days_since_use} days')
except:
pass
# Check for multiple access keys
if len(access_keys) > 1:
self.add_finding('low', f'User {username}',
f'Has {len(access_keys)} access keys (consider reducing)')
def add_finding(self, severity, entity, description):
"""Add security finding"""
finding = {
'entity': entity,
'description': description,
'timestamp': datetime.utcnow().isoformat()
}
self.findings[severity].append(finding)
def generate_report(self):
"""Generate audit report"""
print("\n" + "=" * 60)
print("📋 IAM SECURITY AUDIT REPORT")
print("=" * 60)
total_findings = sum(len(findings) for findings in self.findings.values())
if total_findings == 0:
print("✅ No security issues found!")
return
print(f"\n📊 SUMMARY:")
print(f" 🔥 Critical: {len(self.findings['critical'])} issues")
print(f" 🔴 High: {len(self.findings['high'])} issues")
print(f" 🟡 Medium: {len(self.findings['medium'])} issues")
print(f" 🟢 Low: {len(self.findings['low'])} issues")
# Print findings by severity
severities = ['critical', 'high', 'medium', 'low']
for severity in severities:
if not self.findings[severity]:
continue
print(f"\n{severity.upper()} SEVERITY ISSUES:")
for finding in self.findings[severity]:
print(f" • {finding['entity']}: {finding['description']}")
# Generate remediation priorities
print(f"\n🎯 REMEDIATION PRIORITIES:")
if self.findings['critical']:
print(" 1. Fix CRITICAL issues immediately (within 24 hours)")
if self.findings['high']:
print(" 2. Address HIGH severity issues (within 1 week)")
if self.findings['medium']:
print(" 3. Plan MEDIUM severity fixes (within 1 month)")
if self.findings['low']:
print(" 4. Consider LOW severity improvements (next quarter)")
def main():
auditor = IAMSecurityAuditor()
auditor.run_full_audit()
if __name__ == "__main__":
main()
Step 2: Implement Least Privilege Access
Create policies that grant minimum necessary permissions:
def generate_least_privilege_policy(service_usage_logs):
"""
Generate least privilege policy based on actual usage
This analyzes CloudTrail logs to determine minimum required permissions
"""
import boto3
from collections import defaultdict
# Analyze CloudTrail logs
actions_used = defaultdict(set) # user -> set of actions
resources_accessed = defaultdict(set) # user -> set of resources
cloudtrail = boto3.client('cloudtrail')
# Get events from last 30 days
events = cloudtrail.lookup_events(
StartTime=datetime.utcnow() - timedelta(days=30)
)
for event in events['Events']:
user_identity = event.get('UserIdentity', {})
username = user_identity.get('userName', '')
if not username:
continue
# Extract action and resource
event_name = event.get('EventName', '')
event_source = event.get('EventSource', '').replace('.amazonaws.com', '')
action = f"{event_source}:{event_name}"
actions_used[username].add(action)
# Extract resources
resources = event.get('Resources', [])
for resource in resources:
resource_arn = resource.get('ARN', '')
if resource_arn:
resources_accessed[username].add(resource_arn)
# Generate policies
policies = {}
for username, actions in actions_used.items():
user_resources = list(resources_accessed[username])
# Group actions by service
service_actions = defaultdict(list)
for action in actions:
if ':' in action:
service, action_name = action.split(':', 1)
service_actions[service].append(action_name)
# Generate policy statements
statements = []
for service, service_action_list in service_actions.items():
# Create service-specific statement
statement = {
"Effect": "Allow",
"Action": [f"{service}:{action}" for action in service_action_list]
}
# Add resources if available
service_resources = [r for r in user_resources if service in r]
if service_resources:
statement["Resource"] = service_resources
else:
statement["Resource"] = "*" # Some actions don't support resource-level permissions
statements.append(statement)
# Create policy document
policy_doc = {
"Version": "2012-10-17",
"Statement": statements
}
policies[username] = policy_doc
return policies
# Example usage
def create_developer_least_privilege_policy():
"""Create least privilege policy for developers"""
# Based on analysis of typical developer activities
developer_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ReadOnlyAccess",
"Effect": "Allow",
"Action": [
"ec2:Describe*",
"s3:List*",
"s3:Get*",
"lambda:List*",
"lambda:Get*",
"logs:Describe*",
"logs:Get*",
"logs:FilterLogEvents",
"cloudwatch:Get*",
"cloudwatch:List*",
"cloudwatch:Describe*",
"rds:Describe*",
"dynamodb:List*",
"dynamodb:Describe*",
"iam:List*",
"iam:Get*"
],
"Resource": "*"
},
{
"Sid": "DevelopmentEnvironmentWrite",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:DeleteObject",
"s3:PutObjectAcl"
],
"Resource": [
"arn:aws:s3:::*-dev/*",
"arn:aws:s3:::*-development/*",
"arn:aws:s3:::*-staging/*"
]
},
{
"Sid": "LambdaDevelopment",
"Effect": "Allow",
"Action": [
"lambda:UpdateFunctionCode",
"lambda:UpdateFunctionConfiguration",
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:*:*:function:*-dev-*",
"arn:aws:lambda:*:*:function:*-development-*"
]
},
{
"Sid": "LogsAccess",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:*:*:log-group:/aws/lambda/*-dev-*",
"arn:aws:logs:*:*:log-group:/aws/lambda/*-development-*"
]
},
{
"Sid": "DenyProductionAccess",
"Effect": "Deny",
"Action": "*",
"Resource": [
"arn:aws:s3:::*-prod/*",
"arn:aws:s3:::*-production/*",
"arn:aws:lambda:*:*:function:*-prod-*",
"arn:aws:lambda:*:*:function:*-production-*",
"arn:aws:rds:*:*:db:*-prod-*",
"arn:aws:rds:*:*:db:*-production-*"
]
},
{
"Sid": "DenyIAMChanges",
"Effect": "Deny",
"Action": [
"iam:Create*",
"iam:Delete*",
"iam:Attach*",
"iam:Detach*",
"iam:Put*",
"iam:Update*"
],
"Resource": "*"
}
]
}
return developer_policy
Step 3: Implement MFA Enforcement
Force MFA for all privileged operations:
def create_mfa_enforcement_policy():
"""Create policy that enforces MFA for sensitive operations"""
mfa_enforcement_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowViewAccountInfo",
"Effect": "Allow",
"Action": [
"iam:GetAccountPasswordPolicy",
"iam:GetAccountSummary",
"iam:ListVirtualMFADevices"
],
"Resource": "*"
},
{
"Sid": "AllowManageOwnPasswords",
"Effect": "Allow",
"Action": [
"iam:ChangePassword",
"iam:GetUser"
],
"Resource": "arn:aws:iam::*:user/${aws:username}"
},
{
"Sid": "AllowManageOwnMFA",
"Effect": "Allow",
"Action": [
"iam:CreateVirtualMFADevice",
"iam:DeleteVirtualMFADevice",
"iam:ListMFADevices",
"iam:EnableMFADevice",
"iam:ResyncMFADevice"
],
"Resource": [
"arn:aws:iam::*:mfa/${aws:username}",
"arn:aws:iam::*:user/${aws:username}"
]
},
{
"Sid": "AllowDeactivateOwnMFAOnlyWhenUsingMFA",
"Effect": "Allow",
"Action": [
"iam:DeactivateMFADevice"
],
"Resource": [
"arn:aws:iam::*:mfa/${aws:username}",
"arn:aws:iam::*:user/${aws:username}"
],
"Condition": {
"Bool": {
"aws:MultiFactorAuthPresent": "true"
}
}
},
{
"Sid": "DenyAllExceptUnlessSignedInWithMFA",
"Effect": "Deny",
"NotAction": [
"iam:CreateVirtualMFADevice",
"iam:EnableMFADevice",
"iam:GetUser",
"iam:ListMFADevices",
"iam:ListVirtualMFADevices",
"iam:ResyncMFADevice",
"sts:GetSessionToken"
],
"Resource": "*",
"Condition": {
"BoolIfExists": {
"aws:MultiFactorAuthPresent": "false"
}
}
}
]
}
return mfa_enforcement_policy
def setup_mfa_enforcement():
"""Set up MFA enforcement for all users"""
iam = boto3.client('iam')
# Create MFA enforcement policy
policy_doc = create_mfa_enforcement_policy()
try:
iam.create_policy(
PolicyName='EnforceMFA',
PolicyDocument=json.dumps(policy_doc),
Description='Enforces MFA for all operations except MFA setup'
)
print("✅ MFA enforcement policy created")
except iam.exceptions.EntityAlreadyExistsException:
print("ℹ️ MFA enforcement policy already exists")
# Attach to all users (or create a group)
try:
iam.create_group(GroupName='MFAUsers')
account_id = boto3.client('sts').get_caller_identity()['Account']
iam.attach_group_policy(
GroupName='MFAUsers',
PolicyArn=f'arn:aws:iam::{account_id}:policy/EnforceMFA'
)
print("✅ MFAUsers group created")
# Add all existing users to the group
paginator = iam.get_paginator('list_users')
for page in paginator.paginate():
for user in page['Users']:
try:
iam.add_user_to_group(
GroupName='MFAUsers',
UserName=user['UserName']
)
print(f"✅ Added {user['UserName']} to MFAUsers group")
except Exception as e:
print(f"⚠️ Could not add {user['UserName']} to group: {e}")
except iam.exceptions.EntityAlreadyExistsException:
print("ℹ️ MFAUsers group already exists")
Automation and Monitoring {#automation}
Manual IAM management doesn’t scale. Here’s how to automate it:
Automated Policy Review
#!/usr/bin/env python3
"""
Automated IAM Policy Review System
Continuously monitors and reports on IAM policy changes
"""
import boto3
import json
from datetime import datetime, timedelta
class IAMPolicyMonitor:
def __init__(self):
self.iam = boto3.client('iam')
self.sns = boto3.client('sns')
self.dynamodb = boto3.resource('dynamodb')
# Configuration
self.alert_topic_arn = "arn:aws:sns:us-east-1:123456789012:iam-alerts"
self.policy_table = self.dynamodb.Table('iam-policy-history')
def monitor_policy_changes(self):
"""Monitor IAM policy changes and alert on suspicious activity"""
# Get recent CloudTrail events
cloudtrail = boto3.client('cloudtrail')
policy_events = [
'AttachUserPolicy', 'DetachUserPolicy',
'AttachRolePolicy', 'DetachRolePolicy',
'AttachGroupPolicy', 'DetachGroupPolicy',
'PutUserPolicy', 'PutRolePolicy', 'PutGroupPolicy',
'DeleteUserPolicy', 'DeleteRolePolicy', 'DeleteGroupPolicy',
'CreatePolicy', 'DeletePolicy',
'CreatePolicyVersion', 'SetDefaultPolicyVersion'
]
alerts = []
for event_name in policy_events:
events = cloudtrail.lookup_events(
LookupAttributes=[
{
'AttributeKey': 'EventName',
'AttributeValue': event_name
}
],
StartTime=datetime.utcnow() - timedelta(hours=1)
)
for event in events['Events']:
alert = self.analyze_policy_event(event, event_name)
if alert:
alerts.append(alert)
# Send alerts
for alert in alerts:
self.send_alert(alert)
return alerts
def analyze_policy_event(self, event, event_name):
"""Analyze individual policy event for suspicious activity"""
user_identity = event.get('UserIdentity', {})
username = user_identity.get('userName', 'unknown')
source_ip = event.get('SourceIPAddress', '')
# Parse event details
try:
event_detail = json.loads(event['CloudTrailEvent'])
request_params = event_detail.get('requestParameters', {})
except:
return None
alert = None
# Check 1: Policy attachment to self
if event_name in ['AttachUserPolicy', 'PutUserPolicy']:
target_user = request_params.get('userName', '')
if target_user == username:
alert = {
'severity': 'HIGH',
'type': 'Self Policy Attachment',
'user': username,
'event': event_name,
'details': f'User {username} attached policy to themselves',
'source_ip': source_ip,
'timestamp': event['EventTime']
}
# Check 2: Administrator policy attachment
elif event_name in ['AttachUserPolicy', 'AttachRolePolicy', 'AttachGroupPolicy']:
policy_arn = request_params.get('policyArn', '')
if 'AdministratorAccess' in policy_arn:
alert = {
'severity': 'CRITICAL',
'type': 'Administrator Policy Attached',
'user': username,
'event': event_name,
'details': f'Administrator policy attached by {username}',
'policy_arn': policy_arn,
'source_ip': source_ip,
'timestamp': event['EventTime']
}
# Check 3: Policy creation with overly broad permissions
elif event_name == 'CreatePolicy':
policy_doc = request_params.get('policyDocument', {})
if isinstance(policy_doc, str):
policy_doc = json.loads(policy_doc)
if self.has_dangerous_permissions(policy_doc):
alert = {
'severity': 'HIGH',
'type': 'Dangerous Policy Created',
'user': username,
'event': event_name,
'details': f'Policy with broad permissions created by {username}',
'policy_name': request_params.get('policyName', ''),
'source_ip': source_ip,
'timestamp': event['EventTime']
}
# Check 4: Outside business hours
if alert:
event_time = datetime.fromisoformat(event['EventTime'].replace('Z', '+00:00'))
if event_time.hour < 6 or event_time.hour > 22:
alert['severity'] = 'CRITICAL' if alert['severity'] == 'HIGH' else 'HIGH'
alert['details'] += ' (outside business hours)'
return alert
def has_dangerous_permissions(self, policy_doc):
"""Check if policy has dangerous permissions"""
for statement in policy_doc.get('Statement', []):
if statement.get('Effect') != 'Allow':
continue
actions = statement.get('Action', [])
resources = statement.get('Resource', [])
# Convert to lists
if isinstance(actions, str):
actions = [actions]
if isinstance(resources, str):
resources = [resources]
# Check for wildcards
if '*' in actions and '*' in resources:
return True
# Check for dangerous combinations
dangerous_actions = [
'iam:*',
'sts:AssumeRole',
'organizations:*',
'account:*'
]
for action in actions:
if action in dangerous_actions and '*' in resources:
return True
return False
def send_alert(self, alert):
"""Send alert via SNS"""
message = f"""
🚨 IAM SECURITY ALERT 🚨
Severity: {alert['severity']}
Type: {alert['type']}
User: {alert['user']}
Event: {alert['event']}
Time: {alert['timestamp']}
Source IP: {alert['source_ip']}
Details: {alert['details']}
Investigation Links:
- CloudTrail: https://console.aws.amazon.com/cloudtrail/home
- IAM Console: https://console.aws.amazon.com/iam/home
Immediate Actions:
1. Verify this was an authorized change
2. Review user's recent activity
3. Check for any unauthorized access
4. Consider revoking sessions if suspicious
"""
self.sns.publish(
TopicArn=self.alert_topic_arn,
Subject=f"IAM Alert: {alert['type']}",
Message=message
)
# Store in DynamoDB for tracking
self.policy_table.put_item(
Item={
'alert_id': f"{alert['user']}_{alert['timestamp']}",
'severity': alert['severity'],
'type': alert['type'],
'user': alert['user'],
'details': alert['details'],
'timestamp': alert['timestamp'],
'ttl': int((datetime.utcnow() + timedelta(days=90)).timestamp())
}
)
def main():
monitor = IAMPolicyMonitor()
alerts = monitor.monitor_policy_changes()
if alerts:
print(f"🚨 {len(alerts)} security alerts generated")
else:
print("✅ No suspicious IAM activity detected")
if __name__ == "__main__":
main()
Automated Access Review
#!/usr/bin/env python3
"""
Automated IAM Access Review
Periodically reviews and reports on IAM permissions
"""
import boto3
import json
from datetime import datetime, timedelta
from collections import defaultdict
class IAMAccessReviewer:
def __init__(self):
self.iam = boto3.client('iam')
self.cloudtrail = boto3.client('cloudtrail')
def generate_access_review_report(self):
"""Generate comprehensive access review report"""
report = {
'generated_at': datetime.utcnow().isoformat(),
'users': [],
'roles': [],
'unused_permissions': [],
'overprivileged_entities': [],
'recommendations': []
}
# Review users
report['users'] = self.review_users()
# Review roles
report['roles'] = self.review_roles()
# Find unused permissions
report['unused_permissions'] = self.find_unused_permissions()
# Find overprivileged entities
report['overprivileged_entities'] = self.find_overprivileged_entities()
# Generate recommendations
report['recommendations'] = self.generate_recommendations(report)
return report
def review_users(self):
"""Review all IAM users"""
users_review = []
paginator = self.iam.get_paginator('list_users')
for page in paginator.paginate():
for user in page['Users']:
username = user['UserName']
user_review = {
'username': username,
'created': user['CreateDate'].isoformat(),
'last_activity': self.get_last_activity(username),
'permissions': self.get_user_permissions(username),
'mfa_enabled': self.has_mfa_enabled(username),
'access_keys': self.get_access_key_info(username),
'risk_score': 0
}
# Calculate risk score
user_review['risk_score'] = self.calculate_user_risk_score(user_review)
users_review.append(user_review)
return users_review
def get_last_activity(self, username):
"""Get last activity for user from CloudTrail"""
try:
events = self.cloudtrail.lookup_events(
LookupAttributes=[
{
'AttributeKey': 'UserName',
'AttributeValue': username
}
],
StartTime=datetime.utcnow() - timedelta(days=90)
)
if events['Events']:
return events['Events'][0]['EventTime'].isoformat()
else:
return None
except Exception as e:
return f"Error: {str(e)}"
def get_user_permissions(self, username):
"""Get all permissions for user"""
permissions = {
'attached_policies': [],
'inline_policies': [],
'group_policies': []
}
# Attached policies
attached = self.iam.list_attached_user_policies(UserName=username)
permissions['attached_policies'] = [p['PolicyName'] for p in attached['AttachedPolicies']]
# Inline policies
inline = self.iam.list_user_policies(UserName=username)
permissions['inline_policies'] = inline['PolicyNames']
# Group policies
groups = self.iam.get_groups_for_user(UserName=username)
for group in groups['Groups']:
group_attached = self.iam.list_attached_group_policies(GroupName=group['GroupName'])
group_inline = self.iam.list_group_policies(GroupName=group['GroupName'])
permissions['group_policies'].extend([
f"{group['GroupName']}:{p['PolicyName']}" for p in group_attached['AttachedPolicies']
])
permissions['group_policies'].extend([
f"{group['GroupName']}:{p}" for p in group_inline['PolicyNames']
])
return permissions
def has_mfa_enabled(self, username):
"""Check if user has MFA enabled"""
try:
mfa_devices = self.iam.list_mfa_devices(UserName=username)
return len(mfa_devices['MFADevices']) > 0
except:
return False
def get_access_key_info(self, username):
"""Get access key information"""
try:
keys = self.iam.list_access_keys(UserName=username)
key_info = []
for key in keys['AccessKeyMetadata']:
key_age = (datetime.now(key['CreateDate'].tzinfo) - key['CreateDate']).days
# Get last used info
try:
last_used = self.iam.get_access_key_last_used(AccessKeyId=key['AccessKeyId'])
last_used_date = last_used['AccessKeyLastUsed'].get('LastUsedDate')
last_used_str = last_used_date.isoformat() if last_used_date else 'Never'
except:
last_used_str = 'Unknown'
key_info.append({
'key_id': key['AccessKeyId'],
'status': key['Status'],
'age_days': key_age,
'last_used': last_used_str
})
return key_info
except:
return []
def calculate_user_risk_score(self, user_review):
"""Calculate risk score for user (0-100)"""
score = 0
# No recent activity
if not user_review['last_activity']:
score += 30
# No MFA
if not user_review['mfa_enabled']:
score += 25
# Admin permissions
permissions = user_review['permissions']
all_policies = (permissions['attached_policies'] +
permissions['inline_policies'] +
permissions['group_policies'])
if any('Admin' in policy for policy in all_policies):
score += 20
# Old access keys
for key in user_review['access_keys']:
if key['age_days'] > 90:
score += 10
# Multiple access keys
if len(user_review['access_keys']) > 1:
score += 5
return min(score, 100)
def find_unused_permissions(self):
"""Find permissions that haven't been used in 90 days"""
# This would require detailed CloudTrail analysis
# For brevity, returning placeholder
return [
{
'entity': 'DeveloperRole',
'unused_actions': ['s3:DeleteBucket', 'rds:DeleteDBInstance'],
'recommendation': 'Remove unused dangerous permissions'
}
]
def find_overprivileged_entities(self):
"""Find overprivileged users and roles"""
overprivileged = []
# Check users with admin access
paginator = self.iam.get_paginator('list_users')
for page in paginator.paginate():
for user in page['Users']:
username = user['UserName']
# Check for admin policies
attached = self.iam.list_attached_user_policies(UserName=username)
for policy in attached['AttachedPolicies']:
if 'Administrator' in policy['PolicyName']:
overprivileged.append({
'type': 'User',
'name': username,
'issue': f'Has administrator policy: {policy["PolicyName"]}',
'recommendation': 'Review if admin access is necessary'
})
return overprivileged
def generate_recommendations(self, report):
"""Generate actionable recommendations"""
recommendations = []
# High-risk users
high_risk_users = [u for u in report['users'] if u['risk_score'] > 70]
if high_risk_users:
recommendations.append({
'priority': 'HIGH',
'category': 'User Security',
'description': f'{len(high_risk_users)} users have high risk scores',
'action': 'Review high-risk users and remediate security issues'
})
# Users without MFA
no_mfa_users = [u for u in report['users'] if not u['mfa_enabled']]
if no_mfa_users:
recommendations.append({
'priority': 'HIGH',
'category': 'Authentication',
'description': f'{len(no_mfa_users)} users do not have MFA enabled',
'action': 'Enforce MFA for all users'
})
# Inactive users
inactive_users = [u for u in report['users'] if not u['last_activity']]
if inactive_users:
recommendations.append({
'priority': 'MEDIUM',
'category': 'Access Cleanup',
'description': f'{len(inactive_users)} users have no recent activity',
'action': 'Consider disabling or removing inactive users'
})
return recommendations
def main():
reviewer = IAMAccessReviewer()
report = reviewer.generate_access_review_report()
# Print summary
print("📊 IAM ACCESS REVIEW REPORT")
print("=" * 40)
print(f"Generated: {report['generated_at']}")
print(f"Users reviewed: {len(report['users'])}")
print(f"Roles reviewed: {len(report['roles'])}")
print(f"Recommendations: {len(report['recommendations'])}")
# Print high-priority recommendations
high_priority = [r for r in report['recommendations'] if r['priority'] == 'HIGH']
if high_priority:
print(f"\n🚨 HIGH PRIORITY ACTIONS:")
for rec in high_priority:
print(f" • {rec['description']}")
print(f" Action: {rec['action']}")
# Save detailed report
with open(f'iam_access_review_{datetime.now().strftime("%Y%m%d")}.json', 'w') as f:
json.dump(report, f, indent=2, default=str)
print(f"\n💾 Detailed report saved to iam_access_review_{datetime.now().strftime('%Y%m%d')}.json")
if __name__ == "__main__":
main()
Next Steps and Advanced Topics
This guide covers the fundamentals, but IAM security is an ongoing journey. Here are the advanced topics to explore next:
1. Permission Boundaries
Use IAM permission boundaries to set maximum permissions for users and roles.
2. AWS Organizations SCPs
Implement Service Control Policies for organization-wide guardrails.
3. Cross-Account Access Patterns
Design secure cross-account access for multi-account architectures.
4. Identity Federation
Integrate with external identity providers for enterprise SSO.
5. Advanced Monitoring
Implement ML-based anomaly detection for IAM activity.
Get PathShield IAM Protection
Manual IAM management is error-prone and time-consuming. PathShield automates everything in this guide:
✅ Automated IAM auditing - Continuous monitoring of all IAM configurations ✅ Real-time threat detection - Immediate alerts on suspicious IAM activity
✅ Least-privilege recommendations - AI-powered permission optimization ✅ Compliance reporting - SOC 2, PCI DSS, HIPAA IAM compliance ✅ One-click remediation - Fix issues instantly with approved actions
Resources:
- GitHub: Complete IAM security scripts
- IAM Security Checklist (PDF)
- Video: 30-minute IAM setup walkthrough
About the Author: I’ve implemented secure IAM for 300+ startups, from pre-seed to IPO. Previously built identity systems at companies processing billions in transactions.
Tags: #aws-iam #identity-access-management #aws-security #least-privilege #startup-security #iam-best-practices