Publish-Subscribe Messaging

Publish-Subscribe (Pub/Sub) is a messaging pattern where publishers send messages to topics without knowing who will receive them, and subscribers receive messages from topics they’re interested in without knowing who sent them. This decoupling enables scalable, flexible, event-driven architectures.

Pub/Sub Fundamentals

Core Concepts

  • Publishers - Send messages to topics
  • Subscribers - Receive messages from topics
  • Topics - Named channels for message distribution
  • Messages - Data packets being exchanged

Benefits

  • Decoupling - Publishers and subscribers are independent
  • Scalability - Add subscribers without affecting publishers
  • Flexibility - Dynamic subscription management
  • Fan-Out - One message reaches multiple subscribers

Geode Pub/Sub

Creating Topics

Define topics for message distribution:

-- Create topic for user events
CREATE TOPIC user_events
WITH OPTIONS (
    retention_period: '7 days',
    message_ttl: '24 hours',
    max_subscribers: 100
);

-- Create topic with message schema
CREATE TOPIC order_notifications
SCHEMA (
    order_id STRING,
    status STRING,
    timestamp TIMESTAMP,
    details MAP<STRING, ANY>
);

Publishing Messages

Publish to topics:

from geode_client import Client

async def publish_user_event(client, user_id, event_type):
    """Emit a user lifecycle event on the user_events topic."""
    payload = {
        "user_id": user_id,
        "event_type": event_type,
        # ISO-8601 timestamp so consumers can parse it without a schema.
        "timestamp": datetime.now().isoformat(),
        "metadata": {"source": "user-service", "version": "1.0"},
    }
    await client.publish("user_events", payload)
    print(f"Published: {event_type} for user {user_id}")

Subscribing to Topics

Subscribe to receive messages:

async def subscribe_to_user_events(client):
    """Consume the user_events topic and dispatch every message."""
    async with client.subscribe("user_events") as subscription:
        async for event in subscription:
            await handle_user_event(event)

async def handle_user_event(message):
    """Route a user event to the matching side effect."""
    user_id = message["user_id"]
    kind = message["event_type"]

    # Guard-clause dispatch: unknown event types fall through silently.
    if kind == "signup":
        await send_welcome_email(user_id)
        return
    if kind == "purchase":
        await update_analytics(user_id, message)
        return
    if kind == "churn":
        await trigger_retention_campaign(user_id)

Messaging Patterns

Fan-Out Pattern

One message to many subscribers:

# Publisher
async def notify_all(client, notification):
    """Fan a single notification out to every subscriber of 'notifications'."""
    payload = {
        "title": notification.title,
        "body": notification.body,
        "priority": "high",
    }
    await client.publish("notifications", payload)

# Multiple subscribers receive the same message
async def email_subscriber(client):
    """Deliver each notification via email."""
    async with client.subscribe("notifications") as subscription:
        async for notification in subscription:
            await send_email(notification)

async def sms_subscriber(client):
    """Deliver each notification via SMS."""
    async with client.subscribe("notifications") as subscription:
        async for notification in subscription:
            await send_sms(notification)

async def push_subscriber(client):
    """Deliver each notification as a push notification."""
    async with client.subscribe("notifications") as subscription:
        async for notification in subscription:
            await send_push_notification(notification)

Topic Filtering

Subscribe with filters:

-- Subscribe only to high-priority orders
SUBSCRIBE TO order_notifications
WHERE priority = 'high';

-- Subscribe to specific user's events
SUBSCRIBE TO user_events
WHERE user_id = 'user123';
async def filtered_subscription(client):
    """Consume only high-priority order notifications (server-side filter)."""
    stream = client.subscribe(
        "order_notifications",
        filter="priority = 'high'"
    )
    async with stream as subscription:
        async for order in subscription:
            await handle_urgent_order(order)

Request-Reply Pattern

Implement RPC over pub/sub:

import uuid

