UnifiedKeiAgentClient
Unified KEI-Agent client with complete protocol integration.
Provides a unified API for all KEI protocols (RPC, Stream, Bus, MCP) with automatic protocol selection, enterprise security, and monitoring.
Attributes:
Name | Type | Description |
---|---|---|
config | | Agent client configuration |
protocol_config | | Protocol-specific configuration |
security_config | | Security configuration |
security | | Security manager for authentication |
protocol_selector | | Intelligent protocol selection |
tracing | Optional[TracingManager] | Distributed tracing manager |
retry_manager | Optional[RetryManager] | Retry mechanisms and circuit breaker |
capability_manager | Optional[CapabilityManager] | Capability advertisement and management |
service_discovery | Optional[ServiceDiscovery] | Service discovery for agent registration |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | AgentClientConfig | Basic configuration for agent client | required |
protocol_config | Optional[ProtocolConfig] | Protocol-specific configuration | None |
security_config | Optional[SecurityConfig] | Security configuration | None |
Raises:
Type | Description |
---|---|
KeiSDKError | On invalid configuration |
Source code in kei_agent/unified_client.py
Functions¶
initialize()
async
Initializes the client and all of its components.
Starts the security manager, initializes the protocol clients, and sets up enterprise features such as tracing and retry mechanisms.
Raises:
Type | Description |
---|---|
KeiSDKError | On initialization errors |
Source code in kei_agent/unified_client.py
close()
async
Closes the client and all connections.
Stops all background tasks, closes the protocol clients, and cleans up resources.
Source code in kei_agent/unified_client.py
__aenter__()
async
Async context manager entry.
Source code in kei_agent/unified_client.py
__aexit__(exc_type, exc_val, exc_tb)
async
Async context manager exit.
Source code in kei_agent/unified_client.py
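The two context-manager hooks are listed without further detail. A minimal sketch of the usual pattern, assuming `__aenter__` awaits `initialize()` and `__aexit__` awaits `close()` without suppressing exceptions (an assumption, not confirmed by the source). `SketchClient` is a hypothetical stand-in, not the SDK class:

```python
import asyncio
from typing import List


class SketchClient:
    """Hypothetical stand-in; not the real UnifiedKeiAgentClient."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def initialize(self) -> None:
        self.events.append("initialize")

    async def close(self) -> None:
        self.events.append("close")

    async def __aenter__(self) -> "SketchClient":
        # Assumed behavior: entering the context initializes the client.
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Assumed behavior: leaving the context always closes the client.
        # Returning None means an in-flight exception is not suppressed.
        await self.close()


async def demo() -> List[str]:
    client = SketchClient()
    try:
        async with client:
            client.events.append("work")
            raise RuntimeError("simulated failure")
    except RuntimeError:
        client.events.append("error propagated")
    return client.events


events = asyncio.run(demo())
print(events)
```

Under these assumptions `close()` runs even when the body raises, which is the property that makes the context-manager form safer than calling `initialize()` and `close()` by hand.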
is_protocol_available(protocol)
Checks whether a protocol is available.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
protocol | ProtocolType | Protocol to check | required |
Returns:
Type | Description |
---|---|
bool | True if protocol is available |
Source code in kei_agent/unified_client.py
get_available_protocols()
Returns the list of available protocols.
Returns:
Type | Description |
---|---|
List[ProtocolType] | List of available protocols |
Source code in kei_agent/unified_client.py
get_client_info()
Returns client information.
Returns:
Type | Description |
---|---|
Dict[str, Any] | Dictionary with client status and configuration |
Source code in kei_agent/unified_client.py
execute_agent_operation(operation, data, protocol=None)
async
Executes an agent operation with automatic protocol selection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
operation | str | Operation name | required |
data | Dict[str, Any] | Operation data | required |
protocol | Optional[ProtocolType] | Preferred protocol (optional) | None |
Returns:
Type | Description |
---|---|
Dict[str, Any] | operation response |
Raises:
Type | Description |
---|---|
KeiSDKError | On execution errors |
Source code in kei_agent/unified_client.py
plan_task(objective, context=None, protocol=None)
async
Creates a plan for the given objective.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
objective | str | Objective description for planning | required |
context | Optional[Dict[str, Any]] | Additional context for planning | None |
protocol | Optional[ProtocolType] | Preferred protocol (optional) | None |
Returns:
Type | Description |
---|---|
Dict[str, Any] | Plan response with steps and metadata |
Raises:
Type | Description |
---|---|
KeiSDKError | On plan creation errors |
Source code in kei_agent/unified_client.py
execute_action(action, parameters=None, protocol=None)
async
Executes an action.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action | str | Action to execute | required |
parameters | Optional[Dict[str, Any]] | Parameters for the action | None |
protocol | Optional[ProtocolType] | Preferred protocol (optional) | None |
Returns:
Type | Description |
---|---|
Dict[str, Any] | action response with result |
Source code in kei_agent/unified_client.py
observe_environment(observation_type, data=None, protocol=None)
async
Performs an environment observation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
observation_type | str | Observation type | required |
data | Optional[Dict[str, Any]] | Observation data | None |
protocol | Optional[ProtocolType] | Preferred protocol (optional) | None |
Returns:
Type | Description |
---|---|
Dict[str, Any] | observe response with processed observations |
Source code in kei_agent/unified_client.py
explain_reasoning(query, context=None, protocol=None)
async
Explains the reasoning for a given query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | str | Explanation query | required |
context | Optional[Dict[str, Any]] | Context for the explanation | None |
protocol | Optional[ProtocolType] | Preferred protocol (optional) | None |
Returns:
Type | Description |
---|---|
Dict[str, Any] | Explain response with explanation and reasoning |
Source code in kei_agent/unified_client.py
send_agent_message(target_agent, message_type, payload)
async
Sends a message to another agent (A2A communication).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target_agent | str | target agent ID | required |
message_type | str | message type | required |
payload | Dict[str, Any] | message data | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | message response with status |
Source code in kei_agent/unified_client.py
discover_available_tools(category=None)
async
Discovers available MCP tools.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
category | Optional[str] | Optional tool category for filtering | None |
Returns:
Type | Description |
---|---|
List[Dict[str, Any]] | List of available tools with metadata |
Source code in kei_agent/unified_client.py
use_tool(tool_name, **parameters)
async
Executes an MCP tool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tool_name | str | Name of the tool to execute | required |
**parameters | Any | Tool parameters as keyword arguments | {} |
Returns:
Type | Description |
---|---|
Dict[str, Any] | tool execution response |
Source code in kei_agent/unified_client.py
start_streaming_session(callback=None)
async
Starts a streaming session for real-time communication.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Optional[Callable[[Dict[str, Any]], Awaitable[None]]] | Callback for incoming stream messages | None |
Raises:
Type | Description |
---|---|
ProtocolError | If the Stream protocol is not available |
Source code in kei_agent/unified_client.py
health_check()
async
register_agent(name, version, description='', capabilities=None)
async
Registers the agent in the KEI framework.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Agent name | required |
version | str | Agent version | required |
description | str | Agent description | '' |
capabilities | Optional[List[str]] | Agent capabilities | None |
Returns:
Type | Description |
---|---|
Dict[str, Any] | Registration response |
Source code in kei_agent/unified_client.py
The UnifiedKeiAgentClient class is the main API interface of the KEI-Agent SDK. It provides a unified, typed API for all agent operations, with automatic protocol selection and enterprise features.
🚀 Overview
UnifiedKeiAgentClient abstracts the complexity of the multi-protocol architecture and offers a simple but powerful API for:
- Agent operations: Plan, Act, Observe, Explain
- Multi-protocol support: automatic selection between RPC, Stream, Bus, and MCP
- Enterprise features: logging, health checks, security
- Resilience: automatic fallback mechanisms and retry logic
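The fallback idea in the last bullet can be pictured roughly as follows. This is only a sketch of one possible strategy (try candidate protocols in order and move on when one fails), not the SDK's actual selection logic. `SketchProtocol`, `SketchProtocolError`, `call_with_fallback`, and `fake_send` are hypothetical names introduced for illustration:

```python
import asyncio
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Optional


class SketchProtocol(Enum):
    # Hypothetical stand-in for the SDK's protocol enum.
    RPC = "rpc"
    STREAM = "stream"
    BUS = "bus"
    MCP = "mcp"


class SketchProtocolError(Exception):
    """Hypothetical stand-in for a protocol-level failure."""


Sender = Callable[[SketchProtocol, str, Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def call_with_fallback(
    send: Sender,
    operation: str,
    data: Dict[str, Any],
    candidates: List[SketchProtocol],
) -> Dict[str, Any]:
    """Try each candidate protocol in order; return the first success."""
    last_error: Optional[Exception] = None
    for protocol in candidates:
        try:
            return await send(protocol, operation, data)
        except SketchProtocolError as exc:
            last_error = exc  # remember the failure and try the next protocol
    raise SketchProtocolError(f"all protocols failed for {operation!r}") from last_error


async def fake_send(
    protocol: SketchProtocol, operation: str, data: Dict[str, Any]
) -> Dict[str, Any]:
    # Simulated transport: the stream protocol is "down", everything else works.
    if protocol is SketchProtocol.STREAM:
        raise SketchProtocolError("stream unavailable")
    return {"operation": operation, "protocol": protocol.value, "data": data}


result = asyncio.run(
    call_with_fallback(
        fake_send,
        "plan",
        {"objective": "Create report"},
        [SketchProtocol.STREAM, SketchProtocol.RPC],
    )
)
print(result["protocol"])
```

Here the first candidate fails and the call is answered over the second one; a real implementation would likely also apply retries and timeouts per protocol.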
📋 Constructor
def __init__(
    self,
    config: AgentClientConfig,
    protocol_config: Optional[ProtocolConfig] = None,
    security_config: Optional[SecurityConfig] = None
) -> None
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
config | AgentClientConfig | Required | Basic client configuration |
protocol_config | ProtocolConfig | None | Protocol-specific configuration |
security_config | SecurityConfig | None | Security configuration |
Example
from kei_agent import (
    UnifiedKeiAgentClient,
    AgentClientConfig,
    ProtocolConfig,
    SecurityConfig,
    AuthType
)

# Basic configuration
config = AgentClientConfig(
    base_url="https://api.kei-framework.com",
    api_token="your-api-token",
    agent_id="my-agent"
)

# Extended configuration
protocol_config = ProtocolConfig(
    auto_protocol_selection=True,
    protocol_fallback_enabled=True
)
security_config = SecurityConfig(
    auth_type=AuthType.BEARER,
    rbac_enabled=True,
    audit_enabled=True
)

# Create the client
client = UnifiedKeiAgentClient(
    config=config,
    protocol_config=protocol_config,
    security_config=security_config
)
🔄 Lifecycle Management
Async Context Manager (Recommended)
async with UnifiedKeiAgentClient(config=config) as client:
    # The client is initialized automatically
    result = await client.plan_task("Create report")
# Automatic cleanup on exit
Manual Management
client = UnifiedKeiAgentClient(config=config)
try:
    await client.initialize()
    result = await client.plan_task("Create report")
finally:
    await client.close()
🎯 High-Level API Methods
Agent Operations
plan_task()
async def plan_task(
    self,
    objective: str,
    context: Optional[Dict[str, Any]] = None,
    protocol: Optional[ProtocolType] = None
) -> Dict[str, Any]
Creates a plan for a given objective.
Parameters:
- objective: Description of the objective to plan for
- context: Additional context for planning
- protocol: Preferred protocol (optional)
Example:
plan = await client.plan_task(
    objective="Create a quarterly report",
    context={
        "format": "pdf",
        "quarter": "Q4-2024",
        "sections": ["summary", "financials", "outlook"]
    }
)
print(f"Plan ID: {plan['plan_id']}")
execute_action()
async def execute_action(
    self,
    action: str,
    parameters: Optional[Dict[str, Any]] = None,
    protocol: Optional[ProtocolType] = None
) -> Dict[str, Any]
Executes a specific action.
Parameters:
- action: Name of the action to execute
- parameters: Parameters for the action
- protocol: Preferred protocol (optional)
Example:
result = await client.execute_action(
    action="generate_report",
    parameters={
        "template": "quarterly_template",
        "data_source": "financial_db",
        "output_format": "pdf"
    }
)
print(f"Action ID: {result['action_id']}")
observe_environment()
async def observe_environment(
    self,
    observation_type: str,
    data: Optional[Dict[str, Any]] = None,
    protocol: Optional[ProtocolType] = None
) -> Dict[str, Any]
Performs an environment observation.
Parameters:
- observation_type: Type of observation
- data: Observation data
- protocol: Preferred protocol (optional)
Example:
observation = await client.observe_environment(
    observation_type="system_metrics",
    data={
        "interval": 60,
        "metrics": ["cpu", "memory", "disk"]
    }
)
print(f"Observation ID: {observation['observation_id']}")
explain_reasoning()
async def explain_reasoning(
    self,
    query: str,
    context: Optional[Dict[str, Any]] = None,
    protocol: Optional[ProtocolType] = None
) -> Dict[str, Any]
Explains the reasoning for a given query.
Parameters:
- query: Explanation query
- context: Context for the explanation
- protocol: Preferred protocol (optional)
Example:
explanation = await client.explain_reasoning(
    query="Why was this template chosen?",
    context={"action_id": "action-123"}
)
print(f"Explanation: {explanation['explanation']}")
Communication Methods
send_agent_message()
async def send_agent_message(
    self,
    target_agent: str,
    message_type: str,
    payload: Dict[str, Any]
) -> Dict[str, Any]
Sends a message to another agent (A2A communication).
Example:
response = await client.send_agent_message(
    target_agent="data-processor-agent",
    message_type="task_request",
    payload={
        "task": "analyze_sales_data",
        "dataset": "q4_2024_sales",
        "priority": "high"
    }
)
print(f"Message ID: {response['message_id']}")
start_streaming_session()
async def start_streaming_session(
    self,
    callback: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None
) -> None
Starts a streaming session for real-time communication.
Example:
async def message_handler(message: Dict[str, Any]):
    print(f"Received: {message}")

await client.start_streaming_session(callback=message_handler)
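The callback type is `Callable[[Dict[str, Any]], Awaitable[None]]`, so the handler has to be an `async` function that takes one message dictionary. One common pattern, sketched here without the SDK, is to have the handler push messages onto an `asyncio.Queue` so that processing can happen elsewhere. The loop that feeds the handler is a stand-in for whatever a streaming session would deliver, and the `{"seq": ...}` message shape is invented for the demo:

```python
import asyncio
from typing import Any, Dict, List


async def demo() -> List[Dict[str, Any]]:
    queue: asyncio.Queue = asyncio.Queue()

    async def message_handler(message: Dict[str, Any]) -> None:
        # Matches Callable[[Dict[str, Any]], Awaitable[None]]:
        # keep the handler short and hand the message off to a queue.
        await queue.put(message)

    # Stand-in for the streaming session invoking the callback three times.
    for seq in range(3):
        await message_handler({"seq": seq})

    # Drain the queue in arrival order.
    received: List[Dict[str, Any]] = []
    while not queue.empty():
        received.append(queue.get_nowait())
    return received


received = asyncio.run(demo())
print(received)
```

Keeping the handler this small avoids blocking message delivery on slow processing; whether the SDK awaits the callback per message is not stated in the source.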
Tool Integration
discover_available_tools()
Discovers available MCP tools.
Example:
tools = await client.discover_available_tools("math")
for tool in tools:
    print(f"Tool: {tool['name']} - {tool['description']}")
use_tool()
Executes an MCP tool.
Example:
result = await client.use_tool(
    "calculator",
    expression="(100 * 1.08) - 50"
)
print(f"Result: {result['result']}")
🔧 System Methods
health_check()
Performs a system health check.
Example:
health = await client.health_check()
print(f"Status: {health['status']}")
print(f"Uptime: {health.get('uptime', 'unknown')}")
register_agent()
async def register_agent(
    self,
    name: str,
    version: str,
    description: str = "",
    capabilities: Optional[List[str]] = None
) -> Dict[str, Any]
Registers the agent in the KEI framework.
Example:
registration = await client.register_agent(
    name="Report Generator",
    version="1.0.0",
    description="Automated report generation agent",
    capabilities=["pdf_generation", "data_analysis", "chart_creation"]
)
print(f"Registered: {registration['agent_id']}")
🔍 Information Methods
get_client_info()
Returns client information.
Example:
info = client.get_client_info()
print(f"Agent ID: {info['agent_id']}")
print(f"Initialized: {info['initialized']}")
print(f"Available Protocols: {info['available_protocols']}")
print(f"Features: {info['features']}")
get_available_protocols()
Returns the list of available protocols.
Example:
is_protocol_available()
Checks whether a specific protocol is available.
Example:
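A minimal sketch of how the two availability checks might be combined, using only the documented signatures `is_protocol_available(protocol)` and `get_available_protocols()`. `StubClient`, the local `ProtocolType` enum, and `choose_protocol` are hypothetical stand-ins so that the snippet runs without the SDK; with a real client you would pass the client instance and the SDK's own `ProtocolType` instead:

```python
from enum import Enum
from typing import List, Optional


class ProtocolType(Enum):
    # Local stand-in enum; only the member names RPC and STREAM
    # appear elsewhere on this page.
    RPC = "rpc"
    STREAM = "stream"


class StubClient:
    """Hypothetical stand-in exposing the two documented methods."""

    def __init__(self, available: List[ProtocolType]) -> None:
        self._available = list(available)

    def is_protocol_available(self, protocol: ProtocolType) -> bool:
        return protocol in self._available

    def get_available_protocols(self) -> List[ProtocolType]:
        return list(self._available)


def choose_protocol(client, preferred: ProtocolType) -> Optional[ProtocolType]:
    """Return the preferred protocol if available, else the first available one."""
    if client.is_protocol_available(preferred):
        return preferred
    available = client.get_available_protocols()
    return available[0] if available else None


client = StubClient([ProtocolType.RPC])
chosen = choose_protocol(client, ProtocolType.STREAM)
print(chosen)
```

Both methods are listed without `async` in the reference above, so the sketch calls them synchronously.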
⚡ Low-Level API
execute_agent_operation()
async def execute_agent_operation(
    self,
    operation: str,
    data: Dict[str, Any],
    protocol: Optional[ProtocolType] = None
) -> Dict[str, Any]
Executes an agent operation with automatic protocol selection.
Example:
# Automatic protocol selection
result = await client.execute_agent_operation(
    "custom_operation",
    {"param1": "value1", "param2": "value2"}
)

# Explicit protocol selection
result = await client.execute_agent_operation(
    "stream_operation",
    {"data": "real-time"},
    protocol=ProtocolType.STREAM
)
🚨 Exception Handling
The client can raise several kinds of exceptions:
from kei_agent.exceptions import KeiSDKError, ProtocolError, SecurityError

try:
    async with UnifiedKeiAgentClient(config=config) as client:
        result = await client.plan_task("objective")
except SecurityError as e:
    # Authentication/authorization errors
    print(f"Security error: {e}")
except ProtocolError as e:
    # Protocol-specific errors
    print(f"Protocol error: {e}")
except KeiSDKError as e:
    # General SDK errors
    print(f"SDK error: {e}")
except Exception as e:
    # Unexpected errors
    print(f"Unexpected error: {e}")
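The order of the `except` clauses matters if, as assumed here, `SecurityError` and `ProtocolError` derive from `KeiSDKError`: the more specific handlers have to come first, or the general one would catch everything. The classes below are local stand-ins with that assumed hierarchy, not imports from `kei_agent.exceptions`, and `classify` is a hypothetical helper:

```python
class KeiSDKError(Exception):
    """Local stand-in for the general SDK error (assumed base class)."""


class ProtocolError(KeiSDKError):
    """Local stand-in; assumed to derive from KeiSDKError."""


class SecurityError(KeiSDKError):
    """Local stand-in; assumed to derive from KeiSDKError."""


def classify(exc: Exception) -> str:
    """Re-raise exc and report which handler catches it."""
    try:
        raise exc
    except SecurityError:
        return "security"
    except ProtocolError:
        return "protocol"
    except KeiSDKError:
        # Reached only for SDK errors that are neither of the two above.
        return "sdk"
    except Exception:
        return "unexpected"


labels = [
    classify(SecurityError("denied")),
    classify(ProtocolError("stream down")),
    classify(KeiSDKError("bad config")),
    classify(ValueError("other")),
]
print(labels)
```

If the real hierarchy differs (for example, if the classes are siblings), the ordering is harmless but no longer required.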
🎯 Best Practices
1. Use the Async Context Manager
# ✅ Recommended
async with UnifiedKeiAgentClient(config=config) as client:
    result = await client.plan_task("objective")

# ❌ Avoid
client = UnifiedKeiAgentClient(config=config)
await client.initialize()
# ... forgetting to call close()
2. Use the High-Level APIs
# ✅ Recommended: high-level API
plan = await client.plan_task("Create report")

# ❌ Avoid: low-level API without a reason
plan = await client.execute_agent_operation("plan", {"objective": "Create report"})
3. Configure Enterprise Features
# ✅ Production-ready configuration
protocol_config = ProtocolConfig(
    auto_protocol_selection=True,
    protocol_fallback_enabled=True
)
security_config = SecurityConfig(
    rbac_enabled=True,
    audit_enabled=True
)
4. Implement Error Handling
# ✅ Robust error handling
try:
    result = await client.plan_task("objective")
except ProtocolError as e:
    # Fallback strategy: retry the same operation explicitly over RPC
    result = await client.execute_agent_operation(
        "plan",
        {"objective": "objective"},
        protocol=ProtocolType.RPC
    )
See also:
- ProtocolTypes: Configuration classes
- Enterprise Logging: Logging integration
- Examples: Practical examples