🔄 WebSocket: Full-Duplex
WebSocket provides bidirectional, persistent connection. Best for chat, gaming, real-time collaboration.
Traditional HTTP: Client requests, server responds. Client must poll for updates.
Problems:
Solution: Real-time communication - Server pushes updates to client!
Bidirectional, persistent connection.
Characteristics:
Server-to-client streaming over HTTP.
Characteristics:
Hold request open until data available.
Characteristics:
1. Handshake (HTTP Upgrade):
1GET /ws HTTP/1.12Host: example.com3Upgrade: websocket4Connection: Upgrade5Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==6Sec-WebSocket-Version: 13Server responds:
1HTTP/1.1 101 Switching Protocols2Upgrade: websocket3Connection: Upgrade4Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=2. Connection Established - Now bidirectional!
1import asyncio2import websockets3import json4
5class WebSocketServer:6 def __init__(self):7 self.clients = set()8
9 async def register_client(self, websocket):10 """Register new client"""11 self.clients.add(websocket)12 print(f"Client connected. Total: {len(self.clients)}")13
14 async def unregister_client(self, websocket):15 """Unregister client"""16 self.clients.discard(websocket)17 print(f"Client disconnected. Total: {len(self.clients)}")18
19 async def handle_client(self, websocket, path):20 """Handle client connection"""21 await self.register_client(websocket)22
23 try:24 async for message in websocket:25 # Handle incoming message26 data = json.loads(message)27 await self.process_message(websocket, data)28
29 except websockets.exceptions.ConnectionClosed:30 pass31 finally:32 await self.unregister_client(websocket)33
34 async def process_message(self, websocket, data):35 """Process client message"""36 message_type = data.get('type')37
38 if message_type == 'chat':39 # Broadcast to all clients40 await self.broadcast({41 'type': 'chat',42 'user': data.get('user'),43 'message': data.get('message'),44 'timestamp': data.get('timestamp')45 })46
47 elif message_type == 'ping':48 # Respond to ping49 await websocket.send(json.dumps({'type': 'pong'}))50
51 async def broadcast(self, message):52 """Broadcast message to all clients"""53 if self.clients:54 message_json = json.dumps(message)55 await asyncio.gather(56 *[client.send(message_json) for client in self.clients],57 return_exceptions=True58 )59
60 async def send_to_client(self, websocket, message):61 """Send message to specific client"""62 try:63 await websocket.send(json.dumps(message))64 except websockets.exceptions.ConnectionClosed:65 await self.unregister_client(websocket)66
67# Start server68server = WebSocketServer()69start_server = websockets.serve(70 server.handle_client,71 "localhost",72 876573)74
75asyncio.get_event_loop().run_until_complete(start_server)76asyncio.get_event_loop().run_forever()1import org.java_websocket.WebSocket;2import org.java_websocket.handshake.ClientHandshake;3import org.java_websocket.server.WebSocketServer;4import java.net.InetSocketAddress;5import java.util.Collections;6import java.util.Set;7import java.util.concurrent.CopyOnWriteArraySet;8
9public class ChatServer extends WebSocketServer {10 private final Set<WebSocket> clients = new CopyOnWriteArraySet<>();11
12 public ChatServer(int port) {13 super(new InetSocketAddress(port));14 }15
16 @Override17 public void onOpen(WebSocket conn, ClientHandshake handshake) {18 // Register new client19 clients.add(conn);20 System.out.println("Client connected. Total: " + clients.size());21 }22
23 @Override24 public void onClose(WebSocket conn, int code, String reason, boolean remote) {25 // Unregister client26 clients.remove(conn);27 System.out.println("Client disconnected. Total: " + clients.size());28 }29
30 @Override31 public void onMessage(WebSocket conn, String message) {32 // Handle incoming message33 try {34 JSONObject data = new JSONObject(message);35 String type = data.getString("type");36
37 if ("chat".equals(type)) {38 // Broadcast to all clients39 broadcast(message);40 } else if ("ping".equals(type)) {41 // Respond to ping42 conn.send("{\"type\":\"pong\"}");43 }44 } catch (JSONException e) {45 e.printStackTrace();46 }47 }48
49 @Override50 public void onError(WebSocket conn, Exception ex) {51 ex.printStackTrace();52 }53
54 public void broadcast(String message) {55 // Broadcast to all clients56 for (WebSocket client : clients) {57 client.send(message);58 }59 }60
61 public static void main(String[] args) {62 ChatServer server = new ChatServer(8765);63 server.start();64 }65}1class WebSocketClient {2 constructor(url) {3 this.url = url;4 this.ws = null;5 this.reconnectInterval = 1000;6 this.maxReconnectAttempts = 5;7 this.reconnectAttempts = 0;8 }9
10 connect() {11 this.ws = new WebSocket(this.url);12
13 this.ws.onopen = () => {14 console.log('WebSocket connected');15 this.reconnectAttempts = 0;16 this.onOpen();17 };18
19 this.ws.onmessage = (event) => {20 const data = JSON.parse(event.data);21 this.onMessage(data);22 };23
24 this.ws.onerror = (error) => {25 console.error('WebSocket error:', error);26 this.onError(error);27 };28
29 this.ws.onclose = () => {30 console.log('WebSocket closed');31 this.onClose();32 this.reconnect();33 };34 }35
36 send(message) {37 if (this.ws && this.ws.readyState === WebSocket.OPEN) {38 this.ws.send(JSON.stringify(message));39 } else {40 console.error('WebSocket not connected');41 }42 }43
44 reconnect() {45 if (this.reconnectAttempts < this.maxReconnectAttempts) {46 this.reconnectAttempts++;47 setTimeout(() => {48 console.log(`Reconnecting... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);49 this.connect();50 }, this.reconnectInterval * this.reconnectAttempts);51 }52 }53
54 onOpen() {55 // Override in subclass56 }57
58 onMessage(data) {59 // Override in subclass60 }61
62 onError(error) {63 // Override in subclass64 }65
66 onClose() {67 // Override in subclass68 }69
70 close() {71 if (this.ws) {72 this.ws.close();73 }74 }75}76
77// Usage78const client = new WebSocketClient('ws://localhost:8765');79
80client.onMessage = (data) => {81 if (data.type === 'chat') {82 console.log(`${data.user}: ${data.message}`);83 }84};85
86client.connect();87
88// Send message89client.send({90 type: 'chat',91 user: 'John',92 message: 'Hello!',93 timestamp: Date.now()94});Client opens HTTP connection, server streams events:
1GET /events HTTP/1.12Host: example.com3Accept: text/event-stream4Cache-Control: no-cacheServer responds with stream:
1HTTP/1.1 200 OK2Content-Type: text/event-stream3Cache-Control: no-cache4Connection: keep-alive5
6event: message7data: Hello World8
9event: update10data: {"user": "John", "status": "online"}11
12event: message13data: Goodbye1from flask import Flask, Response, jsonify2import json3import time4import threading5
6app = Flask(__name__)7
8@app.route('/events')9def stream_events():10 """SSE endpoint"""11 def event_stream():12 while True:13 # Send event14 data = {15 'timestamp': time.time(),16 'message': 'Server update'17 }18 yield f"event: update\ndata: {json.dumps(data)}\n\n"19
20 time.sleep(1) # Send every second21
22 return Response(23 event_stream(),24 mimetype='text/event-stream',25 headers={26 'Cache-Control': 'no-cache',27 'Connection': 'keep-alive',28 'X-Accel-Buffering': 'no' # Disable buffering in nginx29 }30 )31
32@app.route('/notify')33def notify():34 """Trigger notification"""35 # In real app, this would trigger event36 return jsonify({'status': 'notification sent'})37
38if __name__ == '__main__':39 app.run(threaded=True)1import org.springframework.http.MediaType;2import org.springframework.web.bind.annotation.*;3import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;4import java.io.IOException;5import java.util.concurrent.Executors;6import java.util.concurrent.ScheduledExecutorService;7import java.util.concurrent.TimeUnit;8
9@RestController10public class SSEServer {11 private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);12
13 @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)14 public SseEmitter streamEvents() {15 SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);16
17 executor.scheduleAtFixedRate(() -> {18 try {19 SseEmitter.SseEventBuilder event = SseEmitter.event()20 .name("update")21 .data(Map.of("timestamp", System.currentTimeMillis(), "message", "Server update"));22
23 emitter.send(event);24 } catch (IOException e) {25 emitter.completeWithError(e);26 }27 }, 0, 1, TimeUnit.SECONDS);28
29 emitter.onCompletion(() -> executor.shutdown());30 emitter.onTimeout(() -> executor.shutdown());31
32 return emitter;33 }34}1class SSEClient {2 constructor(url) {3 this.url = url;4 this.eventSource = null;5 }6
7 connect() {8 this.eventSource = new EventSource(this.url);9
10 this.eventSource.onopen = () => {11 console.log('SSE connected');12 this.onOpen();13 };14
15 this.eventSource.onmessage = (event) => {16 const data = JSON.parse(event.data);17 this.onMessage(data);18 };19
20 // Listen for specific event types21 this.eventSource.addEventListener('update', (event) => {22 const data = JSON.parse(event.data);23 this.onUpdate(data);24 });25
26 this.eventSource.onerror = (error) => {27 console.error('SSE error:', error);28 this.onError(error);29 };30 }31
32 onOpen() {33 // Override in subclass34 }35
36 onMessage(data) {37 // Override in subclass38 }39
40 onUpdate(data) {41 // Override in subclass42 }43
44 onError(error) {45 // Override in subclass46 }47
48 close() {49 if (this.eventSource) {50 this.eventSource.close();51 }52 }53}54
55// Usage56const client = new SSEClient('/events');57
58client.onUpdate = (data) => {59 console.log('Update:', data);60};61
62client.connect();Client sends request, server holds it open:
1from flask import Flask, jsonify, request2import time3import threading4
5app = Flask(__name__)6pending_requests = []7
8@app.route('/poll')9def poll():10 """Long polling endpoint"""11 timeout = int(request.args.get('timeout', 30))12 last_id = int(request.args.get('last_id', 0))13
14 # Check for new data15 new_data = get_data_since(last_id)16
17 if new_data:18 return jsonify(new_data)19
20 # No data - hold request21 event = threading.Event()22 pending_requests.append({23 'event': event,24 'last_id': last_id,25 'timeout': timeout26 })27
28 # Wait for data or timeout29 if event.wait(timeout):30 new_data = get_data_since(last_id)31 return jsonify(new_data)32 else:33 return jsonify({'status': 'timeout'}), 20034
35@app.route('/notify')36def notify():37 """Trigger notification"""38 # Wake up pending requests39 for req in pending_requests:40 req['event'].set()41
42 pending_requests.clear()43 return jsonify({'status': 'notified'})1import org.springframework.web.bind.annotation.*;2import org.springframework.http.ResponseEntity;3import java.util.concurrent.CompletableFuture;4import java.util.concurrent.TimeUnit;5
6@RestController7public class LongPollingServer {8
9 @GetMapping("/poll")10 public CompletableFuture<ResponseEntity<Map<String, Object>>> poll(11 @RequestParam(defaultValue = "0") int lastId,12 @RequestParam(defaultValue = "30") int timeout) {13
14 // Check for new data15 List<Data> newData = getDataSince(lastId);16
17 if (!newData.isEmpty()) {18 return CompletableFuture.completedFuture(19 ResponseEntity.ok(Map.of("data", newData))20 );21 }22
23 // No data - wait for new data or timeout24 return CompletableFuture.supplyAsync(() -> {25 try {26 // Wait for data or timeout27 Thread.sleep(timeout * 1000);28
29 newData = getDataSince(lastId);30 if (!newData.isEmpty()) {31 return ResponseEntity.ok(Map.of("data", newData));32 } else {33 return ResponseEntity.ok(Map.of("status", "timeout"));34 }35 } catch (InterruptedException e) {36 Thread.currentThread().interrupt();37 return ResponseEntity.ok(Map.of("status", "interrupted"));38 }39 });40 }41}| Pattern | Use When | Don’t Use When |
|---|---|---|
| WebSocket | Bidirectional needed, low latency, high frequency | Simple one-way updates, HTTP-only environments |
| SSE | Server-to-client only, simple implementation, HTTP-based | Bidirectional needed, client-to-server messages |
| Long Polling | WebSocket/SSE not available, simple use case | High frequency, low latency needed |
1Need bidirectional?2 Yes → WebSocket3 No → Need low latency?4 Yes → SSE5 No → Long Polling (fallback)Managing connections is critical:
1from typing import Dict, Set2import asyncio3from datetime import datetime, timedelta4
5class ConnectionManager:6 """Manages WebSocket connections"""7
8 def __init__(self):9 self.connections: Dict[str, Set] = {}10 self.heartbeat_interval = 30 # seconds11 self.connection_timeout = 60 # seconds12
13 async def add_connection(self, connection_id: str, websocket):14 """Add new connection"""15 if connection_id not in self.connections:16 self.connections[connection_id] = set()17
18 self.connections[connection_id].add(websocket)19
20 # Start heartbeat21 asyncio.create_task(self.heartbeat(websocket))22
23 async def remove_connection(self, connection_id: str, websocket):24 """Remove connection"""25 if connection_id in self.connections:26 self.connections[connection_id].discard(websocket)27 if not self.connections[connection_id]:28 del self.connections[connection_id]29
30 async def heartbeat(self, websocket):31 """Send heartbeat to keep connection alive"""32 try:33 while True:34 await asyncio.sleep(self.heartbeat_interval)35 await websocket.send(json.dumps({'type': 'ping'}))36 except:37 await self.remove_connection('unknown', websocket)38
39 async def broadcast(self, message, connection_id: str = None):40 """Broadcast message"""41 targets = self.connections.get(connection_id, set()) if connection_id else set().union(*self.connections.values())42
43 disconnected = set()44 for ws in targets:45 try:46 await ws.send(json.dumps(message))47 except:48 disconnected.add(ws)49
50 # Clean up disconnected51 for ws in disconnected:52 await self.remove_connection('unknown', ws)1import java.util.concurrent.ConcurrentHashMap;2import java.util.Set;3import java.util.concurrent.CopyOnWriteArraySet;4import java.util.concurrent.ScheduledExecutorService;5import java.util.concurrent.Executors;6import java.util.concurrent.TimeUnit;7
8public class ConnectionManager {9 private final Map<String, Set<WebSocket>> connections = new ConcurrentHashMap<>();10 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);11 private final int heartbeatInterval = 30; // seconds12
13 public void addConnection(String connectionId, WebSocket websocket) {14 connections.computeIfAbsent(connectionId, k -> new CopyOnWriteArraySet<>())15 .add(websocket);16
17 // Start heartbeat18 scheduler.scheduleAtFixedRate(() -> {19 try {20 websocket.send("{\"type\":\"ping\"}");21 } catch (Exception e) {22 removeConnection(connectionId, websocket);23 }24 }, heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS);25 }26
27 public void removeConnection(String connectionId, WebSocket websocket) {28 Set<WebSocket> conns = connections.get(connectionId);29 if (conns != null) {30 conns.remove(websocket);31 if (conns.isEmpty()) {32 connections.remove(connectionId);33 }34 }35 }36
37 public void broadcast(String message, String connectionId) {38 Set<WebSocket> targets = connectionId != null39 ? connections.getOrDefault(connectionId, Collections.emptySet())40 : connections.values().stream()41 .flatMap(Set::stream)42 .collect(Collectors.toSet());43
44 for (WebSocket ws : targets) {45 try {46 ws.send(message);47 } catch (Exception e) {48 removeConnection(connectionId, ws);49 }50 }51 }52}🔄 WebSocket: Full-Duplex
WebSocket provides bidirectional, persistent connection. Best for chat, gaming, real-time collaboration.
📡 SSE: Simple Streaming
SSE is HTTP-based, server-to-client streaming. Simpler than WebSocket, automatic reconnection.
⏳ Long Polling: Fallback
Long polling holds requests open. Use when WebSocket/SSE not available. Less efficient but works everywhere.
🔌 Connection Management
Manage connections carefully: heartbeat, cleanup, reconnection. Critical for production systems.