Skip to content

Security Manager

Manages authentication and security features for KEI-Agent.

Supports Bearer Token, OIDC and mTLS authentication with automatic token renewal and caching for enterprise use.

Attributes:

NameTypeDescription
config

security configuration

_token_cacheDict[str, Any]

Cache for authentication tokens

_token_refresh_taskOptional[Task]

Background task for token renewal

_refresh_lock

Lock for concurrency-safe token renewal (an asyncio.Lock)

Parameters:

NameTypeDescriptionDefault
configSecurityConfig

security configuration

required
client_factoryOptional[Any]

Optional factory returning an async context manager that yields an object with a .post(...) coroutine. Defaults to httpx.AsyncClient.

None

Raises:

TypeDescription
SecurityError

On invalid configuration

Source code in kei_agent/security_manager.py
def __init__(
    self, config: SecurityConfig, client_factory: Optional[Any] = None
) -> None:
    """Set up the security manager from a validated configuration.

    Args:
        config: Security configuration. ``config.validate()`` is called
            before any instance state is assigned.
        client_factory: Optional factory returning an async context manager
            that yields an object with a ``.post(...)`` coroutine. When
            omitted, a factory building ``httpx.AsyncClient(timeout=...,
            verify=...)`` is used.

    Raises:
        SecurityError: If ``config.validate()`` raises ``ValueError``.
    """
    # Reject a bad configuration before touching the instance.
    try:
        config.validate()
    except ValueError as exc:
        raise SecurityError(f"Ungültige security configuration: {exc}") from exc

    # Either the injected factory or the httpx-backed default, which is
    # called with ``timeout`` and ``verify``.
    self._client_factory = (
        client_factory
        if client_factory is not None
        else lambda timeout, verify: httpx.AsyncClient(
            timeout=timeout, verify=verify
        )
    )

    self.config = config

    # Token bookkeeping: cache, background refresh task handle, refresh lock.
    self._token_cache: Dict[str, Any] = {}
    self._token_refresh_task: Optional[asyncio.Task] = None
    self._refresh_lock = asyncio.Lock()

    # Mirrored on the instance for backward compatibility with tests.
    self.auth_type = config.auth_type

    init_fields = {
        "auth_type": config.auth_type,
        "rbac_enabled": config.rbac_enabled,
        "audit_enabled": config.audit_enabled,
    }
    _logger.info("security manager initialized", extra=init_fields)

Functions

get_auth_heathes()async

Creates authentication headers for HTTP requests.

Returns:

TypeDescription
Dict[str, str]

Dictionary with authentication headers

Raises:

TypeDescription
SecurityError

On authentication errors

Source code in kei_agent/security_manager.py
async def get_auth_heathes(self) -> Dict[str, str]:
    """Build the authentication headers for outgoing HTTP requests.

    Selects the bearer, OIDC or mTLS header builder based on
    ``self.config.auth_type`` and converts every failure into a
    ``SecurityError``.

    Returns:
        The header dictionary produced by the selected builder.

    Raises:
        SecurityError: If the auth type matches none of the known types, or
            wrapping any other ``Exception`` raised while building headers.
    """
    try:
        if self.config.auth_type == Authtypee.BEARER:
            return await self._get_bearer_heathes()
        if self.config.auth_type == Authtypee.OIDC:
            return await self._get_oidc_heathes()
        if self.config.auth_type == Authtypee.MTLS:
            return await self._get_mtls_heathes()
        # No known auth type matched.
        raise SecurityError(f"Unbekatnter Auth-type: {self.config.auth_type}")

    except SecurityError:
        # Already the right exception type; pass it through untouched.
        raise
    except (ValueError, TypeError) as exc:
        config_fields = {"error": str(exc), "auth_type": self.config.auth_type}
        _logger.error(
            "Configuration error during auth header creation",
            extra=config_fields,
        )
        raise SecurityError(
            f"Invalid configuration for auth type {self.config.auth_type}: {exc}"
        ) from exc
    except (ConnectionError, TimeoutError) as exc:
        network_fields = {"error": str(exc), "auth_type": self.config.auth_type}
        _logger.error(
            "Network error during auth header creation",
            extra=network_fields,
        )
        raise SecurityError(f"Network error during authentication: {exc}") from exc
    except Exception as exc:
        # Boundary handler: anything else is logged with its type name and
        # re-raised as SecurityError so callers only deal with one type.
        unexpected_fields = {
            "error": str(exc),
            "auth_type": self.config.auth_type,
            "error_type": type(exc).__name__,
        }
        _logger.error(
            "Unexpected error during auth header creation",
            extra=unexpected_fields,
        )
        raise SecurityError(f"Unexpected authentication error: {exc}") from exc

start_token_refresh()async

Starts automatic token renewal in the background.

Only for OIDC authentication with token renewal enabled.

