Zum Inhalt

UnifiedKeiAgentClient

Unified KEI-Agent client with complete protocol integration.

Proviof the unified API for all KEI protocols (RPC, Stream, Bus, MCP) with automatic protocol selection, enterprise security, and monitoring.

Attributes:

NameTypeDescription
config

Agent client configuration

protocol_config

Protocol-specific configuration

security_config

Security configuration

security

Security manager for authentication

protocol_selector

Intelligent protocol selection

tracingOptional[TracingManager]

Distributed tracing manager

retry_managerOptional[retryManager]

retry mechanisms and circuit breaker

capability_managerOptional[CapabilityManager]

Capability advertisement and management

service_discoveryOptional[ServiceDiscovery]

Service discovery for agent registration

Parameters:

NameTypeDescriptionDefault
configAgentClientConfig

Basic configuration for agent client

required
protocol_configOptional[ProtocolConfig]

Protocol-specific configuration

None
security_configOptional[SecurityConfig]

Security configuration

None

Raises:

TypeDescription
KeiSDKError

On invalid configuration

Source code in kei_agent/unified_client.py
def __init__(
    self,
    config: AgentClientConfig,
    protocol_config: Optional[ProtocolConfig] = None,
    security_config: Optional[SecurityConfig] = None,
) -> None:
    """Initialize Unified KEI-Agent client.

    Args:
        config: Basic configuration for agent client
        protocol_config: Protocol-specific configuration
        security_config: Security configuration

    Raises:
        KeiSDKError: On invalid configuration
    """
    self.config = config
    self.protocol_config = protocol_config or ProtocolConfig()
    self.security_config = security_config or SecurityConfig(
        auth_type=self.protocol_config.auth_type
        if hasattr(self.protocol_config, "auth_type")
        else "bearer",
        api_token=config.api_token,
    )

    # Initialize core components
    self.security = SecurityManager(self.security_config)
    self.protocol_selector = ProtocolSelector(self.protocol_config)

    # Enterprise features
    self.tracing: Optional[TracingManager] = None
    # Compatibility and retry manager
    # Default retryManager for generic use (compatibility: client.retry)
    self.retry: retryManager = retryManager(self.config.retry)
    # Protocol-specific retryManager (e.g. "rpc", "stream", "bus", "mcp")
    self._retry_managers: Dict[str, retryManager] = {"default": self.retry}
    if getattr(self.config, "protocol_retry_policies", None):
        for proto_key, retry_cfg in self.config.protocol_retry_policies.items():
            # Only accept valid keys
            if proto_key in {"rpc", "stream", "bus", "mcp"}:
                self._retry_managers[proto_key] = retryManager(retry_cfg)
    # Retained field for legacy usage
    self.retry_manager: Optional[retryManager] = self.retry
    self.capability_manager: Optional[CapabilityManager] = None
    self.service_discovery: Optional[ServiceDiscovery] = None

    # Protocol clients
    self._rpc_client: Optional[KEIRPCclient] = None
    self._stream_client: Optional[KEIStreamclient] = None
    self._bus_client: Optional[KEIBusclient] = None
    self._mcp_client: Optional[KEIMCPclient] = None

    # Legacy client for compatibility
    self._legacy_client: Optional[KeiAgentClient] = None

    # Metrics collection
    self.metrics_collector = get_metrics_collector()
    self._start_time = time.time()

    # status tracking
    self._initialized = False
    self._closed = False

    _logger.info(
        "Unified KEI-Agent client created",
        extra={
            "agent_id": config.agent_id,
            "base_url": config.base_url,
            "enabled_protocols": self.protocol_config.get_enabled_protocols(),
        },
    )

Functions

initialize()async

Initialize client and all components.

Starts security manager, initializes protocol clients and enterprise features like tracing and retry mechanisms.

Raises:

TypeDescription
KeiSDKError

On initialization errors

