"""
Consumer loop patterns for building Kafka microservices.
Provides both synchronous and asynchronous consumer loops with:
- Graceful shutdown handling
- Error handling and retry logic
- Lifecycle hooks (on_startup, on_shutdown)
- Configurable commit strategies
"""
from __future__ import annotations
import asyncio
import logging
import signal
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any
from flowodm.exceptions import ConsumerError
from flowodm.settings import BaseSettings, LongRunningSettings
if TYPE_CHECKING:
from flowodm.model import FlowBaseModel
logger = logging.getLogger(__name__)
[docs]
class ConsumerLoop:
    """
    Synchronous consumer loop for processing Kafka messages.

    Polls a consumer obtained from ``model.get_consumer()``, deserializes each
    message with ``model._deserialize_avro()`` and hands the result to
    ``handler``. SIGINT/SIGTERM request a graceful stop when the loop is
    started from the main thread; otherwise call :meth:`stop` explicitly.

    Example:
        >>> def process_order(order: OrderEvent) -> None:
        ...     print(f"Processing order {order.order_id}")
        ...
        >>> loop = ConsumerLoop(
        ...     model=OrderEvent,
        ...     handler=process_order,
        ...     settings=LongRunningSettings(),
        ... )
        >>> loop.run()  # Blocking
    """

    def __init__(
        self,
        model: type[FlowBaseModel],
        handler: Callable[[Any], None],
        settings: BaseSettings | None = None,
        group_id: str | None = None,
        error_handler: Callable[[Exception, Any, FlowBaseModel | None], None] | None = None,
        on_startup: Callable[[], None] | None = None,
        on_shutdown: Callable[[], None] | None = None,
        commit_strategy: str = "before_processing",
        max_retries: int = 0,
        retry_delay: float = 1.0,
        poll_timeout: float = 1.0,
    ):
        """
        Initialize consumer loop.

        Args:
            model: FlowBaseModel subclass to consume
            handler: Function to process each message
            settings: Kafka settings profile (defaults to LongRunningSettings)
            group_id: Consumer group ID, passed through to ``model.get_consumer()``
            error_handler: Optional function to handle processing errors. Receives
                (exception, raw_message, deserialized_instance). The
                deserialized_instance is None if deserialization failed, otherwise
                contains the FlowBaseModel.
            on_startup: Optional function called before loop starts
            on_shutdown: Optional function called after loop stops
            commit_strategy: Commit timing strategy. Use "before_processing" to
                commit before handler execution (at-most-once), or
                "after_processing" to commit after the handler has run
                (at-least-once, may cause duplicates).
            max_retries: Maximum retry attempts for failed messages (>= 0)
            retry_delay: Delay between retries in seconds
            poll_timeout: Kafka poll timeout in seconds

        Raises:
            ValueError: If ``commit_strategy`` is unknown or ``max_retries`` is
                negative.
        """
        # Validate commit strategy
        VALID_STRATEGIES = {"before_processing", "after_processing"}
        if commit_strategy not in VALID_STRATEGIES:
            raise ValueError(
                f"Invalid commit_strategy: {commit_strategy}. Must be one of {VALID_STRATEGIES}"
            )
        # A negative value would make the retry loop never execute, so every
        # message would be dropped without the handler ever being called.
        if max_retries < 0:
            raise ValueError(f"Invalid max_retries: {max_retries}. Must be >= 0")

        self.model = model
        self.handler = handler
        self.settings = settings or LongRunningSettings()
        self.group_id = group_id
        self.error_handler = error_handler
        self.on_startup = on_startup
        self.on_shutdown = on_shutdown
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.poll_timeout = poll_timeout
        self.commit_strategy = commit_strategy
        self._running = False
        self._consumer: Any = None

    def _setup_signal_handlers(self) -> None:
        """Install SIGINT/SIGTERM handlers that request a graceful shutdown."""
        try:
            # SIGINT (Ctrl+C) works on all platforms
            signal.signal(signal.SIGINT, self._signal_handler)
            # SIGTERM is not guaranteed to be defined everywhere
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, self._signal_handler)
        except ValueError:
            # signal.signal() may only be called from the main thread; when the
            # loop runs in a worker thread the owner must call stop() instead.
            logger.warning(
                "Signal handlers can only be installed from the main thread; "
                "call stop() to shut the loop down"
            )

    def _signal_handler(self, signum: int, frame: Any) -> None:
        """Handle shutdown signals by asking the loop to stop."""
        logger.info(f"Received signal {signum}, initiating graceful shutdown...")
        self.stop()

    def stop(self) -> None:
        """Signal the loop to stop gracefully (takes effect after the current poll/message)."""
        self._running = False

    def run(self) -> None:
        """
        Start the consumer loop (blocking).

        Processes messages until stop() is called or a signal is received.
        The shutdown hook runs and the consumer is closed on every exit path.

        Raises:
            ConsumerError: If polling or message processing raises; the original
                exception is chained as the cause.
        """
        self._setup_signal_handlers()

        # Call startup hook
        if self.on_startup:
            logger.info("Running startup hook...")
            self.on_startup()

        # Get consumer
        self._consumer = self.model.get_consumer(self.group_id, self.settings)
        self._running = True
        logger.info(
            f"Starting consumer loop for {self.model.__name__} on topic {self.model._get_topic()}"
        )

        try:
            while self._running:
                msg = self._consumer.poll(self.poll_timeout)
                if msg is None:
                    continue
                if msg.error():
                    logger.warning(f"Consumer error: {msg.error()}")
                    continue
                self._process_message(msg)
        except Exception as e:
            logger.error(f"Consumer loop error: {e}")
            raise ConsumerError(f"Consumer loop failed: {e}") from e
        finally:
            try:
                # Call shutdown hook
                if self.on_shutdown:
                    logger.info("Running shutdown hook...")
                    self.on_shutdown()
            finally:
                # Close the consumer even when the shutdown hook raised, so a
                # faulty hook cannot leak the underlying connection.
                if self._consumer:
                    logger.info("Closing consumer...")
                    self._consumer.close()
                logger.info("Consumer loop stopped")

    def _commit_offset(self, msg: Any, max_attempts: int = 3) -> bool:
        """
        Commit message offset with retry logic.

        Args:
            msg: Kafka message to commit
            max_attempts: Maximum number of commit attempts

        Returns:
            True if commit succeeded, False otherwise
        """
        import time

        for attempt in range(max_attempts):
            try:
                self._consumer.commit(msg)
                return True
            except Exception as e:
                if attempt == max_attempts - 1:
                    logger.error(f"Failed to commit offset after {max_attempts} attempts: {e}")
                    return False
                time.sleep(0.1 * (2**attempt))  # Exponential backoff
        # Only reachable when max_attempts <= 0 (no attempt was made).
        return False

    def _process_message(self, msg: Any) -> None:
        """
        Process a single message with retry logic.

        With "before_processing" the offset is committed first and the message
        is skipped if that commit fails. With "after_processing" the offset is
        committed once the handler succeeded or retries are exhausted.
        Messages whose value is None are ignored.
        """
        import time

        # Early commit for before_processing strategy
        if self.commit_strategy == "before_processing":
            if not self._commit_offset(msg):
                logger.error("Skipping message due to commit failure")
                return

        retries = 0
        # Stays None until deserialization succeeds, so the error handler can
        # tell a decoding failure from a handler failure.
        instance: FlowBaseModel | None = None
        while retries <= self.max_retries:
            try:
                # Deserialize message
                value = msg.value()
                if value is None:
                    return
                instance = self.model._deserialize_avro(value)
                # Call handler
                self.handler(instance)
                # Late commit for after_processing strategy
                if self.commit_strategy == "after_processing":
                    self._commit_offset(msg)
                return
            except Exception as e:
                retries += 1
                if self.max_retries > 0:
                    logger.warning(
                        f"Error processing message (attempt {retries}/{self.max_retries + 1}): {e}"
                    )
                if retries > self.max_retries:
                    if self.max_retries > 0:
                        logger.error(f"Max retries exceeded for message: {e}")
                    if self.error_handler:
                        try:
                            self.error_handler(e, msg, instance)
                        except Exception as handler_error:
                            logger.error(f"Error handler failed: {handler_error}")
                    # Only commit on failure if using after_processing; the
                    # before_processing strategy has already committed.
                    if self.commit_strategy == "after_processing":
                        self._commit_offset(msg)
                    return
                # Wait before retry
                time.sleep(self.retry_delay)
