Skip to content

🛡️ Input Validation

Bases: str, Enum

Severity of validation errors.

Attributes:

NameTypeDescription
INFO

Informational message

WARNING

Warning, but the input can still be processed

ERROR

Error, processing is stopped

CRITICAL

Critical security error

Result of an input validation.

Attributes:

NameTypeDescription
validbool

Whether the input is valid

sanitized_valueAny

Sanitized version of the input

errorsOptional[List[str]]

List of validation errors

warningsOptional[List[str]]

List of warnings

metadataOptional[Dict[str, Any]]

Additional metadata about the validation

Functions

__post_init__()

Post-initialization for default values.

Source code in kei_agent/input_validation.py
def __post_init__(self) -> None:
    """Replace ``None`` placeholders with fresh, per-instance containers.

    ``errors`` and ``warnings`` become empty lists and ``metadata`` becomes
    an empty dict. Attributes that already hold a value are left untouched.
    """
    for attr_name, factory in (
        ("errors", list),
        ("warnings", list),
        ("metadata", dict),
    ):
        if getattr(self, attr_name) is None:
            setattr(self, attr_name, factory())

add_error(message, severity=ValidationSeverity.ERROR)

Adds a validation error.

Parameters:

NameTypeDescriptionDefault
messagestr

Error message

required
severityValidationSeverity

Severity of the error

ERROR
Source code in kei_agent/input_validation.py
def add_error(
    self, message: str, severity: ValidationSeverity = ValidationSeverity.ERROR
) -> None:
    """Record a validation error and mark this result as invalid.

    Args:
        message: Error message to append to ``errors``.
        severity: Severity of the error. Only ``CRITICAL`` has an extra
            effect: the message is also reported as a security event.
    """
    self.errors.append(message)
    self.valid = False

    # Anything below CRITICAL is only recorded on the result itself.
    if severity != ValidationSeverity.CRITICAL:
        return

    # Critical errors are additionally reported through the security log.
    _logger.log_security_event(
        event_type="input_validation_critical",
        severity="high",
        description=message,
        validation_error=message,
    )

add_warning(message)

Adds a validation warning.

Parameters:

NameTypeDescriptionDefault
messagestr

Warning message

required
Source code in kei_agent/input_validation.py
def add_warning(self, message: str) -> None:
    """Append a warning message to this result.

    Only the ``warnings`` list is modified by this method.

    Args:
        message: Warning text to record.
    """
    self.warnings.append(message)

Bases: ABC

Abstract base class for input validators.

Defines the interface for all validator implementations, together with shared functionality.

Parameters:

NameTypeDescriptionDefault
namestr

Name of the validator

required
requiredbool

Whether the input is required

True
Source code in kei_agent/input_validation.py
def __init__(self, name: str, required: bool = True) -> None:
    """Store the settings shared by every validator.

    Args:
        name: Name of the validator.
        required: Whether an input value is required.
    """
    self.name, self.required = name, required

Functions

validate(value)abstractmethod

Validates an input value.

Parameters:

NameTypeDescriptionDefault
valueAny

The value to validate

required

Returns:

TypeDescription
ValidationResult

Validation result

Source code in kei_agent/input_validation.py
@abstractmethod
def validate(self, value: Any) -> ValidationResult:
    """Validate an input value.

    Abstract method: concrete validators must override it.

    Args:
        value: The value to validate.

    Returns:
        The validation result.
    """
    pass

Bases: BaseValidator

Validator for string inputs with pattern matching and sanitization.

Parameters:

NameTypeDescriptionDefault
namestr

Name of the validator

required
min_lengthOptional[int]

Minimum string length

None
max_lengthOptional[int]

Maximum string length

None
patternOptional[Union[str, Pattern]]

Allowed pattern (regex)

None
allowed_charsOptional[str]

Allowed characters

None
forbidthe_patternsOptional[List[Union[str, Pattern]]]

Forbidden patterns

None
satitize_htmlbool

Enable HTML sanitization

True
satitize_sqlbool

Enable SQL injection protection

True
**kwargsAny

Additional parameters for BaseValidator