Source code in kei_agent/unified_client.py
async def initialize(self) -> None:
    """Initialize client and all components.

    Starts security manager, initializes protocol clients and
    enterprise features like tracing and retry mechanisms.

    Raises:
        KeiSDKError: On initialization errors
    """
    if self._initialized:
        _logger.warning("Client already initialized")
        return

    try:
        _logger.info("Initializing Unified KEI-Agent Client")

        # Start security manager
        await self.security.start_token_refresh()

        # Initialize legacy client for compatibility (lazy-init via _make_request)
        self._legacy_client = KeiAgentClient(self.config)
        # Call initialize() if available (mocked for tests)
        if hasattr(self._legacy_client, "initialize"):
            try:
                await self._legacy_client.initialize()  # type: ignore[attr-defined]
            except (AttributeError, ConnectionError, TimeoutError) as e:
                # Log specific initialization errors
                _logger.warning(
                    "Error during Initializingn of the legacy client",
                    extra={
                        "error": str(e),
                        "error_type": type(e).__name__,
                        "agent_id": self.config.agent_id,
                    },
                )
            except Exception as e:
                # Log unexpected error (keep broad catch for backward compatibility)
                _logger.error(
                    "Unexpected error onm Initializingn of the legacy client",
                    extra={
                        "error": str(e),
                        "error_type": type(e).__name__,
                        "agent_id": self.config.agent_id,
                    },
                )
                # Re-raise to surface init failures appropriately
                raise

        # Initializing protocol clients
        await self._initialize_protocol_clients()

        # Initializing enterprise features
        await self._initialize_enterprise_features()

        self._initialized = True
        _logger.info("Unified KEI-Agent client successfully initialized")

    except (KeiSDKError, ProtocolError) as e:
        # Already meaningful SDK exceptions
        _logger.error(
            "client initialization failed",
            extra={"error": str(e), "type": type(e).__name__},
        )
        raise
    except Exception as e:
        _logger.error(
            "client initialization failed",
            extra={"error": str(e), "type": type(e).__name__},
        )
        raise KeiSDKError(f"initialization failed: {e}") from e

close()async

Closes client and all connectionen.

Stops all backgroand tasks, closes protocol clients and cleats up resources.

Source code in kei_agent/unified_client.py
async def close(self) -> None:
    """Closes client and all connectionen.

    Stops all backgroand tasks, closes protocol clients and
    cleats up resources.
    """
    if self._closed:
        return

    try:
        _logger.info("Closing Unified KEI-Agent client")

        # Stopping security manager
        await self.security.stop_token_refresh()

        # Closing stream client
        if self._stream_client:
            await self._stream_client.disconnect()

        # Closing legacy client
        if self._legacy_client:
            await self._legacy_client.close()

        # Closing tracing manager
        if self.tracing:
            await self.tracing.shutdown()

        self._closed = True
        _logger.info("Unified KEI-Agent client closed")

    except (ConnectionError, TimeoutError) as e:
        _logger.error(
            "Error during client shutdown",
            extra={"error": str(e), "type": type(e).__name__},
        )
        raise
    except Exception as e:
        _logger.error(
            "Unexpected error during client shutdown",
            extra={"error": str(e), "type": type(e).__name__},
        )
        raise

__aenter__()async

async context manager entry.

Source code in kei_agent/unified_client.py
async def __aenter__(self):
    """async context manager entry."""
    await self.initialize()

    # Record connection metrics
    enabled_protocols = self.protocol_config.get_enabled_protocols()
    for protocol in enabled_protocols:
        record_connection_metric(self.config.agent_id, protocol, "connected")

    # Set agent info
    self.metrics_collector.set_agent_info(
        agent_id=self.config.agent_id,
        version=getattr(self.config, "version", "1.0.0"),
        protocol_version="1.0",
        enabled_protocols=",".join(enabled_protocols),
    )

    return self

__aexit__(exc_type, exc_val, exc_tb)async

async context manager exit.

Source code in kei_agent/unified_client.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """async context manager exit."""
    # Record disconnection metrics
    enabled_protocols = self.protocol_config.get_enabled_protocols()
    for protocol in enabled_protocols:
        record_connection_metric(self.config.agent_id, protocol, "disconnected")

    # Update uptime
    uptime = time.time() - self._start_time
    self.metrics_collector.update_uptime(self.config.agent_id, uptime)

    await self.close()

is_protocol_available(protocol)

Checks if protocol is available.

Parameters:

