Concurrent Collections
Synchronized vs Concurrent Collections
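Understanding the difference is crucial for performance. A synchronized collection (such as one returned by Collections.synchronizedMap) guards every operation with a single lock, so only one thread can use it at a time. A concurrent collection uses finer-grained locks or lock-free techniques so that many threads can make progress at once.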
Visual: Lock Granularity
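To make the lock-granularity contrast concrete, here is a minimal Python sketch of the coarse-grained end. SynchronizedDict is a hypothetical class written in the spirit of Java's Collections.synchronizedMap wrapper: every method takes the same lock, so even threads working on unrelated keys wait for each other.

```python
import threading


class SynchronizedDict:
    """Coarse-grained sketch (hypothetical class): one lock guards every operation."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()  # the one lock every thread contends on

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)

    def put(self, key, value):
        with self._lock:
            self._data[key] = value

    def put_if_absent(self, key, value):
        # Check-then-act is atomic only because both steps hold the same lock.
        with self._lock:
            if key not in self._data:
                self._data[key] = value
            return self._data[key]

    def __len__(self):
        with self._lock:
            return len(self._data)


squares = SynchronizedDict()
threads = [threading.Thread(target=squares.put, args=(i, i * i)) for i in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(squares), squares.get(7))  # 100 49
```

The result is correct, but all 100 writers serialize on one lock. Concurrent collections shrink what each lock covers.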
Java Concurrent Collections
ConcurrentHashMap
ConcurrentHashMap provides thread-safe hash map operations with high concurrency: retrievals generally do not block, and updates to different buckets can proceed in parallel.
How It Works
- Java 7: Segment locking (16 segments by default), with each segment acting as a separately locked hash table
- Java 8+: CAS operations to insert into empty buckets, plus synchronized blocks on the first node of a non-empty bucket
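The Java 7 segment design can be sketched in Python as lock striping. StripedDict is a hypothetical class, and it illustrates only the segment idea; it does not reproduce Java 8's CAS-based buckets, for which Python has no direct primitive. Keys are spread over independent stripes, each with its own lock, so threads touching different stripes do not contend.

```python
import threading


class StripedDict:
    """Lock-striping sketch (hypothetical class) in the spirit of Java 7 segments."""

    def __init__(self, num_stripes=16):
        self._stripes = [{} for _ in range(num_stripes)]
        self._locks = [threading.Lock() for _ in range(num_stripes)]

    def _index(self, key):
        # Equal keys hash equally, so a key always lives in the same stripe.
        return hash(key) % len(self._stripes)

    def get(self, key, default=None):
        i = self._index(key)
        with self._locks[i]:
            return self._stripes[i].get(key, default)

    def put(self, key, value):
        i = self._index(key)
        with self._locks[i]:
            self._stripes[i][key] = value

    def compute_if_absent(self, key, fn):
        # Atomic per key: only the stripe holding `key` is locked while fn runs.
        i = self._index(key)
        with self._locks[i]:
            stripe = self._stripes[i]
            if key not in stripe:
                stripe[key] = fn(key)
            return stripe[key]

    def __len__(self):
        # Visits stripes one at a time, so under concurrent writes the total
        # is not an atomic snapshot of the whole map.
        total = 0
        for lock, stripe in zip(self._locks, self._stripes):
            with lock:
                total += len(stripe)
        return total


m = StripedDict()
workers = [threading.Thread(target=m.put, args=(f"k{i}", i)) for i in range(1000)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(len(m), m.get("k42"))  # 1000 42
```

The trade-off mirrors the Java design: single-key operations get cheaper under contention, while whole-map operations such as len must visit every stripe.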
Visual: ConcurrentHashMap Architecture
Example: Thread-Safe Cache
```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class ConcurrentHashMapCache<K, V> {
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();

    public V get(K key) {
        return cache.get(key); // Thread-safe read
    }

    public void put(K key, V value) {
        cache.put(key, value); // Thread-safe write
    }

    // Atomic operations
    public V putIfAbsent(K key, V value) {
        return cache.putIfAbsent(key, value); // Atomic!
    }

    public boolean remove(K key, V value) {
        return cache.remove(key, value); // Atomic!
    }

    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
        return cache.computeIfAbsent(key, mappingFunction); // Atomic!
    }
}
```

```go
package main

import (
	"fmt"
	"sync"
)

// sync.Map is Go's closest equivalent of ConcurrentHashMap
type Cache[K comparable, V any] struct {
	m sync.Map
}

func (c *Cache[K, V]) Get(key K) (V, bool) {
	val, ok := c.m.Load(key)
	if !ok {
		var zero V
		return zero, false
	}
	return val.(V), true
}

func (c *Cache[K, V]) Put(key K, value V) {
	c.m.Store(key, value)
}

// LoadOrStore is sync.Map's atomic putIfAbsent equivalent
func (c *Cache[K, V]) PutIfAbsent(key K, value V) (V, bool) {
	actual, loaded := c.m.LoadOrStore(key, value)
	return actual.(V), loaded
}

func (c *Cache[K, V]) Delete(key K) {
	c.m.Delete(key)
}

func main() {
	cache := &Cache[string, int]{}
	cache.Put("a", 1)
	cache.Put("b", 2)

	v, ok := cache.Get("a")
	fmt.Printf("Get a: %d, found: %v\n", v, ok)

	existing, loaded := cache.PutIfAbsent("a", 99)
	fmt.Printf("PutIfAbsent a=99: existing=%d, alreadyLoaded=%v\n", existing, loaded)
}
```

CopyOnWriteArrayList
CopyOnWriteArrayList copies its entire backing array on every write (add, set, remove), so reads and iteration never lock: each reader works on an immutable snapshot.
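The copy-on-write idea can be sketched in a few lines of Python. CopyOnWriteList is a hypothetical class, and the sketch assumes CPython, where reading or rebinding a single attribute is atomic. Writers serialize on a lock and publish a brand-new tuple; readers just grab the current reference, so they never take the lock and always see a complete snapshot.

```python
import threading


class CopyOnWriteList:
    """Copy-on-write sketch (hypothetical class, not a stdlib type)."""

    def __init__(self):
        self._items = ()                     # immutable snapshot shared by readers
        self._write_lock = threading.Lock()  # serializes writers only

    def add(self, item):
        with self._write_lock:
            # O(n) copy on every write; rebinding the attribute publishes it.
            self._items = self._items + (item,)

    def snapshot(self):
        return self._items                   # no lock: a single reference read

    def __len__(self):
        return len(self._items)

    def __iter__(self):
        # Iterates a fixed snapshot; later adds don't affect this iterator.
        return iter(self._items)


listeners = CopyOnWriteList()
listeners.add("audit")
it = iter(listeners)             # snapshot taken here
listeners.add("metrics")         # writer publishes a new tuple
print(list(it), len(listeners))  # ['audit'] 2
```

The iterator keeps seeing the old tuple after the second add, which is the same "snapshot iteration" behavior that lets CopyOnWriteArrayList iterators run without locking.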
Visual: Copy-On-Write
Example: CopyOnWriteArrayList
```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteExample {
    public static void main(String[] args) {
        List<String> list = new CopyOnWriteArrayList<>();

        // Multiple readers (no locking needed!)
        for (int i = 0; i < 10; i++) {
            final int readerId = i;
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    list.size(); // Lock-free read
                }
                System.out.println("Reader " + readerId + " finished");
            }).start();
        }

        // Occasional writer
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                list.add("Item " + i); // Copies the backing array
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}
```

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Go approximation of CopyOnWriteArrayList: protect the slice with an RWMutex.
// Reads use RLock (many readers at once); writes use Lock (exclusive) and copy,
// so a slice published earlier is never mutated in place.
// For truly lock-free reads, publish the slice through sync/atomic instead.
type CopyOnWriteSlice[T any] struct {
	mu   sync.RWMutex
	data []T
}

func (c *CopyOnWriteSlice[T]) Add(item T) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Copy-on-write: build a new slice instead of appending in place
	newData := make([]T, len(c.data)+1)
	copy(newData, c.data)
	newData[len(c.data)] = item
	c.data = newData
}

func (c *CopyOnWriteSlice[T]) Len() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.data) // shared read lock: readers don't block each other
}

func main() {
	list := &CopyOnWriteSlice[string]{}

	var wg sync.WaitGroup
	// 10 concurrent readers
	for i := 0; i < 10; i++ {
		wg.Add(1)
		readerID := i
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				list.Len()
			}
			fmt.Printf("Reader %d finished\n", readerID)
		}()
	}

	// Occasional writer
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 10; i++ {
			list.Add(fmt.Sprintf("Item %d", i))
			time.Sleep(100 * time.Millisecond)
		}
	}()

	wg.Wait()
}
```

BlockingQueue Implementations
We covered these in Producer-Consumer, but here’s a quick reference:
| Queue Type | Characteristics | Use Case |
|---|---|---|
| ArrayBlockingQueue | Array-backed, always bounded, one lock for puts and takes | Fixed-size queues |
| LinkedBlockingQueue | Node-based, optionally bounded, separate put and take locks | Higher throughput under contention |
| PriorityBlockingQueue | Unbounded, priority ordering | Priority-based processing |
| DelayQueue | Unbounded; an element becomes available only after its delay expires | Scheduled tasks |
| SynchronousQueue | Zero capacity; each put waits for a matching take | Direct handoff |
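What makes these queues "blocking" can be sketched with one lock and two condition variables, the same structure ArrayBlockingQueue uses internally. BoundedBlockingQueue is a hypothetical class for illustration; in real Python code, queue.Queue(maxsize=n) already provides this behavior.

```python
import threading
from collections import deque


class BoundedBlockingQueue:
    """Sketch of an ArrayBlockingQueue-style buffer (hypothetical class).

    One lock plus two conditions: producers wait on `not_full`,
    consumers wait on `not_empty`.
    """

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._items = deque()
        self._capacity = capacity
        lock = threading.Lock()                  # shared by both conditions
        self._not_full = threading.Condition(lock)
        self._not_empty = threading.Condition(lock)

    def put(self, item):
        with self._not_full:
            while len(self._items) >= self._capacity:  # re-check after wakeup
                self._not_full.wait()
            self._items.append(item)
            self._not_empty.notify()             # wake one waiting consumer

    def take(self):
        with self._not_empty:
            while not self._items:
                self._not_empty.wait()
            item = self._items.popleft()
            self._not_full.notify()              # wake one waiting producer
            return item


q = BoundedBlockingQueue(capacity=2)
results = []
consumer = threading.Thread(target=lambda: results.extend(q.take() for _ in range(5)))
consumer.start()
for i in range(5):
    q.put(i)  # blocks whenever 2 items are already waiting
consumer.join()
print(results)  # [0, 1, 2, 3, 4]
```

The while loops around wait() matter: a woken thread must re-check the condition, because another thread may have changed the queue first.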
Python Concurrent Collections
Thread-Safety of Built-in Types
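Python’s built-in types have limited thread-safety. In CPython, the GIL makes many single operations on them (such as `d[key] = value` or `lst.append(x)`) effectively atomic, but compound operations such as `counter += 1` or check-then-act sequences are not.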
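One way to see why `counter += 1` is not atomic is to disassemble it with the standard dis module. Exact opcode names differ between Python versions (for example BINARY_OP in 3.11+ versus INPLACE_ADD before), but the read and the write are always separate instructions, and a thread switch can happen between them.

```python
import dis

counter = 0


def unsafe_increment():
    global counter
    counter += 1


# Collect the opcode names the interpreter executes for `counter += 1`.
ops = [ins.opname for ins in dis.get_instructions(unsafe_increment)]
print(ops)

# The read (LOAD_GLOBAL) and the write (STORE_GLOBAL) are separate
# instructions, so another thread can update `counter` in between.
print("LOAD_GLOBAL" in ops, "STORE_GLOBAL" in ops)  # True True
```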
Visual: Python Thread-Safety
Example: Thread-Safe vs Unsafe Operations
```python
import threading

# ❌ NOT thread-safe: compound operation
counter = 0

def unsafe_increment():
    global counter
    counter += 1  # NOT atomic: read-modify-write

threads = [threading.Thread(target=unsafe_increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Unsafe result: {counter}")  # Not guaranteed to be 10!

# ✅ Thread-safe: single atomic operation
d = {}

def safe_operation():
    d['key'] = 'value'  # Atomic operation

threads = [threading.Thread(target=safe_operation) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Safe result: {len(d)}")  # Always 1

# ✅ Thread-safe: using locks
counter_safe = 0
lock = threading.Lock()

def safe_increment():
    global counter_safe
    with lock:
        counter_safe += 1

threads = [threading.Thread(target=safe_increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Safe with lock: {counter_safe}")  # Always 10
```

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	// ❌ NOT safe: race condition on plain int
	var unsafeCounter int
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			unsafeCounter++ // data race!
		}()
	}
	wg.Wait()
	fmt.Printf("Unsafe result: %d (may not be 10)\n", unsafeCounter)

	// ✅ Safe: atomic operation
	var atomicCounter int64
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&atomicCounter, 1) // atomic increment
		}()
	}
	wg.Wait()
	fmt.Printf("Atomic result: %d\n", atomic.LoadInt64(&atomicCounter)) // Always 10

	// ✅ Safe: mutex-protected counter
	var safeCounter int
	var mu sync.Mutex
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			safeCounter++
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Printf("Safe with mutex: %d\n", safeCounter) // Always 10
}
```

queue Module
Python’s queue module provides thread-safe queue implementations: Queue (FIFO), LifoQueue (LIFO, i.e. a stack), and PriorityQueue (lowest-valued entry first).
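The three classes share the same put/get interface and differ only in retrieval order. A short sketch using only the standard queue module; the (priority, payload) tuple convention is a common pattern, not something PriorityQueue requires.

```python
import queue

lifo = queue.LifoQueue()
prio = queue.PriorityQueue()

for item in ["a", "b", "c"]:
    lifo.put(item)

# PriorityQueue returns the smallest entry first; (priority, payload)
# tuples are the usual way to attach a priority to arbitrary data.
for entry in [(3, "low"), (1, "urgent"), (2, "normal")]:
    prio.put(entry)

lifo_order = [lifo.get() for _ in range(3)]
prio_order = [prio.get()[1] for _ in range(3)]
print(lifo_order)  # ['c', 'b', 'a']
print(prio_order)  # ['urgent', 'normal', 'low']
```

If two entries can share a priority, a (priority, counter, payload) tuple lets the counter break ties, so Python never has to compare the payloads themselves.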
```python
import queue
import threading

# FIFO Queue
fifo_queue = queue.Queue(maxsize=10)

# LIFO Queue (Stack)
lifo_queue = queue.LifoQueue(maxsize=10)

# Priority Queue
priority_queue = queue.PriorityQueue(maxsize=10)

def producer(q):
    for i in range(5):
        q.put(i)
        print(f"Produced: {i}")

def consumer(q):
    while True:
        try:
            item = q.get(timeout=1)  # give up after 1s with no items
            print(f"Consumed: {item}")
            q.task_done()
        except queue.Empty:
            break

# Thread-safe operations
threading.Thread(target=producer, args=(fifo_queue,)).start()
threading.Thread(target=consumer, args=(fifo_queue,)).start()
```

```go
package main

import "fmt"

func main() {
	// FIFO Queue: buffered channel
	fifoQueue := make(chan int, 10)

	// Producer
	go func() {
		for i := 0; i < 5; i++ {
			fifoQueue <- i
			fmt.Printf("Produced: %d\n", i)
		}
		close(fifoQueue) // lets the consumer's range loop end
	}()

	// Consumer
	for item := range fifoQueue {
		fmt.Printf("Consumed: %d\n", item)
	}

	// Priority queue: use container/heap guarded by a mutex
	// LIFO (stack): use a slice guarded by a mutex
}
```

multiprocessing.Manager
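For shared state across processes (not threads), a Manager runs a server process that holds the objects and hands out proxies. Each proxy call is handled as a single operation, but a read followed by a write is two calls, so compound updates still need a lock: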
```python
import multiprocessing

def worker(shared_dict, shared_list, lock):
    # get + set is two proxy calls, so guard the read-modify-write with a lock
    with lock:
        shared_dict['count'] = shared_dict.get('count', 0) + 1
        shared_list.append(shared_dict['count'])

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    shared_list = manager.list()
    lock = manager.Lock()

    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=worker, args=(shared_dict, shared_list, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Dict: {shared_dict}")
    print(f"List: {shared_list}")
```

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// For sharing state across goroutines (same process), use sync primitives.
// For sharing state across OS processes, use files, pipes, or shared memory.

func worker(sharedCounter *int64, sharedList *[]int, mu *sync.Mutex) {
	count := atomic.AddInt64(sharedCounter, 1)
	mu.Lock()
	*sharedList = append(*sharedList, int(count))
	mu.Unlock()
}

func main() {
	var sharedCounter int64
	var sharedList []int
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker(&sharedCounter, &sharedList, &mu)
		}()
	}
	wg.Wait()

	fmt.Printf("Counter: %d\n", sharedCounter)
	fmt.Printf("List: %v\n", sharedList)
}
```

Comparison Table
| Collection Type | Java | Python | Built-in Thread-Safety |
|---|---|---|---|
| HashMap/Dict | ConcurrentHashMap | dict + locks | Java: full; Python: single operations only |
| List | CopyOnWriteArrayList | list + locks | Java: full; Python: single operations only |
| Queue | BlockingQueue variants | queue.Queue | Both: full |
| Set | ConcurrentSkipListSet (sorted), ConcurrentHashMap.newKeySet() | set + locks | Java: full; Python: single operations only |
Practice Problems
Easy: Thread-Safe Cache
Design a thread-safe cache using concurrent collections.
Solution
```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class ThreadSafeCache<K, V> {
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();

    public V get(K key) {
        return cache.get(key);
    }

    public void put(K key, V value) {
        cache.put(key, value);
    }

    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
        return cache.computeIfAbsent(key, mappingFunction);
    }
}
```

```python
import threading

class ThreadSafeCache:
    def __init__(self):
        self._cache = {}
        # RLock, so mapping_function may call get/put without deadlocking
        self._lock = threading.RLock()

    def get(self, key):
        with self._lock:
            return self._cache.get(key)

    def put(self, key, value):
        with self._lock:
            self._cache[key] = value

    def compute_if_absent(self, key, mapping_function):
        # Check-then-act under one lock: mapping_function runs at most once per key
        with self._lock:
            if key not in self._cache:
                self._cache[key] = mapping_function(key)
            return self._cache[key]
```

```go
package main

import (
	"fmt"
	"sync"
)

// sync.Map provides a thread-safe map with no need for explicit locking
type ThreadSafeCache[K comparable, V any] struct {
	m sync.Map
}

func (c *ThreadSafeCache[K, V]) Get(key K) (V, bool) {
	val, ok := c.m.Load(key)
	if !ok {
		var zero V
		return zero, false
	}
	return val.(V), true
}

func (c *ThreadSafeCache[K, V]) Put(key K, value V) {
	c.m.Store(key, value)
}

// ComputeIfAbsent calls fn only when key is missing. Unlike Java's
// computeIfAbsent, concurrent callers may each run fn; LoadOrStore keeps
// the first stored value and returns it to every caller.
func (c *ThreadSafeCache[K, V]) ComputeIfAbsent(key K, fn func(K) V) V {
	if val, ok := c.m.Load(key); ok {
		return val.(V)
	}
	val, _ := c.m.LoadOrStore(key, fn(key))
	return val.(V)
}

func main() {
	cache := &ThreadSafeCache[string, int]{}
	cache.Put("x", 10)

	v, ok := cache.Get("x")
	fmt.Printf("Get x: %d, found: %v\n", v, ok)

	result := cache.ComputeIfAbsent("y", func(k string) int { return 42 })
	fmt.Printf("ComputeIfAbsent y: %d\n", result)
}
```

Interview Questions
Q1: “What’s the difference between ConcurrentHashMap and synchronized HashMap?”
Answer:
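- Synchronized HashMap (e.g. Collections.synchronizedMap): One lock guards the entire map for every operation, so only one thread uses it at a time (low concurrency)
- ConcurrentHashMap: Fine-grained locking per bucket plus CAS, with non-blocking reads (high concurrency)
- Performance: ConcurrentHashMap typically scales much better when many threads access the map at once
- Iteration: A synchronized map must be locked manually while iterating; ConcurrentHashMap iterators are weakly consistent and never throw ConcurrentModificationException
- Use ConcurrentHashMap: When you need a thread-safe map with high concurrency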
Q2: “When would you use CopyOnWriteArrayList?”
Answer:
- Use when: Reads vastly outnumber writes (e.g., 100:1 ratio)
- Perfect for: Event listeners, configuration, read-heavy scenarios
- Don’t use when: Writes are frequent or the list is large (each write copies the entire array, which is O(n))
- Trade-off: Expensive writes for lock-free reads
Q3: “Are Python’s built-in dict and list thread-safe?”
Answer:
- Single operations: Yes, in CPython individual operations such as `dict[key] = value` and `list.append(item)` are atomic
- Compound operations: No, NOT thread-safe (e.g., `if key not in d: d[key] = value`)
- Solution: Use locks for compound operations, or thread-safe collections such as queue.Queue
- GIL: Provides some protection but doesn’t guarantee thread-safety for compound operations
Q4: “What’s the difference between ArrayBlockingQueue and LinkedBlockingQueue?”
Answer:
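- ArrayBlockingQueue: Array-backed, always bounded, fixed memory; one lock shared by producers and consumers, so throughput is usually slightly lower
- LinkedBlockingQueue: Node-based, optionally bounded, allocates a node per element; separate put and take locks, so throughput is typically higher
- Choose: ArrayBlockingQueue for a fixed capacity with preallocated storage, LinkedBlockingQueue when producers and consumers contend heavily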
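The separate put and take locks come from the "two-lock queue" algorithm, which LinkedBlockingQueue's source describes itself as a variant of. Below is a minimal, non-blocking Python sketch of that idea; TwoLockQueue and its poll method are hypothetical names, and the real Java class adds capacity limits, an atomic count, and condition variables for blocking. A dummy node keeps head and tail apart, so enqueuers touch only the tail lock and dequeuers only the head lock.

```python
import threading


class _Node:
    __slots__ = ("value", "next")

    def __init__(self, value=None):
        self.value = value
        self.next = None


class TwoLockQueue:
    """Two-lock linked queue sketch (hypothetical class)."""

    def __init__(self):
        dummy = _Node()                     # head always points at a dummy node
        self._head = self._tail = dummy
        self._head_lock = threading.Lock()  # plays the role of takeLock
        self._tail_lock = threading.Lock()  # plays the role of putLock

    def put(self, value):
        node = _Node(value)                 # fully build the node before linking it
        with self._tail_lock:
            self._tail.next = node
            self._tail = node

    def poll(self):
        """Return (True, value), or (False, None) if the queue is empty."""
        with self._head_lock:
            first = self._head.next
            if first is None:
                return False, None
            value = first.value
            first.value = None              # `first` becomes the new dummy
            self._head = first
            return True, value


q = TwoLockQueue()
producers = [threading.Thread(target=q.put, args=(i,)) for i in range(100)]
for t in producers:
    t.start()
for t in producers:
    t.join()

drained = []
while True:
    ok, v = q.poll()
    if not ok:
        break
    drained.append(v)
print(len(drained), sorted(drained) == list(range(100)))  # 100 True
```

Because a producer and a consumer never wait on the same lock, they can proceed in parallel, which is where the throughput advantage comes from.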
Key Takeaways
Next Steps
Continue learning concurrency:
- Asynchronous Patterns - Futures and async/await
- Lock-Free Programming - CAS and atomic operations
Mastering concurrent collections is essential for building thread-safe systems! 📦