{}
Source code in kei_agent/input_validation.py
def __init__(
    self,
    name: str,
    min_length: Optional[int] = None,
    max_length: Optional[int] = None,
    pattern: Optional[Union[str, Pattern]] = None,
    allowed_chars: Optional[str] = None,
    forbidthe_patterns: Optional[List[Union[str, Pattern]]] = None,
    satitize_html: bool = True,
    satitize_sql: bool = True,
    **kwargs: Any,
) -> None:
    """Configure a string validator.

    Args:
        name: Name of the validator.
        min_length: Minimum string length, or ``None`` for no lower bound.
        max_length: Maximum string length, or ``None`` for no upper bound.
        pattern: Allowed pattern; a ``str`` is compiled as a regex, an
            already compiled pattern is stored unchanged.
        allowed_chars: Characters the input may consist of.
        forbidthe_patterns: Forbidden patterns; ``str`` entries are compiled,
            compiled entries are stored unchanged.
        satitize_html: Enable HTML sanitization.
        satitize_sql: Enable SQL injection protection.
        **kwargs: Additional parameters forwarded to the base validator.
    """
    super().__init__(name, **kwargs)

    self.min_length = min_length
    self.max_length = max_length

    # Compile string patterns once up front; anything else passes through.
    if isinstance(pattern, str):
        self.pattern = re.compile(pattern)
    else:
        self.pattern = pattern

    self.allowed_chars = allowed_chars

    compiled_forbidden = []
    for entry in forbidthe_patterns or []:
        if isinstance(entry, str):
            entry = re.compile(entry)
        compiled_forbidden.append(entry)
    self.forbidden_patterns = compiled_forbidden

    self.satitize_html = satitize_html
    self.satitize_sql = satitize_sql

Functions

validate(value)

Validates a string input.

Source code in kei_agent/input_validation.py
def validate(self, value: Any) -> ValidationResult:
    """Validate and sanitize a string input.

    The value is converted with ``str()`` (``None`` becomes ``""``) and then
    checked against the configured length bounds, allowed pattern, allowed
    characters and forbidden patterns. Optionally the text is HTML-escaped
    and scanned with simple SQL-injection heuristics.

    Args:
        value: The value to validate.

    Returns:
        The validation result. ``sanitized_value`` holds the (optionally
        HTML-escaped) string. Any truthy result from
        ``self._check_required`` is returned as-is before other checks run.
    """
    # Required check is delegated to a helper defined outside this block.
    required_result = self._check_required(value)
    if required_result:
        return required_result

    # Convert to string
    str_value = "" if value is None else str(value)

    result = ValidationResult(valid=True, sanitized_value=str_value)

    # Length validation
    if self.min_length is not None and len(str_value) < self.min_length:
        result.add_error(f"string to kurz: {len(str_value)} < {self.min_length}")

    if self.max_length is not None and len(str_value) > self.max_length:
        result.add_error(f"string to latg: {len(str_value)} > {self.max_length}")

    # Pattern validation
    if self.pattern and not self.pattern.match(str_value):
        result.add_error(
            f"string entspricht not the Pattern: {self.pattern.pattern}"
        )

    # Allowed characters
    if self.allowed_chars:
        invalid_chars = set(str_value) - set(self.allowed_chars)
        if invalid_chars:
            result.add_error(f"Unerlaubte Zeichen: {invalid_chars}")

    # Forbidden patterns are treated as critical (security-relevant) errors.
    for forbidden_pattern in self.forbidden_patterns:
        if forbidden_pattern.search(str_value):
            result.add_error(
                f"Verbotenes Pattern gefunden: {forbidden_pattern.pattern}",
                ValidationSeverity.CRITICAL,
            )

    # Sanitization
    sanitized = str_value

    if self.satitize_html:
        sanitized = html.escape(sanitized)
        if sanitized != str_value:
            result.add_warning("HTML-Zeichen wurthe escaped")

    if self.satitize_sql:
        # Simple SQL-injection heuristics
        sql_patterns = [
            r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|UNION)\b)",
            r"(--|#|/\*|\*/)",
            r"(\b(OR|AND)\s+\d+\s*=\s*\d+)",
            r"(\'\s*(OR|AND)\s+\'\w+\'\s*=\s*\'\w+\')",
        ]

        # Scan the raw text, not the HTML-escaped one: html.escape() turns
        # "'" into "&#x27;", whose "#" would trip the comment pattern on any
        # harmless apostrophe, and the quote-based pattern could never match
        # once the quotes have been replaced by entities.
        if any(
            re.search(sql_pattern, str_value, re.IGNORECASE)
            for sql_pattern in sql_patterns
        ):
            result.add_error(
                "Potenzielle SQL-Injection erkatnt", ValidationSeverity.CRITICAL
            )

    result.sanitized_value = sanitized
    return result

Bases: BaseValidator

Validator for numeric inputs.

