Consumer Loops

Consumer loops provide a robust pattern for building Kafka microservices with graceful shutdown, error handling, and lifecycle management.

Basic Consumer Loop

Synchronous

from flowodm import FlowBaseModel, ConsumerLoop, connect

connect(bootstrap_servers="localhost:9092")

class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        consumer_group = "order-processor"

    order_id: str
    amount: float

def process_order(order: OrderEvent) -> None:
    print(f"Processing order {order.order_id}: ${order.amount:.2f}")
    # Your business logic here

loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
)

loop.run()  # Blocks until shutdown signal

Asynchronous

import asyncio
from flowodm import AsyncConsumerLoop

async def process_order_async(order: OrderEvent) -> None:
    await do_async_processing(order)

loop = AsyncConsumerLoop(
    model=OrderEvent,
    handler=process_order_async,
)

asyncio.run(loop.run())

Error Handling

Provide a custom error handler. The handler receives three arguments:

  • error: The exception that occurred

  • raw_message: The raw Kafka message

  • deserialized: The deserialized FlowBaseModel instance if deserialization succeeded (but the handler failed), or None if deserialization itself failed

def handle_error(
    error: Exception, raw_message, deserialized: OrderEvent | None
) -> None:
    if deserialized is not None:
        # Handler failed but we have the deserialized message
        print(f"Failed to process order {deserialized.order_id}: {error}")
    else:
        # Deserialization failed
        print(f"Failed to deserialize message: {error}")
    # Options:
    # - Log and continue
    # - Send to dead letter queue
    # - Alert operations team

loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    error_handler=handle_error,
)

Retries

By default, retries are disabled (max_retries=0). To enable automatic retries, configure them explicitly:

loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    max_retries=3,         # Retry up to 3 times (default: 0, no retries)
    retry_delay=1.0,       # Wait 1 second between retries
    error_handler=handle_error,  # Called after all retries exhausted
)

When retries are disabled, failed messages are immediately passed to the error handler without retry-related log messages.

Commit Strategies

Control when message offsets are committed to prevent duplicates or ensure reliability:

before_processing (Recommended for parallel deployments): Commits immediately after receiving, before handler execution. Prevents duplicate processing in parallel pod deployments but may lose messages if processing fails (at-most-once delivery).

after_processing (Recommended for single consumer): Commits after successful processing. Guarantees at-least-once delivery but may cause duplicates in parallel deployments.

Example:

# Prevent duplicates in Kubernetes/parallel deployments
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    commit_strategy="before_processing",
)

# Guarantee no message loss (may process duplicates)
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    commit_strategy="after_processing",
)

Best Practices:

  • Use before_processing when running multiple consumer instances in parallel

  • Make handlers idempotent when possible

  • Use error_handler to capture failed messages for dead letter queue

  • Monitor failed commits in logs/metrics

Lifecycle Hooks

Execute code on startup and shutdown:

db_connection = None

def on_startup() -> None:
    global db_connection
    print("Starting up...")
    db_connection = connect_to_database()

def on_shutdown() -> None:
    global db_connection
    print("Shutting down...")
    if db_connection:
        db_connection.close()

loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    on_startup=on_startup,
    on_shutdown=on_shutdown,
)

Settings Profiles

Use predefined settings for different scenarios:

from flowodm import LongRunningSettings, BatchSettings, RealTimeSettings

# For long-running processing (ML inference, complex operations)
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    settings=LongRunningSettings(),  # Tolerates up to 10 min processing
)

# For batch processing (ETL, aggregation)
loop = ConsumerLoop(
    model=OrderEvent,
    handler=batch_process,
    settings=BatchSettings(),  # Larger batches, 5 min poll interval
)

# For real-time processing (notifications, event-driven)
loop = ConsumerLoop(
    model=OrderEvent,
    handler=quick_process,
    settings=RealTimeSettings(),  # Small batches, fast polling
)

Graceful Shutdown

Consumer loops handle shutdown signals automatically:

loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
)

# Handles SIGTERM and SIGINT gracefully
# - Completes current message processing
# - Commits offsets
# - Calls on_shutdown hook
# - Closes connections
loop.run()

You can also trigger shutdown programmatically:

# In another thread or async task
loop.stop()

Concurrent Processing (Async)

Process multiple messages concurrently with AsyncConsumerLoop:

async def process_order_async(order: OrderEvent) -> None:
    await asyncio.sleep(1)  # Simulate async work
    print(f"Processed {order.order_id}")

loop = AsyncConsumerLoop(
    model=OrderEvent,
    handler=process_order_async,
    max_concurrent=20,  # Process up to 20 messages concurrently
)

await loop.run()

Decorator Style

For simple use cases, use the decorator:

from flowodm import consumer_loop

@consumer_loop(OrderEvent)
def process_order(order: OrderEvent) -> None:
    print(f"Processing {order.order_id}")

# Start the loop
process_order()

Async version:

from flowodm import async_consumer_loop

@async_consumer_loop(OrderEvent)
async def process_order_async(order: OrderEvent) -> None:
    await handle_order(order)

await process_order_async()

Complete Microservice Example

#!/usr/bin/env python
import logging
from datetime import datetime
from flowodm import (
    FlowBaseModel,
    ConsumerLoop,
    LongRunningSettings,
    connect,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        consumer_group = "order-processor"

    order_id: str
    customer_id: str
    total_price: float
    created_at: datetime

# State
processed_count = 0

def process_order(order: OrderEvent) -> None:
    global processed_count
    logger.info(f"Processing order {order.order_id}")
    # Business logic here
    processed_count += 1

def handle_error(
    error: Exception, raw, deserialized: OrderEvent | None
) -> None:
    if deserialized is not None:
        logger.error(f"Failed to process order {deserialized.order_id}: {error}")
    else:
        logger.error(f"Failed to deserialize: {error}")
    # Send to dead letter queue, alert, etc.

def on_startup() -> None:
    logger.info("=== Order Processor Starting ===")

def on_shutdown() -> None:
    logger.info(f"=== Processed {processed_count} orders ===")

def main():
    connect(
        bootstrap_servers="localhost:9092",
        schema_registry_url="http://localhost:8081",
    )

    loop = ConsumerLoop(
        model=OrderEvent,
        handler=process_order,
        settings=LongRunningSettings(),
        error_handler=handle_error,
        on_startup=on_startup,
        on_shutdown=on_shutdown,
        max_retries=3,         # Enable retries (default: 0)
        retry_delay=1.0,
    )

    logger.info("Starting consumer loop...")
    loop.run()

if __name__ == "__main__":
    main()