Consumer transaction management guide¶
API Reference: IdempotentConsumerBase
Integration Guide: Consume idempotently
Overview¶
This document clarifies transaction boundaries and responsibilities for Kafka consumers that use the idempotent consumer pattern with a processed-message store.
Transaction lifecycle¶
The pattern¶
The idempotent consumer follows this transaction pattern:
1. START transaction (if not started)
2. CLAIM event_id (insert into processed_messages)
3. HANDLE event (business logic)
4. COMMIT transaction (caller's responsibility)
Key responsibilities¶
| Component | Responsibility |
|---|---|
| ProcessedMessageStore | Start transaction if needed, claim event_id |
| ConsumerBase | Orchestrate claim + handle |
| Concrete Consumer | Implement handle_event, provide session |
| Caller/Framework | COMMIT or ROLLBACK the session |
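The division of responsibilities in the table above can be sketched with in-memory stand-ins. Every class here (`FakeSession`, `InMemoryStore`, `ConsumerBase`, `RecordingConsumer`) is a hypothetical illustration, not the messagekit implementation; the point is only that the store and consumer never commit, while the caller that owns the session does.

```python
import asyncio


class FakeSession:
    """Hypothetical session: writes are staged until commit()."""

    def __init__(self, database: set[str]) -> None:
        self.database = database          # shared, "durable" state
        self.staged: set[str] = set()     # uncommitted writes

    async def commit(self) -> None:
        self.database.update(self.staged)
        self.staged.clear()

    async def rollback(self) -> None:
        self.staged.clear()


class InMemoryStore:
    """Store role: claim an event_id inside the open transaction."""

    def __init__(self, session: FakeSession) -> None:
        self.session = session

    async def claim(self, event_id: str) -> bool:
        if event_id in self.session.database or event_id in self.session.staged:
            return False
        self.session.staged.add(event_id)
        return True


class ConsumerBase:
    """Base role: orchestrate claim + handle; never commits."""

    def __init__(self, store: InMemoryStore) -> None:
        self.store = store

    async def consume(self, message: dict) -> bool:
        if not await self.store.claim(message["event_id"]):
            return False
        await self.handle_event(message)
        return True

    async def handle_event(self, message: dict) -> None:
        raise NotImplementedError


class RecordingConsumer(ConsumerBase):
    """Concrete role: business logic only."""

    async def handle_event(self, message: dict) -> None:
        print("handled", message["event_id"])


async def caller(database: set[str], message: dict) -> bool:
    """Caller role: owns the session and decides COMMIT or ROLLBACK."""
    session = FakeSession(database)
    consumer = RecordingConsumer(InMemoryStore(session))
    try:
        processed = await consumer.consume(message)
    except Exception:
        await session.rollback()
        raise
    if processed:
        await session.commit()
    else:
        await session.rollback()
    return processed
```

Calling `asyncio.run(caller(db, {"event_id": "e1"}))` twice with the same `db` set processes the event once and skips it the second time.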
Critical: caller must commit¶
IMPORTANT: The SqlAlchemyProcessedMessageStore.claim() method starts a transaction
but DOES NOT commit it. The caller MUST commit the session after successful
processing, or roll back on failure.
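To make the "claim does not commit" contract concrete, here is a minimal sketch using the standard-library `sqlite3` module in place of SQLAlchemy. The table layout and the `claim` and `is_claimed` helpers are assumptions for illustration; the real store may be implemented differently.

```python
import sqlite3


def make_connection() -> sqlite3.Connection:
    """Open an in-memory database with a processed_messages table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE processed_messages ("
        " consumer_name TEXT NOT NULL,"
        " event_id TEXT NOT NULL,"
        " PRIMARY KEY (consumer_name, event_id))"
    )
    conn.commit()
    return conn


def claim(conn: sqlite3.Connection, consumer_name: str, event_id: str) -> bool:
    """Insert the claim row; return False if it already exists. Never commits."""
    try:
        conn.execute(
            "INSERT INTO processed_messages (consumer_name, event_id) VALUES (?, ?)",
            (consumer_name, event_id),
        )
    except sqlite3.IntegrityError:
        return False  # primary key collision: already claimed
    return True


def is_claimed(conn: sqlite3.Connection, consumer_name: str, event_id: str) -> bool:
    """Check whether a claim row is currently visible on this connection."""
    row = conn.execute(
        "SELECT 1 FROM processed_messages WHERE consumer_name = ? AND event_id = ?",
        (consumer_name, event_id),
    ).fetchone()
    return row is not None
```

If the caller invokes `claim(...)` and then `conn.rollback()`, the claim disappears; only an explicit `conn.commit()` by the caller makes it durable.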
Why this design?¶
This design ensures atomicity between:
- Marking the event as processed (claim)
- The business logic side effects (handle_event)
If either fails, both should roll back together.
Correct usage¶
Pattern 1: with FastStream consumer¶
import logging

from faststream.kafka import KafkaBroker
from messagekit.infrastructure.messaging.kafka_consumer_base import IdempotentConsumerBase
from messagekit.infrastructure.persistence.processed_message_store import SqlAlchemyProcessedMessageStore

logger = logging.getLogger(__name__)
broker = KafkaBroker()
# session_factory is assumed to be an application-level async session factory
# defined elsewhere.


class MyConsumer(IdempotentConsumerBase):
    # The store is passed to the constructor (see consume_message below),
    # so each message gets a consumer bound to its own session.

    async def handle_event(self, message: dict) -> None:
        """Process the event (business logic)."""
        # process_business_logic is a placeholder for application code
        await process_business_logic(message)


@broker.subscriber("my-topic")
async def consume_message(message: dict) -> None:
    """Consume one Kafka message with idempotency and transactionality."""
    async with session_factory() as session:
        # 1. Create store with session
        store = SqlAlchemyProcessedMessageStore(session)
        # 2. Create consumer with store
        consumer = MyConsumer(store)
        # 3. Process message (starts transaction, claims, handles)
        was_processed = await consumer.consume(message)
        if was_processed:
            # 4. COMMIT if processing succeeded
            await session.commit()
            logger.info("Message %s processed and committed", message["event_id"])
        else:
            # 5. ROLLBACK if already processed (idempotent skip)
            await session.rollback()
            logger.debug("Message %s already processed, skipped", message["event_id"])
    # FastStream will ACK the Kafka message after this function returns
Pattern 2: explicit transaction control¶
async def process_one_event(event: dict) -> None:
    """Process one event with explicit transaction management."""
    async with session_factory() as session:
        try:
            # Create store bound to this session
            store = SqlAlchemyProcessedMessageStore(session)
            consumer = MyConsumer(store)
            # Attempt to claim and process
            was_processed = await consumer.consume(event)
            if was_processed:
                # Commit on success
                await session.commit()
            else:
                # Already processed - rollback claim attempt
                await session.rollback()
        except Exception as exc:
            # Error during processing - rollback everything
            logger.error("Failed to process event: %s", exc, exc_info=True)
            await session.rollback()
            raise
Transaction guarantees¶
Success case¶
START transaction
↓
CLAIM event_id (INSERT processed_messages)
↓
HANDLE event (business logic)
↓
COMMIT
↓
Both claim + business logic committed atomically
Failure during handle¶
START transaction
↓
CLAIM event_id (INSERT processed_messages)
↓
HANDLE event → EXCEPTION
↓
ROLLBACK
↓
Neither claim nor business logic persisted
Event will be reprocessed on next attempt
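The failure path can be checked end to end with a small `sqlite3` sketch. The `orders` table and the `fail_after_write` flag are hypothetical, standing in for real business writes and a real exception in `handle_event`. Unlike a production consumer, `process` swallows the exception so that the example can continue to the retry.

```python
import sqlite3


def open_db() -> sqlite3.Connection:
    """Create the claim table plus a hypothetical business table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_messages (event_id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY)")
    conn.commit()
    return conn


def consume(conn: sqlite3.Connection, event: dict, fail_after_write: bool) -> None:
    """Claim, then run business logic in the same transaction. Never commits."""
    conn.execute(
        "INSERT INTO processed_messages (event_id) VALUES (?)", (event["event_id"],)
    )
    conn.execute("INSERT INTO orders (order_id) VALUES (?)", (event["order_id"],))
    if fail_after_write:
        raise RuntimeError("simulated failure in handle_event")


def process(conn: sqlite3.Connection, event: dict, fail_after_write: bool = False) -> bool:
    """Caller: commit on success, roll back claim and business write on failure."""
    try:
        consume(conn, event, fail_after_write)
    except RuntimeError:
        conn.rollback()
        return False
    conn.commit()
    return True


def count(conn: sqlite3.Connection, table: str) -> int:
    """Return the number of rows in one of the two demo tables."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

After a failed attempt both tables are empty, so a second call to `process` with the same event succeeds and persists both rows together.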
Duplicate message (already claimed)¶
START transaction
↓
CLAIM event_id → False (already processed)
↓
ROLLBACK (nothing to commit)
↓
No work performed, message ACKed as duplicate
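The duplicate path can be sketched the same way. In this illustrative `sqlite3` version, the primary key on `event_id` rejects the second claim, so the handler (here just a list append, a stand-in for business logic) runs once no matter how often the message is delivered.

```python
import sqlite3


def open_store() -> sqlite3.Connection:
    """Create an in-memory claim table keyed by event_id."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_messages (event_id TEXT PRIMARY KEY)")
    conn.commit()
    return conn


def deliver(conn: sqlite3.Connection, event_id: str, handled: list[str]) -> bool:
    """Handle one delivery; return False when the event was already processed."""
    try:
        conn.execute(
            "INSERT INTO processed_messages (event_id) VALUES (?)", (event_id,)
        )
    except sqlite3.IntegrityError:
        conn.rollback()  # duplicate: nothing to commit
        return False
    handled.append(event_id)  # stand-in for handle_event
    conn.commit()
    return True
```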
Common pitfalls¶
Pitfall 1: forgetting to commit¶
# ❌ WRONG: Transaction never committed
async def bad_consumer(message: dict) -> None:
    async with session_factory() as session:
        store = SqlAlchemyProcessedMessageStore(session)
        consumer = MyConsumer(store)
        await consumer.consume(message)
        # Missing: await session.commit()
        # Result: the session closes without a commit, so the claim and any
        # business writes are rolled back and every redelivery is treated as new
Pitfall 2: committing before handle completes¶
# ❌ WRONG: Commits claim but not business logic
async def bad_consumer(message: dict) -> None:
    async with session_factory() as session:
        store = SqlAlchemyProcessedMessageStore(session)
        # Claim in transaction
        claimed = await store.claim(consumer_name="test", event_id=message["event_id"])
        await session.commit()  # ❌ Commits too early!
        if claimed:
            # Business logic in NEW transaction or no transaction
            await handle_business_logic(message)  # ❌ Not atomic with claim!
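The consequence of committing early is easy to reproduce. In this `sqlite3` sketch (all helper names are hypothetical), the claim is committed before the handler runs; when the handler then fails, the claim survives, so a redelivered message would be rejected as a duplicate and the event would effectively be lost.

```python
import sqlite3
from typing import Callable


def open_db() -> sqlite3.Connection:
    """Create an in-memory claim table keyed by event_id."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_messages (event_id TEXT PRIMARY KEY)")
    conn.commit()
    return conn


def consume_with_early_commit(
    conn: sqlite3.Connection, event_id: str, handler: Callable[[str], None]
) -> None:
    """Anti-pattern: make the claim durable before the business logic runs."""
    conn.execute("INSERT INTO processed_messages (event_id) VALUES (?)", (event_id,))
    conn.commit()      # too early: the claim can no longer be rolled back
    handler(event_id)  # a failure here leaves the claim in place


def failing_handler(event_id: str) -> None:
    """Stand-in for business logic that raises."""
    raise RuntimeError("simulated business failure")


def already_claimed(conn: sqlite3.Connection, event_id: str) -> bool:
    """Check whether the claim row is visible."""
    row = conn.execute(
        "SELECT 1 FROM processed_messages WHERE event_id = ?", (event_id,)
    ).fetchone()
    return row is not None
```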
Pitfall 3: sharing session across messages¶
# ❌ WRONG: Single session for multiple messages
async def bad_batch_consumer(messages: list[dict]) -> None:
    async with session_factory() as session:  # ❌ One session for all
        store = SqlAlchemyProcessedMessageStore(session)
        consumer = MyConsumer(store)
        for message in messages:
            await consumer.consume(message)
        await session.commit()  # ❌ All-or-nothing for entire batch
        # Result: One failure rolls back all successful processing
Correct approach: One session per message:
# ✅ CORRECT: Separate session per message
async def good_batch_consumer(messages: list[dict]) -> None:
    for message in messages:
        async with session_factory() as session:  # ✅ New session each time
            store = SqlAlchemyProcessedMessageStore(session)
            consumer = MyConsumer(store)
            was_processed = await consumer.consume(message)
            if was_processed:
                await session.commit()  # ✅ Independent commit
            else:
                await session.rollback()
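The effect of per-message isolation can be demonstrated with a file-backed `sqlite3` database, opening a fresh connection for each message as a stand-in for a fresh session. The `poison` key is a hypothetical marker that makes the simulated handler fail.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def init_db(path: str) -> None:
    """Create the claim table in a file-backed database."""
    with closing(sqlite3.connect(path)) as conn:
        conn.execute("CREATE TABLE processed_messages (event_id TEXT PRIMARY KEY)")
        conn.commit()


def process_batch(path: str, events: list[dict]) -> list[str]:
    """Process each event in its own connection; return the ids that failed."""
    failed: list[str] = []
    for event in events:
        with closing(sqlite3.connect(path)) as conn:  # new "session" per message
            try:
                conn.execute(
                    "INSERT INTO processed_messages (event_id) VALUES (?)",
                    (event["event_id"],),
                )
                if event.get("poison"):
                    raise ValueError("simulated handler failure")
                conn.commit()    # independent commit
            except ValueError:
                conn.rollback()  # only this message is undone
                failed.append(event["event_id"])
    return failed


def committed_ids(path: str) -> list[str]:
    """Return all committed event ids in sorted order."""
    with closing(sqlite3.connect(path)) as conn:
        rows = conn.execute(
            "SELECT event_id FROM processed_messages ORDER BY event_id"
        ).fetchall()
    return [row[0] for row in rows]


def demo() -> tuple[list[str], list[str]]:
    """Run a three-message batch in which the middle message fails."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        init_db(path)
        events = [
            {"event_id": "e1"},
            {"event_id": "e2", "poison": True},
            {"event_id": "e3"},
        ]
        return process_batch(path, events), committed_ids(path)
```

Here `demo()` reports `e2` as failed while `e1` and `e3` remain committed, which is the behavior the shared-session version cannot provide.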
Testing transaction boundaries¶
Test atomicity¶
import pytest

# Assumes an async-capable pytest plugin (for example pytest-asyncio) and that
# session_factory, message, and FailingConsumer are provided by the test module.
async def test_claim_and_handle_atomic():
    """Verify claim + handle are atomic (both commit or both rollback)."""
    async with session_factory() as session:
        store = SqlAlchemyProcessedMessageStore(session)
        consumer = FailingConsumer(store)  # Raises in handle_event
        with pytest.raises(Exception):
            await consumer.consume(message)
        # Don't commit (simulating error case)
        await session.rollback()
    # Verify claim was rolled back (event can be reprocessed)
    async with session_factory() as session:
        store = SqlAlchemyProcessedMessageStore(session)
        # Should be able to claim again
        claimed = await store.claim(consumer_name="test", event_id=message["event_id"])
        assert claimed is True  # ✅ Not marked as processed
Test idempotency¶
async def test_duplicate_handling():
    """Verify duplicate messages are skipped."""
    # First processing
    async with session_factory() as session:
        store = SqlAlchemyProcessedMessageStore(session)
        consumer = MyConsumer(store)
        was_processed = await consumer.consume(message)
        assert was_processed is True
        await session.commit()
    # Second processing (duplicate)
    async with session_factory() as session:
        store = SqlAlchemyProcessedMessageStore(session)
        consumer = MyConsumer(store)
        was_processed = await consumer.consume(message)
        assert was_processed is False  # ✅ Skipped as duplicate
        await session.rollback()
Integration with Kafka¶
Kafka message acknowledgment¶
The database transaction must be committed before the Kafka message is ACKed:
1. Consume Kafka message
2. START DB transaction
3. CLAIM + HANDLE in transaction
4. COMMIT DB transaction ← Must succeed
5. ACK Kafka message ← Only after DB commit
If the DB commit fails, the Kafka message is NOT ACKed and will be redelivered.
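The ordering above reduces to "await the commit, then await the ACK". A minimal sketch with hypothetical `commit` and `ack` callbacks (not FastStream or Kafka client APIs) shows that a failing commit prevents the ACK from ever being sent.

```python
import asyncio
from typing import Awaitable, Callable


async def commit_then_ack(
    commit: Callable[[], Awaitable[None]],
    ack: Callable[[], Awaitable[None]],
) -> None:
    """Commit the DB transaction first; ACK only if the commit succeeded."""
    await commit()  # raises on failure, so the ACK below is skipped
    await ack()


def run_demo(commit_fails: bool) -> list[str]:
    """Record the order of calls for a successful or failing commit."""
    calls: list[str] = []

    async def commit() -> None:
        calls.append("commit")
        if commit_fails:
            raise ConnectionError("simulated commit failure")

    async def ack() -> None:
        calls.append("ack")

    try:
        asyncio.run(commit_then_ack(commit, ack))
    except ConnectionError:
        calls.append("not acked, redelivery expected")
    return calls
```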
With FastStream¶
FastStream automatically ACKs after the handler returns successfully:
@broker.subscriber("topic")
async def handler(message: dict) -> None:
    async with session_factory() as session:
        store = SqlAlchemyProcessedMessageStore(session)
        consumer = MyConsumer(store)
        await consumer.consume(message)
        await session.commit()  # Must succeed
    # FastStream ACKs here (after successful return)
If session.commit() raises an exception:
1. The handler raises the exception
2. FastStream does NOT ACK
3. Kafka redelivers the message
4. The consumer retries, which succeeds if the failure was transient
Summary¶
| Responsibility | Owner |
|---|---|
| Start transaction if needed | ProcessedMessageStore.claim() |
| Claim event_id | ProcessedMessageStore.claim() |
| Business logic | Concrete consumer (handle_event) |
| COMMIT or ROLLBACK | Caller (FastStream handler, etc.) |
| ACK Kafka message | Framework (FastStream) after successful return |
Golden Rule: The component that creates the session is responsible for committing or rolling it back.
Critical: Never let a transaction end without explicit commit or rollback. Use try/except/finally to ensure cleanup.
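One way to enforce both rules is to route every message through a single helper that always ends the transaction explicitly. The sketch below is an assumption-laden illustration: `run_in_transaction` and `RecordingSession` are hypothetical names, and the session is only assumed to expose async `commit()`, `rollback()`, and `close()` methods.

```python
import asyncio
from typing import Awaitable, Callable


class RecordingSession:
    """Hypothetical session stub that records which methods were called."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


async def run_in_transaction(
    session: RecordingSession, work: Callable[[], Awaitable[bool]]
) -> bool:
    """Commit if work() returns True, roll back otherwise, and always close."""
    try:
        processed = await work()
        if processed:
            await session.commit()
        else:
            await session.rollback()  # duplicate: nothing to persist
        return processed
    except Exception:
        await session.rollback()      # failure: undo claim and side effects
        raise
    finally:
        await session.close()
```

A handler would pass something like `lambda: consumer.consume(message)` as `work`, so no code path can leave the transaction open.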