Zum Inhalt

🐛 Debugging Guide

Umfassender Leitfaden für das Debugging von Keiko Personal Assistant.

🔍 Debug-Modus aktivieren

Development-Debug-Modus

# Environment-Variable setzen
export KEIKO_DEBUG=true
export KEIKO_LOG_LEVEL=DEBUG

# Debug-Konfiguration
cat > config/debug.yml << EOF
debug:
  enabled: true
  log_level: DEBUG
  profiling: true
  trace_requests: true
  detailed_errors: true

logging:
  level: DEBUG
  format: detailed
  include_traceback: true
  log_sql_queries: true
EOF

# Application mit Debug-Modus starten
uvicorn main:app --reload --log-level debug

Production-Debug-Modus

# Temporärer Debug-Modus (nur für spezifische Session)
export KEIKO_TEMP_DEBUG=true

# Debug-Endpunkt aktivieren
curl -X POST http://localhost:8000/debug/enable \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  -d '{"duration_minutes": 30}'

📊 Logging & Tracing

Strukturiertes Logging

# logging_config.py
from kei_agent import get_logger

logger = get_logger("debug")
logger.info(
    "Agent-Task gestartet",
    agent_id="agent-123",
    task_id="task-456",
    task_type="text_processing",
    user_id="user-789",
)

Request-Tracing

# middleware/tracing.py
import uuid
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

class TracingMiddleware(BaseHTTPMiddleware):
    """Middleware für Request-Tracing."""

    async def dispatch(self, request: Request, call_next):
        # Correlation-ID generieren
        correlation_id = str(uuid.uuid4())
        request.state.correlation_id = correlation_id

        # Request-Start protokollieren
        logger.info(
            "Request gestartet",
            method=request.method,
            url=str(request.url),
            correlation_id=correlation_id,
            user_agent=request.headers.get("user-agent"),
            client_ip=request.client.host
        )

        start_time = time.time()

        try:
            response = await call_next(request)

            # Request-Ende protokollieren
            duration = time.time() - start_time
            logger.info(
                "Request abgeschlossen",
                correlation_id=correlation_id,
                status_code=response.status_code,
                duration_seconds=duration
            )

            # Correlation-ID in Response-Header
            response.headers["X-Correlation-ID"] = correlation_id

            return response

        except Exception as e:
            duration = time.time() - start_time
            logger.error(
                "Request fehlgeschlagen",
                correlation_id=correlation_id,
                error_type=type(e).__name__,
                error_message=str(e),
                duration_seconds=duration,
                exc_info=True
            )
            raise

# Middleware registrieren
app.add_middleware(TracingMiddleware)

Distributed Tracing

# tracing/jaeger_config.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def configure_tracing():
    """Konfiguriert Jaeger-Tracing."""

    # Tracer-Provider konfigurieren
    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer(__name__)

    # Jaeger-Exporter konfigurieren
    jaeger_exporter = JaegerExporter(
        agent_host_name="localhost",
        agent_port=6831,
    )

    # Span-Processor hinzufügen
    span_processor = BatchSpanProcessor(jaeger_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)

    return tracer

# Tracing in Agent-Execution
async def execute_task_with_tracing(self, task: Task) -> TaskResult:
    """Führt Task mit Tracing aus."""

    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("agent.execute_task") as span:
        # Span-Attribute setzen
        span.set_attribute("agent.id", self.id)
        span.set_attribute("agent.type", self.config.type)
        span.set_attribute("task.id", task.id)
        span.set_attribute("task.type", task.type)

        try:
            result = await self._execute_task_logic(task)

            span.set_attribute("task.success", True)
            span.set_attribute("task.duration", result.execution_time)

            return result

        except Exception as e:
            span.set_attribute("task.success", False)
            span.set_attribute("error.type", type(e).__name__)
            span.set_attribute("error.message", str(e))
            span.record_exception(e)
            raise

🔧 Debug-Tools

Interactive Debugger

# debug/interactive.py (vereinfachtes Beispiel)
import asyncio
from kei_agent import get_logger

async def start_debug_session():
    logger = get_logger("debug-session")
    logger.info("Debug-Session gestartet")

if __name__ == "__main__":
    asyncio.run(start_debug_session())

Memory-Profiling

# debug/memory_profiler.py
import psutil
import tracemalloc
from memory_profiler import profile

