PathShield Team · Tutorials · 22 min read
EKS Security Hardening Checklist for Production - Complete 2025 Guide
Secure your Amazon EKS clusters for production with this comprehensive hardening guide. 100+ security checks, automated scripts, and real-world attack prevention.
A crypto mining attack on an EKS cluster cost a startup $47,000 in compute charges and 3 weeks of downtime. The attackers exploited a misconfigured RBAC policy and deployed miners across 200+ pods. This comprehensive guide shows you how to harden your EKS clusters to prevent attacks like this.
Why EKS Security Hardening Matters More Than Ever
Common EKS attack vectors in 2025:
- Exposed Kubernetes API servers
- Over-privileged service accounts
- Vulnerable container images
- Misconfigured network policies
- Weak RBAC configurations
- Unencrypted secrets and etcd
The cost of EKS security failures:
- Average cryptomining attack: $15,000-$50,000
- Data breaches: $4.2M average cost
- Compliance violations: $500K-$2M in fines
- Downtime and recovery: Weeks of engineering time
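Before working through the full checklist, a quick spot-check can show whether the most common of these misconfigurations affect your cluster today. A minimal sketch, assuming the AWS CLI, kubectl, and jq are configured against a cluster named production-cluster:
# Is the API server endpoint reachable from the public internet, and from where?
aws eks describe-cluster --name production-cluster \
  --query 'cluster.resourcesVpcConfig.{public:endpointPublicAccess,cidrs:publicAccessCidrs}'
# Are control plane audit logs enabled?
aws eks describe-cluster --name production-cluster \
  --query 'cluster.logging.clusterLogging'
# Which bindings grant cluster-admin?
kubectl get clusterrolebindings -o json \
  | jq -r '.items[] | select(.roleRef.name=="cluster-admin") | .metadata.name'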
EKS Security Architecture Overview
graph TB
A[User/Developer] --> B[AWS IAM]
B --> C[EKS Control Plane]
C --> D[Worker Nodes]
D --> E[Pods]
F[VPC] --> G[Private Subnets]
G --> H[Security Groups]
H --> D
I[Secrets Manager] --> E
J[ECR] --> E
K[CloudTrail] --> C
L[VPC Flow Logs] --> H
The Complete EKS Security Hardening Checklist
1. Control Plane Security (Critical)
Enable EKS Cluster Logging
# Enable all control plane log types for security monitoring
aws eks update-cluster-config \
  --name production-cluster \
  --logging '{
    "clusterLogging": [
      {
        "types": ["api", "audit", "authenticator", "controllerManager", "scheduler"],
        "enabled": true
      }
    ]
  }'
Configure Private API Server Access
# Terraform configuration for private EKS cluster
resource "aws_eks_cluster" "production" {
name = "production-cluster"
role_arn = aws_iam_role.eks_cluster.arn
version = "1.28"
vpc_config {
subnet_ids = var.private_subnet_ids
endpoint_private_access = true
endpoint_public_access = false # Disable public access
public_access_cidrs = []
security_group_ids = [aws_security_group.eks_cluster.id]
}
encryption_config {
provider {
key_arn = aws_kms_key.eks.arn
}
resources = ["secrets"]
}
enabled_cluster_log_types = [
"api", "audit", "authenticator", "controllerManager", "scheduler"
]
depends_on = [
aws_iam_role_policy_attachment.eks_cluster_policy,
aws_cloudwatch_log_group.eks_cluster,
]
tags = {
Name = "production-cluster"
Environment = "production"
Security = "hardened"
}
}
# KMS key for etcd encryption
resource "aws_kms_key" "eks" {
description = "EKS Secret Encryption"
deletion_window_in_days = 7
enable_key_rotation = true
tags = {
Name = "eks-secrets-key"
}
}
Implement Strong API Server Authentication
# Enhanced cluster configuration with OIDC
resource "aws_eks_identity_provider_config" "oidc" {
cluster_name = aws_eks_cluster.production.name
oidc {
client_id = "your-oidc-client-id"
identity_provider_config_name = "corporate-oidc"
issuer_url = "https://your-identity-provider.com"
username_claim = "email"
username_prefix = "oidc:"
groups_claim = "groups"
groups_prefix = "oidc:"
}
}
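OIDC authentication only becomes useful once the provider's group claims are mapped to Kubernetes RBAC. As a minimal sketch (the group name platform-admins is hypothetical and must match a groups claim from your identity provider, carrying the oidc: prefix configured above), a ClusterRoleBinding can grant such a group read-only access:
kubectl apply -f - <<'EOF'
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: oidc-platform-admins-view
subjects:
  - kind: Group
    name: "oidc:platform-admins"   # hypothetical group; prefix matches groups_prefix above
    apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: ClusterRole
  name: view                       # built-in read-only ClusterRole
  apiGroup: rbac.authorization.k8s.io
EOF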
2. Network Security Hardening
Secure VPC Configuration
#!/usr/bin/env python3
"""
EKS Network Security Assessment Script
"""
import boto3
import json
from datetime import datetime
class EKSNetworkSecurityAuditor:
def __init__(self, cluster_name):
self.cluster_name = cluster_name
self.eks = boto3.client('eks')
self.ec2 = boto3.client('ec2')
self.findings = []
def audit_cluster_network_security(self):
"""Comprehensive network security audit"""
print(f"🔍 Auditing network security for EKS cluster: {self.cluster_name}")
# Get cluster details
cluster = self.eks.describe_cluster(name=self.cluster_name)['cluster']
vpc_config = cluster['resourcesVpcConfig']
# Audit API server access
self.audit_api_server_access(vpc_config)
# Audit security groups
self.audit_security_groups(vpc_config)
# Audit subnets
self.audit_subnets(vpc_config)
# Audit node groups
self.audit_node_groups()
# Generate report
self.generate_network_security_report()
def audit_api_server_access(self, vpc_config):
"""Audit EKS API server access configuration"""
# describe_cluster exposes the endpoint flags directly on resourcesVpcConfig
# Check if public access is disabled
public_access = vpc_config.get('endpointPublicAccess', True)
if public_access:
self.add_finding(
'HIGH',
'API Server Public Access',
'EKS API server allows public access',
'Disable public access and use private access only'
)
# Check public access CIDRs if public access is enabled
if public_access:
public_cidrs = vpc_config.get('publicAccessCidrs', [])
if '0.0.0.0/0' in public_cidrs:
self.add_finding(
'CRITICAL',
'API Server Open to Internet',
'EKS API server accessible from anywhere on the internet',
'Restrict access to specific IP ranges'
)
# Check private access
private_access = vpc_config.get('endpointPrivateAccess', False)
if not private_access:
self.add_finding(
'MEDIUM',
'Private Access Disabled',
'EKS API server private access is disabled',
'Enable private access for internal connectivity'
)
def audit_security_groups(self, vpc_config):
"""Audit EKS security groups"""
cluster_sg_id = vpc_config.get('clusterSecurityGroupId')
additional_sgs = vpc_config.get('securityGroupIds', [])
all_sgs = [cluster_sg_id] + additional_sgs
for sg_id in all_sgs:
if sg_id:
self.audit_single_security_group(sg_id)
def audit_single_security_group(self, sg_id):
"""Audit individual security group"""
try:
response = self.ec2.describe_security_groups(GroupIds=[sg_id])
sg = response['SecurityGroups'][0]
# Check ingress rules
for rule in sg.get('IpPermissions', []):
self.check_security_group_rule(sg_id, rule, 'ingress')
# Check egress rules
for rule in sg.get('IpPermissionsEgress', []):
self.check_security_group_rule(sg_id, rule, 'egress')
except Exception as e:
print(f"❌ Error auditing security group {sg_id}: {e}")
def check_security_group_rule(self, sg_id, rule, direction):
"""Check individual security group rule"""
from_port = rule.get('FromPort', 0)
to_port = rule.get('ToPort', 65535)
protocol = rule.get('IpProtocol', 'all')
# Check for overly permissive rules
for ip_range in rule.get('IpRanges', []):
cidr = ip_range.get('CidrIp', '')
if cidr == '0.0.0.0/0':
if direction == 'ingress' and from_port in [22, 443, 6443, 10250]:
self.add_finding(
'HIGH',
f'Security Group {direction.title()} Rule',
f'Security group {sg_id} allows {direction} from anywhere on port {from_port}',
f'Restrict {direction} access to specific IP ranges'
)
elif protocol == '-1': # All protocols
self.add_finding(
'CRITICAL',
f'Security Group {direction.title()} Rule',
f'Security group {sg_id} allows all {direction} traffic from anywhere',
f'Implement least privilege {direction} rules'
)
def audit_subnets(self, vpc_config):
"""Audit EKS subnets configuration"""
subnet_ids = vpc_config.get('subnetIds', [])
if len(subnet_ids) < 2:
self.add_finding(
'MEDIUM',
'Insufficient Subnet Redundancy',
'EKS cluster has fewer than 2 subnets',
'Deploy across multiple AZs for high availability'
)
# Check if subnets are private
for subnet_id in subnet_ids:
try:
response = self.ec2.describe_subnets(SubnetIds=[subnet_id])
subnet = response['Subnets'][0]
if subnet.get('MapPublicIpOnLaunch', False):
self.add_finding(
'HIGH',
'Public Subnet Usage',
f'EKS cluster uses public subnet {subnet_id}',
'Use private subnets for EKS worker nodes'
)
except Exception as e:
print(f"❌ Error auditing subnet {subnet_id}: {e}")
def audit_node_groups(self):
"""Audit EKS node groups"""
try:
node_groups = self.eks.list_nodegroups(clusterName=self.cluster_name)
for ng_name in node_groups['nodegroups']:
ng_details = self.eks.describe_nodegroup(
clusterName=self.cluster_name,
nodegroupName=ng_name
)['nodegroup']
self.audit_single_node_group(ng_name, ng_details)
except Exception as e:
print(f"❌ Error auditing node groups: {e}")
def audit_single_node_group(self, ng_name, ng_details):
"""Audit individual node group"""
# Check if nodes are in public subnets
subnets = ng_details.get('subnets', [])
for subnet_id in subnets:
try:
response = self.ec2.describe_subnets(SubnetIds=[subnet_id])
subnet = response['Subnets'][0]
if subnet.get('MapPublicIpOnLaunch', False):
self.add_finding(
'HIGH',
'Node Group in Public Subnet',
f'Node group {ng_name} deploys nodes in public subnet {subnet_id}',
'Move node group to private subnets'
)
except Exception:
pass
# Check remote access configuration
remote_access = ng_details.get('remoteAccess', {})
if remote_access.get('ec2SshKey'):
source_sgs = remote_access.get('sourceSecurityGroups', [])
if not source_sgs:
self.add_finding(
'MEDIUM',
'Unrestricted SSH Access',
f'Node group {ng_name} allows SSH without security group restrictions',
'Restrict SSH access to specific security groups'
)
def add_finding(self, severity, category, description, recommendation):
"""Add security finding"""
self.findings.append({
'severity': severity,
'category': category,
'description': description,
'recommendation': recommendation,
'timestamp': datetime.now().isoformat()
})
def generate_network_security_report(self):
"""Generate network security audit report"""
if not self.findings:
print("✅ No network security issues found!")
return
# Sort findings by severity
severity_order = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}
sorted_findings = sorted(
self.findings,
key=lambda x: severity_order.get(x['severity'], 4)
)
print(f"\n🔒 EKS Network Security Audit Report")
print(f"{'='*60}")
print(f"Cluster: {self.cluster_name}")
print(f"Total Findings: {len(sorted_findings)}")
# Count by severity
severity_counts = {}
for finding in sorted_findings:
severity = finding['severity']
severity_counts[severity] = severity_counts.get(severity, 0) + 1
print(f"\nFindings by Severity:")
for severity, count in severity_counts.items():
print(f" {severity}: {count}")
print(f"\n🚨 Detailed Findings:")
for i, finding in enumerate(sorted_findings, 1):
print(f"\n{i}. [{finding['severity']}] {finding['category']}")
print(f" Issue: {finding['description']}")
print(f" Fix: {finding['recommendation']}")
# Save report
report_data = {
'cluster_name': self.cluster_name,
'audit_timestamp': datetime.now().isoformat(),
'total_findings': len(sorted_findings),
'severity_breakdown': severity_counts,
'findings': sorted_findings
}
filename = f'eks_network_security_audit_{self.cluster_name}_{datetime.now().strftime("%Y%m%d")}.json'
with open(filename, 'w') as f:
json.dump(report_data, f, indent=2)
print(f"\n📄 Report saved to: {filename}")
# Usage
auditor = EKSNetworkSecurityAuditor('production-cluster')
auditor.audit_cluster_network_security()
Network Policies Implementation
# Default deny-all network policy
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: production
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
---
# Allow specific ingress for web applications
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-web-ingress
namespace: production
spec:
podSelector:
matchLabels:
app: web-server
policyTypes:
- Ingress
ingress:
- from:
- namespaceSelector:
matchLabels:
name: ingress-nginx
ports:
- protocol: TCP
port: 8080
---
# Restrict egress to only necessary services
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: restrict-egress
namespace: production
spec:
podSelector:
matchLabels:
app: api-server
policyTypes:
- Egress
egress:
- to:
- namespaceSelector:
matchLabels:
name: database
ports:
- protocol: TCP
port: 5432
- to: [] # DNS
ports:
- protocol: UDP
port: 53
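To confirm the policies above are actually enforced (assuming they are saved as network-policies.yaml, a CNI that implements NetworkPolicy is installed, and a Service named web-server exists for the web pods), a quick apply-and-probe check looks like this:
kubectl apply -f network-policies.yaml
kubectl get networkpolicies -n production
kubectl describe networkpolicy default-deny-all -n production
# Negative test: with default-deny in place this request should time out
kubectl run np-test --rm -it --image=curlimages/curl -n production --restart=Never \
  -- curl -m 5 http://web-server:8080 || echo "blocked as expected"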
3. Worker Node Security
Secure AMI and Instance Configuration
# Terraform configuration for secure EKS node group
resource "aws_eks_node_group" "production" {
cluster_name = aws_eks_cluster.production.name
node_group_name = "production-workers"
node_role_arn = aws_iam_role.eks_node_group.arn
subnet_ids = var.private_subnet_ids
  # The AMI, instance type, disk, and SSH access are supplied by the launch
  # template below; EKS rejects ami_type, disk_size, instance_types, and
  # remote_access on a node group whose launch template provides a custom AMI.
  capacity_type = "ON_DEMAND"
scaling_config {
desired_size = 3
max_size = 10
min_size = 3
}
update_config {
max_unavailable_percentage = 25
}
# Security configurations
launch_template {
id = aws_launch_template.eks_nodes.id
version = aws_launch_template.eks_nodes.latest_version
}
depends_on = [
aws_iam_role_policy_attachment.eks_worker_node_policy,
aws_iam_role_policy_attachment.eks_cni_policy,
aws_iam_role_policy_attachment.eks_container_registry_policy,
]
tags = {
Environment = "production"
Security = "hardened"
}
}
# Secure launch template for worker nodes
resource "aws_launch_template" "eks_nodes" {
name_prefix = "eks-production-"
image_id = data.aws_ami.eks_worker.id
instance_type = "t3.medium"
# Enable detailed monitoring
monitoring {
enabled = true
}
# IMDSv2 enforcement
metadata_options {
http_endpoint = "enabled"
http_tokens = "required" # Require IMDSv2
http_put_response_hop_limit = 1
instance_metadata_tags = "enabled"
}
# Encrypted EBS volumes
block_device_mappings {
device_name = "/dev/xvda"
ebs {
volume_size = 50
volume_type = "gp3"
encrypted = true
kms_key_id = aws_kms_key.ebs.arn
delete_on_termination = true
}
}
# Security groups
vpc_security_group_ids = [aws_security_group.eks_nodes.id]
# User data for additional hardening
user_data = base64encode(templatefile("${path.module}/user_data.sh", {
cluster_name = aws_eks_cluster.production.name
}))
tag_specifications {
resource_type = "instance"
tags = {
Name = "eks-worker-production"
Environment = "production"
}
}
}
Node Hardening Script
#!/bin/bash
# user_data.sh - EKS worker node hardening script
set -e
# Update system
yum update -y
# Install security tools
yum install -y fail2ban aide
# Configure fail2ban
systemctl enable fail2ban
systemctl start fail2ban
# Disable unnecessary services
systemctl disable postfix
systemctl disable rpcbind
# Kernel hardening
cat >> /etc/sysctl.d/99-kubernetes-security.conf << EOF
# Network security
net.ipv4.ip_forward = 1
net.bridge.bridge-nf-call-iptables = 1
net.bridge.bridge-nf-call-ip6tables = 1
# Disable IPv6 if not needed
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
# Security hardening
kernel.dmesg_restrict = 1
kernel.kptr_restrict = 2
kernel.yama.ptrace_scope = 1
net.ipv4.conf.all.log_martians = 1
net.ipv4.conf.default.log_martians = 1
net.ipv4.conf.all.send_redirects = 0
net.ipv4.conf.default.send_redirects = 0
net.ipv4.conf.all.accept_redirects = 0
net.ipv4.conf.default.accept_redirects = 0
net.ipv4.conf.all.secure_redirects = 0
net.ipv4.conf.default.secure_redirects = 0
EOF
sysctl -p /etc/sysctl.d/99-kubernetes-security.conf
# Configure the Docker daemon securely (only relevant on older AMIs that still run dockerd;
# recent EKS AMIs use containerd, matching the bootstrap flag below)
mkdir -p /etc/docker
cat > /etc/docker/daemon.json << EOF
{
"log-driver": "json-file",
"log-opts": {
"max-size": "10m",
"max-file": "3"
},
"live-restore": true,
"no-new-privileges": true,
"userland-proxy": false
}
EOF
# Restart Docker
systemctl restart docker
# Set up CloudWatch agent for monitoring
wget https://s3.amazonaws.com/amazoncloudwatch-agent/amazon_linux/amd64/latest/amazon-cloudwatch-agent.rpm
rpm -U ./amazon-cloudwatch-agent.rpm
# Configure CloudWatch agent
cat > /opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwatch-agent.json << EOF
{
"metrics": {
"namespace": "EKS/WorkerNodes",
"metrics_collected": {
"cpu": {
"measurement": ["cpu_usage_idle", "cpu_usage_iowait", "cpu_usage_user", "cpu_usage_system"],
"metrics_collection_interval": 60
},
"disk": {
"measurement": ["used_percent"],
"metrics_collection_interval": 60,
"resources": ["*"]
},
"mem": {
"measurement": ["mem_used_percent"],
"metrics_collection_interval": 60
},
"netstat": {
"measurement": ["tcp_established", "tcp_time_wait"],
"metrics_collection_interval": 60
}
}
},
"logs": {
"logs_collected": {
"files": {
"collect_list": [
{
"file_path": "/var/log/messages",
"log_group_name": "/aws/eks/worker-nodes/system",
"log_stream_name": "{instance_id}/messages"
},
{
"file_path": "/var/log/secure",
"log_group_name": "/aws/eks/worker-nodes/security",
"log_stream_name": "{instance_id}/secure"
}
]
}
}
}
}
EOF
# Start CloudWatch agent
systemctl enable amazon-cloudwatch-agent
systemctl start amazon-cloudwatch-agent
# Join EKS cluster
/etc/eks/bootstrap.sh ${cluster_name} \
--container-runtime containerd \
--kubelet-extra-args '--node-labels=security=hardened'
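After the nodes join, it is worth verifying that the hardening landed. A minimal sketch, assuming the node group and launch template defined above:
# Confirm the node label applied by the bootstrap arguments
kubectl get nodes -l security=hardened
# Confirm IMDSv2 is required on every worker instance
aws ec2 describe-instances \
  --filters "Name=tag:Name,Values=eks-worker-production" \
  --query 'Reservations[].Instances[].{Id:InstanceId,Tokens:MetadataOptions.HttpTokens}'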
4. RBAC Security Configuration
Least Privilege RBAC Policies
# Service account for application pods
apiVersion: v1
kind: ServiceAccount
metadata:
name: app-service-account
namespace: production
annotations:
eks.amazonaws.com/role-arn: arn:aws:iam::ACCOUNT:role/app-role
---
# Role with minimal permissions
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: production
name: app-role
rules:
- apiGroups: [""]
resources: ["configmaps", "secrets"]
verbs: ["get", "list"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
---
# Bind role to service account
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: app-role-binding
namespace: production
subjects:
- kind: ServiceAccount
name: app-service-account
namespace: production
roleRef:
kind: Role
name: app-role
apiGroup: rbac.authorization.k8s.io
---
# Cluster-level read-only access for monitoring
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: monitoring-reader
rules:
- apiGroups: [""]
resources: ["nodes", "nodes/metrics", "pods", "services", "endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["metrics.k8s.io"]
resources: ["nodes", "pods"]
verbs: ["get", "list"]
---
# Monitoring service account
apiVersion: v1
kind: ServiceAccount
metadata:
name: monitoring-service-account
namespace: monitoring
---
# Bind monitoring role
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: monitoring-role-binding
subjects:
- kind: ServiceAccount
name: monitoring-service-account
namespace: monitoring
roleRef:
kind: ClusterRole
name: monitoring-reader
apiGroup: rbac.authorization.k8s.io
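To verify that the bindings above behave as intended, kubectl auth can-i can impersonate the application service account and probe both allowed and denied actions (impersonation requires admin credentials):
SA=system:serviceaccount:production:app-service-account
# Expected: yes
kubectl auth can-i get configmaps -n production --as "$SA"
kubectl auth can-i list pods -n production --as "$SA"
# Expected: no
kubectl auth can-i create secrets -n production --as "$SA"
kubectl auth can-i get secrets -n kube-system --as "$SA"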
RBAC Audit Script
#!/usr/bin/env python3
"""
Kubernetes RBAC security auditor
"""
import yaml
import subprocess
import json
from datetime import datetime
class RBACSecurityAuditor:
def __init__(self):
self.findings = []
def audit_rbac_security(self):
"""Comprehensive RBAC security audit"""
print("🔍 Auditing Kubernetes RBAC Security...")
# Audit cluster roles
self.audit_cluster_roles()
# Audit role bindings
self.audit_role_bindings()
# Audit service accounts
self.audit_service_accounts()
# Check for dangerous permissions
self.check_dangerous_permissions()
# Generate report
self.generate_rbac_report()
def audit_cluster_roles(self):
"""Audit cluster roles for overly broad permissions"""
try:
result = subprocess.run(
['kubectl', 'get', 'clusterroles', '-o', 'json'],
capture_output=True, text=True, check=True
)
cluster_roles = json.loads(result.stdout)
for role in cluster_roles['items']:
role_name = role['metadata']['name']
# Skip system roles
if role_name.startswith('system:'):
continue
self.check_role_permissions(role_name, role.get('rules', []), 'ClusterRole')
except subprocess.CalledProcessError as e:
print(f"❌ Error getting cluster roles: {e}")
def audit_role_bindings(self):
"""Audit role bindings for security issues"""
try:
# Check cluster role bindings
result = subprocess.run(
['kubectl', 'get', 'clusterrolebindings', '-o', 'json'],
capture_output=True, text=True, check=True
)
bindings = json.loads(result.stdout)
for binding in bindings['items']:
binding_name = binding['metadata']['name']
subjects = binding.get('subjects', [])
role_ref = binding.get('roleRef', {})
self.check_binding_security(binding_name, subjects, role_ref, 'ClusterRoleBinding')
# Check namespace role bindings
result = subprocess.run(
['kubectl', 'get', 'rolebindings', '--all-namespaces', '-o', 'json'],
capture_output=True, text=True, check=True
)
bindings = json.loads(result.stdout)
for binding in bindings['items']:
binding_name = binding['metadata']['name']
namespace = binding['metadata']['namespace']
subjects = binding.get('subjects', [])
role_ref = binding.get('roleRef', {})
self.check_binding_security(f"{namespace}/{binding_name}", subjects, role_ref, 'RoleBinding')
except subprocess.CalledProcessError as e:
print(f"❌ Error getting role bindings: {e}")
def check_role_permissions(self, role_name, rules, role_type):
"""Check role permissions for security issues"""
for rule in rules:
api_groups = rule.get('apiGroups', [])
resources = rule.get('resources', [])
verbs = rule.get('verbs', [])
# Check for wildcard permissions
if '*' in api_groups or '*' in resources or '*' in verbs:
self.add_finding(
'HIGH',
'Wildcard Permissions',
f'{role_type} {role_name} has wildcard permissions',
'Use specific API groups, resources, and verbs instead of wildcards'
)
# Check for dangerous resource access
dangerous_resources = [
'secrets', 'serviceaccounts', 'roles', 'rolebindings',
'clusterroles', 'clusterrolebindings', 'nodes'
]
for resource in resources:
if resource in dangerous_resources and 'create' in verbs:
self.add_finding(
'MEDIUM',
'Dangerous Resource Access',
f'{role_type} {role_name} can create {resource}',
f'Restrict creation of {resource} to admin roles only'
)
def check_binding_security(self, binding_name, subjects, role_ref, binding_type):
"""Check role binding security"""
role_name = role_ref.get('name', '')
# Check for dangerous role bindings
if role_name in ['cluster-admin', 'admin']:
for subject in subjects:
subject_kind = subject.get('kind', '')
subject_name = subject.get('name', '')
if subject_kind == 'ServiceAccount' and subject_name == 'default':
self.add_finding(
'CRITICAL',
'Default Service Account with Admin Access',
f'{binding_type} {binding_name} grants admin access to default service account',
'Create dedicated service accounts with minimal permissions'
)
# Check for external subjects
if subject_kind == 'User' and '@' in subject_name:
self.add_finding(
'MEDIUM',
'External User with Admin Access',
f'{binding_type} {binding_name} grants admin access to external user {subject_name}',
'Review external user access and ensure it follows least privilege'
)
def audit_service_accounts(self):
"""Audit service accounts for security issues"""
try:
result = subprocess.run(
['kubectl', 'get', 'serviceaccounts', '--all-namespaces', '-o', 'json'],
capture_output=True, text=True, check=True
)
service_accounts = json.loads(result.stdout)
for sa in service_accounts['items']:
sa_name = sa['metadata']['name']
namespace = sa['metadata']['namespace']
# Check for service accounts with AWS IAM roles
annotations = sa['metadata'].get('annotations', {})
iam_role = annotations.get('eks.amazonaws.com/role-arn')
if iam_role and 'admin' in iam_role.lower():
self.add_finding(
'HIGH',
'Service Account with Admin IAM Role',
f'Service account {namespace}/{sa_name} uses admin IAM role',
'Use service accounts with minimal IAM permissions'
)
# Check automount service account token
automount = sa.get('automountServiceAccountToken', True)
if automount and sa_name == 'default':
self.add_finding(
'MEDIUM',
'Default Service Account Token Automount',
f'Default service account in {namespace} automounts token',
'Disable token automounting for unused service accounts'
)
except subprocess.CalledProcessError as e:
print(f"❌ Error getting service accounts: {e}")
def check_dangerous_permissions(self):
"""Check for specific dangerous permission combinations"""
dangerous_checks = [
{
'name': 'Pod Creation with Host Access',
'resources': ['pods'],
'verbs': ['create'],
'additional_check': self.check_host_access_pods
},
{
'name': 'Secret Access',
'resources': ['secrets'],
'verbs': ['get', 'list'],
'additional_check': None
}
]
# This would require more complex analysis of actual permissions
# Implementation would check kubectl auth can-i for various combinations
def check_host_access_pods(self, role_name):
"""Check if role can create pods with host access"""
# Implementation would check for security context capabilities
pass
def add_finding(self, severity, category, description, recommendation):
"""Add RBAC security finding"""
self.findings.append({
'severity': severity,
'category': category,
'description': description,
'recommendation': recommendation,
'timestamp': datetime.now().isoformat()
})
def generate_rbac_report(self):
"""Generate RBAC security audit report"""
if not self.findings:
print("✅ No RBAC security issues found!")
return
# Sort by severity
severity_order = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}
sorted_findings = sorted(
self.findings,
key=lambda x: severity_order.get(x['severity'], 4)
)
print(f"\n🔐 Kubernetes RBAC Security Audit Report")
print(f"{'='*60}")
print(f"Total Findings: {len(sorted_findings)}")
# Count by severity
severity_counts = {}
for finding in sorted_findings:
severity = finding['severity']
severity_counts[severity] = severity_counts.get(severity, 0) + 1
print(f"\nFindings by Severity:")
for severity, count in severity_counts.items():
print(f" {severity}: {count}")
print(f"\n🚨 Detailed Findings:")
for i, finding in enumerate(sorted_findings, 1):
print(f"\n{i}. [{finding['severity']}] {finding['category']}")
print(f" Issue: {finding['description']}")
print(f" Fix: {finding['recommendation']}")
# Save report
report_data = {
'audit_timestamp': datetime.now().isoformat(),
'total_findings': len(sorted_findings),
'severity_breakdown': severity_counts,
'findings': sorted_findings
}
filename = f'rbac_security_audit_{datetime.now().strftime("%Y%m%d")}.json'
with open(filename, 'w') as f:
json.dump(report_data, f, indent=2)
print(f"\n📄 Report saved to: {filename}")
# Usage
auditor = RBACSecurityAuditor()
auditor.audit_rbac_security()
5. Pod Security Standards
Pod Security Policy (Deprecated) → Pod Security Standards
# Pod Security Standards configuration
apiVersion: v1
kind: Namespace
metadata:
name: production
labels:
pod-security.kubernetes.io/enforce: restricted
pod-security.kubernetes.io/audit: restricted
pod-security.kubernetes.io/warn: restricted
---
# Secure pod specification
apiVersion: v1
kind: Pod
metadata:
name: secure-app
namespace: production
spec:
serviceAccountName: app-service-account
securityContext:
runAsNonRoot: true
runAsUser: 1000
runAsGroup: 1000
fsGroup: 1000
seccompProfile:
type: RuntimeDefault
containers:
- name: app
image: myapp:v1.2.3
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
runAsNonRoot: true
runAsUser: 1000
capabilities:
drop:
- ALL
resources:
requests:
memory: "64Mi"
cpu: "250m"
limits:
memory: "128Mi"
cpu: "500m"
volumeMounts:
- name: tmp
mountPath: /tmp
- name: cache
mountPath: /app/cache
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-credentials
key: url
volumes:
- name: tmp
emptyDir: {}
- name: cache
emptyDir: {}
---
# Network policy for the pod
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: secure-app-network-policy
namespace: production
spec:
podSelector:
matchLabels:
app: secure-app
policyTypes:
- Ingress
- Egress
ingress:
- from:
- podSelector:
matchLabels:
app: ingress-controller
ports:
- protocol: TCP
port: 8080
egress:
- to:
- podSelector:
matchLabels:
app: database
ports:
- protocol: TCP
port: 5432
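With the restricted Pod Security Standard enforced on the namespace, the API server should reject privileged workloads at admission. A quick negative test using a server-side dry run (nothing is actually created):
kubectl apply --dry-run=server -n production -f - <<'EOF'
apiVersion: v1
kind: Pod
metadata:
  name: privileged-test
spec:
  containers:
    - name: shell
      image: busybox
      command: ["sleep", "3600"]
      securityContext:
        privileged: true
EOF
# Expected: an admission error similar to
#   pods "privileged-test" is forbidden: violates PodSecurity "restricted:latest"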
6. Container Image Security
Image Scanning and Policies
# ECR repository with image scanning
resource "aws_ecr_repository" "app" {
name = "production/app"
image_tag_mutability = "IMMUTABLE"
image_scanning_configuration {
scan_on_push = true
}
encryption_configuration {
encryption_type = "KMS"
kms_key = aws_kms_key.ecr.arn
}
  tags = {
    Environment = "production"
    Security    = "hardened"
  }
}
# ECR lifecycle policies are a separate resource in the AWS provider
resource "aws_ecr_lifecycle_policy" "app" {
  repository = aws_ecr_repository.app.name
  policy = jsonencode({
    rules = [
      {
        rulePriority = 1
        description  = "Keep last 10 production images"
        selection = {
          tagStatus     = "tagged"
          tagPrefixList = ["v"]
          countType     = "imageCountMoreThan"
          countNumber   = 10
        }
        action = {
          type = "expire"
        }
      }
    ]
  })
}
Admission Controller for Image Policy
# Open Policy Agent Gatekeeper constraint
apiVersion: templates.gatekeeper.sh/v1beta1
kind: ConstraintTemplate
metadata:
name: allowedregistries
spec:
crd:
spec:
names:
kind: AllowedRegistries
      validation:
        openAPIV3Schema:
          type: object
          properties:
            registries:
              type: array
              items:
                type: string
  targets:
    - target: admission.k8s.gatekeeper.sh
      rego: |
        package allowedregistries
        violation[{"msg": msg}] {
          container := input.review.object.spec.containers[_]
          # registries whose prefix matches the container image
          matched := [registry | registry := input.parameters.registries[_]; startswith(container.image, registry)]
          count(matched) == 0
          msg := sprintf("Container image %v is not from an allowed registry", [container.image])
        }
---
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: AllowedRegistries
metadata:
name: must-use-ecr
spec:
match:
kinds:
- apiGroups: [""]
kinds: ["Pod"]
namespaces: ["production"]
parameters:
registries:
- "123456789012.dkr.ecr.us-east-1.amazonaws.com/"
- "123456789012.dkr.ecr.us-west-2.amazonaws.com/"
7. Secrets Management
AWS Secrets Manager Integration
# Service account for the Secrets Store CSI Driver, using IRSA
apiVersion: v1
kind: ServiceAccount
metadata:
name: secrets-store-csi-driver
namespace: kube-system
annotations:
eks.amazonaws.com/role-arn: arn:aws:iam::ACCOUNT:role/secrets-store-csi-driver-role
---
# Secret provider class for AWS Secrets Manager
apiVersion: secrets-store.csi.x-k8s.io/v1
kind: SecretProviderClass
metadata:
name: app-secrets
namespace: production
spec:
provider: aws
parameters:
objects: |
- objectName: "prod/myapp/database"
objectType: "secretsmanager"
jmesPath:
- path: "username"
objectAlias: "db_username"
- path: "password"
objectAlias: "db_password"
- path: "host"
objectAlias: "db_host"
- objectName: "prod/myapp/api-keys"
objectType: "secretsmanager"
jmesPath:
- path: "stripe_key"
objectAlias: "stripe_api_key"
  # Sync the mounted objects into a Kubernetes Secret so the pod below
  # can reference them via secretKeyRef
  secretObjects:
    - secretName: app-secrets
      type: Opaque
      data:
        - objectName: db_username
          key: db_username
        - objectName: db_password
          key: db_password
---
# Pod using secrets from AWS Secrets Manager
apiVersion: v1
kind: Pod
metadata:
name: app-with-secrets
namespace: production
spec:
serviceAccountName: app-service-account
containers:
- name: app
image: myapp:latest
volumeMounts:
- name: secrets-store
mountPath: "/mnt/secrets"
readOnly: true
env:
- name: DB_USERNAME
valueFrom:
secretKeyRef:
name: app-secrets
key: db_username
volumes:
- name: secrets-store
csi:
driver: secrets-store.csi.k8s.io
readOnly: true
volumeAttributes:
secretProviderClass: app-secrets
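Once the pod is running, the CSI driver mounts the objects under /mnt/secrets and, via the secretObjects sync above, materializes the app-secrets Kubernetes Secret that the secretKeyRef reads. A quick verification:
# Files written by the Secrets Store CSI driver
kubectl exec -n production app-with-secrets -- ls /mnt/secrets
# Kubernetes Secret synced from the SecretProviderClass (exists only while mounted)
kubectl get secret app-secrets -n production -o jsonpath='{.data}'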
Continuous Security Monitoring
EKS Security Monitoring Dashboard
#!/usr/bin/env python3
"""
EKS Security Monitoring Dashboard
"""
import boto3
import subprocess
import json
from datetime import datetime, timedelta
class EKSSecurityMonitor:
def __init__(self, cluster_name):
self.cluster_name = cluster_name
self.cloudwatch = boto3.client('cloudwatch')
self.eks = boto3.client('eks')
def collect_security_metrics(self):
"""Collect comprehensive security metrics"""
metrics = {
'timestamp': datetime.now().isoformat(),
'cluster_name': self.cluster_name,
'control_plane': self.check_control_plane_security(),
'network': self.check_network_security(),
'rbac': self.check_rbac_security(),
'pods': self.check_pod_security(),
'images': self.check_image_security(),
'compliance': self.check_compliance_status()
}
return metrics
def check_control_plane_security(self):
"""Check EKS control plane security"""
try:
cluster = self.eks.describe_cluster(name=self.cluster_name)['cluster']
return {
'encryption_enabled': 'encryptionConfig' in cluster,
'logging_enabled': len(cluster.get('logging', {}).get('clusterLogging', [])) > 0,
'private_endpoint': not cluster['resourcesVpcConfig'].get('endpointPublicAccess', True),
'version': cluster['version']
}
except Exception as e:
return {'error': str(e)}
def check_network_security(self):
"""Check network security configuration"""
try:
# Check for network policies
result = subprocess.run(
['kubectl', 'get', 'networkpolicies', '--all-namespaces', '-o', 'json'],
capture_output=True, text=True, check=True
)
policies = json.loads(result.stdout)
return {
'network_policies_count': len(policies['items']),
'has_default_deny': any(
'default-deny' in policy['metadata']['name']
for policy in policies['items']
)
}
except Exception as e:
return {'error': str(e)}
def check_rbac_security(self):
"""Check RBAC security status"""
try:
# Check for service accounts with admin access
result = subprocess.run(
['kubectl', 'get', 'clusterrolebindings', '-o', 'json'],
capture_output=True, text=True, check=True
)
bindings = json.loads(result.stdout)
admin_bindings = [
binding for binding in bindings['items']
if binding['roleRef']['name'] == 'cluster-admin'
]
return {
'admin_bindings_count': len(admin_bindings),
'rbac_enabled': True
}
except Exception as e:
return {'error': str(e)}
def check_pod_security(self):
"""Check pod security standards compliance"""
try:
result = subprocess.run(
['kubectl', 'get', 'pods', '--all-namespaces', '-o', 'json'],
capture_output=True, text=True, check=True
)
pods = json.loads(result.stdout)
privileged_pods = 0
root_pods = 0
for pod in pods['items']:
for container in pod['spec'].get('containers', []):
security_context = container.get('securityContext', {})
if security_context.get('privileged'):
privileged_pods += 1
if security_context.get('runAsUser') == 0:
root_pods += 1
return {
'total_pods': len(pods['items']),
'privileged_pods': privileged_pods,
'root_pods': root_pods
}
except Exception as e:
return {'error': str(e)}
def check_image_security(self):
"""Check container image security"""
try:
result = subprocess.run(
['kubectl', 'get', 'pods', '--all-namespaces', '-o', 'json'],
capture_output=True, text=True, check=True
)
pods = json.loads(result.stdout)
images = set()
for pod in pods['items']:
for container in pod['spec'].get('containers', []):
images.add(container['image'])
# Check if images are from trusted registries
trusted_images = sum(
1 for image in images
if '.dkr.ecr.' in image or 'gcr.io' in image
)
return {
'total_unique_images': len(images),
'trusted_registry_images': trusted_images,
'untrusted_images': len(images) - trusted_images
}
except Exception as e:
return {'error': str(e)}
def check_compliance_status(self):
"""Check overall compliance status"""
# This would integrate with compliance scanning tools
return {
'cis_compliance_score': 85, # Example score
'last_scan': datetime.now().isoformat(),
'critical_findings': 2,
'high_findings': 5,
'medium_findings': 12
}
def send_metrics_to_cloudwatch(self, metrics):
"""Send security metrics to CloudWatch"""
metric_data = []
# Control plane metrics
if 'control_plane' in metrics:
cp = metrics['control_plane']
if 'encryption_enabled' in cp:
metric_data.append({
'MetricName': 'EncryptionEnabled',
'Value': 1 if cp['encryption_enabled'] else 0,
'Unit': 'Count'
})
# Pod security metrics
if 'pods' in metrics:
pods = metrics['pods']
if 'privileged_pods' in pods:
metric_data.append({
'MetricName': 'PrivilegedPods',
'Value': pods['privileged_pods'],
'Unit': 'Count'
})
# Send to CloudWatch
if metric_data:
self.cloudwatch.put_metric_data(
Namespace='EKS/Security',
MetricData=[
{
**metric,
'Dimensions': [
{
'Name': 'ClusterName',
'Value': self.cluster_name
}
]
} for metric in metric_data
]
)
def generate_security_report(self):
"""Generate comprehensive security report"""
metrics = self.collect_security_metrics()
# Send to CloudWatch
self.send_metrics_to_cloudwatch(metrics)
# Generate report
print(f"\n🔒 EKS Security Monitoring Report")
print(f"{'='*50}")
print(f"Cluster: {self.cluster_name}")
print(f"Timestamp: {metrics['timestamp']}")
# Control plane security
cp = metrics.get('control_plane', {})
print(f"\n🏗️ Control Plane Security:")
print(f" Encryption: {'✅' if cp.get('encryption_enabled') else '❌'}")
print(f" Logging: {'✅' if cp.get('logging_enabled') else '❌'}")
print(f" Private Endpoint: {'✅' if cp.get('private_endpoint') else '❌'}")
# Pod security
pods = metrics.get('pods', {})
print(f"\n🚀 Pod Security:")
print(f" Total Pods: {pods.get('total_pods', 'N/A')}")
print(f" Privileged Pods: {pods.get('privileged_pods', 'N/A')}")
print(f" Root Pods: {pods.get('root_pods', 'N/A')}")
# Image security
images = metrics.get('images', {})
print(f"\n📦 Image Security:")
print(f" Total Images: {images.get('total_unique_images', 'N/A')}")
print(f" Trusted Registry: {images.get('trusted_registry_images', 'N/A')}")
print(f" Untrusted Images: {images.get('untrusted_images', 'N/A')}")
# Compliance
compliance = metrics.get('compliance', {})
print(f"\n📋 Compliance Status:")
print(f" CIS Score: {compliance.get('cis_compliance_score', 'N/A')}%")
print(f" Critical Findings: {compliance.get('critical_findings', 'N/A')}")
print(f" High Findings: {compliance.get('high_findings', 'N/A')}")
# Save report
filename = f'eks_security_report_{self.cluster_name}_{datetime.now().strftime("%Y%m%d")}.json'
with open(filename, 'w') as f:
json.dump(metrics, f, indent=2)
print(f"\n📄 Report saved to: {filename}")
return metrics
# Usage
monitor = EKSSecurityMonitor('production-cluster')
report = monitor.generate_security_report()
Implementation Timeline
Week 1: Foundation Security
- Enable EKS cluster logging and encryption
- Configure private API server access
- Deploy worker nodes in private subnets
- Implement basic RBAC policies
- Set up network policies (default deny)
Week 2: Advanced Security
- Harden worker nodes with security scripts
- Deploy pod security standards
- Configure image scanning in ECR
- Implement secrets management with AWS Secrets Manager
- Set up admission controllers
Week 3: Monitoring & Compliance
- Deploy security monitoring dashboard
- Configure CloudWatch alarms for security events (see the alarm sketch after this list)
- Implement compliance scanning
- Set up automated security audits
- Create incident response procedures
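For the CloudWatch alarm item in Week 3, one option is to alarm on the PrivilegedPods metric that the monitoring script publishes to the EKS/Security namespace. A minimal sketch (the SNS topic ARN is a placeholder):
aws cloudwatch put-metric-alarm \
  --alarm-name eks-privileged-pods-production \
  --namespace EKS/Security \
  --metric-name PrivilegedPods \
  --dimensions Name=ClusterName,Value=production-cluster \
  --statistic Maximum \
  --period 300 \
  --evaluation-periods 1 \
  --threshold 0 \
  --comparison-operator GreaterThanThreshold \
  --treat-missing-data notBreaching \
  --alarm-actions arn:aws:sns:us-east-1:123456789012:security-alerts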
Week 4: Optimization & Training
- Fine-tune security policies based on findings
- Optimize performance impact of security controls
- Train development team on secure practices
- Document security procedures
- Schedule regular security reviews
ROI of EKS Security Hardening
def calculate_eks_security_roi():
"""Calculate ROI of comprehensive EKS security hardening"""
# Costs avoided through proper security
benefits = {
'prevented_crypto_mining': {
'average_attack_cost': 35000, # Average cryptomining attack cost
'probability_reduction': 0.9 # 90% reduction in successful attacks
},
'prevented_data_breach': {
'average_breach_cost': 4200000, # IBM 2023 average
'probability_reduction': 0.7 # 70% reduction
},
'compliance_efficiency': {
'audit_hours_saved': 200,
'audits_per_year': 2,
'hourly_rate': 200
},
'operational_efficiency': {
'incident_response_hours_saved': 40,
'incidents_per_year': 12,
'hourly_rate': 150
},
'developer_productivity': {
'security_issue_resolution_hours_saved': 20,
'issues_per_month': 6,
'hourly_rate': 120
}
}
annual_benefits = (
benefits['prevented_crypto_mining']['average_attack_cost'] *
benefits['prevented_crypto_mining']['probability_reduction'] +
benefits['prevented_data_breach']['average_breach_cost'] *
benefits['prevented_data_breach']['probability_reduction'] +
benefits['compliance_efficiency']['audit_hours_saved'] *
benefits['compliance_efficiency']['audits_per_year'] *
benefits['compliance_efficiency']['hourly_rate'] +
benefits['operational_efficiency']['incident_response_hours_saved'] *
benefits['operational_efficiency']['incidents_per_year'] *
benefits['operational_efficiency']['hourly_rate'] +
benefits['developer_productivity']['security_issue_resolution_hours_saved'] *
benefits['developer_productivity']['issues_per_month'] * 12 *
benefits['developer_productivity']['hourly_rate']
)
# Implementation costs
costs = {
'initial_setup': {
'engineering_hours': 160,
'hourly_rate': 150
},
'ongoing_maintenance': {
'monthly_hours': 20,
'hourly_rate': 120
},
'tooling_costs': {
'security_tools': 200, # Monthly
'monitoring': 150, # Monthly
'compliance': 300 # Monthly
},
'training': {
'team_training_hours': 40,
'hourly_rate': 150
}
}
annual_costs = (
costs['initial_setup']['engineering_hours'] *
costs['initial_setup']['hourly_rate'] +
costs['ongoing_maintenance']['monthly_hours'] * 12 *
costs['ongoing_maintenance']['hourly_rate'] +
(costs['tooling_costs']['security_tools'] +
costs['tooling_costs']['monitoring'] +
costs['tooling_costs']['compliance']) * 12 +
costs['training']['team_training_hours'] *
costs['training']['hourly_rate']
)
roi = ((annual_benefits - annual_costs) / annual_costs) * 100
print(f"\n💰 EKS Security Hardening ROI Analysis:")
print(f"Annual Benefits: ${annual_benefits:,.2f}")
print(f"Annual Costs: ${annual_costs:,.2f}")
print(f"Net Benefit: ${annual_benefits - annual_costs:,.2f}")
print(f"ROI: {roi:.0f}%")
print(f"\nBreakdown:")
print(f"• Prevented cryptomining attacks: ${benefits['prevented_crypto_mining']['average_attack_cost'] * benefits['prevented_crypto_mining']['probability_reduction']:,.2f}")
print(f"• Prevented data breaches: ${benefits['prevented_data_breach']['average_breach_cost'] * benefits['prevented_data_breach']['probability_reduction']:,.2f}")
print(f"• Operational efficiency: ${benefits['operational_efficiency']['incident_response_hours_saved'] * benefits['operational_efficiency']['incidents_per_year'] * benefits['operational_efficiency']['hourly_rate']:,.2f}")
return roi
roi = calculate_eks_security_roi()
Conclusion
EKS security hardening isn’t optional in 2025—it’s essential for protecting your applications, data, and infrastructure. The comprehensive approach in this guide addresses the most common attack vectors while maintaining operational efficiency.
The investment in proper EKS security pays dividends immediately through prevented attacks, reduced incident response time, and improved compliance posture. Most organizations see 10-25x ROI within the first year through avoided security incidents alone.
Your EKS security action plan:
- Start with the critical controls: API server security, network policies, RBAC
- Implement container security: Image scanning, pod security standards, secrets management
- Deploy comprehensive monitoring: Security metrics, automated auditing, incident response
- Maintain security posture: Regular reviews, updates, team training
Remember: Kubernetes security is complex, but following these proven practices will protect your clusters from the vast majority of real-world attacks. Don’t wait for a breach to implement proper security—start hardening your EKS clusters today.
Want automated EKS security hardening without the complexity? Modern platforms like PathShield can automatically detect EKS misconfigurations, provide hardening recommendations, and maintain continuous security monitoring—giving you enterprise-grade Kubernetes security with simple setup and management.