· AWS Security  · 22 min read

AWS GuardDuty Custom Rules and Threat Intelligence Integration

Master AWS GuardDuty custom threat detection by creating advanced rules, integrating external threat intelligence feeds, and building automated response systems. Complete guide with production-ready code.

Master AWS GuardDuty custom threat detection by creating advanced rules, integrating external threat intelligence feeds, and building automated response systems. Complete guide with production-ready code.

AWS GuardDuty provides excellent out-of-the-box threat detection, but its real power comes from customization. By creating custom threat detection rules and integrating external threat intelligence feeds, you can build a sophisticated security monitoring system tailored to your specific environment and threat landscape.

In this comprehensive guide, we’ll show you how to extend GuardDuty beyond its default capabilities, create custom threat detection rules, integrate multiple threat intelligence sources, and build automated response systems that adapt to your unique security requirements.

Understanding GuardDuty’s Architecture

Before diving into customization, it’s crucial to understand how GuardDuty works under the hood. GuardDuty analyzes three primary data sources:

VPC Flow Logs: Network traffic patterns, communication flows, and connection metadata DNS Logs: Domain resolution requests and responses CloudTrail Event Logs: API calls, authentication events, and resource changes

GuardDuty uses machine learning models, threat intelligence feeds, and behavioral analysis to identify anomalies and known threats. However, every organization has unique infrastructure patterns, applications, and threat models that require custom detection logic.

Setting Up Enhanced GuardDuty Configuration

1. Advanced GuardDuty Setup

Let’s start with a comprehensive GuardDuty setup that maximizes detection capabilities:

import boto3
import json
from datetime import datetime, timedelta

class AdvancedGuardDutyManager:
    def __init__(self, region='us-east-1'):
        self.guardduty = boto3.client('guardduty', region_name=region)
        self.s3 = boto3.client('s3', region_name=region)
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.iam = boto3.client('iam', region_name=region)
        self.region = region
        self.detector_id = None
    
    def setup_enhanced_guardduty(self):
        """Set up GuardDuty with enhanced configuration"""
        
        # Enable GuardDuty if not already enabled
        try:
            response = self.guardduty.list_detectors()
            if response['DetectorIds']:
                self.detector_id = response['DetectorIds'][0]
                print(f"Using existing GuardDuty detector: {self.detector_id}")
            else:
                # Create new detector
                response = self.guardduty.create_detector(
                    Enable=True,
                    FindingPublishingFrequency='FIFTEEN_MINUTES',
                    DataSources={
                        'S3Logs': {'Enable': True},
                        'KubernetesAuditLogs': {'Enable': True},
                        'MalwareProtection': {'ScanEc2InstanceWithFindings': {'EbsVolumes': True}}
                    }
                )
                self.detector_id = response['DetectorId']
                print(f"Created new GuardDuty detector: {self.detector_id}")
        
        except Exception as e:
            print(f"Error setting up GuardDuty: {e}")
            return False
        
        # Configure enhanced features
        self.enable_malware_protection()
        self.configure_s3_protection()
        self.setup_eks_protection()
        
        return True
    
    def enable_malware_protection(self):
        """Enable GuardDuty Malware Protection"""
        try:
            self.guardduty.update_malware_scan_settings(
                DetectorId=self.detector_id,
                ScanResourceCriteria={
                    'Include': {
                        'EC2_INSTANCE_TAG': {
                            'MapEquals': {
                                'Environment': 'Production',
                                'Scan': 'True'
                            }
                        }
                    }
                },
                EbsSnapshotPreservation='WITH_FINDINGS'
            )
            print("Malware protection enabled")
        except Exception as e:
            print(f"Error enabling malware protection: {e}")
    
    def configure_s3_protection(self):
        """Configure S3 Protection settings"""
        try:
            self.guardduty.update_detector(
                DetectorId=self.detector_id,
                DataSources={
                    'S3Logs': {'Enable': True}
                }
            )
            print("S3 protection configured")
        except Exception as e:
            print(f"Error configuring S3 protection: {e}")
    
    def setup_eks_protection(self):
        """Set up EKS Protection"""
        try:
            self.guardduty.update_detector(
                DetectorId=self.detector_id,
                DataSources={
                    'KubernetesAuditLogs': {'Enable': True}
                }
            )
            print("EKS protection enabled")
        except Exception as e:
            print(f"Error setting up EKS protection: {e}")

2. Custom Threat Intelligence Integration

Now let’s integrate external threat intelligence feeds:

def setup_threat_intelligence_feeds(self):
    """Set up custom threat intelligence feeds"""
    
    # Create S3 bucket for threat intelligence data
    ti_bucket_name = f'guardduty-threat-intel-{self.region}'
    
    try:
        self.s3.create_bucket(
            Bucket=ti_bucket_name,
            CreateBucketConfiguration={'LocationConstraint': self.region}
        )
        print(f"Created threat intelligence bucket: {ti_bucket_name}")
    except self.s3.exceptions.BucketAlreadyOwnedByYou:
        print(f"Bucket {ti_bucket_name} already exists")
    except Exception as e:
        print(f"Error creating bucket: {e}")
        return
    
    # Set up threat intelligence feeds
    threat_feeds = [
        {
            'name': 'malicious-ips',
            'type': 'IPSet',
            'format': 'TXT',
            'location': f'https://{ti_bucket_name}.s3.{self.region}.amazonaws.com/malicious-ips.txt'
        },
        {
            'name': 'malicious-domains', 
            'type': 'ThreatIntelSet',
            'format': 'TXT',
            'location': f'https://{ti_bucket_name}.s3.{self.region}.amazonaws.com/malicious-domains.txt'
        }
    ]
    
    for feed in threat_feeds:
        try:
            if feed['type'] == 'IPSet':
                response = self.guardduty.create_ip_set(
                    DetectorId=self.detector_id,
                    Name=feed['name'],
                    Format=feed['format'],
                    Location=feed['location'],
                    Activate=True
                )
                print(f"Created IP set: {feed['name']}")
            
            elif feed['type'] == 'ThreatIntelSet':
                response = self.guardduty.create_threat_intel_set(
                    DetectorId=self.detector_id,
                    Name=feed['name'],
                    Format=feed['format'],
                    Location=feed['location'],
                    Activate=True
                )
                print(f"Created threat intel set: {feed['name']}")
                
        except Exception as e:
            print(f"Error creating {feed['name']}: {e}")

