Consumer Loops
Consumer loops provide a robust pattern for building Kafka microservices with graceful shutdown, error handling, and lifecycle management.
Basic Consumer Loop
Synchronous
from flowodm import FlowBaseModel, ConsumerLoop, connect
# Connect FlowODM to the Kafka broker at localhost:9092.
connect(bootstrap_servers="localhost:9092")
class OrderEvent(FlowBaseModel):
    """Order event model bound to the ``orders`` topic."""

    class Settings:
        # Topic and consumer group this model is associated with.
        topic = "orders"
        consumer_group = "order-processor"

    order_id: str
    amount: float
def process_order(order: OrderEvent) -> None:
    """Print the order's ID and its amount formatted to two decimals."""
    print(f"Processing order {order.order_id}: ${order.amount:.2f}")
    # Your business logic here
# Build a loop that hands each OrderEvent to process_order.
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
)

loop.run()  # Blocks until shutdown signal
Asynchronous
import asyncio
from flowodm import AsyncConsumerLoop
async def process_order_async(order: OrderEvent) -> None:
    """Await the asynchronous processing of a single order.

    ``do_async_processing`` is not defined in this example; substitute your
    own coroutine.
    """
    await do_async_processing(order)
# Build the async loop, then drive its run() coroutine to completion.
loop = AsyncConsumerLoop(
    model=OrderEvent,
    handler=process_order_async,
)

asyncio.run(loop.run())
Error Handling
Provide a custom error handler. The handler receives three arguments:
- `error`: The exception that occurred
- `raw_message`: The raw Kafka message
- `deserialized`: The deserialized `FlowBaseModel` instance if deserialization succeeded (but the handler failed), or `None` if deserialization itself failed
def handle_error(
    error: Exception, raw_message, deserialized: OrderEvent | None
) -> None:
    """Report a failed message, telling handler failures from decode failures.

    Args:
        error: The exception that occurred.
        raw_message: The raw Kafka message.
        deserialized: The deserialized ``OrderEvent`` if deserialization
            succeeded (but the handler failed), or ``None`` if
            deserialization itself failed.
    """
    if deserialized is not None:
        # Handler failed but we have the deserialized message
        print(f"Failed to process order {deserialized.order_id}: {error}")
    else:
        # Deserialization failed
        print(f"Failed to deserialize message: {error}")
    # Options:
    # - Log and continue
    # - Send to dead letter queue
    # - Alert operations team
# Same loop as before, now routing failures to handle_error.
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    error_handler=handle_error,
)
Retries
By default, retries are disabled (max_retries=0). To enable automatic retries,
configure them explicitly:
# Opt in to retries; the error handler only runs once they are exhausted.
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    max_retries=3,  # Retry up to 3 times (default: 0, no retries)
    retry_delay=1.0,  # Wait 1 second between retries
    error_handler=handle_error,  # Called after all retries exhausted
)
When retries are disabled, failed messages are immediately passed to the error handler without retry-related log messages.
Commit Strategies
Control when message offsets are committed to prevent duplicates or ensure reliability:
before_processing (Recommended for parallel deployments): Commits immediately after receiving, before handler execution. Prevents duplicate processing in parallel pod deployments but may lose messages if processing fails (at-most-once delivery).
after_processing (Recommended for single consumer): Commits after successful processing. Guarantees at-least-once delivery but may cause duplicates in parallel deployments.
Example:
# At-most-once: commit before the handler runs.
# Prevents duplicates in Kubernetes/parallel deployments.
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    commit_strategy="before_processing",
)

# At-least-once: commit only after successful processing.
# Guarantees no message loss (may process duplicates).
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    commit_strategy="after_processing",
)
Best Practices:
- Use `before_processing` when running multiple consumer instances in parallel
- Make handlers idempotent when possible
- Use `error_handler` to capture failed messages for a dead letter queue
- Monitor failed commits in logs/metrics
Lifecycle Hooks
Execute code on startup and shutdown:
# Module-level handle shared by the startup and shutdown hooks.
db_connection = None


def on_startup() -> None:
    """Open the database connection and store it in ``db_connection``."""
    global db_connection
    print("Starting up...")
    # connect_to_database is not defined in this example; supply your own.
    db_connection = connect_to_database()


def on_shutdown() -> None:
    """Close the database connection if one was opened."""
    global db_connection
    print("Shutting down...")
    # Still None when startup never ran or failed before assigning.
    if db_connection:
        db_connection.close()
# Register the hooks so they run around the loop's lifetime.
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    on_startup=on_startup,
    on_shutdown=on_shutdown,
)
Settings Profiles
Use predefined settings for different scenarios:
from flowodm import LongRunningSettings, BatchSettings, RealTimeSettings
# For long-running processing (ML inference, complex operations)
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
    settings=LongRunningSettings(),  # Tolerates up to 10 min processing
)

