🔄 Idempotency is Critical
Make operations safe to retry. Essential for distributed systems, retries, at-least-once delivery.
In distributed systems, messages/requests can be duplicated:
Problem: Without idempotency, retries create duplicates!
Solution: Idempotency - Make operations safe to retry!
Idempotent operation = Same result when executed multiple times.
f(f(x)) = f(x)
Calling the function twice = calling once.
✅ Idempotent Operations:
GET /users/123 - Always returns same userPUT /users/123 - Replaces user (same result)DELETE /users/123 - Deletes user (same result if already deleted)SET balance = 100 - Sets to 100 (same result)❌ Non-Idempotent Operations:
POST /orders - Creates new order each timebalance = balance + 50 - Adds 50 each timesend_email() - Sends email each timeClient provides unique key. Server checks if seen before.
How it works:
1from typing import Optional, Dict, Any2import hashlib3import json4from datetime import datetime, timedelta5from functools import wraps6
7class IdempotencyStore:8 """Stores idempotency keys and results"""9
10 def __init__(self):11 self.store: Dict[str, Dict[str, Any]] = {}12
13 def get(self, key: str) -> Optional[Dict[str, Any]]:14 """Get cached result for key"""15 entry = self.store.get(key)16
17 if entry:18 # Check if expired19 if datetime.now() > entry['expires_at']:20 del self.store[key]21 return None22
23 return entry['result']24
25 return None26
27 def set(self, key: str, result: Dict[str, Any], ttl_seconds: int = 3600):28 """Store result with key"""29 self.store[key] = {30 'result': result,31 'expires_at': datetime.now() + timedelta(seconds=ttl_seconds)32 }33
34class IdempotentHandler:35 """Handles idempotent operations"""36
37 def __init__(self, idempotency_store: IdempotencyStore):38 self.store = idempotency_store39
40 def handle_request(self, idempotency_key: str, operation: callable, *args, **kwargs):41 """Handle request with idempotency"""42 # Check if key exists43 cached_result = self.store.get(idempotency_key)44
45 if cached_result:46 print(f"Idempotency key {idempotency_key} seen before. Returning cached result.")47 return cached_result48
49 # Execute operation50 try:51 result = operation(*args, **kwargs)52
53 # Store result54 self.store.set(idempotency_key, result)55
56 return result57 except Exception as e:58 # Don't store failed operations59 raise60
61# Decorator for idempotent endpoints62def idempotent(idempotency_store: IdempotencyStore):63 """Decorator for idempotent endpoints"""64 def decorator(func):65 @wraps(func)66 def wrapper(request, *args, **kwargs):67 # Get idempotency key from header68 idempotency_key = request.headers.get('Idempotency-Key')69
70 if not idempotency_key:71 # Generate from request content72 content = json.dumps(request.json or {})73 idempotency_key = hashlib.sha256(content.encode()).hexdigest()74
75 handler = IdempotentHandler(idempotency_store)76
77 return handler.handle_request(78 idempotency_key,79 func,80 request,81 *args,82 **kwargs83 )84 return wrapper85 return decorator86
87# Usage88store = IdempotencyStore()89
90@idempotent(store)91def create_order(request):92 """Create order endpoint"""93 order_data = request.json94 # Create order...95 return {'order_id': 123, 'status': 'created'}96
97# Client sends request with idempotency key98# POST /orders99# Idempotency-Key: abc123100# { "user_id": 456, "amount": 99.99 }1import java.util.*;2import java.util.concurrent.ConcurrentHashMap;3import java.time.LocalDateTime;4import java.time.temporal.ChronoUnit;5
6class IdempotencyStore {7 private final Map<String, IdempotencyEntry> store = new ConcurrentHashMap<>();8
9 public Optional<Map<String, Object>> get(String key) {10 IdempotencyEntry entry = store.get(key);11
12 if (entry != null) {13 // Check if expired14 if (LocalDateTime.now().isAfter(entry.getExpiresAt())) {15 store.remove(key);16 return Optional.empty();17 }18
19 return Optional.of(entry.getResult());20 }21
22 return Optional.empty();23 }24
25 public void set(String key, Map<String, Object> result, int ttlSeconds) {26 store.put(key, new IdempotencyEntry(27 result,28 LocalDateTime.now().plus(ttlSeconds, ChronoUnit.SECONDS)29 ));30 }31}32
33class IdempotentHandler {34 private final IdempotencyStore store;35
36 public Map<String, Object> handleRequest(37 String idempotencyKey,38 java.util.function.Supplier<Map<String, Object>> operation) {39
40 // Check if key exists41 Optional<Map<String, Object>> cached = store.get(idempotencyKey);42
43 if (cached.isPresent()) {44 System.out.println("Idempotency key " + idempotencyKey + " seen before. Returning cached result.");45 return cached.get();46 }47
48 // Execute operation49 try {50 Map<String, Object> result = operation.get();51
52 // Store result53 store.set(idempotencyKey, result, 3600);54
55 return result;56 } catch (Exception e) {57 // Don't store failed operations58 throw e;59 }60 }61}62
63// Usage in controller64@PostMapping("/orders")65public ResponseEntity<Order> createOrder(66 @RequestHeader(value = "Idempotency-Key", required = false) String idempotencyKey,67 @RequestBody OrderRequest request) {68
69 if (idempotencyKey == null) {70 // Generate from request71 idempotencyKey = generateKey(request);72 }73
74 IdempotentHandler handler = new IdempotentHandler(idempotencyStore);75
76 Order order = handler.handleRequest(idempotencyKey, () -> {77 // Create order78 return orderService.createOrder(request);79 });80
81 return ResponseEntity.ok(order);82}Ensures message processed exactly once.
1from typing import Set2import hashlib3import json4from datetime import datetime, timedelta5
6class MessageDeduplicator:7 """Deduplicates messages"""8
9 def __init__(self, ttl_seconds: int = 3600):10 self.processed_messages: Set[str] = set()11 self.message_timestamps: Dict[str, datetime] = {}12 self.ttl_seconds = ttl_seconds13
14 def generate_message_id(self, message: Dict[str, Any]) -> str:15 """Generate unique ID for message"""16 # Use message content + source17 content = json.dumps(message, sort_keys=True)18 return hashlib.sha256(content.encode()).hexdigest()19
20 def is_duplicate(self, message: Dict[str, Any]) -> bool:21 """Check if message is duplicate"""22 message_id = self.generate_message_id(message)23
24 # Clean expired entries25 self._clean_expired()26
27 # Check if seen28 if message_id in self.processed_messages:29 return True30
31 # Mark as processed32 self.processed_messages.add(message_id)33 self.message_timestamps[message_id] = datetime.now()34
35 return False36
37 def _clean_expired(self):38 """Remove expired entries"""39 now = datetime.now()40 expired = [41 msg_id for msg_id, timestamp in self.message_timestamps.items()42 if (now - timestamp).total_seconds() > self.ttl_seconds43 ]44
45 for msg_id in expired:46 self.processed_messages.discard(msg_id)47 del self.message_timestamps[msg_id]48
49class ExactlyOnceProcessor:50 """Processes messages exactly once"""51
52 def __init__(self, deduplicator: MessageDeduplicator):53 self.deduplicator = deduplicator54
55 def process(self, message: Dict[str, Any], handler: callable):56 """Process message exactly once"""57 # Check if duplicate58 if self.deduplicator.is_duplicate(message):59 print("Duplicate message detected. Skipping.")60 return None61
62 # Process message (idempotent handler)63 try:64 result = handler(message)65 return result66 except Exception as e:67 # On error, remove from processed set (allow retry)68 message_id = self.deduplicator.generate_message_id(message)69 self.deduplicator.processed_messages.discard(message_id)70 raise71
72# Usage73deduplicator = MessageDeduplicator(ttl_seconds=3600)74processor = ExactlyOnceProcessor(deduplicator)75
76def handle_order_message(message):77 """Idempotent message handler"""78 order_id = message['order_id']79 # Process order (idempotent operation)80 return process_order(order_id)81
82# Process message83result = processor.process({84 'order_id': 123,85 'user_id': 456,86 'amount': 99.9987}, handle_order_message)88
89# Retry same message (will be skipped)90result = processor.process({91 'order_id': 123,92 'user_id': 456,93 'amount': 99.9994}, handle_order_message) # Returns None (duplicate)1import java.util.*;2import java.util.concurrent.ConcurrentHashMap;3import java.security.MessageDigest;4import java.time.LocalDateTime;5
6class MessageDeduplicator {7 private final Set<String> processedMessages = ConcurrentHashMap.newKeySet();8 private final Map<String, LocalDateTime> messageTimestamps = new ConcurrentHashMap<>();9 private final int ttlSeconds;10
11 public String generateMessageId(Map<String, Object> message) {12 // Generate unique ID from message content13 String content = message.toString(); // Simplified14 return sha256(content);15 }16
17 public boolean isDuplicate(Map<String, Object> message) {18 String messageId = generateMessageId(message);19
20 // Clean expired entries21 cleanExpired();22
23 // Check if seen24 if (processedMessages.contains(messageId)) {25 return true;26 }27
28 // Mark as processed29 processedMessages.add(messageId);30 messageTimestamps.put(messageId, LocalDateTime.now());31
32 return false;33 }34
35 private void cleanExpired() {36 LocalDateTime now = LocalDateTime.now();37 messageTimestamps.entrySet().removeIf(entry ->38 ChronoUnit.SECONDS.between(entry.getValue(), now) > ttlSeconds39 );40 }41}42
43class ExactlyOnceProcessor {44 private final MessageDeduplicator deduplicator;45
46 public Object process(Map<String, Object> message,47 java.util.function.Function<Map<String, Object>, Object> handler) {48 // Check if duplicate49 if (deduplicator.isDuplicate(message)) {50 System.out.println("Duplicate message detected. Skipping.");51 return null;52 }53
54 // Process message55 try {56 return handler.apply(message);57 } catch (Exception e) {58 // On error, remove from processed set59 String messageId = deduplicator.generateMessageId(message);60 deduplicator.processedMessages.remove(messageId);61 throw e;62 }63 }64}For multiple servers, use shared storage (Redis/Database):
1import redis2import json3import hashlib4
5class DistributedIdempotencyStore:6 """Distributed idempotency store using Redis"""7
8 def __init__(self, redis_client: redis.Redis):9 self.redis = redis_client10
11 def check_and_set(self, key: str, operation: callable, ttl_seconds: int = 3600):12 """Check if key exists, execute if not"""13 # Try to set key (only if not exists)14 acquired = self.redis.set(15 f"idempotency:{key}",16 "processing",17 nx=True, # Only set if not exists18 ex=ttl_seconds19 )20
21 if not acquired:22 # Key exists - get cached result23 cached = self.redis.get(f"idempotency:result:{key}")24 if cached:25 return json.loads(cached)26 else:27 # Still processing, wait or return error28 raise Exception("Operation in progress")29
30 try:31 # Execute operation32 result = operation()33
34 # Store result35 self.redis.setex(36 f"idempotency:result:{key}",37 ttl_seconds,38 json.dumps(result)39 )40
41 # Mark as completed42 self.redis.setex(43 f"idempotency:{key}",44 ttl_seconds,45 "completed"46 )47
48 return result49 except Exception as e:50 # On error, remove key (allow retry)51 self.redis.delete(f"idempotency:{key}")52 raise53
54# Usage55redis_client = redis.Redis(host='localhost', port=6379)56store = DistributedIdempotencyStore(redis_client)57
58def create_order(order_data):59 # Create order logic...60 return {'order_id': 123}61
62# Process with idempotency63idempotency_key = "order-123-abc"64result = store.check_and_set(65 idempotency_key,66 lambda: create_order(order_data),67 ttl_seconds=360068)1import redis.clients.jedis.Jedis;2import redis.clients.jedis.JedisPool;3import com.fasterxml.jackson.databind.ObjectMapper;4
5class DistributedIdempotencyStore {6 private final JedisPool jedisPool;7 private final ObjectMapper objectMapper = new ObjectMapper();8
9 public <T> T checkAndSet(String key, java.util.function.Supplier<T> operation,10 int ttlSeconds) throws Exception {11 try (Jedis jedis = jedisPool.getResource()) {12 // Try to set key (only if not exists)13 String lockKey = "idempotency:" + key;14 String result = jedis.set(lockKey, "processing", "NX", "EX", ttlSeconds);15
16 if (!"OK".equals(result)) {17 // Key exists - get cached result18 String cached = jedis.get("idempotency:result:" + key);19 if (cached != null) {20 return (T) objectMapper.readValue(cached, Object.class);21 } else {22 throw new Exception("Operation in progress");23 }24 }25
26 try {27 // Execute operation28 T result = operation.get();29
30 // Store result31 jedis.setex(32 "idempotency:result:" + key,33 ttlSeconds,34 objectMapper.writeValueAsString(result)35 );36
37 // Mark as completed38 jedis.setex(lockKey, ttlSeconds, "completed");39
40 return result;41 } catch (Exception e) {42 // On error, remove key43 jedis.del(lockKey);44 throw e;45 }46 }47 }48}Always provide idempotency keys for mutating operations:
1POST /orders2Idempotency-Key: abc123-xyz7893Content-Type: application/json4
5{ "user_id": 456, "amount": 99.99 }Client generates unique key:
Cache results with keys:
Always check key before processing:
Design operations to be idempotent:
🔄 Idempotency is Critical
Make operations safe to retry. Essential for distributed systems, retries, at-least-once delivery.
🔑 Idempotency Keys
Use unique keys to detect duplicates. Check before processing. Cache results. Return cached on duplicate.
✅ Exactly-Once
Exactly-once requires idempotency + deduplication. Track processed messages. Use distributed storage for multiple servers.
💾 Store Results
Cache successful results with keys. Don’t cache failures. Set TTL. Check before processing.