[docs]
class AsyncConsumerLoop:
    """
    Asynchronous consumer loop for processing Kafka messages.

    Supports concurrent message processing with configurable parallelism:
    each polled message is handled in its own task, and a semaphore caps the
    number of tasks in flight at ``max_concurrent``.

    Example:
        >>> async def process_order(order: OrderEvent) -> None:
        ...     await external_api.submit(order)
        ...
        >>> loop = AsyncConsumerLoop(
        ...     model=OrderEvent,
        ...     handler=process_order,
        ...     max_concurrent=20,
        ... )
        >>> await loop.run()
    """

    def __init__(
        self,
        model: type[FlowBaseModel],
        handler: Callable[[Any], Awaitable[None]],
        settings: BaseSettings | None = None,
        group_id: str | None = None,
        error_handler: (
            Callable[[Exception, Any, FlowBaseModel | None], Awaitable[None]] | None
        ) = None,
        on_startup: Callable[[], Awaitable[None]] | None = None,
        on_shutdown: Callable[[], Awaitable[None]] | None = None,
        max_concurrent: int = 10,
        commit_strategy: str = "before_processing",
        max_retries: int = 0,
        retry_delay: float = 1.0,
        poll_timeout: float = 1.0,
    ):
        """
        Initialize async consumer loop.

        Args:
            model: FlowBaseModel subclass to consume
            handler: Async function to process each message
            settings: Kafka settings profile (defaults to LongRunningSettings)
            group_id: Consumer group ID, passed through to
                ``model.get_async_consumer()``
            error_handler: Optional async function to handle errors. Receives
                (exception, raw_message, deserialized_instance). The
                deserialized_instance is None if deserialization failed, otherwise
                contains the FlowBaseModel.
            on_startup: Optional async function called before loop starts
            on_shutdown: Optional async function called after loop stops
            max_concurrent: Maximum concurrent message processing tasks (>= 1)
            commit_strategy: Commit timing strategy. Use "before_processing" to
                commit before handler execution (at-most-once), or
                "after_processing" to commit after the handler has run
                (at-least-once, may cause duplicates).
            max_retries: Maximum retry attempts (>= 0)
            retry_delay: Delay between retries in seconds
            poll_timeout: Kafka poll timeout in seconds

        Raises:
            ValueError: If ``commit_strategy`` is unknown, ``max_retries`` is
                negative, or ``max_concurrent`` is smaller than 1.
        """
        # Validate commit strategy
        VALID_STRATEGIES = {"before_processing", "after_processing"}
        if commit_strategy not in VALID_STRATEGIES:
            raise ValueError(
                f"Invalid commit_strategy: {commit_strategy}. Must be one of {VALID_STRATEGIES}"
            )
        # A negative value would make the retry loop never execute, so every
        # message would be dropped without the handler ever being called.
        if max_retries < 0:
            raise ValueError(f"Invalid max_retries: {max_retries}. Must be >= 0")
        # Semaphore(0) would block forever on the first message.
        if max_concurrent < 1:
            raise ValueError(f"Invalid max_concurrent: {max_concurrent}. Must be >= 1")

        self.model = model
        self.handler = handler
        self.settings = settings or LongRunningSettings()
        self.group_id = group_id
        self.error_handler = error_handler
        self.on_startup = on_startup
        self.on_shutdown = on_shutdown
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.poll_timeout = poll_timeout
        self.commit_strategy = commit_strategy
        self._running = False
        self._consumer: Any = None
        # Created in run() so it is bound to the running event loop.
        self._semaphore: asyncio.Semaphore | None = None

    def stop(self) -> None:
        """Signal the loop to stop gracefully (takes effect after the current poll)."""
        self._running = False

    def _install_signal_handlers(self) -> None:
        """Route SIGTERM/SIGINT to stop(), degrading gracefully where unsupported."""
        unavailable = (
            "Signal handlers can only be installed from the main thread; "
            "call stop() to shut the loop down"
        )
        loop = asyncio.get_running_loop()
        try:
            for sig in (signal.SIGTERM, signal.SIGINT):
                loop.add_signal_handler(sig, self.stop)
        except NotImplementedError:
            # Windows doesn't support add_signal_handler on ProactorEventLoop
            # Fall back to signal.signal() for SIGINT (Ctrl+C)
            try:
                signal.signal(signal.SIGINT, lambda signum, frame: self.stop())
            except ValueError:
                # signal.signal() is restricted to the main thread.
                logger.warning(unavailable)
        except RuntimeError:
            # add_signal_handler() fails this way outside the main thread.
            logger.warning(unavailable)

    async def run(self) -> None:
        """
        Start the async consumer loop.

        Processes messages until stop() is called. On every exit path the
        in-flight tasks are awaited, the shutdown hook runs and the consumer
        is closed.

        Raises:
            ConsumerError: If polling or task scheduling raises; the original
                exception is chained as the cause.
        """
        # Setup signal handlers for asyncio
        self._install_signal_handlers()

        # Call startup hook
        if self.on_startup:
            logger.info("Running async startup hook...")
            await self.on_startup()

        # Get consumer
        self._consumer = await self.model.get_async_consumer(self.group_id, self.settings)
        self._semaphore = asyncio.Semaphore(self.max_concurrent)
        self._running = True
        logger.info(
            f"Starting async consumer loop for {self.model.__name__} "
            f"on topic {self.model._get_topic()}"
        )

        pending_tasks: set[asyncio.Task[None]] = set()

        def _task_finished(task: asyncio.Task[None]) -> None:
            # Drop finished tasks immediately; pruning only on idle polls would
            # let the set grow without bound under sustained traffic.
            pending_tasks.discard(task)
            if not task.cancelled() and task.exception() is not None:
                logger.error(f"Message processing task failed: {task.exception()}")

        try:
            while self._running:
                # Poll for message
                if hasattr(self._consumer, "poll_async"):
                    msg = await self._consumer.poll_async(self.poll_timeout)
                else:
                    # Run sync poll in thread executor to avoid blocking event loop
                    msg = await asyncio.to_thread(self._consumer.poll, self.poll_timeout)
                if msg is None:
                    # Yield to allow other tasks to run
                    await asyncio.sleep(0)
                    continue
                if msg.error():
                    logger.warning(f"Consumer error: {msg.error()}")
                    continue
                # Process message with concurrency limit; the task releases the
                # semaphore slot when it finishes.
                await self._semaphore.acquire()
                task = asyncio.create_task(self._process_message_with_semaphore(msg))
                pending_tasks.add(task)
                task.add_done_callback(_task_finished)
        except Exception as e:
            logger.error(f"Async consumer loop error: {e}")
            raise ConsumerError(f"Async consumer loop failed: {e}") from e
        finally:
            try:
                # Drain in-flight tasks on every exit path so none of them is
                # still committing offsets when the consumer gets closed.
                if pending_tasks:
                    logger.info(f"Waiting for {len(pending_tasks)} pending tasks...")
                    await asyncio.gather(*pending_tasks, return_exceptions=True)
                # Call shutdown hook
                if self.on_shutdown:
                    logger.info("Running async shutdown hook...")
                    await self.on_shutdown()
            finally:
                # Close the consumer even when the shutdown hook raised.
                if self._consumer:
                    logger.info("Closing async consumer...")
                    try:
                        # NOTE(review): close() is invoked synchronously, as in
                        # the original code; confirm the async consumer does not
                        # expose it as a coroutine.
                        self._consumer.close()
                    except Exception as close_error:
                        # Best effort: report instead of silently swallowing.
                        logger.warning(f"Error closing async consumer: {close_error}")
                logger.info("Async consumer loop stopped")

    async def _process_message_with_semaphore(self, msg: Any) -> None:
        """Process message and release semaphore when done."""
        try:
            await self._process_message(msg)
        finally:
            if self._semaphore:
                self._semaphore.release()

    async def _commit_offset(self, msg: Any, max_attempts: int = 3) -> bool:
        """
        Commit message offset with retry logic.

        Args:
            msg: Kafka message to commit
            max_attempts: Maximum number of commit attempts

        Returns:
            True if commit succeeded, False otherwise
        """
        for attempt in range(max_attempts):
            try:
                # NOTE(review): commit() is called synchronously here; if it
                # blocks on the network it stalls the event loop — confirm.
                self._consumer.commit(msg)
                return True
            except Exception as e:
                if attempt == max_attempts - 1:
                    logger.error(f"Failed to commit offset after {max_attempts} attempts: {e}")
                    return False
                await asyncio.sleep(0.1 * (2**attempt))  # Exponential backoff
        # Only reachable when max_attempts <= 0 (no attempt was made).
        return False

    async def _process_message(self, msg: Any) -> None:
        """
        Process a single message with retry logic.

        With "before_processing" the offset is committed first and the message
        is skipped if that commit fails. With "after_processing" the offset is
        committed once the handler succeeded or retries are exhausted.
        Messages whose value is None are ignored.
        """
        # Early commit for before_processing strategy
        if self.commit_strategy == "before_processing":
            if not await self._commit_offset(msg):
                logger.error("Skipping message due to commit failure")
                return

        retries = 0
        # Stays None until deserialization succeeds, so the error handler can
        # tell a decoding failure from a handler failure.
        instance: FlowBaseModel | None = None
        while retries <= self.max_retries:
            try:
                # Deserialize message
                value = msg.value()
                if value is None:
                    return
                instance = self.model._deserialize_avro(value)
                # Call handler
                await self.handler(instance)
                # Late commit for after_processing strategy
                if self.commit_strategy == "after_processing":
                    await self._commit_offset(msg)
                return
            except Exception as e:
                retries += 1
                if self.max_retries > 0:
                    logger.warning(
                        f"Error processing message (attempt {retries}/{self.max_retries + 1}): {e}"
                    )
                if retries > self.max_retries:
                    if self.max_retries > 0:
                        logger.error(f"Max retries exceeded for message: {e}")
                    if self.error_handler:
                        try:
                            await self.error_handler(e, msg, instance)
                        except Exception as handler_error:
                            logger.error(f"Async error handler failed: {handler_error}")
                    # Only commit on failure if using after_processing; the
                    # before_processing strategy has already committed.
                    if self.commit_strategy == "after_processing":
                        await self._commit_offset(msg)
                    return
                # Wait before retry
                await asyncio.sleep(self.retry_delay)
