Zum Inhalt

🔄 Migration Guide

Umfassender Leitfaden für die Migration zu Keiko Personal Assistant v2.0 und neueren Versionen.

📋 Migration-Übersicht

Unterstützte Migration-Pfade

graph LR
    subgraph "Legacy Versions"
        V1_0[v1.0.x]
        V1_5[v1.5.x]
        V1_9[v1.9.x]
    end

    subgraph "Current Version"
        V2_0[v2.0.x]
    end

    subgraph "Migration Tools"
        AUTO[Automatic Migration]
        MANUAL[Manual Migration]
        HYBRID[Hybrid Migration]
    end

    V1_0 --> AUTO
    V1_5 --> HYBRID
    V1_9 --> MANUAL

    AUTO --> V2_0
    HYBRID --> V2_0
    MANUAL --> V2_0

Migration-Strategien

Von VersionZu VersionStrategieAufwandDowntime
v1.0.xv2.0.xVollständige MigrationHoch4-8 Stunden
v1.5.xv2.0.xHybride MigrationMittel2-4 Stunden
v1.9.xv2.0.xInkrementelle MigrationNiedrig1-2 Stunden

🚀 Schnellstart-Migration

Pre-Migration Checklist

# 1. Backup erstellen
./scripts/backup-system.sh --full --verify

# 2. Abhängigkeiten prüfen
./scripts/check-dependencies.sh

# 3. Migration-Tool herunterladen
curl -L https://github.com/oscharko/keiko-migration/releases/latest/download/migrate.sh -o migrate.sh
chmod +x migrate.sh

# 4. Migration-Plan erstellen
./migrate.sh --plan --from-version=1.9.0 --to-version=2.0.0

# 5. Dry-Run durchführen
./migrate.sh --dry-run --config=migration-config.yml

Automatische Migration (v1.9.x → v2.0.x)

# Migration-Konfiguration
cat > migration-config.yml << EOF
source:
  version: "1.9.0"
  database_url: "postgresql://user:pass@localhost/keiko_v1"
  config_path: "/opt/keiko-v1/config"
  data_path: "/opt/keiko-v1/data"

target:
  version: "2.0.0"
  database_url: "postgresql://user:pass@localhost/keiko_v2"
  config_path: "/opt/keiko-v2/config"
  data_path: "/opt/keiko-v2/data"

migration:
  strategy: "automatic"
  preserve_data: true
  migrate_configs: true
  migrate_users: true
  migrate_agents: true
  migrate_tasks: true

backup:
  enabled: true
  path: "/backup/keiko-migration"
  verify: true

rollback:
  enabled: true
  create_checkpoint: true
EOF

# Migration ausführen
./migrate.sh --config=migration-config.yml --execute

📊 Daten-Migration

Database Schema Migration

# migration/database_migrator.py
from keiko.migration import DatabaseMigrator, MigrationStep

