Quick Start
This guide will help you get started with FlowODM in 5 minutes.
Installation
pip install flowodm
For schema generation from Avro files:
pip install flowodm[schema-gen]
Prerequisites
FlowODM requires:
Python 3.11 or later
Apache Kafka broker
Confluent Schema Registry (optional, but recommended)
Basic Setup
Connect to Kafka
from flowodm import connect connect( bootstrap_servers="localhost:9092", schema_registry_url="http://localhost:8081" )
Define a Model
from datetime import datetime from flowodm import FlowBaseModel class OrderEvent(FlowBaseModel): class Settings: topic = "orders" consumer_group = "order-processor" key_field = "order_id" order_id: str customer_id: str product_name: str quantity: int total_price: float created_at: datetime
Produce Messages
# Create and produce a message order = OrderEvent( order_id="ORD-12345", customer_id="CUST-001", product_name="Laptop", quantity=1, total_price=999.99, created_at=datetime.now() ) # Synchronous produce (blocks until confirmed) order.produce() # Non-blocking produce order.produce_nowait()
Consume Messages
# Consume a single message event = OrderEvent.consume_one(timeout=5.0) if event: print(f"Received order: {event.order_id}") # Iterate over messages for event in OrderEvent.consume_iter(max_messages=100): print(f"Processing order: {event.order_id}")
Environment Variables
FlowODM can be configured via environment variables:
Variable |
Description |
|---|---|
|
Kafka broker addresses (e.g., |
|
Security protocol ( |
|
SASL mechanism ( |
|
SASL username |
|
SASL password |
|
Schema Registry URL |
|
Basic auth in format |
|
Schema Registry API key (for Confluent Cloud) |
|
Schema Registry API secret |
Next Steps
Model Definition - Learn about model definition and configuration
Producing Messages - Detailed guide to producing messages
Consuming Messages - Detailed guide to consuming messages
Consumer Loops - Build microservices with consumer loops