⚡ Performance Troubleshooting¶
Leitfaden zur Diagnose und Behebung von Performance-Problemen in Keiko Personal Assistant.
📊 Performance-Monitoring¶
Key Performance Indicators (KPIs)¶
# monitoring/performance_kpis.py
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class PerformanceKPIs:
"""Performance-KPIs für Keiko."""
# Response-Zeit-Ziele
api_response_time_p95: float = 200.0 # ms
api_response_time_p99: float = 500.0 # ms
# Durchsatz-Ziele
requests_per_second: float = 1000.0
tasks_per_minute: float = 500.0
# Resource-Ziele
cpu_usage_max: float = 70.0 # %
memory_usage_max: float = 80.0 # %
database_connections_max: float = 80.0 # % of pool
# Fehler-Ziele
error_rate_max: float = 0.1 # %
task_failure_rate_max: float = 1.0 # %
class PerformanceMonitor:
"""Performance-Monitor für KPI-Überwachung."""
def __init__(self):
self.kpis = PerformanceKPIs()
self.metrics_collector = MetricsCollector()
async def check_performance_health(self) -> Dict[str, bool]:
"""Prüft Performance-Health gegen KPIs."""
current_metrics = await self.metrics_collector.get_current_metrics()
health_status = {
'api_response_time': current_metrics['api_p95'] <= self.kpis.api_response_time_p95,
'throughput': current_metrics['rps'] >= self.kpis.requests_per_second,
'cpu_usage': current_metrics['cpu_percent'] <= self.kpis.cpu_usage_max,
'memory_usage': current_metrics['memory_percent'] <= self.kpis.memory_usage_max,
'error_rate': current_metrics['error_rate'] <= self.kpis.error_rate_max
}
return health_status
async def get_performance_alerts(self) -> List[Dict[str, str]]:
"""Ruft aktuelle Performance-Alerts ab."""
health_status = await self.check_performance_health()
alerts = []
for metric, is_healthy in health_status.items():
if not is_healthy:
alerts.append({
'metric': metric,
'severity': 'warning' if metric in ['cpu_usage', 'memory_usage'] else 'critical',
'message': f"Performance-KPI verletzt: {metric}"
})
return alerts
Real-Time Performance Dashboard¶
# monitoring/dashboard.py
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
performance_router = APIRouter(prefix="/performance", tags=["performance"])
@performance_router.get("/dashboard")
async def performance_dashboard():
"""Performance-Dashboard."""
dashboard_html = """
<!DOCTYPE html>
<html>
<head>
<title>Keiko Performance Dashboard</title>
<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
<style>
.metric-card {
border: 1px solid #ddd;
padding: 20px;
margin: 10px;
border-radius: 5px;
}
.metric-value {
font-size: 2em;
font-weight: bold;
}
.metric-good { color: green; }
.metric-warning { color: orange; }
.metric-critical { color: red; }
</style>
</head>
<body>
<h1>Keiko Performance Dashboard</h1>
<div id="metrics-container">
<div class="metric-card">
<h3>API Response Time (P95)</h3>
<div id="response-time" class="metric-value">Loading...</div>
</div>
<div class="metric-card">
<h3>Requests per Second</h3>
<div id="rps" class="metric-value">Loading...</div>
</div>
<div class="metric-card">
<h3>CPU Usage</h3>
<div id="cpu-usage" class="metric-value">Loading...</div>
</div>
<div class="metric-card">
<h3>Memory Usage</h3>
<div id="memory-usage" class="metric-value">Loading...</div>
</div>
</div>
<div id="response-time-chart" style="width:100%;height:400px;"></div>
<div id="throughput-chart" style="width:100%;height:400px;"></div>
<script>
async function updateMetrics() {
const response = await fetch('/performance/metrics');
const metrics = await response.json();
// Metriken aktualisieren
document.getElementById('response-time').textContent =
metrics.api_p95.toFixed(1) + ' ms';
document.getElementById('rps').textContent =
metrics.rps.toFixed(0);
document.getElementById('cpu-usage').textContent =
metrics.cpu_percent.toFixed(1) + '%';
document.getElementById('memory-usage').textContent =
metrics.memory_percent.toFixed(1) + '%';
// Farben basierend auf Thresholds
updateMetricColor('response-time', metrics.api_p95, 200, 500);
updateMetricColor('cpu-usage', metrics.cpu_percent, 70, 90);
updateMetricColor('memory-usage', metrics.memory_percent, 80, 95);
}
function updateMetricColor(elementId, value, warning, critical) {
const element = document.getElementById(elementId);
element.className = 'metric-value ' +
(value < warning ? 'metric-good' :
value < critical ? 'metric-warning' : 'metric-critical');
}
// Charts erstellen
async function createCharts() {
const response = await fetch('/performance/history');
const history = await response.json();
// Response-Time-Chart
Plotly.newPlot('response-time-chart', [{
x: history.timestamps,
y: history.response_times,
type: 'scatter',
mode: 'lines',
name: 'Response Time (ms)'
}], {
title: 'API Response Time History',
xaxis: { title: 'Time' },
yaxis: { title: 'Response Time (ms)' }
});
// Throughput-Chart
Plotly.newPlot('throughput-chart', [{
x: history.timestamps,
y: history.throughput,
type: 'scatter',
mode: 'lines',
name: 'Requests/sec'
}], {
title: 'Throughput History',
xaxis: { title: 'Time' },
yaxis: { title: 'Requests per Second' }
});
}
// Initial load
updateMetrics();
createCharts();
// Auto-refresh alle 5 Sekunden
setInterval(updateMetrics, 5000);
</script>
</body>
</html>
"""
return HTMLResponse(content=dashboard_html)
@performance_router.get("/metrics")
async def get_current_metrics():
"""Aktuelle Performance-Metriken."""
monitor = PerformanceMonitor()
metrics = await monitor.metrics_collector.get_current_metrics()
return metrics
@performance_router.get("/history")
async def get_performance_history(hours: int = 24):
"""Performance-Historie."""
# Historie aus Monitoring-System abrufen
history = await get_metrics_history(hours=hours)
return {
'timestamps': history['timestamps'],
'response_times': history['api_p95'],
'throughput': history['rps'],
'cpu_usage': history['cpu_percent'],
'memory_usage': history['memory_percent']
}
🐌 Slow Query Diagnosis¶
Database Performance Analysis¶
# diagnosis/database_analyzer.py
import asyncio
from sqlalchemy import text
class DatabasePerformanceAnalyzer:
"""Analysiert Database-Performance."""
def __init__(self, db_session):
self.db_session = db_session
async def analyze_slow_queries(self, min_duration_ms: int = 1000) -> List[Dict[str, Any]]:
"""Analysiert langsame Queries."""
query = text("""
SELECT
query,
mean_exec_time,
calls,
total_exec_time,
rows,
100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) AS hit_percent
FROM pg_stat_statements
WHERE mean_exec_time > :min_duration
ORDER BY mean_exec_time DESC
LIMIT 20
""")
result = await self.db_session.execute(query, {"min_duration": min_duration_ms})
slow_queries = []
for row in result:
slow_queries.append({
'query': row.query[:200] + '...' if len(row.query) > 200 else row.query,
'mean_exec_time_ms': float(row.mean_exec_time),
'calls': row.calls,
'total_exec_time_ms': float(row.total_exec_time),
'avg_rows': row.rows / row.calls if row.calls > 0 else 0,
'cache_hit_percent': float(row.hit_percent or 0)
})
return slow_queries
async def analyze_missing_indexes(self) -> List[Dict[str, Any]]:
"""Analysiert fehlende Indizes."""
query = text("""
SELECT
schemaname,
tablename,
attname,
n_distinct,
correlation
FROM pg_stats
WHERE schemaname = 'public'
AND n_distinct > 100
AND correlation < 0.1
ORDER BY n_distinct DESC
""")
result = await self.db_session.execute(query)
missing_indexes = []
for row in result:
# Prüfen ob Index bereits existiert
index_exists = await self._check_index_exists(row.tablename, row.attname)
if not index_exists:
missing_indexes.append({
'table': row.tablename,
'column': row.attname,
'distinct_values': row.n_distinct,
'correlation': float(row.correlation),
'suggested_index': f"CREATE INDEX idx_{row.tablename}_{row.attname} ON {row.tablename}({row.attname});"
})
return missing_indexes
async def _check_index_exists(self, table_name: str, column_name: str) -> bool:
"""Prüft ob Index für Spalte existiert."""
query = text("""
SELECT COUNT(*)
FROM pg_indexes
WHERE tablename = :table_name
AND indexdef LIKE '%' || :column_name || '%'
""")
result = await self.db_session.execute(query, {
"table_name": table_name,
"column_name": column_name
})
return result.scalar() > 0
async def get_table_statistics(self) -> List[Dict[str, Any]]:
"""Ruft Tabellen-Statistiken ab."""
query = text("""
SELECT
schemaname,
tablename,
n_tup_ins as inserts,
n_tup_upd as updates,
n_tup_del as deletes,
n_live_tup as live_tuples,
n_dead_tup as dead_tuples,
last_vacuum,
last_autovacuum,
last_analyze,
last_autoanalyze
FROM pg_stat_user_tables
ORDER BY n_live_tup DESC
""")
result = await self.db_session.execute(query)
table_stats = []
for row in result:
table_stats.append({
'table': f"{row.schemaname}.{row.tablename}",
'inserts': row.inserts,
'updates': row.updates,
'deletes': row.deletes,
'live_tuples': row.live_tuples,
'dead_tuples': row.dead_tuples,
'dead_tuple_ratio': row.dead_tuples / max(row.live_tuples, 1),
'last_vacuum': row.last_vacuum,
'last_analyze': row.last_analyze,
'needs_vacuum': row.dead_tuples > 1000 and row.dead_tuples / max(row.live_tuples, 1) > 0.1
})
return table_stats
Query Optimization Recommendations¶
# diagnosis/query_optimizer.py
class QueryOptimizer:
"""Gibt Query-Optimierungs-Empfehlungen."""
async def analyze_query_plan(self, query: str) -> Dict[str, Any]:
"""Analysiert Query-Execution-Plan."""
explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
result = await self.db_session.execute(text(explain_query))
plan_data = result.scalar()
analysis = {
'total_cost': plan_data[0]['Plan']['Total Cost'],
'execution_time': plan_data[0]['Execution Time'],
'planning_time': plan_data[0]['Planning Time'],
'recommendations': []
}
# Analyse des Plans
plan = plan_data[0]['Plan']
analysis['recommendations'].extend(self._analyze_plan_node(plan))
return analysis
def _analyze_plan_node(self, node: Dict[str, Any]) -> List[str]:
"""Analysiert einzelnen Plan-Node."""
recommendations = []
# Sequential Scan Detection
if node.get('Node Type') == 'Seq Scan':
table_name = node.get('Relation Name', 'unknown')
recommendations.append(
f"Sequential Scan auf {table_name} - Index könnte helfen"
)
# Nested Loop mit hohen Kosten
if node.get('Node Type') == 'Nested Loop' and node.get('Total Cost', 0) > 1000:
recommendations.append(
"Nested Loop mit hohen Kosten - JOIN-Optimierung prüfen"
)
# Sort mit hohem Memory-Verbrauch
if node.get('Node Type') == 'Sort' and node.get('Sort Space Used', 0) > 1000:
recommendations.append(
"Sort-Operation mit hohem Memory-Verbrauch - work_mem erhöhen"
)
# Hash Join ohne Index
if node.get('Node Type') == 'Hash Join':
recommendations.append(
"Hash Join - Index auf JOIN-Spalten prüfen"
)
# Rekursive Analyse für Child-Nodes
for child in node.get('Plans', []):
recommendations.extend(self._analyze_plan_node(child))
return recommendations
async def suggest_index_optimizations(self, table_name: str) -> List[str]:
"""Schlägt Index-Optimierungen vor."""
suggestions = []
# Häufig verwendete WHERE-Spalten
frequent_where_columns = await self._get_frequent_where_columns(table_name)
for column in frequent_where_columns:
suggestions.append(
f"CREATE INDEX idx_{table_name}_{column} ON {table_name}({column});"
)
# Häufig verwendete ORDER BY-Spalten
frequent_order_columns = await self._get_frequent_order_columns(table_name)
for column in frequent_order_columns:
suggestions.append(
f"CREATE INDEX idx_{table_name}_{column}_order ON {table_name}({column});"
)
# Composite-Indizes für häufige WHERE-Kombinationen
frequent_combinations = await self._get_frequent_column_combinations(table_name)
for combination in frequent_combinations:
columns = ', '.join(combination)
suggestions.append(
f"CREATE INDEX idx_{table_name}_{'_'.join(combination)} ON {table_name}({columns});"
)
return suggestions
🚀 Application Performance Tuning¶
Async Performance Optimization¶
# optimization/async_optimizer.py
import asyncio
from typing import List, Callable, Any
class AsyncPerformanceOptimizer:
"""Optimiert Async-Performance."""
def __init__(self, max_concurrent: int = 100):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def batch_process_with_concurrency(
self,
items: List[Any],
processor: Callable,
batch_size: int = 50
) -> List[Any]:
"""Verarbeitet Items in Batches mit Concurrency-Control."""
results = []
# Items in Batches aufteilen
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
# Batch parallel verarbeiten
batch_tasks = [
self._process_with_semaphore(processor, item)
for item in batch
]
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
# Erfolgreiche Ergebnisse sammeln
for result in batch_results:
if not isinstance(result, Exception):
results.append(result)
else:
logger.error(f"Batch processing error: {result}")
# Kurze Pause zwischen Batches
await asyncio.sleep(0.1)
return results
async def _process_with_semaphore(self, processor: Callable, item: Any) -> Any:
"""Verarbeitet Item mit Semaphore-Control."""
async with self.semaphore:
return await processor(item)
async def optimize_database_operations(self, operations: List[Callable]) -> List[Any]:
"""Optimiert Database-Operationen."""
# Operationen nach Typ gruppieren
read_ops = [op for op in operations if getattr(op, 'is_read', True)]
write_ops = [op for op in operations if not getattr(op, 'is_read', True)]
# Read-Operationen parallel ausführen
read_results = await asyncio.gather(*read_ops, return_exceptions=True)
# Write-Operationen sequenziell ausführen (für Konsistenz)
write_results = []
for write_op in write_ops:
try:
result = await write_op()
write_results.append(result)
except Exception as e:
logger.error(f"Write operation failed: {e}")
write_results.append(e)
return read_results + write_results
Memory Optimization¶
# optimization/memory_optimizer.py
import gc
import weakref
from typing import Dict, Any, Optional
class MemoryOptimizer:
"""Optimiert Memory-Nutzung."""
def __init__(self):
self.object_cache: Dict[str, weakref.WeakValueDictionary] = {}
self.memory_threshold_mb = 1000 # 1GB
def get_cached_object(self, cache_key: str, object_id: str) -> Optional[Any]:
"""Ruft Objekt aus Cache ab."""
if cache_key not in self.object_cache:
self.object_cache[cache_key] = weakref.WeakValueDictionary()
return self.object_cache[cache_key].get(object_id)
def cache_object(self, cache_key: str, object_id: str, obj: Any) -> None:
"""Cached Objekt."""
if cache_key not in self.object_cache:
self.object_cache[cache_key] = weakref.WeakValueDictionary()
self.object_cache[cache_key][object_id] = obj
def check_memory_usage(self) -> Dict[str, float]:
"""Prüft aktuelle Memory-Nutzung."""
import psutil
process = psutil.Process()
memory_info = process.memory_info()
return {
'rss_mb': memory_info.rss / 1024 / 1024,
'vms_mb': memory_info.vms / 1024 / 1024,
'percent': process.memory_percent()
}
async def optimize_memory_if_needed(self) -> bool:
"""Optimiert Memory wenn nötig."""
memory_usage = self.check_memory_usage()
if memory_usage['rss_mb'] > self.memory_threshold_mb:
logger.warning(f"High memory usage: {memory_usage['rss_mb']:.1f}MB")
# Garbage Collection erzwingen
collected = gc.collect()
logger.info(f"Garbage collection freed {collected} objects")
# Cache leeren
self.clear_caches()
# Memory-Usage nach Optimierung prüfen
new_memory_usage = self.check_memory_usage()
memory_freed = memory_usage['rss_mb'] - new_memory_usage['rss_mb']
logger.info(f"Memory optimization freed {memory_freed:.1f}MB")
return memory_freed > 50 # Erfolgreich wenn >50MB befreit
return False
def clear_caches(self) -> None:
"""Leert alle Caches."""
for cache in self.object_cache.values():
cache.clear()
logger.info("All caches cleared")
# Memory-optimierter Decorator
def memory_optimized(func):
"""Decorator für Memory-Optimierung."""
optimizer = MemoryOptimizer()
@wraps(func)
async def wrapper(*args, **kwargs):
# Memory vor Ausführung prüfen
await optimizer.optimize_memory_if_needed()
try:
result = await func(*args, **kwargs)
return result
finally:
# Memory nach Ausführung prüfen
await optimizer.optimize_memory_if_needed()
return wrapper
📈 Performance Testing¶
Load Testing¶
# testing/load_test.py
import asyncio
import aiohttp
import time
from dataclasses import dataclass
from typing import List
@dataclass
class LoadTestResult:
"""Load-Test-Ergebnis."""
total_requests: int
successful_requests: int
failed_requests: int
avg_response_time: float
p95_response_time: float
p99_response_time: float
requests_per_second: float
error_rate: float
class LoadTester:
"""Load-Tester für Keiko API."""
def __init__(self, base_url: str, auth_token: str):
self.base_url = base_url
self.auth_token = auth_token
self.response_times: List[float] = []
self.errors: List[str] = []
async def run_load_test(
self,
endpoint: str,
concurrent_users: int = 50,
duration_seconds: int = 300,
request_data: dict = None
) -> LoadTestResult:
"""Führt Load-Test aus."""
start_time = time.time()
end_time = start_time + duration_seconds
# Semaphore für Concurrency-Control
semaphore = asyncio.Semaphore(concurrent_users)
# Tasks für alle User starten
tasks = []
for user_id in range(concurrent_users):
task = asyncio.create_task(
self._user_session(semaphore, endpoint, end_time, request_data, user_id)
)
tasks.append(task)
# Auf alle Tasks warten
await asyncio.gather(*tasks, return_exceptions=True)
# Ergebnisse berechnen
return self._calculate_results(duration_seconds)
async def _user_session(
self,
semaphore: asyncio.Semaphore,
endpoint: str,
end_time: float,
request_data: dict,
user_id: int
):
"""Simuliert einzelne User-Session."""
async with aiohttp.ClientSession() as session:
while time.time() < end_time:
async with semaphore:
await self._make_request(session, endpoint, request_data, user_id)
# Kurze Pause zwischen Requests
await asyncio.sleep(0.1)
async def _make_request(
self,
session: aiohttp.ClientSession,
endpoint: str,
request_data: dict,
user_id: int
):
"""Macht einzelnen Request."""
url = f"{self.base_url}{endpoint}"
headers = {"Authorization": f"Bearer {self.auth_token}"}
start_time = time.time()
try:
if request_data:
async with session.post(url, json=request_data, headers=headers) as response:
await response.text() # Response lesen
response_time = time.time() - start_time
self.response_times.append(response_time * 1000) # ms
if response.status >= 400:
self.errors.append(f"HTTP {response.status}")
else:
async with session.get(url, headers=headers) as response:
await response.text()
response_time = time.time() - start_time
self.response_times.append(response_time * 1000) # ms
if response.status >= 400:
self.errors.append(f"HTTP {response.status}")
except Exception as e:
response_time = time.time() - start_time
self.response_times.append(response_time * 1000)
self.errors.append(str(e))
def _calculate_results(self, duration_seconds: int) -> LoadTestResult:
"""Berechnet Test-Ergebnisse."""
total_requests = len(self.response_times)
failed_requests = len(self.errors)
successful_requests = total_requests - failed_requests
if self.response_times:
avg_response_time = sum(self.response_times) / len(self.response_times)
sorted_times = sorted(self.response_times)
p95_index = int(len(sorted_times) * 0.95)
p99_index = int(len(sorted_times) * 0.99)
p95_response_time = sorted_times[p95_index]
p99_response_time = sorted_times[p99_index]
else:
avg_response_time = 0
p95_response_time = 0
p99_response_time = 0
requests_per_second = total_requests / duration_seconds
error_rate = (failed_requests / total_requests * 100) if total_requests > 0 else 0
return LoadTestResult(
total_requests=total_requests,
successful_requests=successful_requests,
failed_requests=failed_requests,
avg_response_time=avg_response_time,
p95_response_time=p95_response_time,
p99_response_time=p99_response_time,
requests_per_second=requests_per_second,
error_rate=error_rate
)
# Load-Test ausführen
async def run_api_load_test():
"""Führt API-Load-Test aus."""
tester = LoadTester(
base_url="http://localhost:8000",
auth_token="your-auth-token"
)
# Agent-Listing-Test
result = await tester.run_load_test(
endpoint="/api/v1/agents",
concurrent_users=50,
duration_seconds=300
)
print(f"Load Test Results:")
print(f"Total Requests: {result.total_requests}")
print(f"Successful: {result.successful_requests}")
print(f"Failed: {result.failed_requests}")
print(f"Avg Response Time: {result.avg_response_time:.1f}ms")
print(f"P95 Response Time: {result.p95_response_time:.1f}ms")
print(f"P99 Response Time: {result.p99_response_time:.1f}ms")
print(f"Requests/sec: {result.requests_per_second:.1f}")
print(f"Error Rate: {result.error_rate:.2f}%")
if __name__ == "__main__":
asyncio.run(run_api_load_test())
Performance-Optimierung-Tipps
- Überwachen Sie kontinuierlich die Key-Performance-Indicators
- Nutzen Sie Database-Query-Analyse für Optimierungen
- Implementieren Sie effektive Caching-Strategien
- Optimieren Sie Async-Operations für bessere Concurrency
- Führen Sie regelmäßige Load-Tests durch
Performance-Monitoring
- Setzen Sie realistische Performance-Ziele
- Überwachen Sie Trends, nicht nur absolute Werte
- Berücksichtigen Sie Business-Kontext bei Optimierungen
- Dokumentieren Sie Performance-Änderungen