· PathShield Team · Tutorials  · 22 min read

EKS Security Hardening Checklist for Production - Complete 2025 Guide

Secure your Amazon EKS clusters for production with this comprehensive hardening guide. 100+ security checks, automated scripts, and real-world attack prevention.

A crypto mining attack on an EKS cluster cost a startup $47,000 in compute charges and 3 weeks of downtime. The attackers exploited a misconfigured RBAC policy and deployed miners across 200+ pods. This comprehensive guide shows you how to harden your EKS clusters to prevent attacks like this.

Why EKS Security Hardening Matters More Than Ever

Common EKS attack vectors in 2025:

  • Exposed Kubernetes API servers
  • Over-privileged service accounts
  • Vulnerable container images
  • Misconfigured network policies
  • Weak RBAC configurations
  • Unencrypted secrets and etcd

The cost of EKS security failures:

  • Average cryptomining attack: $15,000-$50,000
  • Data breaches: $4.2M average cost
  • Compliance violations: $500K-$2M in fines
  • Downtime and recovery: Weeks of engineering time

EKS Security Architecture Overview

graph TB
    A[User/Developer] --> B[AWS IAM]
    B --> C[EKS Control Plane]
    C --> D[Worker Nodes]
    D --> E[Pods]
    
    F[VPC] --> G[Private Subnets]
    G --> H[Security Groups]
    H --> D
    
    I[Secrets Manager] --> E
    J[ECR] --> E
    K[CloudTrail] --> C
    L[VPC Flow Logs] --> H

The Complete EKS Security Hardening Checklist

1. Control Plane Security (Critical)

Enable EKS Cluster Logging

# Enable all log types for security monitoring
aws eks update-cluster-config \
    --name production-cluster \
    --logging '{
        "clusterLogging": [
            {
                "types": ["api", "audit", "authenticator", "controllerManager", "scheduler"],
                "enabled": true
            }
        ]
    }'
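
To confirm that the logging change took effect, the cluster description can be inspected. The following is a minimal sketch, assuming the input is the `cluster` dict from an EKS DescribeCluster response (for example `boto3.client('eks').describe_cluster(name=...)['cluster']`); the helper name `missing_log_types` and the sample data are hypothetical.

```python
REQUIRED_LOG_TYPES = {"api", "audit", "authenticator", "controllerManager", "scheduler"}


def missing_log_types(cluster: dict) -> set:
    """Return the control plane log types that are not enabled.

    `cluster` is the 'cluster' dict from an EKS DescribeCluster response.
    """
    enabled = set()
    for entry in cluster.get("logging", {}).get("clusterLogging", []):
        # Each entry groups log types under a single enabled/disabled flag
        if entry.get("enabled"):
            enabled.update(entry.get("types", []))
    return REQUIRED_LOG_TYPES - enabled


# Hypothetical sample: only api and audit logs are switched on
sample = {
    "logging": {
        "clusterLogging": [
            {"types": ["api", "audit"], "enabled": True},
            {"types": ["authenticator", "controllerManager", "scheduler"], "enabled": False},
        ]
    }
}
print(sorted(missing_log_types(sample)))
```

An empty result means all five log types are being shipped to CloudWatch Logs.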

Configure Private API Server Access

# Terraform configuration for private EKS cluster
resource "aws_eks_cluster" "production" {
  name     = "production-cluster"
  role_arn = aws_iam_role.eks_cluster.arn
  version  = "1.28"

  vpc_config {
    subnet_ids              = var.private_subnet_ids
    endpoint_private_access = true
    endpoint_public_access  = false  # Disable public access
    public_access_cidrs     = []
    
    security_group_ids = [aws_security_group.eks_cluster.id]
  }

  encryption_config {
    provider {
      key_arn = aws_kms_key.eks.arn
    }
    resources = ["secrets"]
  }

  enabled_cluster_log_types = [
    "api", "audit", "authenticator", "controllerManager", "scheduler"
  ]

  depends_on = [
    aws_iam_role_policy_attachment.eks_cluster_policy,
    aws_cloudwatch_log_group.eks_cluster,
  ]

  tags = {
    Name        = "production-cluster"
    Environment = "production"
    Security    = "hardened"
  }
}

# KMS key for envelope encryption of Kubernetes secrets stored in etcd
resource "aws_kms_key" "eks" {
  description             = "EKS Secret Encryption"
  deletion_window_in_days = 7
  enable_key_rotation     = true

  tags = {
    Name = "eks-secrets-key"
  }
}
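
Once the cluster exists, it is worth verifying that secrets encryption is actually attached. This is a small sketch under the assumption that the input is the `cluster` dict from an EKS DescribeCluster response; the function name and the sample ARN are placeholders.

```python
def secrets_encryption_key(cluster: dict):
    """Return the KMS key ARN used to encrypt Kubernetes secrets, or None.

    `cluster` is the 'cluster' dict from an EKS DescribeCluster response.
    """
    for config in cluster.get("encryptionConfig", []):
        if "secrets" in config.get("resources", []):
            return config.get("provider", {}).get("keyArn")
    return None


# Hypothetical sample data (the account ID and key ID are placeholders)
encrypted = {
    "encryptionConfig": [
        {
            "resources": ["secrets"],
            "provider": {"keyArn": "arn:aws:kms:us-east-1:111122223333:key/example"},
        }
    ]
}
print(secrets_encryption_key(encrypted))
print(secrets_encryption_key({"name": "legacy-cluster"}))
```

A `None` result indicates that secrets are not protected by a customer managed KMS key.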

Implement Strong API Server Authentication

# Enhanced cluster configuration with OIDC
resource "aws_eks_identity_provider_config" "oidc" {
  cluster_name = aws_eks_cluster.production.name

  oidc {
    client_id                     = "your-oidc-client-id"
    identity_provider_config_name = "corporate-oidc"
    issuer_url                    = "https://your-identity-provider.com"
    username_claim                = "email"
    username_prefix               = "oidc:"
    groups_claim                  = "groups"
    groups_prefix                 = "oidc:"
  }
}
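
The prefixes in this configuration determine the names that RBAC bindings must reference. The sketch below is a simplified model of that mapping, not the API server's implementation: it skips token validation entirely and assumes the claims have already been decoded into a dict. The function name and sample claims are hypothetical.

```python
def kubernetes_identity(
    claims: dict,
    username_claim: str = "email",
    username_prefix: str = "oidc:",
    groups_claim: str = "groups",
    groups_prefix: str = "oidc:",
):
    """Map decoded OIDC token claims to a Kubernetes username and group list."""
    username = f"{username_prefix}{claims[username_claim]}"
    groups = [f"{groups_prefix}{group}" for group in claims.get(groups_claim, [])]
    return username, groups


# Hypothetical claims for illustration
user, groups = kubernetes_identity(
    {"email": "alice@example.com", "groups": ["platform-admins"]}
)
print(user, groups)
```

With the settings above, a RoleBinding for this user would need a subject named `oidc:alice@example.com` or a group named `oidc:platform-admins`. The prefixes keep identity provider names from colliding with built-in names such as `system:masters`.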

2. Network Security Hardening

Secure VPC Configuration

#!/usr/bin/env python3
"""
EKS Network Security Assessment Script
"""

import boto3
import json
from datetime import datetime

class EKSNetworkSecurityAuditor:
    def __init__(self, cluster_name):
        self.cluster_name = cluster_name
        self.eks = boto3.client('eks')
        self.ec2 = boto3.client('ec2')
        self.findings = []
        
    def audit_cluster_network_security(self):
        """Comprehensive network security audit"""
        
        print(f"🔍 Auditing network security for EKS cluster: {self.cluster_name}")
        
        # Get cluster details
        cluster = self.eks.describe_cluster(name=self.cluster_name)['cluster']
        vpc_config = cluster['resourcesVpcConfig']
        
        # Audit API server access
        self.audit_api_server_access(vpc_config)
        
        # Audit security groups
        self.audit_security_groups(vpc_config)
        
        # Audit subnets
        self.audit_subnets(vpc_config)
        
        # Audit node groups
        self.audit_node_groups()
        
        # Generate report
        self.generate_network_security_report()
        
    def audit_api_server_access(self, vpc_config):
        """Audit EKS API server access configuration"""
        
        # describe_cluster exposes the endpoint settings directly on
        # resourcesVpcConfig: endpointPublicAccess, endpointPrivateAccess,
        # and publicAccessCidrs
        public_access = vpc_config.get('endpointPublicAccess', True)
        if public_access:
            self.add_finding(
                'HIGH',
                'API Server Public Access',
                'EKS API server allows public access',
                'Disable public access and use private access only'
            )
            
            # A public endpoint with an open CIDR is the worst case
            public_cidrs = vpc_config.get('publicAccessCidrs', [])
            if '0.0.0.0/0' in public_cidrs:
                self.add_finding(
                    'CRITICAL',
                    'API Server Open to Internet',
                    'EKS API server accessible from anywhere on the internet',
                    'Restrict access to specific IP ranges'
                )
        
        # Check private access
        private_access = vpc_config.get('endpointPrivateAccess', False)
        if not private_access:
            self.add_finding(
                'MEDIUM',
                'Private Access Disabled',
                'EKS API server private access is disabled',
                'Enable private access for internal connectivity'
            )
    
    def audit_security_groups(self, vpc_config):
        """Audit EKS security groups"""
        
        cluster_sg_id = vpc_config.get('clusterSecurityGroupId')
        additional_sgs = vpc_config.get('securityGroupIds', [])
        
        all_sgs = [cluster_sg_id] + additional_sgs
        
        for sg_id in all_sgs:
            if sg_id:
                self.audit_single_security_group(sg_id)
    
    def audit_single_security_group(self, sg_id):
        """Audit individual security group"""
        
        try:
            response = self.ec2.describe_security_groups(GroupIds=[sg_id])
            sg = response['SecurityGroups'][0]
            
            # Check ingress rules
            for rule in sg.get('IpPermissions', []):
                self.check_security_group_rule(sg_id, rule, 'ingress')
            
            # Check egress rules
            for rule in sg.get('IpPermissionsEgress', []):
                self.check_security_group_rule(sg_id, rule, 'egress')
                
        except Exception as e:
            print(f"❌ Error auditing security group {sg_id}: {e}")
    
    def check_security_group_rule(self, sg_id, rule, direction):
        """Check individual security group rule"""
        
        protocol = rule.get('IpProtocol', '-1')
        # FromPort/ToPort are absent when the rule covers all protocols
        from_port = rule.get('FromPort', 0)
        to_port = rule.get('ToPort', 65535)
        sensitive_ports = [22, 443, 6443, 10250]
        peer = 'from' if direction == 'ingress' else 'to'
        
        # Check for overly permissive rules
        for ip_range in rule.get('IpRanges', []):
            cidr = ip_range.get('CidrIp', '')
            if cidr != '0.0.0.0/0':
                continue
            
            if protocol == '-1':  # All protocols
                self.add_finding(
                    'CRITICAL',
                    f'Security Group {direction.title()} Rule',
                    f'Security group {sg_id} allows all {direction} traffic {peer} anywhere',
                    f'Implement least privilege {direction} rules'
                )
            elif direction == 'ingress':
                # Flag any sensitive port that falls inside the rule's port range
                exposed = [p for p in sensitive_ports if from_port <= p <= to_port]
                if exposed:
                    self.add_finding(
                        'HIGH',
                        f'Security Group {direction.title()} Rule',
                        f'Security group {sg_id} allows {direction} from anywhere on ports {exposed}',
                        f'Restrict {direction} access to specific IP ranges'
                    )
    
    def audit_subnets(self, vpc_config):
        """Audit EKS subnets configuration"""
        
        subnet_ids = vpc_config.get('subnetIds', [])
        
        if len(subnet_ids) < 2:
            self.add_finding(
                'MEDIUM',
                'Insufficient Subnet Redundancy',
                'EKS cluster has fewer than 2 subnets',
                'Deploy across multiple AZs for high availability'
            )
        
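        # Check if subnets are private. MapPublicIpOnLaunch is only a heuristic:
        # a subnet is public when its route table has a route to an internet gateway.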
        for subnet_id in subnet_ids:
            try:
                response = self.ec2.describe_subnets(SubnetIds=[subnet_id])
                subnet = response['Subnets'][0]
                
                if subnet.get('MapPublicIpOnLaunch', False):
                    self.add_finding(
                        'HIGH',
                        'Public Subnet Usage',
                        f'EKS cluster uses public subnet {subnet_id}',
                        'Use private subnets for EKS worker nodes'
                    )
                    
            except Exception as e:
                print(f"❌ Error auditing subnet {subnet_id}: {e}")
    
    def audit_node_groups(self):
        """Audit EKS node groups"""
        
        try:
            node_groups = self.eks.list_nodegroups(clusterName=self.cluster_name)
            
            for ng_name in node_groups['nodegroups']:
                ng_details = self.eks.describe_nodegroup(
                    clusterName=self.cluster_name,
                    nodegroupName=ng_name
                )['nodegroup']
                
                self.audit_single_node_group(ng_name, ng_details)
                
        except Exception as e:
            print(f"❌ Error auditing node groups: {e}")
    
    def audit_single_node_group(self, ng_name, ng_details):
        """Audit individual node group"""
        
        # Check if nodes are in public subnets
        subnets = ng_details.get('subnets', [])
        for subnet_id in subnets:
            try:
                response = self.ec2.describe_subnets(SubnetIds=[subnet_id])
                subnet = response['Subnets'][0]
                
                if subnet.get('MapPublicIpOnLaunch', False):
                    self.add_finding(
                        'HIGH',
                        'Node Group in Public Subnet',
                        f'Node group {ng_name} deploys nodes in public subnet {subnet_id}',
                        'Move node group to private subnets'
                    )
            except Exception as e:
                print(f"❌ Error auditing subnet {subnet_id} for node group {ng_name}: {e}")
        
        # Check remote access configuration
        remote_access = ng_details.get('remoteAccess', {})
        if remote_access.get('ec2SshKey'):
            source_sgs = remote_access.get('sourceSecurityGroups', [])
            if not source_sgs:
                self.add_finding(
                    'MEDIUM',
                    'Unrestricted SSH Access',
                    f'Node group {ng_name} allows SSH without security group restrictions',
                    'Restrict SSH access to specific security groups'
                )
    
    def add_finding(self, severity, category, description, recommendation):
        """Add security finding"""
        
        self.findings.append({
            'severity': severity,
            'category': category,
            'description': description,
            'recommendation': recommendation,
            'timestamp': datetime.now().isoformat()
        })
    
    def generate_network_security_report(self):
        """Generate network security audit report"""
        
        if not self.findings:
            print("✅ No network security issues found!")
            return
        
        # Sort findings by severity
        severity_order = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}
        sorted_findings = sorted(
            self.findings, 
            key=lambda x: severity_order.get(x['severity'], 4)
        )
        
        print(f"\n🔒 EKS Network Security Audit Report")
        print(f"{'='*60}")
        print(f"Cluster: {self.cluster_name}")
        print(f"Total Findings: {len(sorted_findings)}")
        
        # Count by severity
        severity_counts = {}
        for finding in sorted_findings:
            severity = finding['severity']
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
        
        print(f"\nFindings by Severity:")
        for severity, count in severity_counts.items():
            print(f"  {severity}: {count}")
        
        print(f"\n🚨 Detailed Findings:")
        for i, finding in enumerate(sorted_findings, 1):
            print(f"\n{i}. [{finding['severity']}] {finding['category']}")
            print(f"   Issue: {finding['description']}")
            print(f"   Fix: {finding['recommendation']}")
        
        # Save report
        report_data = {
            'cluster_name': self.cluster_name,
            'audit_timestamp': datetime.now().isoformat(),
            'total_findings': len(sorted_findings),
            'severity_breakdown': severity_counts,
            'findings': sorted_findings
        }
        
        filename = f'eks_network_security_audit_{self.cluster_name}_{datetime.now().strftime("%Y%m%d")}.json'
        with open(filename, 'w') as f:
            json.dump(report_data, f, indent=2)
        
        print(f"\n📄 Report saved to: {filename}")

# Usage
auditor = EKSNetworkSecurityAuditor('production-cluster')
auditor.audit_cluster_network_security()
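
The auditor treats `MapPublicIpOnLaunch` as a proxy for "public subnet". A stricter test looks at routing: a subnet is public when its effective route table sends default traffic to an internet gateway. The following sketch assumes its input is the `RouteTables` list from an EC2 DescribeRouteTables response for the cluster's VPC; the function names and sample IDs are hypothetical.

```python
def has_internet_gateway_route(route_table: dict) -> bool:
    """True if the route table sends default traffic to an internet gateway."""
    for route in route_table.get("Routes", []):
        is_default = (
            route.get("DestinationCidrBlock") == "0.0.0.0/0"
            or route.get("DestinationIpv6CidrBlock") == "::/0"
        )
        if is_default and route.get("GatewayId", "").startswith("igw-"):
            return True
    return False


def public_subnets(subnet_ids, route_tables):
    """Return the subnet IDs whose effective route table reaches an internet gateway."""
    explicit = {}
    main_table = None
    for table in route_tables:
        for association in table.get("Associations", []):
            if association.get("SubnetId"):
                explicit[association["SubnetId"]] = table
            elif association.get("Main"):
                main_table = table

    result = []
    for subnet_id in subnet_ids:
        # Subnets without an explicit association fall back to the main table
        table = explicit.get(subnet_id, main_table)
        if table is not None and has_internet_gateway_route(table):
            result.append(subnet_id)
    return result


# Hypothetical sample: one NAT-routed main table, one internet-facing table
sample_tables = [
    {
        "Associations": [{"Main": True}],
        "Routes": [{"DestinationCidrBlock": "0.0.0.0/0", "NatGatewayId": "nat-0abc"}],
    },
    {
        "Associations": [{"SubnetId": "subnet-public", "Main": False}],
        "Routes": [{"DestinationCidrBlock": "0.0.0.0/0", "GatewayId": "igw-0abc"}],
    },
]
print(public_subnets(["subnet-public", "subnet-private"], sample_tables))
```

In practice the route tables could be fetched with `ec2.describe_route_tables(Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}])['RouteTables']`.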

Network Policies Implementation

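# Default deny-all network policy
# Note: policies are enforced only when the cluster runs a network policy
# engine (for example the Amazon VPC CNI with network policy support
# enabled, or Calico)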
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-all
  namespace: production
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
---
# Allow specific ingress for web applications
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-web-ingress
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: web-server
  policyTypes:
  - Ingress
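  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          # Kubernetes sets this label on every namespace automatically;
          # a plain "name" label matches only if it was added by hand
          kubernetes.io/metadata.name: ingress-nginx
    ports:
    - protocol: TCP
      port: 8080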
---
# Restrict egress to only necessary services
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: restrict-egress
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: api-server
  policyTypes:
  - Egress
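  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: database
    ports:
    - protocol: TCP
      port: 5432
  - to: []  # DNS to any destination (large responses fall back to TCP)
    ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53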
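
A default-deny policy only protects the namespace it lives in, so it helps to check which namespaces lack one. This is a rough sketch that assumes its input is the `items` list from `kubectl get networkpolicy --all-namespaces -o json`; the function names are hypothetical, and only the simplest form of default deny (an empty `podSelector` with no rules) is recognized.

```python
def is_default_deny(policy: dict, direction: str = "Ingress") -> bool:
    """True if the policy selects every pod and allows nothing in `direction`."""
    spec = policy.get("spec", {})
    selects_all_pods = spec.get("podSelector", {}) == {}
    # When policyTypes is omitted, Kubernetes applies the policy to Ingress
    policy_types = spec.get("policyTypes", ["Ingress"])
    has_rules = bool(spec.get(direction.lower()))
    return selects_all_pods and direction in policy_types and not has_rules


def namespaces_without_default_deny(namespaces, policies, direction="Ingress"):
    """Return the namespaces that have no default-deny policy for `direction`."""
    covered = {
        policy["metadata"]["namespace"]
        for policy in policies
        if is_default_deny(policy, direction)
    }
    return sorted(set(namespaces) - covered)


# Hypothetical sample mirroring the policies in this section
sample_policies = [
    {
        "metadata": {"name": "default-deny-all", "namespace": "production"},
        "spec": {"podSelector": {}, "policyTypes": ["Ingress", "Egress"]},
    },
    {
        "metadata": {"name": "allow-web-ingress", "namespace": "staging"},
        "spec": {
            "podSelector": {"matchLabels": {"app": "web-server"}},
            "policyTypes": ["Ingress"],
            "ingress": [{"ports": [{"protocol": "TCP", "port": 8080}]}],
        },
    },
]
print(namespaces_without_default_deny(["production", "staging"], sample_policies))
```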

3. Worker Node Security

Secure AMI and Instance Configuration

# Terraform configuration for secure EKS node group
resource "aws_eks_node_group" "production" {
  cluster_name    = aws_eks_cluster.production.name
  node_group_name = "production-workers"
  node_role_arn   = aws_iam_role.eks_node_group.arn
  
  subnet_ids = var.private_subnet_ids

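  capacity_type = "ON_DEMAND"

  # The AMI, instance type, disk size, and IMDSv2 settings come from the
  # launch template below. EKS rejects a managed node group that sets
  # ami_type (with a custom image_id), disk_size, or remote_access
  # alongside a launch template, and the instance type may be set in
  # only one of the two places. If SSH is required, set key_name and a
  # bastion-only security group rule in the launch template instead.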

  scaling_config {
    desired_size = 3
    max_size     = 10
    min_size     = 3
  }

  update_config {
    max_unavailable_percentage = 25
  }

  # Security configurations
  launch_template {
    id      = aws_launch_template.eks_nodes.id
    version = aws_launch_template.eks_nodes.latest_version
  }

  depends_on = [
    aws_iam_role_policy_attachment.eks_worker_node_policy,
    aws_iam_role_policy_attachment.eks_cni_policy,
    aws_iam_role_policy_attachment.eks_container_registry_policy,
  ]

  tags = {
    Environment = "production"
    Security    = "hardened"
  }
}

# Secure launch template for worker nodes
resource "aws_launch_template" "eks_nodes" {
  name_prefix   = "eks-production-"
  image_id      = data.aws_ami.eks_worker.id
  instance_type = "t3.medium"

  # Enable detailed monitoring
  monitoring {
    enabled = true
  }

  # IMDSv2 enforcement
  metadata_options {
    http_endpoint               = "enabled"
    http_tokens                 = "required"  # Require IMDSv2
    http_put_response_hop_limit = 1
    instance_metadata_tags      = "enabled"
  }

  # Encrypted EBS volumes
  block_device_mappings {
    device_name = "/dev/xvda"
    ebs {
      volume_size           = 50
      volume_type           = "gp3"
      encrypted             = true
      kms_key_id            = aws_kms_key.ebs.arn
      delete_on_termination = true
    }
  }

  # Security groups
  vpc_security_group_ids = [aws_security_group.eks_nodes.id]

  # User data for additional hardening
  user_data = base64encode(templatefile("${path.module}/user_data.sh", {
    cluster_name = aws_eks_cluster.production.name
  }))

  tag_specifications {
    resource_type = "instance"
    tags = {
      Name        = "eks-worker-production"
      Environment = "production"
    }
  }
}
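
The two launch template settings that matter most here, IMDSv2 enforcement and EBS encryption, can be verified after deployment. The sketch below assumes its input is the `LaunchTemplateData` dict from an EC2 DescribeLaunchTemplateVersions response; the function name is hypothetical, and treating a missing hop limit as 1 is an assumption made for this example.

```python
def launch_template_findings(data: dict) -> list:
    """Return human-readable issues found in a LaunchTemplateData dict."""
    findings = []

    metadata = data.get("MetadataOptions", {})
    if metadata.get("HttpTokens") != "required":
        findings.append("IMDSv2 is not enforced (HttpTokens is not 'required')")
    # Assumption: an absent hop limit is treated as 1 in this sketch
    if metadata.get("HttpPutResponseHopLimit", 1) > 1:
        findings.append("Hop limit above 1 lets pods on the pod network reach IMDS")

    for mapping in data.get("BlockDeviceMappings", []):
        ebs = mapping.get("Ebs")
        if ebs is not None and not ebs.get("Encrypted"):
            findings.append(f"Volume {mapping.get('DeviceName')} is not encrypted")

    return findings


# Hypothetical sample matching the hardened template above
hardened = {
    "MetadataOptions": {"HttpTokens": "required", "HttpPutResponseHopLimit": 1},
    "BlockDeviceMappings": [{"DeviceName": "/dev/xvda", "Ebs": {"Encrypted": True}}],
}
# Hypothetical sample with both problems present
weak = {
    "MetadataOptions": {"HttpTokens": "optional", "HttpPutResponseHopLimit": 2},
    "BlockDeviceMappings": [{"DeviceName": "/dev/xvda", "Ebs": {"Encrypted": False}}],
}
print(launch_template_findings(hardened))
print(launch_template_findings(weak))
```

Note that a hop limit of 1 is the strict choice; workloads that still rely on node credentials through IMDS stop working under it, which is usually the intent once pods use IAM roles for service accounts.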

Node Hardening Script

#!/bin/bash
# user_data.sh - EKS worker node hardening script

set -e

# Update system
yum update -y

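# Install security tools (on Amazon Linux 2, fail2ban comes from EPEL)
amazon-linux-extras install -y epel
yum install -y fail2ban aide

# Configure fail2ban
systemctl enable fail2ban
systemctl start fail2ban

# Disable unnecessary services (ignore units that are not installed,
# otherwise `set -e` would abort the script)
systemctl disable postfix || true
systemctl disable rpcbind || true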

# Kernel hardening
cat >> /etc/sysctl.d/99-kubernetes-security.conf << EOF
# Network security
net.ipv4.ip_forward = 1
net.bridge.bridge-nf-call-iptables = 1
net.bridge.bridge-nf-call-ip6tables = 1

# Disable IPv6 if not needed
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1

# Security hardening
kernel.dmesg_restrict = 1
kernel.kptr_restrict = 2
kernel.yama.ptrace_scope = 1
net.ipv4.conf.all.log_martians = 1
net.ipv4.conf.default.log_martians = 1
net.ipv4.conf.all.send_redirects = 0
net.ipv4.conf.default.send_redirects = 0
net.ipv4.conf.all.accept_redirects = 0
net.ipv4.conf.default.accept_redirects = 0
net.ipv4.conf.all.secure_redirects = 0
net.ipv4.conf.default.secure_redirects = 0
EOF

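# The bridge-nf-call-* keys exist only once br_netfilter is loaded; without
# it sysctl fails and `set -e` aborts the script before the node joins
modprobe br_netfilter
sysctl -p /etc/sysctl.d/99-kubernetes-security.conf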

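# Configure Docker daemon securely. EKS-optimized AMIs for Kubernetes 1.24
# and later ship containerd only, so skip this step when Docker is absent.
if systemctl list-unit-files docker.service | grep -q docker.service; then
  mkdir -p /etc/docker
  cat > /etc/docker/daemon.json << EOF
{
  "log-driver": "json-file",
  "log-opts": {
    "max-size": "10m",
    "max-file": "3"
  },
  "live-restore": true,
  "no-new-privileges": true,
  "userland-proxy": false
}
EOF

  # Restart Docker
  systemctl restart docker
fi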

# Set up CloudWatch agent for monitoring
wget https://s3.amazonaws.com/amazoncloudwatch-agent/amazon_linux/amd64/latest/amazon-cloudwatch-agent.rpm
rpm -U ./amazon-cloudwatch-agent.rpm

# Configure CloudWatch agent
cat > /opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwatch-agent.json << EOF
{
  "metrics": {
    "namespace": "EKS/WorkerNodes",
    "metrics_collected": {
      "cpu": {
        "measurement": ["cpu_usage_idle", "cpu_usage_iowait", "cpu_usage_user", "cpu_usage_system"],
        "metrics_collection_interval": 60
      },
      "disk": {
        "measurement": ["used_percent"],
        "metrics_collection_interval": 60,
        "resources": ["*"]
      },
      "mem": {
        "measurement": ["mem_used_percent"],
        "metrics_collection_interval": 60
      },
      "netstat": {
        "measurement": ["tcp_established", "tcp_time_wait"],
        "metrics_collection_interval": 60
      }
    }
  },
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/messages",
            "log_group_name": "/aws/eks/worker-nodes/system",
            "log_stream_name": "{instance_id}/messages"
          },
          {
            "file_path": "/var/log/secure",
            "log_group_name": "/aws/eks/worker-nodes/security",
            "log_stream_name": "{instance_id}/secure"
          }
        ]
      }
    }
  }
}
EOF

# Start CloudWatch agent
systemctl enable amazon-cloudwatch-agent
systemctl start amazon-cloudwatch-agent

# Join EKS cluster
/etc/eks/bootstrap.sh ${cluster_name} \
  --container-runtime containerd \
  --kubelet-extra-args '--node-labels=security=hardened'
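
Kernel settings can drift after an AMI update or a manual change, so it is useful to compare a node's live values against the hardening baseline. The following sketch parses text in `key = value` form (the format used both by files in /etc/sysctl.d and by `sysctl -a` output) and reports mismatches. The function names are hypothetical, and `EXPECTED` is only a subset of the values set in the script above.

```python
EXPECTED = {
    "kernel.dmesg_restrict": "1",
    "kernel.kptr_restrict": "2",
    "kernel.yama.ptrace_scope": "1",
    "net.ipv4.conf.all.send_redirects": "0",
    "net.ipv4.conf.all.accept_redirects": "0",
}


def parse_sysctl(text: str) -> dict:
    """Parse 'key = value' lines, skipping blanks and comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", ";")):
            continue
        key, separator, value = line.partition("=")
        if separator:
            settings[key.strip()] = value.strip()
    return settings


def sysctl_drift(text: str, expected: dict = EXPECTED) -> dict:
    """Return {key: (expected, actual)} for every setting that does not match."""
    actual = parse_sysctl(text)
    return {
        key: (value, actual.get(key))
        for key, value in expected.items()
        if actual.get(key) != value
    }


# Hypothetical excerpt of `sysctl -a` output from a node
sample_output = """
# captured from a worker node
kernel.dmesg_restrict = 1
kernel.kptr_restrict = 0
kernel.yama.ptrace_scope = 1
net.ipv4.conf.all.send_redirects = 0
"""
print(sysctl_drift(sample_output))
```

A value of `None` in the "actual" position means the key was not present in the input at all.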

4. RBAC Security Configuration

Least Privilege RBAC Policies

# Service account for application pods
apiVersion: v1
kind: ServiceAccount
metadata:
  name: app-service-account
  namespace: production
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::ACCOUNT:role/app-role
---
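# Role with minimal permissions
# Note: "list" on secrets returns the contents of every secret in the
# namespace; where the application allows it, grant only "get" and scope
# the rule with resourceNames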
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: production
  name: app-role
rules:
- apiGroups: [""]
  resources: ["configmaps", "secrets"]
  verbs: ["get", "list"]
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "watch"]
---
# Bind role to service account
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: app-role-binding
  namespace: production
subjects:
- kind: ServiceAccount
  name: app-service-account
  namespace: production
roleRef:
  kind: Role
  name: app-role
  apiGroup: rbac.authorization.k8s.io
---
# Cluster-level read-only access for monitoring
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: monitoring-reader
rules:
- apiGroups: [""]
  resources: ["nodes", "nodes/metrics", "pods", "services", "endpoints"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["metrics.k8s.io"]
  resources: ["nodes", "pods"]
  verbs: ["get", "list"]
---
# Monitoring service account
apiVersion: v1
kind: ServiceAccount
metadata:
  name: monitoring-service-account
  namespace: monitoring
---
# Bind monitoring role
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: monitoring-role-binding
subjects:
- kind: ServiceAccount
  name: monitoring-service-account
  namespace: monitoring
roleRef:
  kind: ClusterRole
  name: monitoring-reader
  apiGroup: rbac.authorization.k8s.io
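
To reason about what a Role actually grants, it helps to model how RBAC rules are matched: a request is allowed if any single rule covers its API group, resource, and verb. The sketch below is a deliberately simplified model with hypothetical function names; it ignores `resourceNames`, subresources, and `nonResourceURLs`, which real RBAC evaluation also considers.

```python
def rule_allows(rule: dict, api_group: str, resource: str, verb: str) -> bool:
    """True if one RBAC rule covers the requested group, resource, and verb."""

    def matches(allowed, wanted):
        return "*" in allowed or wanted in allowed

    return (
        matches(rule.get("apiGroups", []), api_group)
        and matches(rule.get("resources", []), resource)
        and matches(rule.get("verbs", []), verb)
    )


def role_allows(rules: list, api_group: str, resource: str, verb: str) -> bool:
    """RBAC is additive: any matching rule grants the request."""
    return any(rule_allows(rule, api_group, resource, verb) for rule in rules)


# The rules of the app-role Role defined above
APP_ROLE_RULES = [
    {"apiGroups": [""], "resources": ["configmaps", "secrets"], "verbs": ["get", "list"]},
    {"apiGroups": [""], "resources": ["pods"], "verbs": ["get", "list", "watch"]},
]

print(role_allows(APP_ROLE_RULES, "", "secrets", "list"))
print(role_allows(APP_ROLE_RULES, "", "pods", "create"))
```

Because rules are purely additive and there are no deny rules, the only way to reduce access is to remove or narrow a rule.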

RBAC Audit Script

#!/usr/bin/env python3
"""
Kubernetes RBAC security auditor
"""

import subprocess
import json
from datetime import datetime

class RBACSecurityAuditor:
    def __init__(self):
        self.findings = []
        
    def audit_rbac_security(self):
        """Comprehensive RBAC security audit"""
        
        print("🔍 Auditing Kubernetes RBAC Security...")
        
        # Audit cluster roles
        self.audit_cluster_roles()
        
        # Audit role bindings
        self.audit_role_bindings()
        
        # Audit service accounts
        self.audit_service_accounts()
        
        # Check for dangerous permissions
        self.check_dangerous_permissions()
        
        # Generate report
        self.generate_rbac_report()
    
    def audit_cluster_roles(self):
        """Audit cluster roles for overly broad permissions"""
        
        try:
            result = subprocess.run(
                ['kubectl', 'get', 'clusterroles', '-o', 'json'],
                capture_output=True, text=True, check=True
            )
            
            cluster_roles = json.loads(result.stdout)
            
            for role in cluster_roles['items']:
                role_name = role['metadata']['name']
                
                # Skip system roles
                if role_name.startswith('system:'):
                    continue
                
                self.check_role_permissions(role_name, role.get('rules', []), 'ClusterRole')
                
        except subprocess.CalledProcessError as e:
            print(f"❌ Error getting cluster roles: {e}")
    
    def audit_role_bindings(self):
        """Audit role bindings for security issues"""
        
        try:
            # Check cluster role bindings
            result = subprocess.run(
                ['kubectl', 'get', 'clusterrolebindings', '-o', 'json'],
                capture_output=True, text=True, check=True
            )
            
            bindings = json.loads(result.stdout)
            
            for binding in bindings['items']:
                binding_name = binding['metadata']['name']
                subjects = binding.get('subjects', [])
                role_ref = binding.get('roleRef', {})
                
                self.check_binding_security(binding_name, subjects, role_ref, 'ClusterRoleBinding')
            
            # Check namespace role bindings
            result = subprocess.run(
                ['kubectl', 'get', 'rolebindings', '--all-namespaces', '-o', 'json'],
                capture_output=True, text=True, check=True
            )
            
            bindings = json.loads(result.stdout)
            
            for binding in bindings['items']:
                binding_name = binding['metadata']['name']
                namespace = binding['metadata']['namespace']
                subjects = binding.get('subjects', [])
                role_ref = binding.get('roleRef', {})
                
                self.check_binding_security(f"{namespace}/{binding_name}", subjects, role_ref, 'RoleBinding')
                
        except subprocess.CalledProcessError as e:
            print(f"❌ Error getting role bindings: {e}")
    
    def check_role_permissions(self, role_name, rules, role_type):
        """Check role permissions for security issues"""
        
        for rule in rules:
            api_groups = rule.get('apiGroups', [])
            resources = rule.get('resources', [])
            verbs = rule.get('verbs', [])
            
            # Check for wildcard permissions
            if '*' in api_groups or '*' in resources or '*' in verbs:
                self.add_finding(
                    'HIGH',
                    'Wildcard Permissions',
                    f'{role_type} {role_name} has wildcard permissions',
                    'Use specific API groups, resources, and verbs instead of wildcards'
                )
            
            # Check for dangerous resource access
            dangerous_resources = [
                'secrets', 'serviceaccounts', 'roles', 'rolebindings',
                'clusterroles', 'clusterrolebindings', 'nodes'
            ]
            
            for resource in resources:
                if resource in dangerous_resources and 'create' in verbs:
                    self.add_finding(
                        'MEDIUM',
                        'Dangerous Resource Access',
                        f'{role_type} {role_name} can create {resource}',
                        f'Restrict creation of {resource} to admin roles only'
                    )
    
    def check_binding_security(self, binding_name, subjects, role_ref, binding_type):
        """Check role binding security"""
        
        role_name = role_ref.get('name', '')
        
        # Check for dangerous role bindings
        if role_name in ['cluster-admin', 'admin']:
            for subject in subjects:
                subject_kind = subject.get('kind', '')
                subject_name = subject.get('name', '')
                
                if subject_kind == 'ServiceAccount' and subject_name == 'default':
                    self.add_finding(
                        'CRITICAL',
                        'Default Service Account with Admin Access',
                        f'{binding_type} {binding_name} grants admin access to default service account',
                        'Create dedicated service accounts with minimal permissions'
                    )
                
                # Check for external subjects
                if subject_kind == 'User' and '@' in subject_name:
                    self.add_finding(
                        'MEDIUM',
                        'External User with Admin Access',
                        f'{binding_type} {binding_name} grants admin access to external user {subject_name}',
                        'Review external user access and ensure it follows least privilege'
                    )
    
    def audit_service_accounts(self):
        """Audit service accounts for security issues"""
        
        try:
            result = subprocess.run(
                ['kubectl', 'get', 'serviceaccounts', '--all-namespaces', '-o', 'json'],
                capture_output=True, text=True, check=True
            )
            
            service_accounts = json.loads(result.stdout)
            
            for sa in service_accounts['items']:
                sa_name = sa['metadata']['name']
                namespace = sa['metadata']['namespace']
                
                # Check for service accounts with AWS IAM roles
                annotations = sa['metadata'].get('annotations', {})
                iam_role = annotations.get('eks.amazonaws.com/role-arn')
                
                if iam_role and 'admin' in iam_role.lower():
                    self.add_finding(
                        'HIGH',
                        'Service Account with Admin IAM Role',
                        f'Service account {namespace}/{sa_name} uses admin IAM role',
                        'Use service accounts with minimal IAM permissions'
                    )
                
                # Check automount service account token
                automount = sa.get('automountServiceAccountToken', True)
                if automount and sa_name == 'default':
                    self.add_finding(
                        'MEDIUM',
                        'Default Service Account Token Automount',
                        f'Default service account in {namespace} automounts token',
                        'Disable token automounting for unused service accounts'
                    )
                    
        except subprocess.CalledProcessError as e:
            print(f"❌ Error getting service accounts: {e}")
    
    def check_dangerous_permissions(self):
        """Check for specific dangerous permission combinations"""
        
        dangerous_checks = [
            {
                'name': 'Pod Creation with Host Access',
                'resources': ['pods'],
                'verbs': ['create'],
                'additional_check': self.check_host_access_pods
            },
            {
                'name': 'Secret Access',
                'resources': ['secrets'],
                'verbs': ['get', 'list'],
                'additional_check': None
            }
        ]
        
        # This would require more complex analysis of actual permissions
        # Implementation would check kubectl auth can-i for various combinations
        
    def check_host_access_pods(self, role_name):
        """Check if role can create pods with host access"""
        # Implementation would check for security context capabilities
        pass
    
    def add_finding(self, severity, category, description, recommendation):
        """Add RBAC security finding"""
        
        self.findings.append({
            'severity': severity,
            'category': category,
            'description': description,
            'recommendation': recommendation,
            'timestamp': datetime.now().isoformat()
        })
    
    def generate_rbac_report(self):
        """Generate RBAC security audit report"""
        
        if not self.findings:
            print("✅ No RBAC security issues found!")
            return
        
        # Sort by severity
        severity_order = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}
        sorted_findings = sorted(
            self.findings,
            key=lambda x: severity_order.get(x['severity'], 4)
        )
        
        print(f"\n🔐 Kubernetes RBAC Security Audit Report")
        print(f"{'='*60}")
        print(f"Total Findings: {len(sorted_findings)}")
        
        # Count by severity
        severity_counts = {}
        for finding in sorted_findings:
            severity = finding['severity']
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
        
        print(f"\nFindings by Severity:")
        for severity, count in severity_counts.items():
            print(f"  {severity}: {count}")
        
        print(f"\n🚨 Detailed Findings:")
        for i, finding in enumerate(sorted_findings, 1):
            print(f"\n{i}. [{finding['severity']}] {finding['category']}")
            print(f"   Issue: {finding['description']}")
            print(f"   Fix: {finding['recommendation']}")
        
        # Save report
        report_data = {
            'audit_timestamp': datetime.now().isoformat(),
            'total_findings': len(sorted_findings),
            'severity_breakdown': severity_counts,
            'findings': sorted_findings
        }
        
        filename = f'rbac_security_audit_{datetime.now().strftime("%Y%m%d")}.json'
        with open(filename, 'w') as f:
            json.dump(report_data, f, indent=2)
        
        print(f"\n📄 Report saved to: {filename}")

# Usage
auditor = RBACSecurityAuditor()
auditor.audit_rbac_security()
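
The `check_dangerous_permissions` method above leaves the actual permission test open. One way to fill it in is to ask the API server directly with `kubectl auth can-i` while impersonating a service account. The sketch below is a minimal version of that idea, not the auditor's own implementation: the function names, the `DANGEROUS_CHECKS` list, and the injectable `can_i` parameter are assumptions made for illustration, and the caller needs impersonation rights in the cluster for `--as` to work.

```python
import subprocess

# Hypothetical list, shaped like `dangerous_checks` in the auditor above.
DANGEROUS_CHECKS = [
    {"name": "Pod Creation", "resource": "pods", "verbs": ["create"]},
    {"name": "Secret Access", "resource": "secrets", "verbs": ["get", "list"]},
]


def kubectl_can_i(verb, resource, namespace, service_account):
    """Return True if `kubectl auth can-i` answers "yes" for the service account."""
    subject = f"system:serviceaccount:{namespace}:{service_account}"
    result = subprocess.run(
        ["kubectl", "auth", "can-i", verb, resource, "--as", subject, "-n", namespace],
        capture_output=True,
        text=True,
    )
    # kubectl prints "yes" or "no". A "no" can come with a non-zero exit
    # code, so read stdout instead of relying on check=True.
    return result.stdout.strip() == "yes"


def find_dangerous_permissions(namespace, service_account, can_i=kubectl_can_i):
    """Return the names of checks where at least one listed verb is allowed."""
    hits = []
    for check in DANGEROUS_CHECKS:
        allowed = any(
            can_i(verb, check["resource"], namespace, service_account)
            for verb in check["verbs"]
        )
        if allowed:
            hits.append(check["name"])
    return hits
```

Passing `can_i` as a parameter keeps the logic testable without a cluster; in the auditor, each hit could be turned into a finding with `add_finding`.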

5. Pod Security Standards

Pod Security Policy (Removed in Kubernetes 1.25) → Pod Security Standards

# Pod Security Standards configuration
apiVersion: v1
kind: Namespace
metadata:
  name: production
  labels:
    pod-security.kubernetes.io/enforce: restricted
    pod-security.kubernetes.io/audit: restricted
    pod-security.kubernetes.io/warn: restricted
---
# Secure pod specification
apiVersion: v1
kind: Pod
metadata:
  name: secure-app
  namespace: production
  labels:
    app: secure-app  # Matched by the NetworkPolicy podSelector below
spec:
  serviceAccountName: app-service-account
  securityContext:
    runAsNonRoot: true
    runAsUser: 1000
    runAsGroup: 1000
    fsGroup: 1000
    seccompProfile:
      type: RuntimeDefault
  containers:
  - name: app
    image: myapp:v1.2.3
    securityContext:
      allowPrivilegeEscalation: false
      readOnlyRootFilesystem: true
      runAsNonRoot: true
      runAsUser: 1000
      capabilities:
        drop:
        - ALL
    resources:
      requests:
        memory: "64Mi"
        cpu: "250m"
      limits:
        memory: "128Mi"
        cpu: "500m"
    volumeMounts:
    - name: tmp
      mountPath: /tmp
    - name: cache
      mountPath: /app/cache
    env:
    - name: DATABASE_URL
      valueFrom:
        secretKeyRef:
          name: db-credentials
          key: url
  volumes:
  - name: tmp
    emptyDir: {}
  - name: cache
    emptyDir: {}
---
# Network policy for the pod
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: secure-app-network-policy
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: secure-app
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          app: ingress-controller
    ports:
    - protocol: TCP
      port: 8080
  egress:
  - to:
    - podSelector:
        matchLabels:
          app: database
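    ports:
    - protocol: TCP
      port: 5432
  # Allow DNS lookups; without this rule the pod cannot resolve service names
  - to:
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: kube-system
      podSelector:
        matchLabels:
          k8s-app: kube-dns
    ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53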
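
Before applying the `restricted` label to a namespace, it can help to check existing manifests for the settings that profile expects. The following is a rough sketch that covers only a subset of the restricted profile (privileged mode, privilege escalation, non-root, dropped capabilities, seccomp). The function name and the dict-based input are assumptions for illustration; the built-in Pod Security admission controller remains the authority on what is actually accepted.

```python
def restricted_profile_violations(pod):
    """Return human-readable violations for a pod manifest given as a dict.

    Covers only a subset of the "restricted" Pod Security Standard.
    """
    spec = pod.get("spec", {})
    pod_ctx = spec.get("securityContext", {})
    violations = []

    for container in spec.get("containers", []) + spec.get("initContainers", []):
        name = container.get("name", "<unnamed>")
        ctx = container.get("securityContext", {})

        if ctx.get("privileged"):
            violations.append(f"{name}: privileged must not be true")
        if ctx.get("allowPrivilegeEscalation") is not False:
            violations.append(f"{name}: allowPrivilegeEscalation must be false")
        # A container-level runAsNonRoot takes precedence over the pod-level value
        if ctx.get("runAsNonRoot", pod_ctx.get("runAsNonRoot")) is not True:
            violations.append(f"{name}: runAsNonRoot must be true")
        if "ALL" not in ctx.get("capabilities", {}).get("drop", []):
            violations.append(f"{name}: capabilities.drop must include ALL")
        seccomp = ctx.get("seccompProfile", pod_ctx.get("seccompProfile", {}))
        if seccomp.get("type") not in ("RuntimeDefault", "Localhost"):
            violations.append(
                f"{name}: seccompProfile.type must be RuntimeDefault or Localhost"
            )

    return violations


# A trimmed-down version of the secure pod from this section
secure_pod = {
    "spec": {
        "securityContext": {
            "runAsNonRoot": True,
            "seccompProfile": {"type": "RuntimeDefault"},
        },
        "containers": [
            {
                "name": "app",
                "securityContext": {
                    "allowPrivilegeEscalation": False,
                    "capabilities": {"drop": ["ALL"]},
                },
            }
        ],
    }
}

print(restricted_profile_violations(secure_pod))  # prints []
```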

6. Container Image Security

Image Scanning and Policies

# ECR repository with image scanning
resource "aws_ecr_repository" "app" {
  name                 = "production/app"
  image_tag_mutability = "IMMUTABLE"

  image_scanning_configuration {
    scan_on_push = true
  }

  encryption_configuration {
    encryption_type = "KMS"
    kms_key         = aws_kms_key.ecr.arn
  }

  tags = {
    Environment = "production"
    Security    = "hardened"
  }
}

# Lifecycle policies are managed through a separate resource
resource "aws_ecr_lifecycle_policy" "app" {
  repository = aws_ecr_repository.app.name

  policy = jsonencode({
    rules = [
      {
        rulePriority = 1
        description  = "Keep last 10 production images"
        selection = {
          tagStatus     = "tagged"
          tagPrefixList = ["v"]
          countType     = "imageCountMoreThan"
          countNumber   = 10
        }
        action = {
          type = "expire"
        }
      }
    ]
  })
}
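
Scan-on-push only produces findings; something still has to act on them. A common pattern is a small gate in the deployment pipeline that refuses images with serious findings. The sketch below assumes the severity counts come from the `findingSeverityCounts` field returned by ECR's `describe_image_scan_findings` call. The function names and the zero-tolerance default are illustrative choices, not part of the Terraform above.

```python
BLOCKING_SEVERITIES = ("CRITICAL", "HIGH")


def scan_gate(severity_counts, max_allowed=0):
    """Decide whether an image may be deployed.

    `severity_counts` is a dict such as {"CRITICAL": 1, "MEDIUM": 4}.
    Returns (passed, blocking_counts).
    """
    blocking = {sev: severity_counts.get(sev, 0) for sev in BLOCKING_SEVERITIES}
    passed = sum(blocking.values()) <= max_allowed
    return passed, blocking


def fetch_severity_counts(repository, tag):
    """Fetch severity counts for one image from ECR (needs AWS credentials)."""
    import boto3  # imported here so scan_gate can be used without boto3

    ecr = boto3.client("ecr")
    response = ecr.describe_image_scan_findings(
        repositoryName=repository,
        imageId={"imageTag": tag},
    )
    return response["imageScanFindings"].get("findingSeverityCounts", {})
```

In a pipeline this might be called as `scan_gate(fetch_severity_counts("production/app", "v1.2.3"))`, failing the job when the first element of the result is `False`.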

Admission Controller for Image Policy

# Open Policy Agent Gatekeeper constraint
apiVersion: templates.gatekeeper.sh/v1beta1
kind: ConstraintTemplate
metadata:
  name: allowedregistries
spec:
  crd:
    spec:
      names:
        kind: AllowedRegistries
      validation:
        openAPIV3Schema:
          type: object
          properties:
            registries:
              type: array
              items:
                type: string
  targets:
    - target: admission.k8s.gatekeeper.sh
      rego: |
        package allowedregistries

        violation[{"msg": msg}] {
          container := input.review.object.spec.containers[_]
          not image_allowed(container.image)
          msg := sprintf("Container image %v is not from an allowed registry", [container.image])
        }

        # True when the image starts with any allowed registry prefix
        image_allowed(image) {
          startswith(image, input.parameters.registries[_])
        }
---
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: AllowedRegistries
metadata:
  name: must-use-ecr
spec:
  match:
    kinds:
      - apiGroups: [""]
        kinds: ["Pod"]
    namespaces: ["production"]
  parameters:
    registries:
      - "123456789012.dkr.ecr.us-east-1.amazonaws.com/"
      - "123456789012.dkr.ecr.us-west-2.amazonaws.com/"
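
The rule this constraint is meant to enforce is simple: every container image must start with one of the allowed registry prefixes. A Python rendering of the same logic can be handy for checking manifests in CI before they ever reach the admission controller. This is a sketch only; the function name and the dict input are assumptions, and Gatekeeper itself evaluates the Rego, not this code.

```python
def registry_violations(pod, allowed_registries):
    """Return one message per container whose image is outside the allowed registries."""
    messages = []
    for container in pod.get("spec", {}).get("containers", []):
        image = container["image"]
        # An image passes if it starts with any allowed registry prefix
        if not any(image.startswith(prefix) for prefix in allowed_registries):
            messages.append(
                f"Container image {image} is not from an allowed registry"
            )
    return messages


ALLOWED = [
    "123456789012.dkr.ecr.us-east-1.amazonaws.com/",
    "123456789012.dkr.ecr.us-west-2.amazonaws.com/",
]

pod = {
    "spec": {
        "containers": [
            {"name": "app", "image": "123456789012.dkr.ecr.us-east-1.amazonaws.com/production/app:v1.2.3"},
            {"name": "sidecar", "image": "docker.io/library/busybox:1.36"},
        ]
    }
}

for message in registry_violations(pod, ALLOWED):
    print(message)
```

Keeping the trailing slash on each prefix matters: without it, a look-alike host that merely begins with the same characters would pass the check.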

7. Secrets Management

AWS Secrets Manager Integration

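# Service account annotated with an IAM role (IRSA).
# Note: the AWS provider fetches secrets with the IAM role of the pod's own
# service account, so app-service-account needs an equivalent annotation.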
apiVersion: v1
kind: ServiceAccount
metadata:
  name: secrets-store-csi-driver
  namespace: kube-system
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::ACCOUNT:role/secrets-store-csi-driver-role
---
# Secret provider class for AWS Secrets Manager
apiVersion: secrets-store.csi.x-k8s.io/v1
kind: SecretProviderClass
metadata:
  name: app-secrets
  namespace: production
spec:
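  provider: aws
  # Sync to a Kubernetes Secret so pods can use secretKeyRef; requires the
  # driver's secret sync feature and a pod that mounts the CSI volume
  secretObjects:
  - secretName: app-secrets
    type: Opaque
    data:
    - objectName: db_username
      key: db_username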
  parameters:
    objects: |
      - objectName: "prod/myapp/database"
        objectType: "secretsmanager"
        jmesPath:
          - path: "username"
            objectAlias: "db_username"
          - path: "password"
            objectAlias: "db_password"
          - path: "host"
            objectAlias: "db_host"
      - objectName: "prod/myapp/api-keys"
        objectType: "secretsmanager"
        jmesPath:
          - path: "stripe_key"
            objectAlias: "stripe_api_key"
---
# Pod using secrets from AWS Secrets Manager
apiVersion: v1
kind: Pod
metadata:
  name: app-with-secrets
  namespace: production
spec:
  serviceAccountName: app-service-account
  containers:
  - name: app
    image: myapp:v1.2.3  # Pin a version; avoid mutable tags such as latest
    volumeMounts:
    - name: secrets-store
      mountPath: "/mnt/secrets"
      readOnly: true
    env:
    - name: DB_USERNAME
      valueFrom:
        secretKeyRef:
          name: app-secrets
          key: db_username
  volumes:
  - name: secrets-store
    csi:
      driver: secrets-store.csi.k8s.io
      readOnly: true
      volumeAttributes:
        secretProviderClass: app-secrets
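
Inside the container, each alias from the SecretProviderClass appears as a file under the mount path, for example `/mnt/secrets/db_username`. Reading the files at startup avoids putting secret values into environment variables at all. The helper below is a minimal sketch of that approach; the function name is hypothetical, and it assumes each file holds a single value.

```python
from pathlib import Path


def load_mounted_secrets(mount_dir="/mnt/secrets"):
    """Read every regular file in the CSI mount into a {name: value} dict."""
    secrets = {}
    for entry in Path(mount_dir).iterdir():
        # The driver keeps bookkeeping directories in the mount; skip them
        if entry.is_file():
            secrets[entry.name] = entry.read_text().strip()
    return secrets
```

An application could then call `load_mounted_secrets()["db_password"]` when opening its database connection, and re-read the directory after a rotation.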

Continuous Security Monitoring

EKS Security Monitoring Dashboard

#!/usr/bin/env python3
"""
EKS Security Monitoring Dashboard
"""

import boto3
import subprocess
import json
from datetime import datetime, timedelta

class EKSSecurityMonitor:
    def __init__(self, cluster_name):
        self.cluster_name = cluster_name
        self.cloudwatch = boto3.client('cloudwatch')
        self.eks = boto3.client('eks')
        
    def collect_security_metrics(self):
        """Collect comprehensive security metrics"""
        
        metrics = {
            'timestamp': datetime.now().isoformat(),
            'cluster_name': self.cluster_name,
            'control_plane': self.check_control_plane_security(),
            'network': self.check_network_security(),
            'rbac': self.check_rbac_security(),
            'pods': self.check_pod_security(),
            'images': self.check_image_security(),
            'compliance': self.check_compliance_status()
        }
        
        return metrics
    
    def check_control_plane_security(self):
        """Check EKS control plane security"""
        
        try:
            cluster = self.eks.describe_cluster(name=self.cluster_name)['cluster']
            
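            vpc_config = cluster['resourcesVpcConfig']
            log_configs = cluster.get('logging', {}).get('clusterLogging', [])

            return {
                'encryption_enabled': bool(cluster.get('encryptionConfig')),
                # clusterLogging lists disabled log types too, so check the flag
                'logging_enabled': any(cfg.get('enabled') for cfg in log_configs),
                # describe_cluster reports endpoint access as endpointPublicAccess
                'private_endpoint': not vpc_config.get('endpointPublicAccess', True),
                'version': cluster['version']
            }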
        except Exception as e:
            return {'error': str(e)}
    
    def check_network_security(self):
        """Check network security configuration"""
        
        try:
            # Check for network policies
            result = subprocess.run(
                ['kubectl', 'get', 'networkpolicies', '--all-namespaces', '-o', 'json'],
                capture_output=True, text=True, check=True
            )
            
            policies = json.loads(result.stdout)
            
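            def is_default_deny(policy):
                # Default deny: selects every pod and allows no ingress traffic
                spec = policy.get('spec', {})
                return (
                    not spec.get('podSelector')
                    and 'Ingress' in spec.get('policyTypes', ['Ingress'])
                    and not spec.get('ingress')
                )

            return {
                'network_policies_count': len(policies['items']),
                # Inspect the spec instead of trusting the policy name
                'has_default_deny': any(
                    is_default_deny(policy) for policy in policies['items']
                )
            }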
        except Exception as e:
            return {'error': str(e)}
    
    def check_rbac_security(self):
        """Check RBAC security status"""
        
        try:
            # Check for service accounts with admin access
            result = subprocess.run(
                ['kubectl', 'get', 'clusterrolebindings', '-o', 'json'],
                capture_output=True, text=True, check=True
            )
            
            bindings = json.loads(result.stdout)
            admin_bindings = [
                binding for binding in bindings['items']
                if binding['roleRef']['name'] == 'cluster-admin'
            ]
            
            return {
                'admin_bindings_count': len(admin_bindings),
                'rbac_enabled': True
            }
        except Exception as e:
            return {'error': str(e)}
    
    def check_pod_security(self):
        """Check pod security standards compliance"""
        
        try:
            result = subprocess.run(
                ['kubectl', 'get', 'pods', '--all-namespaces', '-o', 'json'],
                capture_output=True, text=True, check=True
            )
            
            pods = json.loads(result.stdout)
            
            privileged_pods = 0
            root_pods = 0
            
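            for pod in pods['items']:
                spec = pod['spec']
                pod_context = spec.get('securityContext', {})
                # Init containers can be privileged too
                containers = spec.get('containers', []) + spec.get('initContainers', [])
                contexts = [c.get('securityContext', {}) for c in containers]

                # Count each pod once, even if several containers match
                if any(ctx.get('privileged') for ctx in contexts):
                    privileged_pods += 1

                # Container-level runAsUser overrides the pod-level value.
                # Pods with no runAsUser at all may still run as root,
                # depending on the image, and are not counted here.
                if any(ctx.get('runAsUser', pod_context.get('runAsUser')) == 0
                       for ctx in contexts):
                    root_pods += 1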
            
            return {
                'total_pods': len(pods['items']),
                'privileged_pods': privileged_pods,
                'root_pods': root_pods
            }
        except Exception as e:
            return {'error': str(e)}
    
    def check_image_security(self):
        """Check container image security"""
        
        try:
            result = subprocess.run(
                ['kubectl', 'get', 'pods', '--all-namespaces', '-o', 'json'],
                capture_output=True, text=True, check=True
            )
            
            pods = json.loads(result.stdout)
            
            images = set()
            for pod in pods['items']:
                for container in pod['spec'].get('containers', []):
                    images.add(container['image'])
            
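            # Compare the registry host, not the whole reference, so a path
            # such as evil.example/gcr.io/app is not counted as trusted
            hosts = [image.split('/')[0] for image in images]
            trusted_images = sum(
                1 for host in hosts
                if '.dkr.ecr.' in host or host == 'gcr.io' or host.endswith('.gcr.io')
            )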
            
            return {
                'total_unique_images': len(images),
                'trusted_registry_images': trusted_images,
                'untrusted_images': len(images) - trusted_images
            }
        except Exception as e:
            return {'error': str(e)}
    
    def check_compliance_status(self):
        """Check overall compliance status"""
        
        # This would integrate with compliance scanning tools
        return {
            'cis_compliance_score': 85,  # Example score
            'last_scan': datetime.now().isoformat(),
            'critical_findings': 2,
            'high_findings': 5,
            'medium_findings': 12
        }
    
    def send_metrics_to_cloudwatch(self, metrics):
        """Send security metrics to CloudWatch"""
        
        metric_data = []
        
        # Control plane metrics
        if 'control_plane' in metrics:
            cp = metrics['control_plane']
            if 'encryption_enabled' in cp:
                metric_data.append({
                    'MetricName': 'EncryptionEnabled',
                    'Value': 1 if cp['encryption_enabled'] else 0,
                    'Unit': 'Count'
                })
        
        # Pod security metrics
        if 'pods' in metrics:
            pods = metrics['pods']
            if 'privileged_pods' in pods:
                metric_data.append({
                    'MetricName': 'PrivilegedPods',
                    'Value': pods['privileged_pods'],
                    'Unit': 'Count'
                })
        
        # Send to CloudWatch
        if metric_data:
            self.cloudwatch.put_metric_data(
                Namespace='EKS/Security',
                MetricData=[
                    {
                        **metric,
                        'Dimensions': [
                            {
                                'Name': 'ClusterName',
                                'Value': self.cluster_name
                            }
                        ]
                    } for metric in metric_data
                ]
            )
    
    def generate_security_report(self):
        """Generate comprehensive security report"""
        
        metrics = self.collect_security_metrics()
        
        # Send to CloudWatch
        self.send_metrics_to_cloudwatch(metrics)
        
        # Generate report
        print(f"\n🔒 EKS Security Monitoring Report")
        print(f"{'='*50}")
        print(f"Cluster: {self.cluster_name}")
        print(f"Timestamp: {metrics['timestamp']}")
        
        # Control plane security
        cp = metrics.get('control_plane', {})
        print(f"\n🏗️  Control Plane Security:")
        print(f"  Encryption: {'✅' if cp.get('encryption_enabled') else '❌'}")
        print(f"  Logging: {'✅' if cp.get('logging_enabled') else '❌'}")
        print(f"  Private Endpoint: {'✅' if cp.get('private_endpoint') else '❌'}")
        
        # Pod security
        pods = metrics.get('pods', {})
        print(f"\n🚀 Pod Security:")
        print(f"  Total Pods: {pods.get('total_pods', 'N/A')}")
        print(f"  Privileged Pods: {pods.get('privileged_pods', 'N/A')}")
        print(f"  Root Pods: {pods.get('root_pods', 'N/A')}")
        
        # Image security
        images = metrics.get('images', {})
        print(f"\n📦 Image Security:")
        print(f"  Total Images: {images.get('total_unique_images', 'N/A')}")
        print(f"  Trusted Registry: {images.get('trusted_registry_images', 'N/A')}")
        print(f"  Untrusted Images: {images.get('untrusted_images', 'N/A')}")
        
        # Compliance
        compliance = metrics.get('compliance', {})
        print(f"\n📋 Compliance Status:")
        print(f"  CIS Score: {compliance.get('cis_compliance_score', 'N/A')}%")
        print(f"  Critical Findings: {compliance.get('critical_findings', 'N/A')}")
        print(f"  High Findings: {compliance.get('high_findings', 'N/A')}")
        
        # Save report
        filename = f'eks_security_report_{self.cluster_name}_{datetime.now().strftime("%Y%m%d")}.json'
        with open(filename, 'w') as f:
            json.dump(metrics, f, indent=2)
        
        print(f"\n📄 Report saved to: {filename}")
        
        return metrics

# Usage
monitor = EKSSecurityMonitor('production-cluster')
report = monitor.generate_security_report()
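
Publishing metrics is only useful if something watches them. The monitor above writes `PrivilegedPods` to the `EKS/Security` namespace with a `ClusterName` dimension, so a CloudWatch alarm on that metric is a natural next step. The sketch below builds the arguments for `put_metric_alarm`; the alarm name, the five-minute period, and the optional SNS topic are assumptions to adapt to your environment.

```python
def build_privileged_pod_alarm(cluster_name, sns_topic_arn=None):
    """Build put_metric_alarm arguments that fire when any privileged pod is reported."""
    params = {
        "AlarmName": f"{cluster_name}-privileged-pods",
        "Namespace": "EKS/Security",
        "MetricName": "PrivilegedPods",
        "Dimensions": [{"Name": "ClusterName", "Value": cluster_name}],
        "Statistic": "Maximum",
        "Period": 300,
        "EvaluationPeriods": 1,
        "Threshold": 0,
        "ComparisonOperator": "GreaterThanThreshold",
        # No data usually means the monitor did not run, not that pods are unsafe
        "TreatMissingData": "notBreaching",
    }
    if sns_topic_arn:
        params["AlarmActions"] = [sns_topic_arn]
    return params


def create_privileged_pod_alarm(cluster_name, sns_topic_arn=None):
    """Create or update the alarm in CloudWatch (needs AWS credentials)."""
    import boto3  # imported here so the builder can be tested without boto3

    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_alarm(
        **build_privileged_pod_alarm(cluster_name, sns_topic_arn)
    )
```

Separating the builder from the API call keeps the alarm definition reviewable and testable without touching AWS.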

Implementation Timeline

Week 1: Foundation Security

  • Enable EKS cluster logging and encryption
  • Configure private API server access
  • Deploy worker nodes in private subnets
  • Implement basic RBAC policies
  • Set up network policies (default deny)

Week 2: Advanced Security

  • Harden worker nodes with security scripts
  • Deploy pod security standards
  • Configure image scanning in ECR
  • Implement secrets management with AWS Secrets Manager
  • Set up admission controllers

Week 3: Monitoring & Compliance

  • Deploy security monitoring dashboard
  • Configure CloudWatch alarms for security events
  • Implement compliance scanning
  • Set up automated security audits
  • Create incident response procedures

Week 4: Optimization & Training

  • Fine-tune security policies based on findings
  • Optimize performance impact of security controls
  • Train development team on secure practices
  • Document security procedures
  • Schedule regular security reviews

ROI of EKS Security Hardening

def calculate_eks_security_roi():
    """Calculate ROI of comprehensive EKS security hardening"""
    
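    # Costs avoided through proper security.
    # Simplification: each avoided loss below is cost x risk reduction, which
    # assumes the incident would otherwise happen once a year. Multiply by your
    # own baseline likelihood for a more conservative estimate.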
    benefits = {
        'prevented_crypto_mining': {
            'average_attack_cost': 35000,  # Average cryptomining attack cost
            'probability_reduction': 0.9   # 90% reduction in successful attacks
        },
        'prevented_data_breach': {
            'average_breach_cost': 4200000,  # Approx. global average (IBM reported $4.24M for 2021)
            'probability_reduction': 0.7     # 70% reduction
        },
        'compliance_efficiency': {
            'audit_hours_saved': 200,
            'audits_per_year': 2,
            'hourly_rate': 200
        },
        'operational_efficiency': {
            'incident_response_hours_saved': 40,
            'incidents_per_year': 12,
            'hourly_rate': 150
        },
        'developer_productivity': {
            'security_issue_resolution_hours_saved': 20,
            'issues_per_month': 6,
            'hourly_rate': 120
        }
    }
    
    annual_benefits = (
        benefits['prevented_crypto_mining']['average_attack_cost'] *
        benefits['prevented_crypto_mining']['probability_reduction'] +
        
        benefits['prevented_data_breach']['average_breach_cost'] *
        benefits['prevented_data_breach']['probability_reduction'] +
        
        benefits['compliance_efficiency']['audit_hours_saved'] *
        benefits['compliance_efficiency']['audits_per_year'] *
        benefits['compliance_efficiency']['hourly_rate'] +
        
        benefits['operational_efficiency']['incident_response_hours_saved'] *
        benefits['operational_efficiency']['incidents_per_year'] *
        benefits['operational_efficiency']['hourly_rate'] +
        
        benefits['developer_productivity']['security_issue_resolution_hours_saved'] *
        benefits['developer_productivity']['issues_per_month'] * 12 *
        benefits['developer_productivity']['hourly_rate']
    )
    
    # Implementation costs
    costs = {
        'initial_setup': {
            'engineering_hours': 160,
            'hourly_rate': 150
        },
        'ongoing_maintenance': {
            'monthly_hours': 20,
            'hourly_rate': 120
        },
        'tooling_costs': {
            'security_tools': 200,  # Monthly
            'monitoring': 150,      # Monthly
            'compliance': 300       # Monthly
        },
        'training': {
            'team_training_hours': 40,
            'hourly_rate': 150
        }
    }
    
    annual_costs = (
        costs['initial_setup']['engineering_hours'] *
        costs['initial_setup']['hourly_rate'] +
        
        costs['ongoing_maintenance']['monthly_hours'] * 12 *
        costs['ongoing_maintenance']['hourly_rate'] +
        
        (costs['tooling_costs']['security_tools'] +
         costs['tooling_costs']['monitoring'] +
         costs['tooling_costs']['compliance']) * 12 +
        
        costs['training']['team_training_hours'] *
        costs['training']['hourly_rate']
    )
    
    roi = ((annual_benefits - annual_costs) / annual_costs) * 100
    
    print(f"\n💰 EKS Security Hardening ROI Analysis:")
    print(f"Annual Benefits: ${annual_benefits:,.2f}")
    print(f"Annual Costs: ${annual_costs:,.2f}")
    print(f"Net Benefit: ${annual_benefits - annual_costs:,.2f}")
    print(f"ROI: {roi:.0f}%")
    print(f"\nBreakdown:")
    print(f"• Prevented cryptomining attacks: ${benefits['prevented_crypto_mining']['average_attack_cost'] * benefits['prevented_crypto_mining']['probability_reduction']:,.2f}")
    print(f"• Prevented data breaches: ${benefits['prevented_data_breach']['average_breach_cost'] * benefits['prevented_data_breach']['probability_reduction']:,.2f}")
    print(f"• Operational efficiency: ${benefits['operational_efficiency']['incident_response_hours_saved'] * benefits['operational_efficiency']['incidents_per_year'] * benefits['operational_efficiency']['hourly_rate']:,.2f}")
    
    return roi

roi = calculate_eks_security_roi()
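
The calculation above multiplies each incident cost by the risk reduction alone, which implicitly treats the incident as certain to happen every year without hardening. A more conservative variant weights each loss by a baseline annual probability first. The sketch below shows that adjustment; the function names are hypothetical, and the baseline probabilities in the usage lines are placeholders, not measured figures.

```python
def expected_loss_avoided(incident_cost, baseline_annual_probability, risk_reduction):
    """Expected yearly loss avoided: cost x chance of the incident x reduction in that chance."""
    return incident_cost * baseline_annual_probability * risk_reduction


def roi_percent(annual_benefits, annual_costs):
    """Return ROI as a percentage of annual costs."""
    return (annual_benefits - annual_costs) / annual_costs * 100


# Placeholder baselines: 10% yearly chance of cryptomining, 2% of a breach
avoided = (
    expected_loss_avoided(35_000, 0.10, 0.9)
    + expected_loss_avoided(4_200_000, 0.02, 0.7)
)
# Efficiency savings and annual costs carried over from the model above
efficiency = 200 * 2 * 200 + 40 * 12 * 150 + 20 * 6 * 12 * 120
annual_costs = 160 * 150 + 20 * 12 * 120 + (200 + 150 + 300) * 12 + 40 * 150

print(f"Risk-weighted ROI: {roi_percent(avoided + efficiency, annual_costs):.0f}%")
```

With these placeholder probabilities, most of the return comes from the efficiency savings, not from avoided breaches, which shows how sensitive the headline figure is to the baseline assumptions.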

Conclusion

EKS security hardening isn’t optional in 2025—it’s essential for protecting your applications, data, and infrastructure. The comprehensive approach in this guide addresses the most common attack vectors while maintaining operational efficiency.

The investment in proper EKS security pays off through prevented attacks, reduced incident response time, and improved compliance posture. The size of the return depends heavily on how likely an incident is without these controls, so run the ROI model above with your own figures before quoting a multiple.

Your EKS security action plan:

  1. Start with the critical controls: API server security, network policies, RBAC
  2. Implement container security: Image scanning, pod security standards, secrets management
  3. Deploy comprehensive monitoring: Security metrics, automated auditing, incident response
  4. Maintain security posture: Regular reviews, updates, team training

Remember: Kubernetes security is complex, but following these practices closes off the most common real-world attack paths. Don't wait for a breach to implement proper security; start hardening your EKS clusters today.

Want automated EKS security hardening without the complexity? Modern platforms like PathShield can automatically detect EKS misconfigurations, provide hardening recommendations, and maintain continuous security monitoring—giving you enterprise-grade Kubernetes security with simple setup and management.
