Source code for flowodm.schema

"""
Schema utilities for Avro schema generation, validation, and registry operations.
"""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any

from flowodm.connection import get_schema_registry
from flowodm.exceptions import (
    SchemaCompatibilityError,
    SchemaRegistryError,
)

if TYPE_CHECKING:
    from flowodm.model import FlowBaseModel


@dataclass
class ValidationResult:
    """Outcome of validating a model against an Avro schema.

    Attributes:
        is_valid: Whether the validation passed.
        errors: Messages describing problems that make the model invalid.
            Defaults to a fresh empty list per instance.
        warnings: Messages describing non-fatal findings.
            Defaults to a fresh empty list per instance.
    """

    is_valid: bool
    # default_factory gives every instance its own list instead of a shared one.
    errors: list[str] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)
@dataclass
class CompatibilityResult:
    """Outcome of a schema compatibility check.

    Attributes:
        is_compatible: Whether the checked schema was reported compatible.
        message: Human-readable summary of the outcome (empty by default).
        compatibility_level: Compatibility level associated with the check
            (empty by default).
    """

    is_compatible: bool
    message: str = ""
    compatibility_level: str = ""
def load_schema_from_file(path: str | Path) -> dict[str, Any]:
    """
    Load Avro schema from .avsc file.

    Args:
        path: Path to .avsc file

    Returns:
        Parsed Avro schema as dict

    Raises:
        FileNotFoundError: If schema file doesn't exist
        json.JSONDecodeError: If file is not valid JSON
    """
    # Schema files are JSON documents, so decode them as UTF-8 explicitly
    # instead of relying on the platform's locale-dependent default encoding.
    with Path(path).open(encoding="utf-8") as schema_file:
        schema: dict[str, Any] = json.load(schema_file)
    return schema
def load_schema_from_registry(
    subject: str,
    version: str | int = "latest",
) -> dict[str, Any]:
    """
    Fetch an Avro schema from Schema Registry and parse it.

    Args:
        subject: Schema Registry subject name
        version: ``"latest"`` or a version number (converted with ``int``)

    Returns:
        Parsed Avro schema as dict

    Raises:
        SchemaRegistryError: If anything goes wrong while fetching or parsing
            the schema, including an empty schema string
    """
    try:
        client = get_schema_registry()
        registered = (
            client.get_latest_version(subject)
            if version == "latest"
            else client.get_version(subject, int(version))
        )
        raw_schema = registered.schema.schema_str
        if not raw_schema:
            # Raised inside the try on purpose so it is reported as a
            # SchemaRegistryError like every other failure.
            raise ValueError("Schema string is empty")
        parsed: dict[str, Any] = json.loads(raw_schema)
    except Exception as e:
        raise SchemaRegistryError(f"Failed to load schema from registry: {e}") from e
    return parsed
def generate_model_from_schema(
    schema_source: str | Path | dict[str, Any],
    topic: str,
    class_name: str | None = None,
    consumer_group: str | None = None,
) -> type[FlowBaseModel]:
    """
    Build a FlowBaseModel subclass dynamically from an Avro schema.

    Args:
        schema_source: Schema dict, path to an existing ``.avsc`` file, or a
            Schema Registry subject. A str/Path that is not an existing
            ``.avsc`` file is looked up as a registry subject.
        topic: Kafka topic name stored on the generated ``Settings``
        class_name: Optional class name (defaults to the schema's ``name``,
            or ``"GeneratedModel"`` when the schema has none)
        consumer_group: Optional consumer group stored on ``Settings``

    Returns:
        Dynamically generated FlowBaseModel subclass

    Raises:
        ValueError: If ``schema_source`` is not a dict, str, or Path

    Example:
        >>> UserEvent = generate_model_from_schema("schemas/user.avsc", topic="users")
        >>> event = UserEvent(user_id="123", action="login")
    """
    from pydantic import create_model

    from flowodm.model import FlowBaseModel

    # Resolve the schema source into a schema dict.
    if not isinstance(schema_source, (dict, str, Path)):
        raise ValueError(f"Invalid schema source: {schema_source}")
    if isinstance(schema_source, dict):
        schema = schema_source
    else:
        candidate = Path(schema_source)
        if candidate.exists() and candidate.suffix == ".avsc":
            schema = load_schema_from_file(candidate)
        else:
            # Anything else is interpreted as a Schema Registry subject.
            schema = load_schema_from_registry(str(schema_source))

    model_name = class_name or schema.get("name", "GeneratedModel")

    # Each Avro field becomes a (type, default) pair; Ellipsis marks a field
    # without a default, i.e. a required one.
    field_definitions: dict[str, Any] = {
        avro_field["name"]: (
            _avro_type_to_python(avro_field["type"]),
            avro_field.get("default", ...),
        )
        for avro_field in schema.get("fields", [])
    }

    class GeneratedSettings:
        topic: str | None = None
        consumer_group: str | None = None

    # Assigned after the class statement: inside the class body the names
    # ``topic`` / ``consumer_group`` would not resolve to the function arguments.
    GeneratedSettings.topic = topic
    GeneratedSettings.consumer_group = consumer_group

    model = create_model(
        model_name,
        __base__=FlowBaseModel,
        **field_definitions,
    )
    model.Settings = GeneratedSettings  # type: ignore
    return model
