Model Definition
FlowODM models are defined by inheriting from FlowBaseModel, which extends
Pydantic’s BaseModel with Kafka-specific functionality.
Basic Model
from datetime import datetime

from flowodm import FlowBaseModel

class UserEvent(FlowBaseModel):
    class Settings:
        topic = "user-events"

    user_id: str
    action: str
    timestamp: datetime
Settings Class
The inner Settings class configures Kafka-specific behavior:
| Attribute | Required | Description |
|---|---|---|
| topic | Yes | Kafka topic name for produce/consume operations |
| schema_subject | No | Schema Registry subject name (defaults to …) |
| schema_path | No | Path to local Avro schema file |
| consumer_group | No | Consumer group ID for consumption |
| key_field | No | Field name to use as message key |
| confluent_wire_format | No | Prepend Confluent wire format header when serializing (default True) |
Example with all settings:
class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        schema_subject = "orders-value-v1"
        schema_path = "schemas/order.avsc"
        consumer_group = "order-processor"
        key_field = "order_id"

    order_id: str
    customer_id: str
    total: float
Confluent Wire Format
By default, FlowODM prepends the Confluent wire format
header to serialized messages when a Schema Registry is configured. This header
consists of a magic byte (0x00) followed by a 4-byte big-endian schema ID,
making messages compatible with standard Confluent consumers, Java’s
KafkaAvroDeserializer, and Kafka UI tools like AKHQ and Kafdrop.
When no Schema Registry is configured, messages are serialized as raw Avro bytes.
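The header layout described above can be sketched with the standard struct module. This is an illustration of the byte layout only, not FlowODM's own code; add_wire_header and split_wire_header are hypothetical helper names.

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format marker (0x00)
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def add_wire_header(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix raw Avro bytes with the magic byte and a big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def split_wire_header(message: bytes) -> tuple[int, bytes]:
    """Return (schema_id, avro_payload) from a wire-format message."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message is too short to contain a wire format header")
    magic, schema_id = struct.unpack(">bI", message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER_SIZE:]


framed = add_wire_header(42, b"avro-bytes")
schema_id, payload = split_wire_header(framed)
```

A consumer that expects this header uses the schema ID to fetch the writer schema from the registry before decoding the remaining bytes.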
To disable the wire format header (e.g., for custom consumers that expect raw Avro):
class RawAvroEvent(FlowBaseModel):
    class Settings:
        topic = "raw-events"
        confluent_wire_format = False  # Produce raw Avro bytes

    event_id: str
    payload: str
Optional Fields
Declare optional fields with Python’s union syntax and a default of None:
class UserProfile(FlowBaseModel):
    class Settings:
        topic = "profiles"

    user_id: str
    name: str
    email: str | None = None  # Optional field
    age: int | None = None  # Optional field
    verified: bool = False  # Default value
Complex Types
FlowODM supports nested models and complex types:
from flowodm import FlowBaseModel
from pydantic import BaseModel

class Address(BaseModel):
    street: str
    city: str
    country: str

class Customer(FlowBaseModel):
    class Settings:
        topic = "customers"

    customer_id: str
    name: str
    addresses: list[Address]  # Built-in generic; no typing import needed
    tags: list[str]
    metadata: dict[str, str]
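Container annotations like these have direct Avro counterparts: a list corresponds to an Avro array and a string-keyed dict to an Avro map. The following sketch shows one way such a translation could work. The avro_type helper and the primitive mapping are hypothetical, and nested models (which would normally become Avro records) are left out for brevity.

```python
from typing import get_args, get_origin

# Assumed mapping from Python primitives to Avro primitive type names.
AVRO_PRIMITIVES = {str: "string", int: "long", float: "double", bool: "boolean"}


def avro_type(annotation):
    """Translate a primitive, list[...] or dict[str, ...] annotation to an Avro type."""
    origin = get_origin(annotation)
    if origin is list:
        (item,) = get_args(annotation)
        return {"type": "array", "items": avro_type(item)}
    if origin is dict:
        key, value = get_args(annotation)
        if key is not str:
            # Avro maps only allow string keys.
            raise TypeError("Avro map keys must be strings")
        return {"type": "map", "values": avro_type(value)}
    return AVRO_PRIMITIVES[annotation]


tags_type = avro_type(list[str])
metadata_type = avro_type(dict[str, str])
```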
Validation
Standard Pydantic validation, including Field constraints and field_validator methods, applies to FlowODM models:
from flowodm import FlowBaseModel
from pydantic import Field, field_validator

class Product(FlowBaseModel):
    class Settings:
        topic = "products"

    product_id: str
    name: str = Field(min_length=1, max_length=200)
    price: float = Field(gt=0)
    quantity: int = Field(ge=0)

    @field_validator("product_id")
    @classmethod
    def validate_product_id(cls, v: str) -> str:
        # Reject IDs that do not carry the expected prefix
        if not v.startswith("PROD-"):
            raise ValueError("Product ID must start with 'PROD-'")
        return v
Serialization
Models are serialized to Avro format when producing and deserialized when consuming. The Avro schema is automatically generated from the Pydantic model or loaded from Schema Registry.
event = UserEvent(user_id="123", action="login", timestamp=datetime.now())
# Get the Avro-compatible dict
avro_dict = event.to_avro_dict()
# Get the JSON representation
json_str = event.model_dump_json()
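Avro has no native datetime type. A common representation is the timestamp-millis logical type, which stores milliseconds since the Unix epoch as a long. The sketch below shows that conversion as one plausible form an Avro-compatible dict could give the timestamp field. Whether to_avro_dict uses milliseconds, microseconds, or something else is not stated here, so treat the helper names and the UTC handling of naive datetimes as assumptions.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_millis(dt: datetime) -> int:
    """Encode a datetime as milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        # Assumption: naive datetimes are interpreted as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    return (dt - EPOCH) // timedelta(milliseconds=1)


def from_timestamp_millis(ms: int) -> datetime:
    """Decode milliseconds since the Unix epoch into an aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


moment = datetime(2024, 1, 1, tzinfo=timezone.utc)
encoded = to_timestamp_millis(moment)
decoded = from_timestamp_millis(encoded)
```

Using timezone-aware datetimes (for example datetime.now(timezone.utc)) avoids ambiguity about how a naive value should be interpreted on the consuming side.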