[docs]
def consumer_loop(
    model: type[FlowBaseModel],
    settings: BaseSettings | None = None,
    **kwargs: Any,
) -> Callable[[Callable[[Any], None]], ConsumerLoop]:
    """
    Build a decorator that wraps a handler function in a ConsumerLoop.

    The decorated name is rebound to the ConsumerLoop instance (not to the
    original function); extra keyword arguments are forwarded unchanged to the
    ConsumerLoop constructor.

    Example:
        >>> @consumer_loop(model=OrderEvent, settings=LongRunningSettings())
        ... def handle_order(order: OrderEvent) -> None:
        ...     process_order(order)
        ...
        >>> handle_order.run()  # Start the loop
    """

    def decorator(handler: Callable[[Any], None]) -> ConsumerLoop:
        # The loop is only constructed here; nothing runs until .run() is called.
        return ConsumerLoop(handler=handler, model=model, settings=settings, **kwargs)

    return decorator
[docs]
def async_consumer_loop(
    model: type[FlowBaseModel],
    settings: BaseSettings | None = None,
    **kwargs: Any,
) -> Callable[[Callable[[Any], Awaitable[None]]], AsyncConsumerLoop]:
    """
    Build a decorator that wraps an async handler in an AsyncConsumerLoop.

    The decorated name is rebound to the AsyncConsumerLoop instance (not to
    the original coroutine function); extra keyword arguments are forwarded
    unchanged to the AsyncConsumerLoop constructor.

    Example:
        >>> @async_consumer_loop(model=OrderEvent, max_concurrent=20)
        ... async def handle_order(order: OrderEvent) -> None:
        ...     await process_order(order)
        ...
        >>> await handle_order.run()  # Start the loop
    """

    def decorator(handler: Callable[[Any], Awaitable[None]]) -> AsyncConsumerLoop:
        # The loop is only constructed here; nothing runs until .run() is awaited.
        return AsyncConsumerLoop(handler=handler, model=model, settings=settings, **kwargs)

    return decorator