Producing Messages

FlowODM provides both synchronous and asynchronous methods for producing messages.

Synchronous Production

Blocking Production

Use produce() to produce a message and wait for broker acknowledgment:

from datetime import datetime
from flowodm import FlowBaseModel, connect

connect(bootstrap_servers="localhost:9092")

class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        key_field = "order_id"

    order_id: str
    amount: float

order = OrderEvent(order_id="ORD-001", amount=99.99)
order.produce()  # Blocks until acknowledged

Non-Blocking Production

Use produce_nowait() for non-blocking production (fire-and-forget):

order.produce_nowait()  # Returns immediately

# Flush all pending messages
from flowodm import get_producer
get_producer().flush()

Asynchronous Production

For async applications, use aproduce():

import asyncio
from flowodm import FlowBaseModel, connect

connect(bootstrap_servers="localhost:9092")

class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"

    order_id: str
    amount: float

async def main():
    order = OrderEvent(order_id="ORD-001", amount=99.99)
    await order.aproduce()

asyncio.run(main())

Message Keys

Messages can be produced with a key for partitioning:

Using key_field Setting

class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        key_field = "order_id"  # Use this field as key

    order_id: str
    amount: float

# order_id value is automatically used as the message key
order = OrderEvent(order_id="ORD-001", amount=99.99)
order.produce()

Explicit Key

order = OrderEvent(order_id="ORD-001", amount=99.99)
order.produce(key="custom-key")

Delivery Callbacks

For non-blocking production, you can provide a callback:

def on_delivery(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()}/{msg.partition()}")

order.produce_nowait(callback=on_delivery)

Batch Production

For high-throughput scenarios, produce multiple messages efficiently:

from flowodm import get_producer

orders = [
    OrderEvent(order_id=f"ORD-{i:03d}", amount=float(i * 10))
    for i in range(1000)
]

for order in orders:
    order.produce()  # Non-blocking

# Flush all pending messages
get_producer().flush()

Producer Configuration

Customize producer settings via connect():

connect(
    bootstrap_servers="localhost:9092",
    producer_config={
        "acks": "all",
        "retries": 3,
        "linger.ms": 10,
        "batch.size": 16384,
        "compression.type": "snappy",
    }
)

Wire Format

When a Schema Registry is configured, produced messages include the standard Confluent wire format header (magic byte + 4-byte schema ID). This ensures compatibility with Confluent consumers, Java’s KafkaAvroDeserializer, and Kafka UI tools (AKHQ, Kafdrop, etc.).

The schema ID is cached per model class, so only the first serialization triggers a registry call.

To disable the wire format header, set confluent_wire_format = False in your model’s Settings class. See Model Definition for details.

Error Handling

Handle production errors appropriately:

from flowodm.exceptions import ProducerError

try:
    order.produce()
except ProducerError as e:
    print(f"Failed to produce message: {e}")
    # Handle error: retry, log, alert, etc.