Asynchronous Patterns
Modern non-blocking concurrency with Futures and async/await.
Introduction: Futures and Promises
Section titled “Introduction: Futures and Promises”Futures and Promises represent values that don’t exist yet, enabling asynchronous computation without blocking threads.
Visual: Future Concept
Section titled “Visual: Future Concept”Blocking vs Non-Blocking I/O
Section titled “Blocking vs Non-Blocking I/O”Understanding the difference is crucial!
Visual: Blocking vs Non-Blocking
Section titled “Visual: Blocking vs Non-Blocking”Java: CompletableFuture
Section titled “Java: CompletableFuture”CompletableFuture is Java’s modern way to handle asynchronous operations.
Creating CompletableFutures
Section titled “Creating CompletableFutures”import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;
public class CompletableFutureBasics { public static void main(String[] args) throws ExecutionException, InterruptedException { // Create with supplyAsync (returns value) CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "Hello"; });
// Create with runAsync (no return value) CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { System.out.println("Running async task"); });
// Get result (blocks until ready) String result = future1.get(); System.out.println("Result: " + result); }}package main
import ( "fmt" "time")
func main() { // Create a channel-based "future" (returns value) future1 := make(chan string, 1) go func() { time.Sleep(1 * time.Second) future1 <- "Hello" }()
// Fire-and-forget goroutine (no return value) done := make(chan struct{}, 1) go func() { fmt.Println("Running async task") done <- struct{}{} }()
// Get result (blocks until ready) result := <-future1 fmt.Println("Result:", result)
<-done}Chaining Operations
Section titled “Chaining Operations”thenApply vs thenCompose
Section titled “thenApply vs thenCompose”
- thenApply: Transforms the result synchronously (the function returns a plain value)
- thenCompose: Chains another CompletableFuture (the function returns a Future)
Visual: thenApply vs thenCompose
Section titled “Visual: thenApply vs thenCompose”Example: Chaining Operations
Section titled “Example: Chaining Operations”import java.util.concurrent.CompletableFuture;
public class CompletableFutureChaining { public static void main(String[] args) { // Chain operations CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "Hello") .thenApply(s -> s + " World") // Synchronous transformation .thenApply(String::toUpperCase); // Another transformation
future.thenAccept(System.out::println); // Consume result
// thenCompose for async chaining CompletableFuture<String> future2 = CompletableFuture .supplyAsync(() -> "user123") .thenCompose(userId -> fetchUserData(userId)); // Returns Future
// Exception handling CompletableFuture<String> future3 = CompletableFuture .supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("Error!"); } return "Success"; }) .exceptionally(ex -> "Handled: " + ex.getMessage()) // Handle exception .thenApply(s -> "Result: " + s); }
static CompletableFuture<String> fetchUserData(String userId) { return CompletableFuture.supplyAsync(() -> { // Simulate async fetch return "Data for " + userId; }); }}package main
import ( "errors" "fmt" "math/rand")
// toUpper is a pipeline step that upper-cases the ASCII letters of s.
//
// It works byte-by-byte: bytes 'a'..'z' are shifted to 'A'..'Z' and every
// other byte is copied through unchanged. Multi-byte UTF-8 sequences are
// therefore preserved exactly (non-ASCII letters are not case-mapped), and
// the result always has the same length as s.
func toUpper(s string) string {
	out := []byte(s)
	for i, b := range out {
		if b >= 'a' && b <= 'z' {
			// Distance between the lower- and upper-case ASCII ranges.
			out[i] = b - ('a' - 'A')
		}
	}
	return string(out)
}
// fetchUserData simulates an async fetch for userID and returns a
// receive-only channel that will carry the single result string.
//
// The channel has a buffer of one, so the producing goroutine can send its
// value and exit without waiting for a receiver.
func fetchUserData(userID string) <-chan string {
	out := make(chan string, 1)
	go func(id string) {
		out <- "Data for " + id
	}(userID)
	return out
}
func main() { // Chain: "Hello" -> "Hello World" -> "HELLO WORLD" step1 := make(chan string, 1) go func() { step1 <- "Hello" }()
step2 := make(chan string, 1) go func() { s := <-step1 step2 <- s + " World" }()
step3 := make(chan string, 1) go func() { s := <-step2 step3 <- toUpper(s) }() fmt.Println(<-step3) // "HELLO WORLD"
// thenCompose equivalent: chain another async operation userDataCh := fetchUserData("user123") fmt.Println(<-userDataCh)
// Exception handling equivalent resultCh := make(chan string, 1) go func() { if rand.Float64() > 0.5 { resultCh <- "Handled: Error!" } else { resultCh <- "Result: Success" } }() fmt.Println(<-resultCh)}Combining Futures
Section titled “Combining Futures”
- allOf: Wait for all futures to complete
- anyOf: Wait for any future to complete
import java.util.concurrent.CompletableFuture;import java.util.Arrays;
public class CombiningFutures { public static void main(String[] args) { // Create multiple futures CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Result 3");
// Wait for all CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3); allFutures.thenRun(() -> { System.out.println("All completed!"); });
// Wait for any CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2, future3); anyFuture.thenAccept(result -> { System.out.println("First result: " + result); });
// Combine two futures CompletableFuture<String> combined = future1.thenCombine(future2, (r1, r2) -> r1 + " + " + r2); }}package main
import ( "fmt" "sync")
// asyncTask starts a goroutine that delivers result on a channel and returns
// the receive side of that channel, which acts as a one-shot future.
//
// The channel is buffered with capacity one, so the send completes even if
// the caller never receives.
func asyncTask(result string) <-chan string {
	future := make(chan string, 1)
	deliver := func() { future <- result }
	go deliver()
	return future
}
func main() { ch1 := asyncTask("Result 1") ch2 := asyncTask("Result 2") ch3 := asyncTask("Result 3")
// allOf equivalent: wait for all var wg sync.WaitGroup results := make([]string, 3) channels := []<-chan string{ch1, ch2, ch3} for i, ch := range channels { wg.Add(1) go func(idx int, c <-chan string) { defer wg.Done() results[idx] = <-c }(i, ch) } wg.Wait() fmt.Println("All completed:", results)
// anyOf equivalent: wait for first ch4 := asyncTask("Fast 1") ch5 := asyncTask("Fast 2") ch6 := asyncTask("Fast 3") select { case r := <-ch4: fmt.Println("First result:", r) case r := <-ch5: fmt.Println("First result:", r) case r := <-ch6: fmt.Println("First result:", r) }
// Combine two futures a := asyncTask("Left") b := asyncTask("Right") combined := <-a + " + " + <-b fmt.Println(combined)}Python: asyncio
Section titled “Python: asyncio”Python’s asyncio provides async/await syntax for asynchronous programming.
Event Loop
Section titled “Event Loop”The event loop manages and executes asynchronous tasks.
Visual: Event Loop
Section titled “Visual: Event Loop”Example: Basic asyncio
Section titled “Example: Basic asyncio”import asyncio
async def fetch_data(url):
    """Coroutine that pretends to fetch ``url``.

    Suspends for one second via ``asyncio.sleep`` to stand in for I/O, then
    returns the string ``"Data from <url>"``.
    """
    simulated_latency = 1  # seconds of pretend I/O
    await asyncio.sleep(simulated_latency)
    payload = f"Data from {url}"
    return payload
async def main():
    """Fetch three URLs with ``asyncio.gather`` and print the collected results."""
    urls = ("url1", "url2", "url3")
    # Run coroutines concurrently
    results = await asyncio.gather(*(fetch_data(target) for target in urls))
    print(results)
# Run event loopasyncio.run(main())package main
import ( "fmt" "sync")
// fetchData stands in for an I/O call: it performs no real work and simply
// returns "Data from " followed by url.
func fetchData(url string) string {
	const prefix = "Data from " // Simulate I/O
	return prefix + url
}
func main() { urls := []string{"url1", "url2", "url3"} results := make([]string, len(urls)) var wg sync.WaitGroup
// Run fetches concurrently (Go's gather equivalent) for i, url := range urls { wg.Add(1) go func(idx int, u string) { defer wg.Done() results[idx] = fetchData(u) }(i, url) }
wg.Wait() fmt.Println(results)}concurrent.futures.Future vs asyncio.Future
Section titled “concurrent.futures.Future vs asyncio.Future”from concurrent.futures import ThreadPoolExecutor, Future as ThreadFutureimport asyncio
# ThreadPoolExecutor Future
def sync_task():
    """Plain synchronous callable; returns the fixed string ``"Result"``."""
    outcome = "Result"
    return outcome
# Hand sync_task to the pool; submit() returns the future immediately.
with ThreadPoolExecutor() as executor:
    thread_future = executor.submit(sync_task)
    # Blocks
    result = thread_future.result()
# asyncio Future
async def async_task():
    """Coroutine that sleeps for one second, then returns ``"Result"``."""
    pause_seconds = 1
    await asyncio.sleep(pause_seconds)
    outcome = "Result"
    return outcome
async def main():
    """Wrap ``async_task()`` in a task, await its result and print it."""
    pending = asyncio.create_task(async_task())
    # Non-blocking
    outcome = await pending
    print(outcome)
asyncio.run(main())package main
import ( "fmt" "sync")
// syncTask is the synchronous unit of work that the example hands to a
// goroutine in place of a thread pool; it returns the fixed string "Result".
func syncTask() string {
	const outcome = "Result"
	return outcome
}
// asyncTask starts the work in a goroutine and immediately returns a
// receive-only channel that acts as the future for its "Result" value.
//
// The channel is buffered with capacity one, so the goroutine can finish
// without a waiting receiver.
func asyncTask() <-chan string {
	future := make(chan string, 1)
	go func(out chan<- string) {
		out <- "Result"
	}(future)
	return future
}
func main() { // Goroutine-pool style (sync task) var wg sync.WaitGroup var syncResult string wg.Add(1) go func() { defer wg.Done() syncResult = syncTask() // blocks goroutine, not main }() wg.Wait() fmt.Println(syncResult)
// Channel future (non-blocking) future := asyncTask() result := <-future // await equivalent fmt.Println(result)}Cross-Language Comparison
Section titled “Cross-Language Comparison”| Feature | Java | Python |
|---|---|---|
| Future Creation | CompletableFuture.supplyAsync() | asyncio.create_task() or executor.submit() |
| Chaining | thenApply(), thenCompose() | await in coroutines |
| Combining | allOf(), anyOf() | asyncio.gather(), asyncio.wait() |
| Exception Handling | exceptionally(), handle() | try/except in coroutines |
| Event Loop | None — async tasks run on a thread pool (ForkJoinPool.commonPool() by default) | Explicit (asyncio.run()) |
Real-World Example: Async API Client
Section titled “Real-World Example: Async API Client”import java.util.concurrent.CompletableFuture;import java.util.List;import java.util.stream.Collectors;
public class AsyncAPIClient { public CompletableFuture<String> fetchUser(String userId) { return CompletableFuture.supplyAsync(() -> { // Simulate API call try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "User: " + userId; }); }
public CompletableFuture<List<String>> fetchUsers(List<String> userIds) { List<CompletableFuture<String>> futures = userIds.stream() .map(this::fetchUser) .collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }}import asyncio
class AsyncAPIClient:
    """Demo client whose calls are coroutines backed by ``asyncio.sleep``."""

    async def fetch_user(self, user_id):
        """Simulate async API call"""
        simulated_latency = 0.1  # seconds
        await asyncio.sleep(simulated_latency)
        return f"User: {user_id}"

    async def fetch_users(self, user_ids):
        """Fetch multiple users concurrently"""
        pending = []
        for uid in user_ids:
            # Calling the coroutine function only creates the coroutine;
            # nothing runs until gather awaits it.
            pending.append(self.fetch_user(uid))
        users = await asyncio.gather(*pending)
        return users
# Usage
async def main():
    """Create a client, fetch three users through ``fetch_users`` and print them."""
    wanted = ["user1", "user2", "user3"]
    client = AsyncAPIClient()
    users = await client.fetch_users(wanted)
    print(users)
asyncio.run(main())package main
import ( "fmt" "sync" "time")
// AsyncAPIClient is a stateless demo client whose calls complete
// asynchronously and report their result over a channel.
type AsyncAPIClient struct{}

// fetchUser starts a simulated API call for userID in a new goroutine and
// returns a receive-only channel that will carry the single response,
// "User: " followed by userID. The channel has a buffer of one, so the
// goroutine can finish even if the response is never read.
func (c *AsyncAPIClient) fetchUser(userID string) <-chan string {
	reply := make(chan string, 1)
	go func(id string) {
		time.Sleep(100 * time.Millisecond) // simulate API call
		reply <- "User: " + id
	}(userID)
	return reply
}

// fetchUsers fetches every ID in userIDs and returns the responses in the
// same order as the input. It blocks until all responses have arrived.
func (c *AsyncAPIClient) fetchUsers(userIDs []string) []string {
	// Start every request before waiting on any of them.
	pending := make([]<-chan string, 0, len(userIDs))
	for _, id := range userIDs {
		pending = append(pending, c.fetchUser(id))
	}

	// Collect all results concurrently; each goroutine writes only its own
	// slot, so the slice needs no further synchronization.
	users := make([]string, len(userIDs))
	var wg sync.WaitGroup
	wg.Add(len(pending))
	for i := range pending {
		go func(slot int) {
			defer wg.Done()
			users[slot] = <-pending[slot]
		}(i)
	}
	wg.Wait()
	return users
}
func main() { client := &AsyncAPIClient{} users := client.fetchUsers([]string{"user1", "user2", "user3"}) fmt.Println(users)}Interview Questions
Section titled “Interview Questions”Q1: “What’s the difference between thenApply and thenCompose?”
Section titled “Q1: “What’s the difference between thenApply and thenCompose?””Answer:
- thenApply: Synchronous transformation, takes value, returns value
- thenCompose: Asynchronous chaining, takes value, returns Future
- Use thenApply: For simple transformations
- Use thenCompose: When you need to chain another async operation
Q2: “When would you use async/await vs threads in Python?”
Section titled “Q2: “When would you use async/await vs threads in Python?””Answer:
- async/await: I/O-bound concurrent operations, many connections, event-driven
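- threads: blocking I/O with libraries that have no async support, and simpler I/O scenarios; in standard CPython the GIL prevents threads from running Python bytecode in parallel, so for CPU-bound tasks use multiprocessing (or ProcessPoolExecutor) instead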
- Choose: Based on task type and concurrency requirements
Key Takeaways
Section titled “Key Takeaways”Next Steps
Section titled “Next Steps”- Lock-Free Programming - CAS and atomic operations
- Concurrency Hazards - Deadlocks and race conditions
Mastering asynchronous patterns enables efficient non-blocking systems! ⚡