Producing Messages
FlowODM provides both synchronous and asynchronous methods for producing messages.
Synchronous Production
Blocking Production
Use produce() to produce a message and wait for broker acknowledgment:
from datetime import datetime
from flowodm import FlowBaseModel, connect
connect(bootstrap_servers="localhost:9092")
class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        key_field = "order_id"

    order_id: str
    amount: float
order = OrderEvent(order_id="ORD-001", amount=99.99)
order.produce() # Blocks until acknowledged
Non-Blocking Production
Use produce_nowait() for non-blocking production (fire-and-forget):
order.produce_nowait() # Returns immediately
# Flush all pending messages
from flowodm import get_producer
get_producer().flush()
Asynchronous Production
For async applications, use aproduce():
import asyncio
from flowodm import FlowBaseModel, connect
connect(bootstrap_servers="localhost:9092")
class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"

    order_id: str
    amount: float

async def main():
    order = OrderEvent(order_id="ORD-001", amount=99.99)
    await order.aproduce()
asyncio.run(main())
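When several messages need to go out from one coroutine, the calls can be awaited together rather than one after another. The sketch below uses asyncio.gather for that. StubOrder is a hypothetical stand-in, not a FlowODM class; it only mimics an aproduce() coroutine so the example runs without a broker. Whether aproduce() is safe to run concurrently is an assumption that should be checked against the library.

```python
import asyncio


class StubOrder:
    """Hypothetical stand-in for a FlowODM model; it only mimics aproduce()."""

    def __init__(self, order_id: str) -> None:
        self.order_id = order_id

    async def aproduce(self) -> str:
        # Simulate waiting for a broker acknowledgment.
        await asyncio.sleep(0)
        return self.order_id


async def produce_all(orders):
    # Schedule every aproduce() call and wait for all of them to finish.
    # return_exceptions=True returns failures as values instead of raising
    # the first one, so each result can be inspected afterwards.
    return await asyncio.gather(
        *(order.aproduce() for order in orders), return_exceptions=True
    )


results = asyncio.run(produce_all([StubOrder(f"ORD-{i:03d}") for i in range(3)]))
```

Results come back in the same order as the input, so any entry that is an exception instance identifies the order that failed.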
Message Keys
Messages can be produced with a key for partitioning:
Using key_field Setting
class OrderEvent(FlowBaseModel):
    class Settings:
        topic = "orders"
        key_field = "order_id"  # Use this field as key

    order_id: str
    amount: float
# order_id value is automatically used as the message key
order = OrderEvent(order_id="ORD-001", amount=99.99)
order.produce()
Explicit Key
order = OrderEvent(order_id="ORD-001", amount=99.99)
order.produce(key="custom-key")
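To see why a key matters for partitioning, it helps to picture how a key can be mapped to a partition. The sketch below is an illustration only: pick_partition is a hypothetical helper, and the CRC32 hash is chosen for simplicity. Real Kafka clients use their own partitioners (the Java client, for example, hashes keys with murmur2), so the partition numbers here will not match a real cluster.

```python
import zlib


def pick_partition(key: str, num_partitions: int) -> int:
    # Illustration only: hash the key bytes and take the remainder.
    # The same key always yields the same partition for a fixed partition count.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


first = pick_partition("ORD-001", 6)
second = pick_partition("ORD-001", 6)
```

Because the mapping is deterministic, all events for one order_id land on one partition and are consumed in the order they were produced, as long as the partition count does not change.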
Delivery Callbacks
For non-blocking production, pass a callback that receives the error (None on success) and the delivered message:
def on_delivery(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()}/{msg.partition()}")
order.produce_nowait(callback=on_delivery)
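Printing is fine for a demo, but a callback can also record failures so they can be inspected after a flush. The following is a minimal sketch of that idea. make_tracker is a hypothetical helper, and the callback is invoked by hand here with placeholder arguments to simulate what the producer would pass.

```python
from typing import Callable, List, Optional, Tuple


def make_tracker() -> Tuple[Callable[[Optional[object], object], None], List[str]]:
    """Return a delivery callback plus the list it records failures into."""
    failures: List[str] = []

    def on_delivery(err: Optional[object], msg: object) -> None:
        # err is None on success; anything else is recorded as a failure.
        if err is not None:
            failures.append(str(err))

    return on_delivery, failures


on_delivery, failures = make_tracker()
on_delivery(None, object())                  # simulated successful delivery
on_delivery("broker unavailable", object())  # simulated failed delivery
```

In real use, on_delivery would be passed as produce_nowait(callback=on_delivery) and failures checked once get_producer().flush() has returned, assuming callbacks have all fired by then.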
Batch Production
For high-throughput scenarios, queue messages with produce_nowait() and flush once at the end:
from flowodm import get_producer
orders = [
    OrderEvent(order_id=f"ORD-{i:03d}", amount=float(i * 10))
    for i in range(1000)
]
for order in orders:
    order.produce_nowait()  # Non-blocking
# Flush all pending messages
get_producer().flush()
Producer Configuration
Customize producer settings via connect():
connect(
    bootstrap_servers="localhost:9092",
    producer_config={
        "acks": "all",
        "retries": 3,
        "linger.ms": 10,
        "batch.size": 16384,
        "compression.type": "snappy",
    }
)
Wire Format
When a Schema Registry is configured, produced messages include the standard
Confluent wire format header (magic byte + 4-byte schema ID). This ensures
compatibility with Confluent consumers, Java’s KafkaAvroDeserializer, and
Kafka UI tools (AKHQ, Kafdrop, etc.).
The schema ID is cached per model class, so only the first serialization triggers a registry call.
To disable the wire format header, set confluent_wire_format = False in
your model’s Settings class. See Model Definition for details.
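The header described above is small enough to build and parse by hand, which can help when debugging raw message bytes. The sketch below shows the 5-byte layout (a zero magic byte followed by the schema ID as a big-endian 32-bit integer). add_header and split_header are hypothetical helper names, not part of FlowODM, and the payload is a placeholder rather than real Avro data.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # Confluent wire format always starts with a zero byte


def add_header(schema_id: int, payload: bytes) -> bytes:
    # ">bI" = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID).
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def split_header(message: bytes) -> Tuple[int, bytes]:
    # Read the first 5 bytes back into (magic, schema ID) and return the rest.
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


framed = add_header(42, b"placeholder-payload")
schema_id, body = split_header(framed)
```

A consumer that sees a non-zero first byte is most likely reading a message produced without the header, for instance with confluent_wire_format = False.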
Error Handling
Catch ProducerError to handle messages that could not be produced:
from flowodm.exceptions import ProducerError
try:
    order.produce()
except ProducerError as e:
    print(f"Failed to produce message: {e}")
    # Handle error: retry, log, alert, etc.
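One way to act on the "retry" option is a small wrapper with exponential backoff. This is a sketch, not a FlowODM feature: produce_with_retry and its parameters are hypothetical, and it takes any zero-argument callable so it can be tried without a broker.

```python
import time
from typing import Callable, Tuple, Type


def produce_with_retry(
    send: Callable[[], None],
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call send() until it succeeds; return the number of attempts used."""
    for attempt in range(1, attempts + 1):
        try:
            send()
            return attempt
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")
```

With the model from this page, a call could look like produce_with_retry(order.produce, retry_on=(ProducerError,)). Keep in mind that the "retries" producer setting already retries at the client level, so an application-level wrapper like this is only for errors that still reach your code.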