Java Concurrency — CompletableFuture Practical Guide

What Is CompletableFuture?

CompletableFuture, introduced in Java 8, is an API for composing asynchronous operations declaratively. With the older Future interface, the only way to obtain a result is to block in get() (or poll isDone()). CompletableFuture instead lets you build non-blocking pipelines by chaining callbacks that run once a result becomes available.

Think of it like package tracking. The old Future is like standing at your door waiting for the delivery. CompletableFuture is like registering instructions ahead of time: “When the package arrives, sign for it, open the box, and check the contents.”
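
To make the contrast concrete, here is a minimal sketch of the same task written both ways. The fetchParcel() method and the class name are hypothetical stand-ins for a slow call, not part of any library.

```java
// FutureVsCompletableFuture.java - blocking Future vs. callback-style CompletableFuture
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureVsCompletableFuture {

    // Hypothetical stand-in for a slow remote call
    static String fetchParcel() {
        return "parcel";
    }

    // Old style: the caller has to block in get() before it can use the value
    static String withFuture(ExecutorService pool) throws Exception {
        Callable<String> task = FutureVsCompletableFuture::fetchParcel;
        Future<String> future = pool.submit(task);
        String parcel = future.get(); // The calling thread waits here
        return "opened " + parcel;
    }

    // New style: the follow-up step is registered up front and runs when the value arrives
    static CompletableFuture<String> withCompletableFuture(ExecutorService pool) {
        return CompletableFuture
            .supplyAsync(FutureVsCompletableFuture::fetchParcel, pool)
            .thenApply(parcel -> "opened " + parcel);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(withFuture(pool));
            // join() is used here only so the demo can print before the JVM exits
            System.out.println(withCompletableFuture(pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the second version the calling thread is free to do other work after registering the callback; it blocks only because main() chooses to call join() at the very end.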

Basic Creation and Execution

// CompletableFutureBasic.java — Basic creation and execution
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureBasic {
    public static void main(String[] args) throws Exception {
        // 1. supplyAsync — Async task that returns a value
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000); // Simulating 1-second delay
            return "Database query result";
        });
        // Runs on ForkJoinPool.commonPool()

        // 2. runAsync — Async task with no return value
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            sleep(500);
            System.out.println("Log saved");
        });

        // 3. Specify custom thread pool (essential for production!)
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Executing thread: " + Thread.currentThread().getName());
            return 42;
        }, pool); // Pass Executor as second argument

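        // Check results
        System.out.println(future1.get(3, TimeUnit.SECONDS));
        // Output: Database query result

        System.out.println(future3.get());
        // Output: 42
        // Note: "Executing thread: pool-1-thread-1" and "Log saved" are printed earlier,
        // as soon as those tasks run, not at the moment get() is called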

        pool.shutdown();
    }

    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Chaining: thenApply, thenCompose, thenAccept

// ChainingExample.java — Async pipeline chaining
import java.util.concurrent.CompletableFuture;

public class ChainingExample {
    public static void main(String[] args) throws Exception {

        // thenApply — Transform the result (similar to map)
        CompletableFuture<String> pipeline = CompletableFuture
            .supplyAsync(() -> "  Hello, CompletableFuture!  ")
            .thenApply(String::trim)           // Remove whitespace
            .thenApply(String::toUpperCase)    // Convert to uppercase
            .thenApply(s -> s + " [PROCESSED]"); // Append suffix

        System.out.println(pipeline.get());
        // Output: HELLO, COMPLETABLEFUTURE! [PROCESSED]

        // thenCompose — Chain async operations (similar to flatMap)
        CompletableFuture<String> composed = getUserId()
            .thenCompose(ChainingExample::getUserName)   // userId -> userName async lookup
            .thenCompose(ChainingExample::getUserEmail);  // userName -> email async lookup

        System.out.println(composed.get());
        // Output: user-42@example.com

        // thenAccept — Consume the result (no return value)
        CompletableFuture.supplyAsync(() -> "Processed data")
            .thenAccept(data -> System.out.println("Received: " + data))
            .thenRun(() -> System.out.println("Pipeline finished"));
        // Output:
        // Received: Processed data
        // Pipeline finished

        // Async variants: submit the callback as a new task to an executor
        // (ForkJoinPool.commonPool() by default, or one passed as a second argument)
        CompletableFuture.supplyAsync(() -> "data")
            .thenApplyAsync(s -> {
                // Runs as a separate task; the thread may or may not differ from the supplier's
                System.out.println("Callback thread: " + Thread.currentThread().getName());
                return s.length();
            });

        Thread.sleep(500); // Wait for async completion
    }

    // Step-by-step async methods
    static CompletableFuture<Long> getUserId() {
        return CompletableFuture.supplyAsync(() -> 42L);
    }

    static CompletableFuture<String> getUserName(Long id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    static CompletableFuture<String> getUserEmail(String name) {
        return CompletableFuture.supplyAsync(() -> name + "@example.com");
    }
}

Composition: allOf, anyOf, thenCombine

// CombineExample.java — Combining multiple async operations
import java.util.concurrent.CompletableFuture;
import java.util.List;
import java.util.stream.Stream;

public class CombineExample {
    public static void main(String[] args) throws Exception {

        // thenCombine — Merge results of two tasks
        CompletableFuture<String> priceFuture = CompletableFuture
            .supplyAsync(() -> { sleep(800); return 15000; })    // Fetch price
            .thenCombine(
                CompletableFuture.supplyAsync(() -> { sleep(600); return 0.1; }), // Fetch discount
                (price, discount) -> String.format("Final price: %,d", (int)(price * (1 - discount)))
            );

        System.out.println(priceFuture.get());
        // Output: Final price: 13,500
        // Both tasks run in parallel -> total ~800ms (sequential would be 1,400ms)

        // allOf — Wait for all tasks to complete
        CompletableFuture<String> api1 = CompletableFuture.supplyAsync(() -> { sleep(300); return "User info"; });
        CompletableFuture<String> api2 = CompletableFuture.supplyAsync(() -> { sleep(500); return "Order history"; });
        CompletableFuture<String> api3 = CompletableFuture.supplyAsync(() -> { sleep(200); return "Delivery status"; });

        CompletableFuture<List<String>> allResults = CompletableFuture
            .allOf(api1, api2, api3)
            .thenApply(v -> Stream.of(api1, api2, api3)
                .map(CompletableFuture::join) // Already completed, no blocking
                .toList() // Stream.toList() requires Java 16+
            );

        System.out.println(allResults.get());
        // Output: [User info, Order history, Delivery status]
        // Total ~500ms (based on the slowest task)

        // anyOf — Use whichever completes first
        CompletableFuture<Object> fastest = CompletableFuture.anyOf(
            CompletableFuture.supplyAsync(() -> { sleep(1000); return "Server A response"; }),
            CompletableFuture.supplyAsync(() -> { sleep(200);  return "Server B response"; }),
            CompletableFuture.supplyAsync(() -> { sleep(500);  return "Server C response"; })
        );

        System.out.println(fastest.get());
        // Output: Server B response (fastest at 200ms)
    }

    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Error Handling

// ErrorHandling.java — CompletableFuture exception handling patterns
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ErrorHandling {
    public static void main(String[] args) throws Exception {

        // exceptionally — Return fallback value on exception
        CompletableFuture<String> withFallback = CompletableFuture
            .supplyAsync(() -> {
                if (true) throw new RuntimeException("DB connection failed");
                return "data";
            })
            .exceptionally(ex -> {
                System.err.println("Error occurred: " + ex.getMessage());
                return "Cached data"; // Fallback value
            });

        System.out.println(withFallback.get());
        // Output:
        // Error occurred: java.lang.RuntimeException: DB connection failed
        // Cached data

        // handle — Handle both success and failure
        CompletableFuture<String> handled = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) throw new RuntimeException("Random error");
                return "Normal result";
            })
            .handle((result, ex) -> {
                if (ex != null) {
                    return "Default value (error: " + ex.getMessage() + ")";
                }
                return result + " [verified]";
            });

        System.out.println(handled.get());
        // On success: Normal result [verified]
        // On failure: Default value (error: java.lang.RuntimeException: Random error)

        // Timeout (Java 9+)
        CompletableFuture<String> withTimeout = CompletableFuture
            .supplyAsync(() -> { sleep(5000); return "Slow response"; })
            .orTimeout(2, TimeUnit.SECONDS)          // Fails with TimeoutException if it takes longer than 2 seconds
            .exceptionally(ex -> "Timeout default value");

        System.out.println(withTimeout.get());
        // Output: Timeout default value

        // completeOnTimeout (Java 9+) — Default value on timeout
        CompletableFuture<String> withDefault = CompletableFuture
            .supplyAsync(() -> { sleep(5000); return "Slow response"; })
            .completeOnTimeout("Fast default value", 1, TimeUnit.SECONDS);

        System.out.println(withDefault.get());
        // Output: Fast default value
    }

    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Thread Pool Design

// ThreadPoolConfig.java — Production thread pool configuration
import java.util.concurrent.*;

public class ThreadPoolConfig {
    public static void main(String[] args) {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // CPU-bound tasks: core count + 1 (a common rule of thumb)
        ExecutorService cpuPool = Executors.newFixedThreadPool(cpuCores + 1);

        // I/O-bound tasks: core count * (1 + wait time / processing time)
        // If wait time is 10x processing time: cores * 11
        // The values below are illustrative starting points; tune them with measurements
        ExecutorService ioPool = new ThreadPoolExecutor(
            cpuCores * 2,             // Core thread count
            cpuCores * 10,            // Maximum thread count (extra threads start only once the queue is full)
            60L, TimeUnit.SECONDS,    // Keep-alive time for idle threads above the core count
            new LinkedBlockingQueue<>(1000), // Task queue (must be bounded!)
            new ThreadPoolExecutor.CallerRunsPolicy() // Run on caller thread when pool and queue are both full
        );

        // Usage example
        CompletableFuture<String> dbQuery = CompletableFuture
            .supplyAsync(() -> "DB result", ioPool);     // I/O task -> ioPool

        CompletableFuture<Integer> calculation = CompletableFuture
            .supplyAsync(() -> fibonacci(40), cpuPool); // CPU task -> cpuPool

        System.out.println("CPU cores: " + cpuCores);
        // Output: CPU cores: 8 (varies by environment)

        cpuPool.shutdown();
        ioPool.shutdown();
    }

    static int fibonacci(int n) {
        if (n <= 1) return n;
        return fibonacci(n - 1) + fibonacci(n - 2);
    }
}
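
The sizing rule of thumb in the comments above can be written out as a small helper. This is only a sketch: the class name, method names, and the 100 ms / 10 ms figures are assumptions for illustration, and real wait and processing times should come from measurements.

```java
// PoolSizing.java - rule-of-thumb thread pool sizing (illustrative helper, not a library API)
class PoolSizing {

    // CPU-bound work: core count + 1
    static int cpuBoundThreads(int cores) {
        return cores + 1;
    }

    // I/O-bound work: cores * (1 + wait time / processing time), rounded down, at least 1
    static int ioBoundThreads(int cores, double waitMillis, double computeMillis) {
        if (cores < 1 || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException(
                "Require cores >= 1, waitMillis >= 0, computeMillis > 0");
        }
        return Math.max(1, (int) (cores * (1 + waitMillis / computeMillis)));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size: " + cpuBoundThreads(cores));
        // Assumed measurement: 100 ms waiting on I/O for every 10 ms of processing
        System.out.println("I/O-bound pool size: " + ioBoundThreads(cores, 100, 10));
    }
}
```

With 8 cores and a 10:1 wait-to-processing ratio, the helper returns 8 * 11 = 88 threads, which matches the "cores * 11" figure in the comment.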

Practical Tips

Key principles for using CompletableFuture effectively.

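  • Always use a custom thread pool in production. The default ForkJoinPool.commonPool() is shared across the whole JVM (parallel streams use it too) and has roughly one thread per core, so a few slow or blocking tasks can starve every other task that relies on it.
  • Distinguish thenApply from thenCompose in chaining. Use thenApply for synchronous transformations and thenCompose when the next step itself returns a CompletableFuture. Passing such a function to thenApply yields a nested CompletableFuture<CompletableFuture<T>>.
  • Always set timeouts. On Java 9+, use orTimeout() or completeOnTimeout().
  • When you must block, use get() with a timeout. join() and the no-argument get() both wait indefinitely, while get(timeout, unit) throws TimeoutException once the limit passes.
  • Place error handling at the end of the chain. A failure skips the remaining thenApply/thenCompose stages and propagates downstream, so a single exceptionally() at the end of the pipeline catches exceptions from any earlier stage.
  • On Java 21+, running blocking I/O tasks on virtual threads (for example via Executors.newVirtualThreadPerTaskExecutor()) can raise throughput further for I/O-bound workloads.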
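
Two of the tips above, thenApply versus thenCompose and error handling at the end of the chain, can be checked with a short sketch. The lookupName() helper and the class name are hypothetical and exist only for this illustration.

```java
// TipsDemo.java - thenApply vs thenCompose, and one exceptionally() at the end of a chain
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class TipsDemo {

    // Hypothetical async lookup used only for illustration
    static CompletableFuture<String> lookupName(long id) {
        return CompletableFuture.completedFuture("user-" + id);
    }

    // thenApply with a function that returns a future produces a nested type
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.completedFuture(42L).thenApply(TipsDemo::lookupName);
    }

    // thenCompose flattens the same call into a single future
    static CompletableFuture<String> flat() {
        return CompletableFuture.completedFuture(42L).thenCompose(TipsDemo::lookupName);
    }

    // A failure in the first stage skips the thenApply stages and reaches exceptionally()
    static String recoverAtEnd(AtomicBoolean middleStageRan) {
        return CompletableFuture
            .<String>supplyAsync(() -> { throw new IllegalStateException("stage 1 failed"); })
            .thenApply(s -> { middleStageRan.set(true); return s.trim(); })
            .thenApply(String::toUpperCase)
            .exceptionally(ex -> "fallback")
            .join();
    }

    public static void main(String[] args) {
        System.out.println(nested().join().join()); // Two joins needed to unwrap the nesting
        System.out.println(flat().join());          // One join is enough

        AtomicBoolean middleStageRan = new AtomicBoolean(false);
        System.out.println(recoverAtEnd(middleStageRan));
        System.out.println("Middle stage ran: " + middleStageRan.get());
    }
}
```

The nested variant compiles and runs, but every caller has to unwrap two layers, which is usually a sign that thenCompose was intended. In the error case the flag stays false, showing that the intermediate stages never executed before the fallback was applied.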