def update_threat_intelligence_feeds(self):
    """Update threat intelligence feeds with fresh data"""
    
    # Fetch threat intelligence from multiple sources
    threat_sources = [
        'https://raw.githubusercontent.com/stamparm/ipsum/master/ipsum.txt',
        'https://raw.githubusercontent.com/firehol/blocklist-ipsets/master/firehol_level1.netset',
        'https://raw.githubusercontent.com/Spam404/lists/master/main-blacklist.txt'
    ]
    
    malicious_ips = set()
    malicious_domains = set()
    
    import requests
    
    for source in threat_sources:
        try:
            response = requests.get(source, timeout=30)
            if response.status_code == 200:
                lines = response.text.strip().split('\n')
                for line in lines:
                    line = line.strip()
                    if line and not line.startswith('#'):
                        # Simple IP detection
                        if '.' in line and all(part.isdigit() for part in line.split('.')):
                            malicious_ips.add(line)
                        # Simple domain detection
                        elif '.' in line and not line.replace('.', '').replace('-', '').isdigit():
                            malicious_domains.add(line)
        except Exception as e:
            print(f"Error fetching from {source}: {e}")
    
    # Upload to S3
    bucket_name = f'guardduty-threat-intel-{self.region}'
    
    # Upload malicious IPs
    ip_content = '\n'.join(sorted(malicious_ips))
    self.s3.put_object(
        Bucket=bucket_name,
        Key='malicious-ips.txt',
        Body=ip_content,
        ContentType='text/plain'
    )
    
    # Upload malicious domains
    domain_content = '\n'.join(sorted(malicious_domains))
    self.s3.put_object(
        Bucket=bucket_name,
        Key='malicious-domains.txt',
        Body=domain_content,
        ContentType='text/plain'
    )
    
    print(f"Updated threat intelligence: {len(malicious_ips)} IPs, {len(malicious_domains)} domains")
    
    # Update GuardDuty feeds
    self.update_guardduty_feeds()

def update_guardduty_feeds(self):
    """Update GuardDuty threat intelligence feeds"""
    
    try:
        # Get existing IP sets
        ip_sets = self.guardduty.list_ip_sets(DetectorId=self.detector_id)
        for ip_set_id in ip_sets['IpSetIds']:
            self.guardduty.update_ip_set(
                DetectorId=self.detector_id,
                IpSetId=ip_set_id,
                Activate=True
            )
        
        # Get existing threat intel sets
        threat_intel_sets = self.guardduty.list_threat_intel_sets(DetectorId=self.detector_id)
        for ti_set_id in threat_intel_sets['ThreatIntelSetIds']:
            self.guardduty.update_threat_intel_set(
                DetectorId=self.detector_id,
                ThreatIntelSetId=ti_set_id,
                Activate=True
            )
        
        print("GuardDuty threat intelligence feeds updated")
        
    except Exception as e:
        print(f"Error updating GuardDuty feeds: {e}")

Creating Custom Detection Rules

1. Custom Finding Types

Create custom finding types for your specific environment:

def create_custom_finding_types(self):
    """Create custom finding types for organization-specific threats"""
    
    custom_findings = [
        {
            'type': 'Custom:Cryptocurrency/BitcoinTraffic',
            'description': 'Cryptocurrency mining traffic detected',
            'severity': 8.0,
            'confidence': 7.5
        },
        {
            'type': 'Custom:Exfiltration/LargeDataTransfer', 
            'description': 'Unusual large data transfer detected',
            'severity': 7.0,
            'confidence': 6.0
        },
        {
            'type': 'Custom:Persistence/ScheduledTaskCreation',
            'description': 'Suspicious scheduled task creation',
            'severity': 6.5,
            'confidence': 8.0
        },
        {
            'type': 'Custom:Defense-Evasion/LogDeletion',
            'description': 'Security log deletion detected', 
            'severity': 9.0,
            'confidence': 9.0
        }
    ]
    
    # Store custom finding definitions
    bucket_name = f'guardduty-custom-rules-{self.region}'
    
    try:
        self.s3.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': self.region}
        )
    except:
        pass  # Bucket might already exist
    
    # Upload custom finding definitions
    self.s3.put_object(
        Bucket=bucket_name,
        Key='custom-findings.json',
        Body=json.dumps(custom_findings, indent=2),
        ContentType='application/json'
    )
    
    print(f"Created {len(custom_findings)} custom finding types")

def generate_custom_findings(self, finding_type, details):
    """Generate custom GuardDuty findings"""
    
    # Create custom finding
    finding = {
        'accountId': boto3.Session().get_credentials().access_key.split(':')[4] if ':' in boto3.Session().get_credentials().access_key else 'unknown',
        'region': self.region,
        'type': finding_type,
        'id': f"custom-{datetime.now().strftime('%Y%m%d%H%M%S')}",
        'partition': 'aws',
        'arn': f"arn:aws:guardduty:{self.region}:*:detector/{self.detector_id}/finding/custom-{datetime.now().strftime('%Y%m%d%H%M%S')}",
        'createdAt': datetime.utcnow().isoformat() + 'Z',
        'updatedAt': datetime.utcnow().isoformat() + 'Z',
        'severity': details.get('severity', 5.0),
        'confidence': details.get('confidence', 5.0),
        'title': details.get('title', 'Custom Security Finding'),
        'description': details.get('description', 'Custom security event detected'),
        'service': {
            'serviceName': 'custom-detection',
            'detectorId': self.detector_id,
            'resourceRole': 'TARGET',
            'eventFirstSeen': datetime.utcnow().isoformat() + 'Z',
            'eventLastSeen': datetime.utcnow().isoformat() + 'Z',
            'archived': False,
            'count': 1
        },
        'resource': details.get('resource', {}),
        'additionalInfo': details.get('additionalInfo', {})
    }
    
    # Send to custom processing pipeline
    self.process_custom_finding(finding)
    
    return finding

def process_custom_finding(self, finding):
    """Process custom findings through analysis pipeline"""
    
    # Store finding for analysis
    bucket_name = f'guardduty-custom-rules-{self.region}'
    
    finding_key = f"findings/{datetime.now().strftime('%Y/%m/%d')}/{finding['id']}.json"
    
    self.s3.put_object(
        Bucket=bucket_name,
        Key=finding_key,
        Body=json.dumps(finding, indent=2),
        ContentType='application/json'
    )
    
    # Trigger automated analysis
    self.trigger_finding_analysis(finding)

def trigger_finding_analysis(self, finding):
    """Trigger automated analysis of custom findings"""
    
    # Invoke Lambda function for analysis
    lambda_payload = {
        'finding': finding,
        'analysis_type': 'custom',
        'timestamp': datetime.utcnow().isoformat()
    }
    
    try:
        self.lambda_client.invoke(
            FunctionName='guardduty-custom-analysis',
            InvocationType='Event',
            Payload=json.dumps(lambda_payload)
        )
        print(f"Triggered analysis for finding: {finding['id']}")
    except Exception as e:
        print(f"Error triggering analysis: {e}")

