What Is CompletableFuture?
CompletableFuture, introduced in Java 8, is an API that lets you declaratively compose asynchronous operations. While the old Future required blocking with get() to wait for results, CompletableFuture builds non-blocking pipelines through callback chaining.
Think of it like package tracking. The old Future is like standing at your door waiting for the delivery. CompletableFuture is like registering instructions ahead of time: “When the package arrives, sign for it, open the box, and check the contents.”
Basic Creation and Execution
// CompletableFutureBasic.java — Basic creation and execution
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CompletableFutureBasic {

    /**
     * Demonstrates the three basic ways to start an asynchronous task:
     * {@code supplyAsync} (returns a value), {@code runAsync} (no value) and
     * {@code supplyAsync} with an explicit executor.
     *
     * @param args unused
     * @throws Exception if one of the futures fails or does not finish in time
     */
    public static void main(String[] args) throws Exception {
        // 1. supplyAsync — Async task that returns a value.
        // No executor is passed, so the default async executor is used
        // (normally ForkJoinPool.commonPool()).
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000); // Simulating 1-second delay
            return "Database query result";
        });

        // 2. runAsync — Async task with no return value
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            sleep(500);
            System.out.println("Log saved");
        });

        // 3. Specify custom thread pool (essential for production!)
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
                System.out.println("Executing thread: " + Thread.currentThread().getName());
                return 42;
            }, pool); // Pass Executor as second argument

            // Check results. Always bound the wait so a stuck task cannot hang main forever.
            System.out.println(future1.get(3, TimeUnit.SECONDS));
            System.out.println(future3.get(3, TimeUnit.SECONDS));

            // Wait for the fire-and-forget task too, so a failure inside it is not lost.
            future2.get(3, TimeUnit.SECONDS);

            // Typical output (future3 starts immediately, future2 finishes after ~500 ms,
            // future1 after ~1 s):
            // Executing thread: pool-1-thread-1
            // Log saved
            // Database query result
            // 42
        } finally {
            // The pool's threads are non-daemon: without this the JVM would never exit
            // if one of the get() calls above threw.
            pool.shutdown();
        }
    }

    /**
     * Sleeps for the given time; if interrupted, restores the thread's interrupt
     * flag and returns early instead of propagating the checked exception.
     *
     * @param millis time to sleep in milliseconds
     */
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Chaining: thenApply, thenCompose, thenAccept
// ChainingExample.java — Async pipeline chaining
import java.util.concurrent.CompletableFuture;
public class ChainingExample {

    /**
     * Walks through the main chaining operators: {@code thenApply} (map),
     * {@code thenCompose} (flatMap), {@code thenAccept}/{@code thenRun}
     * (terminal consumers) and the {@code *Async} callback variants.
     *
     * @param args unused
     * @throws Exception if any stage of a pipeline fails
     */
    public static void main(String[] args) throws Exception {
        // thenApply — Transform the result (similar to map)
        CompletableFuture<String> pipeline = CompletableFuture
            .supplyAsync(() -> " Hello, CompletableFuture! ")
            .thenApply(String::trim)               // Remove whitespace
            .thenApply(String::toUpperCase)        // Convert to uppercase
            .thenApply(s -> s + " [PROCESSED]");   // Append suffix
        System.out.println(pipeline.get());
        // Output: HELLO, COMPLETABLEFUTURE! [PROCESSED]

        // thenCompose — Chain async operations (similar to flatMap)
        CompletableFuture<String> composed = getUserId()
            .thenCompose(ChainingExample::getUserName)    // userId -> userName async lookup
            .thenCompose(ChainingExample::getUserEmail);  // userName -> email async lookup
        System.out.println(composed.get());
        // Output: user-42@example.com

        // thenAccept — Consume the result (no return value)
        CompletableFuture<Void> consumed = CompletableFuture
            .supplyAsync(() -> "Processed data")
            .thenAccept(data -> System.out.println("Received: " + data))
            .thenRun(() -> System.out.println("Pipeline finished"));
        // Keep the returned future and wait on it: otherwise main may move on (or exit)
        // before the callbacks have run, and any exception in the chain is silently lost.
        consumed.get();
        // Output:
        // Received: Processed data
        // Pipeline finished

        // Async variants — the callback is submitted to the default async executor
        // instead of running on whichever thread completed the previous stage.
        // It is not guaranteed to be a different thread than the supplier's.
        CompletableFuture<Integer> length = CompletableFuture
            .supplyAsync(() -> "data")
            .thenApplyAsync(s -> {
                System.out.println("Callback thread: " + Thread.currentThread().getName());
                return s.length();
            });
        // Wait for completion deterministically rather than sleeping a fixed 500 ms.
        length.get();
    }

    // Step-by-step async methods

    /** Asynchronously supplies the fixed user id {@code 42}. */
    static CompletableFuture<Long> getUserId() {
        return CompletableFuture.supplyAsync(() -> 42L);
    }