class KeikoV2DatabaseMigrator(DatabaseMigrator):
    """Database-Migrator für Keiko v2.0."""

    def __init__(self, source_db_url: str, target_db_url: str):
        super().__init__(source_db_url, target_db_url)
        self.migration_steps = self._define_migration_steps()

    def _define_migration_steps(self) -> List[MigrationStep]:
        """Definiert Migration-Schritte."""

        return [
            # Schritt 1: Schema-Updates
            MigrationStep(
                name="update_user_schema",
                description="Aktualisiert User-Schema für v2.0",
                sql_commands=[
                    "ALTER TABLE users ADD COLUMN role VARCHAR(50) DEFAULT 'user'",
                    "ALTER TABLE users ADD COLUMN last_login TIMESTAMP",
                    "ALTER TABLE users ADD COLUMN preferences JSONB DEFAULT '{}'",
                    "CREATE INDEX idx_users_role ON users(role)",
                    "CREATE INDEX idx_users_last_login ON users(last_login)"
                ],
                rollback_commands=[
                    "DROP INDEX IF EXISTS idx_users_last_login",
                    "DROP INDEX IF EXISTS idx_users_role",
                    "ALTER TABLE users DROP COLUMN IF EXISTS preferences",
                    "ALTER TABLE users DROP COLUMN IF EXISTS last_login",
                    "ALTER TABLE users DROP COLUMN IF EXISTS role"
                ]
            ),

            # Schritt 2: Agent-Schema
            MigrationStep(
                name="update_agent_schema",
                description="Aktualisiert Agent-Schema für v2.0",
                sql_commands=[
                    "ALTER TABLE agents ADD COLUMN capabilities JSONB DEFAULT '[]'",
                    "ALTER TABLE agents ADD COLUMN metadata JSONB DEFAULT '{}'",
                    "ALTER TABLE agents ADD COLUMN version VARCHAR(20) DEFAULT '2.0.0'",
                    "UPDATE agents SET capabilities = '[]' WHERE capabilities IS NULL",
                    "CREATE INDEX idx_agents_capabilities ON agents USING GIN(capabilities)",
                    "CREATE INDEX idx_agents_version ON agents(version)"
                ]
            ),

            # Schritt 3: Task-Schema
            MigrationStep(
                name="update_task_schema",
                description="Aktualisiert Task-Schema für v2.0",
                sql_commands=[
                    "ALTER TABLE tasks ADD COLUMN priority VARCHAR(20) DEFAULT 'normal'",
                    "ALTER TABLE tasks ADD COLUMN timeout_seconds INTEGER DEFAULT 300",
                    "ALTER TABLE tasks ADD COLUMN retry_count INTEGER DEFAULT 0",
                    "ALTER TABLE tasks ADD COLUMN metadata JSONB DEFAULT '{}'",
                    "CREATE INDEX idx_tasks_priority ON tasks(priority)",
                    "CREATE INDEX idx_tasks_timeout ON tasks(timeout_seconds)"
                ]
            ),

            # Schritt 4: Neue Tabellen
            MigrationStep(
                name="create_new_tables",
                description="Erstellt neue Tabellen für v2.0",
                sql_commands=[
                    """
                    CREATE TABLE mcp_servers (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        name VARCHAR(100) UNIQUE NOT NULL,
                        base_url VARCHAR(500) NOT NULL,
                        auth_config JSONB DEFAULT '{}',
                        status VARCHAR(20) DEFAULT 'inactive',
                        created_at TIMESTAMP DEFAULT NOW(),
                        updated_at TIMESTAMP DEFAULT NOW()
                    )
                    """,
                    """
                    CREATE TABLE audit_logs (
                        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                        user_id UUID REFERENCES users(id),
                        action VARCHAR(100) NOT NULL,
                        resource_type VARCHAR(50) NOT NULL,
                        resource_id VARCHAR(100),
                        details JSONB DEFAULT '{}',
                        ip_address INET,
                        user_agent TEXT,
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                    """,
                    "CREATE INDEX idx_audit_logs_user_action ON audit_logs(user_id, action)",
                    "CREATE INDEX idx_audit_logs_resource ON audit_logs(resource_type, resource_id)",
                    "CREATE INDEX idx_audit_logs_created_at ON audit_logs(created_at)"
                ]
            )
        ]

    async def migrate(self) -> MigrationResult:
        """Führt vollständige Migration durch."""

        result = MigrationResult()

        try:
            # Pre-Migration-Checks
            await self._pre_migration_checks()

            # Backup erstellen
            backup_path = await self._create_backup()
            result.backup_path = backup_path

            # Migration-Schritte ausführen
            for step in self.migration_steps:
                step_result = await self._execute_migration_step(step)
                result.add_step_result(step.name, step_result)

                if not step_result.success:
                    raise MigrationException(f"Migration-Schritt fehlgeschlagen: {step.name}")

            # Post-Migration-Validierung
            await self._post_migration_validation()

            result.success = True
            result.message = "Migration erfolgreich abgeschlossen"

        except Exception as e:
            result.success = False
            result.error = str(e)

            # Rollback bei Fehler
            await self._rollback_migration()

        return result

    async def _execute_migration_step(self, step: MigrationStep) -> StepResult:
        """Führt einzelnen Migration-Schritt aus."""

        step_result = StepResult(step_name=step.name)

        try:
            async with self.target_db.begin() as conn:
                for command in step.sql_commands:
                    await conn.execute(text(command))

            step_result.success = True
            step_result.message = f"Schritt {step.name} erfolgreich"

        except Exception as e:
            step_result.success = False
            step_result.error = str(e)

            # Rollback für diesen Schritt
            if step.rollback_commands:
                try:
                    async with self.target_db.begin() as conn:
                        for command in reversed(step.rollback_commands):
                            await conn.execute(text(command))
                except Exception as rollback_error:
                    step_result.rollback_error = str(rollback_error)

        return step_result