2. Advanced Behavioral Analysis

Implement sophisticated behavioral analysis:

def setup_behavioral_analysis(self):
    """Set up behavioral analysis for custom threat detection"""
    
    behavioral_rules = [
        {
            'name': 'unusual_api_volume',
            'description': 'Detect unusual API call volume',
            'threshold': 1000,
            'window': 3600,  # 1 hour
            'baseline_days': 7
        },
        {
            'name': 'off_hours_access',
            'description': 'Detect access during unusual hours',
            'allowed_hours': [9, 10, 11, 12, 13, 14, 15, 16, 17],
            'severity': 6.0
        },
        {
            'name': 'geographical_anomaly',
            'description': 'Detect access from unusual locations',
            'allowed_countries': ['US', 'CA'],
            'severity': 8.0
        },
        {
            'name': 'privilege_escalation_pattern',
            'description': 'Detect privilege escalation patterns',
            'suspicious_actions': [
                'AttachUserPolicy',
                'PutUserPolicy', 
                'CreateRole',
                'AttachRolePolicy'
            ],
            'threshold': 5,
            'window': 1800  # 30 minutes
        }
    ]
    
    # Store behavioral rules
    bucket_name = f'guardduty-custom-rules-{self.region}'
    
    self.s3.put_object(
        Bucket=bucket_name,
        Key='behavioral-rules.json',
        Body=json.dumps(behavioral_rules, indent=2),
        ContentType='application/json'
    )
    
    print(f"Created {len(behavioral_rules)} behavioral analysis rules")

def analyze_user_behavior(self, username, hours=24):
    """Analyze user behavior for anomalies"""
    
    # Get user's CloudTrail events
    cloudtrail = boto3.client('cloudtrail')
    
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=hours)
    
    try:
        events = cloudtrail.lookup_events(
            LookupAttributes=[
                {
                    'AttributeKey': 'Username',
                    'AttributeValue': username
                }
            ],
            StartTime=start_time,
            EndTime=end_time
        )
        
        user_events = events['Events']
        
        # Analyze patterns
        anomalies = []
        
        # Check API volume
        if len(user_events) > 1000:  # Threshold
            anomalies.append({
                'type': 'unusual_api_volume',
                'severity': 7.0,
                'details': f'User {username} made {len(user_events)} API calls in {hours} hours'
            })
        
        # Check time patterns
        for event in user_events:
            event_hour = event['EventTime'].hour
            if event_hour < 9 or event_hour > 17:  # Outside business hours
                anomalies.append({
                    'type': 'off_hours_access',
                    'severity': 6.0,
                    'details': f'Access at {event["EventTime"]} outside business hours'
                })
                break  # Don't spam alerts
        
        # Check geographical patterns
        source_ips = set()
        for event in user_events:
            if 'SourceIPAddress' in event:
                source_ips.add(event['SourceIPAddress'])
        
        for ip in source_ips:
            # In production, use IP geolocation service
            if self.is_suspicious_location(ip):
                anomalies.append({
                    'type': 'geographical_anomaly',
                    'severity': 8.0,
                    'details': f'Access from suspicious IP: {ip}'
                })
        
        # Generate custom findings for anomalies
        for anomaly in anomalies:
            self.generate_custom_findings(
                f"Custom:Behavior/{anomaly['type']}", 
                {
                    'severity': anomaly['severity'],
                    'description': anomaly['details'],
                    'resource': {
                        'type': 'IAMUser',
                        'id': username
                    }
                }
            )
        
        return anomalies
        
    except Exception as e:
        print(f"Error analyzing user behavior: {e}")
        return []

def is_suspicious_location(self, ip_address):
    """Check if IP address is from suspicious location"""
    
    # This is a simplified example
    # In production, use a proper IP geolocation service
    
    suspicious_ranges = [
        '192.168.',  # Private networks shouldn't be in CloudTrail
        '10.',       # Private networks
        '172.16.',   # Private networks
    ]
    
    for range_prefix in suspicious_ranges:
        if ip_address.startswith(range_prefix):
            return True
    
    return False

Building Custom Lambda Functions for Analysis

1. Advanced Finding Analysis

Create a Lambda function for sophisticated finding analysis:

