Cross-service communication¶
Overview¶
messagekit enables reliable event-driven communication between microservices following the database-per-service pattern. Each service maintains its own PostgreSQL database and communicates via shared Kafka infrastructure.
Architecture principle: Database isolation¶
Critical: Services do NOT share databases. Each service has:
Its own PostgreSQL database
Its own outbox_events table (for producing events)
Its own processed_messages table (for consuming events idempotently)
Services communicate via Kafka (shared event bus), not direct database access.
Complete event flow¶
Step-by-step example: User creation event¶
Service A (User Service)
├── Database: postgres-a:5432/users_db
│ ├── users table
│ └── outbox_events table
└── Produces: UserCreated event
Service B (Analytics Service)
├── Database: postgres-b:5432/analytics_db
│ ├── analytics_data table
│ └── processed_messages table
└── Consumes: UserCreated from Kafka
Service C (Notification Service)
├── Database: postgres-c:5432/notifications_db
│ ├── notifications table
│ └── processed_messages table
└── Consumes: UserCreated from RabbitMQ (via bridge)
Detailed flow¶
1. Service A produces event (postgres-a)¶
# service-a/routes.py
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from messagekit.infrastructure.outbox import SqlAlchemyOutboxRepository
from messagekit.core import BaseEvent

# get_session, get_outbox_repo, and the User model are defined elsewhere in the service.
app = FastAPI()


class UserCreated(BaseEvent):
    event_type: str = "user.created"
    aggregate_id: str
    user_id: int
    email: str


@app.post("/users")
async def create_user(
    data: dict,
    session: AsyncSession = Depends(get_session),
    outbox_repo: SqlAlchemyOutboxRepository = Depends(get_outbox_repo),
):
    async with session.begin():
        # 1. Business data goes to postgres-a
        user = User(**data)
        session.add(user)
        # Flush so the database assigns user.id before the event is built
        await session.flush()
        user_id = user.id
        # 2. Event goes to outbox_events in postgres-a (same transaction)
        await outbox_repo.add_event(
            UserCreated(
                aggregate_id=f"user-{user_id}",
                user_id=user_id,
                email=user.email,
            ),
            session=session,
        )
        # 3. Atomic commit when the block exits (both user + event in postgres-a)
        # If commit fails, neither persists
    return {"user_id": user_id}
Result: the users table and the outbox_events table in postgres-a are updated atomically.
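The atomicity guarantee can be tried out without PostgreSQL or messagekit. The sketch below is only an illustration under stated assumptions: it uses the standard-library sqlite3 module as a stand-in database, and the table columns and the create_user_with_event helper are hypothetical, not messagekit's schema or API. It shows that a failure before commit leaves neither the user row nor the outbox row behind.

```python
import json
import sqlite3


def create_user_with_event(conn: sqlite3.Connection, email: str, fail: bool = False) -> int:
    """Insert a user and its outbox event in a single transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        cur = conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
        user_id = cur.lastrowid
        if fail:
            # Simulate a crash after the business write but before the event write
            raise RuntimeError("simulated failure before commit")
        payload = json.dumps({"user_id": user_id, "email": email})
        conn.execute(
            "INSERT INTO outbox_events (event_type, aggregate_id, payload) VALUES (?, ?, ?)",
            ("user.created", f"user-{user_id}", payload),
        )
    return user_id


def count(conn: sqlite3.Connection, table: str) -> int:
    """Return the number of rows in a table (table name is trusted here)."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


# Hypothetical, simplified schema for the demonstration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE outbox_events ("
    "id INTEGER PRIMARY KEY, event_type TEXT, aggregate_id TEXT, payload TEXT)"
)

create_user_with_event(conn, "a@example.com")
try:
    create_user_with_event(conn, "b@example.com", fail=True)
except RuntimeError:
    pass  # the transaction was rolled back; no partial state remains
```

After both calls, each table holds exactly one row: the failed request left no orphaned user and no orphaned event.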
2. Kafka Connect CDC publishes to Kafka¶
Kafka Connect, running the Debezium PostgreSQL connector, monitors postgres-a.outbox_events by reading the PostgreSQL write-ahead log (WAL):
// Debezium CDC configuration
{
  "name": "service-a-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-a",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "users_db",
    "database.server.name": "service-a",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
  }
}
CDC detects the new row in outbox_events and publishes it to the Kafka topic user.created.
Important: CDC reads from postgres-a’s outbox. Service B and C do NOT access postgres-a directly.
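A connector configuration like the one above is typically registered through the Kafka Connect REST API with a POST to /connectors. The following is a minimal sketch using only the standard library. The worker URL http://kafka-connect:8083 is an assumption taken from the troubleshooting section of this page, and build_register_request is a hypothetical helper. The request is built but not sent, so the snippet runs without a Connect worker.

```python
import json
import urllib.request


def build_register_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build the POST /connectors request that creates a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Abbreviated config; use the full connector configuration in practice
request = build_register_request(
    "http://kafka-connect:8083",  # assumed worker address inside the Docker network
    "service-a-outbox-connector",
    {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "table.include.list": "public.outbox_events",
    },
)

# To actually register the connector (requires a reachable worker):
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```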
3. Service B consumes from Kafka (postgres-b)¶
# service-b/consumers.py
from faststream.kafka import KafkaBroker
from messagekit.infrastructure.pubsub.consumer_base import SqlAlchemyProcessedMessageStore

broker = KafkaBroker("kafka:9092")  # Docker service name


@broker.subscriber("user.created")
async def handle_user_created(message: dict):
    """
    FastStream connects to kafka:9092 (shared infrastructure).
    Docker DNS resolves 'kafka' to Kafka container IP.
    """
    async with session_factory() as session:
        # Idempotency check in postgres-b (NOT postgres-a)
        store = SqlAlchemyProcessedMessageStore(session)
        if not await store.was_processed(message["messageId"]):
            # Process business logic in postgres-b
            analytics_record = AnalyticsRecord(
                user_id=message["user_id"],
                email=message["email"],
                event_timestamp=message["timestamp"],
            )
            session.add(analytics_record)
            # Mark as processed in postgres-b's processed_messages table
            await store.mark_processed(message["messageId"])
            await session.commit()
Key points:
Service B subscribes to Kafka topic (not postgres-a)
Service B writes to postgres-b (its own database)
Idempotency check uses postgres-b’s processed_messages table
Kafka is the communication medium, not database access
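The effect of the idempotency check can be seen in isolation. This is a sketch under stated assumptions, not messagekit's implementation: sqlite3 stands in for postgres-b, and the table layouts and the was_processed and handle_user_created functions are hypothetical. Delivering the same message twice produces a single analytics row.

```python
import sqlite3


def was_processed(conn: sqlite3.Connection, message_id: str) -> bool:
    """Return True if this message ID is already recorded."""
    row = conn.execute(
        "SELECT 1 FROM processed_messages WHERE message_id = ?", (message_id,)
    ).fetchone()
    return row is not None


def handle_user_created(conn: sqlite3.Connection, message: dict) -> bool:
    """Apply the message once; return False when it is a duplicate."""
    with conn:  # business write and marker commit together
        if was_processed(conn, message["messageId"]):
            return False
        conn.execute(
            "INSERT INTO analytics_data (user_id, email) VALUES (?, ?)",
            (message["user_id"], message["email"]),
        )
        conn.execute(
            "INSERT INTO processed_messages (message_id) VALUES (?)",
            (message["messageId"],),
        )
    return True


# Hypothetical, simplified schema for the demonstration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE analytics_data (user_id INTEGER, email TEXT)")
conn.execute("CREATE TABLE processed_messages (message_id TEXT PRIMARY KEY)")

message = {"messageId": "m-1", "user_id": 42, "email": "a@example.com"}
first = handle_user_created(conn, message)
second = handle_user_created(conn, message)  # redelivery of the same message
```

The primary key on message_id acts as a second line of defense: if two consumers race past the check, the second insert fails and its transaction rolls back.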
4. Bridge forwards Kafka → RabbitMQ¶
The bridge service (part of the standard architecture) runs the Kafka → RabbitMQ forwarder:
# messagekit-bridge/main.py (thin wrapper service)
from fastapi import FastAPI
from messagekit.main._initialization.bridge_setup import initialize_production_bridge
from messagekit.config import settings

app = FastAPI(title="Messagekit Bridge")


@app.on_event("startup")
async def startup():
    # Initialize bridge: Kafka → RabbitMQ
    await initialize_production_bridge(
        session_factory=session_factory,  # Bridge's own postgres-bridge DB
        kafka_broker=kafka_broker,        # Connects to kafka:9092
        rabbit_broker=rabbit_broker,      # Connects to rabbitmq:5672
    )
    # Subscribes to Kafka topic "user.created"
    # Forwards to RabbitMQ exchange "events" with routing key "user-service.user.created"
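The mapping from Kafka topic to RabbitMQ routing key described in the comments above can be pictured as a small pure function. This is a hypothetical sketch, not the bridge's actual code: the RabbitPublish dataclass, the translate function, and the assumption that the routing key is the producing service name joined to the topic are all illustrative. The payload is passed through unchanged so that fields such as messageId remain available to downstream idempotency checks.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class RabbitPublish:
    """What a forwarder would hand to a RabbitMQ client (hypothetical shape)."""
    exchange: str
    routing_key: str
    body: bytes


def translate(
    topic: str,
    value: bytes,
    source_service: str = "user-service",
    exchange: str = "events",
) -> RabbitPublish:
    """Map one Kafka record to the RabbitMQ publish it should produce."""
    # Assumed convention: prefix the topic with the producing service name
    routing_key = f"{source_service}.{topic}"
    # Forward the payload byte-for-byte; no fields are added or dropped
    return RabbitPublish(exchange=exchange, routing_key=routing_key, body=value)


kafka_value = json.dumps({"messageId": "m-1", "user_id": 42}).encode("utf-8")
outbound = translate("user.created", kafka_value)
```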
5. Service C consumes from RabbitMQ (postgres-c)¶
# service-c/consumers.py
from faststream.rabbit import RabbitBroker
from messagekit.infrastructure.pubsub.consumer_base import SqlAlchemyProcessedMessageStore

rabbit_broker = RabbitBroker("amqp://guest:guest@rabbitmq:5672/")


@rabbit_broker.subscriber("events", routing_key="user-service.user.created")
async def handle_user_created(message: dict):
    """
    Consumes from RabbitMQ (via bridge).
    Docker DNS resolves 'rabbitmq' to RabbitMQ container IP.
    """
    async with session_factory() as session:
        # Idempotency check in postgres-c (NOT postgres-a or postgres-b)
        store = SqlAlchemyProcessedMessageStore(session)
        if not await store.was_processed(message["messageId"]):
            # Send notification (business logic in postgres-c)
            notification = Notification(
                user_id=message["user_id"],
                type="welcome_email",
                status="pending",
            )
            session.add(notification)
            # Mark as processed in postgres-c's processed_messages table
            await store.mark_processed(message["messageId"])
            await session.commit()
Key points:
Service C subscribes to RabbitMQ exchange (forwarded from Kafka via bridge)
Service C writes to postgres-c (its own database)
Idempotency check uses postgres-c’s processed_messages table
Production deployment¶
Docker Compose: Single file (all services)¶
For monorepo or tightly coupled services:
version: '3.8'

networks:
  microservices-network:
    driver: bridge

services:
  # ============================================
  # SHARED INFRASTRUCTURE
  # ============================================
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: shared-kafka
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
    networks:
      - microservices-network

  rabbitmq:
    image: rabbitmq:3.13-management
    container_name: shared-rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    networks:
      - microservices-network

  kafka-connect:
    image: debezium/connect:latest
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - microservices-network

  # ============================================
  # SERVICE A (Producer)
  # ============================================
  postgres-a:
    image: postgres:15
    environment:
      POSTGRES_DB: users_db
    networks:
      - microservices-network

  service-a:
    build: ./service-a
    depends_on:
      - postgres-a
      - kafka
    environment:
      # Service A config
      DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres-a:5432/users_db"
      KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
    networks:
      - microservices-network

  # ============================================
  # BRIDGE (Kafka → RabbitMQ)
  # ============================================
  postgres-bridge:
    image: postgres:15
    environment:
      POSTGRES_DB: bridge_db
    networks:
      - microservices-network

  messagekit-bridge:
    build: ./messagekit-bridge
    depends_on:
      - kafka
      - rabbitmq
      - postgres-bridge
    environment:
      # Bridge config
      DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres-bridge:5432/bridge_db"
      KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
      KAFKA_TOPIC: "user.created"
      RABBITMQ_URL: "amqp://guest:guest@rabbitmq:5672/"
      RABBITMQ_EXCHANGE: "events"
    networks:
      - microservices-network

  # ============================================
  # SERVICE B (Kafka Consumer)
  # ============================================
  postgres-b:
    image: postgres:15
    environment:
      POSTGRES_DB: analytics_db
    networks:
      - microservices-network

  service-b:
    build: ./service-b
    depends_on:
      - postgres-b
      - kafka
    environment:
      # Service B config
      DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres-b:5432/analytics_db"
      KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
    networks:
      - microservices-network

  # ============================================
  # SERVICE C (RabbitMQ Consumer)
  # ============================================
  postgres-c:
    image: postgres:15
    environment:
      POSTGRES_DB: notifications_db
    networks:
      - microservices-network

  service-c:
    build: ./service-c
    depends_on:
      - postgres-c
      - rabbitmq
    environment:
      # Service C config
      DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres-c:5432/notifications_db"
      RABBITMQ_URL: "amqp://guest:guest@rabbitmq:5672/"
    networks:
      - microservices-network
Docker networking: Service discovery¶
Because all services are attached to the same Docker network, each one can reach the others by service name through Docker's built-in DNS:
kafka:9092 → Resolves to Kafka container IP
rabbitmq:5672 → Resolves to RabbitMQ container IP
postgres-a:5432 → Resolves to Service A’s Postgres IP
postgres-b:5432 → Resolves to Service B’s Postgres IP
postgres-c:5432 → Resolves to Service C’s Postgres IP
No manual IP configuration needed. Docker DNS handles everything.
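When a service fails to start, it can help to confirm from inside a container that these names resolve and accept TCP connections. The helper below is a minimal standard-library sketch. The function names are hypothetical, and a successful TCP connection only shows that the port is open, not that the broker or database is ready to serve requests.

```python
import socket
from typing import Dict, Iterable, Tuple


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts
        return False


def check_services(targets: Iterable[Tuple[str, int]]) -> Dict[str, bool]:
    """Check several host:port pairs and report which are reachable."""
    return {f"{host}:{port}": can_connect(host, port) for host, port in targets}


# Example call, meaningful only inside a container on microservices-network:
# check_services([("kafka", 9092), ("rabbitmq", 5672), ("postgres-a", 5432)])
```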
Configuration via environment variables¶
Each service configures messagekit via environment variables (read by Pydantic Settings):
Service A (Producer):
# Connects to its own database
DATABASE_URL=postgresql+asyncpg://postgres:postgres@postgres-a:5432/users_db
# CDC will publish from this database to Kafka
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
Service B (Kafka Consumer):
# Connects to its own database
DATABASE_URL=postgresql+asyncpg://postgres:postgres@postgres-b:5432/analytics_db
# Subscribes to Kafka
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
Service C (RabbitMQ Consumer):
# Connects to its own database
DATABASE_URL=postgresql+asyncpg://postgres:postgres@postgres-c:5432/notifications_db
# Subscribes to RabbitMQ
RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
Bridge Service:
# Connects to its own database
DATABASE_URL=postgresql+asyncpg://postgres:postgres@postgres-bridge:5432/bridge_db
# Consumes from Kafka, publishes to RabbitMQ
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
KAFKA_TOPIC=user.created
RABBITMQ_EXCHANGE=events
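To make the mapping from environment variables to typed settings concrete, here is a standard-library approximation for the bridge's variables. messagekit itself reads configuration through Pydantic Settings, as noted above. The BridgeSettings class and its field names below are hypothetical and only mirror the variable names listed in this section.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping, Optional


@dataclass(frozen=True)
class BridgeSettings:
    """Hypothetical typed view of the bridge's environment variables."""
    database_url: str
    kafka_bootstrap_servers: str
    rabbitmq_url: str
    kafka_topic: str
    rabbitmq_exchange: str

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "BridgeSettings":
        """Load every field from the upper-cased variable of the same name."""
        env = os.environ if env is None else env
        names = {f.name: f.name.upper() for f in fields(cls)}
        missing = sorted(var for var in names.values() if var not in env)
        if missing:
            raise KeyError(f"missing environment variables: {', '.join(missing)}")
        return cls(**{field: env[var] for field, var in names.items()})


example_env = {
    "DATABASE_URL": "postgresql+asyncpg://postgres:postgres@postgres-bridge:5432/bridge_db",
    "KAFKA_BOOTSTRAP_SERVERS": "kafka:9092",
    "RABBITMQ_URL": "amqp://guest:guest@rabbitmq:5672/",
    "KAFKA_TOPIC": "user.created",
    "RABBITMQ_EXCHANGE": "events",
}
settings = BridgeSettings.from_env(example_env)
```

Failing fast on a missing variable at startup is usually easier to diagnose than a connection error that surfaces later.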
Architecture decisions¶
Why Kafka AND RabbitMQ?¶
Both are part of the standard architecture:
| Broker | Primary Use | Consumers | Notes |
|---|---|---|---|
| Kafka | Event backbone | Python services (FastStream) | High throughput, durable, partitioned |
| RabbitMQ | Routing flexibility | Any service (via bridge) | Complex routing, native AMQP, easy dead-letter exchanges |
Pattern: Kafka is the primary event bus. RabbitMQ provides routing flexibility for services that need AMQP or complex routing patterns.
When to consume from Kafka vs RabbitMQ¶
Consume from Kafka when:
Service is written in Python using FastStream
Need high throughput (millions of events)
Need event replay from offset
Partitioning is important for scalability
Consume from RabbitMQ when:
Service uses AMQP clients (Node.js, .NET, etc.)
Need complex routing (topic exchanges, headers exchanges)
Legacy integration requirements
Prefer native dead-letter exchanges
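The "complex routing" that topic exchanges offer comes down to pattern matching on dot-separated routing keys: in a binding pattern, * matches exactly one word and # matches zero or more words. The function below is an illustrative re-implementation of that matching rule for experimentation. It is not RabbitMQ's code, and topic_matches is a hypothetical name.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding pattern matches a routing key."""
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more words of the key
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and p[i] in ("*", k[j]):
            # '*' or a literal word consumes exactly one word
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


exact = topic_matches("user-service.user.created", "user-service.user.created")
one_word = topic_matches("user-service.user.*", "user-service.user.created")
too_short = topic_matches("user-service.*", "user-service.user.created")
any_depth = topic_matches("user-service.#", "user-service.user.created")
```

Here exact, one_word, and any_depth are True, while too_short is False because * cannot span two words.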
Database isolation guarantees¶
What database-per-service provides:
✅ Transactional boundary: Each service’s business logic + events are atomic within its own database
✅ Service autonomy: Services can be developed, deployed, scaled independently
✅ Failure isolation: Database failure in Service A doesn’t affect Service B or C
✅ Schema evolution: Each service can modify its schema without coordinating with others
What Kafka provides:
✅ Event delivery guarantee: At-least-once delivery (CDC retries on failure)
✅ Event ordering: Per-partition ordering for related events
✅ Event replay: Consumers can replay from any offset
✅ Service decoupling: Producers don’t know about consumers
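Per-partition ordering works because records with the same key are always assigned to the same partition. The sketch below illustrates the idea with a CRC32 hash as a stand-in. Kafka's default partitioner uses a different hash function (murmur2), so the partition numbers here will not match a real cluster. The partition_for function is a hypothetical helper.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_partitions: int) -> int:
    """Map a record key to a partition (CRC32 stand-in, not Kafka's hash)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Events keyed by aggregate ID, in the order they were produced
events = [
    ("user-1", "user.created"),
    ("user-2", "user.created"),
    ("user-1", "user.updated"),
]

partitions = defaultdict(list)
for aggregate_id, event_type in events:
    partitions[partition_for(aggregate_id, 3)].append((aggregate_id, event_type))
```

Both user-1 events land in the same partition in production order, so a consumer sees user.created before user.updated for that aggregate. No ordering is guaranteed between user-1 and user-2.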
What idempotency stores provide:
✅ Effectively-once processing: Duplicates that arrive under at-least-once delivery are detected and safely ignored
✅ Consumer isolation: Each consumer tracks its own processed messages
✅ Transactional safety: Idempotency check + business logic in same transaction
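The transactional safety point matters most when business logic fails partway through. If the processed marker and the business write share one transaction, a failure rolls back both, and the redelivered message is processed normally. The following sketch demonstrates this with sqlite3 as a stand-in database. The process_once function and the table layout are hypothetical and are not messagekit's API.

```python
import sqlite3
from typing import Callable


def process_once(
    conn: sqlite3.Connection,
    message_id: str,
    apply: Callable[[sqlite3.Connection], None],
) -> str:
    """Run apply at most once per message ID; return 'processed' or 'duplicate'."""
    with conn:  # rolls back the marker too if apply raises
        seen = conn.execute(
            "SELECT 1 FROM processed_messages WHERE message_id = ?", (message_id,)
        ).fetchone()
        if seen:
            return "duplicate"
        conn.execute(
            "INSERT INTO processed_messages (message_id) VALUES (?)", (message_id,)
        )
        apply(conn)
    return "processed"


def failing_logic(conn: sqlite3.Connection) -> None:
    """Simulate business logic that crashes."""
    raise RuntimeError("simulated business failure")


def working_logic(conn: sqlite3.Connection) -> None:
    """Simulate business logic that writes one row."""
    conn.execute("INSERT INTO notifications (type) VALUES ('welcome_email')")


# Hypothetical, simplified schema for the demonstration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_messages (message_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE notifications (type TEXT)")

try:
    process_once(conn, "m-1", failing_logic)
except RuntimeError:
    pass  # marker was rolled back, so the message is still unprocessed

retry = process_once(conn, "m-1", working_logic)   # redelivery succeeds
repeat = process_once(conn, "m-1", working_logic)  # later duplicate is skipped
```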
Troubleshooting¶
Issue: Service can’t connect to Kafka¶
Symptoms: Connection refused or Connection timed out errors.
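Cause: The service is using the wrong hostname or port. Inside a container, localhost refers to the container itself, not to the Kafka container.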
Solution: Use Docker service name:
# ❌ Wrong (host networking)
broker = KafkaBroker("localhost:9092")
# ✅ Correct (Docker networking)
broker = KafkaBroker("kafka:9092")
Issue: Events published but not consumed¶
Symptoms: Producer succeeds, but consumer never receives messages.
Diagnosis:
Check CDC connector status:
curl http://kafka-connect:8083/connectors/service-a-outbox-connector/status
Check Kafka topic has messages:
docker exec shared-kafka kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic user.created \
  --from-beginning
Check consumer is subscribed:
# Verify topic name matches
@broker.subscriber("user.created")  # Must match CDC topic
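The connector status endpoint returns JSON with a state for the connector and for each of its tasks. A small helper can turn that into a list of problems to look at first. This is a sketch: find_connector_problems is a hypothetical function, and the sample payloads below are hand-written to resemble the Kafka Connect status response rather than captured from a real worker.

```python
from typing import List


def find_connector_problems(status: dict) -> List[str]:
    """List reasons a connector is not fully running, given its status JSON."""
    problems = []
    connector_state = status.get("connector", {}).get("state", "UNKNOWN")
    if connector_state != "RUNNING":
        problems.append(f"connector is {connector_state}")
    tasks = status.get("tasks", [])
    if not tasks:
        problems.append("connector has no tasks")
    for task in tasks:
        if task.get("state") != "RUNNING":
            problems.append(f"task {task.get('id')} is {task.get('state')}")
    return problems


# Hand-written sample payloads (illustrative only)
healthy = {
    "name": "service-a-outbox-connector",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "RUNNING"}],
}
broken = {
    "name": "service-a-outbox-connector",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "FAILED", "trace": "..."}],
}

healthy_problems = find_connector_problems(healthy)
broken_problems = find_connector_problems(broken)
```

A connector can report RUNNING while its task has FAILED, which is why the task states are checked separately. The task's trace field usually contains the underlying error.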
Issue: Duplicate event processing¶
Symptoms: Same event processed multiple times, causing duplicate side effects.
Cause: Consumer not using idempotency pattern.
Solution: Always use SqlAlchemyProcessedMessageStore:
async with session.begin():
    store = SqlAlchemyProcessedMessageStore(session)
    if not await store.was_processed(message["messageId"]):
        # Business logic
        await store.mark_processed(message["messageId"])
    # Commit both business + idempotency check
Issue: Bridge not forwarding to RabbitMQ¶
Symptoms: Events appear in Kafka but not RabbitMQ.
Diagnosis:
Check bridge service logs:
docker compose logs messagekit-bridge
Verify bridge environment variables:
KAFKA_TOPIC=user.created        # Must match producer's topic
RABBITMQ_EXCHANGE=events        # Must match consumer's exchange
Check RabbitMQ exchange exists:
# RabbitMQ Management UI
http://localhost:15672/
# Username: guest, Password: guest
See also¶
Integration guide - How to integrate messagekit in your service
Transactional outbox pattern - usage guide - Outbox pattern details
Consumer transaction management guide - Idempotent consumer pattern
Dead letter queue (DLQ) handler guide - Dead letter queue handling