class RPCClient:
    """Request-reply (RPC) built on top of pub/sub topics.

    Each client owns a private reply topic; every request carries a
    correlation id plus that reply topic, and the reply listener resolves
    the Future that is waiting on the matching id.
    """

    def __init__(self, client):
        self.client = client
        # Maps request id -> Future awaiting the reply payload.
        self.pending_requests = {}
        # Unique per-client topic so replies are not broadcast to everyone.
        self.reply_topic = f"replies_{uuid.uuid4()}"

    async def call(self, method, params, timeout=5.0):
        """Make an RPC call and wait up to `timeout` seconds for the reply.

        Raises:
            asyncio.TimeoutError: if no reply arrives within `timeout`.
        """
        request_id = str(uuid.uuid4())

        # Register the Future *before* publishing so a fast reply can't race us.
        reply_future = asyncio.get_running_loop().create_future()
        self.pending_requests[request_id] = reply_future

        await self.client.publish("rpc_requests", {
            "id": request_id,
            "method": method,
            "params": params,
            "reply_to": self.reply_topic
        })

        try:
            return await asyncio.wait_for(reply_future, timeout)
        finally:
            # pop() (not del) so concurrent cleanup can never raise KeyError.
            self.pending_requests.pop(request_id, None)

    async def listen_for_replies(self):
        """Resolve pending Futures as replies arrive on the private topic."""
        async with self.client.subscribe(self.reply_topic) as sub:
            async for message in sub:
                future = self.pending_requests.get(message["request_id"])
                # Guard against late replies: after a timeout the Future is
                # cancelled, and set_result on it would raise
                # InvalidStateError and kill this listener loop.
                if future is not None and not future.done():
                    future.set_result(message["result"])

Topic Hierarchies

Organize topics hierarchically:

users/
  users/created
  users/updated
  users/deleted
  users/verified

orders/
  orders/placed
  orders/shipped
  orders/delivered
  orders/canceled
# Subscribe to all user events
async with client.subscribe("users/*") as sub:
    async for message in sub:
        handle_user_event(message)

# Subscribe to specific event type
async with client.subscribe("users/created") as sub:
    async for message in sub:
        handle_new_user(message)

Message Delivery Guarantees

At-Most-Once

Message delivered zero or one time:

CREATE TOPIC notifications
WITH DELIVERY at_most_once;
  • Fastest performance
  • No retries
  • Suitable for non-critical notifications

At-Least-Once

Message delivered one or more times:

CREATE TOPIC orders
WITH DELIVERY at_least_once;
  • Messages may be duplicated
  • Subscribers must be idempotent
  • Suitable for most applications

Exactly-Once

Message delivered exactly one time:

CREATE TOPIC transactions
WITH DELIVERY exactly_once;
  • Highest latency
  • No duplicates
  • Suitable for financial transactions

Advanced Features

Message Priorities

Prioritize message delivery:

# Publish with priority
await client.publish("tasks", {
    "task_id": "task123",
    "action": "process_payment"
}, priority="high")

# Subscribe to high-priority messages first
async with client.subscribe(
    "tasks",
    priority_order=True
) as sub:
    async for message in sub:
        await process_task(message)

Message Expiration

Set message TTL:

# Publish with TTL
await client.publish("flash_sales", {
    "sale_id": "sale123",
    "discount": 0.5
}, ttl_seconds=3600)  # Expire after 1 hour

Dead Letter Queues

Handle failed messages:

CREATE TOPIC payment_processing
WITH OPTIONS (
    max_retries: 3,
    dead_letter_topic: 'payment_failures'
);
async def process_failed_payments(client):
    """Handle messages that exceeded retry limit"""
    async with client.subscribe("payment_failures") as sub:
        async for failed_message in sub:
            await manual_review(failed_message)
            await notify_support(failed_message)

Message Batching

Improve throughput with batching:

async def batch_publisher(client):
    """Publish a batch of synthetic login events in a single call."""
    batch = [{"user_id": f"user{n}", "action": "login"} for n in range(1000)]
    # One network round-trip for the whole batch.
    await client.publish_batch("user_events", batch)

Monitoring and Observability

Topic Metrics

Monitor topic health:

SELECT
    topic_name,
    messages_published,
    messages_consumed,
    pending_messages,
    subscriber_count,
    publish_rate,
    consume_rate
FROM SYSTEM.topics
WHERE pending_messages > 10000;

Subscriber Lag

Track consumer lag:

SELECT
    subscriber_id,
    topic_name,
    lag_messages,
    lag_seconds,
    last_message_time
FROM SYSTEM.subscribers
WHERE lag_messages > 1000;

Best Practices

Message Design

  • Keep messages small and focused
  • Include timestamp and metadata
  • Version message schemas
  • Make messages self-contained
  • Use consistent naming conventions

Subscription Management

  • Clean up unused subscriptions
  • Monitor subscriber lag
  • Implement backpressure handling
  • Use appropriate delivery guarantees
  • Handle redelivery idempotently

Error Handling

