🔄 Saga Pattern
Sequence of local transactions with compensation. No distributed locks, better scalability.
Problem: How do you keep data consistent across multiple services, each with its own database?
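Traditional solution: 2PC (Two-Phase Commit). However, it is blocking, holds distributed locks while participants wait for the coordinator, and scales poorly.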
Solution: Saga Pattern - Sequence of local transactions with compensation!
Saga = Sequence of local transactions. If any step fails, compensate previous steps.
Orchestration: a central coordinator orchestrates all steps.
Characteristics: centralized control; easier to understand and debug, but with higher coupling to the coordinator.
from enum import Enum
from typing import List, Callable, Dict, Any
from dataclasses import dataclass


class SagaStatus(Enum):
    """Lifecycle states of a saga run."""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class SagaStep:
    """One local transaction in a saga plus the action that undoes it."""
    name: str
    execute: Callable  # Called with the shared context dict; a falsy return means failure
    compensate: Callable  # Called with this SagaStep to undo a completed execute
    service: str  # Owning service name (not read by the orchestrator)


class SagaOrchestrator:
    """Run saga steps in order and compensate the completed ones on failure.

    The orchestrator can be reused: every call to ``execute`` starts a fresh
    run, so only steps completed during *that* run are compensated.
    """

    def __init__(self):
        self.steps: List[SagaStep] = []
        # Steps that succeeded during the current run, in execution order.
        self.executed_steps: List[SagaStep] = []
        self.status = SagaStatus.PENDING

    def add_step(self, step: SagaStep):
        """Append ``step`` to the end of the saga."""
        self.steps.append(step)

    def execute(self, context: Dict[str, Any]) -> bool:
        """Execute every step in order.

        Args:
            context: Mutable dict passed to each step's ``execute``. After a
                step succeeds, its result is stored under ``"<name>_result"``.

        Returns:
            True if all steps succeeded (status COMPLETED). False if a step
            returned a falsy value or raised; in that case the steps completed
            during this run have been compensated (status FAILED).
        """
        # Start from a clean slate: without this, a reused orchestrator would
        # also compensate steps that belong to an earlier, finished run.
        self.executed_steps = []
        self.status = SagaStatus.IN_PROGRESS

        try:
            for step in self.steps:
                print(f"Executing step: {step.name}")
                result = step.execute(context)

                if not result:
                    # The failing step did not complete, so it is not compensated.
                    print(f"Step {step.name} failed. Compensating...")
                    self._compensate()
                    return False

                self.executed_steps.append(step)
                context[f"{step.name}_result"] = result

            self.status = SagaStatus.COMPLETED
            return True

        except Exception as e:
            print(f"Saga failed: {e}")
            self._compensate()
            return False

    def _compensate(self):
        """Undo completed steps in reverse order, then mark the saga FAILED.

        A compensation that raises is reported and skipped so that the
        remaining (earlier) steps still get compensated.
        """
        self.status = SagaStatus.COMPENSATING

        for step in reversed(self.executed_steps):
            try:
                print(f"Compensating step: {step.name}")
                step.compensate(step)
            except Exception as e:
                # Best effort: report and keep compensating earlier steps.
                print(f"Compensation failed for {step.name}: {e}")

        self.status = SagaStatus.FAILED
# Example: Create Order Saga
# Every forward action below is paired with a compensation. The remote
# service calls are placeholders; only the printed messages and the return
# values are real.


def create_order(context):
    """Forward action: create the order and report its id."""
    print("Creating order...")
    # TODO: call the order service
    return dict(order_id=123)


def cancel_order(step):
    """Compensation for ``create_order``."""
    print("Cancelling order...")
    # TODO: call the order service to cancel


def reserve_inventory(context):
    """Forward action: reserve stock for the order."""
    print("Reserving inventory...")
    # TODO: call the inventory service
    return dict(reservation_id=456)


def release_inventory(step):
    """Compensation for ``reserve_inventory``."""
    print("Releasing inventory...")
    # TODO: call the inventory service to release


def charge_payment(context):
    """Forward action: charge the customer.

    Always returns False to simulate a payment failure.
    """
    print("Charging payment...")
    # TODO: call the payment service
    return False


def refund_payment(step):
    """Compensation for ``charge_payment``."""
    print("Refunding payment...")
    # TODO: call the payment service to refund
# Create saga: order -> inventory -> payment.
# SagaStep fields are positional here: name, execute, compensate, service.
orchestrator = SagaOrchestrator()
orchestrator.add_step(SagaStep('create_order', create_order, cancel_order, 'order-service'))
orchestrator.add_step(SagaStep('reserve_inventory', reserve_inventory, release_inventory, 'inventory-service'))
orchestrator.add_step(SagaStep('charge_payment', charge_payment, refund_payment, 'payment-service'))

# Execute saga with an empty context; True only if every step succeeds.
success = orchestrator.execute({})
3enum SagaStatus {4 PENDING, IN_PROGRESS, COMPENSATING, COMPLETED, FAILED5}6
7class SagaStep {8 private final String name;9 private final Function<Map<String, Object>, Object> execute;10 private final Consumer<SagaStep> compensate;11 private final String service;12
13 // Constructor, getters...14}15
16class SagaOrchestrator {17 private final List<SagaStep> steps = new ArrayList<>();18 private final List<SagaStep> executedSteps = new ArrayList<>();19 private SagaStatus status = SagaStatus.PENDING;20
21 public void addStep(SagaStep step) {22 steps.add(step);23 }24
25 public boolean execute(Map<String, Object> context) {26 status = SagaStatus.IN_PROGRESS;27
28 try {29 for (SagaStep step : steps) {30 System.out.println("Executing step: " + step.getName());31
32 // Execute step33 Object result = step.getExecute().apply(context);34
35 if (result == null) {36 // Step failed - compensate37 System.out.println("Step " + step.getName() + " failed. Compensating...");38 compensate();39 return false;40 }41
42 // Track executed step43 executedSteps.add(step);44 context.put(step.getName() + "_result", result);45 }46
47 // All steps succeeded48 status = SagaStatus.COMPLETED;49 return true;50 } catch (Exception e) {51 System.err.println("Saga failed: " + e.getMessage());52 compensate();53 return false;54 }55 }56
57 private void compensate() {58 status = SagaStatus.COMPENSATING;59
60 // Compensate in reverse order61 Collections.reverse(executedSteps);62 for (SagaStep step : executedSteps) {63 try {64 System.out.println("Compensating step: " + step.getName());65 step.getCompensate().accept(step);66 } catch (Exception e) {67 System.err.println("Compensation failed for " + step.getName() + ": " + e.getMessage());68 }69 }70
71 status = SagaStatus.FAILED;72 }73}Distributed coordination. Each service knows next step.
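Choreography: distributed coordination. Each service knows the next step.
Characteristics: distributed control; lower coupling and better scalability, but harder to debug.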
class OrderService:
    """Order service - starts the saga and cancels orders on downstream failure."""

    def create_order(self, order_data):
        """Create the order locally, then publish ``OrderCreated``.

        The event must carry everything the downstream handlers read:
        InventoryService reads ``items`` and forwards ``amount`` to payment.
        """
        # Create order (local transaction)
        order = self._create_order_local(order_data)

        event_bus.publish('OrderCreated', {
            'order_id': order.id,
            'user_id': order.user_id,
            'amount': order.amount,
            # Required by InventoryService.handle_order_created.
            # NOTE(review): assumes the order object exposes its line items
            # as ``items`` - confirm against _create_order_local.
            'items': order.items,
        })

        return order

    def handle_payment_failed(self, event):
        """Compensate: cancel the order after a payment failure."""
        self._cancel_order(event['order_id'])

    def handle_inventory_reservation_failed(self, event):
        """Compensate: cancel the order when inventory could not be reserved."""
        self._cancel_order(event['order_id'])


class InventoryService:
    """Inventory service - listens to OrderCreated."""

    def handle_order_created(self, event):
        """Reserve inventory, then publish the outcome."""
        try:
            # Validate the data forwarded downstream before reserving anything,
            # so a malformed event cannot leave a dangling reservation.
            amount = event['amount']
            # Reserve inventory (local transaction)
            reservation = self._reserve_inventory_local(
                event['order_id'],
                event['items']
            )
        except Exception as e:
            # Publish failure event - OrderService cancels the order
            event_bus.publish('InventoryReservationFailed', {
                'order_id': event['order_id'],
                'error': str(e)
            })
        else:
            # Publishing stays outside the try: a publish error must not be
            # reported as a reservation failure while the stock stays reserved.
            event_bus.publish('InventoryReserved', {
                'order_id': event['order_id'],
                'reservation_id': reservation.id,
                # PaymentService reads event['amount'] from this event.
                'amount': amount,
            })

    def handle_payment_failed(self, event):
        """Compensate: release the inventory reserved for the order."""
        self._release_inventory(event['order_id'])


class PaymentService:
    """Payment service - listens to InventoryReserved."""

    def handle_inventory_reserved(self, event):
        """Charge payment, then publish the outcome."""
        try:
            # Charge payment (local transaction)
            payment = self._charge_payment_local(
                event['order_id'],
                event['amount']
            )
        except Exception as e:
            # Publish failure event - triggers compensation
            event_bus.publish('PaymentFailed', {
                'order_id': event['order_id'],
                'error': str(e)
            })
        else:
            # Outside the try: a publish error after a successful charge must
            # not trigger PaymentFailed compensation (there is no refund path).
            event_bus.publish('PaymentCharged', {
                'order_id': event['order_id'],
                'payment_id': payment.id
            })


# Setup event subscriptions
event_bus.subscribe('OrderCreated', inventory_service.handle_order_created)
event_bus.subscribe('InventoryReserved', payment_service.handle_inventory_reserved)
event_bus.subscribe('InventoryReservationFailed', order_service.handle_inventory_reservation_failed)
event_bus.subscribe('PaymentFailed', order_service.handle_payment_failed)
event_bus.subscribe('PaymentFailed', inventory_service.handle_payment_failed)
6 // Publish event7 eventBus.publish("OrderCreated", Map.of(8 "order_id", order.getId(),9 "user_id", order.getUserId(),10 "amount", order.getAmount()11 ));12 }13
14 @EventHandler15 public void handlePaymentFailed(PaymentFailedEvent event) {16 // Compensate: Cancel order17 cancelOrder(event.getOrderId());18 }19}20
21class InventoryService {22 @EventHandler23 public void handleOrderCreated(OrderCreatedEvent event) {24 try {25 // Reserve inventory (local transaction)26 Reservation reservation = reserveInventoryLocal(27 event.getOrderId(),28 event.getItems()29 );30
31 // Publish success event32 eventBus.publish("InventoryReserved", Map.of(33 "order_id", event.getOrderId(),34 "reservation_id", reservation.getId()35 ));36 } catch (Exception e) {37 // Publish failure event38 eventBus.publish("InventoryReservationFailed", Map.of(39 "order_id", event.getOrderId(),40 "error", e.getMessage()41 ));42 }43 }44
45 @EventHandler46 public void handlePaymentFailed(PaymentFailedEvent event) {47 // Compensate: Release inventory48 releaseInventory(event.getOrderId());49 }50}| Aspect | Orchestrator | Choreography |
| Aspect | Orchestrator | Choreography |
|---|---|---|
| Control | Centralized | Distributed |
| Complexity | Lower | Higher |
| Coupling | Higher | Lower |
| Debugging | Easier | Harder |
| Scalability | Lower | Higher |
| Use Case | Complex workflows | Simple workflows |
Choose Orchestrator when: the workflow is complex, and you want central visibility and easier debugging.
Choose Choreography when: the workflow is simple, and you want loosely coupled, independently scalable services.
Compensation strategies:
Undo the action (if possible): e.g., cancel the order that was created.
Opposite transaction: e.g., refund a payment that was already charged.
Automatic expiration: e.g., an inventory reservation that expires if it is never confirmed.
🔄 Saga Pattern
Sequence of local transactions with compensation. No distributed locks, better scalability.
🎭 Orchestrator
Central coordinator. Easier to understand and debug. Good for complex workflows.
💃 Choreography
Distributed coordination. More decoupled and scalable. Good for simple workflows.
↩️ Compensation
Undo completed steps when the saga fails. This is not a rollback but a compensating action, and it is critical for consistency.