Migration Guide
This guide helps you migrate from other Kafka libraries to FlowODM.
From aiokafka
Before (aiokafka)
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
async def produce():
producer = AIOKafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode()
)
await producer.start()
try:
await producer.send('orders', {'order_id': '123', 'amount': 99.99})
finally:
await producer.stop()
async def consume():
consumer = AIOKafkaConsumer(
'orders',
bootstrap_servers='localhost:9092',
value_deserializer=lambda v: json.loads(v)
)
await consumer.start()
try:
async for msg in consumer:
print(msg.value)
finally:
await consumer.stop()
After (FlowODM)
from flowodm import FlowBaseModel, connect
connect(bootstrap_servers="localhost:9092")
class OrderEvent(FlowBaseModel):
class Settings:
topic = "orders"
consumer_group = "order-processor"
order_id: str
amount: float
async def produce():
order = OrderEvent(order_id="123", amount=99.99)
await order.aproduce()
async def consume():
async for event in OrderEvent.aconsume_iter():
print(event)
From confluent-kafka
Before (confluent-kafka)
from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
schema_str = '{"type": "record", "name": "Order", "fields": [...]}'
avro_serializer = AvroSerializer(sr_client, schema_str)
avro_deserializer = AvroDeserializer(sr_client, schema_str)
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', value=avro_serializer({'order_id': '123'}))
producer.flush()
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])
while True:
msg = consumer.poll(1.0)
if msg is not None:
order = avro_deserializer(msg.value())
print(order)
After (FlowODM)
from flowodm import FlowBaseModel, connect
connect(
bootstrap_servers="localhost:9092",
schema_registry_url="http://localhost:8081"
)
class OrderEvent(FlowBaseModel):
class Settings:
topic = "orders"
consumer_group = "order-processor"
order_id: str
# Produce
order = OrderEvent(order_id="123")
order.produce()
# Consume
for event in OrderEvent.consume_iter():
print(event)
From faust
Before (faust)
import faust
app = faust.App('myapp', broker='kafka://localhost:9092')
class Order(faust.Record):
order_id: str
amount: float
orders_topic = app.topic('orders', value_type=Order)
@app.agent(orders_topic)
async def process_orders(orders):
async for order in orders:
print(f'Processing {order.order_id}')
if __name__ == '__main__':
app.main()
After (FlowODM)
from flowodm import FlowBaseModel, AsyncConsumerLoop, connect
connect(bootstrap_servers="localhost:9092")
class Order(FlowBaseModel):
class Settings:
topic = "orders"
consumer_group = "myapp"
order_id: str
amount: float
async def process_order(order: Order) -> None:
print(f'Processing {order.order_id}')
loop = AsyncConsumerLoop(
model=Order,
handler=process_order,
)
if __name__ == '__main__':
import asyncio
asyncio.run(loop.run())
Key Differences
Feature |
Other Libraries |
FlowODM |
|---|---|---|
Model Definition |
Dataclasses, Records, or dicts |
Pydantic models with Settings |
Schema Management |
Manual setup |
Automatic with Settings |
Serialization |
Explicit serializers |
Built-in Avro support |
Consumer Groups |
Per-consumer config |
Per-model Settings |
Error Handling |
Manual try/catch |
Built-in error handlers |
Lifecycle |
Manual setup/teardown |
ConsumerLoop hooks |
Benefits of FlowODM
Type Safety: Full Pydantic validation and type hints
Less Boilerplate: Model defines everything in one place
Schema Registry: Automatic integration with validation
Consumer Loops: Production-ready patterns out of the box
Settings Profiles: Optimized configurations for common scenarios
CLI Tools: Schema management in CI/CD pipelines