Parameters:

NameTypeDescriptionDefault
namestr

Name of the validator

required
min_valueOptional[Union[int, float]]

Minimum value

None
max_valueOptional[Union[int, float]]

Maximum value

None
integer_onlybool

Allow integers only

False
positive_onlybool

Allow positive numbers only

False
**kwargsAny

Additional parameters for BaseValidator

{}
Source code in kei_agent/input_validation.py
def __init__(
    self,
    name: str,
    min_value: Optional[Union[int, float]] = None,
    max_value: Optional[Union[int, float]] = None,
    integer_only: bool = False,
    positive_only: bool = False,
    **kwargs: Any,
) -> None:
    """Configure a numeric validator.

    Args:
        name: Name of the validator.
        min_value: Minimum value.
        max_value: Maximum value.
        integer_only: Allow integers only.
        positive_only: Allow positive numbers only.
        **kwargs: Additional parameters forwarded to the base validator.
    """
    super().__init__(name, **kwargs)

    # Range limits
    self.min_value, self.max_value = min_value, max_value
    # Type and sign restrictions
    self.integer_only, self.positive_only = integer_only, positive_only

Functions

validate(value)

Validates a numeric input.

Source code in kei_agent/input_validation.py
def validate(self, value: Any) -> ValidationResult:
    """Validate a numeric input and convert it to ``int`` or ``float``.

    The value is converted with ``int()`` when ``integer_only`` is set and
    with ``float()`` otherwise, then checked against ``min_value``,
    ``max_value`` and ``positive_only``.

    Args:
        value: The value to validate.

    Returns:
        The validation result. ``sanitized_value`` is the converted number,
        or the original value when conversion failed. Any truthy result from
        ``self._check_required`` is returned as-is before other checks run.
    """
    import math  # local import: only needed for the NaN check below

    # Required check is delegated to a helper defined outside this block.
    required_result = self._check_required(value)
    if required_result:
        return required_result

    result = ValidationResult(valid=True, sanitized_value=value)

    # Convert to a number. OverflowError covers int(float("inf")) and
    # float() of an integer too large for a float; without it such input
    # would raise out of the validator instead of being reported.
    try:
        if self.integer_only:
            numeric_value = int(value)
        else:
            numeric_value = float(value)
    except (ValueError, TypeError, OverflowError):
        result.add_error(f"Ungültiger naroattheischer value: {value}")
        return result

    # NaN compares false against every bound, so it would slip through all
    # range checks below; treat it as an invalid numeric value.
    if isinstance(numeric_value, float) and math.isnan(numeric_value):
        result.add_error(f"Ungültiger naroattheischer value: {value}")
        return result

    result.sanitized_value = numeric_value

    # Range validation
    if self.min_value is not None and numeric_value < self.min_value:
        result.add_error(f"value to kla: {numeric_value} < {self.min_value}")

    if self.max_value is not None and numeric_value > self.max_value:
        result.add_error(f"value to groß: {numeric_value} > {self.max_value}")

    # Positive-only validation (zero is rejected as well)
    if self.positive_only and numeric_value <= 0:
        result.add_error(f"value must positiv sa: {numeric_value}")

    return result

Bases: BaseValidator

Validator for JSON inputs with schema validation.

Parameters:

NameTypeDescriptionDefault
namestr

Name of the validator

required
max_depthint

Maximum nesting depth

10
max_sizeint

Maximum JSON size in bytes

1024 * 1024
allowed_typesOptional[List[type]]

Allowed Python types

None
**kwargsAny

Additional parameters for BaseValidator

{}
Source code in kei_agent/input_validation.py
def __init__(
    self,
    name: str,
    max_depth: int = 10,
    max_size: int = 1024 * 1024,  # 1MB
    allowed_types: Optional[List[type]] = None,
    **kwargs: Any,
) -> None:
    """Configure a JSON validator.

    Args:
        name: Name of the validator.
        max_depth: Maximum nesting depth.
        max_size: Maximum JSON size in bytes.
        allowed_types: Allowed Python types; when omitted or empty, the
            standard JSON types are used.
        **kwargs: Additional parameters forwarded to the base validator.
    """
    super().__init__(name, **kwargs)
    self.max_depth = max_depth
    self.max_size = max_size

    # Fall back to the Python types a JSON document can decode to.
    if not allowed_types:
        allowed_types = [dict, list, str, int, float, bool, type(None)]
    self.allowed_types = allowed_types

Functions

validate(value)