NameTypeDescriptionDefault
protocolProtocoltypee

protocol to check

required

Returns:

TypeDescription
bool

True if protocol is available

Source code in kei_agent/unified_client.py
def is_protocol_available(self, protocol: Protocoltypee) -> bool:
    """Checks if protocol is available.

    Args:
        protocol: protocol to check

    Returns:
        True if protocol is available
    """
    if not self._initialized:
        return False

    protocol_clients = {
        Protocoltypee.RPC: self._rpc_client,
        Protocoltypee.STREAM: self._stream_client,
        Protocoltypee.BUS: self._bus_client,
        Protocoltypee.MCP: self._mcp_client,
    }

    return protocol_clients.get(protocol) is not None

get_available_protocols()

Gibt lis of available protocols torück.

Returns:

TypeDescription
List[Protocoltypee]

lis of available protocols

Source code in kei_agent/unified_client.py
def get_available_protocols(self) -> List[Protocoltypee]:
    """Gibt lis of available protocols torück.

    Returns:
        lis of available protocols
    """
    available = []
    for protocol in [
        Protocoltypee.RPC,
        Protocoltypee.STREAM,
        Protocoltypee.BUS,
        Protocoltypee.MCP,
    ]:
        if self.is_protocol_available(protocol):
            available.append(protocol)
    return available

get_client_info()

Gibt client information torück.

Returns:

TypeDescription
Dict[str, Any]

dictionary with client status and configuration

Source code in kei_agent/unified_client.py
def get_client_info(self) -> Dict[str, Any]:
    """Gibt client information torück.

    Returns:
        dictionary with client status and configuration
    """
    return {
        "agent_id": self.config.agent_id,
        "base_url": self.config.base_url,
        "initialized": self._initialized,
        "closed": self._closed,
        "available_protocols": self.get_available_protocols(),
        "security_context": self.security.get_security_context(),
        "features": {
            "tracing": self.tracing is not None,
            "retry_manager": self.retry_manager is not None,
            "capability_manager": self.capability_manager is not None,
            "service_discovery": self.service_discovery is not None,
        },
    }

execute_agent_operation(operation, data, protocol=None)async

Executes agent operation with automatic protocol selection out.

Parameters:

NameTypeDescriptionDefault
operationstr

operation name

required
dataDict[str, Any]

operation data

required
protocolOptional[Protocoltypee]

preferred protocol (Optional)

None

Returns:

TypeDescription
Dict[str, Any]

operation response

Raises:

TypeDescription
KeiSDKError

On execution errors

Source code in kei_agent/unified_client.py
async def execute_agent_operation(
    self,
    operation: str,
    data: Dict[str, Any],
    protocol: Optional[Protocoltypee] = None,
) -> Dict[str, Any]:
    """Executes agent operation with automatic protocol selection out.

    Args:
        operation: operation name
        data: operation data
        protocol: preferred protocol (Optional)

    Returns:
        operation response

    Raises:
        KeiSDKError: On execution errors
    """
    if not self._initialized:
        raise KeiSDKError("Client not initialized")

    # select protocol
    selected_protocol = protocol or self._select_optimal_protocol(operation, data)

    # create trace context
    correlation_id = create_correlation_id()
    start_time = time.time()

    try:
        # execute operation with with tracing
        if self.tracing:
            # For Tests, the start_spat expect: if beforehatthe, use it
            start_spat = getattr(self.tracing, "start_spat", None)
            cm = (
                start_spat(f"agent_operation_{operation}")
                if callable(start_spat)
                else self.tracing.trace_operation(
                    f"agent_operation_{operation}", agent_id=self.config.agent_id
                )
            )
            with cm as spat:
                spat.set_attribute("operation", operation)
                spat.set_attribute("protocol", selected_protocol)
                spat.set_attribute("correlation_id", correlation_id)

                result = await self._execute_with_protocol(
                    selected_protocol, operation, data
                )

                # Record successful request metrics
                duration = time.time() - start_time
                record_request_metric(
                    self.config.agent_id,
                    operation,
                    "success",
                    duration,
                    selected_protocol,
                )

                return result
        else:
            result = await self._execute_with_protocol(
                selected_protocol, operation, data
            )

            # Record successful request metrics
            duration = time.time() - start_time
            record_request_metric(
                self.config.agent_id,
                operation,
                "success",
                duration,
                selected_protocol,
            )

            return result

    except (ProtocolError, KeiSDKError, ConnectionError, TimeoutError) as e:
        # Record failed request metrics
        duration = time.time() - start_time
        record_request_metric(
            self.config.agent_id, operation, "error", duration, selected_protocol
        )

        # Record error for aggregation and alerting
        error_category = self._categorize_error(e)
        error_severity = self._determine_error_severity(e)
        record_error(
            agent_id=self.config.agent_id,
            error=e,
            category=error_category,
            severity=error_severity,
            context={
                "operation": operation,
                "protocol": selected_protocol,
                "duration": duration,
            },
            correlation_id=correlation_id,
        )

        _logger.error(
            f"operation '{operation}' failed",
            extra={
                "operation": operation,
                "protocol": selected_protocol,
                "correlation_id": correlation_id,
                "error": str(e),
            },
        )
        raise

