Β· AWS Security Β· 22 min read
AWS Organizations Security Best Practices for Multi-Account Management
Master AWS Organizations security with comprehensive multi-account governance, automated security controls, and centralized compliance management. Complete guide with production-ready automation scripts.
As your startup scales, managing security across multiple AWS accounts becomes critical. AWS Organizations provides powerful tools for centralized governance, but without proper security configuration, it can become a compliance nightmare. Many growing companies struggle with inconsistent security policies, sprawling permissions, and lack of visibility across their account landscape.
In this comprehensive guide, weβll show you how to implement enterprise-grade security governance using AWS Organizations, create automated security controls that scale with your growth, and maintain compliance across your entire AWS footprint.
Understanding Multi-Account Security Architecture
A well-designed multi-account strategy provides strong security boundaries and simplifies compliance. Hereβs the recommended account structure for startups scaling to enterprise:
βββββββββββββββββββββββββββββββββββ
β Management Account β
β (Organizations Master) β
β β
β β’ Root user management β
β β’ Billing consolidation β
β β’ Organization policies β
β β’ CloudTrail organization β
ββββββββββββββββ¬βββββββββββββββββββ
β
ββββββββββββββββββββββββββββΌβββββββββββββββββββββββββββ
β β β
βββββββββΌβββββββββ βββββββββββΌβββββββββ βββββββββββΌβββββββββ
β Security β β Shared Services β β Workload β
β Account β β Account β β Accounts β
β β β β β β
β β’ Security Hub β β β’ DNS (Route 53) β β β’ Production β
β β’ GuardDuty β β β’ Directory Svc β β β’ Staging β
β β’ Config β β β’ Logging β β β’ Development β
β β’ CloudTrail β β β’ Monitoring β β β’ Sandbox β
ββββββββββββββββββ ββββββββββββββββββββ ββββββββββββββββββββ
Key Security Benefits
Blast Radius Containment: Security incidents in one account donβt affect others.
Simplified Compliance: Each environment can have tailored compliance controls.
Clear Ownership: Account boundaries make responsibility and access control clearer.
Cost Isolation: Better cost tracking and budget controls per environment.
Setting Up AWS Organizations with Security Focus
1. Management Account Configuration
Letβs start with secure Organizations setup:
import boto3
import json
from datetime import datetime, timedelta
class SecureOrganizationsManager:
def __init__(self, region='us-east-1'):
self.organizations = boto3.client('organizations', region_name=region)
self.iam = boto3.client('iam', region_name=region)
self.cloudtrail = boto3.client('cloudtrail', region_name=region)
self.s3 = boto3.client('s3', region_name=region)
self.region = region
def setup_secure_organization(self):
"""Set up AWS Organizations with security best practices"""
try:
# Create organization if it doesn't exist
try:
org = self.organizations.describe_organization()
org_id = org['Organization']['Id']
print(f"β
Using existing organization: {org_id}")
except self.organizations.exceptions.AWSOrganizationsNotInUseException:
org = self.organizations.create_organization(
FeatureSet='ALL' # Enable all features
)
org_id = org['Organization']['Id']
print(f"β
Created new organization: {org_id}")
# Enable trusted access for security services
self.enable_trusted_access()
# Create organizational units
ous = self.create_organizational_units()
# Set up Service Control Policies
scps = self.create_service_control_policies()
# Apply SCPs to OUs
self.apply_policies_to_ous(scps, ous)
# Set up centralized logging
self.setup_organization_cloudtrail()
# Configure AWS Config
self.setup_organization_config()
# Set up GuardDuty
self.setup_organization_guardduty()
return {
'organization_id': org_id,
'organizational_units': ous,
'policies': scps
}
except Exception as e:
print(f"β Error setting up organization: {e}")
return None
def enable_trusted_access(self):
"""Enable trusted access for AWS services"""
trusted_services = [
'cloudtrail.amazonaws.com',
'config.amazonaws.com',
'guardduty.amazonaws.com',
'securityhub.amazonaws.com',
'access-analyzer.amazonaws.com',
'sso.amazonaws.com',
'ram.amazonaws.com'
]
for service in trusted_services:
try:
self.organizations.enable_aws_service_access(
ServicePrincipal=service
)
print(f"β
Enabled trusted access for {service}")
except Exception as e:
print(f"β οΈ Could not enable {service}: {e}")
def create_organizational_units(self):
"""Create organizational units for different environments"""
# Get root ID
roots = self.organizations.list_roots()
root_id = roots['Roots'][0]['Id']
ous = {}
ou_structure = [
{'name': 'Security', 'description': 'Security and compliance accounts'},
{'name': 'SharedServices', 'description': 'Shared infrastructure accounts'},
{'name': 'Workloads', 'description': 'Application workload accounts'},
{'name': 'Sandbox', 'description': 'Sandbox and experimental accounts'}
]
for ou_config in ou_structure:
try:
response = self.organizations.create_organizational_unit(
ParentId=root_id,
Name=ou_config['name'],
Tags=[
{'Key': 'Purpose', 'Value': ou_config['description']},
{'Key': 'CreatedBy', 'Value': 'SecureOrganizationsManager'},
{'Key': 'CreatedAt', 'Value': datetime.now().isoformat()}
]
)
ou_id = response['OrganizationalUnit']['Id']
ous[ou_config['name']] = ou_id
print(f"β
Created OU: {ou_config['name']} ({ou_id})")
except self.organizations.exceptions.DuplicateOrganizationalUnitException:
# OU already exists, get its ID
existing_ous = self.organizations.list_organizational_units_for_parent(
ParentId=root_id
)
for ou in existing_ous['OrganizationalUnits']:
if ou['Name'] == ou_config['name']:
ous[ou_config['name']] = ou['Id']
print(f"β
Using existing OU: {ou_config['name']} ({ou['Id']})")
break
except Exception as e:
print(f"β Error creating OU {ou_config['name']}: {e}")
return ous
def create_service_control_policies(self):
"""Create Service Control Policies for different environments"""
policies = {}
# 1. Security Account SCP - Very restrictive
security_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": [
"organizations:LeaveOrganization",
"iam:CreateUser",
"iam:CreateAccessKey",
"iam:DeleteRole",
"iam:DeletePolicy",
"cloudtrail:StopLogging",
"cloudtrail:DeleteTrail",
"config:DeleteConfigurationRecorder",
"config:DeleteDeliveryChannel",
"guardduty:DeleteDetector"
],
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:PrincipalArn": [
"arn:aws:iam::*:role/OrganizationAccountAccessRole",
"arn:aws:iam::*:role/SecurityAdminRole"
]
}
}
},
{
"Effect": "Deny",
"Action": "*",
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:RequestedRegion": [
"us-east-1",
"us-west-2"
]
},
"ForAllValues:StringNotEquals": {
"aws:PrincipalArn": [
"arn:aws:iam::*:role/OrganizationAccountAccessRole"
]
}
}
}
]
}
policies['security'] = self.create_policy(
'SecurityAccountSCP',
'Service Control Policy for Security accounts',
security_policy
)
# 2. Production Workload SCP - Restrictive
production_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": [
"organizations:LeaveOrganization",
"iam:CreateUser",
"iam:CreateAccessKey",
"cloudtrail:StopLogging",
"cloudtrail:DeleteTrail",
"config:DeleteConfigurationRecorder"
],
"Resource": "*"
},
{
"Effect": "Deny",
"Action": [
"ec2:TerminateInstances",
"ec2:DeleteVolume",
"rds:DeleteDBInstance",
"s3:DeleteBucket"
],
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:PrincipalArn": [
"arn:aws:iam::*:role/ProductionAdminRole",
"arn:aws:iam::*:role/OrganizationAccountAccessRole"
]
}
}
},
{
"Effect": "Deny",
"Action": "*",
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:RequestedRegion": [
"us-east-1",
"us-west-2",
"eu-west-1"
]
}
}
}
]
}
policies['production'] = self.create_policy(
'ProductionWorkloadSCP',
'Service Control Policy for Production workload accounts',
production_policy
)
# 3. Development/Staging SCP - Moderate restrictions
development_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": [
"organizations:LeaveOrganization",
"cloudtrail:StopLogging",
"cloudtrail:DeleteTrail"
],
"Resource": "*"
},
{
"Effect": "Deny",
"Action": "*",
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:RequestedRegion": [
"us-east-1",
"us-west-2"
]
}
}
},
{
"Effect": "Deny",
"Action": [
"ec2:RunInstances"
],
"Resource": "arn:aws:ec2:*:*:instance/*",
"Condition": {
"ForAllValues:StringNotLike": {
"ec2:InstanceType": [
"t3.*",
"t2.*",
"m5.large",
"m5.xlarge"
]
}
}
}
]
}
policies['development'] = self.create_policy(
'DevelopmentSCP',
'Service Control Policy for Development accounts',
development_policy
)
# 4. Sandbox SCP - Cost controls only
sandbox_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": [
"organizations:LeaveOrganization"
],
"Resource": "*"
},
{
"Effect": "Deny",
"Action": [
"ec2:RunInstances"
],
"Resource": "arn:aws:ec2:*:*:instance/*",
"Condition": {
"ForAllValues:StringNotLike": {
"ec2:InstanceType": [
"t3.micro",
"t3.small",
"t2.micro",
"t2.small"
]
}
}
},
{
"Effect": "Deny",
"Action": "*",
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:RequestedRegion": [
"us-east-1"
]
}
}
}
]
}
policies['sandbox'] = self.create_policy(
'SandboxSCP',
'Service Control Policy for Sandbox accounts',
sandbox_policy
)
return policies
def create_policy(self, name, description, policy_document):
"""Create a Service Control Policy"""
try:
response = self.organizations.create_policy(
Name=name,
Description=description,
Type='SERVICE_CONTROL_POLICY',
Content=json.dumps(policy_document, indent=2)
)
policy_id = response['Policy']['PolicySummary']['Id']
print(f"β
Created SCP: {name} ({policy_id})")
return policy_id
except self.organizations.exceptions.DuplicatePolicyException:
# Policy already exists, get its ID
policies = self.organizations.list_policies(Filter='SERVICE_CONTROL_POLICY')
for policy in policies['Policies']:
if policy['Name'] == name:
print(f"β
Using existing SCP: {name} ({policy['Id']})")
return policy['Id']
except Exception as e:
print(f"β Error creating policy {name}: {e}")
return None
def apply_policies_to_ous(self, policies, ous):
"""Apply Service Control Policies to Organizational Units"""
policy_mappings = [
{'ou': 'Security', 'policy': 'security'},
{'ou': 'SharedServices', 'policy': 'production'},
{'ou': 'Workloads', 'policy': 'production'},
{'ou': 'Sandbox', 'policy': 'sandbox'}
]
for mapping in policy_mappings:
ou_name = mapping['ou']
policy_key = mapping['policy']
if ou_name in ous and policy_key in policies:
try:
self.organizations.attach_policy(
PolicyId=policies[policy_key],
TargetId=ous[ou_name]
)
print(f"β
Attached {policy_key} SCP to {ou_name} OU")
except Exception as e:
if "already attached" not in str(e):
print(f"β Error attaching policy to {ou_name}: {e}")
2. Centralized Security Services Setup
Configure organization-wide security services:
def setup_organization_cloudtrail(self):
"""Set up organization-wide CloudTrail"""
# Create S3 bucket for CloudTrail logs
bucket_name = f'organization-cloudtrail-{self.region}-{datetime.now().strftime("%Y%m%d")}'
try:
# Create bucket
if self.region == 'us-east-1':
self.s3.create_bucket(Bucket=bucket_name)
else:
self.s3.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={'LocationConstraint': self.region}
)
# Configure bucket policy for CloudTrail
bucket_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AWSCloudTrailAclCheck",
"Effect": "Allow",
"Principal": {"Service": "cloudtrail.amazonaws.com"},
"Action": "s3:GetBucketAcl",
"Resource": f"arn:aws:s3:::{bucket_name}"
},
{
"Sid": "AWSCloudTrailWrite",
"Effect": "Allow",
"Principal": {"Service": "cloudtrail.amazonaws.com"},
"Action": "s3:PutObject",
"Resource": f"arn:aws:s3:::{bucket_name}/*",
"Condition": {
"StringEquals": {
"s3:x-amz-acl": "bucket-owner-full-control"
}
}
}
]
}
self.s3.put_bucket_policy(
Bucket=bucket_name,
Policy=json.dumps(bucket_policy)
)
# Enable bucket encryption
self.s3.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration={
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'AES256'
},
'BucketKeyEnabled': True
}
]
}
)
# Create organization trail
trail_name = 'OrganizationCloudTrail'
response = self.cloudtrail.create_trail(
Name=trail_name,
S3BucketName=bucket_name,
IncludeGlobalServiceEvents=True,
IsMultiRegionTrail=True,
EnableLogFileValidation=True,
IsOrganizationTrail=True,
EventSelectors=[
{
'ReadWriteType': 'All',
'IncludeManagementEvents': True,
'DataResources': [
{
'Type': 'AWS::S3::Object',
'Values': ['arn:aws:s3:::*/*']
},
{
'Type': 'AWS::Lambda::Function',
'Values': ['arn:aws:lambda:*:*:function/*']
}
]
}
]
)
# Start logging
self.cloudtrail.start_logging(Name=trail_name)
print(f"β
Created organization CloudTrail: {trail_name}")
return trail_name
except Exception as e:
print(f"β Error setting up organization CloudTrail: {e}")
return None
def setup_organization_config(self):
"""Set up AWS Config for the organization"""
config_client = boto3.client('config', region_name=self.region)
try:
# Create Config service role
config_role_name = 'AWSConfigRole'
assume_role_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "config.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
try:
self.iam.create_role(
RoleName=config_role_name,
AssumeRolePolicyDocument=json.dumps(assume_role_policy),
Description='Service role for AWS Config'
)
# Attach managed policy
self.iam.attach_role_policy(
RoleName=config_role_name,
PolicyArn='arn:aws:iam::aws:policy/service-role/ConfigRole'
)
except self.iam.exceptions.EntityAlreadyExistsException:
pass # Role already exists
# Create S3 bucket for Config
config_bucket = f'organization-config-{self.region}-{datetime.now().strftime("%Y%m%d")}'
if self.region == 'us-east-1':
self.s3.create_bucket(Bucket=config_bucket)
else:
self.s3.create_bucket(
Bucket=config_bucket,
CreateBucketConfiguration={'LocationConstraint': self.region}
)
# Set up configuration recorder
config_client.put_configuration_recorder(
ConfigurationRecorder={
'name': 'OrganizationConfigRecorder',
'roleARN': f'arn:aws:iam::{boto3.client("sts").get_caller_identity()["Account"]}:role/{config_role_name}',
'recordingGroup': {
'allSupported': True,
'includeGlobalResourceTypes': True,
'recordingModeOverrides': []
}
}
)
# Set up delivery channel
config_client.put_delivery_channel(
DeliveryChannel={
'name': 'OrganizationConfigDeliveryChannel',
's3BucketName': config_bucket,
'configSnapshotDeliveryProperties': {
'deliveryFrequency': 'TwentyFour_Hours'
}
}
)
# Start configuration recorder
config_client.start_configuration_recorder(
ConfigurationRecorderName='OrganizationConfigRecorder'
)
print("β
Set up organization AWS Config")
except Exception as e:
print(f"β Error setting up AWS Config: {e}")
def setup_organization_guardduty(self):
"""Set up GuardDuty for the organization"""
guardduty = boto3.client('guardduty', region_name=self.region)
try:
# Enable GuardDuty in master account
try:
response = guardduty.create_detector(
Enable=True,
FindingPublishingFrequency='FIFTEEN_MINUTES',
DataSources={
'S3Logs': {'Enable': True},
'KubernetesAuditLogs': {'Enable': True},
'MalwareProtection': {'ScanEc2InstanceWithFindings': {'EbsVolumes': True}}
}
)
detector_id = response['DetectorId']
print(f"β
Created GuardDuty detector: {detector_id}")
except guardduty.exceptions.BadRequestException:
# Detector already exists
detectors = guardduty.list_detectors()
detector_id = detectors['DetectorIds'][0]
print(f"β
Using existing GuardDuty detector: {detector_id}")
# Enable organization features
try:
guardduty.enable_organization_admin_account(
AdminAccountId=boto3.client("sts").get_caller_identity()["Account"]
)
print("β
Enabled GuardDuty organization admin")
except Exception as e:
if "already enabled" not in str(e):
print(f"β οΈ GuardDuty organization admin: {e}")
return detector_id
except Exception as e:
print(f"β Error setting up GuardDuty: {e}")
return None
Account Creation and Onboarding Automation
1. Automated Account Creation
Create a system for automated, secure account creation:
class AccountCreationManager:
def __init__(self, region='us-east-1'):
self.organizations = boto3.client('organizations', region_name=region)
self.sts = boto3.client('sts', region_name=region)
self.iam = boto3.client('iam', region_name=region)
def create_secure_account(self, account_name, email, ou_name, account_type='workload'):
"""Create a new AWS account with security best practices"""
try:
# Create account
response = self.organizations.create_account(
AccountName=account_name,
Email=email,
Tags=[
{'Key': 'AccountType', 'Value': account_type},
{'Key': 'CreatedBy', 'Value': 'AccountCreationManager'},
{'Key': 'CreatedAt', 'Value': datetime.now().isoformat()}
]
)
creation_request_id = response['CreateAccountStatus']['Id']
# Wait for account creation to complete
account_id = self.wait_for_account_creation(creation_request_id)
if account_id:
# Move account to appropriate OU
self.move_account_to_ou(account_id, ou_name)
# Set up baseline security configuration
self.setup_account_baseline_security(account_id, account_type)
# Create cross-account roles
self.create_cross_account_roles(account_id, account_type)
print(f"β
Successfully created and configured account: {account_id}")
return account_id
else:
print("β Account creation failed")
return None
except Exception as e:
print(f"β Error creating account: {e}")
return None
def wait_for_account_creation(self, request_id, max_wait_minutes=30):
"""Wait for account creation to complete"""
import time
for _ in range(max_wait_minutes * 2): # Check every 30 seconds
try:
response = self.organizations.describe_create_account_status(
CreateAccountRequestId=request_id
)
status = response['CreateAccountStatus']['State']
if status == 'SUCCEEDED':
account_id = response['CreateAccountStatus']['AccountId']
print(f"β
Account creation completed: {account_id}")
return account_id
elif status == 'FAILED':
failure_reason = response['CreateAccountStatus'].get('FailureReason', 'Unknown')
print(f"β Account creation failed: {failure_reason}")
return None
else:
print(f"β³ Account creation in progress: {status}")
except Exception as e:
print(f"Error checking account creation status: {e}")
time.sleep(30)
print("β Account creation timed out")
return None
def move_account_to_ou(self, account_id, ou_name):
"""Move account to appropriate Organizational Unit"""
try:
# Get root ID
roots = self.organizations.list_roots()
root_id = roots['Roots'][0]['Id']
# Find the target OU
ous = self.organizations.list_organizational_units_for_parent(ParentId=root_id)
target_ou_id = None
for ou in ous['OrganizationalUnits']:
if ou['Name'] == ou_name:
target_ou_id = ou['Id']
break
if target_ou_id:
self.organizations.move_account(
AccountId=account_id,
SourceParentId=root_id,
DestinationParentId=target_ou_id
)
print(f"β
Moved account {account_id} to {ou_name} OU")
else:
print(f"β Could not find OU: {ou_name}")
except Exception as e:
print(f"β Error moving account to OU: {e}")
def setup_account_baseline_security(self, account_id, account_type):
"""Set up baseline security configuration for new account"""
# This would typically use AWS Control Tower or custom automation
# to configure security baselines in the new account
baseline_configs = {
'production': {
'cloudtrail': True,
'config': True,
'guardduty': True,
'security_hub': True,
'password_policy': 'strict',
'mfa_required': True
},
'development': {
'cloudtrail': True,
'config': True,
'guardduty': True,
'security_hub': False,
'password_policy': 'standard',
'mfa_required': True
},
'sandbox': {
'cloudtrail': True,
'config': False,
'guardduty': False,
'security_hub': False,
'password_policy': 'standard',
'mfa_required': False
}
}
config = baseline_configs.get(account_type, baseline_configs['production'])
# In production, you would use AWS Systems Manager or Lambda
# to execute these configurations in the target account
print(f"π Baseline security configuration for {account_type}:")
for service, enabled in config.items():
print(f" - {service}: {enabled}")
def create_cross_account_roles(self, account_id, account_type):
"""Create cross-account access roles"""
# Define role configurations based on account type
role_configs = {
'production': [
{
'name': 'ProductionReadOnlyRole',
'description': 'Read-only access to production account',
'managed_policies': [
'arn:aws:iam::aws:policy/ReadOnlyAccess'
],
'trusted_accounts': ['SECURITY_ACCOUNT_ID']
},
{
'name': 'ProductionAdminRole',
'description': 'Administrative access to production account',
'managed_policies': [
'arn:aws:iam::aws:policy/PowerUserAccess'
],
'trusted_accounts': ['SECURITY_ACCOUNT_ID'],
'mfa_required': True
}
],
'development': [
{
'name': 'DeveloperRole',
'description': 'Developer access to development account',
'managed_policies': [
'arn:aws:iam::aws:policy/PowerUserAccess'
],
'trusted_accounts': ['SECURITY_ACCOUNT_ID']
}
],
'sandbox': [
{
'name': 'SandboxFullAccessRole',
'description': 'Full access to sandbox account',
'managed_policies': [
'arn:aws:iam::aws:policy/AdministratorAccess'
],
'trusted_accounts': ['SECURITY_ACCOUNT_ID']
}
]
}
roles = role_configs.get(account_type, [])
for role_config in roles:
print(f"π Would create role: {role_config['name']} in account {account_id}")
# In production, this would create the actual roles using
# cross-account execution or AWS Control Tower
2. Account Security Baseline Automation
Create automated security baseline deployment:
class SecurityBaselineManager:
def __init__(self, region='us-east-1'):
self.region = region
def deploy_security_baseline(self, account_id, account_type):
"""Deploy security baseline to a new account"""
# This would typically be implemented using:
# - AWS Control Tower (if available)
# - AWS Systems Manager automation documents
# - Lambda functions with cross-account roles
# - CloudFormation StackSets
baseline_components = self.get_baseline_components(account_type)
for component in baseline_components:
self.deploy_component(account_id, component)
def get_baseline_components(self, account_type):
"""Get security baseline components for account type"""
common_components = [
'iam_password_policy',
'cloudtrail_logging',
'vpc_flow_logs',
'default_encryption',
'access_logging'
]
type_specific = {
'production': [
'aws_config',
'guardduty',
'security_hub',
'inspector',
'systems_manager_compliance'
],
'development': [
'aws_config',
'guardduty',
'cost_budgets'
],
'sandbox': [
'cost_budgets',
'resource_limits'
]
}
components = common_components + type_specific.get(account_type, [])
return components
def deploy_component(self, account_id, component):
"""Deploy individual security component"""
# Component deployment templates
templates = {
'iam_password_policy': self.create_iam_password_policy_template(),
'cloudtrail_logging': self.create_cloudtrail_template(),
'vpc_flow_logs': self.create_vpc_flow_logs_template(),
'aws_config': self.create_config_template(),
'guardduty': self.create_guardduty_template()
}
if component in templates:
template = templates[component]
print(f"π§ Deploying {component} to account {account_id}")
# In production, deploy using CloudFormation StackSets or similar
else:
print(f"β οΈ Unknown component: {component}")
def create_iam_password_policy_template(self):
"""Create IAM password policy CloudFormation template"""
return {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "IAM Password Policy",
"Resources": {
"PasswordPolicy": {
"Type": "AWS::IAM::PasswordPolicy",
"Properties": {
"MinimumPasswordLength": 14,
"RequireSymbols": True,
"RequireNumbers": True,
"RequireUppercaseCharacters": True,
"RequireLowercaseCharacters": True,
"AllowUsersToChangePassword": True,
"MaxPasswordAge": 90,
"PasswordReusePrevention": 12,
"HardExpiry": False
}
}
}
}
def create_cloudtrail_template(self):
"""Create CloudTrail CloudFormation template"""
return {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Account-level CloudTrail",
"Resources": {
"CloudTrailBucket": {
"Type": "AWS::S3::Bucket",
"Properties": {
"BucketName": {
"Fn::Sub": "account-cloudtrail-${AWS::AccountId}-${AWS::Region}"
},
"BucketEncryption": {
"ServerSideEncryptionConfiguration": [
{
"ServerSideEncryptionByDefault": {
"SSEAlgorithm": "AES256"
}
}
]
},
"PublicAccessBlockConfiguration": {
"BlockPublicAcls": True,
"BlockPublicPolicy": True,
"IgnorePublicAcls": True,
"RestrictPublicBuckets": True
}
}
},
"CloudTrail": {
"Type": "AWS::CloudTrail::Trail",
"Properties": {
"TrailName": "AccountCloudTrail",
"S3BucketName": {"Ref": "CloudTrailBucket"},
"IncludeGlobalServiceEvents": True,
"IsMultiRegionTrail": True,
"EnableLogFileValidation": True
}
}
}
}
Cross-Account Access and Identity Management
1. Centralized Identity Management
Implement centralized identity management across accounts:
class CrossAccountIdentityManager:
def __init__(self, region='us-east-1'):
self.iam = boto3.client('iam', region_name=region)
self.organizations = boto3.client('organizations', region_name=region)
self.sso = boto3.client('sso-admin', region_name=region)
def setup_sso_integration(self):
"""Set up AWS SSO for centralized identity management"""
try:
# Get SSO instance
instances = self.sso.list_instances()
if not instances['Instances']:
print("β AWS SSO not enabled. Please enable AWS SSO first.")
return None
instance_arn = instances['Instances'][0]['InstanceArn']
identity_store_id = instances['Instances'][0]['IdentityStoreId']
# Create permission sets for different roles
permission_sets = self.create_permission_sets(instance_arn)
# Assign permission sets to accounts and groups
self.assign_permission_sets(instance_arn, permission_sets)
return {
'instance_arn': instance_arn,
'identity_store_id': identity_store_id,
'permission_sets': permission_sets
}
except Exception as e:
print(f"β Error setting up SSO: {e}")
return None
def create_permission_sets(self, instance_arn):
"""Create SSO permission sets for different roles"""
permission_set_configs = [
{
'name': 'SecurityAdministrator',
'description': 'Full access to security services across all accounts',
'session_duration': 'PT4H', # 4 hours
'managed_policies': [
'arn:aws:iam::aws:policy/SecurityAudit',
'arn:aws:iam::aws:policy/IAMFullAccess'
],
'inline_policy': {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"guardduty:*",
"securityhub:*",
"config:*",
"cloudtrail:*",
"inspector:*"
],
"Resource": "*"
}
]
}
},
{
'name': 'ProductionReadOnly',
'description': 'Read-only access to production accounts',
'session_duration': 'PT2H', # 2 hours
'managed_policies': [
'arn:aws:iam::aws:policy/ReadOnlyAccess'
]
},
{
'name': 'DeveloperAccess',
'description': 'Developer access for non-production accounts',
'session_duration': 'PT8H', # 8 hours
'managed_policies': [
'arn:aws:iam::aws:policy/PowerUserAccess'
],
'inline_policy': {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": [
"iam:CreateUser",
"iam:CreateAccessKey",
"iam:DeleteRole",
"organizations:*"
],
"Resource": "*"
}
]
}
},
{
'name': 'SandboxFullAccess',
'description': 'Full access for sandbox accounts',
'session_duration': 'PT12H', # 12 hours
'managed_policies': [
'arn:aws:iam::aws:policy/AdministratorAccess'
]
}
]
permission_sets = {}
for config in permission_set_configs:
try:
response = self.sso.create_permission_set(
Name=config['name'],
Description=config['description'],
InstanceArn=instance_arn,
SessionDuration=config['session_duration']
)
ps_arn = response['PermissionSet']['PermissionSetArn']
permission_sets[config['name']] = ps_arn
# Attach managed policies
for policy_arn in config.get('managed_policies', []):
self.sso.attach_managed_policy_to_permission_set(
InstanceArn=instance_arn,
PermissionSetArn=ps_arn,
ManagedPolicyArn=policy_arn
)
# Add inline policy if specified
if 'inline_policy' in config:
self.sso.put_inline_policy_to_permission_set(
InstanceArn=instance_arn,
PermissionSetArn=ps_arn,
InlinePolicy=json.dumps(config['inline_policy'])
)
print(f"β
Created permission set: {config['name']}")
except Exception as e:
if "already exists" in str(e):
print(f"β
Using existing permission set: {config['name']}")
else:
print(f"β Error creating permission set {config['name']}: {e}")
return permission_sets
def assign_permission_sets(self, instance_arn, permission_sets):
"""Assign permission sets to accounts and groups"""
# Get organization accounts
accounts = self.organizations.list_accounts()
# Define assignment rules
assignment_rules = [
{
'permission_set': 'SecurityAdministrator',
'accounts': 'all',
'groups': ['SecurityTeam']
},
{
'permission_set': 'ProductionReadOnly',
'accounts': ['production'],
'groups': ['Developers', 'SupportTeam']
},
{
'permission_set': 'DeveloperAccess',
'accounts': ['development', 'staging'],
'groups': ['Developers']
},
{
'permission_set': 'SandboxFullAccess',
'accounts': ['sandbox'],
'groups': ['Developers', 'Interns']
}
]
for rule in assignment_rules:
ps_name = rule['permission_set']
if ps_name not in permission_sets:
continue
ps_arn = permission_sets[ps_name]
# Get target accounts
target_accounts = []
if rule['accounts'] == 'all':
target_accounts = [acc['Id'] for acc in accounts['Accounts']]
else:
# In production, you'd have a mapping of account names to IDs
# For now, we'll just use the rule as-is
target_accounts = rule['accounts']
# Assign to each account and group combination
for account_id in target_accounts:
for group_name in rule['groups']:
try:
# In production, you'd get the actual group ID from Identity Store
print(f"π Would assign {ps_name} to {group_name} for account {account_id}")
# Actual assignment would be:
# self.sso.create_account_assignment(
# InstanceArn=instance_arn,
# TargetId=account_id,
# TargetType='AWS_ACCOUNT',
# PermissionSetArn=ps_arn,
# PrincipalType='GROUP',
# PrincipalId=group_id
# )
except Exception as e:
print(f"β Error assigning {ps_name}: {e}")
2. Cross-Account Automation Roles
Create roles for cross-account automation:
def create_automation_roles(self):
"""Create roles for cross-account automation"""
automation_roles = [
{
'name': 'OrganizationSecurityAuditRole',
'description': 'Role for cross-account security auditing',
'trusted_accounts': ['SECURITY_ACCOUNT_ID'],
'policies': [
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:List*",
"iam:Get*",
"ec2:Describe*",
"s3:GetBucket*",
"s3:List*",
"cloudtrail:Describe*",
"config:Describe*",
"guardduty:Get*",
"guardduty:List*"
],
"Resource": "*"
}
]
}
]
},
{
'name': 'OrganizationComplianceRole',
'description': 'Role for compliance automation',
'trusted_accounts': ['SECURITY_ACCOUNT_ID'],
'policies': [
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"config:PutConfigRule",
"config:DeleteConfigRule",
"config:PutRemediationConfiguration",
"lambda:InvokeFunction",
"ssm:SendCommand",
"ssm:StartAutomationExecution"
],
"Resource": "*"
}
]
}
]
},
{
'name': 'OrganizationIncidentResponseRole',
'description': 'Role for incident response automation',
'trusted_accounts': ['SECURITY_ACCOUNT_ID'],
'policies': [
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:StopInstances",
"ec2:CreateSnapshot",
"ec2:CreateImage",
"iam:AttachUserPolicy",
"iam:DetachUserPolicy",
"s3:PutBucketPolicy",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
]
}
]
for role_config in automation_roles:
print(f"π Creating automation role: {role_config['name']}")
# In production, create these roles in each account using
# CloudFormation StackSets or similar automation
Compliance and Governance Automation
1. Automated Compliance Monitoring
Create automated compliance monitoring across all accounts:
class ComplianceMonitoringManager:
def __init__(self, region='us-east-1'):
self.config = boto3.client('config', region_name=region)
self.organizations = boto3.client('organizations', region_name=region)
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
def deploy_compliance_rules(self):
"""Deploy compliance rules across all organization accounts"""
# Get all accounts
accounts = self.organizations.list_accounts()
# Define compliance rules for different frameworks
compliance_rules = {
'SOC2': [
'root-mfa-enabled',
'iam-password-policy',
'cloudtrail-enabled',
'vpc-flow-logs-enabled',
's3-bucket-ssl-requests-only',
'encrypted-volumes',
'security-group-ssh-check'
],
'PCI_DSS': [
'cloudtrail-enabled',
'mfa-enabled-for-iam-console-access',
's3-bucket-public-access-prohibited',
'encrypted-volumes',
'security-group-ssh-check',
'rds-storage-encrypted'
],
'HIPAA': [
'cloudtrail-enabled',
's3-bucket-ssl-requests-only',
'encrypted-volumes',
'rds-storage-encrypted',
'kms-cmk-not-scheduled-for-deletion'
]
}
# Deploy rules to appropriate accounts
for framework, rules in compliance_rules.items():
self.deploy_framework_rules(framework, rules, accounts['Accounts'])
def deploy_framework_rules(self, framework, rules, accounts):
"""Deploy compliance rules for a specific framework"""
print(f"π Deploying {framework} compliance rules...")
for account in accounts:
account_id = account['Id']
# Skip management account for some rules
if self.should_skip_account(account_id, framework):
continue
for rule_name in rules:
self.deploy_config_rule(account_id, rule_name, framework)
def should_skip_account(self, account_id, framework):
"""Determine if account should be skipped for certain rules"""
# Get current account ID (management account)
current_account = boto3.client('sts').get_caller_identity()['Account']
# Skip management account for some rules
if account_id == current_account:
return False # Apply rules to management account too
return False
def deploy_config_rule(self, account_id, rule_name, framework):
"""Deploy individual Config rule to account"""
# Rule configurations
rule_configs = {
'root-mfa-enabled': {
'ConfigRuleName': 'root-user-mfa-enabled',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'ROOT_USER_MFA_ENABLED'
}
},
'iam-password-policy': {
'ConfigRuleName': 'iam-password-policy-check',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'IAM_PASSWORD_POLICY'
},
'InputParameters': json.dumps({
'RequireUppercaseCharacters': 'true',
'RequireLowercaseCharacters': 'true',
'RequireSymbols': 'true',
'RequireNumbers': 'true',
'MinimumPasswordLength': '14',
'PasswordReusePrevention': '12',
'MaxPasswordAge': '90'
})
},
'cloudtrail-enabled': {
'ConfigRuleName': 'cloudtrail-enabled-check',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'CLOUD_TRAIL_ENABLED'
}
},
'vpc-flow-logs-enabled': {
'ConfigRuleName': 'vpc-flow-logs-enabled',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'VPC_FLOW_LOGS_ENABLED'
}
},
's3-bucket-ssl-requests-only': {
'ConfigRuleName': 's3-bucket-ssl-requests-only',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'S3_BUCKET_SSL_REQUESTS_ONLY'
}
},
'encrypted-volumes': {
'ConfigRuleName': 'encrypted-volumes-check',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'ENCRYPTED_VOLUMES'
}
},
'security-group-ssh-check': {
'ConfigRuleName': 'incoming-ssh-disabled',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'INCOMING_SSH_DISABLED'
}
}
}
if rule_name in rule_configs:
rule_config = rule_configs[rule_name]
print(f"π Deploying {rule_name} to account {account_id}")
# In production, this would use cross-account roles or StackSets
# to deploy the rule to the target account
else:
print(f"β οΈ Unknown rule: {rule_name}")
def generate_compliance_report(self):
"""Generate organization-wide compliance report"""
accounts = self.organizations.list_accounts()
compliance_status = {
'compliant_accounts': 0,
'non_compliant_accounts': 0,
'account_details': {}
}
for account in accounts['Accounts']:
account_id = account['Id']
account_status = self.get_account_compliance_status(account_id)
compliance_status['account_details'][account_id] = account_status
if account_status['overall_compliance'] >= 0.8: # 80% compliance threshold
compliance_status['compliant_accounts'] += 1
else:
compliance_status['non_compliant_accounts'] += 1
# Send metrics to CloudWatch
self.send_compliance_metrics(compliance_status)
# Generate detailed report
return self.create_compliance_report(compliance_status)
def get_account_compliance_status(self, account_id):
"""Get compliance status for individual account"""
# In production, this would query Config rules across accounts
# using cross-account roles or Config aggregators
return {
'total_rules': 10,
'compliant_rules': 8,
'non_compliant_rules': 2,
'overall_compliance': 0.8,
'critical_violations': 1,
'last_checked': datetime.now().isoformat()
}
def send_compliance_metrics(self, compliance_status):
"""Send compliance metrics to CloudWatch"""
metrics = [
{
'MetricName': 'CompliantAccounts',
'Value': compliance_status['compliant_accounts'],
'Unit': 'Count'
},
{
'MetricName': 'NonCompliantAccounts',
'Value': compliance_status['non_compliant_accounts'],
'Unit': 'Count'
},
{
'MetricName': 'OverallComplianceRate',
'Value': compliance_status['compliant_accounts'] /
(compliance_status['compliant_accounts'] + compliance_status['non_compliant_accounts']),
'Unit': 'Percent'
}
]
try:
self.cloudwatch.put_metric_data(
Namespace='Organizations/Compliance',
MetricData=metrics
)
print("π Compliance metrics sent to CloudWatch")
except Exception as e:
print(f"β Error sending compliance metrics: {e}")
def create_compliance_report(self, compliance_status):
"""Create detailed compliance report"""
report = {
'report_timestamp': datetime.now().isoformat(),
'organization_summary': {
'total_accounts': len(compliance_status['account_details']),
'compliant_accounts': compliance_status['compliant_accounts'],
'non_compliant_accounts': compliance_status['non_compliant_accounts'],
'compliance_rate': compliance_status['compliant_accounts'] / len(compliance_status['account_details'])
},
'account_details': compliance_status['account_details'],
'recommendations': self.get_compliance_recommendations(compliance_status)
}
return report
def get_compliance_recommendations(self, compliance_status):
"""Get compliance improvement recommendations"""
recommendations = []
non_compliant_count = compliance_status['non_compliant_accounts']
if non_compliant_count > 0:
recommendations.append({
'priority': 'HIGH',
'title': 'Address Non-Compliant Accounts',
'description': f'{non_compliant_count} accounts are below compliance threshold',
'action': 'Review and remediate compliance violations in identified accounts'
})
# Add more specific recommendations based on common issues
recommendations.extend([
{
'priority': 'MEDIUM',
'title': 'Automate Compliance Remediation',
'description': 'Implement automated remediation for common compliance violations',
'action': 'Set up AWS Config remediation actions for critical rules'
},
{
'priority': 'LOW',
'title': 'Regular Compliance Reviews',
'description': 'Schedule monthly compliance reviews',
'action': 'Create calendar reminders for compliance team reviews'
}
])
return recommendations
Complete Implementation Script
Hereβs a comprehensive script that ties everything together:
#!/usr/bin/env python3
import boto3
import json
import argparse
from datetime import datetime
class OrganizationSecurityManager:
def __init__(self, region='us-east-1'):
self.region = region
self.org_manager = SecureOrganizationsManager(region)
self.account_manager = AccountCreationManager(region)
self.identity_manager = CrossAccountIdentityManager(region)
self.compliance_manager = ComplianceMonitoringManager(region)
def setup_secure_organization(self):
"""Complete organization security setup"""
print("π Starting secure AWS Organizations setup...")
# 1. Set up organization with security best practices
print("1οΈβ£ Setting up secure organization...")
org_config = self.org_manager.setup_secure_organization()
if not org_config:
print("β Failed to set up organization")
return False
# 2. Set up centralized identity management
print("2οΈβ£ Setting up centralized identity management...")
sso_config = self.identity_manager.setup_sso_integration()
# 3. Deploy compliance monitoring
print("3οΈβ£ Deploying compliance monitoring...")
self.compliance_manager.deploy_compliance_rules()
# 4. Generate initial compliance report
print("4οΈβ£ Generating compliance report...")
compliance_report = self.compliance_manager.generate_compliance_report()
print("β
Organization security setup completed!")
return {
'organization': org_config,
'sso': sso_config,
'compliance': compliance_report
}
def create_new_account(self, account_name, email, account_type, ou_name):
"""Create and configure new account"""
print(f"ποΈ Creating new {account_type} account: {account_name}")
# Create account
account_id = self.account_manager.create_secure_account(
account_name, email, ou_name, account_type
)
if account_id:
# Wait for account to be ready, then run compliance check
print("β³ Waiting for account to be ready...")
time.sleep(60) # Wait for account to stabilize
# Check compliance
compliance_status = self.compliance_manager.get_account_compliance_status(account_id)
print(f"β
Account {account_id} created with {compliance_status['overall_compliance']*100:.1f}% compliance")
return account_id
else:
print("β Account creation failed")
return None
def run_compliance_audit(self):
"""Run comprehensive compliance audit"""
print("π Running organization-wide compliance audit...")
compliance_report = self.compliance_manager.generate_compliance_report()
# Print summary
summary = compliance_report['organization_summary']
print(f"π Compliance Summary:")
print(f" - Total Accounts: {summary['total_accounts']}")
print(f" - Compliant: {summary['compliant_accounts']}")
print(f" - Non-Compliant: {summary['non_compliant_accounts']}")
print(f" - Compliance Rate: {summary['compliance_rate']*100:.1f}%")
# Show recommendations
print(f"\nπ‘ Recommendations:")
for rec in compliance_report['recommendations'][:3]: # Top 3
print(f" - {rec['priority']}: {rec['title']}")
print(f" {rec['description']}")
return compliance_report
def emergency_lockdown(self, account_id):
"""Emergency lockdown procedures for compromised account"""
print(f"π¨ Initiating emergency lockdown for account {account_id}")
# This would implement emergency response procedures:
# 1. Attach restrictive SCP
# 2. Disable SSO access
# 3. Rotate access keys
# 4. Enable detailed logging
# 5. Notify security team
lockdown_scp = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "*",
"Resource": "*",
"Condition": {
"StringNotEquals": {
"aws:PrincipalArn": [
"arn:aws:iam::*:role/OrganizationIncidentResponseRole"
]
}
}
}
]
}
print("π Would apply emergency lockdown SCP")
print("π§ Would notify security team")
print("π Would disable user access")
# In production, implement actual lockdown procedures
return True
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='AWS Organizations Security Management')
parser.add_argument('--setup', action='store_true', help='Set up secure organization')
parser.add_argument('--create-account', help='Create new account (format: name:email:type:ou)')
parser.add_argument('--audit', action='store_true', help='Run compliance audit')
parser.add_argument('--lockdown', help='Emergency lockdown for account ID')
parser.add_argument('--region', default='us-east-1', help='AWS region')
args = parser.parse_args()
manager = OrganizationSecurityManager(region=args.region)
if args.setup:
result = manager.setup_secure_organization()
if result:
print("π Organization security setup completed successfully!")
elif args.create_account:
parts = args.create_account.split(':')
if len(parts) == 4:
name, email, account_type, ou = parts
account_id = manager.create_new_account(name, email, account_type, ou)
if account_id:
print(f"π Account created successfully: {account_id}")
else:
print("β Invalid format. Use: name:email:type:ou")
elif args.audit:
report = manager.run_compliance_audit()
print("π Detailed report available in compliance object")
elif args.lockdown:
success = manager.emergency_lockdown(args.lockdown)
if success:
print(f"π Emergency lockdown initiated for {args.lockdown}")
else:
print("Use --setup, --create-account, --audit, or --lockdown")
Beyond Manual Multi-Account Management: The PathShield Advantage
While implementing comprehensive AWS Organizations security provides excellent governance and control, managing it at scale presents significant challenges:
Complex Policy Management: Service Control Policies become increasingly complex as your organization grows, requiring deep AWS expertise to maintain effectively.
Cross-Account Visibility: Maintaining security visibility across dozens of accounts requires sophisticated tooling and constant monitoring.
Compliance Overhead: Manual compliance monitoring and reporting across multiple accounts and frameworks requires substantial resources and expertise.
Identity Management Complexity: Managing cross-account access, roles, and permissions becomes exponentially more complex as teams and accounts grow.
Incident Response Coordination: Coordinating security incident response across multiple accounts requires pre-built automation and clear procedures.
This is where PathShield transforms your multi-account security approach. Instead of building and maintaining complex Organizations governance manually, PathShield provides:
- Automated Multi-Account Governance: Intelligent policy management and compliance monitoring across your entire AWS footprint
- Unified Security Dashboard: Single-pane-of-glass visibility into security across all accounts and environments
- Continuous Compliance Monitoring: Real-time compliance checking and reporting for SOC 2, PCI DSS, and other frameworks
- Intelligent Access Management: Smart cross-account access controls that adapt to your organizational changes
- Coordinated Incident Response: Automated incident response workflows that work seamlessly across account boundaries
Ready to move beyond manual multi-account security management? Start your free PathShield trial and get enterprise-grade AWS Organizations security that scales with your startupβs growth without the operational complexity.