Verwaltet authentication and securitys-Features for KEI-Agent.
Supports Bearer Token, OIDC and mTLS authentication with automatischer Token-Erneuerung and Caching for Enterprise-Asatz.
Attributes:
Name | Type | Description |
---|
config | | |
_token_cache | Dict[str, Any] | Cache for authentications-Token |
_token_refresh_task | Optional[Task] | Backgroatd-Task for Token-Erneuerung |
_refresh_lock | | Lock for Thread-sichere Token-Erneuerung |
Parameters:
Name | Type | Description | Default |
---|
config | SecurityConfig | | required |
client_factory | Optional[Any] | Optional factory returning an async context manager that yields an object with a .post(...) coroutine. Defaults to httpx.AsyncClient. | None |
Raises:
Source code in kei_agent/security_manager.py
| def __init__(
self, config: SecurityConfig, client_factory: Optional[Any] = None
) -> None:
"""Initializes security manager.
Args:
config: security configuration
client_factory: Optional factory returning an async context manager that
yields an object with a .post(...) coroutine. Defaults to httpx.AsyncClient.
Raises:
SecurityError: On ungültiger configuration
"""
try:
config.validate()
except ValueError as e:
raise SecurityError(f"Ungültige security configuration: {e}") from e
# Factory for creating an AsyncClient-like context manager
if client_factory is None:
self._client_factory = lambda timeout, verify: httpx.AsyncClient(
timeout=timeout, verify=verify
)
else:
self._client_factory = client_factory
self.config = config
self._token_cache: Dict[str, Any] = {}
self._token_refresh_task: Optional[asyncio.Task] = None
self._refresh_lock = asyncio.Lock()
# Add missing attributes for backward compatibility with tests
self.auth_type = config.auth_type
_logger.info(
"security manager initialized",
extra={
"auth_type": config.auth_type,
"rbac_enabled": config.rbac_enabled,
"audit_enabled": config.audit_enabled,
},
)
|
Functions
get_auth_heathes()
async
Creates authentications-Heathes for HTTP-Requests.
Returns:
Type | Description |
---|
Dict[str, str] | dictionary with authentications-Heathes |
Raises:
Source code in kei_agent/security_manager.py
| async def get_auth_heathes(self) -> Dict[str, str]:
"""Creates authentications-Heathes for HTTP-Requests.
Returns:
dictionary with authentications-Heathes
Raises:
SecurityError: On authenticationsfehlern
"""
try:
if self.config.auth_type == Authtypee.BEARER:
return await self._get_bearer_heathes()
elif self.config.auth_type == Authtypee.OIDC:
return await self._get_oidc_heathes()
elif self.config.auth_type == Authtypee.MTLS:
return await self._get_mtls_heathes()
else:
raise SecurityError(f"Unbekatnter Auth-type: {self.config.auth_type}")
except SecurityError:
# Re-raise SecurityError as-is
raise
except (ValueError, TypeError) as e:
_logger.error(
"Configuration error during auth header creation",
extra={"error": str(e), "auth_type": self.config.auth_type},
)
raise SecurityError(
f"Invalid configuration for auth type {self.config.auth_type}: {e}"
) from e
except (ConnectionError, TimeoutError) as e:
_logger.error(
"Network error during auth header creation",
extra={"error": str(e), "auth_type": self.config.auth_type},
)
raise SecurityError(f"Network error during authentication: {e}") from e
except Exception as e:
_logger.error(
"Unexpected error during auth header creation",
extra={
"error": str(e),
"auth_type": self.config.auth_type,
"error_type": type(e).__name__,
},
)
raise SecurityError(f"Unexpected authentication error: {e}") from e
|
start_token_refresh()
async
Starts automatische Token-Erneuerung im Hintergratd.
Nur for OIDC-authentication with enablethe Token-Erneuerung.
Source code in kei_agent/security_manager.py
| async def start_token_refresh(self) -> None:
"""Starts automatische Token-Erneuerung im Hintergratd.
Nur for OIDC-authentication with enablethe Token-Erneuerung.
"""
if not self.config.requires_refresh():
_logger.debug("Token-Refresh not erforthelich")
return
if self._token_refresh_task and not self._token_refresh_task.done():
_logger.debug("Token-Refresh bereits aktiv")
return
self._token_refresh_task = asyncio.create_task(self._token_refresh_loop())
_logger.info("Token-Refresh startingd")
|
stop_token_refresh()
async
Stops automatische Token-Erneuerung.
Source code in kei_agent/security_manager.py
| async def stop_token_refresh(self) -> None:
"""Stops automatische Token-Erneuerung."""
if self._token_refresh_task and not self._token_refresh_task.done():
self._token_refresh_task.cancel()
try:
await self._token_refresh_task
except asyncio.CancelledError:
pass
_logger.info("Token-Refresh stoppingd")
|
get_security_context()
Gibt aktuellen securityskontext torück.
Returns:
Type | Description |
---|
Dict[str, Any] | dictionary with securitysinformationen |
Source code in kei_agent/security_manager.py
| def get_security_context(self) -> Dict[str, Any]:
"""Gibt aktuellen securityskontext torück.
Returns:
dictionary with securitysinformationen
"""
return {
"auth_type": self.config.auth_type,
"rbac_enabled": self.config.rbac_enabled,
"audit_enabled": self.config.audit_enabled,
"token_refresh_enabled": self.config.token_refresh_enabled,
"cached_tokens": list(self._token_cache.keys()),
"refresh_task_active": self._token_refresh_task is not None
and not self._token_refresh_task.done(),
}
|
check_permission(user_id, resource, action)
Checks user permissions for RBAC (stub for backward compatibility).
Parameters:
Name | Type | Description | Default |
---|
user_id | str | | required |
resource | str | | required |
action | str | | required |
Returns:
Type | Description |
---|
bool | True if permission granted (always True in stub implementation) |
Source code in kei_agent/security_manager.py
| def check_permission(self, user_id: str, resource: str, action: str) -> bool:
"""Checks user permissions for RBAC (stub for backward compatibility).
Args:
user_id: User identifier
resource: Resource being accessed
action: Action being performed
Returns:
True if permission granted (always True in stub implementation)
"""
if not self.config.rbac_enabled:
return True
# Stub implementation - always allow for backward compatibility
return True
|
log_audit_event(event_type, user_id, details)
Logs audit events (stub for backward compatibility).
Parameters:
Name | Type | Description | Default |
---|
event_type | str | | required |
user_id | str | User performing the action | required |
details | Dict[str, Any] | | required |
Source code in kei_agent/security_manager.py
| def log_audit_event(
self, event_type: str, user_id: str, details: Dict[str, Any]
) -> None:
"""Logs audit events (stub for backward compatibility).
Args:
event_type: Type of audit event
user_id: User performing the action
details: Additional event details
"""
if not self.config.audit_enabled:
return
# Stub implementation - just log for backward compatibility
_logger.info(
f"Audit event: {event_type}",
extra={"user_id": user_id, "event_type": event_type, "details": details},
)
|
get_ssl_context()
Gets SSL context for mTLS (stub for backward compatibility).
Returns:
Source code in kei_agent/security_manager.py
| def get_ssl_context(self) -> Optional[Any]:
"""Gets SSL context for mTLS (stub for backward compatibility).
Returns:
SSL context or None
"""
if self.config.auth_type != Authtypee.MTLS:
return None
# Stub implementation for backward compatibility
return None
|