async def robust_subscriber(client):
    """Consume 'events' forever, surviving message and connection failures."""

    async def _consume_one(message):
        # Per-message failures are logged and routed to the DLQ so one
        # bad message never tears down the whole subscription.
        try:
            await process_message(message)
        except Exception as e:
            logger.error(f"Error processing message: {e}")
            await send_to_dlq(message, str(e))

    while True:
        try:
            async with client.subscribe("events") as sub:
                async for message in sub:
                    await _consume_one(message)
        except ConnectionError:
            await asyncio.sleep(5)  # Reconnect delay

Production Patterns

Multi-Region Pub/Sub

Distribute messages across geographic regions:

topics:
  global_notifications:
    replicas:
      - region: us-east
        priority: primary
      - region: eu-west
        priority: secondary
      - region: ap-south
        priority: secondary
    replication: async
    message_routing: geo_nearest  # Route to nearest region
# Publisher in US publishes to local topic
async with client.publish("global_notifications", message, region="us-east"):
    pass

# Subscribers in EU/AP receive via replication
async with client.subscribe("global_notifications", region="eu-west") as sub:
    async for msg in sub:
        # Receives messages from US with replication lag
        process_message(msg)

Message Acknowledgment Strategies

Control message delivery and acknowledgment:

async def manual_acknowledgment():
    """Fine-grained control over message acknowledgment"""
    # auto_ack=False: nothing is acknowledged until we say so.
    async with client.subscribe("orders", auto_ack=False) as sub:
        async for message in sub:
            try:
                await process_order(message.data)
                # Success path: acknowledge explicitly.
                await message.ack()
                logger.info(f"Processed and acked: {message.id}")
            except ProcessingError as e:
                # Transient failure: negative-ack and requeue for retry.
                await message.nack(requeue=True)
                logger.error(f"Failed to process {message.id}: {e}")
            except FatalError as e:
                # Permanent failure: dead-letter, do not requeue.
                await message.nack(requeue=False)
                logger.critical(f"Fatal error on {message.id}: {e}")

Consumer Groups

Load balance message processing across consumers:

# Consumer group for parallel processing
async def consumer_group_worker(worker_id, group_id):
    """Worker in consumer group"""
    subscription = client.subscribe(
        "order_processing",
        consumer_group=group_id,  # Messages distributed across group
        consumer_id=worker_id
    )
    async with subscription as sub:
        async for message in sub:
            logger.info(f"Worker {worker_id} processing {message.id}")
            await process_order(message.data)
            await message.ack()

# Start multiple workers (messages distributed among them)
async def main():
    """Run ten workers that share the order-processing load."""
    await asyncio.gather(*(
        consumer_group_worker(f"worker-{n}", "order-processors")
        for n in range(10)  # 10 workers share the load
    ))

Ordered Message Processing

Maintain message ordering when required:

# Publish with partition key (same key → same partition → ordered)
await client.publish("user_events", {
    "user_id": "user123",
    "action": "login",
    "timestamp": datetime.now()
}, partition_key="user123")  # All events for user123 ordered

# Subscribe with ordering guarantee
async with client.subscribe(
    "user_events",
    ordering=True,
    partition_key_filter="user123"  # Process only this user's events in order
) as sub:
    async for event in sub:
        # Events for user123 arrive in published order
        await process_user_event(event)

Advanced Messaging Patterns

Saga Pattern with Pub/Sub

Coordinate distributed transactions:

class OrderSaga:
    """Distributed transaction (saga) coordinated over pub/sub.

    Each step publishes a success or failure event to the shared
    "saga_events" topic; the orchestrator reacts to those events to
    advance the saga or to trigger compensating actions.

    NOTE(review): process_payment, ship_order, complete_saga,
    release_inventory, refund_payment and inventory_service are not
    defined in this snippet — presumably provided elsewhere; confirm.
    """

    def __init__(self, client):
        # Pub/sub client used both to publish step events and to subscribe.
        self.client = client

    async def start_order(self, order_data):
        """Initiate the saga and return its correlation id."""
        saga_id = str(uuid.uuid4())

        # Publish order created event; the orchestrator picks it up
        # and drives the remaining steps.
        await self.client.publish("saga_events", {
            "saga_id": saga_id,
            "type": "ORDER_CREATED",
            "data": order_data
        })

        return saga_id

    async def saga_orchestrator(self):
        """Advance the saga step-by-step by reacting to saga events."""
        async with self.client.subscribe("saga_events") as sub:
            async for event in sub:
                saga_id = event.data['saga_id']
                event_type = event.data['type']

                if event_type == "ORDER_CREATED":
                    # Step 1: Reserve inventory
                    await self.reserve_inventory(saga_id, event.data['data'])

                elif event_type == "INVENTORY_RESERVED":
                    # Step 2: Process payment
                    await self.process_payment(saga_id, event.data['data'])

                elif event_type == "PAYMENT_PROCESSED":
                    # Step 3: Ship order
                    await self.ship_order(saga_id, event.data['data'])

                elif event_type == "SHIPPING_CONFIRMED":
                    # Saga complete
                    await self.complete_saga(saga_id)

                elif event_type.endswith("_FAILED"):
                    # Any step failure triggers compensating actions.
                    await self.rollback_saga(saga_id, event_type)

    async def reserve_inventory(self, saga_id, order):
        """Reserve inventory (Step 1) and publish the outcome event."""
        try:
            # Business logic
            result = await inventory_service.reserve(order['items'])

            # Publish success event
            await self.client.publish("saga_events", {
                "saga_id": saga_id,
                "type": "INVENTORY_RESERVED",
                "data": result
            })
        except Exception as e:
            # Publish failure event so the orchestrator can roll back.
            await self.client.publish("saga_events", {
                "saga_id": saga_id,
                "type": "INVENTORY_RESERVATION_FAILED",
                "error": str(e)
            })

    async def rollback_saga(self, saga_id, failed_step):
        """Run compensating transactions for the steps already completed."""
        # NOTE(review): this snippet only shows "INVENTORY_RESERVATION_FAILED"
        # being published; the names checked here imply the payment/shipping
        # steps publish "PAYMENT_FAILED"/"SHIPPING_FAILED" — verify.
        if failed_step in ["PAYMENT_FAILED", "SHIPPING_FAILED"]:
            # Release inventory reservation
            await self.release_inventory(saga_id)

        if failed_step == "SHIPPING_FAILED":
            # Refund payment
            await self.refund_payment(saga_id)

        # Mark saga as rolled back
        await self.client.publish("saga_events", {
            "saga_id": saga_id,
            "type": "SAGA_ROLLED_BACK",
            "reason": failed_step
        })

Event Sourcing

Store state as sequence of events:

class EventSourcedEntity:
    """Entity whose state is rebuilt by replaying its event stream.

    Commands never mutate state directly: they publish an event and then
    apply that same event through `apply_event`, so the live path and the
    replay path can never drift apart.
    """

    def __init__(self, entity_id, client):
        self.entity_id = entity_id
        self.client = client
        self.state = {}   # materialized view built from applied events
        self.version = 0  # count of events applied so far

    async def load(self):
        """Rebuild state by replaying the entity's full event stream."""
        async with self.client.subscribe(
            f"entity_{self.entity_id}",
            from_beginning=True  # Replay all events
        ) as sub:
            async for event in sub:
                self.apply_event(event)
                self.version += 1

    def apply_event(self, event):
        """Apply a single event to the in-memory state."""
        event_type = event['type']
        if event_type == 'ACCOUNT_CREATED':
            self.state['balance'] = event['initial_balance']
            self.state['created_at'] = event['timestamp']
        elif event_type == 'DEPOSIT':
            self.state['balance'] += event['amount']
        elif event_type == 'WITHDRAWAL':
            self.state['balance'] -= event['amount']

    async def deposit(self, amount):
        """Command: deposit money.

        Raises:
            ValueError: if `amount` is not strictly positive.
        """
        if amount <= 0:
            raise ValueError("Amount must be positive")

        event = {
            "type": "DEPOSIT",
            "amount": amount,
            "timestamp": datetime.now(),
            "version": self.version + 1
        }
        await self.client.publish(f"entity_{self.entity_id}", event)

        # Route the local mutation through apply_event instead of updating
        # state by hand, so command and replay logic stay identical.
        self.apply_event(event)
        self.version += 1

CQRS with Pub/Sub

Separate read and write models:

# Write side: publish events
async def create_user(user_data):
    """Command: create user (write model)"""
    new_id = str(uuid.uuid4())

    # Write path: persist the user transactionally in the write database.
    async with geode_client.connection() as conn:
        await conn.begin()
        try:
            await conn.execute("""
                CREATE (u:User {id: $id, email: $email, created_at: datetime()})
            """, {"id": new_id, "email": user_data['email']})
            await conn.commit()
        except Exception:
            await conn.rollback()
            raise

    # Notify the read side that a user now exists.
    await pubsub_client.publish("user_events", {
        "type": "USER_CREATED",
        "user_id": new_id,
        "data": user_data
    })

    return new_id