def create_analysis_lambda(self):
    """Create Lambda function for custom finding analysis"""
    
    lambda_code = '''
import json
import boto3
from datetime import datetime, timedelta

def lambda_handler(event, context):
    """Analyze custom GuardDuty findings"""
    
    finding = event['finding']
    analysis_type = event.get('analysis_type', 'standard')
    
    # Initialize AWS clients
    guardduty = boto3.client('guardduty')
    s3 = boto3.client('s3')
    sns = boto3.client('sns')
    
    analysis_results = {
        'finding_id': finding['id'],
        'analysis_timestamp': datetime.utcnow().isoformat(),
        'risk_score': calculate_risk_score(finding),
        'recommended_actions': get_recommended_actions(finding),
        'context_analysis': analyze_context(finding),
        'threat_indicators': extract_threat_indicators(finding)
    }
    
    # Store analysis results
    bucket_name = 'guardduty-analysis-results'
    s3.put_object(
        Bucket=bucket_name,
        Key=f"analysis/{finding['id']}.json",
        Body=json.dumps(analysis_results, indent=2)
    )
    
    # Send high-risk findings to security team
    if analysis_results['risk_score'] > 8.0:
        send_security_alert(finding, analysis_results)
    
    return {
        'statusCode': 200,
        'body': json.dumps(analysis_results)
    }

def calculate_risk_score(finding):
    """Calculate comprehensive risk score"""
    
    base_score = finding.get('severity', 5.0)
    confidence = finding.get('confidence', 5.0)
    
    # Adjust based on resource type
    resource_multipliers = {
        'IAMUser': 1.2,
        'S3Bucket': 1.1,
        'EC2Instance': 1.0,
        'RDSInstance': 1.3
    }
    
    resource_type = finding.get('resource', {}).get('type', 'Unknown')
    multiplier = resource_multipliers.get(resource_type, 1.0)
    
    # Adjust based on time (recent findings are higher risk)
    time_factor = 1.0
    if 'createdAt' in finding:
        created_time = datetime.fromisoformat(finding['createdAt'].replace('Z', '+00:00'))
        age_hours = (datetime.utcnow().replace(tzinfo=created_time.tzinfo) - created_time).total_seconds() / 3600
        if age_hours < 1:
            time_factor = 1.2
        elif age_hours < 24:
            time_factor = 1.1
    
    risk_score = (base_score * confidence / 10) * multiplier * time_factor
    return min(risk_score, 10.0)  # Cap at 10

def get_recommended_actions(finding):
    """Get recommended actions based on finding type"""
    
    finding_type = finding.get('type', '')
    
    action_map = {
        'Cryptocurrency': [
            'Block network traffic to cryptocurrency pools',
            'Investigate affected instances',
            'Check for unauthorized software'
        ],
        'Exfiltration': [
            'Review data access logs',
            'Check network traffic patterns',
            'Verify user permissions'
        ],
        'Persistence': [
            'Review scheduled tasks and services',
            'Check for unauthorized accounts',
            'Audit system configurations'
        ],
        'Defense-Evasion': [
            'Review security tool configurations',
            'Check for disabled security features',
            'Audit log retention policies'
        ]
    }
    
    for category, actions in action_map.items():
        if category in finding_type:
            return actions
    
    return ['Investigate the finding details', 'Review related logs', 'Verify system integrity']

def analyze_context(finding):
    """Analyze finding context for additional insights"""
    
    context = {
        'related_findings': [],
        'user_history': {},
        'resource_relationships': {},
        'timeline_analysis': {}
    }
    
    # In production, this would query other data sources
    # to build comprehensive context
    
    return context

def extract_threat_indicators(finding):
    """Extract threat indicators from finding"""
    
    indicators = {
        'ips': [],
        'domains': [],
        'user_agents': [],
        'file_hashes': [],
        'ttps': []  # Tactics, Techniques, and Procedures
    }
    
    # Extract from finding details
    if 'service' in finding:
        service_info = finding['service']
        if 'remoteIpDetails' in service_info:
            indicators['ips'].append(service_info['remoteIpDetails'].get('ipAddressV4'))
    
    return indicators

def send_security_alert(finding, analysis):
    """Send security alert for high-risk findings"""
    
    message = {
        'alert_type': 'HIGH_RISK_FINDING',
        'finding_id': finding['id'],
        'finding_type': finding.get('type'),
        'risk_score': analysis['risk_score'],
        'description': finding.get('description'),
        'recommended_actions': analysis['recommended_actions'],
        'timestamp': datetime.utcnow().isoformat()
    }
    
    sns = boto3.client('sns')
    sns.publish(
        TopicArn='arn:aws:sns:us-east-1:*:security-alerts',
        Message=json.dumps(message, indent=2),
        Subject=f"High Risk Security Finding: {finding.get('type')}"
    )
'''
    
    # Create the Lambda function
    try:
        function_name = 'guardduty-custom-analysis'
        
        # Create or update function
        try:
            self.lambda_client.create_function(
                FunctionName=function_name,
                Runtime='python3.9',
                Role='arn:aws:iam::*:role/lambda-guardduty-analysis-role',
                Handler='index.lambda_handler',
                Code={'ZipFile': lambda_code.encode()},
                Description='Custom GuardDuty finding analysis',
                Timeout=300,
                MemorySize=512
            )
            print(f"Created Lambda function: {function_name}")
        except self.lambda_client.exceptions.ResourceConflictException:
            self.lambda_client.update_function_code(
                FunctionName=function_name,
                ZipFile=lambda_code.encode()
            )
            print(f"Updated Lambda function: {function_name}")
            
    except Exception as e:
        print(f"Error creating analysis Lambda: {e}")

2. Threat Intelligence Enrichment

Create a function to enrich findings with threat intelligence:

def create_threat_intel_enrichment_lambda(self):
    """Create Lambda function for threat intelligence enrichment"""
    
    enrichment_code = '''
import json
import boto3
import requests
from datetime import datetime

def lambda_handler(event, context):
    """Enrich GuardDuty findings with threat intelligence"""
    
    finding = event['detail']
    
    # Extract indicators from finding
    indicators = extract_indicators(finding)
    
    # Enrich with threat intelligence
    enrichment_data = {}
    
    for indicator_type, values in indicators.items():
        enrichment_data[indicator_type] = []
        for value in values:
            intel = get_threat_intelligence(indicator_type, value)
            if intel:
                enrichment_data[indicator_type].append({
                    'indicator': value,
                    'intelligence': intel
                })
    
    # Store enriched data
    store_enrichment_data(finding['id'], enrichment_data)
    
    # Update finding severity if high-confidence threat intel found
    update_finding_severity(finding, enrichment_data)
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'finding_id': finding['id'],
            'enrichment_data': enrichment_data
        })
    }

def extract_indicators(finding):
    """Extract threat indicators from GuardDuty finding"""
    
    indicators = {
        'ips': [],
        'domains': [],
        'urls': []
    }
    
    # Extract from service details
    if 'service' in finding:
        service = finding['service']
        
        # Remote IP details
        if 'remoteIpDetails' in service:
            ip_details = service['remoteIpDetails']
            if 'ipAddressV4' in ip_details:
                indicators['ips'].append(ip_details['ipAddressV4'])
        
        # Action details
        if 'action' in service:
            action = service['action']
            
            # DNS request action
            if action.get('actionType') == 'DNS_REQUEST':
                dns_details = action.get('dnsRequestAction', {})
                if 'domain' in dns_details:
                    indicators['domains'].append(dns_details['domain'])
            
            # Network connection action
            elif action.get('actionType') == 'NETWORK_CONNECTION':
                conn_details = action.get('networkConnectionAction', {})
                if 'remoteIpDetails' in conn_details:
                    remote_ip = conn_details['remoteIpDetails'].get('ipAddressV4')
                    if remote_ip:
                        indicators['ips'].append(remote_ip)
    
    return indicators

def get_threat_intelligence(indicator_type, indicator_value):
    """Get threat intelligence for indicator"""
    
    intel_sources = {
        'ips': [
            'https://api.virustotal.com/api/v3/ip_addresses/',
            'https://api.abuseipdb.com/api/v2/check'
        ],
        'domains': [
            'https://api.virustotal.com/api/v3/domains/',
            'https://api.urlvoid.com/api1000/'
        ]
    }
    
    intelligence = {
        'reputation': 'unknown',
        'categories': [],
        'first_seen': None,
        'last_seen': None,
        'confidence': 0
    }
    
    # Query threat intelligence APIs
    # This is a simplified example - in production, use proper API keys and error handling
    
    if indicator_type == 'ips':
        # Example: Check against known malicious IP lists
        malicious_ips = get_malicious_ip_list()
        if indicator_value in malicious_ips:
            intelligence['reputation'] = 'malicious'
            intelligence['categories'] = ['botnet', 'malware']
            intelligence['confidence'] = 9
    
    elif indicator_type == 'domains':
        # Example: Check against known malicious domains
        malicious_domains = get_malicious_domain_list()
        if indicator_value in malicious_domains:
            intelligence['reputation'] = 'malicious'
            intelligence['categories'] = ['phishing', 'malware']
            intelligence['confidence'] = 8
    
    return intelligence if intelligence['reputation'] != 'unknown' else None

def get_malicious_ip_list():
    """Get list of known malicious IPs"""
    # In production, this would fetch from your threat intelligence platform
    return [
        '192.168.1.100',
        '10.0.0.50'
    ]

def get_malicious_domain_list():
    """Get list of known malicious domains"""
    # In production, this would fetch from your threat intelligence platform
    return [
        'evil.example.com',
        'malware.test'
    ]

def store_enrichment_data(finding_id, enrichment_data):
    """Store threat intelligence enrichment data"""
    
    s3 = boto3.client('s3')
    bucket_name = 'guardduty-threat-intel-enrichment'
    
    try:
        s3.put_object(
            Bucket=bucket_name,
            Key=f"enrichment/{finding_id}.json",
            Body=json.dumps(enrichment_data, indent=2),
            ContentType='application/json'
        )
    except Exception as e:
        print(f"Error storing enrichment data: {e}")

def update_finding_severity(finding, enrichment_data):
    """Update finding severity based on threat intelligence"""
    
    # Calculate threat intelligence score
    ti_score = 0
    for indicator_type, indicators in enrichment_data.items():
        for indicator in indicators:
            confidence = indicator['intelligence'].get('confidence', 0)
            if confidence >= 8:
                ti_score += 2
            elif confidence >= 6:
                ti_score += 1
    
    # Update severity if high-confidence threat intelligence found
    if ti_score >= 3:
        # In production, you would update the finding via GuardDuty API
        # or send to a SIEM system
        print(f"High-confidence threat intelligence found for finding {finding['id']}")
'''
    
    # Create the Lambda function
    try:
        function_name = 'guardduty-threat-intel-enrichment'
        
        self.lambda_client.create_function(
            FunctionName=function_name,
            Runtime='python3.9',
            Role='arn:aws:iam::*:role/lambda-guardduty-enrichment-role',
            Handler='index.lambda_handler',
            Code={'ZipFile': enrichment_code.encode()},
            Description='GuardDuty threat intelligence enrichment',
            Timeout=300,
            MemorySize=512
        )
        print(f"Created threat intelligence enrichment function: {function_name}")
        
    except self.lambda_client.exceptions.ResourceConflictException:
        self.lambda_client.update_function_code(
            FunctionName=function_name,
            ZipFile=enrichment_code.encode()
        )
        print(f"Updated threat intelligence enrichment function: {function_name}")
    except Exception as e:
        print(f"Error creating enrichment Lambda: {e}")

