📜 Event Sourcing
Store all changes as events. State = replay of events. Enables audit trail, time travel, replay.
Event-Driven Architecture (EDA) = Systems communicate through events (things that happened).
Store all changes as sequence of events. State = replay of events.
Problem: Lost history! Can’t see how state changed.
Benefits:
1from typing import List, Dict, Any2from datetime import datetime3from dataclasses import dataclass4from enum import Enum5
6class EventType(Enum):7 USER_CREATED = "user.created"8 BALANCE_ADDED = "balance.added"9 BALANCE_SUBTRACTED = "balance.subtracted"10
11@dataclass12class Event:13 """Base event class"""14 event_id: str15 event_type: EventType16 aggregate_id: str17 data: Dict[str, Any]18 timestamp: datetime19 version: int20
21class EventStore:22 """Event store - append-only log"""23
24 def __init__(self):25 self.events: List[Event] = []26
27 def append(self, event: Event):28 """Append event (immutable)"""29 self.events.append(event)30
31 def get_events(self, aggregate_id: str) -> List[Event]:32 """Get all events for aggregate"""33 return [e for e in self.events if e.aggregate_id == aggregate_id]34
35 def replay(self, aggregate_id: str) -> Dict[str, Any]:36 """Replay events to rebuild state"""37 events = self.get_events(aggregate_id)38 state = {}39
40 for event in events:41 if event.event_type == EventType.USER_CREATED:42 state['id'] = event.data['user_id']43 state['balance'] = 044 state['status'] = 'active'45 elif event.event_type == EventType.BALANCE_ADDED:46 state['balance'] = state.get('balance', 0) + event.data['amount']47 elif event.event_type == EventType.BALANCE_SUBTRACTED:48 state['balance'] = state.get('balance', 0) - event.data['amount']49
50 return state51
52class UserAggregate:53 """User aggregate with event sourcing"""54
55 def __init__(self, event_store: EventStore):56 self.event_store = event_store57
58 def create_user(self, user_id: str, initial_balance: int = 0):59 """Create user (command)"""60 event = Event(61 event_id=f"evt-{datetime.now().timestamp()}",62 event_type=EventType.USER_CREATED,63 aggregate_id=user_id,64 data={'user_id': user_id, 'initial_balance': initial_balance},65 timestamp=datetime.now(),66 version=167 )68 self.event_store.append(event)69
70 def add_balance(self, user_id: str, amount: int):71 """Add balance (command)"""72 event = Event(73 event_id=f"evt-{datetime.now().timestamp()}",74 event_type=EventType.BALANCE_ADDED,75 aggregate_id=user_id,76 data={'amount': amount},77 timestamp=datetime.now(),78 version=self._get_next_version(user_id)79 )80 self.event_store.append(event)81
82 def get_user_state(self, user_id: str) -> Dict[str, Any]:83 """Get current state (query) - replay events"""84 return self.event_store.replay(user_id)85
86 def _get_next_version(self, aggregate_id: str) -> int:87 """Get next version number"""88 events = self.event_store.get_events(aggregate_id)89 return len(events) + 190
91# Usage92event_store = EventStore()93user_aggregate = UserAggregate(event_store)94
95# Create user96user_aggregate.create_user('user-123', initial_balance=0)97
98# Add balance99user_aggregate.add_balance('user-123', 50)100user_aggregate.add_balance('user-123', 100)101user_aggregate.add_balance('user-123', -30)102
103# Get current state (replay events)104state = user_aggregate.get_user_state('user-123')105print(f"Current balance: {state['balance']}") # 120106
107# Get full history108events = event_store.get_events('user-123')109for event in events:110 print(f"{event.event_type}: {event.data}")1import java.time.LocalDateTime;2import java.util.*;3
4class Event {5 private final String eventId;6 private final EventType eventType;7 private final String aggregateId;8 private final Map<String, Object> data;9 private final LocalDateTime timestamp;10 private final int version;11
12 // Constructor, getters...13}14
15enum EventType {16 USER_CREATED, BALANCE_ADDED, BALANCE_SUBTRACTED17}18
19class EventStore {20 private final List<Event> events = new ArrayList<>();21
22 public void append(Event event) {23 events.add(event);24 }25
26 public List<Event> getEvents(String aggregateId) {27 return events.stream()28 .filter(e -> e.getAggregateId().equals(aggregateId))29 .collect(Collectors.toList());30 }31
32 public Map<String, Object> replay(String aggregateId) {33 List<Event> events = getEvents(aggregateId);34 Map<String, Object> state = new HashMap<>();35
36 for (Event event : events) {37 switch (event.getEventType()) {38 case USER_CREATED:39 state.put("id", event.getData().get("user_id"));40 state.put("balance", 0);41 state.put("status", "active");42 break;43 case BALANCE_ADDED:44 state.put("balance",45 (Integer) state.getOrDefault("balance", 0) +46 (Integer) event.getData().get("amount"));47 break;48 case BALANCE_SUBTRACTED:49 state.put("balance",50 (Integer) state.getOrDefault("balance", 0) -51 (Integer) event.getData().get("amount"));52 break;53 }54 }55
56 return state;57 }58}59
60class UserAggregate {61 private final EventStore eventStore;62
63 public void createUser(String userId, int initialBalance) {64 Event event = new Event(65 UUID.randomUUID().toString(),66 EventType.USER_CREATED,67 userId,68 Map.of("user_id", userId, "initial_balance", initialBalance),69 LocalDateTime.now(),70 171 );72 eventStore.append(event);73 }74
75 public Map<String, Object> getUserState(String userId) {76 return eventStore.replay(userId);77 }78}Separate read and write models.
Benefits:
1class WriteModel:2 """Write model - handles commands"""3
4 def __init__(self, event_store):5 self.event_store = event_store6
7 def create_user(self, user_id: str, email: str):8 """Command: Create user"""9 event = Event(10 event_id=generate_id(),11 event_type=EventType.USER_CREATED,12 aggregate_id=user_id,13 data={'user_id': user_id, 'email': email},14 timestamp=datetime.now(),15 version=116 )17 self.event_store.append(event)18 # Publish event for read model update19 event_bus.publish(event)20
21class ReadModel:22 """Read model - optimized for queries"""23
24 def __init__(self):25 self.users = {} # Denormalized, optimized for reads26
27 def handle_user_created(self, event: Event):28 """Projection: Update read model"""29 self.users[event.aggregate_id] = {30 'id': event.data['user_id'],31 'email': event.data['email'],32 'created_at': event.timestamp33 }34
35 def get_user(self, user_id: str) -> Dict:36 """Query: Get user (fast!)"""37 return self.users.get(user_id)38
39 def list_users(self) -> List[Dict]:40 """Query: List users (fast!)"""41 return list(self.users.values())42
43# Usage44event_store = EventStore()45write_model = WriteModel(event_store)46read_model = ReadModel()47
48# Subscribe read model to events49event_bus.subscribe(EventType.USER_CREATED, read_model.handle_user_created)50
51# Write (command)52write_model.create_user('user-123', 'john@example.com')53
54# Read (query) - fast, optimized55user = read_model.get_user('user-123')56users = read_model.list_users()1class WriteModel {2 private final EventStore eventStore;3 private final EventBus eventBus;4
5 public void createUser(String userId, String email) {6 Event event = new Event(7 UUID.randomUUID().toString(),8 EventType.USER_CREATED,9 userId,10 Map.of("user_id", userId, "email", email),11 LocalDateTime.now(),12 113 );14 eventStore.append(event);15 eventBus.publish(event);16 }17}18
19class ReadModel {20 private final Map<String, UserDTO> users = new HashMap<>();21
22 public void handleUserCreated(Event event) {23 users.put(event.getAggregateId(), new UserDTO(24 event.getData().get("user_id"),25 event.getData().get("email"),26 event.getTimestamp()27 ));28 }29
30 public UserDTO getUser(String userId) {31 return users.get(userId);32 }33
34 public List<UserDTO> listUsers() {35 return new ArrayList<>(users.values());36 }37}Rebuild state by replaying events.
Problem: Replaying millions of events is slow!
Solution: Snapshots!
Strategy:
📜 Event Sourcing
Store all changes as events. State = replay of events. Enables audit trail, time travel, replay.
🔄 CQRS
Separate read and write models. Optimize independently. Scale independently. Better performance.
⏪ Event Replay
Replay events to rebuild state. Use snapshots for performance. Enables debugging, testing, time travel.
🎯 Events are Facts
Events represent something that happened. Immutable. Past tense. Full history of changes.