# For batch processing (ETL, aggregation)
# batch_process is a placeholder for your own handler.
loop = ConsumerLoop(
    model=OrderEvent,
    handler=batch_process,
    settings=BatchSettings(),  # Larger batches, 5 min poll interval
)

# For real-time processing (notifications, event-driven)
# quick_process is a placeholder for your own handler.
loop = ConsumerLoop(
    model=OrderEvent,
    handler=quick_process,
    settings=RealTimeSettings(),  # Small batches, fast polling
)
Graceful Shutdown
Consumer loops handle shutdown signals automatically:
loop = ConsumerLoop(
    model=OrderEvent,
    handler=process_order,
)

# run() handles SIGTERM and SIGINT gracefully:
# - Completes current message processing
# - Commits offsets
# - Calls on_shutdown hook
# - Closes connections
loop.run()
You can also trigger shutdown programmatically:
# In another thread or async task
# (loop.run() blocks its caller, so stop() has to be issued from elsewhere)
loop.stop()
Concurrent Processing (Async)
Process multiple messages concurrently with AsyncConsumerLoop:
async def process_order_async(order: OrderEvent) -> None:
    """Simulate one second of async work, then print the order ID."""
    await asyncio.sleep(1)  # Simulate async work
    print(f"Processed {order.order_id}")
loop = AsyncConsumerLoop(
    model=OrderEvent,
    handler=process_order_async,
    max_concurrent=20,  # Process up to 20 messages concurrently
)

# A bare top-level ``await`` is a SyntaxError in a regular script, so drive
# the run() coroutine with asyncio.run (inside a coroutine, use
# ``await loop.run()`` instead).
asyncio.run(loop.run())
Decorator Style
For simple use cases, use the decorator:
from flowodm import consumer_loop
@consumer_loop(OrderEvent)
def process_order(order: OrderEvent) -> None:
    """Print the ID of the order being processed."""
    print(f"Processing {order.order_id}")


# Start the loop
process_order()
Async version:
from flowodm import async_consumer_loop
@async_consumer_loop(OrderEvent)
async def process_order_async(order: OrderEvent) -> None:
await handle_order(order)
await process_order_async()
Complete Microservice Example
#!/usr/bin/env python
import logging
from datetime import datetime
from flowodm import (
FlowBaseModel,
ConsumerLoop,
LongRunningSettings,
connect,
)
# Configure root logging at INFO level.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the handlers and lifecycle hooks below.
logger = logging.getLogger(__name__)
class OrderEvent(FlowBaseModel):
    """Order event model bound to the ``orders`` topic."""

    class Settings:
        # Topic and consumer group this model is associated with.
        topic = "orders"
        consumer_group = "order-processor"

    order_id: str
    customer_id: str
    total_price: float
    created_at: datetime
# State
processed_count = 0  # Running count of orders passed to process_order


def process_order(order: OrderEvent) -> None:
    """Log the order ID and increment the module-level ``processed_count``."""
    global processed_count
    logger.info(f"Processing order {order.order_id}")
    # Business logic here
    processed_count += 1
def handle_error(
    error: Exception, raw, deserialized: OrderEvent | None
) -> None:
    """Log a failed message at ERROR level.

    Args:
        error: The exception that occurred.
        raw: The raw Kafka message.
        deserialized: The deserialized ``OrderEvent`` when the handler
            failed, or ``None`` when deserialization itself failed.
    """
    if deserialized is not None:
        logger.error(f"Failed to process order {deserialized.order_id}: {error}")
    else:
        logger.error(f"Failed to deserialize: {error}")
    # Send to dead letter queue, alert, etc.
def on_startup() -> None:
    """Log a banner marking the start of the order processor."""
    logger.info("=== Order Processor Starting ===")
def on_shutdown() -> None:
    """Log how many orders were processed, read from ``processed_count``."""
    logger.info(f"=== Processed {processed_count} orders ===")
def main() -> None:
    """Connect to Kafka and the schema registry, then run the consumer loop."""
    connect(
        bootstrap_servers="localhost:9092",
        schema_registry_url="http://localhost:8081",
    )

    loop = ConsumerLoop(
        model=OrderEvent,
        handler=process_order,
        settings=LongRunningSettings(),
        error_handler=handle_error,
        on_startup=on_startup,
        on_shutdown=on_shutdown,
        max_retries=3,  # Enable retries (default: 0)
        retry_delay=1.0,
    )

    logger.info("Starting consumer loop...")
    loop.run()
# Run the service only when executed as a script, not when imported.
if __name__ == "__main__":
    main()