plat_task(objective, context=None, protocol=None)async

creates plat for given objective.

Parameters:

NameTypeDescriptionDefault
objectivestr

objective description for platning

required
contextOptional[Dict[str, Any]]

additional context for platning

None
protocolOptional[Protocoltypee]

preferred protocol (Optional)

None

Returns:

TypeDescription
Dict[str, Any]

plat response with steps and metadata

Raises:

TypeDescription
KeiSDKError

On plat creation errors

Source code in kei_agent/unified_client.py
async def plat_task(
    self,
    objective: str,
    context: Optional[Dict[str, Any]] = None,
    protocol: Optional[Protocoltypee] = None,
) -> Dict[str, Any]:
    """creates plat for given objective.

    Args:
        objective: objective description for platning
        context: additional context for platning
        protocol: preferred protocol (Optional)

    Returns:
        plat response with steps and metadata

    Raises:
        KeiSDKError: On plat creation errors
    """
    payload = {"objective": objective, "context": context or {}}
    # Immer 3 Argumente übergeben, Tests erwarten ggf. explizites None
    return await self.execute_agent_operation("plat", payload, protocol)

execute_action(action, parameters=None, protocol=None)async

executes action.

Parameters:

NameTypeDescriptionDefault
actionstr

action to execute

required
parametersOptional[Dict[str, Any]]

parameters for action

None
protocolOptional[Protocoltypee]

preferred protocol (Optional)

None

Returns:

TypeDescription
Dict[str, Any]

action response with result

Source code in kei_agent/unified_client.py
async def execute_action(
    self,
    action: str,
    parameters: Optional[Dict[str, Any]] = None,
    protocol: Optional[Protocoltypee] = None,
) -> Dict[str, Any]:
    """executes action.

    Args:
        action: action to execute
        parameters: parameters for action
        protocol: preferred protocol (Optional)

    Returns:
        action response with result
    """
    payload = {"action": action, "parameters": parameters or {}}
    # Immer 3 Argumente übergeben, Tests erwarten ggf. explizites None
    return await self.execute_agent_operation("act", payload, protocol)

observe_environment(observation_type, data=None, protocol=None)async

performs environment observation.

Parameters:

NameTypeDescriptionDefault
observation_typestr

observation type

required
dataOptional[Dict[str, Any]]

observation data

None
protocolOptional[Protocoltypee]

preferred protocol (Optional)

None

Returns:

TypeDescription
Dict[str, Any]

observe response with processed observations

Source code in kei_agent/unified_client.py
async def observe_environment(
    self,
    observation_type: str,
    data: Optional[Dict[str, Any]] = None,
    protocol: Optional[Protocoltypee] = None,
) -> Dict[str, Any]:
    """performs environment observation.

    Args:
        observation_type: observation type
        data: observation data
        protocol: preferred protocol (Optional)

    Returns:
        observe response with processed observations
    """
    # Tests erwarten Schlüssel 'sensors' anstatt 'type'
    payload = {"sensors": observation_type}
    # Immer 3 Argumente übergeben, Tests erwarten ggf. explizites None
    return await self.execute_agent_operation("observe", payload, protocol)