Source code in kei_agent/security_manager.py
async def start_token_refresh(self) -> None:
    """Start the background token-refresh task if one is needed.

    Does nothing when ``self.config.requires_refresh()`` is false or when a
    previously started refresh task has not finished yet. Otherwise a new
    task running ``self._token_refresh_loop()`` is scheduled and stored in
    ``self._token_refresh_task``.
    """
    if self.config.requires_refresh():
        existing = self._token_refresh_task
        if existing and not existing.done():
            # A refresh task is still running; do not start a second one.
            _logger.debug("Token-Refresh bereits aktiv")
        else:
            self._token_refresh_task = asyncio.create_task(
                self._token_refresh_loop()
            )
            _logger.info("Token-Refresh startingd")
    else:
        _logger.debug("Token-Refresh not erforthelich")

stop_token_refresh()async

Stops automatic token renewal.

Source code in kei_agent/security_manager.py
async def stop_token_refresh(self) -> None:
    """Cancel the background token-refresh task and wait for it to end.

    Returns immediately when no task is stored or the stored task has
    already finished. The task reference in ``self._token_refresh_task`` is
    left in place after cancellation.
    """
    task = self._token_refresh_task
    if not task or task.done():
        return

    task.cancel()
    try:
        # Wait until the cancellation has actually been processed.
        await task
    except asyncio.CancelledError:
        # Expected outcome of awaiting a cancelled task.
        # NOTE(review): this also swallows a CancelledError raised because
        # the *caller* was cancelled while waiting here — confirm intended.
        pass
    _logger.info("Token-Refresh stoppingd")

get_security_context()

Returns the current security context.

Returns:

TypeDescription
Dict[str, Any]

Dictionary with security information

Source code in kei_agent/security_manager.py
def get_security_context(self) -> Dict[str, Any]:
    """Return a snapshot of the current security settings and token state.

    Returns:
        Dictionary with the configured auth type, the RBAC / audit /
        token-refresh flags, the keys currently in the token cache, and
        whether a refresh task exists and has not finished.
    """
    settings = self.config
    snapshot: Dict[str, Any] = {
        "auth_type": settings.auth_type,
        "rbac_enabled": settings.rbac_enabled,
        "audit_enabled": settings.audit_enabled,
        "token_refresh_enabled": settings.token_refresh_enabled,
    }
    # Only the cache keys are exposed, never the cached values.
    snapshot["cached_tokens"] = list(self._token_cache)

    refresh_task = self._token_refresh_task
    snapshot["refresh_task_active"] = (
        refresh_task is not None and not refresh_task.done()
    )
    return snapshot

check_permission(user_id, resource, action)

Checks user permissions for RBAC (stub for backward compatibility).

Parameters:

NameTypeDescriptionDefault
user_idstr

User identifier

required
resourcestr

Resource being accessed

required
actionstr

Action being performed

required

Returns:

TypeDescription
bool

True if permission granted (always True in stub implementation)

Source code in kei_agent/security_manager.py
def check_permission(self, user_id: str, resource: str, action: str) -> bool:
    """Report whether a user may perform an action on a resource (stub).

    Args:
        user_id: User identifier (currently unused).
        resource: Resource being accessed (currently unused).
        action: Action being performed (currently unused).

    Returns:
        Always ``True``: access is granted whether or not RBAC is enabled.
    """
    if self.config.rbac_enabled:
        # NOTE(review): RBAC is switched on but nothing is enforced here;
        # the stub still grants access for backward compatibility.
        return True
    # RBAC disabled: everything is allowed.
    return True

log_audit_event(event_type, user_id, details)

Logs audit events (stub for backward compatibility).

Parameters:

NameTypeDescriptionDefault
event_typestr

Type of audit event

required
user_idstr

User performing the action

required
detailsDict[str, Any]

Additional event details

required
Source code in kei_agent/security_manager.py
def log_audit_event(
    self, event_type: str, user_id: str, details: Dict[str, Any]
) -> None:
    """Write an audit event to the module logger (stub implementation).

    Nothing is logged when ``self.config.audit_enabled`` is false.

    Args:
        event_type: Type of audit event; also part of the log message.
        user_id: User performing the action.
        details: Additional event details, attached to the log record.
    """
    if self.config.audit_enabled:
        message = f"Audit event: {event_type}"
        # Structured fields are passed via ``extra`` on the log call.
        audit_fields = {
            "user_id": user_id,
            "event_type": event_type,
            "details": details,
        }
        _logger.info(message, extra=audit_fields)

get_ssl_context()

Gets SSL context for mTLS (stub for backward compatibility).

Returns:

TypeDescription
Optional[Any]

SSL context or None

Source code in kei_agent/security_manager.py
def get_ssl_context(self) -> Optional[Any]:
    """Return the SSL context for mTLS connections (stub implementation).

    Returns:
        Always ``None``: no context is built for mTLS yet, and none applies
        to the other auth types.
    """
    if self.config.auth_type != Authtypee.MTLS:
        # Not using mTLS, so there is no client-certificate context.
        ssl_context = None
    else:
        # Stub kept for backward compatibility: mTLS context not built here.
        ssl_context = None
    return ssl_context

Authentication types

  • Bearer Token
  • OIDC
  • mTLS

See configuration details in ProtocolTypes.