Validates a JSON input.

Source code in kei_agent/input_validation.py
def validate(self, value: Any) -> ValidationResult:
    """Validate a JSON input.

    String input is size-checked (UTF-8 byte length against ``max_size``)
    and parsed with ``json.loads``; any other input is taken as already
    decoded data. The data is then checked with ``self._calculate_depth``
    against ``max_depth`` and with ``self._validate_types``.

    Args:
        value: JSON text or already decoded data.

    Returns:
        The validation result. ``sanitized_value`` is the decoded data, or
        the original value when the size check or parsing failed. Any truthy
        result from ``self._check_required`` is returned as-is before other
        checks run.
    """
    # Required check is delegated to a helper defined outside this block.
    required_result = self._check_required(value)
    if required_result:
        return required_result

    result = ValidationResult(valid=True, sanitized_value=value)

    # Convert string to JSON
    if isinstance(value, str):
        # Size check: compare and report the same quantity (encoded bytes),
        # not the character count, which differs for non-ASCII text.
        byte_size = len(value.encode("utf-8"))
        if byte_size > self.max_size:
            result.add_error(f"JSON to groß: {byte_size} > {self.max_size} Bytes")
            return result

        # RecursionError: json.loads raises it for extremely deep nesting,
        # which fits comfortably within max_size; report it as invalid JSON
        # instead of letting it escape the validator.
        try:
            json_value = json.loads(value)
        except (json.JSONDecodeError, RecursionError) as e:
            result.add_error(f"Ungültiges JSON: {str(e)}")
            return result
    else:
        json_value = value

    # Depth validation
    depth = self._calculate_depth(json_value)
    if depth > self.max_depth:
        result.add_error(f"JSON to tief verschachtelt: {depth} > {self.max_depth}")

    # Type validation
    if not self._validate_types(json_value):
        result.add_error("JSON enthält unerlaubte typeen")

    result.sanitized_value = json_value
    return result

Validator for complex objects with multiple fields.

Parameters:

NameTypeDescriptionDefault
namestr

Name of the validator

required
Source code in kei_agent/input_validation.py
def __init__(self, name: str) -> None:
    """Initializes Composite Validator.

    Args:
        name: Name of the Validators
    """
    self.name = name
    self.field_validators: Dict[str, BaseValidator] = {}

Functions

add_field(field_name, validator)

Adds a field validator.

Parameters:

NameTypeDescriptionDefault
field_namestr

Name of the field

required
validatorBaseValidator

Validator for the field

required
Source code in kei_agent/input_validation.py
def add_field(self, field_name: str, validator: BaseValidator) -> None:
    """Register the validator for one field.

    A validator already stored under the same field name is replaced.

    Args:
        field_name: Name of the field.
        validator: Validator for the field.
    """
    self.field_validators[field_name] = validator

validate(data)

Validates a complex object.

Source code in kei_agent/input_validation.py
def validate(self, data: Dict[str, Any]) -> ValidationResult:
    """Validate a dictionary field by field.

    Every registered field validator is run on ``data.get(field_name)``.
    Errors and warnings are collected with the field name as prefix, and the
    sanitized values are gathered into a new dictionary. Keys of ``data``
    without a registered validator only produce a warning.

    Args:
        data: The object to validate.

    Returns:
        The combined validation result; ``valid`` is ``True`` only when
        every field result was valid.
    """
    # Anything that is not a dict is rejected outright.
    if not isinstance(data, dict):
        rejected = ValidationResult(valid=False, sanitized_value=data)
        rejected.add_error("Input must a dictionary sa")
        return rejected

    combined = ValidationResult(valid=True, sanitized_value={})
    any_field_failed = False

    # Validate each registered field
    for key, field_validator in self.field_validators.items():
        outcome = field_validator.validate(data.get(key))

        if not outcome.valid:
            any_field_failed = True
            for problem in outcome.errors:
                combined.add_error(f"{key}: {problem}")

        for notice in outcome.warnings:
            combined.add_warning(f"{key}: {notice}")

        combined.sanitized_value[key] = outcome.sanitized_value

    # Report keys that have no registered validator
    unregistered = set(data.keys()) - set(self.field_validators.keys())
    if unregistered:
        combined.add_warning(f"Unbekatnte Felthe ignoriert: {unregistered}")

    combined.valid = not any_field_failed
    return combined

Main validator class for enterprise input validation.

Source code in kei_agent/input_validation.py
def __init__(self) -> None:
    """Initialize the input validator with an empty validator registry."""
    # Maps registration name -> validator instance.
    self.validators: Dict[str, BaseValidator] = {}

