API Reference

This page contains the complete API reference for FlowODM.

Connection

Thread-safe singleton for Kafka and Schema Registry connection management.

Provides both synchronous (confluent-kafka) and asynchronous client access.

class flowodm.connection.KafkaConnection[source]

Bases: object

Thread-safe singleton managing Kafka producers, consumers, and Schema Registry.

Configuration via environment variables:
  • KAFKA_BOOTSTRAP_SERVERS: Kafka broker addresses (required)

  • KAFKA_SECURITY_PROTOCOL: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL

  • KAFKA_SASL_MECHANISM: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER

  • KAFKA_SASL_USERNAME: SASL username

  • KAFKA_SASL_PASSWORD: SASL password

  • SCHEMA_REGISTRY_URL: Schema Registry URL

  • SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: Basic auth in format “key:secret” (alternative to separate key/secret)

  • SCHEMA_REGISTRY_API_KEY: Confluent Cloud API key

  • SCHEMA_REGISTRY_API_SECRET: Confluent Cloud API secret

Example

>>> from flowodm.connection import connect, get_producer
>>> connect(bootstrap_servers="localhost:9092")
>>> producer = get_producer()
static __new__(cls)[source]
Return type:

KafkaConnection

__init__()[source]
Return type:

None

configure(bootstrap_servers=None, security_protocol=None, sasl_mechanism=None, sasl_username=None, sasl_password=None, schema_registry_url=None, schema_registry_api_key=None, schema_registry_api_secret=None, schema_registry_basic_auth_user_info=None)[source]

Configure the connection with explicit parameters.

Parameters override environment variables.

Parameters:
  • bootstrap_servers (str | None)

  • security_protocol (str | None)

  • sasl_mechanism (str | None)

  • sasl_username (str | None)

  • sasl_password (str | None)

  • schema_registry_url (str | None)

  • schema_registry_api_key (str | None)

  • schema_registry_api_secret (str | None)

  • schema_registry_basic_auth_user_info (str | None)

Return type:

None

property producer: Producer

Get synchronous Kafka producer.

Creates producer on first access (lazy initialization).

get_consumer(group_id, topics, settings=None)[source]

Get or create a Kafka consumer for the given group and topics.

Parameters:
  • group_id (str) – Consumer group ID

  • topics (list[str]) – List of topics to subscribe to

  • settings (BaseSettings | None) – Optional settings profile for consumer configuration

Returns:

Configured Consumer instance

Return type:

Consumer

property schema_registry: SchemaRegistryClient

Get Schema Registry client.

Creates client on first access (lazy initialization).

async get_async_producer()[source]

Get asynchronous Kafka producer.

Note: Requires confluent-kafka >= 2.13.0 for AIOProducer support. Falls back to sync producer wrapped in asyncio if not available.

Return type:

Any

async get_async_consumer(group_id, topics, settings=None)[source]

Get asynchronous Kafka consumer.

Note: Requires confluent-kafka >= 2.13.0 for AIOConsumer support. Falls back to sync consumer for older versions.

Parameters:
Return type:

Any

close_connection()[source]

Close all Kafka clients and cleanup resources.

Return type:

None

flowodm.connection.get_kafka_connection()[source]

Get the global KafkaConnection singleton instance.

Return type:

KafkaConnection

flowodm.connection.connect(bootstrap_servers=None, security_protocol=None, sasl_mechanism=None, sasl_username=None, sasl_password=None, schema_registry_url=None, schema_registry_api_key=None, schema_registry_api_secret=None, schema_registry_basic_auth_user_info=None)[source]

Configure and return the global Kafka connection.

Parameters:
  • bootstrap_servers (str | None) – Kafka broker addresses (e.g., “localhost:9092”)

  • security_protocol (str | None) – Security protocol (PLAINTEXT, SSL, SASL_SSL)

  • sasl_mechanism (str | None) – SASL mechanism (PLAIN, SCRAM-SHA-256, OAUTHBEARER)

  • sasl_username (str | None) – SASL username

  • sasl_password (str | None) – SASL password

  • schema_registry_url (str | None) – Schema Registry URL

  • schema_registry_api_key (str | None) – Schema Registry API key (Confluent Cloud)

  • schema_registry_api_secret (str | None) – Schema Registry API secret (Confluent Cloud)

  • schema_registry_basic_auth_user_info (str | None) – Schema Registry basic auth (format: “key:secret”)

