Zum Inhalt

✅ Input-Validation

Keiko Personal Assistant implementiert umfassende Input-Validation für Enterprise-Sicherheit und Datenintegrität.

🛡️ Validation-Architektur

Mehrschichtige Validierung

graph TB
    subgraph "Client-Side Validation"
        JS[JavaScript Validation]
        TS[TypeScript Type Checking]
    end

    subgraph "API Gateway Validation"
        SCHEMA[Schema Validation]
        RATE[Rate Limiting]
        SIZE[Size Limits]
    end

    subgraph "Application Validation"
        PYDANTIC[Pydantic Models]
        BUSINESS[Business Rules]
        SANITIZE[Data Sanitization]
    end

    subgraph "Database Validation"
        CONSTRAINTS[DB Constraints]
        TRIGGERS[Validation Triggers]
    end

    JS --> SCHEMA
    TS --> RATE
    SCHEMA --> PYDANTIC
    RATE --> BUSINESS
    SIZE --> SANITIZE
    PYDANTIC --> CONSTRAINTS
    BUSINESS --> TRIGGERS

📋 Schema-Validation

Pydantic-Modelle

Agent-Task-Validation

from pydantic import BaseModel, Field, validator
from typing import Optional, Dict, Any, List
from enum import Enum

class TaskPriority(str, Enum):
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    CRITICAL = "critical"

class AgentTaskRequest(BaseModel):
    """Validierung für Agent-Task-Anfragen."""

    task_type: str = Field(
        ...,
        min_length=1,
        max_length=100,
        regex=r"^[a-zA-Z0-9_-]+$",
        description="Task-Typ (alphanumerisch, Unterstriche, Bindestriche)"
    )

    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Task-Parameter"
    )

    priority: TaskPriority = Field(
        default=TaskPriority.NORMAL,
        description="Task-Priorität"
    )

    timeout_seconds: Optional[int] = Field(
        default=300,
        ge=1,
        le=3600,
        description="Timeout in Sekunden (1-3600)"
    )

    metadata: Optional[Dict[str, str]] = Field(
        default_factory=dict,
        description="Zusätzliche Metadaten"
    )

    @validator('parameters')
    def validate_parameters(cls, v):
        """Validiert Task-Parameter."""
        if not isinstance(v, dict):
            raise ValueError("Parameters müssen ein Dictionary sein")

        # Maximale Verschachtelungstiefe prüfen
        if cls._get_dict_depth(v) > 5:
            raise ValueError("Parameter-Verschachtelung zu tief (max. 5 Ebenen)")

        # Gefährliche Schlüssel prüfen
        dangerous_keys = ['__class__', '__module__', 'eval', 'exec']
        if any(key in str(v) for key in dangerous_keys):
            raise ValueError("Gefährliche Parameter-Schlüssel erkannt")

        return v

    @validator('metadata')
    def validate_metadata(cls, v):
        """Validiert Metadaten."""
        if v is None:
            return {}

        # Maximale Anzahl Metadaten-Felder
        if len(v) > 20:
            raise ValueError("Zu viele Metadaten-Felder (max. 20)")

        # Schlüssel- und Wert-Längen prüfen
        for key, value in v.items():
            if len(key) > 50:
                raise ValueError(f"Metadaten-Schlüssel zu lang: {key}")
            if len(str(value)) > 500:
                raise ValueError(f"Metadaten-Wert zu lang für Schlüssel: {key}")

        return v

    @staticmethod
    def _get_dict_depth(d: Dict, depth: int = 0) -> int:
        """Berechnet die Verschachtelungstiefe eines Dictionaries."""
        if not isinstance(d, dict) or not d:
            return depth
        return max(
            AgentTaskRequest._get_dict_depth(v, depth + 1)
            for v in d.values()
            if isinstance(v, dict)
        ) if any(isinstance(v, dict) for v in d.values()) else depth

MCP-Server-Registration

from pydantic import BaseModel, Field, validator, HttpUrl
from typing import Optional, Dict