Automated Response and Orchestration

1. Security Orchestration System

Build a comprehensive security orchestration system:

def create_security_orchestration_system(self):
    """Create security orchestration and automated response system"""
    
    orchestration_code = '''
import json
import boto3
from datetime import datetime, timedelta

def lambda_handler(event, context):
    """Orchestrate security response actions"""
    
    finding = event['detail']
    finding_type = finding.get('type', '')
    severity = finding.get('severity')
    
    # Initialize response actions
    response_actions = []
    
    # Determine response based on finding type and severity
    if severity >= 7.0:
        response_actions.extend(get_high_severity_actions(finding))
    elif severity >= 4.0:
        response_actions.extend(get_medium_severity_actions(finding))
    else:
        response_actions.extend(get_low_severity_actions(finding))
    
    # Add finding-specific actions
    response_actions.extend(get_finding_specific_actions(finding_type, finding))
    
    # Execute response actions
    execution_results = []
    for action in response_actions:
        try:
            result = execute_response_action(action, finding)
            execution_results.append({
                'action': action,
                'status': 'success',
                'result': result
            })
        except Exception as e:
            execution_results.append({
                'action': action,
                'status': 'failed',
                'error': str(e)
            })
    
    # Log response actions
    log_response_actions(finding['id'], execution_results)
    
    # Send summary to security team
    send_response_summary(finding, execution_results)
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'finding_id': finding['id'],
            'actions_executed': len(execution_results),
            'results': execution_results
        })
    }

def get_high_severity_actions(finding):
    """Get response actions for high severity findings"""
    return [
        'isolate_affected_resources',
        'block_malicious_ips',
        'disable_compromised_users',
        'create_incident_ticket',
        'notify_security_team',
        'collect_forensic_evidence'
    ]

def get_medium_severity_actions(finding):
    """Get response actions for medium severity findings"""
    return [
        'log_detailed_analysis',
        'monitor_affected_resources', 
        'update_security_groups',
        'notify_resource_owners'
    ]

def get_low_severity_actions(finding):
    """Get response actions for low severity findings"""
    return [
        'log_finding_details',
        'update_metrics',
        'schedule_review'
    ]

def get_finding_specific_actions(finding_type, finding):
    """Get finding-specific response actions"""
    
    specific_actions = []
    
    if 'Cryptocurrency' in finding_type:
        specific_actions.extend([
            'block_crypto_pools',
            'scan_for_miners',
            'check_compute_usage'
        ])
    
    elif 'Backdoor' in finding_type:
        specific_actions.extend([
            'isolate_instance',
            'capture_memory_dump',
            'analyze_network_connections'
        ])
    
    elif 'Exfiltration' in finding_type:
        specific_actions.extend([
            'analyze_data_transfers',
            'review_access_patterns',
            'check_data_classification'
        ])
    
    elif 'Persistence' in finding_type:
        specific_actions.extend([
            'audit_scheduled_tasks',
            'check_startup_programs',
            'review_service_accounts'
        ])
    
    return specific_actions

def execute_response_action(action, finding):
    """Execute individual response action"""
    
    if action == 'isolate_affected_resources':
        return isolate_resources(finding)
    elif action == 'block_malicious_ips':
        return block_ips(finding)
    elif action == 'disable_compromised_users':
        return disable_users(finding)
    elif action == 'create_incident_ticket':
        return create_incident(finding)
    elif action == 'collect_forensic_evidence':
        return collect_evidence(finding)
    else:
        return log_action(action, finding)

def isolate_resources(finding):
    """Isolate affected resources"""
    
    ec2 = boto3.client('ec2')
    
    # Get resource details from finding
    resource = finding.get('resource', {})
    
    if resource.get('resourceType') == 'Instance':
        instance_id = resource.get('instanceDetails', {}).get('instanceId')
        if instance_id:
            # Create isolation security group
            isolation_sg = create_isolation_security_group(ec2)
            
            # Apply isolation security group
            ec2.modify_instance_attribute(
                InstanceId=instance_id,
                Groups=[isolation_sg['GroupId']]
            )
            
            return f"Isolated instance {instance_id}"
    
    return "No resources to isolate"

def create_isolation_security_group(ec2):
    """Create security group for resource isolation"""
    
    try:
        response = ec2.create_security_group(
            GroupName=f'isolation-sg-{datetime.now().strftime("%Y%m%d%H%M%S")}',
            Description='Isolation security group for compromised resources'
        )
        
        # Remove all rules (default deny all)
        # Only allow specific administrative access if needed
        
        return response
    except Exception as e:
        print(f"Error creating isolation security group: {e}")
        raise

def block_ips(finding):
    """Block malicious IP addresses"""
    
    ec2 = boto3.client('ec2')
    
    # Extract IP addresses from finding
    malicious_ips = []
    
    service = finding.get('service', {})
    if 'remoteIpDetails' in service:
        ip = service['remoteIpDetails'].get('ipAddressV4')
        if ip:
            malicious_ips.append(ip)
    
    blocked_count = 0
    for ip in malicious_ips:
        try:
            # Add to security group rules to block IP
            # This is a simplified example
            blocked_count += 1
        except Exception as e:
            print(f"Error blocking IP {ip}: {e}")
    
    return f"Blocked {blocked_count} IP addresses"

def disable_users(finding):
    """Disable compromised user accounts"""
    
    iam = boto3.client('iam')
    
    # Extract user information from finding
    user_identity = finding.get('service', {}).get('userDetails', {})
    username = user_identity.get('userName')
    
    if username:
        try:
            # Attach deny-all policy to user
            iam.attach_user_policy(
                UserName=username,
                PolicyArn='arn:aws:iam::aws:policy/AWSDenyAll'
            )
            
            return f"Disabled user {username}"
        except Exception as e:
            print(f"Error disabling user {username}: {e}")
            return f"Failed to disable user {username}"
    
    return "No users to disable"

def create_incident(finding):
    """Create incident ticket"""
    
    # In production, integrate with your ticketing system
    incident_data = {
        'title': f"Security Incident: {finding.get('type')}",
        'description': finding.get('description'),
        'severity': finding.get('severity'),
        'finding_id': finding.get('id'),
        'created_at': datetime.utcnow().isoformat()
    }
    
    # Store incident data
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='security-incidents',
        Key=f"incidents/{finding['id']}.json",
        Body=json.dumps(incident_data, indent=2)
    )
    
    return f"Created incident ticket for finding {finding['id']}"

def collect_evidence(finding):
    """Collect forensic evidence"""
    
    evidence_data = {
        'finding_id': finding['id'],
        'collection_time': datetime.utcnow().isoformat(),
        'finding_details': finding,
        'evidence_sources': []
    }
    
    # Collect relevant logs and data
    resource = finding.get('resource', {})
    
    if resource.get('resourceType') == 'Instance':
        instance_id = resource.get('instanceDetails', {}).get('instanceId')
        if instance_id:
            # In production, create EBS snapshots, collect memory dumps, etc.
            evidence_data['evidence_sources'].append({
                'type': 'ebs_snapshot',
                'instance_id': instance_id,
                'status': 'collected'
            })
    
    # Store evidence metadata
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='forensic-evidence',
        Key=f"evidence/{finding['id']}/metadata.json",
        Body=json.dumps(evidence_data, indent=2)
    )
    
    return f"Collected forensic evidence for finding {finding['id']}"

def log_action(action, finding):
    """Log response action"""
    
    log_entry = {
        'action': action,
        'finding_id': finding['id'],
        'timestamp': datetime.utcnow().isoformat(),
        'status': 'executed'
    }
    
    print(json.dumps(log_entry))
    return f"Logged action: {action}"

def log_response_actions(finding_id, results):
    """Log all response actions"""
    
    s3 = boto3.client('s3')
    
    log_data = {
        'finding_id': finding_id,
        'response_timestamp': datetime.utcnow().isoformat(),
        'actions': results
    }
    
    s3.put_object(
        Bucket='security-response-logs',
        Key=f"responses/{finding_id}.json",
        Body=json.dumps(log_data, indent=2)
    )

def send_response_summary(finding, results):
    """Send response summary to security team"""
    
    sns = boto3.client('sns')
    
    summary = {
        'finding_id': finding['id'],
        'finding_type': finding.get('type'),
        'severity': finding.get('severity'),
        'actions_taken': len(results),
        'successful_actions': len([r for r in results if r['status'] == 'success']),
        'failed_actions': len([r for r in results if r['status'] == 'failed']),
        'timestamp': datetime.utcnow().isoformat()
    }
    
    sns.publish(
        TopicArn='arn:aws:sns:us-east-1:*:security-response-summary',
        Message=json.dumps(summary, indent=2),
        Subject=f"Security Response Summary: {finding.get('type')}"
    )
'''
    
    # Create orchestration Lambda
    try:
        function_name = 'guardduty-security-orchestration'
        
        self.lambda_client.create_function(
            FunctionName=function_name,
            Runtime='python3.9',
            Role='arn:aws:iam::*:role/lambda-security-orchestration-role',
            Handler='index.lambda_handler',
            Code={'ZipFile': orchestration_code.encode()},
            Description='Security orchestration and automated response',
            Timeout=900,  # 15 minutes
            MemorySize=1024
        )
        print(f"Created security orchestration function: {function_name}")
        
    except self.lambda_client.exceptions.ResourceConflictException:
        self.lambda_client.update_function_code(
            FunctionName=function_name,
            ZipFile=orchestration_code.encode()
        )
        print(f"Updated security orchestration function: {function_name}")
    except Exception as e:
        print(f"Error creating orchestration Lambda: {e}")