def generate_model_from_registry(
    subject: str,
    topic: str,
    version: str | int = "latest",
    class_name: str | None = None,
    consumer_group: str | None = None,
) -> type[FlowBaseModel]:
    """
    Build a FlowBaseModel subclass from a schema stored in Schema Registry.

    The schema is fetched with ``load_schema_from_registry`` and then handed
    to ``generate_model_from_schema`` as a dict.

    Args:
        subject: Schema Registry subject name
        topic: Kafka topic name
        version: Schema version ("latest" or version number)
        class_name: Optional class name
        consumer_group: Optional consumer group

    Returns:
        Dynamically generated FlowBaseModel subclass
    """
    return generate_model_from_schema(
        load_schema_from_registry(subject, version),
        topic=topic,
        class_name=class_name,
        consumer_group=consumer_group,
    )
def _avro_type_to_python(avro_type: Any) -> Any: """Convert Avro type to Python type annotation.""" # Handle union types (nullable) if isinstance(avro_type, list): # Find non-null type non_null_types = [t for t in avro_type if t != "null"] if not non_null_types: return type(None) if len(non_null_types) == 1: return _avro_type_to_python(non_null_types[0]) | None # Multiple types - use Any return Any # Handle complex types if isinstance(avro_type, dict): logical_type = avro_type.get("logicalType") base_type = avro_type.get("type") if logical_type == "timestamp-millis": from datetime import datetime return datetime if logical_type == "date": from datetime import date return date if logical_type == "decimal": from decimal import Decimal return Decimal if logical_type == "uuid": return str # Recurse for base type if base_type: return _avro_type_to_python(base_type) return Any # Handle primitive types type_mapping: dict[str, type] = { "null": type(None), "boolean": bool, "int": int, "long": int, "float": float, "double": float, "bytes": bytes, "string": str, } return type_mapping.get(avro_type, Any)
def validate_against_file(
    model_class: type[FlowBaseModel],
    schema_path: str | Path,
) -> ValidationResult:
    """
    Check a Pydantic model against an Avro schema stored in a local file.

    Args:
        model_class: FlowBaseModel subclass to validate
        schema_path: Path to .avsc file

    Returns:
        ValidationResult with is_valid flag and any errors
    """
    return _validate_model_against_schema(
        model_class,
        load_schema_from_file(schema_path),
    )
def validate_against_registry(
    model_class: type[FlowBaseModel],
    subject: str,
    version: str | int = "latest",
) -> ValidationResult:
    """
    Check a Pydantic model against a schema stored in Schema Registry.

    Args:
        model_class: FlowBaseModel subclass to validate
        subject: Schema Registry subject name
        version: Schema version ("latest" or version number)

    Returns:
        ValidationResult with is_valid flag and any errors
    """
    return _validate_model_against_schema(
        model_class,
        load_schema_from_registry(subject, version),
    )
