๐ Producer-Consumer
One producer, multiple consumers. Each message processed once. Perfect for task distribution.
Synchronous systems: Service A calls Service B and waits for response.
Problems:
Solution: Message Queues - Asynchronous, decoupled communication!
Message queue = Buffer that stores messages between senders and receivers.
One producer, one or more consumers. Each message processed once.
Characteristics:
Use cases:
1import queue2import threading3import time4from typing import Optional5
6class MessageQueue:7 """Simple message queue implementation"""8
9 def __init__(self, maxsize: int = 1000):10 self.queue = queue.Queue(maxsize=maxsize)11 self.consumers = []12 self.running = False13
14 def produce(self, message: dict):15 """Producer: Add message to queue"""16 try:17 self.queue.put_nowait(message)18 print(f"Produced: {message}")19 except queue.Full:20 print("Queue full! Message rejected.")21
22 def consume(self, handler):23 """Consumer: Process messages from queue"""24 while self.running:25 try:26 message = self.queue.get(timeout=1)27 # Process message28 handler(message)29 # Acknowledge30 self.queue.task_done()31 except queue.Empty:32 continue33
34 def start_consumer(self, handler, consumer_id: int):35 """Start a consumer thread"""36 def consumer_loop():37 print(f"Consumer {consumer_id} started")38 self.consume(handler)39
40 thread = threading.Thread(target=consumer_loop, daemon=True)41 thread.start()42 self.consumers.append(thread)43
44 def start(self):45 """Start queue processing"""46 self.running = True47
48 def stop(self):49 """Stop queue processing"""50 self.running = False51 self.queue.join() # Wait for all tasks to complete52
53# Usage54queue = MessageQueue(maxsize=100)55queue.start()56
57# Message handler58def process_order(message):59 print(f"Processing order: {message['order_id']}")60 # Process order...61 time.sleep(1) # Simulate work62 print(f"Order {message['order_id']} processed")63
64# Start multiple consumers65queue.start_consumer(process_order, consumer_id=1)66queue.start_consumer(process_order, consumer_id=2)67
68# Producer sends messages69for i in range(10):70 queue.produce({'order_id': i, 'amount': 100 + i})71 time.sleep(0.1)72
73time.sleep(5) # Let consumers process74queue.stop()1import java.util.concurrent.BlockingQueue;2import java.util.concurrent.LinkedBlockingQueue;3import java.util.concurrent.ExecutorService;4import java.util.concurrent.Executors;5import java.util.function.Consumer;6
7public class MessageQueue {8 private final BlockingQueue<Message> queue;9 private final ExecutorService executor;10 private volatile boolean running = false;11
12 public MessageQueue(int maxSize) {13 this.queue = new LinkedBlockingQueue<>(maxSize);14 this.executor = Executors.newCachedThreadPool();15 }16
17 public void produce(Message message) {18 // Producer: Add message to queue19 if (queue.offer(message)) {20 System.out.println("Produced: " + message);21 } else {22 System.out.println("Queue full! Message rejected.");23 }24 }25
26 public void consume(Consumer<Message> handler) {27 // Consumer: Process messages from queue28 while (running) {29 try {30 Message message = queue.take(); // Blocks until message available31 handler.accept(message);32 } catch (InterruptedException e) {33 Thread.currentThread().interrupt();34 break;35 }36 }37 }38
39 public void startConsumer(Consumer<Message> handler, int consumerId) {40 // Start a consumer thread41 executor.submit(() -> {42 System.out.println("Consumer " + consumerId + " started");43 consume(handler);44 });45 }46
47 public void start() {48 running = true;49 }50
51 public void stop() {52 running = false;53 executor.shutdown();54 }55}56
57// Usage58MessageQueue queue = new MessageQueue(1000);59queue.start();60
61// Message handler62Consumer<Message> processOrder = message -> {63 System.out.println("Processing order: " + message.getOrderId());64 // Process order...65 try {66 Thread.sleep(1000); // Simulate work67 } catch (InterruptedException e) {68 Thread.currentThread().interrupt();69 }70 System.out.println("Order " + message.getOrderId() + " processed");71};72
73// Start multiple consumers74queue.startConsumer(processOrder, 1);75queue.startConsumer(processOrder, 2);76
77// Producer sends messages78for (int i = 0; i < 10; i++) {79 queue.produce(new Message(i, 100 + i));80 Thread.sleep(100);81}82
83Thread.sleep(5000); // Let consumers process84queue.stop();One publisher, multiple subscribers. Each subscriber gets copy of message.
Characteristics:
Use cases:
1from typing import List, Callable, Dict, Any2from threading import Lock3import threading4
5class Topic:6 """Pub-sub topic"""7
8 def __init__(self, name: str):9 self.name = name10 self.subscribers: List[Callable] = []11 self.lock = Lock()12
13 def subscribe(self, handler: Callable):14 """Subscribe to topic"""15 with self.lock:16 self.subscribers.append(handler)17 print(f"Subscriber added to {self.name}. Total: {len(self.subscribers)}")18
19 def unsubscribe(self, handler: Callable):20 """Unsubscribe from topic"""21 with self.lock:22 if handler in self.subscribers:23 self.subscribers.remove(handler)24
25 def publish(self, message: Dict[str, Any]):26 """Publish message to all subscribers"""27 with self.lock:28 subscribers = self.subscribers.copy()29
30 # Notify all subscribers (asynchronously)31 for subscriber in subscribers:32 try:33 # Run in separate thread to avoid blocking34 threading.Thread(35 target=subscriber,36 args=(message,),37 daemon=True38 ).start()39 except Exception as e:40 print(f"Error notifying subscriber: {e}")41
42class PubSubBroker:43 """Pub-sub message broker"""44
45 def __init__(self):46 self.topics: Dict[str, Topic] = {}47 self.lock = Lock()48
49 def get_topic(self, name: str) -> Topic:50 """Get or create topic"""51 with self.lock:52 if name not in self.topics:53 self.topics[name] = Topic(name)54 return self.topics[name]55
56 def publish(self, topic_name: str, message: Dict[str, Any]):57 """Publish message to topic"""58 topic = self.get_topic(topic_name)59 topic.publish(message)60
61 def subscribe(self, topic_name: str, handler: Callable):62 """Subscribe to topic"""63 topic = self.get_topic(topic_name)64 topic.subscribe(handler)65
66# Usage67broker = PubSubBroker()68
69# Subscribers70def email_handler(message):71 print(f"Email Service: Sending email for {message['event']}")72
73def sms_handler(message):74 print(f"SMS Service: Sending SMS for {message['event']}")75
76def analytics_handler(message):77 print(f"Analytics Service: Recording {message['event']}")78
79# Subscribe to 'user.created' topic80broker.subscribe('user.created', email_handler)81broker.subscribe('user.created', sms_handler)82broker.subscribe('user.created', analytics_handler)83
84# Publisher publishes event85broker.publish('user.created', {86 'event': 'user.created',87 'user_id': 123,88 'email': 'john@example.com'89})1import java.util.*;2import java.util.concurrent.CopyOnWriteArrayList;3import java.util.function.Consumer;4
5class Topic {6 private final String name;7 private final List<Consumer<Message>> subscribers = new CopyOnWriteArrayList<>();8
9 public Topic(String name) {10 this.name = name;11 }12
13 public void subscribe(Consumer<Message> handler) {14 subscribers.add(handler);15 System.out.println("Subscriber added to " + name + ". Total: " + subscribers.size());16 }17
18 public void unsubscribe(Consumer<Message> handler) {19 subscribers.remove(handler);20 }21
22 public void publish(Message message) {23 // Notify all subscribers24 subscribers.forEach(subscriber -> {25 try {26 subscriber.accept(message);27 } catch (Exception e) {28 System.err.println("Error notifying subscriber: " + e.getMessage());29 }30 });31 }32}33
34class PubSubBroker {35 private final Map<String, Topic> topics = new HashMap<>();36
37 public synchronized Topic getTopic(String name) {38 return topics.computeIfAbsent(name, Topic::new);39 }40
41 public void publish(String topicName, Message message) {42 Topic topic = getTopic(topicName);43 topic.publish(message);44 }45
46 public void subscribe(String topicName, Consumer<Message> handler) {47 Topic topic = getTopic(topicName);48 topic.subscribe(handler);49 }50}51
52// Usage53PubSubBroker broker = new PubSubBroker();54
55// Subscribers56Consumer<Message> emailHandler = message ->57 System.out.println("Email Service: Sending email for " + message.getEvent());58
59Consumer<Message> smsHandler = message ->60 System.out.println("SMS Service: Sending SMS for " + message.getEvent());61
62Consumer<Message> analyticsHandler = message ->63 System.out.println("Analytics Service: Recording " + message.getEvent());64
65// Subscribe to 'user.created' topic66broker.subscribe("user.created", emailHandler);67broker.subscribe("user.created", smsHandler);68broker.subscribe("user.created", analyticsHandler);69
70// Publisher publishes event71broker.publish("user.created", new Message("user.created", 123, "john@example.com"));Message may be lost, but never duplicated.
Characteristics:
Use when: Non-critical messages, metrics, logs
Message delivered at least once, may have duplicates.
Characteristics:
Use when: Critical messages, order processing, payments
Message delivered exactly once. Requires deduplication.
Characteristics:
Use when: Financial transactions, critical operations
Ordering ensures messages processed in sequence.
Example: User account balance updates
1Message 1: Balance = 1002Message 2: Balance = 150 (add 50)3Message 3: Balance = 120 (subtract 30)Correct order: 100 โ 150 โ 120 โ
Wrong order: 100 โ 120 โ 150 = 150 โ (wrong!)
1. Per-Partition Ordering (Kafka)
2. Per-Queue Ordering (RabbitMQ)
3. Global Ordering
At the code level, message queues translate to producer/consumer classes, message handlers, and acknowledgment logic.
1from abc import ABC, abstractmethod2from typing import Dict, Any, Optional3import json4
5class MessageHandler(ABC):6 """Base message handler interface"""7
8 @abstractmethod9 def handle(self, message: Dict[str, Any]) -> bool:10 """11 Handle message. Returns True if successful.12 Should be idempotent for at-least-once delivery.13 """14 pass15
16 @abstractmethod17 def can_handle(self, message_type: str) -> bool:18 """Check if handler can process this message type"""19 pass20
21class OrderProcessor(MessageHandler):22 """Process order messages"""23
24 def __init__(self, order_service):25 self.order_service = order_service26 self.processed_ids = set() # For idempotency27
28 def can_handle(self, message_type: str) -> bool:29 return message_type == 'order.created'30
31 def handle(self, message: Dict[str, Any]) -> bool:32 order_id = message.get('order_id')33
34 # Idempotency check35 if order_id in self.processed_ids:36 print(f"Order {order_id} already processed. Skipping.")37 return True # Already processed, consider success38
39 try:40 # Process order41 self.order_service.process_order(order_id, message)42
43 # Mark as processed44 self.processed_ids.add(order_id)45
46 return True47 except Exception as e:48 print(f"Error processing order {order_id}: {e}")49 return False # Return False to trigger retry50
51class MessageConsumer:52 """Consumer that routes messages to handlers"""53
54 def __init__(self, queue, handlers: List[MessageHandler]):55 self.queue = queue56 self.handlers = handlers57
58 def consume(self):59 """Consume messages from queue"""60 while True:61 try:62 message_data = self.queue.get(timeout=1)63 message = json.loads(message_data)64
65 # Find handler66 handler = self.find_handler(message.get('type'))67
68 if handler:69 # Process message70 success = handler.handle(message)71
72 if success:73 # Acknowledge message74 self.queue.task_done()75 else:76 # Return to queue for retry77 self.queue.put(message_data)78 else:79 print(f"No handler for message type: {message.get('type')}")80 self.queue.task_done()81
82 except Exception as e:83 print(f"Error consuming message: {e}")84
85 def find_handler(self, message_type: str) -> Optional[MessageHandler]:86 """Find handler for message type"""87 for handler in self.handlers:88 if handler.can_handle(message_type):89 return handler90 return None1import java.util.*;2
3interface MessageHandler {4 boolean handle(Message message);5 boolean canHandle(String messageType);6}7
8class OrderProcessor implements MessageHandler {9 private final OrderService orderService;10 private final Set<String> processedIds = new HashSet<>();11
12 public OrderProcessor(OrderService orderService) {13 this.orderService = orderService;14 }15
16 @Override17 public boolean canHandle(String messageType) {18 return "order.created".equals(messageType);19 }20
21 @Override22 public boolean handle(Message message) {23 String orderId = message.getOrderId();24
25 // Idempotency check26 synchronized (processedIds) {27 if (processedIds.contains(orderId)) {28 System.out.println("Order " + orderId + " already processed. Skipping.");29 return true; // Already processed30 }31 }32
33 try {34 // Process order35 orderService.processOrder(orderId, message);36
37 // Mark as processed38 synchronized (processedIds) {39 processedIds.add(orderId);40 }41
42 return true;43 } catch (Exception e) {44 System.err.println("Error processing order " + orderId + ": " + e.getMessage());45 return false; // Return false to trigger retry46 }47 }48}49
50class MessageConsumer {51 private final BlockingQueue<String> queue;52 private final List<MessageHandler> handlers;53
54 public MessageConsumer(BlockingQueue<String> queue, List<MessageHandler> handlers) {55 this.queue = queue;56 this.handlers = handlers;57 }58
59 public void consume() {60 while (true) {61 try {62 String messageData = queue.take();63 Message message = parseMessage(messageData);64
65 // Find handler66 MessageHandler handler = findHandler(message.getType());67
68 if (handler != null) {69 // Process message70 boolean success = handler.handle(message);71
72 if (success) {73 // Message processed successfully74 // (Acknowledgment handled by queue)75 } else {76 // Return to queue for retry77 queue.put(messageData);78 }79 } else {80 System.err.println("No handler for message type: " + message.getType());81 }82 } catch (InterruptedException e) {83 Thread.currentThread().interrupt();84 break;85 } catch (Exception e) {86 System.err.println("Error consuming message: " + e.getMessage());87 }88 }89 }90
91 private MessageHandler findHandler(String messageType) {92 return handlers.stream()93 .filter(h -> h.canHandle(messageType))94 .findFirst()95 .orElse(null);96 }97}๐ Producer-Consumer
One producer, multiple consumers. Each message processed once. Perfect for task distribution.
๐ข Pub-Sub
One publisher, multiple subscribers. Each gets copy. Perfect for event broadcasting.
โ Delivery Guarantees
At-least-once most common. Requires idempotent consumers. Exactly-once is hardest but most reliable.
๐ Ordering Matters
Message ordering critical for state changes. Per-partition ordering balances order and parallelism.