EventBridge Integration and Workflow Automation

1. Advanced Event Processing

Set up sophisticated event processing workflows:

def setup_eventbridge_integration(self):
    """Set up EventBridge integration for GuardDuty findings"""
    
    events = boto3.client('events')
    
    # Create custom event bus for security events
    try:
        events.create_event_bus(Name='security-events')
        print("Created security events bus")
    except Exception as e:
        if "already exists" not in str(e):
            print(f"Error creating event bus: {e}")
    
    # Create rules for different finding types
    rules = [
        {
            'Name': 'guardduty-high-severity',
            'EventPattern': {
                'source': ['aws.guardduty'],
                'detail-type': ['GuardDuty Finding'],
                'detail': {
                    'severity': [{'numeric': ['>=', 7.0]}]
                }
            },
            'Targets': [
                {
                    'Id': '1',
                    'Arn': f'arn:aws:lambda:{self.region}:*:function:guardduty-security-orchestration'
                },
                {
                    'Id': '2', 
                    'Arn': f'arn:aws:sns:{self.region}:*:security-alerts'
                }
            ]
        },
        {
            'Name': 'guardduty-cryptocurrency',
            'EventPattern': {
                'source': ['aws.guardduty'],
                'detail-type': ['GuardDuty Finding'],
                'detail': {
                    'type': [{'prefix': 'CryptoCurrency'}]
                }
            },
            'Targets': [
                {
                    'Id': '1',
                    'Arn': f'arn:aws:lambda:{self.region}:*:function:guardduty-crypto-response'
                }
            ]
        },
        {
            'Name': 'guardduty-exfiltration',
            'EventPattern': {
                'source': ['aws.guardduty'],
                'detail-type': ['GuardDuty Finding'],
                'detail': {
                    'type': [{'prefix': 'Exfiltration'}]
                }
            },
            'Targets': [
                {
                    'Id': '1',
                    'Arn': f'arn:aws:lambda:{self.region}:*:function:guardduty-exfiltration-response'
                }
            ]
        }
    ]
    
    # Create rules
    for rule in rules:
        try:
            events.put_rule(
                Name=rule['Name'],
                EventPattern=json.dumps(rule['EventPattern']),
                State='ENABLED',
                Description=f"Process {rule['Name']} GuardDuty findings"
            )
            
            # Add targets
            events.put_targets(
                Rule=rule['Name'],
                Targets=rule['Targets']
            )
            
            print(f"Created EventBridge rule: {rule['Name']}")
            
        except Exception as e:
            print(f"Error creating rule {rule['Name']}: {e}")

