API Reference
This page contains the complete API reference for FlowODM.
Connection
Thread-safe singleton for Kafka and Schema Registry connection management.
Provides both synchronous (confluent-kafka) and asynchronous client access.
- class flowodm.connection.KafkaConnection
Bases: object
Thread-safe singleton managing Kafka producers, consumers, and the Schema Registry client.
- Configuration via environment variables:
KAFKA_BOOTSTRAP_SERVERS: Kafka broker addresses (required)
KAFKA_SECURITY_PROTOCOL: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
KAFKA_SASL_MECHANISM: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
KAFKA_SASL_USERNAME: SASL username
KAFKA_SASL_PASSWORD: SASL password
SCHEMA_REGISTRY_URL: Schema Registry URL
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: Basic auth in format “key:secret” (alternative to separate key/secret)
SCHEMA_REGISTRY_API_KEY: Confluent Cloud API key
SCHEMA_REGISTRY_API_SECRET: Confluent Cloud API secret
Example
>>> from flowodm.connection import connect, get_producer
>>> connect(bootstrap_servers="localhost:9092")
>>> producer = get_producer()
- configure(bootstrap_servers=None, security_protocol=None, sasl_mechanism=None, sasl_username=None, sasl_password=None, schema_registry_url=None, schema_registry_api_key=None, schema_registry_api_secret=None, schema_registry_basic_auth_user_info=None)
Configure the connection with explicit parameters.
Parameters override environment variables.
- Parameters:
bootstrap_servers (str | None)
security_protocol (str | None)
sasl_mechanism (str | None)
sasl_username (str | None)
sasl_password (str | None)
schema_registry_url (str | None)
schema_registry_api_key (str | None)
schema_registry_api_secret (str | None)
schema_registry_basic_auth_user_info (str | None)
- Return type:
None
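The precedence rule stated for configure() is that explicit arguments win over environment variables. The following is a minimal sketch of that rule and is not FlowODM's code. The helper name `resolve_kafka_config` is hypothetical. The sketch also assumes that the values are forwarded to confluent-kafka under the librdkafka keys `bootstrap.servers`, `security.protocol`, `sasl.mechanism`, `sasl.username` and `sasl.password`.

```python
import os
from typing import Mapping, Optional

# Hypothetical table: configure() argument -> (environment variable, librdkafka key).
_KAFKA_OPTIONS = {
    "bootstrap_servers": ("KAFKA_BOOTSTRAP_SERVERS", "bootstrap.servers"),
    "security_protocol": ("KAFKA_SECURITY_PROTOCOL", "security.protocol"),
    "sasl_mechanism": ("KAFKA_SASL_MECHANISM", "sasl.mechanism"),
    "sasl_username": ("KAFKA_SASL_USERNAME", "sasl.username"),
    "sasl_password": ("KAFKA_SASL_PASSWORD", "sasl.password"),
}


def resolve_kafka_config(
    env: Optional[Mapping[str, str]] = None, **explicit: Optional[str]
) -> dict[str, str]:
    """Hypothetical helper: build a client config where explicit values override env vars."""
    env = os.environ if env is None else env
    unknown = set(explicit) - set(_KAFKA_OPTIONS)
    if unknown:
        raise TypeError(f"unknown options: {sorted(unknown)}")
    config: dict[str, str] = {}
    for arg_name, (env_name, kafka_key) in _KAFKA_OPTIONS.items():
        value = explicit.get(arg_name)
        if value is None:  # no explicit argument: fall back to the environment
            value = env.get(env_name)
        if value is not None:
            config[kafka_key] = value
    if "bootstrap.servers" not in config:
        # KAFKA_BOOTSTRAP_SERVERS is documented as required.
        raise ValueError("bootstrap servers not configured")
    return config
```

In this sketch, calling `resolve_kafka_config({"KAFKA_BOOTSTRAP_SERVERS": "broker:9092"}, bootstrap_servers="localhost:9092")` keeps the explicit `localhost:9092`. Settings that are not passed explicitly still come from the environment.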
- property producer: Producer
Get synchronous Kafka producer.
Creates producer on first access (lazy initialization).
- get_consumer(group_id, topics, settings=None)
Get or create a Kafka consumer for the given group and topics.
- Parameters:
group_id (str) – Consumer group ID
topics – Topics to consume from
settings (BaseSettings | None) – Optional settings profile for consumer configuration
- Returns:
Configured Consumer instance
- Return type:
Consumer
- property schema_registry: SchemaRegistryClient
Get Schema Registry client.
Creates client on first access (lazy initialization).
- async get_async_producer()
Get asynchronous Kafka producer.
Note: Requires confluent-kafka >= 2.13.0 for AIOProducer support. If AIOProducer is not available, falls back to the synchronous producer wrapped for use with asyncio.
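The fallback described above runs a synchronous producer behind an async interface. One common way to do this is to run the blocking `flush()` in a worker thread with `asyncio.to_thread` (Python 3.9+), so the event loop stays responsive. This is only a sketch of that technique: the class `ThreadedAsyncProducer` is hypothetical, and FlowODM's actual wrapper may differ. The wrapped object is assumed to expose confluent-kafka's `produce()`, `poll()` and `flush()`.

```python
import asyncio
from typing import Any, Optional


class ThreadedAsyncProducer:
    """Hypothetical async facade over a synchronous confluent-kafka-style producer."""

    def __init__(self, producer: Any) -> None:
        self._producer = producer

    async def produce(self, topic: str, value: bytes, key: Optional[bytes] = None) -> None:
        # produce() only enqueues the message locally, so it is safe to call directly.
        self._producer.produce(topic, value=value, key=key)
        # poll(0) serves any pending delivery callbacks without blocking.
        self._producer.poll(0)

    async def flush(self, timeout: float = 10.0) -> int:
        # flush() blocks until queued messages are delivered (or the timeout
        # expires), so run it in a worker thread instead of on the event loop.
        # Returns the number of messages still queued.
        return await asyncio.to_thread(self._producer.flush, timeout)
```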
- async get_async_consumer(group_id, topics, settings=None)
Get asynchronous Kafka consumer.
Note: Requires confluent-kafka >= 2.13.0 for AIOConsumer support. Falls back to the synchronous consumer on older versions.
- Parameters:
group_id (str) – Consumer group ID
topics – Topics to consume from
settings (BaseSettings | None) – Optional settings profile for consumer configuration
- flowodm.connection.get_kafka_connection()
Get the global KafkaConnection singleton instance.
- Return type:
KafkaConnection
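KafkaConnection is described as a thread-safe singleton whose producer and Schema Registry client are created lazily on first access. Below is a minimal sketch of that pattern, using double-checked locking for both the instance and the producer. `LazyConnection`, `get_connection` and the `producer_factory` attribute are hypothetical. The factory is a stand-in for constructing `confluent_kafka.Producer(config)`, so the sketch runs without a broker.

```python
import threading
from typing import Any, Optional


class LazyConnection:
    """Hypothetical singleton whose producer is created on first access."""

    _instance: Optional["LazyConnection"] = None
    _instance_lock = threading.Lock()

    def __new__(cls) -> "LazyConnection":
        # Double-checked locking: the lock is only contended until the instance exists.
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._producer = None
                    instance._producer_lock = threading.Lock()
                    # Stand-in factory; a real class would build confluent_kafka.Producer(config).
                    instance.producer_factory = object
                    cls._instance = instance
        return cls._instance

    @property
    def producer(self) -> Any:
        # Lazy initialization guarded by a per-instance lock, so concurrent
        # first accesses still create exactly one producer.
        if self._producer is None:
            with self._producer_lock:
                if self._producer is None:
                    self._producer = self.producer_factory()
        return self._producer


def get_connection() -> LazyConnection:
    """Hypothetical accessor mirroring get_kafka_connection()."""
    return LazyConnection()
```

Because `__init__` is not overridden, calling `LazyConnection()` again returns the existing instance without resetting its state.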
- flowodm.connection.connect(bootstrap_servers=None, security_protocol=None, sasl_mechanism=None, sasl_username=None, sasl_password=None, schema_registry_url=None, schema_registry_api_key=None, schema_registry_api_secret=None, schema_registry_basic_auth_user_info=None)
Configure and return the global Kafka connection.
- Parameters:
bootstrap_servers (str | None) – Kafka broker addresses (e.g., “localhost:9092”)
security_protocol (str | None) – Security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)
sasl_mechanism (str | None) – SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)
sasl_username (str | None) – SASL username
sasl_password (str | None) – SASL password
schema_registry_url (str | None) – Schema Registry URL
schema_registry_api_key (str | None) – Schema Registry API key (Confluent Cloud)
schema_registry_api_secret (str | None) – Schema Registry API secret (Confluent Cloud)
schema_registry_basic_auth_user_info (str | None) – Schema Registry basic auth (format: “key:secret”)
- Returns:
Configured KafkaConnection instance
- Return type:
KafkaConnection
Example
>>> connect(
...     bootstrap_servers="localhost:9092",
...     schema_registry_url="http://localhost:8081",
... )
- flowodm.connection.get_consumer(group_id, topics, settings=None)
Get or create a Kafka consumer.
- Parameters:
group_id (str) – Consumer group ID
topics – Topics to consume from
settings (BaseSettings | None) – Optional settings profile for consumer configuration
- Return type:
Consumer
- flowodm.connection.get_schema_registry()
Get the Schema Registry client.
- Return type:
SchemaRegistryClient
Model
FlowBaseModel - Pydantic v2 base class for Kafka message models.
Provides both synchronous and asynchronous methods for produce/consume operations.
- class flowodm.model.FlowBaseModel
Bases: BaseModel
Base class for Kafka message models with ODM functionality.
Provides both synchronous and asynchronous methods for produce/consume. Maps Pydantic models to Avro schemas automatically.
Subclasses must define an inner Settings class.
Example

    from datetime import datetime

    from flowodm.model import FlowBaseModel

    class UserEvent(FlowBaseModel):
        class Settings:
            topic = "user-events"
            schema_subject = "user-events-value"  # Optional
            consumer_group = "my-service"  # Optional

        user_id: str
        action: str
        timestamp: datetime
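FlowBaseModel derives the Avro schema from the model's fields automatically. The mapping below is a minimal sketch of how such a derivation can work for flat models. The function name `avro_record_schema` is hypothetical, and the type table is an assumption rather than FlowODM's actual rules: `str` to `string`, `int` to `long`, `datetime` to `long` with the `timestamp-millis` logical type, and `Optional[X]` to `["null", X]` with a `null` default. The sketch reads annotations with `typing.get_type_hints` on a plain class. For a Pydantic model, `model_fields` would be the natural source of field types.

```python
import types
import typing
from datetime import datetime
from typing import Any, Optional, Union

# Assumed Python-to-Avro mapping for this sketch only.
_PRIMITIVES: dict[Any, Any] = {
    str: "string",
    int: "long",
    float: "double",
    bool: "boolean",
    bytes: "bytes",
    datetime: {"type": "long", "logicalType": "timestamp-millis"},
}
# Optional[X] has origin typing.Union; X | None (Python 3.10+) has types.UnionType.
_UNION_ORIGINS = (Union, getattr(types, "UnionType", Union))


def _avro_type(hint: Any) -> Any:
    if typing.get_origin(hint) in _UNION_ORIGINS:
        args = typing.get_args(hint)
        non_null = [a for a in args if a is not type(None)]
        if len(non_null) == 1 and len(args) == 2:
            # Optional[X] becomes an Avro union with "null" first.
            return ["null", _avro_type(non_null[0])]
        raise TypeError(f"unsupported union: {hint!r}")
    if hint in _PRIMITIVES:
        value = _PRIMITIVES[hint]
        return dict(value) if isinstance(value, dict) else value  # copy dict types
    raise TypeError(f"unsupported type: {hint!r}")


def avro_record_schema(cls: type, namespace: Optional[str] = None) -> dict[str, Any]:
    """Derive a flat Avro record schema from a class's type annotations."""
    fields: list[dict[str, Any]] = []
    for name, hint in typing.get_type_hints(cls).items():
        avro = _avro_type(hint)
        field: dict[str, Any] = {"name": name, "type": avro}
        if isinstance(avro, list):
            field["default"] = None  # nullable fields default to null
        fields.append(field)
    schema: dict[str, Any] = {"type": "record", "name": cls.__name__, "fields": fields}
    if namespace:
        schema["namespace"] = namespace
    return schema
```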
- model_config = {'extra': 'forbid', 'populate_by_name': True, 'validate_by_alias': True, 'validate_by_name': True}
Configuration for the model: a dictionary conforming to pydantic.config.ConfigDict.
- class Settings
Bases: object
Settings for the Kafka model.
Configuration class that defines the model's Kafka topic, schema subject, and consumer settings (for example topic, schema_subject, and consumer_group). The type annotations on the class document each setting.
- classmethod get_producer()
Get sync Kafka producer. Override for custom connection logic.
- Return type:
Producer
- async classmethod get_async_producer()
Get async Kafka producer. Override for custom connection logic.
- classmethod get_consumer(group_id=None, settings=None)
Get sync Kafka consumer. Override for custom connection logic.
- Parameters:
group_id (str | None)
settings (BaseSettings | None)
- Return type:
Consumer
- async classmethod get_async_consumer(group_id=None, settings=None)
Get async Kafka consumer. Override for custom connection logic.
- Parameters:
group_id (str | None)
settings (BaseSettings | None)
- produce_nowait(callback=None)
Produce a message to Kafka without waiting for delivery (non-blocking, fire-and-forget).
- Parameters:
callback (Any | None) – Optional delivery callback, called as callback(err, msg)
- Return type:
None
- produce(timeout=10.0)
Produce message and wait for delivery confirmation (blocking).
- Parameters:
timeout (float) – Maximum time to wait for delivery (seconds)
- Return type:
None
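The difference between produce_nowait() and produce() is whether the caller waits for the broker's delivery report. Below is a minimal sketch of the blocking variant written against confluent-kafka's `Producer.produce(..., on_delivery=...)` and `Producer.flush(timeout)`. `flush()` returns the number of messages still queued when it times out. The function `produce_and_wait` and the exception `DeliveryError` are hypothetical. The sketch is not FlowODM's implementation.

```python
from typing import Any, Optional


class DeliveryError(Exception):
    """Hypothetical error raised when a message is not confirmed in time."""


def produce_and_wait(producer: Any, topic: str, value: bytes, timeout: float = 10.0,
                     key: Optional[bytes] = None) -> None:
    errors: list[Any] = []

    def on_delivery(err: Any, msg: Any) -> None:
        # confluent-kafka invokes this with err=None on successful delivery.
        if err is not None:
            errors.append(err)

    producer.produce(topic, value=value, key=key, on_delivery=on_delivery)
    # flush() serves delivery callbacks and returns how many messages are
    # still queued once the timeout expires.
    remaining = producer.flush(timeout)
    if remaining > 0:
        raise DeliveryError(f"{remaining} message(s) not delivered within {timeout}s")
    if errors:
        raise DeliveryError(f"delivery failed: {errors[0]}")
```

`flush()` waits for every message in the producer's queue, not only the one just sent. A blocking per-message produce therefore also waits on any earlier fire-and-forget messages.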
- classmethod produce_many(messages, flush=True)
Produce multiple messages as a batch.
- Parameters:
messages (list[FlowBaseModel]) – List of model instances to produce
flush (bool) – Whether to wait for all deliveries before returning
- Returns:
Number of messages produced
- Return type:
int
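A batch producer has to cope with confluent-kafka's local queue filling up. In that case `Producer.produce()` raises `BufferError`, and calling `poll()` serves delivery reports and frees space. Below is a minimal sketch of the batch loop with an optional final `flush()`. The function `produce_batch` and its `flush_timeout` parameter are hypothetical, and FlowODM may handle a full queue differently.

```python
from typing import Any, Iterable


def produce_batch(producer: Any, topic: str, values: Iterable[bytes], flush: bool = True,
                  flush_timeout: float = 30.0) -> int:
    """Hypothetical batch helper; returns the number of messages enqueued."""
    count = 0
    for value in values:
        while True:
            try:
                producer.produce(topic, value=value)
                break
            except BufferError:
                # Local queue is full: serve delivery reports to free space, then retry.
                producer.poll(1.0)
        producer.poll(0)  # serve callbacks for messages already delivered
        count += 1
    if flush:
        # Wait (up to flush_timeout seconds) for outstanding deliveries.
        producer.flush(flush_timeout)
    return count
```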
- async classmethod aproduce_many(messages)
Produce multiple messages asynchronously.
- Parameters:
messages (list[FlowBaseModel]) – List of model instances to produce
- classmethod consume_one(timeout=1.0, group_id=None, settings=None)
Consume single message (synchronous).
- Parameters:
timeout (float) – Poll timeout in seconds
group_id (str | None) – Consumer group ID (uses Settings.consumer_group if not specified)
settings (BaseSettings | None) – Optional settings profile
- Returns:
Model instance or None if no message available
- Return type:
T | None
- classmethod consume_iter(timeout=1.0, group_id=None, settings=None)
Iterate over messages (synchronous generator).
- Parameters:
timeout (float) – Poll timeout in seconds
group_id (str | None) – Consumer group ID
settings (BaseSettings | None) – Optional settings profile
- Yields:
Model instances
- Return type:
Iterator[T]
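consume_iter() is a synchronous generator over incoming messages. Below is a minimal sketch of such a generator on top of confluent-kafka's `Consumer.poll(timeout)`. That method returns `None` when the timeout expires, and each message exposes `error()` and `value()`. The function name and the `deserialize` callable (standing in for Avro deserialization into the model) are hypothetical. This sketch simply skips error messages, whereas FlowODM may raise or log them instead.

```python
from typing import Any, Callable, Iterator, TypeVar

T = TypeVar("T")


def consume_iter(consumer: Any, deserialize: Callable[[bytes], T],
                 timeout: float = 1.0) -> Iterator[T]:
    """Hypothetical generator: yields deserialized values until the caller stops iterating."""
    while True:
        msg = consumer.poll(timeout)
        if msg is None:
            continue  # poll timed out with no message; keep waiting
        if msg.error():
            continue  # sketch only: a real implementation would raise or log here
        yield deserialize(msg.value())
```

The generator never ends on its own. The caller decides when to stop, for example by breaking out of a `for` loop, and remains responsible for closing the consumer.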
- classmethod consume_batch(max_messages, timeout=1.0, group_id=None, settings=None)
Consume batch of messages.
- Parameters:
max_messages (int) – Maximum number of messages to consume
timeout (float) – Poll timeout in seconds
group_id (str | None) – Consumer group ID
settings (BaseSettings | None) – Optional settings profile
- Returns:
List of model instances
- Return type:
list[T]
- async classmethod aconsume_one(timeout=1.0, group_id=None, settings=None)
Consume single message (asynchronous).
- Parameters:
timeout (float)
group_id (str | None)
settings (BaseSettings | None)
- Return type:
T | None
- classmethod aconsume_iter(timeout=1.0, group_id=None, settings=None)
Iterate over messages (async generator).
- Parameters:
timeout (float) – Poll timeout in seconds
group_id (str | None) – Consumer group ID
settings (BaseSettings | None) – Optional settings profile
- Return type:
AsyncIterator[T]
Consumer
Consumer loop patterns for building Kafka microservices.
Provides both synchronous and asynchronous consumer loops with:
- Graceful shutdown handling
- Error handling and retry logic
- Lifecycle hooks (on_startup, on_shutdown)
- Configurable commit strategies
- class flowodm.consumer.ConsumerLoop
Bases: object
Synchronous consumer loop for processing Kafka messages.
Provides a main loop for microservices that consume and process messages. Handles graceful shutdown on SIGTERM/SIGINT signals.
Example
>>> def process_order(order: OrderEvent) -> None:
...     print(f"Processing order {order.order_id}")
...
>>> loop = ConsumerLoop(
...     model=OrderEvent,
...     handler=process_order,
...     settings=LongRunningSettings(),
... )
>>> loop.run()  # Blocking
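Graceful shutdown on SIGTERM/SIGINT usually means the signal handler only sets a flag, and the loop exits after finishing the current message. Below is a minimal sketch of that pattern. The class `StoppableLoop` and its `poll`/`handler` callables are hypothetical stand-ins for ConsumerLoop and its Kafka consumer. The sketch also restores the previous signal handlers when the loop ends.

```python
import signal
import threading
from typing import Any, Callable, Optional


class StoppableLoop:
    """Hypothetical minimal loop that stops cleanly on SIGTERM/SIGINT."""

    def __init__(self, poll: Callable[[float], Optional[Any]],
                 handler: Callable[[Any], None], poll_timeout: float = 1.0) -> None:
        self._poll = poll
        self._handler = handler
        self._poll_timeout = poll_timeout
        self._stop = threading.Event()

    def stop(self, *_: Any) -> None:
        # Used as the signal handler (receives signum, frame): only set a flag,
        # so the message currently being handled is allowed to finish.
        self._stop.set()

    def run(self) -> None:
        # signal.signal() may only be called from the main thread.
        previous = {sig: signal.signal(sig, self.stop)
                    for sig in (signal.SIGTERM, signal.SIGINT)}
        try:
            while not self._stop.is_set():
                msg = self._poll(self._poll_timeout)
                if msg is not None:
                    self._handler(msg)
        finally:
            for sig, old in previous.items():
                if old is not None:  # None means the old handler was not set from Python
                    signal.signal(sig, old)
```

The flag is checked between polls. Shutdown latency is therefore bounded by `poll_timeout` plus the running time of the current handler.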
- __init__(model, handler, settings=None, group_id=None, error_handler=None, on_startup=None, on_shutdown=None, commit_strategy='before_processing', max_retries=0, retry_delay=1.0, poll_timeout=1.0)
Initialize consumer loop.
- Parameters:
model (type[FlowBaseModel]) – FlowBaseModel subclass to consume
handler (Callable[[Any], None]) – Function to process each message
settings (BaseSettings | None) – Kafka settings profile (defaults to LongRunningSettings)
group_id (str | None) – Consumer group ID (uses model’s Settings if not specified)
error_handler (Callable[[Exception, Any, FlowBaseModel | None], None] | None) – Optional function to handle processing errors. Receives (exception, raw_message, deserialized_instance). The deserialized_instance is None if deserialization failed, otherwise contains the FlowBaseModel.
on_startup (Callable[[], None] | None) – Optional function called before loop starts
on_shutdown (Callable[[], None] | None) – Optional function called after loop stops
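commit_strategy (str) – Commit timing strategy. “before_processing” commits the offset before the handler runs (at-most-once: a message is not redelivered after a crash or rebalance, so parallel deployments do not process it twice, but a message whose handler fails is not retried by Kafka). “after_processing” commits only after successful processing (at-least-once: no message is lost, but a message may be redelivered and processed twice after a crash or rebalance, for example in parallel deployments).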
max_retries (int) – Maximum retry attempts for failed messages
retry_delay (float) – Delay between retries in seconds
poll_timeout (float) – Kafka poll timeout in seconds
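The two commit strategies differ only in where the offset commit sits relative to the handler and its retries. Below is a minimal sketch of one message's processing, using confluent-kafka's `Consumer.commit(message=msg, asynchronous=False)`. The function `process_with_commit` is hypothetical, and the way FlowODM combines retries with commits may differ.

```python
import time
from typing import Any, Callable


def process_with_commit(consumer: Any, msg: Any, handler: Callable[[Any], None],
                        commit_strategy: str = "before_processing",
                        max_retries: int = 0, retry_delay: float = 1.0) -> bool:
    """Hypothetical helper; returns True if the handler eventually succeeded."""
    if commit_strategy not in ("before_processing", "after_processing"):
        raise ValueError(f"unknown commit_strategy: {commit_strategy!r}")
    if commit_strategy == "before_processing":
        # At-most-once: the offset is committed even if the handler later fails.
        consumer.commit(message=msg, asynchronous=False)
    for attempt in range(max_retries + 1):
        try:
            handler(msg)
            break
        except Exception:
            if attempt == max_retries:
                # Give up. With after_processing the offset stays uncommitted.
                return False
            time.sleep(retry_delay)
    if commit_strategy == "after_processing":
        # At-least-once: commit only after the handler succeeded.
        consumer.commit(message=msg, asynchronous=False)
    return True
```

Kafka tracks one committed offset per partition, so committing a later message also moves past an earlier failed one. A strict at-least-once loop therefore has to stop, seek back, or hand the failure to something like the error_handler rather than simply continue.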
- class flowodm.consumer.AsyncConsumerLoop
Bases: object
Asynchronous consumer loop for processing Kafka messages.
Supports concurrent message processing with configurable parallelism.
Example
>>> async def process_order(order: OrderEvent) -> None:
...     await external_api.submit(order)
...
>>> loop = AsyncConsumerLoop(
...     model=OrderEvent,
...     handler=process_order,
...     max_concurrent=20,
... )
>>> await loop.run()
- __init__(model, handler, settings=None, group_id=None, error_handler=None, on_startup=None, on_shutdown=None, max_concurrent=10, commit_strategy='before_processing', max_retries=0, retry_delay=1.0, poll_timeout=1.0)
Initialize async consumer loop.
- Parameters:
model (type[FlowBaseModel]) – FlowBaseModel subclass to consume
handler (Callable[[Any], Awaitable[None]]) – Async function to process each message
settings (BaseSettings | None) – Kafka settings profile
group_id (str | None) – Consumer group ID
error_handler (Callable[[Exception, Any, FlowBaseModel | None], Awaitable[None]] | None) – Optional async function to handle errors. Receives (exception, raw_message, deserialized_instance). The deserialized_instance is None if deserialization failed, otherwise contains the FlowBaseModel.
on_startup (Callable[[], Awaitable[None]] | None) – Optional async function called before loop starts
on_shutdown (Callable[[], Awaitable[None]] | None) – Optional async function called after loop stops
max_concurrent (int) – Maximum concurrent message processing tasks
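commit_strategy (str) – Commit timing strategy. “before_processing” commits the offset before the handler runs (at-most-once: a message is not redelivered after a crash or rebalance, so parallel deployments do not process it twice, but a message whose handler fails is not retried by Kafka). “after_processing” commits only after successful processing (at-least-once: no message is lost, but a message may be redelivered and processed twice after a crash or rebalance, for example in parallel deployments).
max_retries (int) – Maximum retry attempts for failed messages
retry_delay (float) – Delay between retries in seconds
poll_timeout (float) – Kafka poll timeout in seconds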
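max_concurrent caps how many handlers run at the same time. A common way to enforce such a cap in asyncio is a semaphore that is acquired before the next message is taken, which also applies backpressure to polling. Below is a minimal sketch of that technique. The function `process_concurrently` is hypothetical, and an iterable of messages stands in for the Kafka consumer.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable


async def process_concurrently(messages: Iterable[Any],
                               handler: Callable[[Any], Awaitable[None]],
                               max_concurrent: int = 10) -> None:
    """Hypothetical sketch: run handlers with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks: list[asyncio.Task[None]] = []

    async def run_one(msg: Any) -> None:
        try:
            await handler(msg)
        finally:
            semaphore.release()  # free the slot whether the handler succeeded or not

    for msg in messages:
        # Wait for a free slot before taking the next message, so no more than
        # max_concurrent handlers are in flight at any time.
        await semaphore.acquire()
        tasks.append(asyncio.create_task(run_one(msg)))
    # Wait for the remaining handlers; the first handler exception propagates.
    await asyncio.gather(*tasks)
```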
- flowodm.consumer.consumer_loop(model, settings=None, **kwargs)
Decorator to create a consumer loop from a handler function.
Example
>>> @consumer_loop(model=OrderEvent, settings=LongRunningSettings())
... def handle_order(order: OrderEvent) -> None:
...     process_order(order)
...
>>> handle_order.run()  # Start the loop
- Parameters:
model (type[FlowBaseModel])
settings (BaseSettings | None)
kwargs (Any)
- Return type:
Callable[[Callable[[Any], None]], ConsumerLoop]
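The documented return type, Callable[[Callable[[Any], None]], ConsumerLoop], means consumer_loop is a decorator factory. Calling it with configuration returns a decorator, and that decorator replaces the handler function with a loop object. Below is a minimal sketch of the shape. `MiniLoop` and `mini_consumer_loop` are hypothetical stand-ins for ConsumerLoop and consumer_loop. To stay runnable without Kafka, `MiniLoop.run()` takes a sequence of messages.

```python
from typing import Any, Callable, Iterable


class MiniLoop:
    """Hypothetical stand-in for ConsumerLoop that records its configuration."""

    def __init__(self, model: type, handler: Callable[[Any], None], **options: Any) -> None:
        self.model = model
        self.handler = handler
        self.options = options

    def run(self, messages: Iterable[Any]) -> None:
        # Sketch only: iterates a supplied sequence instead of polling Kafka.
        for msg in messages:
            self.handler(msg)


def mini_consumer_loop(model: type, **options: Any) -> Callable[[Callable[[Any], None]], MiniLoop]:
    """Decorator factory: the decorated function is replaced by a MiniLoop."""
    def decorator(handler: Callable[[Any], None]) -> MiniLoop:
        return MiniLoop(model, handler, **options)
    return decorator
```

After decoration, the name `handle_order` refers to the loop object rather than the function. This is why the documented example calls `handle_order.run()`.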
- flowodm.consumer.async_consumer_loop(model, settings=None, **kwargs)
Decorator to create an async consumer loop from a handler function.
Example
>>> @async_consumer_loop(model=OrderEvent, max_concurrent=20)
... async def handle_order(order: OrderEvent) -> None:
...     await process_order(order)
...
>>> await handle_order.run()  # Start the loop
- Parameters:
model (type[FlowBaseModel])
settings (BaseSettings | None)
kwargs (Any)
- Return type:
Callable[[Callable[[Any], Awaitable[None]]], AsyncConsumerLoop]
Settings
Predefined Kafka settings profiles for different use cases.
- class flowodm.settings.BaseSettings
Bases: object
Base settings class with common Kafka configuration options.
- __init__(session_timeout_ms=45000, heartbeat_interval_ms=15000, max_poll_interval_ms=300000, max_poll_records=500, auto_offset_reset='earliest', enable_auto_commit=False, extra_config=<factory>)
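The settings fields correspond closely to standard Kafka consumer properties. Below is a minimal sketch of one plausible mapping to a confluent-kafka consumer config. The class `KafkaSettingsSketch` is hypothetical, and the assumption is that FlowODM translates fields this way. Two sanity checks come from Kafka itself. Kafka's documentation says heartbeat.interval.ms should typically be no higher than 1/3 of session.timeout.ms, and this sketch treats that recommendation as a hard rule. librdkafka requires max.poll.interval.ms to be at least session.timeout.ms. librdkafka has no max.poll.records property, so that field is left out of the dict.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class KafkaSettingsSketch:
    """Hypothetical mirror of BaseSettings with an assumed librdkafka mapping."""

    session_timeout_ms: int = 45000
    heartbeat_interval_ms: int = 15000
    max_poll_interval_ms: int = 300000
    max_poll_records: int = 500  # no librdkafka equivalent; used by the library itself
    auto_offset_reset: str = "earliest"
    enable_auto_commit: bool = False
    extra_config: dict[str, Any] = field(default_factory=dict)

    def validate(self) -> None:
        # Kafka recommendation, enforced strictly in this sketch.
        if self.heartbeat_interval_ms * 3 > self.session_timeout_ms:
            raise ValueError("heartbeat_interval_ms should be <= session_timeout_ms / 3")
        # librdkafka rejects max.poll.interval.ms < session.timeout.ms.
        if self.max_poll_interval_ms < self.session_timeout_ms:
            raise ValueError("max_poll_interval_ms must be >= session_timeout_ms")

    def to_consumer_config(self, group_id: str) -> dict[str, Any]:
        self.validate()
        config: dict[str, Any] = {
            "group.id": group_id,
            "session.timeout.ms": self.session_timeout_ms,
            "heartbeat.interval.ms": self.heartbeat_interval_ms,
            "max.poll.interval.ms": self.max_poll_interval_ms,
            "auto.offset.reset": self.auto_offset_reset,
            "enable.auto.commit": self.enable_auto_commit,
        }
        config.update(self.extra_config)  # caller-supplied overrides win
        return config
```

All of the predefined profiles pass both checks. For example, RealTimeSettings uses 3000 ms heartbeats against a 10000 ms session timeout, and its 30000 ms poll interval is above that session timeout.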
- class flowodm.settings.LongRunningSettings
Bases: BaseSettings
Settings optimized for long-running processing tasks.
Use when processing a single message may take several minutes, such as ML inference, complex calculations, or external API calls.
Timeouts are extended to prevent unnecessary consumer rebalances during long processing.
- __init__(session_timeout_ms=300000, heartbeat_interval_ms=60000, max_poll_interval_ms=600000, max_poll_records=100, auto_offset_reset='earliest', enable_auto_commit=False, extra_config=<factory>)
- class flowodm.settings.BatchSettings
Bases: BaseSettings
Settings optimized for batch processing.
Use for ETL jobs, data aggregation, or bulk operations where throughput is more important than latency.
- __init__(session_timeout_ms=45000, heartbeat_interval_ms=15000, max_poll_interval_ms=300000, max_poll_records=500, auto_offset_reset='earliest', enable_auto_commit=False, extra_config=<factory>)
- class flowodm.settings.RealTimeSettings
Bases: BaseSettings
Settings optimized for real-time processing.
Use for event-driven microservices, notifications, or any scenario where low latency is critical.
- __init__(session_timeout_ms=10000, heartbeat_interval_ms=3000, max_poll_interval_ms=30000, max_poll_records=10, auto_offset_reset='latest', enable_auto_commit=True, extra_config=<factory>)
- class flowodm.settings.HighThroughputSettings
Bases: BaseSettings
Settings optimized for maximum throughput.
Use for high-volume data ingestion where some message loss is acceptable in exchange for performance.
- __init__(session_timeout_ms=45000, heartbeat_interval_ms=15000, max_poll_interval_ms=300000, max_poll_records=1000, auto_offset_reset='latest', enable_auto_commit=True, extra_config=<factory>)
- class flowodm.settings.ReliableSettings
Bases: BaseSettings
Settings optimized for maximum reliability.
Use when message delivery guarantees are critical, such as financial transactions or audit logs.
- __init__(session_timeout_ms=60000, heartbeat_interval_ms=20000, max_poll_interval_ms=300000, max_poll_records=100, auto_offset_reset='earliest', enable_auto_commit=False, extra_config=<factory>)
Schema
Schema utilities for Avro schema generation, validation, and registry operations.
- class flowodm.schema.CompatibilityResult
Bases: object
Result of a schema compatibility check.
- flowodm.schema.load_schema_from_file(path)
Load an Avro schema from a .avsc file.
- Parameters:
path – Path to the .avsc file
- Returns:
Parsed Avro schema as a dict
- Raises:
FileNotFoundError – If the schema file doesn’t exist
json.JSONDecodeError – If the file is not valid JSON
- Return type:
dict
- flowodm.schema.load_schema_from_registry(subject, version='latest')
Load Avro schema from Schema Registry.
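- flowodm.schema.generate_model_from_schema(schema_source, topic, class_name=None, consumer_group=None)
Generate a FlowBaseModel subclass from an Avro schema.
- Parameters:
schema_source – Avro schema source, such as a path to a .avsc file
topic – Kafka topic for the generated model
class_name – Optional name for the generated class
consumer_group – Optional default consumer group for the generated model
- Returns:
Dynamically generated FlowBaseModel subclass
- Return type:
type[FlowBaseModel]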
Example
>>> UserEvent = generate_model_from_schema("schemas/user.avsc", topic="users")
>>> event = UserEvent(user_id="123", action="login")
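- flowodm.schema.generate_model_from_registry(subject, topic, version='latest', class_name=None, consumer_group=None)
Generate a FlowBaseModel subclass from a schema stored in Schema Registry.
- Parameters:
subject – Schema Registry subject name
topic – Kafka topic for the generated model
version – Schema version (“latest” or version number)
class_name – Optional name for the generated class
consumer_group – Optional default consumer group for the generated model
- Returns:
Dynamically generated FlowBaseModel subclass
- Return type:
type[FlowBaseModel]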
- flowodm.schema.validate_against_file(model_class, schema_path)
Validate that a Pydantic model matches a local Avro schema file.
- Parameters:
model_class (type[FlowBaseModel]) – FlowBaseModel subclass to validate
schema_path (str | Path) – Path to .avsc file
- Returns:
ValidationResult with is_valid flag and any errors
- Return type:
ValidationResult
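At its simplest, validating a model against a schema file means comparing the model's field names with the record's field names and reporting differences in both directions. The sketch below does only that name-level comparison and does not check types. `ValidationResultSketch` and `validate_fields` are hypothetical names, shaped after the documented is_valid flag and error list. For a Pydantic model, `set(Model.model_fields)` would supply the field names.

```python
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Union


@dataclass
class ValidationResultSketch:
    """Hypothetical result object with an is_valid flag and error messages."""

    is_valid: bool
    errors: list[str] = field(default_factory=list)


def validate_fields(model_fields: set[str], schema_path: Union[str, Path]) -> ValidationResultSketch:
    # Reading the .avsc file propagates FileNotFoundError / json.JSONDecodeError.
    schema = json.loads(Path(schema_path).read_text(encoding="utf-8"))
    schema_fields = {f["name"] for f in schema.get("fields", [])}
    errors = [f"field missing from model: {n}" for n in sorted(schema_fields - model_fields)]
    errors += [f"field missing from schema: {n}" for n in sorted(model_fields - schema_fields)]
    return ValidationResultSketch(is_valid=not errors, errors=errors)
```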
- flowodm.schema.validate_against_registry(model_class, subject, version='latest')
Validate that a Pydantic model matches a Schema Registry schema.
- Parameters:
model_class (type[FlowBaseModel]) – FlowBaseModel subclass to validate
subject (str) – Schema Registry subject name
version (str | int) – Schema version (“latest” or version number)
- Returns:
ValidationResult with is_valid flag and any errors
- Return type:
ValidationResult
- flowodm.schema.check_compatibility(model_class, subject, compatibility_level='BACKWARD')
Check whether the model’s schema is compatible with the schemas already registered under the subject.
- Parameters:
model_class (type[FlowBaseModel]) – FlowBaseModel subclass
subject (str) – Schema Registry subject name
compatibility_level (str) – BACKWARD, FORWARD, FULL, or NONE
- Returns:
CompatibilityResult with is_compatible flag and message
- Return type:
CompatibilityResult
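The compatibility levels describe who must be able to read whose data. BACKWARD means consumers using the new schema can read data written with the old one. FORWARD means consumers on the old schema can read new data, and FULL requires both. The sketch below is a deliberately simplified version of that check for flat records. A reader field missing from the writer schema must have a default, and shared fields must keep an identical type. Real Avro schema resolution also allows type promotions (such as int to long) and union changes, which this sketch ignores. `check_compatibility_sketch` is a hypothetical name, and Schema Registry performs the real check server-side.

```python
from typing import Any


def _readable(reader: dict[str, Any], writer: dict[str, Any]) -> list[str]:
    """Problems preventing `reader` from decoding data written with `writer` (simplified)."""
    writer_fields = {f["name"]: f for f in writer["fields"]}
    problems: list[str] = []
    for f in reader["fields"]:
        old = writer_fields.get(f["name"])
        if old is None:
            if "default" not in f:
                problems.append(f"reader field {f['name']!r} is absent from the writer and has no default")
        elif old["type"] != f["type"]:
            problems.append(f"type of {f['name']!r} changed")
    return problems


def check_compatibility_sketch(new: dict[str, Any], old: dict[str, Any],
                               level: str = "BACKWARD") -> tuple[bool, str]:
    if level not in ("BACKWARD", "FORWARD", "FULL", "NONE"):
        raise ValueError(f"unknown compatibility level: {level}")
    if level == "NONE":
        return True, "compatibility checks disabled"
    problems: list[str] = []
    if level in ("BACKWARD", "FULL"):
        problems += _readable(reader=new, writer=old)  # new consumers read old data
    if level in ("FORWARD", "FULL"):
        problems += _readable(reader=old, writer=new)  # old consumers read new data
    return (not problems, "; ".join(problems) or "compatible")
```

Under these rules, adding a field is backward compatible only if the new field has a default. Removing a field is backward compatible but not forward compatible, unless the old schema gave that field a default.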
- flowodm.schema.upload_schema(schema_path, subject, compatibility_level=None)
Upload an Avro schema file to Schema Registry.
Exceptions
Custom exceptions for FlowODM.
- exception flowodm.exceptions.FlowODMError
Bases: Exception
Base exception for all FlowODM errors.
- exception flowodm.exceptions.ConnectionError
Bases: FlowODMError
Raised when connection to Kafka or Schema Registry fails.
- exception flowodm.exceptions.ConfigurationError
Bases: FlowODMError
Raised when configuration is invalid or missing.
- exception flowodm.exceptions.SchemaError
Bases: FlowODMError
Base exception for schema-related errors.
- exception flowodm.exceptions.SchemaValidationError
Bases: SchemaError
Raised when schema validation fails.
- exception flowodm.exceptions.SchemaRegistryError
Bases: SchemaError
Raised when Schema Registry operations fail.
- exception flowodm.exceptions.SchemaCompatibilityError
Bases: SchemaError
Raised when schema compatibility check fails.
- exception flowodm.exceptions.SerializationError
Bases: FlowODMError
Raised when message serialization fails.
- exception flowodm.exceptions.DeserializationError
Bases: FlowODMError
Raised when message deserialization fails.
- exception flowodm.exceptions.ProducerError
Bases: FlowODMError
Raised when message production fails.
- exception flowodm.exceptions.ConsumerError
Bases: FlowODMError
Raised when message consumption fails.
- exception flowodm.exceptions.TopicError
Bases: FlowODMError
Raised when topic-related operations fail.
- exception flowodm.exceptions.SettingsError
Bases: FlowODMError
Raised when model Settings class is invalid or missing.
CLI
Command-line interface for FlowODM schema operations.
Provides commands for:
- Validating models against schemas
- Uploading schemas to Schema Registry
- Checking schema compatibility
- Listing schema subjects
- flowodm.cli.cmd_validate(args)
Validate models against schemas.
- Parameters:
args (Namespace)
- Return type:
None
- flowodm.cli.cmd_upload_schema(args)
Upload schema to Schema Registry.
- Parameters:
args (Namespace)
- Return type:
None
- flowodm.cli.cmd_check_compatibility(args)
Check schema compatibility.
- Parameters:
args (Namespace)
- Return type:
None
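Each cmd_* function takes an argparse Namespace, which suggests the usual argparse pattern: subcommands register a handler with `set_defaults(func=...)`, and `main()` dispatches to it. Below is a minimal sketch of that pattern. The subcommand names (`validate`, `upload-schema`, `check-compatibility`), the flags, and the program name are all hypothetical and are not FlowODM's actual CLI. The handlers are stand-ins that only print.

```python
import argparse
from typing import Optional, Sequence


# Hypothetical stand-ins for flowodm.cli.cmd_*; each receives the parsed Namespace.
def cmd_validate(args: argparse.Namespace) -> None:
    print(f"validate {args.model} against {args.schema}")


def cmd_upload_schema(args: argparse.Namespace) -> None:
    print(f"upload {args.schema} as subject {args.subject}")


def cmd_check_compatibility(args: argparse.Namespace) -> None:
    print(f"check {args.model} against {args.subject} ({args.level})")


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="flowodm-sketch")
    sub = parser.add_subparsers(dest="command", required=True)

    p = sub.add_parser("validate", help="validate a model against a schema")
    p.add_argument("--model", required=True)
    p.add_argument("--schema", required=True)
    p.set_defaults(func=cmd_validate)

    p = sub.add_parser("upload-schema", help="upload an .avsc file")
    p.add_argument("--schema", required=True)
    p.add_argument("--subject", required=True)
    p.set_defaults(func=cmd_upload_schema)

    p = sub.add_parser("check-compatibility", help="check registry compatibility")
    p.add_argument("--model", required=True)
    p.add_argument("--subject", required=True)
    p.add_argument("--level", default="BACKWARD",
                   choices=["BACKWARD", "FORWARD", "FULL", "NONE"])
    p.set_defaults(func=cmd_check_compatibility)
    return parser


def main(argv: Optional[Sequence[str]] = None) -> None:
    args = build_parser().parse_args(argv)
    args.func(args)  # dispatch to the handler registered by the chosen subcommand
```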