Schema Registry
FlowODM integrates with Confluent Schema Registry for schema management, validation, and evolution.
Connection
Local Schema Registry
from flowodm import connect
connect(
    bootstrap_servers="localhost:9092",
    schema_registry_url="http://localhost:8081",
)
Confluent Cloud
from flowodm import connect

connect(
    # Kafka cluster credentials
    bootstrap_servers="pkc-xxx.us-east-2.aws.confluent.cloud:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="YOUR_API_KEY",
    sasl_password="YOUR_API_SECRET",
    # Schema Registry uses its own API key pair, separate from the cluster's
    schema_registry_url="https://psrc-xxx.us-east-2.aws.confluent.cloud",
    schema_registry_config={
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": "SR_API_KEY:SR_API_SECRET",
    },
)
Environment Variables
export KAFKA_BOOTSTRAP_SERVERS="pkc-xxx.us-east-2.aws.confluent.cloud:9092"
export KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export KAFKA_SASL_MECHANISM="PLAIN"
export KAFKA_SASL_USERNAME="YOUR_API_KEY"
export KAFKA_SASL_PASSWORD="YOUR_API_SECRET"
export SCHEMA_REGISTRY_URL="https://psrc-xxx.us-east-2.aws.confluent.cloud"
# Option 1: Combined format (recommended for native confluent-kafka compatibility)
export SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="SR_API_KEY:SR_API_SECRET"
# Option 2: Separate key/secret (FlowODM will combine them)
# export SCHEMA_REGISTRY_API_KEY="SR_API_KEY"
# export SCHEMA_REGISTRY_API_SECRET="SR_API_SECRET"
from flowodm import connect
connect()  # Reads the KAFKA_* and SCHEMA_REGISTRY_* variables exported above
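Option 2 above says FlowODM combines the separate key and secret. How the library does this internally is not shown here, so the following is only a sketch of what such a resolution step might look like. The function name `resolve_basic_auth_user_info` is hypothetical, and the precedence (combined variable first) is an assumption.

```python
import os
from typing import Mapping, Optional


def resolve_basic_auth_user_info(env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return "key:secret" for Schema Registry basic auth, or None if unset.

    Hypothetical helper for illustration; not part of FlowODM's API.
    """
    env = os.environ if env is None else env

    # Option 1: the combined variable is used as-is (assumed to take precedence).
    combined = env.get("SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO")
    if combined:
        return combined

    # Option 2: join the separate key and secret with a colon.
    key = env.get("SCHEMA_REGISTRY_API_KEY")
    secret = env.get("SCHEMA_REGISTRY_API_SECRET")
    if key and secret:
        return f"{key}:{secret}"

    return None
```

Passing a plain dict instead of `os.environ` makes the behaviour easy to check without touching the real environment.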
Schema Validation
Validate Model Against Registry
from flowodm import FlowBaseModel
from flowodm.schema import validate_against_registry

class UserEvent(FlowBaseModel):
    class Settings:
        topic = "user-events"

    user_id: str
    action: str

result = validate_against_registry(
    model_class=UserEvent,
    subject="user-events-value",
    version="latest",  # or a specific version number
)

if result.is_valid:
    print("Model matches schema!")
else:
    print("Schema mismatch:")
    for error in result.errors:
        print(f"  - {error}")
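To make the idea of "model matches schema" concrete, here is a minimal standard-library sketch that compares a class's annotations with the fields of an Avro record schema. It is not FlowODM's validator: `compare_with_schema`, the `ValidationResult` stand-in, and the Python-to-Avro type mapping are all assumptions made for illustration, and only primitive field types are handled.

```python
from dataclasses import dataclass, field
from typing import get_type_hints

# Assumed mapping from Python annotations to Avro primitive type names.
PYTHON_TO_AVRO = {str: "string", int: "long", float: "double", bool: "boolean", bytes: "bytes"}


@dataclass
class ValidationResult:
    """Stand-in result object exposing the same two attributes used above."""
    errors: list[str] = field(default_factory=list)

    @property
    def is_valid(self) -> bool:
        return not self.errors


def compare_with_schema(model_class: type, avro_schema: dict) -> ValidationResult:
    """Report fields that are missing on either side or whose types differ."""
    result = ValidationResult()
    model_fields = get_type_hints(model_class)
    schema_fields = {f["name"]: f["type"] for f in avro_schema["fields"]}

    for name, py_type in model_fields.items():
        if name not in schema_fields:
            result.errors.append(f"field '{name}' is missing from the schema")
        elif PYTHON_TO_AVRO.get(py_type) != schema_fields[name]:
            type_name = getattr(py_type, "__name__", str(py_type))
            result.errors.append(
                f"field '{name}': model has {type_name}, schema has {schema_fields[name]}"
            )
    for name in sorted(schema_fields.keys() - model_fields.keys()):
        result.errors.append(f"field '{name}' is missing from the model")
    return result


class UserEvent:  # plain class standing in for the FlowBaseModel subclass
    user_id: str
    action: str


schema = {
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
        {"name": "timestamp", "type": "long"},
    ],
}

result = compare_with_schema(UserEvent, schema)
print(result.is_valid)  # False
print(result.errors)    # ["field 'timestamp' is missing from the model"]
```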
Validate Against Local File
from flowodm.schema import validate_against_file

result = validate_against_file(UserEvent, "schemas/user_event.avsc")

if not result.is_valid:
    for error in result.errors:
        print(f"  - {error}")
Compatibility Checking
Check schema compatibility before deploying changes:
from flowodm.schema import check_compatibility

result = check_compatibility(
    model_class=UserEvent,
    subject="user-events-value",
    compatibility_level="BACKWARD",  # BACKWARD, FORWARD, FULL, NONE
)

if result.is_compatible:
    print("Schema change is backward compatible!")
else:
    print(f"Incompatible: {result.message}")
Model Generation
From Schema Registry
from flowodm.schema import generate_model_from_registry

# Generate a Pydantic model from a registered schema
UserEvent = generate_model_from_registry(
    subject="user-events-value",
    topic="user-events",
    version="latest",
)

# Use the generated model
for event in UserEvent.consume_iter():
    print(event)
From Avro File
from flowodm.schema import generate_model_from_schema

# Generate from a local .avsc file
UserEvent = generate_model_from_schema(
    schema_path="schemas/user_event.avsc",
    topic="user-events",
)
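The sketch below illustrates the general idea of turning an Avro record schema into a Python class at runtime. FlowODM generates Pydantic models; this example swaps in standard-library dataclasses so it runs without dependencies. `dataclass_from_avro` and the type mapping are hypothetical, and only primitives and `["null", primitive]` unions are supported.

```python
import json
from dataclasses import field, make_dataclass
from typing import Optional

# Assumed mapping from Avro primitive type names to Python types.
AVRO_TO_PYTHON = {
    "string": str, "int": int, "long": int,
    "float": float, "double": float, "boolean": bool, "bytes": bytes,
}


def python_type(avro_type):
    """Map an Avro primitive or a two-branch nullable union to a Python type."""
    if isinstance(avro_type, list):
        non_null = [t for t in avro_type if t != "null"]
        if len(avro_type) == 2 and len(non_null) == 1:
            return Optional[AVRO_TO_PYTHON[non_null[0]]]
        raise ValueError(f"unsupported union: {avro_type}")
    return AVRO_TO_PYTHON[avro_type]


def dataclass_from_avro(schema: dict) -> type:
    """Build a dataclass whose fields mirror the record's fields."""
    required, optional = [], []
    for f in schema["fields"]:
        py_type = python_type(f["type"])
        if "default" in f:
            optional.append((f["name"], py_type, field(default=f["default"])))
        else:
            required.append((f["name"], py_type))
    # Dataclass fields without defaults must come before those with defaults.
    return make_dataclass(schema["name"], required + optional)


avsc = json.loads("""
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "source", "type": ["null", "string"], "default": null},
    {"name": "action", "type": "string"}
  ]
}
""")

UserEvent = dataclass_from_avro(avsc)
event = UserEvent(user_id="u1", action="login")
print(event)  # UserEvent(user_id='u1', action='login', source=None)
```

Note that the generated class reorders fields so that defaults come last; a real generator would also need to handle nested records, enums, arrays, maps, and logical types.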
Schema Upload
Upload schemas to the registry:
from flowodm.schema import upload_schema

# Upload from file
upload_schema(
    schema_path="schemas/user_event.avsc",
    subject="user-events-value",
    compatibility="BACKWARD",
)
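For context, registering a schema through the Schema Registry REST API is a `POST` to `/subjects/{subject}/versions`, and a subject's compatibility level is set with a separate `PUT` to `/config/{subject}`. The sketch below only builds those two requests with the standard library and does not send them. Whether `upload_schema` issues exactly these calls is an assumption, and the helper names are hypothetical.

```python
import json
import urllib.request
from urllib.parse import quote

CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def build_register_request(base_url: str, subject: str, schema: dict) -> urllib.request.Request:
    """Build the request that registers `schema` under `subject`."""
    # The registry expects the schema as a JSON-encoded string inside the JSON body.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/subjects/{quote(subject, safe='')}/versions",
        data=body,
        method="POST",
        headers={"Content-Type": CONTENT_TYPE},
    )


def build_compatibility_request(base_url: str, subject: str, level: str) -> urllib.request.Request:
    """Build the request that sets the subject-level compatibility mode."""
    body = json.dumps({"compatibility": level}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/config/{quote(subject, safe='')}",
        data=body,
        method="PUT",
        headers={"Content-Type": CONTENT_TYPE},
    )


schema = {
    "type": "record",
    "name": "UserEvent",
    "fields": [{"name": "user_id", "type": "string"}],
}
register = build_register_request("http://localhost:8081", "user-events-value", schema)
config = build_compatibility_request("http://localhost:8081", "user-events-value", "BACKWARD")
print(register.get_method(), register.full_url)
print(config.get_method(), config.full_url)
```

The double encoding of the schema (a JSON string nested in a JSON object) is a common source of 422 errors when calling the registry by hand.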
Registry Operations
List Subjects
from flowodm.schema import list_subjects

subjects = list_subjects()
for subject in subjects:
    print(subject)
Get Schema Versions
from flowodm.schema import get_schema_versions
versions = get_schema_versions("user-events-value")
print(f"Available versions: {versions}")
Delete Subject
from flowodm.schema import delete_subject
# Soft delete (marks as deleted)
delete_subject("user-events-value")
# Permanent delete
delete_subject("user-events-value", permanent=True)
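The difference between the two deletes is easier to see at the REST level. In the Schema Registry API, a soft delete is `DELETE /subjects/{subject}`, and a permanent delete adds `?permanent=true`; the registry only accepts the permanent form after the subject has been soft-deleted. The sketch below builds the requests without sending them. The helper names are hypothetical, and whether `delete_subject(..., permanent=True)` performs the soft delete for you is not stated here.

```python
import urllib.request
from urllib.parse import quote


def _subject_url(base_url: str, subject: str) -> str:
    """Return the URL of a subject, percent-encoding the subject name."""
    return f"{base_url.rstrip('/')}/subjects/{quote(subject, safe='')}"


def list_subjects_request(base_url: str) -> urllib.request.Request:
    return urllib.request.Request(f"{base_url.rstrip('/')}/subjects", method="GET")


def versions_request(base_url: str, subject: str) -> urllib.request.Request:
    return urllib.request.Request(f"{_subject_url(base_url, subject)}/versions", method="GET")


def delete_subject_request(base_url: str, subject: str, permanent: bool = False) -> urllib.request.Request:
    url = _subject_url(base_url, subject)
    if permanent:
        # Hard delete: only valid once the subject has been soft-deleted.
        url += "?permanent=true"
    return urllib.request.Request(url, method="DELETE")


base = "http://localhost:8081"
soft = delete_subject_request(base, "user-events-value")
hard = delete_subject_request(base, "user-events-value", permanent=True)
print(soft.get_method(), soft.full_url)
print(hard.get_method(), hard.full_url)
```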
CLI Tools
FlowODM provides CLI commands for schema operations:
# Validate models against Schema Registry
flowodm validate --models myapp.events --registry
# Validate against local files
flowodm validate --models myapp.events --schemas-dir schemas/
# Check compatibility
flowodm check-compatibility \
    --subject user-events-value \
    --model myapp.events.UserEvent
# Upload schema
flowodm upload-schema \
    --avro schemas/user_event.avsc \
    --subject user-events-value
# List subjects
flowodm list-subjects
# Get schema
flowodm get-schema --subject user-events-value
See CI/CD Integration for examples of running these commands in a pipeline.
Schema Subject Naming
By default, FlowODM derives the schema subject from the topic name by appending a -value suffix:
class UserEvent(FlowBaseModel):
    class Settings:
        topic = "user-events"
        # schema_subject defaults to "user-events-value"
Override the default by setting an explicit subject:
class UserEvent(FlowBaseModel):
    class Settings:
        topic = "user-events"
        schema_subject = "my-custom-subject"
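The naming rule can be summarised in a few lines. The topic name plus `-value` is the convention Confluent's default topic-name strategy uses for message values. The sketch below shows that rule with an optional override; `resolve_subject` and the plain `Settings` stand-ins are hypothetical and do not reflect FlowODM's internals.

```python
from typing import Optional


def resolve_subject(settings: type) -> str:
    """Return the explicit schema_subject if set, else '<topic>-value'."""
    explicit: Optional[str] = getattr(settings, "schema_subject", None)
    return explicit if explicit else f"{settings.topic}-value"


class DefaultSettings:  # stand-in for a model's inner Settings class
    topic = "user-events"


class CustomSettings:
    topic = "user-events"
    schema_subject = "my-custom-subject"


print(resolve_subject(DefaultSettings))  # user-events-value
print(resolve_subject(CustomSettings))   # my-custom-subject
```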
Schema Evolution
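Follow the Avro schema evolution rules:
Backward-compatible changes (consumers using the new schema can read data written with the old schema):
Add optional fields with defaults
Remove fields
Forward-compatible changes (consumers still using the old schema can read data written with the new schema):
Add fields
Remove optional fields (fields that have defaults)
Fully compatible changes (both backward and forward compatible):
Add optional fields with defaults
Remove optional fields with defaults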
Example of a backward-compatible change:
# v1
class UserEvent(FlowBaseModel):
    class Settings:
        topic = "user-events"

    user_id: str
    action: str

# v2 - Added optional field (backward compatible)
class UserEvent(FlowBaseModel):
    class Settings:
        topic = "user-events"

    user_id: str
    action: str
    metadata: dict[str, str] | None = None  # New optional field
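The add/remove rules above reduce to one question: can a reader schema decode data written with a different writer schema? A deliberately simplified sketch of that check follows. It looks only at added and removed record fields and ignores type changes, aliases, and nested records, so it is not a substitute for the registry's own compatibility check. All function names are hypothetical.

```python
def _fields(schema: dict) -> dict:
    return {f["name"]: f for f in schema["fields"]}


def read_problems(reader: dict, writer: dict) -> list[str]:
    """List reasons `reader` cannot decode data written with `writer`."""
    writer_fields = _fields(writer)
    problems = []
    for name, spec in _fields(reader).items():
        # A field the writer never wrote must be filled from a default.
        if name not in writer_fields and "default" not in spec:
            problems.append(f"reader field '{name}' is absent from the writer and has no default")
    return problems


def is_backward_compatible(new: dict, old: dict) -> bool:
    """New schema reads old data."""
    return not read_problems(reader=new, writer=old)


def is_forward_compatible(new: dict, old: dict) -> bool:
    """Old schema reads new data."""
    return not read_problems(reader=old, writer=new)


def is_fully_compatible(new: dict, old: dict) -> bool:
    return is_backward_compatible(new, old) and is_forward_compatible(new, old)


v1 = {"type": "record", "name": "UserEvent", "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "action", "type": "string"},
]}
# v2 adds an optional field with a default, as in the example above.
v2 = {"type": "record", "name": "UserEvent", "fields": v1["fields"] + [
    {"name": "metadata", "type": ["null", {"type": "map", "values": "string"}], "default": None},
]}
# v3 adds a required field without a default.
v3 = {"type": "record", "name": "UserEvent", "fields": v1["fields"] + [
    {"name": "session_id", "type": "string"},
]}

print(is_fully_compatible(v2, v1))     # True
print(is_backward_compatible(v3, v1))  # False
print(is_forward_compatible(v3, v1))   # True
```

The last two lines show why adding a field without a default is forward compatible but not backward compatible: old readers simply skip the extra field, while a new reader has nothing to fill it with when decoding old data.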