class MemoryProfiler:
    """Memory-Profiling für Keiko."""

    def __init__(self):
        self.process = psutil.Process()
        tracemalloc.start()

    def get_memory_usage(self) -> dict:
        """Ruft aktuelle Memory-Usage ab."""

        memory_info = self.process.memory_info()
        memory_percent = self.process.memory_percent()

        # Tracemalloc-Statistiken
        current, peak = tracemalloc.get_traced_memory()

        return {
            'rss_mb': memory_info.rss / 1024 / 1024,
            'vms_mb': memory_info.vms / 1024 / 1024,
            'percent': memory_percent,
            'traced_current_mb': current / 1024 / 1024,
            'traced_peak_mb': peak / 1024 / 1024
        }

    def get_top_memory_consumers(self, limit: int = 10) -> list:
        """Ruft Top-Memory-Consumer ab."""

        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics('lineno')

        return [
            {
                'filename': stat.traceback.format()[0],
                'size_mb': stat.size / 1024 / 1024,
                'count': stat.count
            }
            for stat in top_stats[:limit]
        ]

# Memory-Profiling-Decorator
def memory_profile(func):
    """Decorator für Memory-Profiling."""

    @wraps(func)
    async def wrapper(*args, **kwargs):
        profiler = MemoryProfiler()

        # Memory vor Ausführung
        memory_before = profiler.get_memory_usage()

        try:
            result = await func(*args, **kwargs)

            # Memory nach Ausführung
            memory_after = profiler.get_memory_usage()

            # Memory-Diff protokollieren
            memory_diff = memory_after['rss_mb'] - memory_before['rss_mb']

            logger.info(
                f"Memory-Profiling: {func.__name__}",
                memory_before_mb=memory_before['rss_mb'],
                memory_after_mb=memory_after['rss_mb'],
                memory_diff_mb=memory_diff,
                function=func.__name__
            )

            return result

        except Exception as e:
            logger.error(f"Function {func.__name__} failed during memory profiling: {e}")
            raise

    return wrapper

# Verwendung
@memory_profile
async def process_large_dataset(data):
    """Verarbeitet große Datenmengen."""
    # Implementation
    pass

Performance-Profiling

# debug/performance_profiler.py
import cProfile
import pstats
import time
from functools import wraps

class PerformanceProfiler:
    """Performance-Profiling für Keiko."""

    def __init__(self):
        self.profiler = cProfile.Profile()
        self.stats = None

    def start_profiling(self):
        """Startet Profiling."""
        self.profiler.enable()

    def stop_profiling(self):
        """Stoppt Profiling."""
        self.profiler.disable()
        self.stats = pstats.Stats(self.profiler)

    def get_top_functions(self, limit: int = 20) -> list:
        """Ruft Top-Funktionen nach Ausführungszeit ab."""

        if not self.stats:
            return []

        # Nach cumulative time sortieren
        self.stats.sort_stats('cumulative')

        # Top-Funktionen extrahieren
        top_functions = []
        for func, (cc, nc, tt, ct, callers) in list(self.stats.stats.items())[:limit]:
            top_functions.append({
                'function': f"{func[0]}:{func[1]}({func[2]})",
                'calls': nc,
                'total_time': tt,
                'cumulative_time': ct,
                'per_call': ct / nc if nc > 0 else 0
            })

        return top_functions

    def save_profile(self, filename: str):
        """Speichert Profiling-Ergebnisse."""
        if self.stats:
            self.stats.dump_stats(filename)

# Performance-Profiling-Decorator
def performance_profile(func):
    """Decorator für Performance-Profiling."""

    @wraps(func)
    async def wrapper(*args, **kwargs):
        profiler = PerformanceProfiler()

        start_time = time.time()
        profiler.start_profiling()

        try:
            result = await func(*args, **kwargs)

            profiler.stop_profiling()
            execution_time = time.time() - start_time

            # Top-Funktionen protokollieren
            top_functions = profiler.get_top_functions(5)

            logger.info(
                f"Performance-Profiling: {func.__name__}",
                execution_time=execution_time,
                top_functions=top_functions,
                function=func.__name__
            )

            # Profiling-Daten speichern (optional)
            if execution_time > 5.0:  # Nur bei langsamen Funktionen
                filename = f"/tmp/profile_{func.__name__}_{int(time.time())}.prof"
                profiler.save_profile(filename)
                logger.info(f"Profiling-Daten gespeichert: {filename}")

            return result

        except Exception as e:
            profiler.stop_profiling()
            logger.error(f"Function {func.__name__} failed during profiling: {e}")
            raise

    return wrapper

🔍 Debug-Endpunkte

Debug-API-Endpunkte

# api/debug.py (Ausschnitt)
from fastapi import APIRouter
from datetime import datetime
from kei_agent import get_health_manager, APIHealthCheck

debug_router = APIRouter(prefix="/debug", tags=["debug"])

@debug_router.get("/health")
async def debug_health():
    health = get_health_manager()
    health.register_check(APIHealthCheck(name="kei_api", url="https://api.kei-framework.com/health"))
    summary = await health.run_all_checks()
    return {"timestamp": datetime.utcnow().isoformat(), "overall_status": summary.overall_status}

