🔒 Security Troubleshooting
Leitfaden zur Diagnose und Behebung von Sicherheitsproblemen im Keiko Personal Assistant.
🔐 Authentication Issues
JWT Token Problems
Problem: JWT-Token werden nicht akzeptiert oder sind ungültig.
Diagnose:
# Test token validation against the API
curl -H "Authorization: Bearer $TOKEN" http://localhost:8000/api/v1/auth/validate

# Decode the token payload (no signature verification).
# JWT segments are base64url-encoded WITHOUT padding, so the URL-safe
# alphabet is mapped back to standard base64 and the "=" padding restored
# first; plain `base64 -d` fails or truncates on many real tokens otherwise.
payload=$(printf '%s' "$TOKEN" | cut -d. -f2 | tr '_-' '/+')
while [ $(( ${#payload} % 4 )) -ne 0 ]; do payload="${payload}="; done
printf '%s' "$payload" | base64 -d | jq

# Inspect the claims (including "exp") without verifying the signature.
# The token is handed over through the environment instead of being pasted
# into the Python source, so its content can never break the snippet.
TOKEN="$TOKEN" python3 -c "
import json
import os

import jwt

decoded = jwt.decode(os.environ['TOKEN'], options={'verify_signature': False})
print(json.dumps(decoded, indent=2))
"
Häufige Ursachen & Lösungen:
Abgelaufene Tokens
# auth/token_validator.py
import logging
from datetime import datetime, timezone

import jwt

# Module-level logger; the functions below log through it.
logger = logging.getLogger(__name__)


def validate_token_expiration(token: str) -> bool:
    """Report whether a JWT is still inside its ``exp`` lifetime.

    The signature is NOT verified here. This is a diagnostic helper and must
    not be the only check performed before a token is trusted.

    Args:
        token: Encoded JWT string.

    Returns:
        ``True`` if the token decodes and either carries no ``exp`` claim or
        its ``exp`` is not in the past; ``False`` if it is expired or cannot
        be decoded.
    """
    try:
        decoded = jwt.decode(token, options={"verify_signature": False})
        exp = decoded.get('exp')
        if not exp:
            # No expiry claim present: nothing to reject on.
            return True
        exp_datetime = datetime.fromtimestamp(exp, tz=timezone.utc)
    except (jwt.PyJWTError, TypeError, ValueError, OverflowError, OSError) as e:
        # Malformed token or an ``exp`` value that is not a usable timestamp.
        logger.error("Token validation error: %s", e)
        return False

    now = datetime.now(timezone.utc)
    if exp_datetime < now:
        logger.warning("Token expired at %s, current time: %s", exp_datetime, now)
        return False
    return True


async def refresh_token_if_needed(token: str) -> str:
    """Return ``token`` while it is valid, otherwise a refreshed access token.

    Relies on ``get_refresh_token`` and ``auth_service``, which this snippet
    does not define; they have to be provided by the application.
    NOTE(review): a token that fails to decode also triggers a refresh,
    because ``validate_token_expiration`` returns ``False`` for it as well.
    """
    if validate_token_expiration(token):
        return token

    refresh_token = get_refresh_token()
    return await auth_service.refresh_access_token(refresh_token)
Falsche Signatur
# config/jwt_config.py
import os

from cryptography.hazmat.primitives import serialization

# JWT configuration is read and validated once, at import time.
JWT_SECRET = os.getenv('JWT_SECRET')
JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')

# RS*/PS* (RSA) and ES* (ECDSA) sign with a private key and verify with the
# matching public key; every other algorithm is treated as shared-secret.
_USES_KEY_PAIR = JWT_ALGORITHM.startswith(('RS', 'PS', 'ES'))

if _USES_KEY_PAIR:
    private_key_path = os.getenv('JWT_PRIVATE_KEY_PATH')
    public_key_path = os.getenv('JWT_PUBLIC_KEY_PATH')

    if not private_key_path or not public_key_path:
        raise ValueError("Private/Public key paths required for asymmetric algorithms")

    # Loading the PEM files here makes a broken or mismatched key file fail
    # at startup instead of on the first signed/verified token.
    with open(private_key_path, 'rb') as f:
        private_key = serialization.load_pem_private_key(f.read(), password=None)

    with open(public_key_path, 'rb') as f:
        public_key = serialization.load_pem_public_key(f.read())
elif not JWT_SECRET:
    # The shared secret is only needed when no key pair is in use; demanding
    # it for key-pair algorithms produced a misleading startup error.
    raise ValueError("JWT_SECRET environment variable required")
Token-Format-Probleme
# auth/token_extractor.py
import re

from fastapi import HTTPException, Request

# "Bearer" followed by a compact JWT (three base64url segments). The scheme
# name is matched case-insensitively because HTTP auth schemes are not
# case-sensitive; compiled once so it is not rebuilt per request.
_BEARER_PATTERN = re.compile(
    r'^Bearer\s+([A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+)$',
    re.IGNORECASE,
)


def extract_bearer_token(request: Request) -> str:
    """Return the JWT carried in the ``Authorization: Bearer`` header.

    Args:
        request: Incoming FastAPI request.

    Returns:
        The token string without the ``Bearer`` prefix.

    Raises:
        HTTPException: 401 if the header is missing or is not a bearer
            header holding a three-segment token.
    """
    auth_header = request.headers.get('Authorization')

    if not auth_header:
        raise HTTPException(401, "Authorization header missing")

    match = _BEARER_PATTERN.match(auth_header.strip())
    if not match:
        raise HTTPException(401, "Invalid Authorization header format")

    return match.group(1)


def extract_token_from_multiple_sources(request: Request) -> str:
    """Return the first token found in header, query string or cookie.

    Sources are tried in that order. Unlike ``extract_bearer_token`` the
    token's shape is not validated here.

    Raises:
        HTTPException: 401 if none of the sources carries a token.
    """
    # 1. Authorization header
    auth_header = request.headers.get('Authorization')
    if auth_header:
        scheme, _, credentials = auth_header.partition(' ')
        credentials = credentials.strip()
        # An empty credential ("Bearer ") is skipped instead of being
        # returned as a token, so the other sources still get a chance.
        if scheme.lower() == 'bearer' and credentials:
            return credentials

    # 2. Query parameter (for WebSocket clients).
    # NOTE(review): tokens in URLs tend to end up in access logs and
    # browser history; confirm this source is really required.
    token = request.query_params.get('token')
    if token:
        return token

    # 3. Cookie (if configured)
    token = request.cookies.get('access_token')
    if token:
        return token

    raise HTTPException(401, "No valid token found")
Session Management Issues
Problem: Session-Probleme bei Multi-Instance-Deployment.
Diagnose:
# List the session keys. --scan iterates the keyspace incrementally with
# SCAN; KEYS would block the server while it walks every key, which is
# unsafe on a production instance.
redis-cli --scan --pattern "session:*"
# Show all fields of one session
redis-cli hgetall "session:user123"
# Remaining lifetime in seconds (-1: no expiry set, -2: key does not exist)
redis-cli ttl "session:user123"
Lösungen:
Sticky Sessions konfigurieren
Shared Session Storage
# session/redis_session_store.py
import json
import time
from typing import Any, Dict, Optional

import redis.asyncio as redis


class RedisSessionStore:
    """Session store that keeps each session as a Redis hash.

    Every session lives under the key ``session:<session_id>`` and expires
    ``session_ttl`` seconds after it was last created or read.
    """

    def __init__(self, redis_url: str, session_ttl: int = 3600):
        """Connect to ``redis_url``; ``session_ttl`` is the lifetime in seconds."""
        # decode_responses=True makes the client return ``str`` instead of
        # ``bytes``, so the dict handed out by ``get_session`` really has
        # string keys as its annotation promises.
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.session_ttl = session_ttl

    async def create_session(self, user_id: str, session_data: Dict[str, Any]) -> str:
        """Store a new session and return its id.

        Args:
            user_id: Owner of the session; stored in the ``user_id`` field.
            session_data: Extra fields to store. The dict is not modified.

        Returns:
            The newly generated session id.
        """
        # ``generate_session_id`` is not defined in this snippet and has to
        # be provided by the application.
        session_id = generate_session_id()
        session_key = f"session:{session_id}"

        now = time.time()
        # Work on a copy so the caller's dict is left untouched.
        fields = {
            **session_data,
            'user_id': user_id,
            'created_at': now,
            'last_accessed': now,
        }

        # NOTE(review): plain strings are stored as-is while everything else
        # is JSON-encoded, so a string such as "123" is read back as the
        # integer 123 by ``get_session``.
        encoded = {
            k: json.dumps(v) if not isinstance(v, str) else v
            for k, v in fields.items()
        }

        # HSET and EXPIRE are sent as one transaction so a session can never
        # be left behind without an expiry.
        async with self.redis.pipeline(transaction=True) as pipe:
            pipe.hset(session_key, mapping=encoded)
            pipe.expire(session_key, self.session_ttl)
            await pipe.execute()

        return session_id

    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored session fields, or ``None`` if there is no session.

        A successful read extends the session lifetime by ``session_ttl`` and
        updates the stored ``last_accessed`` field. The returned dict holds
        the values as they were read, before that update.
        """
        session_key = f"session:{session_id}"

        session_data = await self.redis.hgetall(session_key)
        if not session_data:
            return None

        # EXPIRE runs first and reports whether the key still exists. If the
        # session expired right after the read, writing ``last_accessed``
        # would otherwise re-create it as a hash with a single field.
        if not await self.redis.expire(session_key, self.session_ttl):
            return None
        await self.redis.hset(session_key, 'last_accessed', time.time())

        # Decode JSON values; anything that is not valid JSON is returned raw.
        decoded_data = {}
        for key, value in session_data.items():
            try:
                decoded_data[key] = json.loads(value)
            except (json.JSONDecodeError, TypeError):
                decoded_data[key] = value

        return decoded_data

    async def delete_session(self, session_id: str) -> bool:
        """Delete the session; return ``True`` if a key was actually removed."""
        session_key = f"session:{session_id}"
        deleted = await self.redis.delete(session_key)
        return deleted > 0
🛡️ Authorization Problems
RBAC Permission Issues
Problem: Benutzer haben nicht die erwarteten Berechtigungen.
Diagnose:
# debug/permission_debugger.py
async def debug_user_permissions(user_id: str) -> Dict[str, Any]:
    """Collect a diagnostic snapshot of one user's roles and permissions.

    Args:
        user_id: Id of the user to inspect.

    Returns:
        ``{"error": "User not found"}`` when the user lookup yields nothing;
        otherwise a dict with the user's name, role names, the permission
        names granted by each role, the effective permission names and
        their count.
    """
    account = await user_service.get_user(user_id)
    if not account:
        return {"error": "User not found"}

    # Roles assigned to the user
    assigned_roles = await rbac_service.get_user_roles(user_id)

    # Permission names granted by each individual role
    per_role = {}
    for assigned in assigned_roles:
        granted = await rbac_service.get_role_permissions(assigned.name)
        per_role[assigned.name] = [perm.name for perm in granted]

    # Permissions the RBAC service reports for the user as a whole
    effective = await rbac_service.get_user_permissions(user_id)

    report = {"user_id": user_id}
    report["username"] = account.username
    report["roles"] = [assigned.name for assigned in assigned_roles]
    report["role_permissions"] = per_role
    report["effective_permissions"] = [perm.name for perm in effective]
    report["permission_count"] = len(effective)
    return report
# Permission check with debugging
async def check_permission_with_debug(user_id: str, permission: str) -> Dict[str, Any]:
    """Check one permission and explain the outcome.

    Args:
        user_id: Id of the user to check.
        permission: Name of the permission being requested.

    Returns:
        The dict produced by ``debug_user_permissions`` extended with
        ``requested_permission``, ``has_permission`` and
        ``permission_source``. The source is the first role whose permission
        list contains ``permission``, or ``None`` when the permission was
        denied or does not come from any listed role.
    """
    has_permission = await rbac_service.check_permission(user_id, permission)

    debug_info = await debug_user_permissions(user_id)
    debug_info.update({
        "requested_permission": permission,
        "has_permission": has_permission,
        "permission_source": None
    })

    # Find the role that grants the permission. ``role_permissions`` is
    # absent when the user lookup failed and only an error dict came back,
    # so it is read with a default instead of raising KeyError.
    if has_permission:
        role_permissions = debug_info.get("role_permissions", {})
        debug_info["permission_source"] = next(
            (
                role
                for role, permissions in role_permissions.items()
                if permission in permissions
            ),
            None,
        )

    return debug_info
Lösungen:
Permission-Inheritance prüfen
# rbac/permission_resolver.py
from typing import Set


class PermissionResolver:
    """Resolves permissions including those inherited through parent roles.

    The data-access coroutines ``get_direct_user_permissions``,
    ``get_user_roles``, ``get_direct_role_permissions`` and
    ``get_parent_roles`` are not defined in this snippet and must be
    supplied (e.g. by a subclass). They are expected to yield objects that
    expose a ``name`` attribute.
    """

    async def resolve_user_permissions(self, user_id: str) -> Set[str]:
        """Return every permission name the user holds, directly or via roles."""
        permissions = set()

        # Permissions attached directly to the user
        user_permissions = await self.get_direct_user_permissions(user_id)
        permissions.update(p.name for p in user_permissions)

        # Permissions contributed by each role, including inherited ones
        user_roles = await self.get_user_roles(user_id)
        for role in user_roles:
            role_permissions = await self.resolve_role_permissions(role.name)
            permissions.update(role_permissions)

        return permissions

    async def resolve_role_permissions(self, role_name: str) -> Set[str]:
        """Return the role's permission names plus those of all its ancestors."""
        permissions = set()
        visited_roles = set()

        await self._resolve_role_permissions_recursive(
            role_name, permissions, visited_roles
        )
        return permissions

    async def _resolve_role_permissions_recursive(
        self,
        role_name: str,
        permissions: Set[str],
        visited_roles: Set[str]
    ):
        """Add the permissions of ``role_name`` and its parents to ``permissions``.

        ``visited_roles`` records the roles already handled so that a cycle
        in the role hierarchy ends the recursion instead of looping forever.
        """
        if role_name in visited_roles:
            return  # already handled: breaks circular inheritance

        visited_roles.add(role_name)

        # Permissions attached directly to this role
        role_permissions = await self.get_direct_role_permissions(role_name)
        permissions.update(p.name for p in role_permissions)

        # Walk up into the parent roles
        parent_roles = await self.get_parent_roles(role_name)
        for parent_role in parent_roles:
            await self._resolve_role_permissions_recursive(
                parent_role.name, permissions, visited_roles
            )
Permission-Caching optimieren
# rbac/permission_cache.py
import asyncio
import time
from typing import Dict, Optional, Set


class PermissionCache:
    """Two-level cache for user permissions.

    Level 1 is a per-process dict, level 2 is a Redis set stored under
    ``permissions:user:<user_id>``. Both levels use ``cache_ttl`` seconds as
    their lifetime.
    """

    def __init__(self, redis_client, cache_ttl: int = 300):
        """Keep ``redis_client`` (an async Redis client) and the TTL in seconds."""
        self.redis = redis_client
        self.cache_ttl = cache_ttl
        self.local_cache: Dict[str, Set[str]] = {}
        self.cache_timestamps: Dict[str, float] = {}

    async def get_user_permissions(self, user_id: str) -> Optional[Set[str]]:
        """Return the cached permissions, or ``None`` on a cache miss."""
        # L1: local cache
        if self._is_local_cache_valid(user_id):
            return self.local_cache[user_id]

        # L2: Redis cache
        cache_key = f"permissions:user:{user_id}"
        cached_permissions = await self.redis.smembers(cache_key)

        if cached_permissions:
            permissions = set(cached_permissions)
            self._update_local_cache(user_id, permissions)
            return permissions

        return None

    async def cache_user_permissions(self, user_id: str, permissions: Set[str]):
        """Replace the cached permissions of ``user_id`` in both levels."""
        # L1: local cache
        self._update_local_cache(user_id, permissions)

        # L2: Redis cache. The old set is removed first: SADD only adds
        # members, so revoked permissions would otherwise stay cached and
        # have their lifetime extended on every write.
        cache_key = f"permissions:user:{user_id}"
        await self.redis.delete(cache_key)
        if permissions:
            await self.redis.sadd(cache_key, *permissions)
            await self.redis.expire(cache_key, self.cache_ttl)

    async def invalidate_user_permissions(self, user_id: str):
        """Drop the cached permissions of ``user_id`` from both levels."""
        # Local cache
        self.local_cache.pop(user_id, None)
        self.cache_timestamps.pop(user_id, None)

        # Redis cache
        cache_key = f"permissions:user:{user_id}"
        await self.redis.delete(cache_key)

    def _is_local_cache_valid(self, user_id: str) -> bool:
        """Return ``True`` if a local entry exists and is younger than the TTL."""
        if user_id not in self.local_cache:
            return False

        timestamp = self.cache_timestamps.get(user_id, 0)
        return time.time() - timestamp < self.cache_ttl

    def _update_local_cache(self, user_id: str, permissions: Set[str]):
        """Store a copy of ``permissions`` locally and stamp it with the current time."""
        # A copy keeps later changes to the caller's set out of the cache.
        self.local_cache[user_id] = set(permissions)
        self.cache_timestamps[user_id] = time.time()
🔍 Security Audit & Monitoring
Security Event Detection
# security/event_detector.py
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Set
class SecurityEventType(Enum):
    """Closed set of security event categories; each value is its wire name."""

    # Login attempt that was rejected
    FAILED_LOGIN = "failed_login"
    # Behaviour that deviates from what is expected for the user
    SUSPICIOUS_ACTIVITY = "suspicious_activity"
    # A user gained additional permissions
    PRIVILEGE_ESCALATION = "privilege_escalation"
    # Data was accessed in violation of the rules
    DATA_ACCESS_VIOLATION = "data_access_violation"
    # A rate limit was exceeded
    RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"
@dataclass
class SecurityEvent:
    """Record describing one detected security event."""

    # Category of the event
    event_type: SecurityEventType
    # Affected user, if one is known
    user_id: Optional[str]
    # Client address the event is attributed to
    ip_address: str
    # User-Agent string of the client
    user_agent: str
    # Moment the event was detected
    timestamp: datetime
    # Free-form, event-specific context
    details: Dict[str, Any]
    # One of: low, medium, high, critical
    severity: str
class SecurityEventDetector:
    """Detects security events from login failures, permission changes and activity.

    Failed-login history is kept in memory per detector instance. The
    coroutines ``get_user_ip_history`` and ``get_ip_country`` are not
    defined in this snippet and must be supplied (e.g. by a subclass).
    All timestamps are timezone-aware UTC.
    """

    # Failed logins per user/IP pair within one hour that trigger an event.
    FAILED_LOGIN_THRESHOLD = 5

    # Newly granted permissions from this set count as a critical escalation.
    CRITICAL_PERMISSIONS = frozenset({
        "system:admin", "users:delete", "agents:delete",
        "system:config", "security:manage"
    })

    def __init__(self):
        """Start with an empty failed-login history and no suspicious IPs."""
        self.failed_login_attempts: Dict[str, List[datetime]] = {}
        self.suspicious_ips: Set[str] = set()

    async def detect_failed_login_pattern(
        self,
        user_id: str,
        ip_address: str,
        user_agent: str
    ) -> Optional[SecurityEvent]:
        """Record one failed login and report repeated failures.

        Returns:
            A high-severity ``FAILED_LOGIN`` event once the user/IP pair has
            reached the threshold within the last hour, otherwise ``None``.
        """
        now = datetime.now(timezone.utc)
        key = f"{user_id}:{ip_address}"

        # Keep only attempts from the last hour, then add the current one.
        cutoff = now - timedelta(hours=1)
        recent_attempts = [
            attempt for attempt in self.failed_login_attempts.get(key, [])
            if attempt > cutoff
        ]
        recent_attempts.append(now)
        self.failed_login_attempts[key] = recent_attempts

        if len(recent_attempts) < self.FAILED_LOGIN_THRESHOLD:
            return None

        return SecurityEvent(
            event_type=SecurityEventType.FAILED_LOGIN,
            user_id=user_id,
            ip_address=ip_address,
            user_agent=user_agent,
            timestamp=now,
            details={
                "failed_attempts": len(recent_attempts),
                "time_window": "1 hour"
            },
            severity="high"
        )

    async def detect_privilege_escalation(
        self,
        user_id: str,
        old_permissions: Set[str],
        new_permissions: Set[str],
        ip_address: str
    ) -> Optional[SecurityEvent]:
        """Report newly granted critical permissions.

        Returns:
            A critical ``PRIVILEGE_ESCALATION`` event if any permission in
            ``new_permissions`` but not in ``old_permissions`` is critical,
            otherwise ``None``.
        """
        added_permissions = new_permissions - old_permissions
        critical_added = added_permissions & self.CRITICAL_PERMISSIONS

        if not critical_added:
            return None

        return SecurityEvent(
            event_type=SecurityEventType.PRIVILEGE_ESCALATION,
            user_id=user_id,
            ip_address=ip_address,
            user_agent="",
            timestamp=datetime.now(timezone.utc),
            details={
                # Sorted so the event content is stable across runs.
                "added_permissions": sorted(added_permissions),
                "critical_permissions": sorted(critical_added)
            },
            severity="critical"
        )

    async def detect_suspicious_activity(
        self,
        user_id: str,
        action: str,
        resource: str,
        ip_address: str,
        user_agent: str
    ) -> Optional[SecurityEvent]:
        """Report activity from an unusual country or at an unusual hour.

        Returns:
            A medium-severity event when the request comes from a new IP in
            a different country than the first known IP; a low-severity
            event when the current UTC hour is before 6 or after 22;
            otherwise ``None``.
        """
        # Unusual IP address. An empty or missing history is skipped.
        user_ips = await self.get_user_ip_history(user_id)
        if user_ips and ip_address not in user_ips:
            # Compare countries. Only the first known IP is the reference.
            user_country = await self.get_ip_country(user_ips[0])
            current_country = await self.get_ip_country(ip_address)

            if user_country != current_country:
                return SecurityEvent(
                    event_type=SecurityEventType.SUSPICIOUS_ACTIVITY,
                    user_id=user_id,
                    ip_address=ip_address,
                    user_agent=user_agent,
                    timestamp=datetime.now(timezone.utc),
                    details={
                        "reason": "unusual_location",
                        "user_country": user_country,
                        "current_country": current_country,
                        "action": action,
                        "resource": resource
                    },
                    severity="medium"
                )

        # Unusual time of day.
        # NOTE(review): the hour is UTC, not the user's local time.
        now = datetime.now(timezone.utc)
        if now.hour < 6 or now.hour > 22:  # outside business hours
            return SecurityEvent(
                event_type=SecurityEventType.SUSPICIOUS_ACTIVITY,
                user_id=user_id,
                ip_address=ip_address,
                user_agent=user_agent,
                timestamp=now,
                details={
                    "reason": "unusual_time",
                    "hour": now.hour,
                    "action": action,
                    "resource": resource
                },
                severity="low"
            )

        return None
# Security event handler
class SecurityEventHandler:
    """Reacts to detected security events.

    The action coroutines used below (logging, locking, blocking,
    notifying) are not defined in this snippet and must be supplied.
    """

    async def handle_security_event(self, event: SecurityEvent):
        """Log the event, run the severity-specific reaction, then notify."""
        # Always record the event first
        await self.log_security_event(event)

        # Automatic reaction, chosen by severity
        level = event.severity
        if level == "critical":
            await self.handle_critical_event(event)
        elif level == "high":
            await self.handle_high_severity_event(event)

        # Notifications go out for every event
        await self.send_security_notification(event)

    async def handle_critical_event(self, event: SecurityEvent):
        """React to a critical event according to its type."""
        kind = event.event_type

        if kind == SecurityEventType.PRIVILEGE_ESCALATION:
            # Lock the user temporarily, then alert the security team
            await self.temporarily_lock_user(event.user_id, duration_minutes=30)
            await self.notify_security_team(event)
            return

        if kind == SecurityEventType.DATA_ACCESS_VIOLATION:
            # Block the originating address
            await self.block_ip_address(event.ip_address, duration_hours=24)

    async def handle_high_severity_event(self, event: SecurityEvent):
        """React to a high-severity event according to its type."""
        if event.event_type != SecurityEventType.FAILED_LOGIN:
            return

        # Throttle the address temporarily
        await self.rate_limit_ip(event.ip_address, duration_minutes=15)
        # Tell the user about the suspicious activity
        await self.notify_user_suspicious_activity(event.user_id, event)
Security Compliance Monitoring
# security/compliance_monitor.py
class ComplianceMonitor:
    """Runs security compliance checks and reports the result per check.

    The individual ``check_*`` probes, ``find_weak_passwords`` and
    ``get_password_policy`` are not defined in this snippet and must be
    supplied (e.g. by a subclass).
    """

    async def check_gdpr_compliance(self) -> Dict[str, bool]:
        """Run the GDPR-related probes and return one result per topic."""
        checks = {
            "data_encryption": await self.check_data_encryption(),
            "access_logging": await self.check_access_logging(),
            "data_retention": await self.check_data_retention_policy(),
            "user_consent": await self.check_user_consent_tracking(),
            "data_portability": await self.check_data_portability_support(),
            "right_to_erasure": await self.check_erasure_capability()
        }

        return checks

    async def check_security_headers(
        self,
        url: str = "http://localhost:8000/health"
    ) -> Dict[str, bool]:
        """Report which security headers a response from ``url`` carries.

        Args:
            url: Endpoint to probe. Defaults to the local health endpoint.

        Returns:
            One boolean per header: ``True`` if the header is present.
        """
        # Probe request against the API. Only presence is checked, not the
        # header values.
        # NOTE(review): servers commonly send Strict-Transport-Security only
        # over HTTPS, so probing a plain-http URL may report it as missing.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                headers = response.headers

        return {
            "strict_transport_security": "Strict-Transport-Security" in headers,
            "content_security_policy": "Content-Security-Policy" in headers,
            "x_frame_options": "X-Frame-Options" in headers,
            "x_content_type_options": "X-Content-Type-Options" in headers,
            "x_xss_protection": "X-XSS-Protection" in headers,
            "referrer_policy": "Referrer-Policy" in headers
        }

    async def check_password_policy_compliance(self) -> Dict[str, Any]:
        """Compare the configured password policy with the minimum requirements.

        Returns:
            Flags for minimum length (at least 8), complexity (special
            characters and numbers both required), expiry and history, plus
            the number of weak passwords found.
        """
        # Weak passwords found in the database
        weak_passwords = await self.find_weak_passwords()

        # Currently configured policy
        policy = await self.get_password_policy()

        return {
            "min_length_enforced": policy.min_length >= 8,
            # bool() keeps the flag a plain boolean whatever the two
            # policy attributes hold.
            "complexity_enforced": bool(
                policy.require_special_chars and policy.require_numbers
            ),
            "weak_passwords_count": len(weak_passwords),
            "password_expiry_enabled": policy.max_age_days is not None,
            "password_history_enabled": policy.history_count > 0
        }
Security-Alerts
Bei kritischen Security-Events:
- Sofortige Benachrichtigung des Security-Teams
- Automatische Sperrung betroffener Accounts
- Detaillierte Forensik-Logs erstellen
- Incident-Response-Prozess aktivieren
Security-Best-Practices
- Implementieren Sie umfassende Security-Event-Detection
- Nutzen Sie Multi-Factor-Authentication für privilegierte Accounts
- Überwachen Sie kontinuierlich Compliance-Status
- Führen Sie regelmäßige Security-Audits durch
- Dokumentieren Sie alle Security-Incidents