explain_reasoning(query, context=None, protocol=None)async

explains reasoning for given query.

Parameters:

NameTypeDescriptionDefault
querystr

explatation query

required
contextOptional[Dict[str, Any]]

context for explatation

None
protocolOptional[Protocoltypee]

preferred protocol (Optional)

None

Returns:

TypeDescription
Dict[str, Any]

explain response with explatation and reasoning

Source code in kei_agent/unified_client.py
async def explain_reasoning(
    self,
    query: str,
    context: Optional[Dict[str, Any]] = None,
    protocol: Optional[Protocoltypee] = None,
) -> Dict[str, Any]:
    """explains reasoning for given query.

    Args:
        query: explatation query
        context: context for explatation
        protocol: preferred protocol (Optional)

    Returns:
        explain response with explatation and reasoning
    """
    # Tests erwarten Schlüssel 'decision_id' und 'detail_level'
    payload = {"decision_id": query, "detail_level": context or ""}
    # Immer 3 Argumente übergeben, Tests erwarten ggf. explizites None
    return await self.execute_agent_operation("explain", payload, protocol)

send_agent_message(target_agent, message_type, payload)async

sends message at other agent (A2A communication).

Parameters:

NameTypeDescriptionDefault
target_agentstr

target agent ID

required
message_typestr

message type

required
payloadDict[str, Any]

message data

required

Returns:

TypeDescription
Dict[str, Any]

message response with status

Source code in kei_agent/unified_client.py
async def send_agent_message(
    self, target_agent: str, message_type: str, payload: Dict[str, Any]
) -> Dict[str, Any]:
    """sends message at other agent (A2A communication).

    Args:
        target_agent: target agent ID
        message_type: message type
        payload: message data

    Returns:
        message response with status
    """
    return await self.execute_agent_operation(
        "send_message",
        {"target": target_agent, "type": message_type, "payload": payload},
        protocol=Protocoltypee.BUS,  # bus protocol for A2A communication
    )

discover_available_tools(category=None)async

discovers available MCP tools.

Parameters:

NameTypeDescriptionDefault
categoryOptional[str]

Optional tool category for filtering

None

Returns:

TypeDescription
List[Dict[str, Any]]

lis of available tools with metadata

Source code in kei_agent/unified_client.py
async def discover_available_tools(
    self, category: Optional[str] = None
) -> List[Dict[str, Any]]:
    """discovers available MCP tools.

    Args:
        category: Optional tool category for filtering

    Returns:
        lis of available tools with metadata
    """
    result = await self.execute_agent_operation(
        "discover_tools",
        {"category": category},
        protocol=Protocoltypee.MCP,  # MCP protocol for tool discovery
    )

    # extract tools out response
    return result.get("tools", [])

use_tool(tool_name, **parameters)async

executes MCP tool.

Parameters:

NameTypeDescriptionDefault
tool_namestr

Name of the tool to execute

required
**parametersAny

tool parameters als keyword argaroatthets

{}

Returns:

TypeDescription
Dict[str, Any]

tool execution response

Source code in kei_agent/unified_client.py
async def use_tool(self, tool_name: str, **parameters: Any) -> Dict[str, Any]:
    """executes MCP tool.

    Args:
        tool_name: Name of the tool to execute
        **parameters: tool parameters als keyword argaroatthets

    Returns:
        tool execution response
    """
    return await self.execute_agent_operation(
        "use_tool",
        {"tool_name": tool_name, "parameters": parameters},
        protocol=Protocoltypee.MCP,  # MCP protocol for tool execution
    )

start_streaming_session(callback=None)async

starts streaming session for real-time communication.

Parameters:

NameTypeDescriptionDefault
callbackOptional[Callable[[Dict[str, Any]], Awaitable[None]]]

callback for incoming stream messages

None

Raises:

TypeDescription
ProtocolError

If stream Protocol not available is