def create_step_functions_workflow(self):
    """Create Step Functions workflow for complex security investigations"""
    
    stepfunctions = boto3.client('stepfunctions')
    
    workflow_definition = {
        "Comment": "Security Investigation Workflow",
        "StartAt": "AnalyzeFinding",
        "States": {
            "AnalyzeFinding": {
                "Type": "Task",
                "Resource": f"arn:aws:lambda:{self.region}:*:function:guardduty-custom-analysis",
                "Next": "EvaluateRisk"
            },
            "EvaluateRisk": {
                "Type": "Choice",
                "Choices": [
                    {
                        "Variable": "$.risk_score",
                        "NumericGreaterThan": 8.0,
                        "Next": "HighRiskResponse"
                    },
                    {
                        "Variable": "$.risk_score",
                        "NumericGreaterThan": 5.0,
                        "Next": "MediumRiskResponse"  
                    }
                ],
                "Default": "LowRiskResponse"
            },
            "HighRiskResponse": {
                "Type": "Parallel",
                "Branches": [
                    {
                        "StartAt": "IsolateResources",
                        "States": {
                            "IsolateResources": {
                                "Type": "Task",
                                "Resource": f"arn:aws:lambda:{self.region}:*:function:isolate-resources",
                                "End": True
                            }
                        }
                    },
                    {
                        "StartAt": "CollectEvidence",
                        "States": {
                            "CollectEvidence": {
                                "Type": "Task",
                                "Resource": f"arn:aws:lambda:{self.region}:*:function:collect-evidence", 
                                "End": True
                            }
                        }
                    },
                    {
                        "StartAt": "NotifySecurityTeam",
                        "States": {
                            "NotifySecurityTeam": {
                                "Type": "Task",
                                "Resource": f"arn:aws:sns:{self.region}:*:security-alerts",
                                "End": True
                            }
                        }
                    }
                ],
                "Next": "GenerateReport"
            },
            "MediumRiskResponse": {
                "Type": "Task",
                "Resource": f"arn:aws:lambda:{self.region}:*:function:medium-risk-actions",
                "Next": "GenerateReport"
            },
            "LowRiskResponse": {
                "Type": "Task", 
                "Resource": f"arn:aws:lambda:{self.region}:*:function:log-finding",
                "Next": "GenerateReport"
            },
            "GenerateReport": {
                "Type": "Task",
                "Resource": f"arn:aws:lambda:{self.region}:*:function:generate-investigation-report",
                "End": True
            }
        }
    }
    
    try:
        response = stepfunctions.create_state_machine(
            name='security-investigation-workflow',
            definition=json.dumps(workflow_definition),
            roleArn=f'arn:aws:iam::*:role/stepfunctions-security-workflow-role'
        )
        
        print(f"Created Step Functions workflow: {response['stateMachineArn']}")
        
    except Exception as e:
        print(f"Error creating Step Functions workflow: {e}")

Monitoring and Metrics

1. Custom Metrics and Dashboards

Create comprehensive monitoring for your custom GuardDuty setup:

def create_monitoring_dashboard(self):
    """Create CloudWatch dashboard for GuardDuty monitoring"""
    
    cloudwatch = boto3.client('cloudwatch')
    
    dashboard_body = {
        "widgets": [
            {
                "type": "metric",
                "x": 0,
                "y": 0,
                "width": 12,
                "height": 6,
                "properties": {
                    "metrics": [
                        ["AWS/GuardDuty", "FindingCount", "DetectorId", self.detector_id]
                    ],
                    "period": 300,
                    "stat": "Sum",
                    "region": self.region,
                    "title": "GuardDuty Findings",
                    "yAxis": {
                        "left": {
                            "min": 0
                        }
                    }
                }
            },
            {
                "type": "metric", 
                "x": 12,
                "y": 0,
                "width": 12,
                "height": 6,
                "properties": {
                    "metrics": [
                        ["Custom/Security", "ThreatIntelligenceHits"],
                        ["Custom/Security", "CustomRuleTriggered"],
                        ["Custom/Security", "AutomatedResponsesExecuted"]
                    ],
                    "period": 300,
                    "stat": "Sum", 
                    "region": self.region,
                    "title": "Custom Security Metrics"
                }
            },
            {
                "type": "log",
                "x": 0,
                "y": 6,
                "width": 24,
                "height": 6,
                "properties": {
                    "query": f"SOURCE '/aws/guardduty/{self.detector_id}' | fields @timestamp, type, severity, title\n| filter severity >= 7.0\n| sort @timestamp desc\n| limit 20",
                    "region": self.region,
                    "title": "High Severity Findings",
                    "view": "table"
                }
            }
        ]
    }
    
    try:
        cloudwatch.put_dashboard(
            DashboardName='GuardDuty-Advanced-Monitoring',
            DashboardBody=json.dumps(dashboard_body)
        )
        print("Created advanced GuardDuty monitoring dashboard")
    except Exception as e:
        print(f"Error creating dashboard: {e}")

