Thread Pools & Executors
Why Thread Pools?
Creating a new thread for every task has significant overhead: each thread needs its own stack and must be created and torn down by the operating system. Thread pools solve this by reusing a bounded set of worker threads across many tasks.
[Figure: Manual Threads vs Thread Pool]
Benefits of Thread Pools
- Resource Reuse: worker threads are reused across tasks, so thread creation and destruction costs are paid once per thread rather than once per task
- Overhead Reduction: the pool, not application code, manages thread startup, reuse, and shutdown
- Resource Limits: a bounded thread count keeps a burst of tasks from exhausting memory or CPU
- Task Queuing: when all threads are busy, new tasks wait in a queue instead of spawning more threads
- Better Performance: fewer live threads means less context switching and less memory spent on thread stacks
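To make the reuse and limit points concrete, here is a minimal Python sketch that runs the same 20 tasks twice: once with a new threading.Thread per task, and once on a ThreadPoolExecutor capped at 4 workers. It counts distinct thread names as a proxy for how many threads were created; NUM_TASKS, POOL_SIZE, and record_thread are illustrative names chosen for this sketch.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

NUM_TASKS = 20
POOL_SIZE = 4  # illustrative value, not a recommendation


def record_thread(names, lock):
    """Record the name of the thread that runs this task."""
    with lock:
        names.add(threading.current_thread().name)


# Manual approach: one new thread per task
manual_names = set()
manual_lock = threading.Lock()
threads = [threading.Thread(target=record_thread, args=(manual_names, manual_lock))
           for _ in range(NUM_TASKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Pool approach: the same tasks run on at most POOL_SIZE reused threads
pool_names = set()
pool_lock = threading.Lock()
with ThreadPoolExecutor(max_workers=POOL_SIZE) as executor:
    for _ in range(NUM_TASKS):
        executor.submit(record_thread, pool_names, pool_lock)

print(f"manual: {len(manual_names)} threads, pool: {len(pool_names)} threads")
```

The manual version always creates one thread per task; the pool version never exceeds POOL_SIZE, no matter how many tasks are submitted.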
Java ExecutorService
Java’s ExecutorService (in java.util.concurrent) provides a high-level abstraction for thread pool management: you submit tasks, and the executor decides which of its threads runs each one.
Visual: ExecutorService Architecture
```mermaid
classDiagram
    class ExecutorService {
        <<interface>>
        +submit(Callable) Future
        +submit(Runnable) Future
        +shutdown() void
        +shutdownNow() List~Runnable~
        +awaitTermination(timeout, unit) boolean
    }
    class ThreadPoolExecutor {
        -corePoolSize: int
        -maximumPoolSize: int
        -keepAliveTime: long
        -workQueue: BlockingQueue
        +execute(Runnable) void
    }
    class Executors {
        <<utility>>
        +newFixedThreadPool(int) ExecutorService
        +newCachedThreadPool() ExecutorService
        +newScheduledThreadPool(int) ScheduledExecutorService
        +newSingleThreadExecutor() ExecutorService
    }
    ExecutorService <|.. ThreadPoolExecutor
    Executors --> ExecutorService : creates
```
FixedThreadPool
Executors.newFixedThreadPool(n) creates a pool that reuses a fixed number of threads, all pulling tasks from a shared, unbounded queue.
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // Create a fixed thread pool with 4 threads
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Submit 10 tasks: at most 4 run at once, the rest wait in the queue
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId +
                    " executed by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Stop accepting new tasks, then wait for queued tasks to finish
        executor.shutdown();
        try {
            executor.awaitTermination(60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```
Characteristics:
- Fixed number of threads
- Unbounded queue: tasks wait when all threads are busy, and the queue can grow without limit if tasks arrive faster than they finish
- Threads live until pool shutdown
- Good for: CPU-bound tasks, predictable workloads
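Python’s ThreadPoolExecutor behaves similarly: it never runs more than max_workers threads and keeps extra submissions in an internal unbounded queue (unlike FixedThreadPool, it starts threads lazily as tasks arrive). This sketch tracks how many tasks run at the same moment; running, peak, and task are illustrative names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 4
running = 0  # tasks currently executing
peak = 0     # highest value of `running` observed
lock = threading.Lock()


def task(task_id):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # simulate work
    with lock:
        running -= 1
    return task_id


# 10 tasks, 4 workers: the other 6 wait in the executor's internal queue
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    results = list(executor.map(task, range(10)))

print(f"peak concurrency: {peak}")  # never exceeds MAX_WORKERS
```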
CachedThreadPool
Executors.newCachedThreadPool() creates threads on demand and reuses idle ones.
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        // Creates threads as needed, reuses idle threads
        ExecutorService executor = Executors.newCachedThreadPool();

        // All 20 tasks sleep at the same time, so the pool may create up to 20 threads
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId +
                    " executed by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
    }
}
```
Characteristics:
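- Creates threads on demand and reuses idle ones
- No waiting queue: each task is handed directly to a thread (via a SynchronousQueue), so a new thread is created whenever none is idle and the thread count is effectively unbounded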
- Idle threads terminated after 60 seconds
- Good for: I/O-bound tasks, short-lived tasks, unpredictable workloads
ScheduledThreadPool
Executors.newScheduledThreadPool(n) returns a ScheduledExecutorService that runs tasks after a delay or periodically.
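Python’s standard library has no direct counterpart to ScheduledExecutorService. To show what “fixed rate” means, here is a minimal sketch built on threading; run_at_fixed_rate is a hypothetical helper, not a library function. Each run’s deadline is computed as start + k * period, so one late run does not shift every later run.

```python
import threading
import time


def run_at_fixed_rate(task, initial_delay, period, stop_event):
    """Hypothetical helper: call task() at start + initial_delay + k * period
    until stop_event is set. Deadlines are computed from the start time,
    so delays do not accumulate (fixed-rate semantics)."""
    start = time.monotonic() + initial_delay
    k = 0
    while True:
        next_run = start + k * period
        # Event.wait() returns True as soon as stop_event is set
        if stop_event.wait(max(0.0, next_run - time.monotonic())):
            return
        task()
        k += 1


ticks = []
stop = threading.Event()
worker = threading.Thread(
    target=run_at_fixed_rate,
    args=(lambda: ticks.append(time.monotonic()), 0.0, 0.1, stop),
)
worker.start()
time.sleep(0.35)  # let a few periods elapse
stop.set()
worker.join()
print(f"{len(ticks)} ticks")
```

Like scheduleAtFixedRate, this sketch never overlaps runs of the same task: if one run overshoots its period, the next starts late and the loop then catches up to the original schedule.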
```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Execute once after a 2-second delay
        scheduler.schedule(() -> {
            System.out.println("Delayed task executed");
        }, 2, TimeUnit.SECONDS);

        // Execute every second, starting immediately
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task executed");
        }, 0, 1, TimeUnit.SECONDS);

        // Let the tasks run for 5 seconds, then stop the scheduler
        Thread.sleep(5000);
        scheduler.shutdown();
    }
}
```
SingleThreadExecutor
Executors.newSingleThreadExecutor() runs tasks sequentially, one at a time and in submission order, on a single worker thread.
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Tasks execute sequentially, in submission order
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " executed");
            });
        }

        executor.shutdown();
    }
}
```
ForkJoinPool: Work Stealing
ForkJoinPool uses a work-stealing algorithm and is designed for divide-and-conquer tasks that recursively split into smaller subtasks.
[Figure: Work Stealing]
Example: Parallel Merge Sort with ForkJoinPool
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinMergeSort {
    static class MergeSortTask extends RecursiveAction {
        private final int[] array;
        private final int left;
        private final int right;

        MergeSortTask(int[] array, int left, int right) {
            this.array = array;
            this.left = left;
            this.right = right;
        }

        @Override
        protected void compute() {
            if (right - left < 10) {
                // Base case: small range, sort directly
                insertionSort(array, left, right);
            } else {
                // Divide: split into two halves
                int mid = (left + right) / 2;
                MergeSortTask leftTask = new MergeSortTask(array, left, mid);
                MergeSortTask rightTask = new MergeSortTask(array, mid + 1, right);

                // Fork: run both subtasks in parallel and wait for them
                invokeAll(leftTask, rightTask);

                // Conquer: merge the two sorted halves
                merge(array, left, mid, right);
            }
        }

        private void insertionSort(int[] arr, int left, int right) {
            for (int i = left + 1; i <= right; i++) {
                int key = arr[i];
                int j = i - 1;
                while (j >= left && arr[j] > key) {
                    arr[j + 1] = arr[j];
                    j--;
                }
                arr[j + 1] = key;
            }
        }

        private void merge(int[] arr, int left, int mid, int right) {
            int[] temp = new int[right - left + 1];
            int i = left, j = mid + 1, k = 0;

            while (i <= mid && j <= right) {
                if (arr[i] <= arr[j]) {
                    temp[k++] = arr[i++];
                } else {
                    temp[k++] = arr[j++];
                }
            }

            while (i <= mid) temp[k++] = arr[i++];
            while (j <= right) temp[k++] = arr[j++];

            System.arraycopy(temp, 0, arr, left, temp.length);
        }
    }

    public static void main(String[] args) {
        // Note: 8 elements fall under the base-case threshold, so this small
        // demo sorts without splitting; use a larger array to see forking
        int[] array = {64, 34, 25, 12, 22, 11, 90, 5};

        ForkJoinPool pool = ForkJoinPool.commonPool();
        MergeSortTask task = new MergeSortTask(array, 0, array.length - 1);
        pool.invoke(task);

        System.out.println("Sorted array: " + java.util.Arrays.toString(array));
    }
}
```
RecursiveTask vs RecursiveAction
- RecursiveAction: for tasks that don’t return a value (compute() returns void)
- RecursiveTask<V>: for tasks that return a value of type V (compute() returns V)
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RecursiveTaskExample {
    static class SumTask extends RecursiveTask<Long> {
        private final int[] array;
        private final int start; // inclusive
        private final int end;   // exclusive
        private static final int THRESHOLD = 1000;

        SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start < THRESHOLD) {
                // Base case: compute directly
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            } else {
                // Divide
                int mid = (start + end) / 2;
                SumTask leftTask = new SumTask(array, start, mid);
                SumTask rightTask = new SumTask(array, mid, end);

                // Fork the left half, compute the right half in this thread, then join
                leftTask.fork();
                long rightResult = rightTask.compute();
                long leftResult = leftTask.join();

                return leftResult + rightResult;
            }
        }
    }

    public static void main(String[] args) {
        int[] array = new int[10000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1;
        }

        ForkJoinPool pool = ForkJoinPool.commonPool();
        SumTask task = new SumTask(array, 0, array.length);
        long result = pool.invoke(task);

        System.out.println("Sum: " + result); // Sum: 50005000
    }
}
```
Python concurrent.futures
Python’s concurrent.futures module provides a high-level interface for thread pools (ThreadPoolExecutor) and process pools (ProcessPoolExecutor); both share the same submit(), map(), and shutdown() API.
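The examples below retrieve results in two ways, and they differ in ordering: executor.map() yields results in input order, while as_completed() yields futures as they finish. A minimal sketch (sleep_and_return is an illustrative name):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def sleep_and_return(delay):
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    # map(): results come back in input order, regardless of finish order
    map_order = list(executor.map(sleep_and_return, delays))

    # as_completed(): futures are yielded in the order they finish
    futures = [executor.submit(sleep_and_return, d) for d in delays]
    completion_order = [f.result() for f in as_completed(futures)]

print(map_order)  # [0.3, 0.1, 0.2]
print(completion_order)
```

With these delays, completion_order will usually list the shortest delay first; use as_completed() when you want to handle each result as soon as it is ready.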
ThreadPoolExecutor
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def process_task(task_id):
    """I/O-bound task"""
    print(f"Task {task_id} started")
    time.sleep(1)  # Simulate I/O
    return f"Result from task {task_id}"

# Using a context manager (recommended): shutdown happens automatically
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit tasks
    futures = [executor.submit(process_task, i) for i in range(10)]

    # Get results as they complete
    for future in as_completed(futures):
        result = future.result()
        print(result)

# Or use map (results come back in input order)
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(process_task, range(10))
    for result in results:
        print(result)
```
ProcessPoolExecutor
```python
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n):
    """CPU-bound task"""
    return sum(i * i for i in range(n))

# The __main__ guard is required when worker processes are started
# with "spawn" (the default on Windows and macOS)
if __name__ == "__main__":
    # Each worker is a separate process with its own interpreter and GIL
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(cpu_intensive_task, 1_000_000)
                   for _ in range(8)]

        results = [future.result() for future in futures]
        print(f"Results: {results}")
```
submit() vs map()
```python
from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    return item * 2

# submit() - returns one Future per task
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_item, i) for i in range(5)]
    results = [f.result() for f in futures]
    print(results)  # [0, 2, 4, 6, 8]

# map() - simpler, returns an iterator of results in input order
with ThreadPoolExecutor() as executor:
    results = list(executor.map(process_item, range(5)))
    print(results)  # [0, 2, 4, 6, 8]
```
Sizing Thread Pools
Pool size has a large effect on performance: too few threads leave cores idle, while too many add context-switching and memory overhead.
[Figure: Thread Pool Sizing]
CPU-Bound Tasks
For CPU-bound tasks, use approximately N_cores threads.
Formula: N_threads = N_cores (or N_cores + 1)
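Why? A core runs one thread at a time, so threads beyond the core count only add context-switching overhead. The optional extra thread can keep a core busy when a worker occasionally stalls (for example, on a page fault).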
```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # Use one worker process per CPU core
    num_cores = multiprocessing.cpu_count()
    print(f"CPU cores: {num_cores}")

    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        results = executor.map(cpu_task, [1_000_000] * 8)
        print(list(results))
```
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CPUBoundPool {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU cores: " + cores);

        // Use one thread per core
        ExecutorService executor = Executors.newFixedThreadPool(cores);

        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                // CPU-intensive work; cast to long so j * j does not overflow int
                long sum = 0;
                for (int j = 0; j < 1_000_000; j++) {
                    sum += (long) j * j;
                }
                return sum;
            });
        }

        executor.shutdown();
    }
}
```
I/O-Bound Tasks
For I/O-bound tasks, use more threads than cores: while some threads are blocked waiting on I/O, others can use the CPU.
Formula: N_threads = N_cores * (1 + W/C)
- W = Wait time (I/O wait)
- C = Compute time (CPU time)
Example: 4 cores, 9s wait, 1s compute = 4 * (1 + 9/1) = 40 threads
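A quick way to see the effect of W/C is to time the same batch of simulated I/O tasks with different pool sizes. In this minimal Python sketch, time.sleep stands in for a blocking call, so W/C is very large and a pool much bigger than the core count finishes far sooner. fake_io and elapsed_with are illustrative names, and the exact timings depend on the machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(_):
    time.sleep(0.05)  # stands in for a blocking network or disk call


def elapsed_with(workers, n_tasks=20):
    """Wall-clock seconds to run n_tasks fake I/O calls on `workers` threads."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(fake_io, range(n_tasks)))
    return time.perf_counter() - start


few = elapsed_with(2)    # about 10 rounds of 0.05 s
many = elapsed_with(20)  # about 1 round of 0.05 s
print(f"2 workers: {few:.2f}s, 20 workers: {many:.2f}s")
```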
```python
from concurrent.futures import ThreadPoolExecutor
import requests  # third-party: pip install requests

def fetch_url(url):
    """I/O-bound task"""
    response = requests.get(url, timeout=10)
    return response.status_code

# For I/O-bound work, use more threads than cores
# Example: 4 cores, 9s wait, 1s compute = 4 * (1 + 9/1) = 40 threads
num_threads = 40

with ThreadPoolExecutor(max_workers=num_threads) as executor:
    urls = ["https://httpbin.org/delay/1"] * 100
    results = executor.map(fetch_url, urls)
    print(f"Processed {len(list(results))} URLs")
```
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IOBoundPool {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // For I/O-bound work, use more threads than cores.
        // With W/C = 9, the factor (1 + W/C) is 10: e.g. 4 cores * 10 = 40 threads
        int threads = cores * 10;

        ExecutorService executor = Executors.newFixedThreadPool(threads);

        for (int i = 0; i < 100; i++) {
            executor.submit(() -> {
                // I/O operation
                try {
                    Thread.sleep(1000); // Simulate I/O wait
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
    }
}
```
General Formula
Formula: N_threads = N_cores * U_CPU * (1 + W/C)
- U_CPU = Target CPU utilization (0.0 to 1.0)
- W = Wait time
- C = Compute time
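Considerations:
- System load: other pools and processes on the same machine compete for the same cores
- Memory constraints: every thread reserves its own stack
- Task characteristics: W/C differs between tasks and can change over time, so measure it rather than guess
- Response time requirements: a latency target may justify a larger pool, at the cost of more contention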
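The general formula translates directly into a helper function. This is a sketch: pool_size is a hypothetical name, and rounding up to a whole thread (with a minimum of 1) is an assumption, since the formula itself does not say how to round.

```python
import math
import os


def pool_size(n_cores, target_utilization, wait_time, compute_time):
    """N_threads = N_cores * U_CPU * (1 + W/C), rounded up, at least 1."""
    if not 0 < target_utilization <= 1:
        raise ValueError("target_utilization must be in (0, 1]")
    if compute_time <= 0:
        raise ValueError("compute_time must be positive")
    n = n_cores * target_utilization * (1 + wait_time / compute_time)
    return max(1, math.ceil(n))


print(pool_size(4, 1.0, 9, 1))  # 40: the I/O-bound example above
print(pool_size(4, 1.0, 0, 1))  # 4: CPU-bound work has W = 0
print(pool_size(os.cpu_count() or 1, 0.5, 3, 1))  # depends on this machine
```

Setting W = 0 and U_CPU = 1 recovers the CPU-bound rule N_threads = N_cores, and U_CPU = 1 recovers the I/O-bound formula.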
Shutdown Handling
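Proper shutdown is crucial to avoid resource leaks and ensure clean termination. In Java, the threads created by the Executors factory methods are non-daemon by default, so a pool that is never shut down can keep the JVM from exiting.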
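Python has a rough counterpart to the shutdown()/shutdownNow() distinction. Since Python 3.9, ThreadPoolExecutor.shutdown() accepts cancel_futures=True, which cancels tasks that have not started yet; unlike shutdownNow(), it cannot interrupt tasks that are already running. A minimal sketch (slow_task is an illustrative name):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(n):
    time.sleep(0.2)
    return n


executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(slow_task, i) for i in range(10)]

# Stop accepting work and cancel everything still waiting in the queue.
# Tasks already running are NOT interrupted; Python threads cannot be
# forcibly stopped, so wait=True lets them finish.
executor.shutdown(wait=True, cancel_futures=True)

done = [f.result() for f in futures if not f.cancelled()]
cancelled = sum(f.cancelled() for f in futures)
print(f"completed: {done}, cancelled: {cancelled}")
```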
[Figure: Shutdown Flow]
Example: Proper Shutdown
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownExample {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Submit tasks
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(1000);
                    System.out.println("Task completed");
                } catch (InterruptedException e) {
                    System.out.println("Task interrupted");
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Graceful shutdown: stop accepting new tasks, let queued ones finish
        executor.shutdown();

        try {
            // Wait for tasks to complete
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                // Timed out: interrupt running tasks and drop queued ones
                executor.shutdownNow();
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: force shutdown and restore the flag
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```
```python
from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(1)
    return n * 2

# Context manager calls shutdown(wait=True) automatically on exit
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
    results = [f.result() for f in futures]
    print(results)

# Manual shutdown
executor = ThreadPoolExecutor(max_workers=4)
futures = [executor.submit(task, i) for i in range(10)]

executor.shutdown(wait=True)  # Wait for all tasks to finish
results = [f.result() for f in futures]
```
Future and Callable
A Future represents the result of an asynchronous computation: you can check whether it is done, wait for its result (optionally with a timeout), or cancel it.
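A short Python sketch of those operations with concurrent.futures: checking done(), waiting with a timeout, and seeing a task’s exception re-raised by result(). slow_square and failing_task are illustrative names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_square(n):
    time.sleep(0.2)
    return n * n


def failing_task():
    raise ValueError("bad input")


with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(slow_square, 7)
    print(f"done right after submit? {future.done()}")

    try:
        future.result(timeout=0.01)  # give up waiting after 10 ms
    except TimeoutError:
        print("not ready yet")

    print(future.result())  # blocks until the task finishes, then prints 49

    # An exception raised inside the task is re-raised by result()
    bad = executor.submit(failing_task)
    try:
        bad.result()
    except ValueError as exc:
        caught = str(exc)
        print(f"task failed: {caught}")
```

In Java, the corresponding future.get() does not rethrow the task’s exception directly; it wraps it in an ExecutionException.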
[Figure: Future Pattern]
Example: Using Future
```java
import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Submit a Callable (returns a value)
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(2000);
            return 42;
        });

        // Do other work while the task runs
        System.out.println("Doing other work...");

        // Get result (blocks until ready)
        Integer result = future.get();
        System.out.println("Result: " + result);

        executor.shutdown();
    }
}
```
```python
from concurrent.futures import ThreadPoolExecutor
import time

def compute_value(n):
    time.sleep(2)
    return n * 2

with ThreadPoolExecutor() as executor:
    # submit() returns a Future immediately
    future = executor.submit(compute_value, 21)

    # Do other work
    print("Doing other work...")

    # Get result (blocks until ready)
    result = future.result()
    print(f"Result: {result}")
```
Practice Problems
Easy: Process Tasks with Thread Pool
Use a thread pool to process a list of tasks concurrently.
Solution
```python
from concurrent.futures import ThreadPoolExecutor

def process_task(item):
    return item * 2

items = list(range(10))

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_task, items))
    print(results)
```
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class ProcessTasks {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Submit one task per item; output order is not guaranteed
        IntStream.range(0, 10)
            .forEach(i -> executor.submit(() -> System.out.println(i * 2)));
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```
Interview Questions
Q1: “How would you size a thread pool for CPU-bound vs I/O-bound tasks?”
Answer:
- CPU-bound: N_threads = N_cores (more threads only add context-switching overhead)
- I/O-bound: N_threads = N_cores * (1 + W/C), where W = wait time and C = compute time
- Reason: I/O-bound tasks spend time waiting, so extra threads keep the CPU busy during those waits
Q2: “What’s the difference between FixedThreadPool and CachedThreadPool?”
Answer:
- FixedThreadPool: Fixed threads, unbounded queue, threads live until shutdown
- CachedThreadPool: Creates threads on demand, no waiting queue (direct hand-off), effectively unbounded thread count, idle threads terminated after 60s
- Use FixedThreadPool: Predictable workloads, CPU-bound tasks
- Use CachedThreadPool: Short-lived tasks, I/O-bound, unpredictable workloads
Q3: “How does work stealing work in ForkJoinPool?”
Answer:
- Each thread has its own deque (double-ended queue)
- Threads push/pop from their own deque (LIFO)
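- When a thread’s deque is empty, it steals from the opposite end (the tail) of another thread’s deque, taking the oldest task, which in divide-and-conquer work is usually a large one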
- Benefits: Better load balancing, higher CPU utilization
- Ideal for: Divide-and-conquer algorithms, recursive tasks
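The stealing rule above can be illustrated with a small, single-threaded Python simulation. It is a sketch of the idea only, not how ForkJoinPool is implemented: workers take turns, each task is just an integer n that splits in half until it reaches 1, and simulate_work_stealing is a hypothetical helper. Owners pop from the right end of their own deque (LIFO), and idle workers steal from the left end of another deque (FIFO).

```python
from collections import deque


def simulate_work_stealing(initial_tasks, n_workers):
    """Round-robin simulation of work stealing. A task n > 1 forks into
    n // 2 and n - n // 2; a task equal to 1 is a leaf that gets executed.
    Returns how many leaf tasks each worker executed."""
    deques = [deque() for _ in range(n_workers)]
    deques[0].extend(initial_tasks)  # all work starts on worker 0
    leaves_done = [0] * n_workers

    while any(deques):
        for w in range(n_workers):
            own = deques[w]
            if own:
                task = own.pop()  # owner works LIFO on its own end
            else:
                # Idle: steal the oldest task from the first non-empty deque
                victim = next((d for d in deques if d), None)
                if victim is None:
                    continue
                task = victim.popleft()
            if task > 1:
                own.append(task // 2)  # "fork": push both halves locally
                own.append(task - task // 2)
            else:
                leaves_done[w] += 1
    return leaves_done


counts = simulate_work_stealing([64], n_workers=4)
print(counts, sum(counts))
```

Because thieves take the oldest (largest) tasks, every worker ends up with a sizable share even though all work starts on worker 0.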
Q4: “When would you use ProcessPoolExecutor vs ThreadPoolExecutor in Python?”
Answer:
- ProcessPoolExecutor: CPU-bound tasks (bypasses GIL), true parallelism
- ThreadPoolExecutor: I/O-bound tasks (GIL released during I/O), lower overhead
- Choose based on: task type (CPU vs I/O) and overhead, since process pools pay for process startup and for pickling arguments and results between processes
Key Takeaways
Next Steps
Continue learning concurrency:
- Concurrent Collections - Thread-safe data structures
- Asynchronous Patterns - Futures and async/await
Mastering thread pools is essential for building efficient concurrent systems! ⚙️