    /** Asynchronously builds the user name {@code "user-" + id}. */
    static CompletableFuture<String> getUserName(Long id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    /** Asynchronously builds the e-mail address {@code name + "@example.com"}. */
    static CompletableFuture<String> getUserEmail(String name) {
        return CompletableFuture.supplyAsync(() -> name + "@example.com");
    }
}
Composition: allOf, anyOf, thenCombine
// CombineExample.java — Combining multiple async operations
import java.util.concurrent.CompletableFuture;
import java.util.List;
import java.util.stream.Stream;
public class CombineExample {

    /**
     * Shows how to combine independent async tasks: {@code thenCombine} for two
     * results, {@code allOf} to wait for every task, {@code anyOf} to take the
     * first one that completes.
     *
     * @param args unused
     * @throws Exception if any of the futures fails
     */
    public static void main(String[] args) throws Exception {
        // thenCombine — Merge results of two tasks
        CompletableFuture<Integer> basePrice = CompletableFuture.supplyAsync(() -> {
            sleep(800); // Fetch price
            return 15000;
        });
        CompletableFuture<Double> discountRate = CompletableFuture.supplyAsync(() -> {
            sleep(600); // Fetch discount
            return 0.1;
        });
        CompletableFuture<String> priceFuture = basePrice.thenCombine(
            discountRate,
            (price, discount) -> String.format("Final price: %,d", (int) (price * (1 - discount))));
        System.out.println(priceFuture.get());
        // Output: Final price: 13,500
        // Both tasks run in parallel -> total ~800ms (sequential would be 1,400ms)

        // allOf — Wait for all tasks to complete
        CompletableFuture<String> api1 = CompletableFuture.supplyAsync(() -> {
            sleep(300);
            return "User info";
        });
        CompletableFuture<String> api2 = CompletableFuture.supplyAsync(() -> {
            sleep(500);
            return "Order history";
        });
        CompletableFuture<String> api3 = CompletableFuture.supplyAsync(() -> {
            sleep(200);
            return "Delivery status";
        });
        CompletableFuture<Void> allDone = CompletableFuture.allOf(api1, api2, api3);
        // allOf itself carries no value, so the results are read back from the inputs.
        // Every input is already complete when this callback runs, so join() does not block.
        CompletableFuture<List<String>> allResults = allDone.thenApply(ignored ->
            Stream.of(api1, api2, api3)
                .map(CompletableFuture::join)
                .toList());
        System.out.println(allResults.get());
        // Output: [User info, Order history, Delivery status]
        // Total ~500ms (based on the slowest task)

        // anyOf — Use whichever completes first
        CompletableFuture<String> serverA = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "Server A response";
        });
        CompletableFuture<String> serverB = CompletableFuture.supplyAsync(() -> {
            sleep(200);
            return "Server B response";
        });
        CompletableFuture<String> serverC = CompletableFuture.supplyAsync(() -> {
            sleep(500);
            return "Server C response";
        });
        CompletableFuture<Object> fastest = CompletableFuture.anyOf(serverA, serverB, serverC);
        System.out.println(fastest.get());
        // Output: Server B response (fastest at 200ms)
    }

    /**
     * Pauses the current thread for {@code millis} milliseconds. An interrupt ends
     * the pause early and is re-asserted on the thread rather than thrown.
     *
     * @param millis pause length in milliseconds
     */
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
Error Handling
// ErrorHandling.java — CompletableFuture exception handling patterns
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ErrorHandling {

    /**
     * Demonstrates the common recovery patterns: {@code exceptionally} (fallback
     * value), {@code handle} (success and failure in one callback),
     * {@code orTimeout} and {@code completeOnTimeout} (Java 9+).
     *
     * @param args unused
     * @throws Exception if a future fails with an error that is not recovered
     */
    public static void main(String[] args) throws Exception {
        // exceptionally — Return fallback value on exception
        CompletableFuture<String> withFallback = CompletableFuture
            .supplyAsync(() -> {
                if (true) throw new RuntimeException("DB connection failed");
                return "data";
            })
            .exceptionally(ex -> {
                // The callback usually receives a CompletionException wrapping the real
                // failure; unwrap it so the message is the original one, without the
                // "java.lang.RuntimeException: " prefix added by the wrapper.
                System.err.println("Error occurred: " + rootCause(ex).getMessage());
                return "Cached data"; // Fallback value
            });
        System.out.println(withFallback.get());
        // Output:
        // Error occurred: DB connection failed
        // Cached data

        // handle — Handle both success and failure
        CompletableFuture<String> handled = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) throw new RuntimeException("Random error");
                return "Normal result";
            })
            .handle((result, ex) -> {
                if (ex != null) {
                    return "Default value (error: " + rootCause(ex).getMessage() + ")";
                }
                return result + " [verified]";
            });
        System.out.println(handled.get());
        // On success: Normal result [verified]
        // On failure: Default value (error: Random error)

        // Timeout (Java 9+)
        // Note: orTimeout only completes the future; the supplier itself keeps running
        // on its worker thread until its sleep ends.
        CompletableFuture<String> withTimeout = CompletableFuture
            .supplyAsync(() -> { sleep(5000); return "Slow response"; })
            .orTimeout(2, TimeUnit.SECONDS) // TimeoutException if exceeds 2 seconds
            .exceptionally(ex -> {
                Throwable cause = rootCause(ex);
                if (cause instanceof TimeoutException) {
                    return "Timeout default value";
                }
                // Any other failure is not a timeout: propagate it instead of mislabelling it.
                throw new CompletionException(cause);
            });
        System.out.println(withTimeout.get());
        // Output: Timeout default value

        // completeOnTimeout (Java 9+) — Default value on timeout
        CompletableFuture<String> withDefault = CompletableFuture
            .supplyAsync(() -> { sleep(5000); return "Slow response"; })
            .completeOnTimeout("Fast default value", 1, TimeUnit.SECONDS);
        System.out.println(withDefault.get());
        // Output: Fast default value
    }

    /**
     * Strips any {@link CompletionException} wrappers and returns the underlying failure.
     *
     * @param ex the throwable passed to an {@code exceptionally}/{@code handle} callback
     * @return the first cause that is not a {@code CompletionException}, or {@code ex}
     *         itself when it is not wrapped (or has no cause)
     */
    static Throwable rootCause(Throwable ex) {
        Throwable current = ex;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Sleeps for the given time; if interrupted, restores the thread's interrupt
     * flag and returns early instead of propagating the checked exception.
     *
     * @param millis time to sleep in milliseconds
     */
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Thread Pool Design
// ThreadPoolConfig.java — Production thread pool configuration
import java.util.concurrent.*;
public class ThreadPoolConfig {

    /**
     * Builds one pool sized for CPU-bound work and one for I/O-bound work, runs a
     * sample task on each, prints the results and shuts both pools down.
     *
     * @param args unused
     * @throws Exception if a task fails or does not finish within the timeout
     */
    public static void main(String[] args) throws Exception {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // CPU-bound tasks: core count + 1
        ExecutorService cpuPool = Executors.newFixedThreadPool(cpuCores + 1);

        // I/O-bound tasks: core count * (1 + wait/processing ratio)
        // If wait time is 10x processing time: cores * 11
        // Note: ThreadPoolExecutor only creates threads beyond the core size once the
        // queue is full, so with a 1000-slot queue the pool stays at cpuCores * 2
        // threads until 1000 tasks are waiting.
        ExecutorService ioPool = new ThreadPoolExecutor(
            cpuCores * 2,                               // Core thread count
            cpuCores * 10,                              // Maximum thread count
            60L, TimeUnit.SECONDS,                      // Idle thread keep-alive time
            new LinkedBlockingQueue<>(1000),            // Task queue (must be bounded!)
            new ThreadPoolExecutor.CallerRunsPolicy()   // Run on caller thread when queue is full
        );

        try {
            // Usage example
            CompletableFuture<String> dbQuery = CompletableFuture
                .supplyAsync(() -> "DB result", ioPool);        // I/O task -> ioPool
            CompletableFuture<Integer> calculation = CompletableFuture
                .supplyAsync(() -> fibonacci(40), cpuPool);     // CPU task -> cpuPool

            System.out.println("CPU cores: " + cpuCores);
            // Output: CPU cores: 8 (varies by environment)

            // Observe both results so a failure in either task surfaces here
            // instead of being silently dropped.
            System.out.println(dbQuery.get(10, TimeUnit.SECONDS));
            // Output: DB result
            System.out.println("fibonacci(40) = " + calculation.get(10, TimeUnit.SECONDS));
            // Output: fibonacci(40) = 102334155
        } finally {
            // Pool threads are non-daemon; always shut down so the JVM can exit.
            cpuPool.shutdown();
            ioPool.shutdown();
        }
    }

    /**
     * Naive doubly-recursive Fibonacci, used above as the CPU-bound sample workload.
     * Running time grows exponentially with {@code n}, and the {@code int} result
     * overflows for {@code n > 46}.
     *
     * @param n index in the sequence; values {@code <= 1} are returned unchanged
     * @return the {@code n}-th Fibonacci number (F(0) = 0, F(1) = 1)
     */
    static int fibonacci(int n) {
        if (n <= 1) return n;
        return fibonacci(n - 1) + fibonacci(n - 2);
    }
}
Practical Tips
Key principles for using CompletableFuture effectively.
- Always use a custom thread pool. The default `ForkJoinPool.commonPool()` is shared by every CompletableFuture that does not specify an executor, so a single slow task can block the entire system.
- Distinguish `thenApply` vs `thenCompose` in chaining. Use `thenApply` for synchronous transformations and `thenCompose` for chaining async operations. Using the wrong one results in `CompletableFuture<CompletableFuture<T>>` nesting.
- Always set timeouts. Use Java 9+ `orTimeout()` or `completeOnTimeout()`.
- Use `get()` with a timeout instead of `join()`. `join()` can block indefinitely.
- Place error handling at the end of the chain. Putting `exceptionally()` at the end of the pipeline catches exceptions from all stages.
- On Java 21+, combining with Virtual Threads can achieve even higher throughput for I/O-bound tasks.