# Read side: consume events and update read model
async def user_projection_updater():
    """Keep the denormalized read model in sync with user events."""
    async with pubsub_client.subscribe("user_events") as sub:
        async for event in sub:
            event_type = event['type']

            if event_type == 'USER_CREATED':
                # New user: seed the read-optimized hash for fast queries.
                payload = event['data']
                await redis.hset(
                    f"user:{event['user_id']}",
                    mapping={
                        "email": payload['email'],
                        "created_at": payload['created_at']
                    }
                )
            elif event_type == 'USER_UPDATED':
                # Existing user: overwrite changed fields in place.
                await redis.hset(
                    f"user:{event['user_id']}",
                    mapping=event['data']
                )

# Read from read model (fast)
async def get_user(user_id):
    """Query: fetch a user from the denormalized read model (fast path)."""
    key = f"user:{user_id}"
    return await redis.hgetall(key)

Performance Tuning

Batch Publishing

Improve throughput with batching:

async def batch_publish_example():
    """Publish 10k click events in batches of 100 per round-trip."""
    events = []
    for n in range(10000):
        events.append({"user_id": n, "action": "click", "timestamp": datetime.now()})

    # Batch publish (100x faster than individual publishes):
    # 100 messages per network round-trip.
    await client.publish_batch("user_actions", events, batch_size=100)

Message Compression

Reduce network bandwidth:

# Enable compression for large messages
await client.publish(
    "analytics_data",
    large_payload,
    compression="lz4"  # Fast compression algorithm
)

# Compression reduces bandwidth by 70-90% for JSON payloads

Topic Sharding

Scale topics horizontally:

# Shard topic across multiple partitions
topics:
  high_volume_events:
    partitions: 64  # 64 shards for parallel processing
    partition_key: user_id  # Partition by user ID
# Messages automatically sharded by partition key
await client.publish(
    "high_volume_events",
    message,
    partition_key=user_id  # Determines which partition
)

# Subscribers process partitions in parallel
async def consume_partition(partition_id):
    async with client.subscribe(
        "high_volume_events",
        partition=partition_id
    ) as sub:
        async for msg in sub:
            await process_message(msg)

# 64 workers, each processing one partition
workers = [consume_partition(i) for i in range(64)]
await asyncio.gather(*workers)

Monitoring and Operations

Message Flow Metrics

Track message flow through system:

from prometheus_client import Counter, Histogram

# Count of messages published, labeled by destination topic.
messages_published = Counter(
    'pubsub_messages_published_total',
    'Total messages published',
    ['topic']
)

# Count of messages consumed, labeled by topic and consumer group.
messages_consumed = Counter(
    'pubsub_messages_consumed_total',
    'Total messages consumed',
    ['topic', 'consumer_group']
)

# End-to-end latency: time from publish stamp to consumption.
message_latency = Histogram(
    'pubsub_message_latency_seconds',
    'Message end-to-end latency',
    ['topic']
)

async def publish_with_metrics(topic, message):
    """Publish with metrics tracking"""
    published_at = time.time()
    # Stamp the message so consumers can compute end-to-end latency.
    message['_published_at'] = published_at

    await client.publish(topic, message)
    messages_published.labels(topic=topic).inc()

async def consume_with_metrics(topic, consumer_group):
    """Consume with metrics tracking"""
    async with client.subscribe(topic, consumer_group=consumer_group) as sub:
        async for msg in sub:
            # End-to-end latency: now minus the publish-side timestamp.
            elapsed = time.time() - msg.data['_published_at']
            message_latency.labels(topic=topic).observe(elapsed)

            await process_message(msg)

            # Count successful consumption for this topic/group pair.
            consumed = messages_consumed.labels(
                topic=topic,
                consumer_group=consumer_group
            )
            consumed.inc()

Health Checks

Monitor pub/sub system health:

async def health_check():
    """Check pub/sub system health"""
    topic = "critical_events"

    # Backlog building up behind the 'processors' consumer group.
    lag = await client.get_consumer_lag(topic, "processors")
    if lag > 10000:
        alert("High consumer lag: {}".format(lag))

    # Error rate above 1% signals a processing problem.
    error_rate = await client.get_error_rate(topic)
    if error_rate > 0.01:
        alert("High error rate: {:.2%}".format(error_rate))

    # Throughput below 100 msgs/sec suggests stalled producers or consumers.
    throughput = await client.get_throughput(topic)
    if throughput < 100:
        alert("Low throughput: {} msgs/sec".format(throughput))

Learn More


Related Articles

No articles found with this tag yet.

Back to Home