Returns:

Configured KafkaConnection instance

Return type:

KafkaConnection

Example

>>> connect(
...     bootstrap_servers="localhost:9092",
...     schema_registry_url="http://localhost:8081"
... )
flowodm.connection.get_producer()[source]

Get the global Kafka producer.

Return type:

Producer

flowodm.connection.get_consumer(group_id, topics, settings=None)[source]

Get or create a Kafka consumer.

Parameters:
Return type:

Consumer

flowodm.connection.get_schema_registry()[source]

Get the Schema Registry client.

Return type:

SchemaRegistryClient

async flowodm.connection.get_async_producer()[source]

Get the async Kafka producer.

Return type:

Any

async flowodm.connection.get_async_consumer(group_id, topics, settings=None)[source]

Get or create an async Kafka consumer.

Parameters:
Return type:

Any

Model

FlowBaseModel - Pydantic v2 base class for Kafka message models.

Provides both synchronous and asynchronous methods for produce/consume operations.

class flowodm.model.FlowBaseModel[source]

Bases: BaseModel

Base class for Kafka message models with ODM functionality.

Provides both synchronous and asynchronous methods for produce/consume. Maps Pydantic models to Avro schemas automatically.

Subclasses must define an inner Settings class:

Example

class UserEvent(FlowBaseModel):
class Settings:

topic = “user-events” schema_subject = “user-events-value” # Optional consumer_group = “my-service” # Optional

user_id: str action: str timestamp: datetime

