📊 Enterprise Monitoring¶
Keiko Personal Assistant provides comprehensive monitoring and observability capabilities for enterprise environments.
🏗️ Monitoring Architecture¶
Observability Stack¶
graph TB
    subgraph "Application Layer"
        APP[Keiko Application]
        AGENTS[Agent Services]
        MCP[MCP Servers]
    end

    subgraph "Metrics Collection"
        PROM[Prometheus]
        OTEL[OpenTelemetry Collector]
        CUSTOM[Custom Metrics]
    end

    subgraph "Logging"
        STRUCT[Structured Logging]
        FLUENTD[Fluentd/Fluent Bit]
        ELK[ELK Stack]
    end

    subgraph "Tracing"
        JAEGER[Jaeger]
        ZIPKIN[Zipkin]
        TRACES[Distributed Traces]
    end

    subgraph "Visualization"
        GRAFANA[Grafana Dashboards]
        KIBANA[Kibana]
        ALERTS[Alert Manager]
    end

    subgraph "Storage"
        TSDB[Time Series DB]
        ES[Elasticsearch]
        S3[Object Storage]
    end

    APP --> PROM
    AGENTS --> OTEL
    MCP --> CUSTOM
    APP --> STRUCT
    STRUCT --> FLUENTD
    FLUENTD --> ELK
    APP --> JAEGER
    AGENTS --> ZIPKIN
    ZIPKIN --> TRACES
    PROM --> GRAFANA
    ELK --> KIBANA
    GRAFANA --> ALERTS
    PROM --> TSDB
    ELK --> ES
    TRACES --> S3
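The arrows from the application layer to Prometheus assume that the application exposes its metrics over HTTP for scraping. A minimal sketch using prometheus_client together with FastAPI (the /metrics mount point is a common convention assumed here, not a fixed part of the architecture):

from fastapi import FastAPI
from prometheus_client import make_asgi_app

app = FastAPI(title="Keiko Personal Assistant")

# Mount the prometheus_client ASGI app so the Prometheus server shown above
# can scrape every metric registered in this process under /metrics.
app.mount("/metrics", make_asgi_app())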
📈 Metrics & KPIs¶
System Metrics¶
Performance Metrics¶
from prometheus_client import Counter, Histogram, Gauge, Summary
import time

# Request metrics
REQUEST_COUNT = Counter(
    'keiko_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'keiko_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

# Agent metrics
AGENT_TASKS_ACTIVE = Gauge(
    'keiko_agent_tasks_active',
    'Number of active agent tasks',
    ['agent_id', 'task_type']
)

AGENT_TASK_DURATION = Summary(
    'keiko_agent_task_duration_seconds',
    'Agent task duration in seconds',
    ['agent_id', 'task_type', 'status']
)

# MCP metrics
MCP_TOOL_CALLS = Counter(
    'keiko_mcp_tool_calls_total',
    'Total number of MCP tool calls',
    ['server_name', 'tool_name', 'status']
)

MCP_SERVER_HEALTH = Gauge(
    'keiko_mcp_server_health',
    'MCP server health status (1=healthy, 0=unhealthy)',
    ['server_name']
)
Business Metrics¶
# User metrics
USER_SESSIONS_ACTIVE = Gauge(
    'keiko_user_sessions_active',
    'Number of active user sessions'
)

USER_ACTIONS = Counter(
    'keiko_user_actions_total',
    'Total number of user actions',
    ['action_type', 'user_role']
)

# Error metrics
ERROR_RATE = Counter(
    'keiko_errors_total',
    'Total number of errors',
    ['error_type', 'component', 'severity']
)

SECURITY_EVENTS = Counter(
    'keiko_security_events_total',
    'Total number of security events',
    ['event_type', 'severity', 'source']
)
Custom Metrics Decorator¶
from functools import wraps
import time
from typing import Callable
from prometheus_client import Counter, Histogram

# Dedicated function-level metrics for the decorator. prometheus_client requires
# that the label names passed to .labels() exactly match the names declared on
# the metric, so the decorator records to metrics with function/module/status labels.
FUNCTION_DURATION = Histogram(
    'keiko_function_duration_seconds',
    'Duration of monitored function calls in seconds',
    ['function', 'module', 'status']
)

FUNCTION_CALLS = Counter(
    'keiko_function_calls_total',
    'Total number of monitored function calls',
    ['function', 'module', 'status']
)

def monitor_performance(metric_name: str, labels: dict = None):
    """Decorator for performance monitoring of async functions."""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            # Copy so repeated calls do not mutate the dict passed to the decorator.
            extra_labels = dict(labels or {})
            status = 'success'
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                status = 'error'
                ERROR_RATE.labels(
                    error_type=type(e).__name__,
                    component=extra_labels.get('component', 'unknown'),
                    severity=extra_labels.get('severity', 'error')
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                FUNCTION_DURATION.labels(
                    function=func.__name__,
                    module=func.__module__,
                    status=status
                ).observe(duration)
                FUNCTION_CALLS.labels(
                    function=func.__name__,
                    module=func.__module__,
                    status=status
                ).inc()
        return wrapper
    return decorator

# Usage
@monitor_performance('agent_task_execution', {'component': 'agent'})
async def execute_agent_task(agent_id: str, task: dict):
    """Executes an agent task with performance monitoring."""
    # Implementation
    pass
📝 Structured Logging¶
Logging Configuration¶
import logging
import json
from datetime import datetime
from typing import Dict, Any

class StructuredFormatter(logging.Formatter):
    """Structured JSON formatter for logs."""

    def format(self, record: logging.LogRecord) -> str:
        """Formats a log record as JSON."""
        log_entry = {
            'timestamp': datetime.utcnow().isoformat() + 'Z',
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }

        # Additional fields attached to the record
        if hasattr(record, 'correlation_id'):
            log_entry['correlation_id'] = record.correlation_id
        if hasattr(record, 'user_id'):
            log_entry['user_id'] = record.user_id
        if hasattr(record, 'agent_id'):
            log_entry['agent_id'] = record.agent_id
        if hasattr(record, 'task_id'):
            log_entry['task_id'] = record.task_id

        # Exception information
        if record.exc_info:
            log_entry['exception'] = {
                'type': record.exc_info[0].__name__,
                'message': str(record.exc_info[1]),
                'traceback': self.formatException(record.exc_info)
            }

        return json.dumps(log_entry, ensure_ascii=False)

# Logger configuration
def setup_logging():
    """Configures structured logging."""
    # Root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(StructuredFormatter())
    root_logger.addHandler(console_handler)

    # File handler for audit logs
    audit_handler = logging.FileHandler('/var/log/keiko/audit.log')
    audit_handler.setFormatter(StructuredFormatter())
    audit_handler.setLevel(logging.WARNING)

    audit_logger = logging.getLogger('keiko.audit')
    audit_logger.addHandler(audit_handler)
    audit_logger.setLevel(logging.INFO)
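The formatter only picks up fields such as correlation_id or agent_id if they are attached to the log record. One way to do that is the standard `extra` parameter of the logging API; a short sketch (the values are illustrative):

logger = logging.getLogger('keiko.agent')

logger.info(
    "Agent task started",
    extra={
        'correlation_id': 'req-1234',  # illustrative values
        'agent_id': 'agent-42',
        'task_id': 'task-7'
    }
)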
Correlation ID Middleware¶
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
import uuid
import logging

class CorrelationMiddleware(BaseHTTPMiddleware):
    """Middleware that assigns a correlation ID to every request."""

    async def dispatch(self, request: Request, call_next):
        # Take the correlation ID from the header, or generate a new one
        correlation_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4()))

        # Store the correlation ID in the request state
        request.state.correlation_id = correlation_id

        # Attach the correlation ID to the request logger
        logger = logging.LoggerAdapter(
            logging.getLogger('keiko.request'),
            {'correlation_id': correlation_id}
        )
        logger.info("Request received: %s %s", request.method, request.url.path)

        # Process the request
        response = await call_next(request)

        # Return the correlation ID in the response headers
        response.headers['X-Correlation-ID'] = correlation_id
        return response
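The middleware is registered on the FastAPI application like any other Starlette-based middleware. A minimal sketch (the app instance is assumed):

from fastapi import FastAPI

app = FastAPI()
app.add_middleware(CorrelationMiddleware)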
🔍 Distributed Tracing¶
OpenTelemetry Configuration¶
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

def setup_tracing():
    """Configures OpenTelemetry tracing."""
    # Configure the tracer provider
    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer(__name__)

    # Configure the Jaeger exporter
    jaeger_exporter = JaegerExporter(
        agent_host_name="jaeger-agent",
        agent_port=6831,
    )

    # Add the span processor
    span_processor = BatchSpanProcessor(jaeger_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)

    # Enable auto-instrumentation
    FastAPIInstrumentor().instrument()
    RequestsInstrumentor().instrument()

    return tracer

# Custom tracing
tracer = setup_tracing()

async def traced_agent_execution(agent_id: str, task: dict):
    """Executes an agent task with tracing."""
    with tracer.start_as_current_span("agent_task_execution") as span:
        # Set span attributes
        span.set_attribute("agent.id", agent_id)
        span.set_attribute("task.type", task.get('task_type', 'unknown'))
        span.set_attribute("task.priority", task.get('priority', 'normal'))

        try:
            # Execute the agent task
            result = await execute_task(agent_id, task)
            span.set_attribute("task.status", "completed")
            span.set_attribute("task.result_size", len(str(result)))
            return result
        except Exception as e:
            span.set_attribute("task.status", "failed")
            span.set_attribute("error.type", type(e).__name__)
            span.set_attribute("error.message", str(e))
            span.record_exception(e)
            raise
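For traces that cross service boundaries, for example calls from the application to MCP servers, the active trace context has to travel with the outgoing request. A sketch using the propagator API bundled with OpenTelemetry (the target URL and payload are illustrative):

import aiohttp
from opentelemetry.propagate import inject

async def call_downstream_service(url: str, payload: dict) -> dict:
    """Forwards the current trace context to a downstream service."""
    headers = {}
    inject(headers)  # adds e.g. the W3C 'traceparent' header for the active span
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload, headers=headers) as response:
            return await response.json()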
🚨 Alerting & Notifications¶
Alert Rules¶
# Prometheus alert rules
groups:
  - name: keiko.rules
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(keiko_errors_total[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High error rate detected"
          description: "Error rate of {{ $value }} over the last 5 minutes"

      # Agent task failures
      - alert: AgentTaskFailures
        expr: rate(keiko_agent_task_duration_seconds_count{status="failed"}[5m]) > 0.05
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Agent task failures detected"
          description: "Agent {{ $labels.agent_id }} has a failure rate of {{ $value }}"

      # MCP server down
      - alert: MCPServerDown
        expr: keiko_mcp_server_health == 0
        for: 30s
        labels:
          severity: critical
        annotations:
          summary: "MCP server unreachable"
          description: "MCP server {{ $labels.server_name }} is not reachable"

      # High memory usage
      - alert: HighMemoryUsage
        expr: process_resident_memory_bytes / 1024 / 1024 > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage: {{ $value }}MB"
Notification Channels¶
from abc import ABC, abstractmethod
from typing import Dict, Any
import aiohttp
import smtplib
from email.mime.text import MIMEText

class NotificationChannel(ABC):
    """Base class for notification channels."""

    @abstractmethod
    async def send_notification(self, alert: Dict[str, Any]) -> bool:
        """Sends a notification."""
        pass

class SlackNotification(NotificationChannel):
    """Slack notifications."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def send_notification(self, alert: Dict[str, Any]) -> bool:
        """Sends a Slack notification."""
        severity_colors = {
            'critical': '#FF0000',
            'warning': '#FFA500',
            'info': '#00FF00'
        }

        payload = {
            'attachments': [{
                'color': severity_colors.get(alert.get('severity'), '#808080'),
                'title': alert.get('summary', 'Keiko Alert'),
                'text': alert.get('description', ''),
                'fields': [
                    {
                        'title': 'Severity',
                        'value': alert.get('severity', 'unknown'),
                        'short': True
                    },
                    {
                        'title': 'Component',
                        'value': alert.get('component', 'unknown'),
                        'short': True
                    }
                ],
                'timestamp': alert.get('timestamp')
            }]
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(self.webhook_url, json=payload) as response:
                return response.status == 200

class EmailNotification(NotificationChannel):
    """Email notifications."""

    def __init__(self, smtp_host: str, smtp_port: int, username: str, password: str):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password

    async def send_notification(self, alert: Dict[str, Any]) -> bool:
        """Sends an email notification."""
        subject = f"Keiko Alert: {alert.get('summary', 'Unknown Alert')}"
        body = f"""
        Alert Details:

        Summary: {alert.get('summary', 'N/A')}
        Description: {alert.get('description', 'N/A')}
        Severity: {alert.get('severity', 'N/A')}
        Component: {alert.get('component', 'N/A')}
        Timestamp: {alert.get('timestamp', 'N/A')}
        """

        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = self.username
        msg['To'] = alert.get('recipient', 'admin@example.com')

        try:
            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.send_message(msg)
            return True
        except Exception:
            return False
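To connect the Prometheus alert rules above with these channels, Alertmanager can be pointed at a small webhook receiver that fans alerts out to the configured channels. A sketch under the assumption of an /alerts/webhook route and the standard Alertmanager webhook payload; the channel list is illustrative:

from fastapi import FastAPI, Request

app = FastAPI()

# Illustrative channel configuration; real values come from the deployment config.
channels = [
    SlackNotification(webhook_url="https://hooks.slack.com/services/EXAMPLE")
]

@app.post("/alerts/webhook")
async def alertmanager_webhook(request: Request) -> dict:
    """Receives an Alertmanager webhook and dispatches it to all channels."""
    payload = await request.json()
    for alert in payload.get("alerts", []):
        notification = {
            'summary': alert.get('annotations', {}).get('summary', 'Keiko Alert'),
            'description': alert.get('annotations', {}).get('description', ''),
            'severity': alert.get('labels', {}).get('severity', 'unknown'),
            'component': alert.get('labels', {}).get('component', 'unknown'),
            'timestamp': alert.get('startsAt')
        }
        for channel in channels:
            await channel.send_notification(notification)
    return {"status": "ok"}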
📊 Dashboards¶
Grafana Dashboard Configuration¶
{
  "dashboard": {
    "title": "Keiko Personal Assistant - Overview",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(keiko_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(keiko_errors_total[5m])",
            "legendFormat": "{{error_type}}"
          }
        ]
      },
      {
        "title": "Agent Tasks",
        "type": "graph",
        "targets": [
          {
            "expr": "keiko_agent_tasks_active",
            "legendFormat": "{{agent_id}} - {{task_type}}"
          }
        ]
      },
      {
        "title": "MCP Server Health",
        "type": "stat",
        "targets": [
          {
            "expr": "keiko_mcp_server_health",
            "legendFormat": "{{server_name}}"
          }
        ]
      }
    ]
  }
}
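Such a dashboard definition can be kept in version control and pushed to Grafana's HTTP API. A sketch (the Grafana URL and API token are placeholders):

import aiohttp

async def publish_dashboard(dashboard: dict, grafana_url: str, api_token: str) -> bool:
    """Creates or updates a dashboard via the Grafana dashboard API."""
    payload = {"dashboard": dashboard["dashboard"], "overwrite": True}
    headers = {"Authorization": f"Bearer {api_token}"}
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{grafana_url}/api/dashboards/db", json=payload, headers=headers
        ) as response:
            return response.status == 200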
📋 Monitoring Checklist¶
Metrics¶
- System metrics (CPU, memory, disk, network) collected
- Application metrics (requests, errors, latency) implemented
- Business metrics (user actions, tasks) defined
- Custom metrics for specific use cases created
Logging¶
- Structured logging implemented
- Correlation IDs for request tracking enabled
- Log aggregation configured
- Log retention defined
Tracing¶
- Distributed tracing enabled
- Service dependencies visualized
- Performance bottlenecks identified
Alerting¶
- Alert rules defined
- Notification channels configured
- Escalation policies implemented
- Alert fatigue avoided
Dashboards¶
- System overview dashboard created
- Application performance dashboard configured
- Business KPIs dashboard implemented
- Security monitoring dashboard enabled
Best Practices
- Implement monitoring from the start, not as an afterthought
- Use meaningful metric names and labels
- Set sensible alert thresholds based on historical data
- Document your monitoring setup for the operations team