class MCPServerRegistration(BaseModel):
    """Validierung für MCP-Server-Registrierung."""

    server_name: str = Field(
        ...,
        min_length=3,
        max_length=50,
        regex=r"^[a-zA-Z0-9][a-zA-Z0-9_-]*[a-zA-Z0-9]$",
        description="Server-Name (3-50 Zeichen, alphanumerisch)"
    )

    base_url: HttpUrl = Field(
        ...,
        description="Basis-URL des MCP-Servers"
    )

    timeout_seconds: float = Field(
        default=30.0,
        ge=1.0,
        le=300.0,
        description="Timeout in Sekunden (1-300)"
    )

    description: Optional[str] = Field(
        default="",
        max_length=500,
        description="Server-Beschreibung (max. 500 Zeichen)"
    )

    authentication: Optional[Dict[str, str]] = Field(
        default_factory=dict,
        description="Authentifizierungsinformationen"
    )

    @validator('base_url')
    def validate_base_url(cls, v):
        """Validiert die Basis-URL."""
        url_str = str(v)

        # Nur HTTPS in Produktion
        if not url_str.startswith(('http://', 'https://')):
            raise ValueError("URL muss mit http:// oder https:// beginnen")

        # Localhost nur in Entwicklung
        if 'localhost' in url_str or '127.0.0.1' in url_str:
            import os
            if os.getenv('ENVIRONMENT') == 'production':
                raise ValueError("Localhost-URLs in Produktion nicht erlaubt")

        return v

    @validator('authentication')
    def validate_authentication(cls, v):
        """Validiert Authentifizierungsinformationen."""
        if not v:
            return v

        allowed_types = ['api_key', 'bearer_token', 'basic_auth']
        auth_type = v.get('type')

        if auth_type not in allowed_types:
            raise ValueError(f"Authentifizierungstyp muss einer von {allowed_types} sein")

        # Spezifische Validierung je nach Typ
        if auth_type == 'api_key' and 'api_key' not in v:
            raise ValueError("API-Key erforderlich für api_key Authentifizierung")

        if auth_type == 'bearer_token' and 'token' not in v:
            raise ValueError("Token erforderlich für bearer_token Authentifizierung")

        if auth_type == 'basic_auth':
            if 'username' not in v or 'password' not in v:
                raise ValueError("Username und Password erforderlich für basic_auth")

        return v

🔒 Sicherheits-Validation

SQL-Injection-Schutz

import re
from typing import Any

class SecurityValidator:
    """Sicherheits-Validierungen für Eingaben."""

    # Gefährliche SQL-Patterns
    SQL_INJECTION_PATTERNS = [
        r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|UNION)\b)",
        r"(--|#|/\*|\*/)",
        r"(\b(OR|AND)\s+\d+\s*=\s*\d+)",
        r"(\b(OR|AND)\s+['\"].*['\"])",
        r"(;|\|\||&&)"
    ]

    # XSS-Patterns
    XSS_PATTERNS = [
        r"<script[^>]*>.*?</script>",
        r"javascript:",
        r"on\w+\s*=",
        r"<iframe[^>]*>",
        r"<object[^>]*>",
        r"<embed[^>]*>"
    ]

    @classmethod
    def validate_sql_injection(cls, value: str) -> bool:
        """Prüft auf SQL-Injection-Versuche."""
        if not isinstance(value, str):
            return True

        value_lower = value.lower()

        for pattern in cls.SQL_INJECTION_PATTERNS:
            if re.search(pattern, value_lower, re.IGNORECASE):
                return False

        return True

    @classmethod
    def validate_xss(cls, value: str) -> bool:
        """Prüft auf XSS-Versuche."""
        if not isinstance(value, str):
            return True

        for pattern in cls.XSS_PATTERNS:
            if re.search(pattern, value, re.IGNORECASE):
                return False

        return True

    @classmethod
    def sanitize_string(cls, value: str, max_length: int = 1000) -> str:
        """Bereinigt und begrenzt String-Eingaben."""
        if not isinstance(value, str):
            return str(value)

        # Länge begrenzen
        value = value[:max_length]

        # Gefährliche Zeichen entfernen
        value = re.sub(r'[<>"\']', '', value)

        # Mehrfache Leerzeichen normalisieren
        value = re.sub(r'\s+', ' ', value)

        # Führende/nachfolgende Leerzeichen entfernen
        value = value.strip()

        return value

Custom Validators

from pydantic import validator

def secure_string_validator(field_name: str, max_length: int = 1000):
    """Erstellt einen sicheren String-Validator."""

    def validator_func(cls, v):
        if v is None:
            return v

        if not isinstance(v, str):
            raise ValueError(f"{field_name} muss ein String sein")

        # Sicherheitsprüfungen
        if not SecurityValidator.validate_sql_injection(v):
            raise ValueError(f"{field_name} enthält potentiell gefährliche SQL-Patterns")

        if not SecurityValidator.validate_xss(v):
            raise ValueError(f"{field_name} enthält potentiell gefährliche XSS-Patterns")

        # Längenprüfung
        if len(v) > max_length:
            raise ValueError(f"{field_name} zu lang (max. {max_length} Zeichen)")

        # Bereinigung
        return SecurityValidator.sanitize_string(v, max_length)

    return validator(field_name, allow_reuse=True)(validator_func)

📊 Rate Limiting & Size Limits

Request-Limits

from fastapi import HTTPException, Request
from typing import Dict
import time

