🔄 Exchanges Route
Exchanges route messages to queues based on routing keys. Direct, topic, fanout, headers.
RabbitMQ is a traditional message broker implementing the AMQP (Advanced Message Queuing Protocol) standard.
Routes to queue with matching routing key.
Use case: Point-to-point messaging, task queues
Routes based on pattern matching (wildcards).
Patterns:
* - Matches one word# - Matches zero or more wordsUse case: Categorized messages, event routing
Broadcasts to all bound queues (ignores routing key).
Use case: Pub-sub, notifications, cache invalidation
Routes based on message headers (ignores routing key).
Use case: Complex routing logic
1import pika2import json3
4class RabbitMQProducer:5 """RabbitMQ producer"""6
7 def __init__(self, host='localhost'):8 self.connection = pika.BlockingConnection(9 pika.ConnectionParameters(host=host)10 )11 self.channel = self.connection.channel()12
13 def setup_exchange(self, exchange_name: str, exchange_type: str = 'direct'):14 """Declare exchange"""15 self.channel.exchange_declare(16 exchange=exchange_name,17 exchange_type=exchange_type,18 durable=True # Survive broker restart19 )20
21 def publish(self, exchange: str, routing_key: str, message: dict):22 """Publish message"""23 self.channel.basic_publish(24 exchange=exchange,25 routing_key=routing_key,26 body=json.dumps(message),27 properties=pika.BasicProperties(28 delivery_mode=2, # Make message persistent29 content_type='application/json'30 )31 )32 print(f"Published to {exchange} with key {routing_key}")33
34 def close(self):35 """Close connection"""36 self.connection.close()37
38# Usage39producer = RabbitMQProducer()40producer.setup_exchange('orders', 'direct')41producer.publish('orders', 'order.created', {42 'order_id': 123,43 'user_id': 456,44 'amount': 99.9945})46producer.close()1import com.rabbitmq.client.*;2
3public class RabbitMQProducer {4 private final Connection connection;5 private final Channel channel;6
7 public RabbitMQProducer(String host) throws Exception {8 ConnectionFactory factory = new ConnectionFactory();9 factory.setHost(host);10 this.connection = factory.newConnection();11 this.channel = connection.createChannel();12 }13
14 public void setupExchange(String exchangeName, String exchangeType) throws Exception {15 // Declare exchange16 channel.exchangeDeclare(exchangeName, exchangeType, true); // Durable17 }18
19 public void publish(String exchange, String routingKey, String message) throws Exception {20 // Publish message21 channel.basicPublish(22 exchange,23 routingKey,24 MessageProperties.PERSISTENT_TEXT_PLAIN, // Make persistent25 message.getBytes()26 );27 System.out.println("Published to " + exchange + " with key " + routingKey);28 }29
30 public void close() throws Exception {31 channel.close();32 connection.close();33 }34}35
36// Usage37RabbitMQProducer producer = new RabbitMQProducer("localhost");38producer.setupExchange("orders", "direct");39producer.publish("orders", "order.created",40 "{\"order_id\":123,\"user_id\":456,\"amount\":99.99}");41producer.close();1import pika2import json3
4class RabbitMQConsumer:5 """RabbitMQ consumer"""6
7 def __init__(self, host='localhost'):8 self.connection = pika.BlockingConnection(9 pika.ConnectionParameters(host=host)10 )11 self.channel = self.connection.channel()12
13 def setup_queue(self, queue_name: str, durable: bool = True):14 """Declare queue"""15 self.channel.queue_declare(16 queue=queue_name,17 durable=durable # Survive broker restart18 )19
20 def bind_queue(self, queue: str, exchange: str, routing_key: str):21 """Bind queue to exchange"""22 self.channel.queue_bind(23 queue=queue,24 exchange=exchange,25 routing_key=routing_key26 )27
28 def consume(self, queue: str, handler, auto_ack: bool = False):29 """Consume messages"""30 def callback(ch, method, properties, body):31 try:32 message = json.loads(body)33 # Process message34 handler(message)35
36 # Acknowledge message37 if not auto_ack:38 ch.basic_ack(delivery_tag=method.delivery_tag)39 except Exception as e:40 print(f"Error processing message: {e}")41 # Reject and requeue42 if not auto_ack:43 ch.basic_nack(44 delivery_tag=method.delivery_tag,45 requeue=True46 )47
48 # Set prefetch (how many unacked messages per consumer)49 self.channel.basic_qos(prefetch_count=1)50
51 # Start consuming52 self.channel.basic_consume(53 queue=queue,54 on_message_callback=callback,55 auto_ack=auto_ack56 )57
58 print(f"Consuming from {queue}...")59 self.channel.start_consuming()60
61 def close(self):62 """Close connection"""63 self.connection.close()64
65# Usage66def handle_order(message):67 print(f"Processing order: {message['order_id']}")68 # Process order...69
70consumer = RabbitMQConsumer()71consumer.setup_queue('order-processor', durable=True)72consumer.bind_queue('order-processor', 'orders', 'order.created')73consumer.consume('order-processor', handle_order, auto_ack=False)1import com.rabbitmq.client.*;2
3public class RabbitMQConsumer {4 private final Connection connection;5 private final Channel channel;6
7 public RabbitMQConsumer(String host) throws Exception {8 ConnectionFactory factory = new ConnectionFactory();9 factory.setHost(host);10 this.connection = factory.newConnection();11 this.channel = connection.createChannel();12 }13
14 public void setupQueue(String queueName, boolean durable) throws Exception {15 // Declare queue16 channel.queueDeclare(queueName, durable, false, false, null);17 }18
19 public void bindQueue(String queue, String exchange, String routingKey) throws Exception {20 // Bind queue to exchange21 channel.queueBind(queue, exchange, routingKey);22 }23
24 public void consume(String queue, java.util.function.Consumer<String> handler,25 boolean autoAck) throws Exception {26 // Set prefetch27 channel.basicQos(1);28
29 // Consumer callback30 DeliverCallback deliverCallback = (consumerTag, delivery) -> {31 try {32 String message = new String(delivery.getBody(), "UTF-8");33 // Process message34 handler.accept(message);35
36 // Acknowledge37 if (!autoAck) {38 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);39 }40 } catch (Exception e) {41 System.err.println("Error processing message: " + e.getMessage());42 // Reject and requeue43 if (!autoAck) {44 channel.basicNack(45 delivery.getEnvelope().getDeliveryTag(),46 false,47 true // Requeue48 );49 }50 }51 };52
53 // Start consuming54 channel.basicConsume(queue, autoAck, deliverCallback, consumerTag -> {});55 System.out.println("Consuming from " + queue + "...");56 }57
58 public void close() throws Exception {59 channel.close();60 connection.close();61 }62}63
64// Usage65RabbitMQConsumer consumer = new RabbitMQConsumer("localhost");66consumer.setupQueue("order-processor", true);67consumer.bindQueue("order-processor", "orders", "order.created");68consumer.consume("order-processor", message -> {69 System.out.println("Processing: " + message);70}, false);Critical for reliable message processing.
1# Message removed immediately when delivered2channel.basic_consume(queue='orders', on_message_callback=callback, auto_ack=True)Problem: If consumer crashes, message lost!
1# Message removed only after ack2def callback(ch, method, properties, body):3 process_message(body)4 ch.basic_ack(delivery_tag=method.delivery_tag) # Acknowledge5
6channel.basic_consume(queue='orders', on_message_callback=callback, auto_ack=False)Benefits:
| Feature | RabbitMQ | Kafka |
|---|---|---|
| Model | Traditional broker | Streaming platform |
| Message Retention | Removed after consumption | Retained (configurable) |
| Routing | Flexible (exchanges) | Simple (topics/partitions) |
| Ordering | Per queue | Per partition |
| Throughput | Good | Excellent |
| Use Case | Task queues, RPC | Event streaming, logs |
Choose RabbitMQ when:
Choose Kafka when:
🔄 Exchanges Route
Exchanges route messages to queues based on routing keys. Direct, topic, fanout, headers.
✅ Manual Ack
Use manual acknowledgment for reliability. Auto-ack removes messages immediately (risky).
💾 Durability
Make queues/exchanges/messages durable to survive broker restart. Critical for production.
📊 Flexible Routing
RabbitMQ’s flexible routing (exchanges) makes it great for complex routing scenarios.