Thread Pools & Executors
Why Thread Pools?
Creating a new thread for every task has significant overhead: each thread needs its own stack and must be set up and torn down by the operating system. Thread pools solve this by reusing a fixed set of worker threads and bounding resource use.
Visual: Manual Threads vs Thread Pool
Benefits of Thread Pools
- Resource Reuse: Threads are reused, avoiding creation/destruction overhead
- Overhead Reduction: The pool manages the thread lifecycle, so a task starts without waiting for a new thread to be created
- Resource Limits: Prevents system overload with bounded thread counts
- Task Queuing: Tasks wait in queue when all threads are busy
- Better Performance: Reduced context switching and memory usage
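The reuse benefit can be observed directly. The following is a minimal Python sketch using `concurrent.futures.ThreadPoolExecutor`; the helper names `worker_name` and `distinct_workers` are hypothetical and exist only for this illustration. It runs many tasks on a small pool and reports which threads executed them.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def worker_name(_task_id):
    """Return the name of the thread that ran this task."""
    return threading.current_thread().name


def distinct_workers(num_tasks, max_workers):
    """Run num_tasks tasks on a pool and return the set of thread names used."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return set(pool.map(worker_name, range(num_tasks)))


if __name__ == "__main__":
    names = distinct_workers(num_tasks=20, max_workers=2)
    # The pool never creates more than max_workers threads,
    # so the same threads serve all 20 tasks.
    print(f"{len(names)} thread(s) handled 20 tasks")
```

However many tasks are submitted, the number of distinct thread names never exceeds `max_workers`.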
Java ExecutorService
Java’s ExecutorService provides a high-level abstraction for thread pool management.
Visual: ExecutorService Architecture
classDiagram
class ExecutorService {
<<interface>>
+submit(Callable) Future
+submit(Runnable) Future
+shutdown() void
+shutdownNow() List~Runnable~
+awaitTermination(timeout) boolean
}
class ThreadPoolExecutor {
-corePoolSize: int
-maximumPoolSize: int
-keepAliveTime: long
-workQueue: BlockingQueue
+execute(Runnable) void
}
class Executors {
<<utility>>
+newFixedThreadPool(int) ExecutorService
+newCachedThreadPool() ExecutorService
+newScheduledThreadPool(int) ScheduledExecutorService
+newSingleThreadExecutor() ExecutorService
}
ExecutorService <|.. ThreadPoolExecutor
Executors --> ExecutorService : creates
FixedThreadPool
A FixedThreadPool has a fixed number of threads with an unbounded queue.
Java:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // Create fixed thread pool with 4 threads
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Submit tasks
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " executed by "
                        + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Shutdown
        executor.shutdown();
        try {
            executor.awaitTermination(60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Go:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    const numWorkers = 4
    tasks := make(chan int, 10)
    var wg sync.WaitGroup

    // Start fixed pool of worker goroutines
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for taskID := range tasks {
                fmt.Printf("Task %d executed by worker %d\n", taskID, workerID)
                time.Sleep(time.Second)
            }
        }(w)
    }

    // Submit 10 tasks
    for i := 0; i < 10; i++ {
        tasks <- i
    }
    close(tasks) // signal workers no more tasks

    wg.Wait()
}

Characteristics:
- Fixed number of threads
- Unbounded queue (tasks wait if all threads busy)
- Threads live until pool shutdown
- Good for: CPU-bound tasks, predictable workloads
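The queuing behaviour can be checked with a small experiment. The sketch below uses Python's `ThreadPoolExecutor` as an analogy (it also has a fixed worker limit and an unbounded internal queue); the function name `peak_concurrency` and the sleep duration are assumptions made for illustration, not part of the Java API discussed above.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(num_tasks, max_workers):
    """Submit num_tasks tasks and return the highest number seen running at once."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task():
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)  # simulate a short unit of work
        with lock:
            running -= 1

    # Leaving the with-block waits for every queued task to finish.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(num_tasks):
            pool.submit(task)
    return peak


if __name__ == "__main__":
    # 10 tasks, 3 workers: the extra tasks wait in the queue.
    print("Peak concurrent tasks:", peak_concurrency(10, 3))
```

The peak never exceeds the worker count; surplus tasks sit in the queue until a worker is free.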
CachedThreadPool
A CachedThreadPool creates threads on demand and reuses them.
Java:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        // Creates threads as needed, reuses idle threads
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " executed by "
                        + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
    }
}

Go:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    // Go's equivalent of CachedThreadPool: spin up a goroutine per task.
    // The Go runtime schedules them efficiently, so no fixed limit is needed.
    for i := 0; i < 20; i++ {
        taskID := i
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Printf("Task %d executed\n", taskID)
            time.Sleep(2 * time.Second)
        }()
    }

    wg.Wait()
}

Characteristics:
- Creates threads on demand
- No queue: each task is handed directly to a thread, and a new thread is created if none is idle (the thread count is effectively unbounded)
- Idle threads terminated after 60 seconds
- Good for: I/O-bound tasks, short-lived tasks, unpredictable workloads
ScheduledThreadPool
A ScheduledThreadPool executes tasks after a delay or periodically.
Java:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Execute after delay
        scheduler.schedule(() -> {
            System.out.println("Delayed task executed");
        }, 2, TimeUnit.SECONDS);

        // Execute periodically
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task executed");
        }, 0, 1, TimeUnit.SECONDS);

        Thread.sleep(5000);
        scheduler.shutdown();
    }
}

Go:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Execute once after a delay
    go func() {
        time.Sleep(2 * time.Second)
        fmt.Println("Delayed task executed")
    }()

    // Execute periodically using a ticker
    ticker := time.NewTicker(time.Second)
    go func() {
        for range ticker.C {
            fmt.Println("Periodic task executed")
        }
    }()

    time.Sleep(5 * time.Second)
    ticker.Stop()
    fmt.Println("Scheduler stopped")
}

SingleThreadExecutor
A SingleThreadExecutor ensures sequential execution (one task at a time).
Java:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Tasks execute sequentially
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " executed");
            });
        }

        executor.shutdown();
    }
}

Go:

package main

import "fmt"

func main() {
    tasks := make(chan int, 5)
    done := make(chan struct{})

    // Single worker goroutine: tasks execute sequentially
    go func() {
        defer close(done)
        for taskID := range tasks {
            fmt.Printf("Task %d executed\n", taskID)
        }
    }()

    for i := 0; i < 5; i++ {
        tasks <- i
    }
    close(tasks)

    // Wait for the worker to drain the queue; otherwise main may exit
    // before any task has run
    <-done
}

ForkJoinPool: Work Stealing
ForkJoinPool uses a work-stealing algorithm for divide-and-conquer tasks.
Visual: Work Stealing
Example: Parallel Merge Sort with ForkJoinPool
Java:

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinMergeSort {
    static class MergeSortTask extends RecursiveAction {
        private final int[] array;
        private final int left;
        private final int right;

        MergeSortTask(int[] array, int left, int right) {
            this.array = array;
            this.left = left;
            this.right = right;
        }

        @Override
        protected void compute() {
            if (right - left < 10) {
                // Base case: sort directly
                insertionSort(array, left, right);
            } else {
                // Divide: split into two halves (written to avoid int overflow)
                int mid = left + (right - left) / 2;
                MergeSortTask leftTask = new MergeSortTask(array, left, mid);
                MergeSortTask rightTask = new MergeSortTask(array, mid + 1, right);

                // Fork: execute subtasks in parallel and wait for both
                invokeAll(leftTask, rightTask);

                // Conquer: merge results
                merge(array, left, mid, right);
            }
        }

        private void insertionSort(int[] arr, int left, int right) {
            for (int i = left + 1; i <= right; i++) {
                int key = arr[i];
                int j = i - 1;
                while (j >= left && arr[j] > key) {
                    arr[j + 1] = arr[j];
                    j--;
                }
                arr[j + 1] = key;
            }
        }

        private void merge(int[] arr, int left, int mid, int right) {
            int[] temp = new int[right - left + 1];
            int i = left, j = mid + 1, k = 0;

            while (i <= mid && j <= right) {
                if (arr[i] <= arr[j]) {
                    temp[k++] = arr[i++];
                } else {
                    temp[k++] = arr[j++];
                }
            }

            while (i <= mid) temp[k++] = arr[i++];
            while (j <= right) temp[k++] = arr[j++];

            System.arraycopy(temp, 0, arr, left, temp.length);
        }
    }

    public static void main(String[] args) {
        int[] array = {64, 34, 25, 12, 22, 11, 90, 5};

        ForkJoinPool pool = ForkJoinPool.commonPool();
        MergeSortTask task = new MergeSortTask(array, 0, array.length - 1);
        pool.invoke(task);

        System.out.println("Sorted array: " + Arrays.toString(array));
    }
}

Go:

package main

import (
    "fmt"
    "sort"
    "sync"
)

func mergeSort(arr []int, wg *sync.WaitGroup) {
    defer wg.Done()
    if len(arr) < 10 {
        sort.Ints(arr) // base case
        return
    }

    mid := len(arr) / 2
    left := arr[:mid]
    right := arr[mid:]

    var innerWg sync.WaitGroup
    innerWg.Add(2)
    go mergeSort(left, &innerWg)
    go mergeSort(right, &innerWg)
    innerWg.Wait()

    // Merge the two sorted halves through a temporary buffer
    merged := make([]int, len(arr))
    i, j, k := 0, 0, 0
    for i < len(left) && j < len(right) {
        if left[i] <= right[j] {
            merged[k] = left[i]
            i++
        } else {
            merged[k] = right[j]
            j++
        }
        k++
    }
    // Copy whatever remains (only one of the two halves is non-empty here)
    copy(merged[k:], left[i:])
    copy(merged[k+len(left)-i:], right[j:])
    copy(arr, merged)
}

func main() {
    array := []int{64, 34, 25, 12, 22, 11, 90, 5}
    var wg sync.WaitGroup
    wg.Add(1)
    go mergeSort(array, &wg)
    wg.Wait()
    fmt.Println("Sorted array:", array)
}

RecursiveTask vs RecursiveAction
- RecursiveAction: For tasks that don’t return a value
- RecursiveTask: For tasks that return a value
Java:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RecursiveTaskExample {
    static class SumTask extends RecursiveTask<Long> {
        private final int[] array;
        private final int start;
        private final int end;
        private static final int THRESHOLD = 1000;

        SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start < THRESHOLD) {
                // Base case: compute directly
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            } else {
                // Divide (written to avoid int overflow)
                int mid = start + (end - start) / 2;
                SumTask leftTask = new SumTask(array, start, mid);
                SumTask rightTask = new SumTask(array, mid, end);

                // Fork the left half, compute the right half in this thread, then join
                leftTask.fork();
                long rightResult = rightTask.compute();
                long leftResult = leftTask.join();

                return leftResult + rightResult;
            }
        }
    }

    public static void main(String[] args) {
        int[] array = new int[10000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1;
        }

        ForkJoinPool pool = ForkJoinPool.commonPool();
        SumTask task = new SumTask(array, 0, array.length);
        long result = pool.invoke(task);

        System.out.println("Sum: " + result);
    }
}

Go:

package main

import (
    "fmt"
    "sync"
)

const threshold = 1000

func parallelSum(arr []int, wg *sync.WaitGroup, result *int64, mu *sync.Mutex) {
    defer wg.Done()
    if len(arr) < threshold {
        var sum int64
        for _, v := range arr {
            sum += int64(v)
        }
        // Add the partial sum to the shared total under the mutex
        mu.Lock()
        *result += sum
        mu.Unlock()
        return
    }

    mid := len(arr) / 2
    var innerWg sync.WaitGroup
    innerWg.Add(2)
    go parallelSum(arr[:mid], &innerWg, result, mu)
    go parallelSum(arr[mid:], &innerWg, result, mu)
    innerWg.Wait()
}

func main() {
    array := make([]int, 10000)
    for i := range array {
        array[i] = i + 1
    }

    var result int64
    var mu sync.Mutex
    var wg sync.WaitGroup
    wg.Add(1)
    go parallelSum(array, &wg, &result, &mu)
    wg.Wait()

    fmt.Printf("Sum: %d\n", result)
}

Python concurrent.futures
Python’s concurrent.futures provides a high-level interface for thread and process pools.
ThreadPoolExecutor
Python:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def process_task(task_id):
    """I/O-bound task"""
    print(f"Task {task_id} started")
    time.sleep(1)  # Simulate I/O
    return f"Result from task {task_id}"

# Using context manager (recommended)
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit tasks
    futures = [executor.submit(process_task, i) for i in range(10)]

    # Get results as they complete
    for future in as_completed(futures):
        result = future.result()
        print(result)

# Or use map
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(process_task, range(10))
    for result in results:
        print(result)

Go:

package main

import (
    "fmt"
    "sync"
    "time"
)

func processTask(taskID int) string {
    fmt.Printf("Task %d started\n", taskID)
    time.Sleep(time.Second) // simulate I/O
    return fmt.Sprintf("Result from task %d", taskID)
}

func main() {
    const maxWorkers = 4
    tasks := make(chan int, 10)
    results := make(chan string, 10)
    var wg sync.WaitGroup

    // Start worker pool
    for w := 0; w < maxWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for taskID := range tasks {
                result := processTask(taskID)
                results <- result
            }
        }()
    }

    // Submit tasks
    for i := 0; i < 10; i++ {
        tasks <- i
    }
    close(tasks)

    // Close results when all workers done
    go func() {
        wg.Wait()
        close(results)
    }()

    for result := range results {
        fmt.Println(result)
    }
}

ProcessPoolExecutor
Python:

from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n):
    """CPU-bound task"""
    result = sum(i * i for i in range(n))
    return result

# The __main__ guard is required on platforms that start workers by
# spawning a fresh interpreter (e.g. Windows and macOS)
if __name__ == "__main__":
    # ProcessPoolExecutor bypasses the GIL by running tasks in separate processes
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(cpu_intensive_task, 1000000) for _ in range(8)]

        results = [future.result() for future in futures]
        print(f"Results: {results}")

Go:

package main

import (
    "fmt"
    "os"
    "os/exec"
    "strconv"
    "sync"
)

// Go equivalent: spawn child processes for CPU-bound parallel work.
// Within a single process, goroutines already use all CPU cores.

func cpuIntensiveTask(n int) int {
    result := 0
    for i := 0; i < n; i++ {
        result += i * i
    }
    return result
}

func main() {
    // Child mode: do the work, print the result, and exit
    if len(os.Args) > 1 && os.Args[1] == "-child" {
        result := cpuIntensiveTask(1_000_000)
        fmt.Println(result)
        return
    }

    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func(idx int) {
            defer wg.Done()
            // Spawn a child process for true process isolation
            cmd := exec.Command(os.Args[0], "-child", strconv.Itoa(idx))
            out, err := cmd.Output()
            if err != nil {
                fmt.Printf("Process %d failed: %v\n", idx, err)
                return
            }
            fmt.Printf("Process %d result: %s", idx, out)
        }(i)
    }
    wg.Wait()
}

submit() vs map()
Python:

from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    return item * 2

# submit() - returns Future objects
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_item, i) for i in range(5)]
    results = [f.result() for f in futures]
    print(results)  # [0, 2, 4, 6, 8]

# map() - simpler, returns iterator
with ThreadPoolExecutor() as executor:
    results = list(executor.map(process_item, range(5)))
    print(results)  # [0, 2, 4, 6, 8]

Go:

package main

import (
    "fmt"
    "sync"
)

func processItem(item int) int {
    return item * 2
}

func main() {
    // submit() equivalent: one goroutine per task, each writing to its own result slot
    results := make([]int, 5)
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        idx := i
        go func() {
            defer wg.Done()
            results[idx] = processItem(idx)
        }()
    }
    wg.Wait()
    fmt.Println(results) // [0 2 4 6 8]

    // map() equivalent: process slice in parallel, collect ordered results
    inputs := []int{0, 1, 2, 3, 4}
    mapped := make([]int, len(inputs))
    var wg2 sync.WaitGroup
    for i, v := range inputs {
        wg2.Add(1)
        i, v := i, v
        go func() {
            defer wg2.Done()
            mapped[i] = processItem(v)
        }()
    }
    wg2.Wait()
    fmt.Println(mapped) // [0 2 4 6 8]
}

Sizing Thread Pools
Pool size directly affects performance: too few threads leave cores idle, while too many add context-switching and memory overhead.
Visual: Thread Pool Sizing
CPU-Bound Tasks
For CPU-bound tasks, use approximately N_cores threads.
Formula: N_threads = N_cores (or N_cores + 1)
Why? More threads than cores add overhead without benefit (context switching).
Python:

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    return sum(i * i for i in range(n))

# The __main__ guard is required on platforms that start workers by
# spawning a fresh interpreter (e.g. Windows and macOS)
if __name__ == "__main__":
    # Use number of CPU cores
    num_cores = multiprocessing.cpu_count()
    print(f"CPU cores: {num_cores}")

    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        results = executor.map(cpu_task, [1000000] * 8)
        print(list(results))

Java:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CPUBoundPool {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU cores: " + cores);

        // Use number of cores
        ExecutorService executor = Executors.newFixedThreadPool(cores);

        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                // CPU-intensive work
                long sum = 0;
                for (int j = 0; j < 1000000; j++) {
                    // Cast to long first: j * j overflows int for j > 46340
                    sum += (long) j * j;
                }
                return sum;
            });
        }

        executor.shutdown();
    }
}

Go:

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func cpuTask(n int) int {
    result := 0
    for i := 0; i < n; i++ {
        result += i * i
    }
    return result
}

func main() {
    numCores := runtime.NumCPU()
    fmt.Printf("CPU cores: %d\n", numCores)

    // Use a fixed pool sized to number of cores
    tasks := make(chan int, 8)
    var wg sync.WaitGroup

    for w := 0; w < numCores; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for n := range tasks {
                cpuTask(n)
            }
        }()
    }

    for i := 0; i < 8; i++ {
        tasks <- 1_000_000
    }
    close(tasks)
    wg.Wait()
}

I/O-Bound Tasks
For I/O-bound tasks, use more threads to account for waiting time.
Formula: N_threads = N_cores * (1 + W/C)
- W = Wait time (I/O wait)
- C = Compute time (CPU time)
Example: 4 cores, 9s wait, 1s compute = 4 * (1 + 9/1) = 40 threads
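The arithmetic in the example above can be checked with a short helper. This is a minimal sketch; the function name `io_bound_pool_size` and its parameter names are hypothetical, and the wait and compute times are assumed to come from your own measurements.

```python
def io_bound_pool_size(cores, wait_time, compute_time):
    """Apply N_threads = N_cores * (1 + W/C) and round to a whole thread count."""
    if compute_time <= 0:
        raise ValueError("compute_time must be positive")
    return round(cores * (1 + wait_time / compute_time))


if __name__ == "__main__":
    # 4 cores, 9s wait, 1s compute
    print(io_bound_pool_size(cores=4, wait_time=9, compute_time=1))  # 40
```

Only the ratio W/C matters, so the times can be in any unit as long as both use the same one.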
Python:

from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    """I/O-bound task"""
    response = requests.get(url)
    return response.status_code

# For I/O-bound, use more threads
# Example: 4 cores, 9s wait, 1s compute = 40 threads
num_threads = 40

with ThreadPoolExecutor(max_workers=num_threads) as executor:
    urls = ["https://httpbin.org/delay/1"] * 100
    results = executor.map(fetch_url, urls)
    print(f"Processed {len(list(results))} URLs")

Java:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IOBoundPool {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // For I/O-bound: more threads
        // Example: 4 cores * (1 + 9/1) = 40 threads
        int threads = cores * 10; // Simplified calculation

        ExecutorService executor = Executors.newFixedThreadPool(threads);

        for (int i = 0; i < 100; i++) {
            executor.submit(() -> {
                // I/O operation
                try {
                    Thread.sleep(1000); // Simulate I/O wait
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
    }
}

Go:

package main

import (
    "fmt"
    "net/http"
    "runtime"
    "sync"
)

func fetchURL(url string) int {
    resp, err := http.Get(url)
    if err != nil {
        return 0
    }
    defer resp.Body.Close()
    return resp.StatusCode
}

func main() {
    numCores := runtime.NumCPU()
    // For I/O-bound: N_cores * (1 + W/C), e.g. 4 * 10 = 40 goroutines
    numWorkers := numCores * 10

    urls := make(chan string, 100)
    var wg sync.WaitGroup

    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range urls {
                fetchURL(url)
            }
        }()
    }

    for i := 0; i < 100; i++ {
        urls <- "https://httpbin.org/delay/1"
    }
    close(urls)
    wg.Wait()
    fmt.Printf("Processed 100 URLs with %d workers\n", numWorkers)
}

General Formula
Formula: N_threads = N_cores * U_CPU * (1 + W/C)
- U_CPU = Target CPU utilization (0.0 to 1.0)
- W = Wait time
- C = Compute time
Considerations:
- System load
- Memory constraints
- Task characteristics
- Response time requirements
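The general formula and one of the considerations above (a hard ceiling, for example from memory limits) can be combined in a small helper. This is a sketch under stated assumptions: the function name `pool_size`, the `max_threads` cap, and the choice to round up are illustrative decisions, not part of the formula itself.

```python
import math


def pool_size(cores, target_utilization, wait_time, compute_time, max_threads=None):
    """Apply N_threads = N_cores * U_CPU * (1 + W/C), optionally capped.

    max_threads is a hypothetical upper bound, e.g. derived from memory limits.
    """
    if not 0.0 < target_utilization <= 1.0:
        raise ValueError("target_utilization must be in (0.0, 1.0]")
    if compute_time <= 0:
        raise ValueError("compute_time must be positive")

    raw = cores * target_utilization * (1 + wait_time / compute_time)
    size = max(1, math.ceil(raw))  # always keep at least one thread
    if max_threads is not None:
        size = min(size, max_threads)
    return size


if __name__ == "__main__":
    # 4 cores, 50% target utilization, 9s wait, 1s compute
    print(pool_size(4, 0.5, 9, 1))
    # Same workload at full utilization, capped at 16 threads
    print(pool_size(4, 1.0, 9, 1, max_threads=16))
```

Treat the result as a starting point and adjust it after measuring the system under realistic load.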
Shutdown Handling
Proper shutdown is crucial to avoid resource leaks and ensure clean termination.
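The difference between letting queued work finish and discarding it can be sketched in Python. The example assumes Python 3.9 or later, where `Executor.shutdown()` accepts `cancel_futures=True`; the function name `forced_shutdown_demo` and the two task functions are hypothetical. Unlike Java's `shutdownNow()`, this does not interrupt a task that is already running; it only cancels tasks still waiting in the queue.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def forced_shutdown_demo(num_tasks=5):
    """Start one long task, queue the rest, then shut down and cancel the queue.

    Returns (number_of_cancelled_tasks, result_of_the_running_task).
    """
    started = threading.Event()
    release = threading.Event()

    def blocking_task():
        started.set()    # signal that this task occupies the worker
        release.wait()   # simulate long-running work
        return "finished"

    def queued_task():
        return "ran"

    executor = ThreadPoolExecutor(max_workers=1)
    first = executor.submit(blocking_task)
    queued = [executor.submit(queued_task) for _ in range(num_tasks - 1)]

    started.wait()  # the single worker is now busy with the first task
    # Stop accepting new work and cancel everything still in the queue.
    executor.shutdown(wait=False, cancel_futures=True)

    release.set()   # let the running task complete normally
    cancelled = sum(f.cancelled() for f in queued)
    return cancelled, first.result()


if __name__ == "__main__":
    cancelled, result = forced_shutdown_demo()
    print(f"Cancelled {cancelled} queued task(s); running task {result}")
```

The running task completes, while every task that had not started yet is cancelled.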
Visual: Shutdown Flow
Example: Proper Shutdown
Java:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownExample {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Submit tasks
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(1000);
                    System.out.println("Task completed");
                } catch (InterruptedException e) {
                    System.out.println("Task interrupted");
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Graceful shutdown
        executor.shutdown(); // Stop accepting new tasks

        try {
            // Wait for tasks to complete
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                // Force shutdown if timeout
                executor.shutdownNow();
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

Python:

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(1)
    return n * 2

# Context manager handles shutdown automatically
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
    results = [f.result() for f in futures]
    print(results)

# Manual shutdown
executor = ThreadPoolExecutor(max_workers=4)
futures = [executor.submit(task, i) for i in range(10)]

executor.shutdown(wait=True)  # Wait for all tasks
results = [f.result() for f in futures]

Go:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    tasks := make(chan int, 10)
    var wg sync.WaitGroup

    // Start workers
    for w := 0; w < 4; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for taskID := range tasks {
                select {
                case <-ctx.Done():
                    fmt.Println("Task interrupted by context")
                    return
                default:
                    time.Sleep(time.Second)
                    fmt.Printf("Task %d completed\n", taskID)
                }
            }
        }()
    }

    // Submit tasks
    for i := 0; i < 10; i++ {
        tasks <- i
    }
    close(tasks) // graceful shutdown: no more tasks

    wg.Wait()
    fmt.Println("All workers shut down cleanly")
}

Future and Callable
Future objects represent the result of an asynchronous computation.
Visual: Future Pattern
Example: Using Future
Java:

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Submit Callable (returns value)
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(2000);
            return 42;
        });

        // Do other work
        System.out.println("Doing other work...");

        // Get result (blocks until ready)
        Integer result = future.get();
        System.out.println("Result: " + result);

        executor.shutdown();
    }
}

Python:

from concurrent.futures import ThreadPoolExecutor
import time

def compute_value(n):
    time.sleep(2)
    return n * 2

with ThreadPoolExecutor() as executor:
    # Submit returns Future immediately
    future = executor.submit(compute_value, 21)

    # Do other work
    print("Doing other work...")

    # Get result (blocks until ready)
    result = future.result()
    print(f"Result: {result}")

Go:

package main

import (
    "fmt"
    "time"
)

func computeValue(n int) int {
    time.Sleep(2 * time.Second)
    return n * 2
}

func main() {
    // Go's equivalent of Future: send result through a channel
    resultCh := make(chan int, 1)

    go func() {
        resultCh <- computeValue(21)
    }()

    // Do other work while waiting
    fmt.Println("Doing other work...")

    // Receive result (blocks until ready)
    result := <-resultCh
    fmt.Printf("Result: %d\n", result)
}

Practice Problems
Easy: Process Tasks with Thread Pool
Use a thread pool to process a list of tasks concurrently.
Solution
Python:

from concurrent.futures import ThreadPoolExecutor

def process_task(item):
    return item * 2

items = list(range(10))

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_task, items))
    print(results)

Java:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class ThreadPoolPractice {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        IntStream.range(0, 10)
                .forEach(i -> executor.submit(() -> System.out.println(i * 2)));
        executor.shutdown();
    }
}

Go:

package main

import (
    "fmt"
    "sync"
)

func processTask(item int) int {
    return item * 2
}

func main() {
    items := make([]int, 10)
    for i := range items {
        items[i] = i
    }

    results := make([]int, len(items))
    tasks := make(chan int, len(items))
    var wg sync.WaitGroup

    // Worker pool with 4 goroutines
    for w := 0; w < 4; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range tasks {
                // Each item doubles as its own index into results (items are 0..9)
                results[item] = processTask(item)
            }
        }()
    }

    for _, item := range items {
        tasks <- item
    }
    close(tasks)
    wg.Wait()

    fmt.Println(results)
}

Interview Questions
Section titled “Interview Questions”Q1: “How would you size a thread pool for CPU-bound vs I/O-bound tasks?”
Answer:
- CPU-bound: N_threads = N_cores (more threads waste resources)
- I/O-bound: N_threads = N_cores * (1 + W/C), where W = wait time, C = compute time
- Reason: I/O-bound tasks spend time waiting, so more threads can be utilized
Q2: “What’s the difference between FixedThreadPool and CachedThreadPool?”
Answer:
- FixedThreadPool: Fixed threads, unbounded queue, threads live until shutdown
- CachedThreadPool: Creates threads on demand, no queue, idle threads terminated after 60s
- Use FixedThreadPool: Predictable workloads, CPU-bound tasks
- Use CachedThreadPool: Short-lived tasks, I/O-bound, unpredictable workloads
Q3: “How does work stealing work in ForkJoinPool?”
Answer:
- Each thread has its own deque (double-ended queue)
- Each thread pushes and pops tasks at one end of its own deque (LIFO)
- When a thread’s deque is empty, it steals the oldest task from the opposite end of another thread’s deque, which keeps contention with the owner low
- Benefits: Better load balancing, higher CPU utilization
- Ideal for: Divide-and-conquer algorithms, recursive tasks
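The deque discipline described above can be sketched in a few lines of Python. This is a single-threaded illustration only: the names `WorkQueue` and `next_task` are hypothetical, and the sketch omits the synchronization that a real work-stealing pool such as ForkJoinPool needs.

```python
from collections import deque


class WorkQueue:
    """One worker's double-ended task queue (not thread-safe; illustration only)."""

    def __init__(self, tasks=()):
        self._tasks = deque(tasks)

    def push(self, task):
        """Owner adds a new task at its own end."""
        self._tasks.append(task)

    def pop_own(self):
        """Owner takes the most recently pushed task (LIFO)."""
        return self._tasks.pop() if self._tasks else None

    def steal(self):
        """A thief takes the oldest task from the opposite end."""
        return self._tasks.popleft() if self._tasks else None


def next_task(own, others):
    """Prefer local work; when the local deque is empty, try to steal."""
    task = own.pop_own()
    if task is not None:
        return task
    for victim in others:
        task = victim.steal()
        if task is not None:
            return task
    return None


if __name__ == "__main__":
    busy = WorkQueue(["a", "b", "c"])
    idle = WorkQueue()
    print(next_task(busy, [idle]))  # owner takes its newest task
    print(next_task(idle, [busy]))  # idle worker steals the oldest task
```

Because the owner and the thief work at opposite ends, they rarely compete for the same task.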
Q4: “When would you use ProcessPoolExecutor vs ThreadPoolExecutor in Python?”
Answer:
- ProcessPoolExecutor: CPU-bound tasks (bypasses GIL), true parallelism
- ThreadPoolExecutor: I/O-bound tasks (GIL released during I/O), lower overhead
- Choose based on: Where the task spends its time (CPU vs I/O); process pools also pay for process startup and for pickling arguments and results
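One way to encode this decision is a small factory that picks the executor from the task type. This is a minimal sketch: `make_executor`, `text_length`, and the `"cpu"`/`"io"` labels are hypothetical names introduced for illustration.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def make_executor(task_kind, max_workers=None):
    """Return a process pool for CPU-bound work and a thread pool for I/O-bound work."""
    if task_kind == "cpu":
        return ProcessPoolExecutor(max_workers=max_workers)
    if task_kind == "io":
        return ThreadPoolExecutor(max_workers=max_workers)
    raise ValueError(f"unknown task kind: {task_kind!r}")


def text_length(text):
    """Stand-in for a task; a real I/O task would read a file or call a service."""
    return len(text)


if __name__ == "__main__":
    with make_executor("io", max_workers=4) as pool:
        print(list(pool.map(text_length, ["a", "bb", "ccc"])))
```

Functions submitted to a process pool must be picklable, so they should be defined at module level as `text_length` is here.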
Key Takeaways
Next Steps
Continue learning concurrency:
- Concurrent Collections - Thread-safe data structures
- Asynchronous Patterns - Futures and async/await
Mastering thread pools is essential for building efficient concurrent systems! ⚙️