class RateLimiter:
    """Rate-Limiting für API-Endpunkte."""

    def __init__(self):
        self.requests: Dict[str, list] = {}
        self.limits = {
            "default": {"requests": 100, "window": 60},  # 100 req/min
            "auth": {"requests": 10, "window": 60},      # 10 req/min
            "tasks": {"requests": 20, "window": 60},     # 20 req/min
            "upload": {"requests": 5, "window": 60}      # 5 req/min
        }

    def check_rate_limit(self, client_ip: str, endpoint_type: str = "default") -> bool:
        """Prüft Rate-Limit für Client."""
        now = time.time()
        limit_config = self.limits.get(endpoint_type, self.limits["default"])

        # Client-Requests initialisieren
        if client_ip not in self.requests:
            self.requests[client_ip] = []

        # Alte Requests entfernen
        window_start = now - limit_config["window"]
        self.requests[client_ip] = [
            req_time for req_time in self.requests[client_ip]
            if req_time > window_start
        ]

        # Limit prüfen
        if len(self.requests[client_ip]) >= limit_config["requests"]:
            return False

        # Request hinzufügen
        self.requests[client_ip].append(now)
        return True

# FastAPI Dependency
async def rate_limit_dependency(request: Request, endpoint_type: str = "default"):
    """Rate-Limiting Dependency für FastAPI."""
    client_ip = request.client.host
    rate_limiter = RateLimiter()

    if not rate_limiter.check_rate_limit(client_ip, endpoint_type):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": "60"}
        )

Content-Size-Limits

from fastapi import HTTPException, Request
import json

class ContentSizeValidator:
    """Validiert Content-Größen."""

    MAX_SIZES = {
        "json": 1024 * 1024,      # 1 MB
        "text": 512 * 1024,       # 512 KB
        "file": 10 * 1024 * 1024, # 10 MB
        "image": 5 * 1024 * 1024   # 5 MB
    }

    @classmethod
    async def validate_request_size(cls, request: Request, content_type: str = "json"):
        """Validiert Request-Größe."""
        max_size = cls.MAX_SIZES.get(content_type, cls.MAX_SIZES["json"])

        # Content-Length Header prüfen
        content_length = request.headers.get("content-length")
        if content_length and int(content_length) > max_size:
            raise HTTPException(
                status_code=413,
                detail=f"Request zu groß (max. {max_size} bytes)"
            )

        # Body-Größe prüfen
        body = await request.body()
        if len(body) > max_size:
            raise HTTPException(
                status_code=413,
                detail=f"Request-Body zu groß (max. {max_size} bytes)"
            )

        return body

🔍 Validation-Middleware

FastAPI-Middleware

from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.base import BaseHTTPMiddleware
import json

class ValidationMiddleware(BaseHTTPMiddleware):
    """Middleware für umfassende Input-Validation."""

    async def dispatch(self, request: Request, call_next):
        # Content-Type validieren
        content_type = request.headers.get("content-type", "")

        if request.method in ["POST", "PUT", "PATCH"]:
            # JSON-Content validieren
            if "application/json" in content_type:
                try:
                    body = await request.body()
                    if body:
                        # JSON-Parsing testen
                        json.loads(body)

                        # Größe prüfen
                        await ContentSizeValidator.validate_request_size(
                            request, "json"
                        )
                except json.JSONDecodeError:
                    raise HTTPException(
                        status_code=400,
                        detail="Ungültiges JSON-Format"
                    )
                except Exception as e:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Validation-Fehler: {str(e)}"
                    )

        # Request weiterleiten
        response = await call_next(request)
        return response

# Middleware registrieren
app = FastAPI()
app.add_middleware(ValidationMiddleware)

📋 Validation-Checkliste

Input-Validation

  • Schema-Validation mit Pydantic implementiert
  • SQL-Injection-Schutz aktiviert
  • XSS-Schutz implementiert
  • Rate-Limiting konfiguriert
  • Content-Size-Limits gesetzt
  • Content-Type-Validation aktiviert
  • Encoding-Validation implementiert
  • File-Upload-Validation konfiguriert

Sicherheits-Validation

  • Gefährliche Patterns erkannt und blockiert
  • Input-Sanitization implementiert
  • Output-Encoding aktiviert
  • CSRF-Schutz implementiert
  • Path-Traversal-Schutz aktiviert
  • Command-Injection-Schutz implementiert

Performance-Validation

  • Request-Timeouts konfiguriert
  • Memory-Limits gesetzt
  • CPU-Limits definiert
  • Concurrent-Request-Limits implementiert

Sicherheitshinweis

Input-Validation ist nur eine Schicht der Sicherheit. Kombinieren Sie sie mit anderen Sicherheitsmaßnahmen wie Authentifizierung, Autorisierung und Monitoring.

Performance-Tipp

Implementieren Sie Validation-Caching für häufig validierte Patterns, um die Performance zu verbessern.