def setup_custom_metrics(self):
    """Set up custom metrics for GuardDuty monitoring"""
    
    cloudwatch = boto3.client('cloudwatch')
    
    # Send custom metrics
    metrics = [
        {
            'MetricName': 'ThreatIntelligenceHits',
            'Value': 0,
            'Unit': 'Count',
            'Dimensions': [
                {
                    'Name': 'DetectorId',
                    'Value': self.detector_id
                }
            ]
        },
        {
            'MetricName': 'CustomRuleTriggered',
            'Value': 0,
            'Unit': 'Count'
        },
        {
            'MetricName': 'AutomatedResponsesExecuted',
            'Value': 0,
            'Unit': 'Count'
        }
    ]
    
    try:
        cloudwatch.put_metric_data(
            Namespace='Custom/Security',
            MetricData=metrics
        )
        print("Initialized custom security metrics")
    except Exception as e:
        print(f"Error setting up custom metrics: {e}")

Complete Implementation Script

Here’s a complete script that ties everything together:

#!/usr/bin/env python3

import boto3
import json
import time
import schedule
from datetime import datetime

class ComprehensiveGuardDutyManager:
    def __init__(self, region='us-east-1'):
        self.guardduty_manager = AdvancedGuardDutyManager(region)
        self.region = region
        
    def full_setup(self):
        """Complete setup of advanced GuardDuty system"""
        
        print("🔧 Setting up enhanced GuardDuty configuration...")
        if not self.guardduty_manager.setup_enhanced_guardduty():
            print("❌ Failed to set up GuardDuty")
            return False
        
        print("🔍 Setting up threat intelligence feeds...")
        self.guardduty_manager.setup_threat_intelligence_feeds()
        
        print("📊 Creating custom finding types...")
        self.guardduty_manager.create_custom_finding_types()
        
        print("🧠 Setting up behavioral analysis...")
        self.guardduty_manager.setup_behavioral_analysis()
        
        print("⚡ Creating Lambda functions...")
        self.guardduty_manager.create_analysis_lambda()
        self.guardduty_manager.create_threat_intel_enrichment_lambda()
        self.guardduty_manager.create_security_orchestration_system()
        
        print("🔄 Setting up EventBridge integration...")
        self.guardduty_manager.setup_eventbridge_integration()
        
        print("📈 Creating monitoring dashboard...")
        self.guardduty_manager.create_monitoring_dashboard()
        self.guardduty_manager.setup_custom_metrics()
        
        print("✅ Advanced GuardDuty setup completed!")
        return True
    
    def run_continuous_monitoring(self):
        """Run continuous monitoring and updates"""
        
        print("🚀 Starting continuous GuardDuty monitoring...")
        
        # Schedule regular updates
        schedule.every(6).hours.do(self.update_threat_intelligence)
        schedule.every(1).hour.do(self.analyze_recent_findings)
        schedule.every(12).hours.do(self.optimize_rules)
        
        # Main monitoring loop
        while True:
            try:
                schedule.run_pending()
                time.sleep(300)  # Check every 5 minutes
            except KeyboardInterrupt:
                print("\n🛑 Stopping continuous monitoring...")
                break
            except Exception as e:
                print(f"❌ Error in monitoring loop: {e}")
                time.sleep(60)  # Wait before retrying
    
    def update_threat_intelligence(self):
        """Update threat intelligence feeds"""
        print(f"📡 [{datetime.now()}] Updating threat intelligence feeds...")
        self.guardduty_manager.update_threat_intelligence_feeds()
    
    def analyze_recent_findings(self):
        """Analyze recent findings for patterns"""
        print(f"🔍 [{datetime.now()}] Analyzing recent findings...")
        
        # Get recent findings
        findings = self.guardduty_manager.guardduty.list_findings(
            DetectorId=self.guardduty_manager.detector_id,
            FindingCriteria={
                'Criterion': {
                    'updatedAt': {
                        'Gte': int((datetime.utcnow() - timedelta(hours=1)).timestamp() * 1000)
                    }
                }
            }
        )
        
        if findings['FindingIds']:
            print(f"   Found {len(findings['FindingIds'])} recent findings")
            # Process findings for patterns, trends, etc.
    
    def optimize_rules(self):
        """Optimize detection rules based on performance"""
        print(f"⚙️  [{datetime.now()}] Optimizing detection rules...")
        # Analyze rule performance and adjust thresholds

if __name__ == "__main__":
    import argparse
    
    parser = argparse.ArgumentParser(description='Advanced GuardDuty Management')
    parser.add_argument('--setup', action='store_true', help='Run full setup')
    parser.add_argument('--monitor', action='store_true', help='Run continuous monitoring')
    parser.add_argument('--region', default='us-east-1', help='AWS region')
    
    args = parser.parse_args()
    
    manager = ComprehensiveGuardDutyManager(region=args.region)
    
    if args.setup:
        manager.full_setup()
    
    if args.monitor:
        manager.run_continuous_monitoring()
    
    if not args.setup and not args.monitor:
        print("Use --setup to configure or --monitor to run continuous monitoring")

Beyond DIY GuardDuty: The PathShield Advantage

While building custom GuardDuty rules and threat intelligence integration provides powerful security capabilities, it comes with significant challenges:

Complexity Management: Maintaining custom Lambda functions, threat intelligence feeds, and complex EventBridge workflows requires substantial DevOps expertise and ongoing maintenance.

Threat Intelligence Quality: Gathering, validating, and maintaining high-quality threat intelligence feeds is a full-time job that requires specialized security expertise.

False Positive Management: Custom rules often generate high false positive rates without sophisticated machine learning and contextual analysis.

Coverage Gaps: Building comprehensive security coverage across all AWS services and attack vectors requires deep security expertise and constant updates as new threats emerge.

Operational Overhead: Managing custom security orchestration, incident response workflows, and forensic evidence collection becomes a significant operational burden as your infrastructure scales.

This is where PathShield transforms your approach to AWS security monitoring. Instead of building and maintaining complex custom GuardDuty extensions, PathShield provides:

  • Expert-Built Detection Rules: Security rules created and maintained by AWS security experts, continuously updated for new threats
  • High-Quality Threat Intelligence: Curated threat intelligence from multiple premium sources, automatically integrated and validated
  • Intelligent Alert Correlation: Advanced machine learning that correlates findings across services to reduce false positives and identify sophisticated attacks
  • Automated Response Workflows: Pre-built incident response playbooks that adapt to your environment without custom coding
  • Zero Maintenance Security: Complete security monitoring that scales with your infrastructure without operational overhead

Ready to move beyond DIY security complexity? Start your free PathShield trial and get enterprise-grade AWS security monitoring without the engineering overhead.

Back to Blog

Related Posts

View All Posts »