@debug_router.get("/metrics")
async def debug_metrics():
    """Performance-Metriken."""

    return {
        'request_metrics': get_request_metrics(),
        'database_metrics': get_database_metrics(),
        'agent_metrics': get_agent_metrics(),
        'task_metrics': get_task_metrics(),
        'system_metrics': get_system_metrics()
    }

@debug_router.get("/config")
async def debug_config(admin_user = Depends(require_admin)):
    """Aktuelle Konfiguration (nur für Admins)."""

    config = get_current_config()

    # Sensitive Daten entfernen
    sanitized_config = sanitize_config(config)

    return sanitized_config

@debug_router.get("/logs")
async def debug_logs(
    level: str = "INFO",
    limit: int = 100,
    admin_user = Depends(require_admin)
):
    """Recent-Log-Entries."""

    logs = await get_recent_logs(level=level, limit=limit)

    return {
        'logs': logs,
        'total_count': len(logs),
        'level_filter': level
    }

@debug_router.post("/gc")
async def debug_garbage_collection(admin_user = Depends(require_admin)):
    """Erzwingt Garbage Collection."""

    import gc

    before_count = len(gc.get_objects())
    collected = gc.collect()
    after_count = len(gc.get_objects())

    return {
        'objects_before': before_count,
        'objects_after': after_count,
        'objects_collected': collected,
        'objects_freed': before_count - after_count
    }

@debug_router.get("/agents/{agent_id}/debug")
async def debug_agent(agent_id: str):
    """Debug-Informationen für spezifischen Agent."""

    agent = await get_agent(agent_id)
    if not agent:
        raise HTTPException(404, "Agent not found")

    debug_info = {
        'agent_id': agent_id,
        'status': agent.status,
        'current_tasks': await get_agent_current_tasks(agent_id),
        'recent_tasks': await get_agent_recent_tasks(agent_id, limit=10),
        'performance_stats': await get_agent_performance_stats(agent_id),
        'resource_usage': await get_agent_resource_usage(agent_id),
        'error_history': await get_agent_error_history(agent_id, limit=5)
    }

    return debug_info

Debug-CLI-Tools

# cli/debug.py (vereinfachtes Beispiel)
import click
from kei_agent import get_logger

@click.group()
def debug():
    pass

@debug.command()
def ping():
    logger = get_logger("debug-cli")
    logger.info("Debug CLI ping")

if __name__ == '__main__':
    debug()

@debug.command()
@click.option('--duration', default=60, help='Profiling-Dauer in Sekunden')
@click.option('--output', default='profile.prof', help='Output-Datei')
def profile(duration, output):
    """Startet Performance-Profiling."""

    async def run_profiling():
        profiler = PerformanceProfiler()

        print(f"Starte Profiling für {duration} Sekunden...")
        profiler.start_profiling()

        await asyncio.sleep(duration)

        profiler.stop_profiling()
        profiler.save_profile(output)

        print(f"Profiling-Daten gespeichert: {output}")

        # Top-Funktionen anzeigen
        top_functions = profiler.get_top_functions(10)
        print("\nTop-Funktionen:")
        for func in top_functions:
            print(f"  {func['function']}: {func['cumulative_time']:.3f}s")

    asyncio.run(run_profiling())

@debug.command()
@click.option('--level', default='ERROR', help='Log-Level')
@click.option('--follow', '-f', is_flag=True, help='Follow-Modus')
def logs(level, follow):
    """Zeigt Logs an."""

    if follow:
        # Tail-ähnliche Funktionalität
        import time

        while True:
            logs = get_recent_logs(level=level, limit=10)
            for log in logs:
                print(f"{log['timestamp']} [{log['level']}] {log['message']}")

            time.sleep(1)
    else:
        logs = get_recent_logs(level=level, limit=50)
        for log in logs:
            print(f"{log['timestamp']} [{log['level']}] {log['message']}")

if __name__ == '__main__':
    debug()

🧪 Testing & Debugging

Unit-Test-Debugging

# tests/debug_test.py (Beispielstruktur)
import pytest

@pytest.mark.asyncio
async def test_plan_task_happy_path():
    assert True

!!! tip "Debug-Best-Practices" - Verwenden Sie strukturiertes Logging für bessere Nachverfolgbarkeit - Aktivieren Sie Tracing nur bei Bedarf (Performance-Impact) - Nutzen Sie Memory-Profiling bei Verdacht auf Memory-Leaks - Implementieren Sie umfassende Debug-Endpunkte für Production-Debugging

Production-Debugging

Seien Sie vorsichtig beim Aktivieren von Debug-Modi in Production: - Begrenzen Sie Debug-Sessions zeitlich - Beschränken Sie Zugriff auf Admin-Benutzer - Überwachen Sie Performance-Impact - Deaktivieren Sie Debug-Modi nach Problemlösung