model_config = {'extra': 'forbid', 'populate_by_name': True, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class Settings[source]

Bases: object

Settings for the Kafka model.

Configuration class for defining Kafka topic, schema, and consumer settings. The type annotations provide the documentation for each setting.

topic: str | None = None

Kafka topic name (required)

schema_subject: str | None = None

Schema Registry subject (defaults to {topic}-value)

schema_path: str | None = None

Path to local .avsc file (optional)

consumer_group: str | None = None

Consumer group ID (optional)

key_field: str | None = None

Field name to use as message key (optional)

key_serializer: str = 'string'

“string”, “avro”, “json”

Type:

Key serialization format

value_serializer: str = 'avro'

“avro”, “json”

Type:

Value serialization format

confluent_wire_format: bool = True

Prepend Confluent wire format header (magic byte + schema ID) when serializing

classmethod get_producer()[source]

Get sync Kafka producer. Override for custom connection logic.

Return type:

Producer

async classmethod get_async_producer()[source]

Get async Kafka producer. Override for custom connection logic.

Return type:

Any

classmethod get_consumer(group_id=None, settings=None)[source]

Get sync Kafka consumer. Override for custom connection logic.

Parameters:
Return type:

Consumer

async classmethod get_async_consumer(group_id=None, settings=None)[source]

Get async Kafka consumer. Override for custom connection logic.

Parameters:
Return type:

Any

produce_nowait(callback=None)[source]

Produce message to Kafka (non-blocking, fire-and-forget).

Parameters:

callback (Any | None) – Optional delivery callback function(err, msg)

Return type:

None

produce(timeout=10.0)[source]

Produce message and wait for delivery confirmation (blocking).

Parameters:

timeout (float) – Maximum time to wait for delivery (seconds)

Return type:

None

classmethod produce_many(messages, flush=True)[source]

Produce multiple messages (batch).

Parameters:
  • messages (list[FlowBaseModel]) – List of model instances to produce

  • flush (bool) – Whether to wait for all deliveries

Returns:

Number of messages produced

Return type:

int

async aproduce()[source]

Produce message to Kafka (asynchronous).

Return type:

None

async classmethod aproduce_many(messages)[source]

Produce multiple messages asynchronously.

Parameters:

messages (list[FlowBaseModel])

Return type:

int

classmethod consume_one(timeout=1.0, group_id=None, settings=None)[source]

Consume single message (synchronous).

Parameters:
  • timeout (float) – Poll timeout in seconds

  • group_id (str | None) – Consumer group ID (uses Settings.consumer_group if not specified)

  • settings (BaseSettings | None) – Optional settings profile

Returns:

Model instance or None if no message available

Return type:

T | None

classmethod consume_iter(timeout=1.0, group_id=None, settings=None)[source]

Iterate over messages (synchronous generator).

Parameters:
  • timeout (float) – Poll timeout in seconds

  • group_id (str | None) – Consumer group ID

  • settings (BaseSettings | None) – Optional settings profile

Yields:

Model instances

Return type:

Iterator[T]

classmethod consume_batch(max_messages, timeout=1.0, group_id=None, settings=None)[source]

Consume batch of messages.

Parameters:
  • max_messages (int) – Maximum number of messages to consume

  • timeout (float) – Poll timeout in seconds

  • group_id (str | None) – Consumer group ID

  • settings (BaseSettings | None) – Optional settings profile

Returns:

List of model instances

Return type:

list[T]

async classmethod aconsume_one(timeout=1.0, group_id=None, settings=None)[source]

Consume single message (asynchronous).

Parameters:
Return type:

T | None

classmethod aconsume_iter(timeout=1.0, group_id=None, settings=None)[source]

Iterate over messages (async generator).

Parameters:
Return type:

AsyncIterator[T]

async classmethod aconsume_batch(max_messages, timeout=1.0, group_id=None, settings=None)[source]

Consume batch of messages asynchronously.

Parameters:
Return type:

list[T]

classmethod register_schema()[source]

Register Avro schema with Schema Registry.

Uses the same schema resolution as serialization (file → registry → auto-generate) to ensure the registered schema matches what is used for producing messages.

Returns:

Schema ID from registry

Return type:

int

Consumer

Consumer loop patterns for building Kafka microservices.

Provides both synchronous and asynchronous consumer loops with: - Graceful shutdown handling - Error handling and retry logic - Lifecycle hooks (on_startup, on_shutdown) - Configurable commit strategies

class flowodm.consumer.ConsumerLoop[source]

Bases: object

Synchronous consumer loop for processing Kafka messages.

Provides a main loop for microservices that consume and process messages. Handles graceful shutdown on SIGTERM/SIGINT signals.

Example

>>> def process_order(order: OrderEvent) -> None:
...     print(f"Processing order {order.order_id}")
...
>>> loop = ConsumerLoop(
...     model=OrderEvent,
...     handler=process_order,
...     settings=LongRunningSettings(),
... )
>>> loop.run()  # Blocking
__init__(model, handler, settings=None, group_id=None, error_handler=None, on_startup=None, on_shutdown=None, commit_strategy='before_processing', max_retries=0, retry_delay=1.0, poll_timeout=1.0)[source]

Initialize consumer loop.

Parameters:
  • model (type[FlowBaseModel]) – FlowBaseModel subclass to consume

  • handler (Callable[[Any], None]) – Function to process each message

  • settings (BaseSettings | None) – Kafka settings profile (defaults to LongRunningSettings)

  • group_id (str | None) – Consumer group ID (uses model’s Settings if not specified)

  • error_handler (Callable[[Exception, Any, FlowBaseModel | None], None] | None) – Optional function to handle processing errors. Receives (exception, raw_message, deserialized_instance). The deserialized_instance is None if deserialization failed, otherwise contains the FlowBaseModel.

  • on_startup (Callable[[], None] | None) – Optional function called before loop starts

  • on_shutdown (Callable[[], None] | None) – Optional function called after loop stops

  • commit_strategy (str) – Commit timing strategy. Use “before_processing” to commit before handler execution (at-most-once, prevents duplicates in parallel deployments), or “after_processing” to commit after successful processing (at-least-once, may cause duplicates in parallel deployments).

  • max_retries (int) – Maximum retry attempts for failed messages

  • retry_delay (float) – Delay between retries in seconds

  • poll_timeout (float) – Kafka poll timeout in seconds

stop()[source]

Signal the loop to stop gracefully.

Return type:

None

run()[source]

Start the consumer loop (blocking).

Processes messages until stop() is called or a signal is received.

Return type:

None

class flowodm.consumer.AsyncConsumerLoop[source]

Bases: object

Asynchronous consumer loop for processing Kafka messages.

Supports concurrent message processing with configurable parallelism.

Example

>>> async def process_order(order: OrderEvent) -> None:
...     await external_api.submit(order)
...
>>> loop = AsyncConsumerLoop(
...     model=OrderEvent,
...     handler=process_order,
...     max_concurrent=20,
... )
>>> await loop.run()
__init__(model, handler, settings=None, group_id=None, error_handler=None, on_startup=None, on_shutdown=None, max_concurrent=10, commit_strategy='before_processing', max_retries=0, retry_delay=1.0, poll_timeout=1.0)[source]

Initialize async consumer loop.

Parameters:
  • model (type[FlowBaseModel]) – FlowBaseModel subclass to consume

  • handler (Callable[[Any], Awaitable[None]]) – Async function to process each message

  • settings (BaseSettings | None) – Kafka settings profile

  • group_id (str | None) – Consumer group ID

  • error_handler (Callable[[Exception, Any, FlowBaseModel | None], Awaitable[None]] | None) – Optional async function to handle errors. Receives (exception, raw_message, deserialized_instance). The deserialized_instance is None if deserialization failed, otherwise contains the FlowBaseModel.

  • on_startup (Callable[[], Awaitable[None]] | None) – Optional async function called before loop starts

  • on_shutdown (Callable[[], Awaitable[None]] | None) – Optional async function called after loop stops

  • max_concurrent (int) – Maximum concurrent message processing tasks

  • commit_strategy (str) – Commit timing strategy. Use “before_processing” to commit before handler execution (at-most-once, prevents duplicates in parallel deployments), or “after_processing” to commit after successful processing (at-least-once, may cause duplicates in parallel deployments).

  • max_retries (int) – Maximum retry attempts

  • retry_delay (float) – Delay between retries

  • poll_timeout (float) – Kafka poll timeout

stop()[source]

Signal the loop to stop gracefully.

Return type:

None

async run()[source]

Start the async consumer loop.

Processes messages until stop() is called.

Return type:

None

flowodm.consumer.consumer_loop(model, settings=None, **kwargs)[source]

Decorator to create a consumer loop from a handler function.

Example

>>> @consumer_loop(model=OrderEvent, settings=LongRunningSettings())
... def handle_order(order: OrderEvent) -> None:
...     process_order(order)
...
>>> handle_order.run()  # Start the loop
Parameters:
Return type:

Callable[[Callable[[Any], None]], ConsumerLoop]

flowodm.consumer.async_consumer_loop(model, settings=None, **kwargs)[source]

Decorator to create an async consumer loop from a handler function.

Example

>>> @async_consumer_loop(model=OrderEvent, max_concurrent=20)
... async def handle_order(order: OrderEvent) -> None:
...     await process_order(order)
...
>>> await handle_order.run()  # Start the loop
Parameters:
Return type:

Callable[[Callable[[Any], Awaitable[None]]], AsyncConsumerLoop]

Settings

Predefined Kafka settings profiles for different use cases.

class flowodm.settings.BaseSettings[source]

Bases: object

Base settings class with common Kafka configuration options.

session_timeout_ms: int = 45000
heartbeat_interval_ms: int = 15000
max_poll_interval_ms: int = 300000
max_poll_records: int = 500
auto_offset_reset: str = 'earliest'
enable_auto_commit: bool = False
extra_config: dict[str, Any]
to_consumer_config()[source]

Convert settings to confluent-kafka consumer configuration dict.

Return type:

dict[str, Any]

to_producer_config()[source]

Convert settings to confluent-kafka producer configuration dict.

Return type:

dict[str, Any]

__init__(session_timeout_ms=45000, heartbeat_interval_ms=15000, max_poll_interval_ms=300000, max_poll_records=500, auto_offset_reset='earliest', enable_auto_commit=False, extra_config=<factory>)
Parameters:
  • session_timeout_ms (int)

  • heartbeat_interval_ms (int)

  • max_poll_interval_ms (int)

  • max_poll_records (int)

  • auto_offset_reset (str)

  • enable_auto_commit (bool)

  • extra_config (dict[str, Any])

Return type:

None

class flowodm.settings.LongRunningSettings[source]

Bases: BaseSettings

Settings optimized for long-running processing tasks.

Use when processing a single message may take several minutes, such as ML inference, complex calculations, or external API calls.

Timeouts are extended to prevent unnecessary consumer rebalances during long processing.

session_timeout_ms: int = 300000
heartbeat_interval_ms: int = 60000
max_poll_interval_ms: int = 600000
max_poll_records: int = 100
auto_offset_reset: str = 'earliest'
enable_auto_commit: bool = False
__init__(session_timeout_ms=300000, heartbeat_interval_ms=60000, max_poll_interval_ms=600000, max_poll_records=100, auto_offset_reset='earliest', enable_auto_commit=False, extra_config=<factory>)
Parameters:
  • session_timeout_ms (int)

  • heartbeat_interval_ms (int)

  • max_poll_interval_ms (int)

  • max_poll_records (int)

  • auto_offset_reset (str)

  • enable_auto_commit (bool)

  • extra_config (dict[str, Any])

Return type:

None

class flowodm.settings.BatchSettings[source]

Bases: BaseSettings

Settings optimized for batch processing.

Use for ETL jobs, data aggregation, or bulk operations where throughput is more important than latency.

session_timeout_ms: int = 45000
heartbeat_interval_ms: int = 15000
max_poll_interval_ms: int = 300000
max_poll_records: int = 500
auto_offset_reset: str = 'earliest'
enable_auto_commit: bool = False
__init__(session_timeout_ms=45000, heartbeat_interval_ms=15000, max_poll_interval_ms=300000, max_poll_records=500, auto_offset_reset='earliest', enable_auto_commit=False, extra_config=<factory>)
Parameters:
  • session_timeout_ms (int)

  • heartbeat_interval_ms (int)

  • max_poll_interval_ms (int)

  • max_poll_records (int)

  • auto_offset_reset (str)

  • enable_auto_commit (bool)

  • extra_config (dict[str, Any])

Return type:

None

class flowodm.settings.RealTimeSettings[source]

Bases: BaseSettings

Settings optimized for real-time processing.

Use for event-driven microservices, notifications, or any scenario where low latency is critical.

session_timeout_ms: int = 10000
heartbeat_interval_ms: int = 3000
max_poll_interval_ms: int = 30000
max_poll_records: int = 10
auto_offset_reset: str = 'latest'
enable_auto_commit: bool = True
__init__(session_timeout_ms=10000, heartbeat_interval_ms=3000, max_poll_interval_ms=30000, max_poll_records=10, auto_offset_reset='latest', enable_auto_commit=True, extra_config=<factory>)
Parameters:
  • session_timeout_ms (int)

  • heartbeat_interval_ms (int)

  • max_poll_interval_ms (int)

  • max_poll_records (int)

  • auto_offset_reset (str)

  • enable_auto_commit (bool)

  • extra_config (dict[str, Any])

Return type:

None

class flowodm.settings.HighThroughputSettings[source]

Bases: BaseSettings

Settings optimized for maximum throughput.

Use for high-volume data ingestion where some message loss is acceptable in exchange for performance.

session_timeout_ms: int = 45000
heartbeat_interval_ms: int = 15000
max_poll_interval_ms: int = 300000
max_poll_records: int = 1000
auto_offset_reset: str = 'latest'
enable_auto_commit: bool = True
to_producer_config()[source]

Optimized producer config for throughput.

Return type:

dict[str, Any]

__init__(session_timeout_ms=45000, heartbeat_interval_ms=15000, max_poll_interval_ms=300000, max_poll_records=1000, auto_offset_reset='latest', enable_auto_commit=True, extra_config=<factory>)
Parameters:
  • session_timeout_ms (int)

  • heartbeat_interval_ms (int)

  • max_poll_interval_ms (int)

  • max_poll_records (int)

  • auto_offset_reset (str)

  • enable_auto_commit (bool)

  • extra_config (dict[str, Any])

Return type:

None

class flowodm.settings.ReliableSettings[source]

Bases: BaseSettings

Settings optimized for maximum reliability.

Use when message delivery guarantees are critical, such as financial transactions or audit logs.

session_timeout_ms: int = 60000
heartbeat_interval_ms: int = 20000
max_poll_interval_ms: int = 300000
max_poll_records: int = 100
auto_offset_reset: str = 'earliest'
enable_auto_commit: bool = False
to_producer_config()[source]

Optimized producer config for reliability.

Return type:

dict[str, Any]

__init__(session_timeout_ms=60000, heartbeat_interval_ms=20000, max_poll_interval_ms=300000, max_poll_records=100, auto_offset_reset='earliest', enable_auto_commit=False, extra_config=<factory>)
Parameters:
  • session_timeout_ms (int)

  • heartbeat_interval_ms (int)

  • max_poll_interval_ms (int)

  • max_poll_records (int)

  • auto_offset_reset (str)

  • enable_auto_commit (bool)

  • extra_config (dict[str, Any])

Return type:

None

Schema

Schema utilities for Avro schema generation, validation, and registry operations.

class flowodm.schema.ValidationResult[source]

Bases: object

Result of schema validation.

is_valid: bool
errors: list[str]
warnings: list[str]
__init__(is_valid, errors=<factory>, warnings=<factory>)
Parameters:
Return type:

None

class flowodm.schema.CompatibilityResult[source]

Bases: object

Result of schema compatibility check.

is_compatible: bool
message: str = ''
compatibility_level: str = ''
__init__(is_compatible, message='', compatibility_level='')
Parameters:
  • is_compatible (bool)

  • message (str)

  • compatibility_level (str)

Return type:

None

flowodm.schema.load_schema_from_file(path)[source]

Load Avro schema from .avsc file.

Parameters:

path (str | Path) – Path to .avsc file

Returns:

Parsed Avro schema as dict

Raises:
Return type:

dict[str, Any]

flowodm.schema.load_schema_from_registry(subject, version='latest')[source]

Load Avro schema from Schema Registry.

Parameters:
  • subject (str) – Schema Registry subject name

  • version (str | int) – Schema version (“latest” or version number)

Returns:

Parsed Avro schema as dict

Raises:

SchemaRegistryError – If schema cannot be retrieved

Return type:

dict[str, Any]

flowodm.schema.generate_model_from_schema(schema_source, topic, class_name=None, consumer_group=None)[source]

Generate a FlowBaseModel subclass from an Avro schema.

Parameters:
  • schema_source (str | Path | dict[str, Any]) – Path to .avsc file, schema dict, or Schema Registry subject

  • topic (str) – Kafka topic name

  • class_name (str | None) – Optional class name (defaults to schema record name)

  • consumer_group (str | None) – Optional consumer group

Returns:

Dynamically generated FlowBaseModel subclass

Return type:

type[FlowBaseModel]

Example

>>> UserEvent = generate_model_from_schema("schemas/user.avsc", topic="users")
>>> event = UserEvent(user_id="123", action="login")
flowodm.schema.generate_model_from_registry(subject, topic, version='latest', class_name=None, consumer_group=None)[source]

Generate a FlowBaseModel subclass from Schema Registry.

Parameters:
  • subject (str) – Schema Registry subject name

  • topic (str) – Kafka topic name

  • version (str | int) – Schema version (“latest” or version number)

  • class_name (str | None) – Optional class name

  • consumer_group (str | None) – Optional consumer group

Returns:

Dynamically generated FlowBaseModel subclass

Return type:

type[FlowBaseModel]

flowodm.schema.validate_against_file(model_class, schema_path)[source]

Validate that a Pydantic model matches a local Avro schema file.

Parameters:
  • model_class (type[FlowBaseModel]) – FlowBaseModel subclass to validate

  • schema_path (str | Path) – Path to .avsc file

Returns:

ValidationResult with is_valid flag and any errors

Return type:

ValidationResult

flowodm.schema.validate_against_registry(model_class, subject, version='latest')[source]

Validate that a Pydantic model matches a Schema Registry schema.

Parameters:
  • model_class (type[FlowBaseModel]) – FlowBaseModel subclass to validate

  • subject (str) – Schema Registry subject name

  • version (str | int) – Schema version (“latest” or version number)

Returns:

ValidationResult with is_valid flag and any errors

Return type:

ValidationResult

flowodm.schema.check_compatibility(model_class, subject, compatibility_level='BACKWARD')[source]

Check if model’s schema is compatible with existing registry schemas.

Parameters:
  • model_class (type[FlowBaseModel]) – FlowBaseModel subclass

  • subject (str) – Schema Registry subject name

  • compatibility_level (str) – BACKWARD, FORWARD, FULL, or NONE

Returns:

CompatibilityResult with is_compatible flag and message

Return type:

CompatibilityResult

flowodm.schema.upload_schema(schema_path, subject, compatibility_level=None)[source]

Upload an Avro schema file to Schema Registry.

Parameters:
  • schema_path (str | Path) – Path to .avsc file

  • subject (str) – Schema Registry subject name

  • compatibility_level (str | None) – Optional compatibility level to set

Returns:

Schema ID from registry

Raises:

SchemaRegistryError – If upload fails

Return type:

int

flowodm.schema.list_subjects()[source]

List all subjects in Schema Registry.

Returns:

List of subject names

Return type:

list[str]

flowodm.schema.get_schema_versions(subject)[source]

Get all versions of a schema subject.

Parameters:

subject (str) – Schema Registry subject name

Returns:

List of version numbers

Return type:

list[int]

flowodm.schema.delete_subject(subject, permanent=False)[source]

Delete a schema subject from registry.

Parameters:
  • subject (str) – Schema Registry subject name

  • permanent (bool) – If True, permanently delete (cannot be recovered)

Returns:

List of deleted version numbers

Return type:

list[int]

Exceptions

Custom exceptions for FlowODM.

exception flowodm.exceptions.FlowODMError[source]

Bases: Exception

Base exception for all FlowODM errors.

exception flowodm.exceptions.ConnectionError[source]

Bases: FlowODMError

Raised when connection to Kafka or Schema Registry fails.

exception flowodm.exceptions.ConfigurationError[source]

Bases: FlowODMError

Raised when configuration is invalid or missing.

exception flowodm.exceptions.SchemaError[source]

Bases: FlowODMError

Base exception for schema-related errors.

exception flowodm.exceptions.SchemaValidationError[source]

Bases: SchemaError

Raised when schema validation fails.

__init__(message, errors=None)[source]
Parameters:
exception flowodm.exceptions.SchemaRegistryError[source]

Bases: SchemaError

Raised when Schema Registry operations fail.

exception flowodm.exceptions.SchemaCompatibilityError[source]

Bases: SchemaError

Raised when schema compatibility check fails.

__init__(message, compatibility_level=None)[source]
Parameters:
  • message (str)

  • compatibility_level (str | None)

exception flowodm.exceptions.SerializationError[source]

Bases: FlowODMError

Raised when message serialization fails.

exception flowodm.exceptions.DeserializationError[source]

Bases: FlowODMError

Raised when message deserialization fails.

exception flowodm.exceptions.ProducerError[source]

Bases: FlowODMError

Raised when message production fails.

exception flowodm.exceptions.ConsumerError[source]

Bases: FlowODMError

Raised when message consumption fails.

exception flowodm.exceptions.TopicError[source]

Bases: FlowODMError

Raised when topic-related operations fail.

exception flowodm.exceptions.SettingsError[source]

Bases: FlowODMError

Raised when model Settings class is invalid or missing.

CLI

Command-line interface for FlowODM schema operations.

Provides commands for: - Validating models against schemas - Uploading schemas to Schema Registry - Checking schema compatibility - Listing schema subjects

flowodm.cli.main()[source]

Main CLI entry point.

Return type:

None

flowodm.cli.cmd_validate(args)[source]

Validate models against schemas.

Parameters:

args (Namespace)

Return type:

None

flowodm.cli.cmd_upload_schema(args)[source]

Upload schema to Schema Registry.

Parameters:

args (Namespace)

Return type:

None

flowodm.cli.cmd_check_compatibility(args)[source]

Check schema compatibility.

Parameters:

args (Namespace)

Return type:

None

flowodm.cli.cmd_list_subjects(args)[source]

List Schema Registry subjects.

Parameters:

args (Namespace)

Return type:

None

flowodm.cli.cmd_get_schema(args)[source]

Get schema from Schema Registry.

Parameters:

args (Namespace)

Return type:

None