Daten-Transformation

# migration/data_transformer.py
class DataTransformer:
    """Transformiert Daten zwischen Versionen."""

    async def transform_agents_v1_to_v2(self, v1_agents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Transformiert Agent-Daten von v1 zu v2."""

        v2_agents = []

        for v1_agent in v1_agents:
            v2_agent = {
                'id': v1_agent['id'],
                'name': v1_agent['name'],
                'type': v1_agent.get('agent_type', 'generic'),
                'status': v1_agent.get('status', 'inactive'),
                'created_at': v1_agent['created_at'],
                'updated_at': v1_agent.get('updated_at', v1_agent['created_at']),

                # Neue Felder in v2
                'capabilities': self._extract_capabilities(v1_agent),
                'metadata': self._extract_metadata(v1_agent),
                'version': '2.0.0'
            }

            # Konfiguration transformieren
            if 'config' in v1_agent:
                v2_agent['configuration'] = self._transform_agent_config(v1_agent['config'])

            v2_agents.append(v2_agent)

        return v2_agents

    def _extract_capabilities(self, v1_agent: Dict[str, Any]) -> List[str]:
        """Extrahiert Capabilities aus v1-Agent."""

        capabilities = []

        # Aus v1-Konfiguration ableiten
        config = v1_agent.get('config', {})

        if config.get('supports_text_processing'):
            capabilities.append('text_processing')

        if config.get('supports_image_generation'):
            capabilities.append('image_generation')

        if config.get('supports_data_analysis'):
            capabilities.append('data_analysis')

        # Fallback basierend auf Agent-Typ
        agent_type = v1_agent.get('agent_type', 'generic')
        if agent_type == 'text_processor':
            capabilities.extend(['text_processing', 'summarization'])
        elif agent_type == 'image_generator':
            capabilities.extend(['image_generation', 'image_editing'])

        return list(set(capabilities))  # Duplikate entfernen

    def _extract_metadata(self, v1_agent: Dict[str, Any]) -> Dict[str, Any]:
        """Extrahiert Metadata aus v1-Agent."""

        metadata = {
            'migrated_from': 'v1.x',
            'migration_date': datetime.utcnow().isoformat(),
            'original_type': v1_agent.get('agent_type')
        }

        # Zusätzliche Metadaten aus v1-Konfiguration
        config = v1_agent.get('config', {})
        if 'description' in config:
            metadata['description'] = config['description']

        if 'tags' in config:
            metadata['tags'] = config['tags']

        return metadata

    async def transform_tasks_v1_to_v2(self, v1_tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Transformiert Task-Daten von v1 zu v2."""

        v2_tasks = []

        for v1_task in v1_tasks:
            v2_task = {
                'id': v1_task['id'],
                'user_id': v1_task['user_id'],
                'agent_id': v1_task['agent_id'],
                'task_type': v1_task.get('type', 'generic'),
                'parameters': v1_task.get('params', {}),
                'status': self._map_task_status(v1_task.get('status')),
                'created_at': v1_task['created_at'],
                'started_at': v1_task.get('started_at'),
                'completed_at': v1_task.get('completed_at'),

                # Neue Felder in v2
                'priority': self._determine_priority(v1_task),
                'timeout_seconds': v1_task.get('timeout', 300),
                'retry_count': v1_task.get('retries', 0),
                'metadata': {
                    'migrated_from': 'v1.x',
                    'original_status': v1_task.get('status')
                }
            }

            v2_tasks.append(v2_task)

        return v2_tasks

    def _map_task_status(self, v1_status: str) -> str:
        """Mappt v1-Task-Status zu v2-Status."""

        status_mapping = {
            'created': 'pending',
            'queued': 'pending',
            'running': 'running',
            'finished': 'completed',
            'failed': 'failed',
            'cancelled': 'cancelled'
        }

        return status_mapping.get(v1_status, 'pending')

    def _determine_priority(self, v1_task: Dict[str, Any]) -> str:
        """Bestimmt Priorität basierend auf v1-Task-Daten."""

        # Priorität aus v1-Parametern ableiten
        params = v1_task.get('params', {})

        if params.get('urgent'):
            return 'high'
        elif params.get('background'):
            return 'low'
        else:
            return 'normal'

⚙️ Konfigurations-Migration

Config-File-Migration

# migration/config_migrator.py
import yaml
import json
from pathlib import Path

class ConfigMigrator:
    """Migriert Konfigurationsdateien zwischen Versionen."""

    def __init__(self, source_config_path: str, target_config_path: str):
        self.source_path = Path(source_config_path)
        self.target_path = Path(target_config_path)

    async def migrate_configs(self) -> ConfigMigrationResult:
        """Migriert alle Konfigurationsdateien."""

        result = ConfigMigrationResult()

        try:
            # Hauptkonfiguration migrieren
            main_config = await self._migrate_main_config()
            result.add_config('main', main_config)

            # Database-Konfiguration migrieren
            db_config = await self._migrate_database_config()
            result.add_config('database', db_config)

            # Agent-Konfigurationen migrieren
            agent_configs = await self._migrate_agent_configs()
            result.add_config('agents', agent_configs)

            # Security-Konfiguration migrieren
            security_config = await self._migrate_security_config()
            result.add_config('security', security_config)

            result.success = True

        except Exception as e:
            result.success = False
            result.error = str(e)

        return result

    async def _migrate_main_config(self) -> Dict[str, Any]:
        """Migriert Hauptkonfiguration."""

        # v1-Konfiguration laden
        v1_config_file = self.source_path / 'config.yml'
        with open(v1_config_file, 'r') as f:
            v1_config = yaml.safe_load(f)

        # v2-Konfiguration erstellen
        v2_config = {
            'version': '2.0.0',
            'environment': v1_config.get('env', 'development'),

            # Server-Konfiguration
            'server': {
                'host': v1_config.get('host', '0.0.0.0'),
                'port': v1_config.get('port', 8000),
                'workers': v1_config.get('workers', 4),
                'reload': v1_config.get('debug', False)
            },

            # Logging-Konfiguration
            'logging': {
                'level': v1_config.get('log_level', 'INFO'),
                'format': 'json',  # Neu in v2
                'structured': True,  # Neu in v2
                'file_path': v1_config.get('log_file', '/var/log/keiko/app.log')
            },

            # Monitoring-Konfiguration (neu in v2)
            'monitoring': {
                'enabled': True,
                'metrics_port': 9090,
                'health_check_interval': 30
            }
        }

        # v2-Konfiguration speichern
        v2_config_file = self.target_path / 'config.yml'
        v2_config_file.parent.mkdir(parents=True, exist_ok=True)

        with open(v2_config_file, 'w') as f:
            yaml.dump(v2_config, f, default_flow_style=False)

        return v2_config

    async def _migrate_database_config(self) -> Dict[str, Any]:
        """Migriert Database-Konfiguration."""

        v1_db_file = self.source_path / 'database.yml'
        with open(v1_db_file, 'r') as f:
            v1_db_config = yaml.safe_load(f)

        v2_db_config = {
            'default': {
                'url': v1_db_config.get('database_url'),
                'pool_size': v1_db_config.get('pool_size', 20),
                'max_overflow': v1_db_config.get('max_overflow', 30),
                'pool_timeout': 30,  # Neu in v2
                'pool_recycle': 3600,  # Neu in v2
                'echo': False
            },

            # Read-Replicas (neu in v2)
            'read_replicas': v1_db_config.get('read_replicas', []),

            # Migration-Konfiguration
            'migrations': {
                'auto_upgrade': False,
                'backup_before_migration': True
            }
        }

        # v2-Database-Konfiguration speichern
        v2_db_file = self.target_path / 'database.yml'
        with open(v2_db_file, 'w') as f:
            yaml.dump(v2_db_config, f, default_flow_style=False)

        return v2_db_config

    async def _migrate_agent_configs(self) -> Dict[str, Any]:
        """Migriert Agent-Konfigurationen."""

        v1_agents_dir = self.source_path / 'agents'
        v2_agents_dir = self.target_path / 'agents'
        v2_agents_dir.mkdir(parents=True, exist_ok=True)

        migrated_configs = {}

        # Alle v1-Agent-Konfigurationen durchgehen
        for v1_config_file in v1_agents_dir.glob('*.yml'):
            with open(v1_config_file, 'r') as f:
                v1_agent_config = yaml.safe_load(f)

            # v2-Agent-Konfiguration erstellen
            v2_agent_config = {
                'name': v1_agent_config['name'],
                'type': v1_agent_config.get('type', 'generic'),
                'version': '2.0.0',

                # Capabilities (neu in v2)
                'capabilities': self._extract_agent_capabilities(v1_agent_config),

                # Konfiguration
                'configuration': {
                    'timeout_seconds': v1_agent_config.get('timeout', 300),
                    'max_concurrent_tasks': v1_agent_config.get('max_tasks', 1),
                    'retry_policy': {
                        'max_retries': v1_agent_config.get('max_retries', 3),
                        'retry_delay': v1_agent_config.get('retry_delay', 1.0)
                    }
                },

                # External-Konfiguration
                'external_config': v1_agent_config.get('external', {})
            }

            # v2-Konfiguration speichern
            v2_config_file = v2_agents_dir / v1_config_file.name
            with open(v2_config_file, 'w') as f:
                yaml.dump(v2_agent_config, f, default_flow_style=False)

            migrated_configs[v1_config_file.stem] = v2_agent_config

        return migrated_configs

🔄 Rollback-Strategien

Automatischer Rollback

# rollback.sh
#!/bin/bash

BACKUP_PATH="/backup/keiko-migration"
ROLLBACK_LOG="/var/log/keiko/rollback.log"

echo "$(date): Starting Keiko rollback process" >> $ROLLBACK_LOG

# 1. Services stoppen
echo "Stopping Keiko v2 services..." >> $ROLLBACK_LOG
systemctl stop keiko-v2
systemctl stop keiko-workers-v2

# 2. Database rollback
echo "Rolling back database..." >> $ROLLBACK_LOG
psql -U postgres -d keiko_v2 < $BACKUP_PATH/database_rollback.sql

# 3. Konfiguration wiederherstellen
echo "Restoring configuration..." >> $ROLLBACK_LOG
cp -r $BACKUP_PATH/config/* /opt/keiko-v1/config/

# 4. Daten wiederherstellen
echo "Restoring data..." >> $ROLLBACK_LOG
rsync -av $BACKUP_PATH/data/ /opt/keiko-v1/data/

# 5. v1 Services starten
echo "Starting Keiko v1 services..." >> $ROLLBACK_LOG
systemctl start keiko-v1
systemctl start keiko-workers-v1

# 6. Health check
sleep 30
if curl -f http://localhost:8000/health; then
    echo "$(date): Rollback completed successfully" >> $ROLLBACK_LOG
    exit 0
else
    echo "$(date): Rollback failed - manual intervention required" >> $ROLLBACK_LOG
    exit 1
fi

📋 Migration-Checkliste

Pre-Migration

  • Vollständiges Backup erstellt und verifiziert
  • Abhängigkeiten geprüft (Python 3.9+, PostgreSQL 13+, Redis 6+)
  • Disk-Space ausreichend (mindestens 2x aktuelle Datengröße)
  • Downtime-Fenster geplant und kommuniziert
  • Rollback-Plan getestet
  • Migration-Tools heruntergeladen und getestet

During Migration

  • Services gestoppt in korrekter Reihenfolge
  • Database-Migration erfolgreich
  • Daten-Transformation abgeschlossen
  • Konfigurations-Migration durchgeführt
  • Post-Migration-Tests bestanden

Post-Migration

  • Funktionalitäts-Tests erfolgreich
  • Performance-Tests durchgeführt
  • User-Acceptance-Tests bestanden
  • Monitoring aktiviert und konfiguriert
  • Backup-Strategie für v2 implementiert
  • Dokumentation aktualisiert

Wichtige Hinweise

  • Führen Sie immer einen vollständigen Backup vor der Migration durch
  • Testen Sie die Migration in einer Staging-Umgebung
  • Planen Sie ausreichend Zeit für unvorhergesehene Probleme ein
  • Halten Sie den Rollback-Plan bereit

Support

Bei Problemen während der Migration wenden Sie sich an das Support-Team oder erstellen Sie ein Issue im GitHub-Repository.