Migration Guide

This guide helps you migrate from other Kafka libraries to FlowODM.

From aiokafka

Before (aiokafka)

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json

async def produce():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode()
    )
    await producer.start()
    try:
        await producer.send('orders', {'order_id': '123', 'amount': 99.99})
    finally:
        await producer.stop()

async def consume():
    consumer = AIOKafkaConsumer(
        'orders',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda v: json.loads(v)
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.value)
    finally:
        await consumer.stop()

After (FlowODM)

from flowodm import FlowBaseModel, connect

connect(bootstrap_servers="localhost:9092")

class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        consumer_group = "order-processor"

    order_id: str
    amount: float

async def produce():
    order = OrderEvent(order_id="123", amount=99.99)
    await order.aproduce()

async def consume():
    async for event in OrderEvent.aconsume_iter():
        print(event)

From confluent-kafka

Before (confluent-kafka)

from confluent_kafka import Producer, Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
schema_str = '{"type": "record", "name": "Order", "fields": [...]}'

avro_serializer = AvroSerializer(sr_client, schema_str)
avro_deserializer = AvroDeserializer(sr_client, schema_str)

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', value=avro_serializer({'order_id': '123'}))
producer.flush()

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is not None:
        order = avro_deserializer(msg.value())
        print(order)

After (FlowODM)

from flowodm import FlowBaseModel, connect

connect(
    bootstrap_servers="localhost:9092",
    schema_registry_url="http://localhost:8081"
)

class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        consumer_group = "order-processor"

    order_id: str

# Produce
order = OrderEvent(order_id="123")
order.produce()

# Consume
for event in OrderEvent.consume_iter():
    print(event)

From faust

Before (faust)

import faust

app = faust.App('myapp', broker='kafka://localhost:9092')

class Order(faust.Record):
    order_id: str
    amount: float

orders_topic = app.topic('orders', value_type=Order)

@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        print(f'Processing {order.order_id}')

if __name__ == '__main__':
    app.main()

After (FlowODM)

from flowodm import FlowBaseModel, AsyncConsumerLoop, connect

connect(bootstrap_servers="localhost:9092")

class Order(FlowBaseModel):
    class Settings:
        topic = "orders"
        consumer_group = "myapp"

    order_id: str
    amount: float

async def process_order(order: Order) -> None:
    print(f'Processing {order.order_id}')

loop = AsyncConsumerLoop(
    model=Order,
    handler=process_order,
)

if __name__ == '__main__':
    import asyncio
    asyncio.run(loop.run())

Key Differences

Feature

Other Libraries

FlowODM

Model Definition

Dataclasses, Records, or dicts

Pydantic models with Settings

Schema Management

Manual setup

Automatic with Settings

Serialization

Explicit serializers

Built-in Avro support

Consumer Groups

Per-consumer config

Per-model Settings

Error Handling

Manual try/catch

Built-in error handlers

Lifecycle

Manual setup/teardown

ConsumerLoop hooks

Benefits of FlowODM

  1. Type Safety: Full Pydantic validation and type hints

  2. Less Boilerplate: Model defines everything in one place

  3. Schema Registry: Automatic integration with validation

  4. Consumer Loops: Production-ready patterns out of the box

  5. Settings Profiles: Optimized configurations for common scenarios

  6. CLI Tools: Schema management in CI/CD pipelines