Settings Profiles
FlowODM provides predefined settings profiles optimized for common use cases.
Available Profiles
Profile |
Session Timeout |
Max Poll Interval |
Max Poll Records |
Use Case |
|---|---|---|---|---|
|
5 min |
10 min |
100 |
ML inference, complex processing |
|
45 sec |
5 min |
500 |
ETL, data aggregation |
|
10 sec |
30 sec |
10 |
Event-driven, notifications |
|
30 sec |
5 min |
1000 |
High-volume batch processing |
|
45 sec |
5 min |
100 |
At-least-once with manual commit |
LongRunningSettings
For tasks that take significant time to process:
from flowodm import ConsumerLoop, LongRunningSettings
loop = ConsumerLoop(
model=MLPredictionRequest,
handler=run_ml_inference,
settings=LongRunningSettings(),
)
Configuration:
session.timeout.ms: 300000 (5 minutes)max.poll.interval.ms: 600000 (10 minutes)max.poll.records: 100
Best for:
Machine learning inference
Complex data transformations
External API calls with rate limits
Database migrations
BatchSettings
For processing large batches efficiently:
from flowodm import ConsumerLoop, BatchSettings
loop = ConsumerLoop(
model=DataRecord,
handler=process_batch,
settings=BatchSettings(),
)
Configuration:
session.timeout.ms: 45000 (45 seconds)max.poll.interval.ms: 300000 (5 minutes)max.poll.records: 500
Best for:
ETL pipelines
Data aggregation
Bulk database operations
Report generation
RealTimeSettings
For low-latency, real-time processing:
from flowodm import ConsumerLoop, RealTimeSettings
loop = ConsumerLoop(
model=NotificationEvent,
handler=send_notification,
settings=RealTimeSettings(),
)
Configuration:
session.timeout.ms: 10000 (10 seconds)max.poll.interval.ms: 30000 (30 seconds)max.poll.records: 10
Best for:
Push notifications
Real-time alerts
Event-driven workflows
Low-latency responses
HighThroughputSettings
For maximum throughput with large batches:
from flowodm import ConsumerLoop, HighThroughputSettings
loop = ConsumerLoop(
model=LogEvent,
handler=process_logs,
settings=HighThroughputSettings(),
)
Configuration:
session.timeout.ms: 30000 (30 seconds)max.poll.interval.ms: 300000 (5 minutes)max.poll.records: 1000
Best for:
Log processing
Metrics aggregation
High-volume data ingestion
ReliableSettings
For at-least-once processing with manual commit:
from flowodm import ConsumerLoop, ReliableSettings
loop = ConsumerLoop(
model=PaymentEvent,
handler=process_payment,
settings=ReliableSettings(),
)
Configuration:
session.timeout.ms: 45000 (45 seconds)max.poll.interval.ms: 300000 (5 minutes)max.poll.records: 100enable.auto.commit: False
Best for:
Financial transactions
Critical data pipelines
Audit logs
Order processing
Custom Settings
Create custom settings by extending BaseSettings:
from flowodm.settings import BaseSettings
class MyCustomSettings(BaseSettings):
session_timeout_ms: int = 60000
max_poll_interval_ms: int = 120000
max_poll_records: int = 50
# Add custom configuration
enable_auto_commit: bool = False
def to_consumer_config(self) -> dict:
config = super().to_consumer_config()
config["enable.auto.commit"] = self.enable_auto_commit
return config
loop = ConsumerLoop(
model=MyEvent,
handler=my_handler,
settings=MyCustomSettings(),
)
Using with connect()
Settings can also be applied globally:
from flowodm import connect
from flowodm.settings import LongRunningSettings
settings = LongRunningSettings()
connect(
bootstrap_servers="localhost:9092",
consumer_config=settings.to_consumer_config(),
)
Understanding the Settings
session.timeout.ms
How long the broker waits for a heartbeat before considering the consumer dead. If your processing takes longer than this, the consumer will be removed from the group.
max.poll.interval.ms
Maximum time between poll() calls. If processing a batch takes longer than this,
the consumer will be removed from the group.
max.poll.records
Maximum number of records returned in a single poll() call. Larger values
increase throughput but also increase memory usage and processing time per batch.
Choosing the Right Profile
Is latency critical? (< 1 second response)
└── Yes → RealTimeSettings
└── No
└── Is processing > 1 minute per message?
└── Yes → LongRunningSettings
└── No
└── Is throughput the priority?
└── Yes → HighThroughputSettings or BatchSettings
└── No
└── Is reliability critical?
└── Yes → ReliableSettings
└── No → BatchSettings (default)