def _validate_model_against_schema( model_class: type[FlowBaseModel], schema: dict[str, Any], ) -> ValidationResult: """ Internal function to validate model against schema. Checks: 1. All required schema fields exist in model 2. All model fields exist in schema (no extra fields) 3. Field types are compatible """ errors: list[str] = [] warnings: list[str] = [] # Get model fields model_fields = set(model_class.model_fields.keys()) # Get schema fields schema_fields_list = schema.get("fields", []) schema_fields = {f["name"]: f for f in schema_fields_list} schema_field_names = set(schema_fields.keys()) # Check for missing fields (in schema but not in model) missing_fields = schema_field_names - model_fields for field_name in missing_fields: field_info = schema_fields[field_name] is_nullable = _is_nullable_avro_type(field_info["type"]) has_default = "default" in field_info if not is_nullable and not has_default: errors.append(f"Missing required field '{field_name}' from schema") else: warnings.append(f"Missing optional field '{field_name}' from schema") # Check for extra fields (in model but not in schema) extra_fields = model_fields - schema_field_names for field_name in extra_fields: errors.append(f"Extra field '{field_name}' not in schema") # Check type compatibility for common fields common_fields = model_fields & schema_field_names for field_name in common_fields: model_field = model_class.model_fields[field_name] schema_field = schema_fields[field_name] if not _types_compatible(model_field.annotation, schema_field["type"]): errors.append( f"Field '{field_name}' type mismatch: " f"model has {model_field.annotation}, schema expects {schema_field['type']}" ) return ValidationResult( is_valid=len(errors) == 0, errors=errors, warnings=warnings, ) def _is_nullable_avro_type(avro_type: Any) -> bool: """Check if Avro type is nullable (union with null).""" if isinstance(avro_type, list): return "null" in avro_type return bool(avro_type == "null") def 
_types_compatible(python_type: Any, avro_type: Any) -> bool: """Check if Python type is compatible with Avro type.""" # Handle Union types (Optional / int | None) if hasattr(python_type, "__args__"): args = python_type.__args__ python_is_nullable = type(None) in args non_none_types = [a for a in args if a is not type(None)] # For nullable Python type, Avro should also be nullable if python_is_nullable: # If Python is nullable, get the base Avro type if isinstance(avro_type, list): avro_base_types = [t for t in avro_type if t != "null"] if non_none_types and avro_base_types: # Check if the non-null types are compatible return _types_compatible(non_none_types[0], avro_base_types[0]) if non_none_types: return _types_compatible(non_none_types[0], avro_type) # Handle nullable Avro type with non-nullable Python type if isinstance(avro_type, list) and "null" in avro_type: non_null_avro_types = [t for t in avro_type if t != "null"] if non_null_avro_types: return _types_compatible(python_type, non_null_avro_types[0]) # Get expected Python type from Avro expected_python_type = _avro_type_to_python(avro_type) # Handle Optional expected types if hasattr(expected_python_type, "__args__"): args = getattr(expected_python_type, "__args__", ()) if type(None) in args: non_none_types = [a for a in args if a is not type(None)] if non_none_types: expected_python_type = non_none_types[0] # Simple type comparison if python_type == expected_python_type: return True # Check for compatible numeric types if python_type in (int, float) and expected_python_type in (int, float): return True # Fallback - be permissive return bool(expected_python_type == Any)
def check_compatibility(
    model_class: type[FlowBaseModel],
    subject: str,
    compatibility_level: str = "BACKWARD",
) -> CompatibilityResult:
    """
    Test the model's generated Avro schema against a registry subject.

    Args:
        model_class: FlowBaseModel subclass
        subject: Schema Registry subject name
        compatibility_level: BACKWARD, FORWARD, FULL, or NONE. This value is
            only copied into the result (and into the raised error); it is
            not passed to the registry's compatibility test.

    Returns:
        CompatibilityResult with is_compatible flag and message

    Raises:
        SchemaCompatibilityError: If any step of the check fails
    """
    try:
        from confluent_kafka.schema_registry import Schema

        registry = get_schema_registry()

        # Serialise the model's Avro schema into the registry's Schema wrapper.
        candidate = Schema(json.dumps(model_class._generate_avro_schema()), "AVRO")

        compatible = registry.test_compatibility(subject, candidate)
        if compatible:
            verdict = "Schema is compatible"
        else:
            verdict = "Schema is not compatible"

        return CompatibilityResult(
            is_compatible=compatible,
            message=verdict,
            compatibility_level=compatibility_level,
        )
    except Exception as e:
        raise SchemaCompatibilityError(
            f"Compatibility check failed: {e}",
            compatibility_level=compatibility_level,
        ) from e
def upload_schema(
    schema_path: str | Path,
    subject: str,
    compatibility_level: str | None = None,
) -> int:
    """
    Upload an Avro schema file to Schema Registry.

    Args:
        schema_path: Path to .avsc file
        subject: Schema Registry subject name
        compatibility_level: Optional compatibility level to set on the
            subject before registering. Setting it is best-effort: a failure
            is logged as a warning and the upload continues.

    Returns:
        Schema ID from registry

    Raises:
        SchemaRegistryError: If upload fails
    """
    import logging

    try:
        from confluent_kafka.schema_registry import Schema

        registry = get_schema_registry()

        # Load schema from file and wrap it for the registry client.
        schema_dict = load_schema_from_file(schema_path)
        schema = Schema(json.dumps(schema_dict), "AVRO")

        if compatibility_level:
            try:
                registry.set_compatibility(subject, compatibility_level)
            except Exception as exc:
                # May fail if subject doesn't exist yet. Keep going, but do
                # not hide that the requested level was not applied.
                logging.getLogger(__name__).warning(
                    "Could not set compatibility level %r for subject %r: %s",
                    compatibility_level,
                    subject,
                    exc,
                )

        schema_id: int = registry.register_schema(subject, schema)
        return schema_id
    except Exception as e:
        raise SchemaRegistryError(f"Failed to upload schema: {e}") from e
def list_subjects() -> list[str]:
    """
    Return the subjects known to Schema Registry.

    Returns:
        List of subject names, as reported by the registry client
    """
    return get_schema_registry().get_subjects()
def get_schema_versions(subject: str) -> list[int]:
    """
    Return the registered versions of a schema subject.

    Args:
        subject: Schema Registry subject name

    Returns:
        List of version numbers, as reported by the registry client
    """
    return get_schema_registry().get_versions(subject)
def delete_subject(subject: str, permanent: bool = False) -> list[int]:
    """
    Delete a schema subject from the registry.

    Args:
        subject: Schema Registry subject name
        permanent: Forwarded to the registry client; if True, permanently
            delete (cannot be recovered)

    Returns:
        List of deleted version numbers
    """
    return get_schema_registry().delete_subject(subject, permanent=permanent)