Consuming Messages

FlowODM can consume messages from Kafka topics one at a time or through an iterator, in both synchronous and asynchronous code.

Basic Consumption

Consume Single Message

from flowodm import FlowBaseModel, connect

connect(bootstrap_servers="localhost:9092")

class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        consumer_group = "order-processor"

    order_id: str
    amount: float

# Wait up to 5 seconds for a message
event = OrderEvent.consume_one(timeout=5.0)
if event:
    print(f"Received order: {event.order_id}")

Iterate Over Messages

# Process up to 100 messages
for event in OrderEvent.consume_iter(max_messages=100):
    print(f"Processing order: {event.order_id}")

# Process until timeout
for event in OrderEvent.consume_iter(timeout=10.0):
    process_order(event)
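
When downstream writes are expensive, it can help to group events into batches before handling them. The following is a minimal sketch that works on any iterable, so it should also apply to the iterator returned by consume_iter. The helper name batched_events and the batch size are illustrative assumptions, not part of FlowODM.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched_events(events: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of up to `size` items from any iterable (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(events)
    while True:
        # islice pulls at most `size` items without consuming more than needed
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Stand-in data; in practice the iterable would be OrderEvent.consume_iter(...)
for batch in batched_events(["a", "b", "c", "d", "e"], size=2):
    print(batch)
```

The last batch may be shorter than the requested size, so handlers should not assume a fixed length.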

Asynchronous Consumption

For async applications, aconsume_one and aconsume_iter mirror the synchronous methods:

import asyncio

async def main():
    # Single message
    event = await OrderEvent.aconsume_one(timeout=5.0)
    if event:
        print(f"Received: {event.order_id}")

    # Iterate asynchronously
    async for event in OrderEvent.aconsume_iter(max_messages=100):
        await process_order_async(event)

asyncio.run(main())
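
Awaiting each handler in turn processes one message at a time. If handlers are I/O bound and ordering does not matter, several can run concurrently. The sketch below bounds the number of in-flight handlers with asyncio.Semaphore. It uses a stand-in async generator instead of aconsume_iter, and process_concurrently, fake_events, and limit are hypothetical names chosen for illustration.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List, Tuple, TypeVar

T = TypeVar("T")


async def process_concurrently(
    events: AsyncIterator[T],
    handler: Callable[[T], Awaitable[None]],
    limit: int = 10,
) -> int:
    """Run `handler` for each event with at most `limit` handlers in flight."""
    semaphore = asyncio.Semaphore(limit)
    tasks: List[asyncio.Task] = []

    async def run_one(event: T) -> None:
        try:
            await handler(event)
        finally:
            semaphore.release()  # free a slot even if the handler failed

    async for event in events:
        await semaphore.acquire()  # waits while `limit` handlers are running
        tasks.append(asyncio.create_task(run_one(event)))

    if tasks:
        await asyncio.gather(*tasks)  # re-raises the first handler exception
    return len(tasks)


async def fake_events(count: int) -> AsyncIterator[int]:
    """Stand-in for an async message iterator."""
    for number in range(count):
        yield number


async def demo() -> Tuple[int, List[int]]:
    handled: List[int] = []

    async def handler(event: int) -> None:
        await asyncio.sleep(0)  # simulate I/O
        handled.append(event)

    total = await process_concurrently(fake_events(5), handler, limit=2)
    return total, sorted(handled)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The task list grows with the number of events, so this shape suits bounded runs such as max_messages=100. Concurrent handling also gives up per-partition ordering, which matters if offsets are committed manually.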

Consumer Groups

Consumer groups enable parallel processing:

class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        consumer_group = "order-processor"  # Group ID

    order_id: str
    amount: float

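Multiple instances with the same consumer_group share the workload: Kafka assigns each partition of the topic to exactly one instance in the group, so the useful number of instances is capped by the number of partitions.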
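
To make the sharing concrete, the sketch below spreads partitions over group members in round-robin order. It is only an illustration: Kafka's real assignment strategies are more involved, and assign_round_robin is a hypothetical helper, not a FlowODM or Kafka API.

```python
from typing import Dict, List


def assign_round_robin(partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Spread partition numbers over consumers in round-robin order (illustration only)."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    assignment: Dict[str, List[int]] = {name: [] for name in consumers}
    for partition in range(partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Three partitions, two instances: one instance owns two partitions
print(assign_round_robin(3, ["worker-1", "worker-2"]))

# Two partitions, three instances: the third instance receives nothing
print(assign_round_robin(2, ["worker-1", "worker-2", "worker-3"]))
```

The second call shows why adding instances beyond the partition count does not increase throughput.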

Offset Management

Auto-Commit (Default)

By default, offsets are committed automatically:

for event in OrderEvent.consume_iter():
    process_order(event)
    # No explicit commit call is needed; offsets are committed automatically

Manual Commit

For more control, disable auto-commit and commit explicitly once a message has been processed:

from flowodm import connect, get_consumer

connect(
    bootstrap_servers="localhost:9092",
    consumer_config={"enable.auto.commit": False}
)

consumer = get_consumer("order-processor")

for event in OrderEvent.consume_iter():
    try:
        process_order(event)
        consumer.commit()  # Commit only after successful processing
    except Exception as e:
        # No commit on error. A later successful commit also covers this
        # offset, so retry or stop here if the message must not be skipped.
        log_error(e)
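
Because a Kafka commit records a position rather than a single message, continuing past a failed message and committing later ones means the failed message is not redelivered. One way to avoid that is to retry a bounded number of times and stop consuming if the message still fails. The sketch below takes the commit function as a parameter so it runs without a broker. The names process_with_retries, max_attempts, and backoff_seconds are assumptions made for illustration.

```python
import time
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


def process_with_retries(
    events: Iterable[T],
    handler: Callable[[T], None],
    commit: Callable[[], None],
    max_attempts: int = 3,
    backoff_seconds: float = 0.0,
) -> None:
    """Commit after each success; retry failures, then re-raise to stop consuming."""
    for event in events:
        for attempt in range(1, max_attempts + 1):
            try:
                handler(event)
            except Exception:
                if attempt == max_attempts:
                    raise  # stop here so the uncommitted message is not skipped
                time.sleep(backoff_seconds * attempt)  # linear backoff
            else:
                commit()
                break


# Stand-in handler that fails once for event "b"
calls = []
commits = []


def flaky(event: str) -> None:
    calls.append(event)
    if event == "b" and calls.count("b") == 1:
        raise RuntimeError("transient failure")


process_with_retries(["a", "b"], flaky, lambda: commits.append(1))
print(calls, len(commits))
```

With the names used on this page, the call would presumably look like process_with_retries(OrderEvent.consume_iter(), process_order, consumer.commit).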

Starting Position

Control where consumption starts:

connect(
    bootstrap_servers="localhost:9092",
    consumer_config={
        "auto.offset.reset": "earliest",  # or "latest"
    }
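)

  • earliest: Start from the beginning of the topic
  • latest: Start from the end (new messages only)

This setting applies only when the consumer group has no valid committed offset for a partition; otherwise consumption resumes from the committed offset.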
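
The precedence between a committed offset and auto.offset.reset can be sketched as a small function. This models the Kafka behavior described above rather than any FlowODM code. The function name resolve_start_offset and its parameters are hypothetical.

```python
from typing import Optional


def resolve_start_offset(
    committed: Optional[int],
    low: int,
    high: int,
    reset_policy: str,
) -> int:
    """Return the offset a consumer starts from for one partition.

    `low` is the first available offset and `high` is the offset that
    the next produced message will receive.
    """
    # A valid committed offset always wins over the reset policy
    if committed is not None and low <= committed <= high:
        return committed
    if reset_policy == "earliest":
        return low
    if reset_policy == "latest":
        return high
    raise ValueError(f"unsupported reset policy: {reset_policy!r}")


print(resolve_start_offset(None, low=0, high=120, reset_policy="earliest"))
print(resolve_start_offset(None, low=0, high=120, reset_policy="latest"))
print(resolve_start_offset(42, low=0, high=120, reset_policy="latest"))
```

A new consumer group therefore sees the whole topic with "earliest", while an existing group is unaffected by changing the policy until its committed offsets expire or fall out of range.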

Deserialization

Messages are automatically deserialized from Avro:

for event in OrderEvent.consume_iter():
    # event is a fully typed OrderEvent instance
    print(event.order_id)      # str
    print(event.amount)        # float
    print(event.message_id)    # Auto-generated ID

Handling Errors

from flowodm.exceptions import ConsumerError, DeserializationError

try:
    for event in OrderEvent.consume_iter():
        process_order(event)
except DeserializationError as e:
    print(f"Failed to deserialize message: {e}")
except ConsumerError as e:
    print(f"Consumer error: {e}")
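
In the example above, the try block wraps the whole loop, so the first exception ends consumption. For failures raised by the application's own handler, an alternative is to isolate each message and record the ones that fail, for example to forward them to a dead-letter topic later. This is a minimal sketch of that strategy with stand-in data. process_isolating_failures is a hypothetical helper and does not cover errors raised inside the iterator itself, such as DeserializationError.

```python
from typing import Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")


def process_isolating_failures(
    events: Iterable[T],
    handler: Callable[[T], None],
) -> List[Tuple[T, Exception]]:
    """Handle every event; collect (event, exception) pairs instead of stopping."""
    failures: List[Tuple[T, Exception]] = []
    for event in events:
        try:
            handler(event)
        except Exception as exc:  # keep going; the caller decides what to do next
            failures.append((event, exc))
    return failures


def reject_negative(amount: float) -> None:
    """Stand-in handler that fails on negative amounts."""
    if amount < 0:
        raise ValueError(f"negative amount: {amount}")


failed = process_isolating_failures([10.0, -5.0, 7.5], reject_negative)
for event, error in failed:
    print(f"Failed to process {event}: {error}")
```

This trades strict delivery for progress: failed messages are skipped in the stream, so they need to be persisted or republished if they must not be lost.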

Filtering

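Kafka has no broker-side filtering, so every message is still fetched and deserialized. To act on a subset, filter in application code: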

# Filter in application code
for event in OrderEvent.consume_iter():
    if event.amount > 100:  # Only process high-value orders
        process_order(event)

Consumer Configuration

Customize consumer settings by passing Kafka consumer properties through consumer_config:

connect(
    bootstrap_servers="localhost:9092",
    consumer_config={
        "session.timeout.ms": 30000,
        "max.poll.interval.ms": 300000,
        "max.poll.records": 500,
        "fetch.min.bytes": 1,
        "fetch.max.wait.ms": 500,
    }
)

See Settings Profiles for predefined configurations.