Producer-Consumer Pattern
Introduction: The Producer-Consumer Problem
The Producer-Consumer pattern is one of the most common concurrency patterns, especially in low-level design (LLD) interviews. It decouples the code that produces data from the code that consumes it by placing a shared buffer between them.
Visual: The Problem
Why Use Producer-Consumer?
Benefits:
- Decoupling: Producers and consumers don’t need to know about each other
- Rate Buffering: Handles speed mismatches (fast producer, slow consumer)
- Scalability: Easy to add more producers or consumers
- Backpressure: Bounded buffers cap memory use by blocking producers when the buffer is full
Common Use Cases:
- Message queues
- Task queues
- Log processing
- Data pipelines
- Event-driven systems
The Basic Pattern
Visual: Producer-Consumer Flow
Implementation Approach 1: Low-Level (Condition Variables)
Let’s implement a bounded buffer from scratch using condition variables. This is often asked in interviews!
Visual: Condition Variable Flow
Example: Bounded Buffer Implementation

```python
import threading
import time
from collections import deque

class BoundedBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = deque()  # O(1) removal from the front
        self.lock = threading.Lock()
        # Both conditions share one lock because they guard the same buffer
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def put(self, item):
        """Add item to buffer, blocking if full"""
        with self.lock:
            # Wait while buffer is full
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()  # Releases lock, waits

            self.buffer.append(item)
            print(f"Produced: {item}, Buffer size: {len(self.buffer)}")
            self.not_empty.notify()  # Wake up a waiting consumer

    def get(self):
        """Remove item from buffer, blocking if empty"""
        with self.lock:
            # Wait while buffer is empty
            while len(self.buffer) == 0:
                self.not_empty.wait()  # Releases lock, waits

            item = self.buffer.popleft()
            print(f"Consumed: {item}, Buffer size: {len(self.buffer)}")
            self.not_full.notify()  # Wake up a waiting producer
            return item

# Usage
buffer = BoundedBuffer(capacity=5)

def producer():
    for i in range(10):
        buffer.put(i)
        time.sleep(0.1)

def consumer():
    for _ in range(10):
        item = buffer.get()
        time.sleep(0.2)

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()

# Wait for both threads so the program does not exit before the work is done
producer_thread.join()
consumer_thread.join()
```

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer<T> {
    private final Queue<T> buffer;
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int capacity) {
        this.capacity = capacity;
        this.buffer = new LinkedList<>();
    }

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            // Wait while buffer is full
            while (buffer.size() >= capacity) {
                notFull.await(); // Releases lock, waits
            }

            buffer.offer(item);
            System.out.println("Produced: " + item + ", Buffer size: " + buffer.size());
            notEmpty.signal(); // Wake up a waiting consumer
        } finally {
            lock.unlock();
        }
    }

    public T get() throws InterruptedException {
        lock.lock();
        try {
            // Wait while buffer is empty
            while (buffer.isEmpty()) {
                notEmpty.await(); // Releases lock, waits
            }

            T item = buffer.poll();
            System.out.println("Consumed: " + item + ", Buffer size: " + buffer.size());
            notFull.signal(); // Wake up a waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(5);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.put(i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.get();
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }
}
```

Implementation Approach 2: High-Level (Blocking Queues)
Using built-in blocking queues is much simpler and less error-prone!
Example: Using BlockingQueue (Java)
```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        // Create bounded blocking queue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // Producer
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(i); // Blocks if full
                    System.out.println("Produced: " + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int item = queue.take(); // Blocks if empty
                    System.out.println("Consumed: " + item);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }
}
```

Example: Using queue.Queue (Python)
```python
import queue
import threading
import time

# Create bounded queue
q = queue.Queue(maxsize=5)

def producer():
    for i in range(10):
        q.put(i)  # Blocks if full
        print(f"Produced: {i}")
        time.sleep(0.1)

def consumer():
    for _ in range(10):
        item = q.get()  # Blocks if empty
        print(f"Consumed: {item}")
        q.task_done()  # Mark task as done
        time.sleep(0.2)

producer_thread = threading.Thread(target=producer, daemon=True)
consumer_thread = threading.Thread(target=consumer, daemon=True)
producer_thread.start()
consumer_thread.start()

# Join the producer first: q.join() on its own could return early
# if it ran before the first put()
producer_thread.join()
q.join()  # Wait until every queued item has been marked done
```

Comparison: ArrayBlockingQueue vs LinkedBlockingQueue
| Feature | ArrayBlockingQueue | LinkedBlockingQueue |
|---|---|---|
| Backing | Array | Linked nodes |
| Bounded | Always bounded | Optionally bounded (default capacity is Integer.MAX_VALUE) |
| Memory | Allocated up front, fixed size | Node allocated per element |
| Throughput | Typically lower, more predictable | Typically higher, less predictable |
| Fairness | Optional (constructor flag) | Not supported |
Multiple Producers and Consumers
The pattern scales beautifully to multiple producers and consumers!
Visual: Multiple Producers/Consumers
Example: Multiple Producers/Consumers

```python
import queue
import threading
import time

q = queue.Queue(maxsize=10)

def producer(producer_id):
    for i in range(5):
        item = f"P{producer_id}-{i}"
        q.put(item)
        print(f"Producer {producer_id} produced: {item}")
        time.sleep(0.1)

def consumer(consumer_id):
    while True:
        item = q.get()
        if item is None:  # Poison pill
            q.task_done()
            break
        print(f"Consumer {consumer_id} consumed: {item}")
        q.task_done()
        time.sleep(0.2)

# Create multiple producers
producers = []
for i in range(3):
    p = threading.Thread(target=producer, args=(i,))
    producers.append(p)
    p.start()

# Create multiple consumers
consumers = []
for i in range(2):
    c = threading.Thread(target=consumer, args=(i,))
    consumers.append(c)
    c.start()

# Wait for producers
for p in producers:
    p.join()

# Send poison pills (one per consumer)
for _ in consumers:
    q.put(None)

# Wait for consumers
for c in consumers:
    c.join()
```

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MultipleProducersConsumers {
    private static final int POISON_PILL = -1;

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        // Multiple producers
        Thread[] producers = new Thread[3];
        for (int i = 0; i < 3; i++) {
            final int producerId = i;
            producers[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < 5; j++) {
                        int item = producerId * 10 + j;
                        queue.put(item);
                        System.out.println("Producer " + producerId + " produced: " + item);
                        Thread.sleep(100);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producers[i].start();
        }

        // Multiple consumers
        Thread[] consumers = new Thread[2];
        for (int i = 0; i < 2; i++) {
            final int consumerId = i;
            consumers[i] = new Thread(() -> {
                try {
                    while (true) {
                        int item = queue.take();
                        if (item == POISON_PILL) {
                            break; // Exit on poison pill
                        }
                        System.out.println("Consumer " + consumerId + " consumed: " + item);
                        Thread.sleep(200);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }

        // Wait for producers
        for (Thread producer : producers) {
            producer.join();
        }

        // Send poison pills (one per consumer)
        for (int i = 0; i < consumers.length; i++) {
            queue.put(POISON_PILL);
        }

        // Wait for consumers
        for (Thread consumer : consumers) {
            consumer.join();
        }
    }
}
```

Handling Backpressure
Backpressure is needed when producers generate data faster than consumers can process it. A bounded buffer applies it automatically: once the buffer is full, put() blocks the producer until a consumer frees a slot.
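To make the blocking behaviour concrete, here is a minimal sketch using Python's queue.Queue. The helper name try_put, the capacity of 2, and the 0.1-second timeout are illustrative assumptions; the timeout is only there so the example can report a full buffer instead of blocking forever.

```python
import queue

def try_put(q, item, timeout):
    """Try to enqueue item; return False if the queue stayed full for `timeout` seconds."""
    try:
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False

buffer = queue.Queue(maxsize=2)  # illustrative capacity

# No consumer is running, so the third put finds the buffer full and times out
results = [try_put(buffer, i, timeout=0.1) for i in range(3)]
print(results)

buffer.get()  # a consumer frees one slot
print(try_put(buffer, 99, timeout=0.1))  # the producer can make progress again
```

Without the timeout argument, the third put() would simply wait, which is exactly the throttling effect a bounded buffer is meant to provide.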
Visual: Backpressure Handling
Poison Pill Pattern
The poison pill pattern enables graceful shutdown by sending a special sentinel value that signals consumers to stop.
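In Python, one compact way to write the consumer side is the two-argument form of the built-in iter(), which keeps calling a function until it returns the sentinel. The sketch below is illustrative only: the names STOP and consume are hypothetical, and it assumes a single consumer, so one sentinel is enough.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel; a fresh object() equals nothing but itself

def consume(q, results):
    # iter(callable, sentinel) calls q.get() repeatedly and stops
    # as soon as the returned value equals STOP
    for item in iter(q.get, STOP):
        results.append(item * 2)

q = queue.Queue()
results = []
worker = threading.Thread(target=consume, args=(q, results))
worker.start()

for i in range(5):
    q.put(i)
q.put(STOP)  # signal shutdown after the real work
worker.join()
print(results)
```

Note that iter() compares with ==, so this form is only safe when queued items cannot compare equal to the sentinel.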
Visual: Poison Pill Pattern
Example: Poison Pill Implementation

```python
import queue
import threading

POISON_PILL = object()  # Sentinel value

def producer(q):
    for i in range(10):
        q.put(i)
        print(f"Produced: {i}")
    q.put(POISON_PILL)  # Send poison pill
    print("Producer finished")

def consumer(q, consumer_id):
    while True:
        item = q.get()
        if item is POISON_PILL:
            q.put(POISON_PILL)  # Put it back for other consumers
            q.task_done()
            print(f"Consumer {consumer_id} shutting down")
            break
        print(f"Consumer {consumer_id} consumed: {item}")
        q.task_done()

q = queue.Queue()
threading.Thread(target=producer, args=(q,)).start()

# Multiple consumers
for i in range(3):
    threading.Thread(target=consumer, args=(q, i)).start()
```

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillExample {
    private static final String POISON_PILL = "POISON_PILL";

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Producer
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put("Item-" + i);
                    System.out.println("Produced: Item-" + i);
                }
                queue.put(POISON_PILL); // Send poison pill
                System.out.println("Producer finished");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumers
        Thread[] consumers = new Thread[3];
        for (int i = 0; i < 3; i++) {
            final int consumerId = i;
            consumers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        if (POISON_PILL.equals(item)) {
                            queue.put(POISON_PILL); // Put back for others
                            System.out.println("Consumer " + consumerId + " shutting down");
                            break;
                        }
                        System.out.println("Consumer " + consumerId + " consumed: " + item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }

        producer.start();
        producer.join();

        for (Thread consumer : consumers) {
            consumer.join();
        }
    }
}
```

Priority Queue Producer-Consumer
Sometimes you need to process items by priority, not just FIFO order.
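One detail worth sketching is what happens when two items share a priority. A common approach with Python's queue.PriorityQueue is to enqueue (priority, sequence, payload) tuples so that ties fall back to insertion order and the payloads are never compared. The helper put_task and the job names below are made up for illustration.

```python
import itertools
import queue

pq = queue.PriorityQueue()
counter = itertools.count()  # increasing sequence number used as a tie-breaker

def put_task(priority, payload):
    # Tuples compare element by element: priority first, then sequence.
    # The sequence is unique, so the payload dict is never compared.
    pq.put((priority, next(counter), payload))

put_task(2, {"job": "resize"})
put_task(1, {"job": "charge"})
put_task(2, {"job": "email"})
put_task(1, {"job": "refund"})

order = []
while not pq.empty():  # safe here because only one thread touches the queue
    priority, _, payload = pq.get()
    order.append(payload["job"])
print(order)
```

Lower numbers come out first, and items with equal priority keep the order in which they were added.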
Example: Priority Queue
```python
import queue
import threading

class Task:
    def __init__(self, priority, data):
        self.priority = priority
        self.data = data

    def __lt__(self, other):
        # Lower priority number = higher priority
        return self.priority < other.priority

# Poison pill: a Task that sorts after every real task.
# None cannot be used here because it cannot be compared with Task objects.
POISON_PILL = Task(float("inf"), None)

pq = queue.PriorityQueue()

def producer():
    tasks = [
        Task(3, "Low priority task"),
        Task(1, "High priority task"),
        Task(2, "Medium priority task"),
    ]
    for task in tasks:
        pq.put(task)
        print(f"Produced: {task.data} (priority: {task.priority})")
    pq.put(POISON_PILL)  # Tell the consumer to stop once real tasks are drained

def consumer():
    # The consumer only sees tasks queued so far, so the exact output
    # order depends on how the two threads interleave
    while True:
        task = pq.get()
        if task is POISON_PILL:
            pq.task_done()
            break
        print(f"Consumed: {task.data} (priority: {task.priority})")
        pq.task_done()

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
```

```java
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityQueueExample {
    static class Task implements Comparable<Task> {
        int priority;
        String data;

        Task(int priority, String data) {
            this.priority = priority;
            this.data = data;
        }

        @Override
        public int compareTo(Task other) {
            return Integer.compare(this.priority, other.priority);
        }
    }

    // Poison pill: sorts after every real task so it is dequeued last
    private static final Task POISON_PILL = new Task(Integer.MAX_VALUE, null);

    public static void main(String[] args) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();

        // Producer
        new Thread(() -> {
            queue.put(new Task(3, "Low priority task"));
            queue.put(new Task(1, "High priority task"));
            queue.put(new Task(2, "Medium priority task"));
            queue.put(POISON_PILL); // Lets the consumer thread exit
        }).start();

        // Consumer
        new Thread(() -> {
            try {
                while (true) {
                    Task task = queue.take();
                    if (task == POISON_PILL) {
                        break;
                    }
                    System.out.println("Consumed: " + task.data +
                        " (priority: " + task.priority + ")");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}
```

Real-World Example: Message Queue System
Let’s build a more realistic example: a message queue system with multiple topics.
Example: Multi-Topic Message Queue
```python
import queue
import threading

class MessageQueue:
    def __init__(self, maxsize=100):
        self.maxsize = maxsize
        self.queues = {}
        self.lock = threading.Lock()

    def _get_queue(self, topic):
        """Return the queue for a topic, creating it on first use"""
        with self.lock:
            if topic not in self.queues:
                self.queues[topic] = queue.Queue(maxsize=self.maxsize)
            return self.queues[topic]

    def publish(self, topic, message):
        """Publish message to a topic, dropping it if the topic queue is full"""
        try:
            self._get_queue(topic).put_nowait(message)
            print(f"Published to {topic}: {message}")
        except queue.Full:
            print(f"Topic {topic} queue full, dropping message")

    def subscribe(self, topic, callback):
        """Subscribe to a topic and process messages on a background thread"""
        # Create the queue before the thread starts so an early publish is not lost
        topic_queue = self._get_queue(topic)

        def consumer():
            while True:
                message = topic_queue.get()
                try:
                    if message is None:  # Poison pill
                        break
                    callback(topic, message)
                finally:
                    topic_queue.task_done()

        thread = threading.Thread(target=consumer, daemon=True)
        thread.start()
        return thread

# Usage
mq = MessageQueue()

# Subscribers
def handle_order(topic, message):
    print(f"Processing order: {message}")

def handle_payment(topic, message):
    print(f"Processing payment: {message}")

mq.subscribe("orders", handle_order)
mq.subscribe("payments", handle_payment)

# Publishers
mq.publish("orders", "Order #123")
mq.publish("payments", "Payment $100")

# Daemon threads stop when the main thread exits, so wait for the queues to drain
for topic_queue in mq.queues.values():
    topic_queue.join()
```

```java
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class MessageQueue {
    // BlockingQueue rejects null elements, so shutdown uses a sentinel string
    private static final String POISON_PILL = "__POISON_PILL__";

    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();

    public void publish(String topic, String message) {
        BlockingQueue<String> queue = queues.computeIfAbsent(
            topic, k -> new LinkedBlockingQueue<>(100));

        // offer() is non-blocking and returns false when the queue is full
        if (queue.offer(message)) {
            System.out.println("Published to " + topic + ": " + message);
        } else {
            System.out.println("Topic " + topic + " queue full, dropping message");
        }
    }

    public void subscribe(String topic, Consumer<String> callback) {
        BlockingQueue<String> queue = queues.computeIfAbsent(
            topic, k -> new LinkedBlockingQueue<>(100));

        new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take();
                    if (POISON_PILL.equals(message)) break; // Poison pill
                    callback.accept(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }

    public static void main(String[] args) {
        MessageQueue mq = new MessageQueue();

        // Subscribers
        mq.subscribe("orders", msg -> System.out.println("Processing order: " + msg));
        mq.subscribe("payments", msg -> System.out.println("Processing payment: " + msg));

        // Publishers
        mq.publish("orders", "Order #123");
        mq.publish("payments", "Payment $100");

        // Stop the subscriber threads so the JVM can exit
        mq.publish("orders", POISON_PILL);
        mq.publish("payments", POISON_PILL);
    }
}
```

Practice Problems
Easy: Bounded Buffer
Design a bounded buffer with blocking put() and get() operations.
Solution
See the BoundedBuffer implementation in the “Implementation Approach 1: Low-Level (Condition Variables)” section above.
Medium: Message Queue with Topics
Design a message queue system supporting multiple topics/channels with separate queues.
Solution
See the MessageQueue example in the “Real-World Example” section above.
Interview Questions
Q1: “How would you implement a producer-consumer pattern?”
Answer:
- Low-level: Use condition variables with a lock
  - Producers wait on the not_full condition when the buffer is full
  - Consumers wait on the not_empty condition when the buffer is empty
  - Use while loops to handle spurious wakeups
- High-level: Use blocking queues
  - BlockingQueue in Java or queue.Queue in Python
  - put() blocks when full, take()/get() blocks when empty
  - Much simpler and less error-prone
Q2: “What happens when the buffer is full? How do you handle backpressure?”
Answer:
- Bounded buffer: Producers block when the buffer is full (automatic backpressure)
- Unbounded buffer: Memory grows without limit if producers outpace consumers
- Backpressure strategies:
  - Block producers (bounded buffer)
  - Drop messages (with notification)
  - Use backpressure signals to slow producers
  - Implement priority-based dropping
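As one possible illustration of the "drop messages (with notification)" strategy, the sketch below evicts the oldest queued item to make room for a new one and reports each evicted item through a callback. The function name offer_drop_oldest and the on_drop parameter are hypothetical, and dropping the oldest item is only one policy; dropping the newest is equally valid.

```python
import queue

def offer_drop_oldest(q, item, on_drop):
    """Enqueue item without blocking; if the queue is full, evict the oldest item."""
    while True:
        try:
            q.put_nowait(item)
            return
        except queue.Full:
            try:
                oldest = q.get_nowait()
            except queue.Empty:
                continue  # a consumer drained the queue first; retry the put
            q.task_done()    # keep q.join() accounting balanced for the evicted item
            on_drop(oldest)  # notify the caller about the dropped message

dropped = []
q = queue.Queue(maxsize=3)
for i in range(5):
    offer_drop_oldest(q, i, on_drop=dropped.append)

remaining = [q.get_nowait() for _ in range(q.qsize())]
print(dropped, remaining)
```

Producers never block with this approach, at the cost of losing data under sustained load, so the drop callback is a natural place to count or log what was lost.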
Q3: “How would you implement a bounded buffer from scratch?”
Answer:
- Use a lock for mutual exclusion
- Use two condition variables: not_full and not_empty
- put(): Acquire lock, wait on not_full while full, add item, signal not_empty
- get(): Acquire lock, wait on not_empty while empty, remove item, signal not_full
- Always use while loops, not if, to handle spurious wakeups
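For comparison, here is a hedged Python variant of the steps above that uses a single threading.Condition and its wait_for() method, which re-checks the predicate in a loop internally. This is a simplification rather than the two-condition design described in the answer: with one shared condition, notify_all() is needed so that a wakeup cannot be consumed by a thread of the wrong kind. The class name SimpleBoundedBuffer is made up for this sketch.

```python
import threading
from collections import deque

class SimpleBoundedBuffer:
    def __init__(self, capacity):
        self._items = deque()
        self._capacity = capacity
        self._cond = threading.Condition()  # one condition guards both predicates

    def put(self, item):
        with self._cond:
            # wait_for() loops until the predicate is true, covering spurious wakeups
            self._cond.wait_for(lambda: len(self._items) < self._capacity)
            self._items.append(item)
            self._cond.notify_all()

    def get(self):
        with self._cond:
            self._cond.wait_for(lambda: len(self._items) > 0)
            item = self._items.popleft()
            self._cond.notify_all()
            return item

buf = SimpleBoundedBuffer(capacity=2)
received = []

def consumer():
    for _ in range(5):
        received.append(buf.get())

worker = threading.Thread(target=consumer)
worker.start()
for i in range(5):
    buf.put(i)  # blocks whenever two items are already waiting
worker.join()
print(received)
```

The two-condition version avoids waking threads that cannot make progress, which is why it is usually preferred when many producers and consumers contend for the buffer.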
Q4: “What’s the difference between ArrayBlockingQueue and LinkedBlockingQueue?”
Answer:
- ArrayBlockingQueue: Array-backed, always bounded, memory allocated up front, optional fair ordering, typically lower but more predictable throughput
- LinkedBlockingQueue: Node-based, optionally bounded (capacity defaults to Integer.MAX_VALUE), allocates a node per element, typically higher throughput, no fairness option
- Choose: ArrayBlockingQueue for fixed-size needs or when fairness matters, LinkedBlockingQueue when throughput matters more
Key Takeaways
Next Steps
Continue learning concurrency patterns:
- Thread Pools & Executors - Efficient resource management
- Concurrent Collections - Thread-safe data structures
The Producer-Consumer pattern is fundamental to many concurrent systems! 🏭