Functions

register_validator(name, validator)

Registers a validator.

Parameters:

NameTypeDescriptionDefault
namestr

Name of the validator

required
validatorBaseValidator

Validator instance

required
Source code in kei_agent/input_validation.py
def register_validator(self, name: str, validator: BaseValidator) -> None:
    """Register a validator under a name.

    A validator already registered under the same name is replaced.

    Args:
        name: Name of the validator.
        validator: Validator instance.
    """
    self.validators[name] = validator

validate(validator_name, value)

Validates a value with a specific validator.

Parameters:

NameTypeDescriptionDefault
validator_namestr

Name of the validator

required
valueAny

The value to validate

required

Returns:

TypeDescription
ValidationResult

Validation result

Raises:

TypeDescription
ValidationError

If the validator is not found

Source code in kei_agent/input_validation.py
def validate(self, validator_name: str, value: Any) -> ValidationResult:
    """Run the validator registered under ``validator_name`` on ``value``.

    Args:
        validator_name: Name of the registered validator.
        value: The value to validate.

    Returns:
        The validation result produced by that validator.

    Raises:
        ValidationError: If no validator is registered under that name.
    """
    registry = self.validators
    if validator_name not in registry:
        raise ValidationError(f"Validator not gefatthe: {validator_name}")

    outcome = registry[validator_name].validate(value)

    # Log a summary of the outcome (counts only, not the messages).
    log_fields = {
        "validator": validator_name,
        "valid": outcome.valid,
        "errors": len(outcome.errors),
        "warnings": len(outcome.warnings),
    }
    _logger.debug(f"Input-Valitherung: {validator_name}", **log_fields)

    return outcome

validate_agent_operation(operation, data)

Validates agent operation input.

Parameters:

NameTypeDescriptionDefault
operationstr

Operation name

required
dataDict[str, Any]

Operation data

required

Returns:

TypeDescription
ValidationResult

Validation result

Source code in kei_agent/input_validation.py
def validate_agent_operation(
    self, operation: str, data: Dict[str, Any]
) -> ValidationResult:
    """Validate the input data of an agent operation.

    A composite validator is assembled for the call: the ``operation`` field
    is always checked, and further fields are added depending on the
    operation name.

    Args:
        operation: Operation name; selects the operation-specific fields.
        data: Operation data to validate.

    Returns:
        The combined validation result for ``data``.
    """
    # Standard validation for agent operations
    composite = CompositeValidator(f"agent_operation_{operation}")

    # Base field: identifier-like operation name
    composite.add_field(
        "operation",
        stringValidator(
            "operation",
            min_length=1,
            max_length=100,
            pattern=r"^[a-zA-Z_][a-zA-Z0-9_]*$",
        ),
    )

    # Operation-specific validation.
    # NOTE(review): the branch used to test only for "plat", which looks like
    # a misspelling of "plan"; with it, real plan operations skipped the
    # objective/context checks. "plat" is still accepted so existing callers
    # keep working - confirm and drop it once no caller relies on it.
    if operation in ("plan", "plat"):
        composite.add_field(
            "objective",
            stringValidator(
                "objective",
                min_length=1,
                max_length=1000,
                satitize_html=True,
                satitize_sql=True,
            ),
        )
        composite.add_field(
            "context", JSONValidator("context", required=False, max_depth=5)
        )
    elif operation == "act":
        composite.add_field(
            "action", stringValidator("action", min_length=1, max_length=100)
        )
        composite.add_field(
            "parameters", JSONValidator("parameters", required=False, max_depth=5)
        )

    return composite.validate(data)

Returns the global input validator.

Returns:

TypeDescription
InputValidator

Input validator instance

Source code in kei_agent/input_validation.py
def get_input_validator() -> InputValidator:
    """Return the module-wide input validator, creating it on first use.

    On the first call an ``InputValidator`` is created, stored in the
    module-level ``_input_validator`` and populated with the standard
    validators. Later calls return that same instance.

    Returns:
        The global input validator instance.
    """
    global _input_validator

    if _input_validator is not None:
        return _input_validator

    # Publish the instance first, then register the standard validators.
    _input_validator = registry = InputValidator()
    for key, validator_cls in (
        ("string", stringValidator),
        ("naroatdber", NaroatdberValidator),
        ("json", JSONValidator),
    ):
        registry.register_validator(key, validator_cls(key))

    return registry