Source code in kei_agent/unified_client.py
async def start_streaming_session(
    self, callback: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None
) -> None:
    """starts streaming session for real-time communication.

    Args:
        callback: callback for incoming stream messages

    Raises:
        ProtocolError: If stream Protocol not available is
    """
    if not self.is_protocol_available(Protocoltypee.STREAM):
        raise ProtocolError("stream Protocol not available")

    if self._stream_client:
        await self._stream_client.connect()
        if callback:
            await self._stream_client.subscribe("agent_events", callback)

health_check()async

performs health check.

Returns:

TypeDescription
Dict[str, Any]

health status with protocol availability

Source code in kei_agent/unified_client.py
async def health_check(self) -> Dict[str, Any]:
    """performs health check.

    Returns:
        health status with protocol availability
    """
    return await self.execute_agent_operation("health_check", {})

register_agent(name, version, description='', capabilities=None)async

Regisers agent in KEI framework.

Parameters:

NameTypeDescriptionDefault
namestr

Agent name

required
versionstr

Agent version

required
descriptionstr

Agent description

''
capabilitiesOptional[List[str]]

Agent capabilities

None

Returns:

TypeDescription
Dict[str, Any]

Regisration response

Source code in kei_agent/unified_client.py
async def register_agent(
    self,
    name: str,
    version: str,
    description: str = "",
    capabilities: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Regisers agent in KEI framework.

    Args:
        name: Agent name
        version: Agent version
        description: Agent description
        capabilities: Agent capabilities

    Returns:
        Regisration response
    """
    return await self.execute_agent_operation(
        "register",
        {
            "name": name,
            "version": version,
            "description": description,
            "capabilities": capabilities or [],
        },
    )

Die UnifiedKeiAgentClient Klasse ist die Haupt-API-Schnittstelle des KEI-Agent SDK. Sie bietet eine einheitliche, typisierte API für alle Agent-Operationen mit automatischer Protokoll-Auswahl und Enterprise-Features.

🚀 Übersicht

Der UnifiedKeiAgentClient abstrahiert die Komplexität der Multi-Protocol-Architektur und bietet eine einfache, aber mächtige API für:

  • Agent-Operationen: Plan, Act, Observe, Explain
  • Multi-Protocol Support: Automatische Auswahl zwischen RPC, Stream, Bus und MCP
  • Enterprise Features: Logging, Health Checks, Security
  • Resilience: Automatische Fallback-Mechanismen und Retry-Logik

📋 Konstruktor

def __init__(
    self,
    config: AgentClientConfig,
    protocol_config: Optional[ProtocolConfig] = None,
    security_config: Optional[SecurityConfig] = None
) -> None

Parameter

ParameterTypStandardBeschreibung
configAgentClientConfigErforderlichBasis-Client-Konfiguration
protocol_configProtocolConfigNoneProtokoll-spezifische Konfiguration
security_configSecurityConfigNoneSicherheitskonfiguration

Beispiel

from kei_agent import (
    UnifiedKeiAgentClient,
    AgentClientConfig,
    ProtocolConfig,
    SecurityConfig,
    AuthType
)

# Basis-Konfiguration
config = AgentClientConfig(
    base_url="https://api.kei-framework.com",
    api_token="your-api-token",
    agent_id="my-agent"
)

# Erweiterte Konfiguration
protocol_config = ProtocolConfig(
    auto_protocol_selection=True,
    protocol_fallback_enabled=True
)

security_config = SecurityConfig(
    auth_type=AuthType.BEARER,
    rbac_enabled=True,
    audit_enabled=True
)

# Client erstellen
client = UnifiedKeiAgentClient(
    config=config,
    protocol_config=protocol_config,
    security_config=security_config
)

🔄 Lifecycle-Management

Async Context Manager (Empfohlen)

async with UnifiedKeiAgentClient(config=config) as client:
    # Client ist automatisch initialisiert
    result = await client.plan_task("Create report")
    # Automatisches Cleanup beim Verlassen

Manuelle Verwaltung

client = UnifiedKeiAgentClient(config=config)
try:
    await client.initialize()
    result = await client.plan_task("Create report")
finally:
    await client.close()

🎯 High-Level API-Methoden

Agent-Operationen

plan_task()

async def plan_task(
    self,
    objective: str,
    context: Optional[Dict[str, Any]] = None,
    protocol: Optional[ProtocolType] = None
) -> Dict[str, Any]

Erstellt einen Plan für ein gegebenes Ziel.

Parameter:

  • objective: Ziel-Beschreibung für die Planung
  • context: Zusätzlicher Kontext für die Planung
  • protocol: Bevorzugtes Protokoll (optional)

Beispiel:

plan = await client.plan_task(
    objective="Erstelle einen Quartalsbericht",
    context={
        "format": "pdf",
        "quarter": "Q4-2024",
        "sections": ["summary", "financials", "outlook"]
    }
)
print(f"Plan ID: {plan['plan_id']}")

execute_action()

async def execute_action(
    self,
    action: str,
    parameters: Optional[Dict[str, Any]] = None,
    protocol: Optional[ProtocolType] = None
) -> Dict[str, Any]

Führt eine spezifische Aktion aus.

Parameter:

  • action: Name der auszuführenden Aktion
  • parameters: Parameter für die Aktion
  • protocol: Bevorzugtes Protokoll (optional)

Beispiel:

result = await client.execute_action(
    action="generate_report",
    parameters={
        "template": "quarterly_template",
        "data_source": "financial_db",
        "output_format": "pdf"
    }
)
print(f"Action ID: {result['action_id']}")

observe_environment()

async def observe_environment(
    self,
    observation_type: str,
    data: Optional[Dict[str, Any]] = None,
    protocol: Optional[ProtocolType] = None
) -> Dict[str, Any]

Führt Umgebungsbeobachtung durch.

Parameter:

  • observation_type: Typ der Beobachtung
  • data: Beobachtungsdaten
  • protocol: Bevorzugtes Protokoll (optional)

Beispiel:

observation = await client.observe_environment(
    observation_type="system_metrics",
    data={
        "interval": 60,
        "metrics": ["cpu", "memory", "disk"]
    }
)
print(f"Observation ID: {observation['observation_id']}")

explain_reasoning()

async def explain_reasoning(
    self,
    query: str,
    context: Optional[Dict[str, Any]] = None,
    protocol: Optional[ProtocolType] = None
) -> Dict[str, Any]

Erklärt das Reasoning für eine gegebene Anfrage.

Parameter:

  • query: Erklärungsanfrage
  • context: Kontext für die Erklärung
  • protocol: Bevorzugtes Protokoll (optional)

Beispiel:

explanation = await client.explain_reasoning(
    query="Warum wurde diese Vorlage gewählt?",
    context={"action_id": "action-123"}
)
print(f"Erklärung: {explanation['explanation']}")

Kommunikations-Methoden

send_agent_message()

async def send_agent_message(
    self,
    target_agent: str,
    message_type: str,
    payload: Dict[str, Any]
) -> Dict[str, Any]

Sendet Nachricht an anderen Agent (A2A-Kommunikation).

Beispiel:

response = await client.send_agent_message(
    target_agent="data-processor-agent",
    message_type="task_request",
    payload={
        "task": "analyze_sales_data",
        "dataset": "q4_2024_sales",
        "priority": "high"
    }
)
print(f"Message ID: {response['message_id']}")

start_streaming_session()

async def start_streaming_session(
    self,
    callback: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None
) -> None

Startet Streaming-Session für Echtzeit-Kommunikation.

Beispiel:

async def message_handler(message: Dict[str, Any]):
    print(f"Received: {message}")

await client.start_streaming_session(callback=message_handler)

Tool-Integration

discover_available_tools()

async def discover_available_tools(
    self,
    category: Optional[str] = None
) -> List[Dict[str, Any]]

Entdeckt verfügbare MCP-Tools.

Beispiel:

tools = await client.discover_available_tools("math")
for tool in tools:
    print(f"Tool: {tool['name']} - {tool['description']}")

use_tool()

async def use_tool(
    self,
    tool_name: str,
    **parameters: Any
) -> Dict[str, Any]

Führt MCP-Tool aus.

Beispiel:

result = await client.use_tool(
    "calculator",
    expression="(100 * 1.08) - 50"
)
print(f"Ergebnis: {result['result']}")

🔧 System-Methoden

health_check()

async def health_check(self) -> Dict[str, Any]

Führt System-Health-Check durch.

Beispiel:

health = await client.health_check()
print(f"Status: {health['status']}")
print(f"Uptime: {health.get('uptime', 'unknown')}")

register_agent()

async def register_agent(
    self,
    name: str,
    version: str,
    description: str = "",
    capabilities: Optional[List[str]] = None
) -> Dict[str, Any]

Registriert Agent im KEI-Framework.

Beispiel:

registration = await client.register_agent(
    name="Report Generator",
    version="1.0.0",
    description="Automated report generation agent",
    capabilities=["pdf_generation", "data_analysis", "chart_creation"]
)
print(f"Registered: {registration['agent_id']}")

🔍 Informations-Methoden

get_client_info()

def get_client_info(self) -> Dict[str, Any]

Gibt Client-Informationen zurück.

Beispiel:

info = client.get_client_info()
print(f"Agent ID: {info['agent_id']}")
print(f"Initialized: {info['initialized']}")
print(f"Available Protocols: {info['available_protocols']}")
print(f"Features: {info['features']}")

get_available_protocols()

def get_available_protocols(self) -> List[ProtocolType]

Gibt Liste verfügbarer Protokolle zurück.

Beispiel:

protocols = client.get_available_protocols()
print(f"Verfügbare Protokolle: {protocols}")

is_protocol_available()

def is_protocol_available(self, protocol: ProtocolType) -> bool

Prüft ob spezifisches Protokoll verfügbar ist.

Beispiel:

if client.is_protocol_available(ProtocolType.STREAM):
    print("Stream-Protokoll verfügbar")

⚡ Low-Level API

execute_agent_operation()

async def execute_agent_operation(
    self,
    operation: str,
    data: Dict[str, Any],
    protocol: Optional[ProtocolType] = None
) -> Dict[str, Any]

Führt Agent-Operation mit automatischer Protokoll-Auswahl aus.

Beispiel:

# Automatische Protokoll-Auswahl
result = await client.execute_agent_operation(
    "custom_operation",
    {"param1": "value1", "param2": "value2"}
)

# Explizite Protokoll-Auswahl
result = await client.execute_agent_operation(
    "stream_operation",
    {"data": "real-time"},
    protocol=ProtocolType.STREAM
)

🚨 Exception Handling

Der Client kann verschiedene Exceptions werfen:

from kei_agent.exceptions import KeiSDKError, ProtocolError, SecurityError

try:
    async with UnifiedKeiAgentClient(config=config) as client:
        result = await client.plan_task("objective")

except SecurityError as e:
    # Authentifizierungs-/Autorisierungsfehler
    print(f"Security error: {e}")

except ProtocolError as e:
    # Protokoll-spezifische Fehler
    print(f"Protocol error: {e}")

except KeiSDKError as e:
    # Allgemeine SDK-Fehler
    print(f"SDK error: {e}")

except Exception as e:
    # Unerwartete Fehler
    print(f"Unexpected error: {e}")

🎯 Best Practices

1. Verwenden Sie Async Context Manager

# ✅ Empfohlen
async with UnifiedKeiAgentClient(config=config) as client:
    result = await client.plan_task("objective")

# ❌ Vermeiden
client = UnifiedKeiAgentClient(config=config)
await client.initialize()
# ... vergessen close() aufzurufen

2. Nutzen Sie High-Level APIs

# ✅ Empfohlen - High-Level API
plan = await client.plan_task("Create report")

# ❌ Vermeiden - Low-Level API ohne Grund
plan = await client.execute_agent_operation("plan", {"objective": "Create report"})

3. Konfigurieren Sie Enterprise Features

# ✅ Production-ready Konfiguration
protocol_config = ProtocolConfig(
    auto_protocol_selection=True,
    protocol_fallback_enabled=True
)

security_config = SecurityConfig(
    rbac_enabled=True,
    audit_enabled=True
)

4. Implementieren Sie Error Handling

# ✅ Robuste Fehlerbehandlung
try:
    result = await client.plan_task("objective")
except ProtocolError as e:
    # Fallback-Strategie
    result = await client.execute_agent_operation(
        "plan",
        {"objective": "objective"},
        protocol=ProtocolType.RPC
    )

Siehe auch: