Complete Guide to Kotlin Coroutines — launch, async, Flow

What Are Coroutines?

Coroutines are Kotlin’s solution for asynchronous programming. They are lighter than threads and allow you to write asynchronous logic in a sequential code style without callback hell. Through suspend functions, execution can be paused and resumed, and Structured Concurrency safely manages their lifecycle.

This article covers coroutine fundamentals through Flow, with practical patterns throughout.

launch and async — Coroutine Builders

launch starts a coroutine that returns no result and gives you a Job handle, while async starts one that returns a Deferred, whose result you obtain with await().

import kotlinx.coroutines.*

// suspend function: callable only from a coroutine or another suspend function
suspend fun fetchUser(id: Int): String {
    delay(1000)  // Wait 1 second (does not block the thread)
    return "user_$id"
}

suspend fun fetchOrders(userId: String): List<String> {
    delay(800)   // Wait 0.8 seconds
    return listOf("OrderA", "OrderB", "OrderC")
}

fun main() = runBlocking {
    println("Start: ${Thread.currentThread().name}")

    // launch — Async task with no result needed
    val job = launch {
        println("Background task started")
        delay(500)
        println("Background task completed")
    }

    // async — Async task that returns a result
    val deferred = async {
        fetchUser(1)
    }
    val user = deferred.await()  // Wait for result
    println("User: $user")

    // Parallel execution — Run multiple tasks concurrently with async
    val startTime = System.currentTimeMillis()

    val user1 = async { fetchUser(1) }
    val user2 = async { fetchUser(2) }
    val user3 = async { fetchUser(3) }

    // Await each result; the three requests are already running concurrently
    println("Results: ${user1.await()}, ${user2.await()}, ${user3.await()}")
    val elapsed = System.currentTimeMillis() - startTime
    println("Elapsed: ${elapsed}ms (concurrent, so about 1000ms)")

    job.join()  // Wait for launch task to complete
    // Output (elapsed time varies slightly):
    // Start: main
    // Background task started
    // Background task completed
    // User: user_1
    // Results: user_1, user_2, user_3
    // Elapsed: 1012ms (concurrent, so about 1000ms)
}

By starting multiple tasks concurrently with async and collecting results with await(), you can significantly reduce execution time compared to sequential execution.
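
When the number of tasks is not fixed, the same idea can be written with map and awaitAll. The following is a minimal sketch: loadUser and loadUsers are hypothetical names standing in for the article's fetchUser, and the 200 ms delay is an arbitrary placeholder for a network call.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for fetchUser; the delay simulates a network call.
suspend fun loadUser(id: Int): String {
    delay(200)
    return "user_$id"
}

// Starts one async per id inside coroutineScope, then waits for all of them.
// If any child fails, coroutineScope cancels the remaining children.
suspend fun loadUsers(ids: List<Int>): List<String> = coroutineScope {
    ids.map { id -> async { loadUser(id) } }.awaitAll()
}

fun main() = runBlocking {
    val users = loadUsers(listOf(1, 2, 3))
    println(users)  // [user_1, user_2, user_3]
}
```

awaitAll returns the results in the order of the input list, regardless of which request finishes first.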

Structured Concurrency and Coroutine Scopes

Every coroutine is launched in a CoroutineScope. A parent waits for its children to finish, and cancelling the parent cancels all of its children. By default, a failing child also cancels its parent and siblings. This is called Structured Concurrency.

import kotlinx.coroutines.*

// Managing coroutines in a custom scope
class DataProcessor {
    // Dedicated scope — cancellable
    private val scope = CoroutineScope(
        Dispatchers.Default + SupervisorJob()
    )

    fun processAsync(items: List<String>) {
        scope.launch {
            println("Processing started: ${items.size} items")

            // coroutineScope — Waits until all children complete
            coroutineScope {
                items.forEach { item ->
                    launch {
                        processItem(item)
                    }
                }
            }
            println("All items processed")
        }
    }

    private suspend fun processItem(item: String) {
        delay(500)  // Processing simulation
        println("Processed: $item [${Thread.currentThread().name}]")
    }

    fun cancel() {
        scope.cancel()  // Cancel all coroutines
        println("Processor cancelled")
    }
}

fun main() = runBlocking {
    // withContext — Switch dispatchers
    val result = withContext(Dispatchers.IO) {
        // Runs on IO thread (files, network, etc.)
        "Data fetched from IO thread"
    }
    println(result)
    // Output: Data fetched from IO thread

    // Timeout handling
    val data = withTimeoutOrNull(1500) {
        delay(1000)
        "Success data"
    }
    println("Timeout result: $data")
    // Output: Timeout result: Success data

    val expired = withTimeoutOrNull(500) {
        delay(1000)
        "This value will not be returned"
    }
    println("Expired result: $expired")
    // Output: Expired result: null

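    // Exception handling
    // A CoroutineExceptionHandler only takes effect on a root coroutine or on a
    // direct child of supervisorScope. On an ordinary child of runBlocking the
    // exception would propagate to the parent and the handler would be ignored.
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Exception occurred: ${exception.message}")
    }

    supervisorScope {
        launch(handler) {
            throw RuntimeException("Coroutine internal error")
        }
    }
    // Output: Exception occurred: Coroutine internal error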
}
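
The parent-child cancellation rule described at the start of this section can be observed directly. The sketch below is illustrative only: cancelParentDemo is a hypothetical helper, and the delay values are arbitrary.

```kotlin
import kotlinx.coroutines.*

// Launches a parent with two children, cancels the parent, and reports
// the cancellation state of the parent followed by each child.
suspend fun cancelParentDemo(): List<Boolean> = coroutineScope {
    val parent = launch {
        launch { delay(10_000) }  // child A
        launch { delay(10_000) }  // child B
    }
    delay(100)                               // let the parent start its children
    val children = parent.children.toList()  // snapshot while they are still running
    parent.cancelAndJoin()                   // cancel the parent and wait for completion
    listOf(parent.isCancelled) + children.map { it.isCancelled }
}

fun main() = runBlocking {
    println(cancelParentDemo())  // [true, true, true]
}
```

Neither child is cancelled explicitly. Cancelling the parent is enough, which is what makes scope-based cleanup reliable.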

Flow — Asynchronous Data Streams

Flow is an asynchronous stream that emits multiple values sequentially. It is the asynchronous counterpart of Sequence and can often take the place of reactive-stream libraries such as RxJava.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Creating a Flow — Emit multiple values asynchronously
fun numbersFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(300)  // Async wait
        emit(i)     // Emit value
        println("Emitted: $i")
    }
}

// Simulating a real-time data stream
fun sensorDataFlow(): Flow<Double> = flow {
    repeat(5) {
        delay(500)
        val value = 20.0 + (Math.random() * 10)
        emit(value)
    }
}

fun main() = runBlocking {
    // Basic Flow collection
    println("=== Basic Flow ===")
    numbersFlow().collect { value ->
        println("Collected: $value")
    }
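    // Output (emit() returns only after the collector has handled the value,
    // so "Collected" is printed before "Emitted"):
    // Collected: 1
    // Emitted: 1
    // Collected: 2
    // Emitted: 2
    // ... (repeats to 5)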

    // Flow operators — map, filter, take
    println("\n=== Flow Operators ===")
    numbersFlow()
        .filter { it % 2 != 0 }       // Odd numbers only
        .map { it * it }               // Square
        .take(2)                       // First 2 only
        .collect { println("Result: $it") }
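    // Output (numbersFlow still prints its own "Emitted" lines;
    // take(2) cancels the flow before "Emitted: 3" is printed):
    // Result: 1
    // Emitted: 1
    // Emitted: 2
    // Result: 9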

    // Sensor data stream
    println("\n=== Sensor Data ===")
    sensorDataFlow()
        .map { String.format("%.1f°C", it) }
        .collect { println("Temperature: $it") }
    // Output (random values):
    // Temperature: 25.3°C
    // Temperature: 22.8°C
    // ... (5 times)

    // flowOf, asFlow — Simple Flow creation
    println("\n=== Simple Flow ===")
    flowOf("A", "B", "C")
        .onEach { delay(100) }
        .collect { print("$it ") }
    println()
    // Output: A B C

    listOf(1, 2, 3, 4, 5)
        .asFlow()
        .reduce { acc, value -> acc + value }
        .let { println("Sum: $it") }
    // Output: Sum: 15
}
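
One property of the flows above is easy to miss: the flow { ... } block does not run when the flow is created, and it runs again from the start for every collector. The following sketch makes that visible by counting how often the builder block executes. countBuilderRuns is a hypothetical helper written for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts how many times the flow builder block runs.
suspend fun countBuilderRuns(): Int {
    var runs = 0
    val numbers = flow<Int> {
        runs++              // executed once per collector, not at creation time
        emit(1)
        emit(2)
    }
    check(runs == 0)        // no collector yet, so nothing has run
    numbers.collect { }     // first collection runs the block
    numbers.collect { }     // second collection runs it again from the start
    return runs
}

fun main() = runBlocking {
    println("Builder ran ${countBuilderRuns()} times")  // Builder ran 2 times
}
```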

Dispatcher Summary

A dispatcher determines which thread or thread pool a coroutine runs on.

  • Dispatchers.Main: UI thread (Android, JavaFX)
  • Dispatchers.IO: blocking I/O operations (files, network, DB)
  • Dispatchers.Default: CPU-intensive tasks (sorting, JSON parsing)
  • Dispatchers.Unconfined: starts in the caller's thread and, after a suspension, resumes in whichever thread the suspending function resumes it on (rarely needed in application code)
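
To see the dispatchers in action, the sketch below records the thread that executes a block under each one. threadNames is a hypothetical helper. Dispatchers.Main is left out because it requires a UI framework on the classpath, and the exact thread names depend on the JVM and library version, so no fixed output is shown.

```kotlin
import kotlinx.coroutines.*

// Returns the name of the thread that runs the block under each dispatcher.
suspend fun threadNames(): Map<String, String> = mapOf(
    "Default" to withContext(Dispatchers.Default) { Thread.currentThread().name },
    "IO" to withContext(Dispatchers.IO) { Thread.currentThread().name },
    "Unconfined" to withContext(Dispatchers.Unconfined) { Thread.currentThread().name }
)

fun main() = runBlocking {
    threadNames().forEach { (dispatcher, thread) -> println("$dispatcher -> $thread") }
}
```

On the JVM, Default and IO typically report worker threads from a shared pool, while Unconfined reports the thread that called it.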

Practical Tips

  • Use runBlocking only in tests and main: It blocks the calling thread until its body completes. In production code, launch coroutines from a CoroutineScope instead.
  • Use SupervisorJob: When one child coroutine failure should not affect others, use SupervisorJob.
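  • Switch dispatchers with withContext: Inside a suspend function, withContext(Dispatchers.IO) suspends the caller, runs the block on the IO pool, and returns its result. Unlike launch(Dispatchers.IO), it needs no extra child coroutine, which keeps the coroutine hierarchy simple.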
  • Flow is a Cold stream: It only executes when a collector calls collect. To emit to multiple collectors simultaneously, use SharedFlow or StateFlow.
  • Cooperate with cancellation: In long-running coroutines, check isActive or call ensureActive() to cooperate with cancellation.
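
The last tip matters most for CPU-bound loops that never suspend. The following is a minimal sketch of such a loop: countUntilCancelled is a hypothetical helper, and the 50 ms delay is arbitrary. Without the ensureActive() call, the loop would never notice the cancellation and cancelAndJoin() would not return.

```kotlin
import kotlinx.coroutines.*

// Runs a busy loop on Dispatchers.Default, cancels it after a short delay,
// and returns how many iterations completed before cancellation.
suspend fun countUntilCancelled(): Long = coroutineScope {
    var iterations = 0L
    val worker = launch(Dispatchers.Default) {
        while (true) {
            ensureActive()   // throws CancellationException once the job is cancelled
            iterations++
        }
    }
    delay(50)
    worker.cancelAndJoin()   // returns only because the loop checks for cancellation
    iterations
}

fun main() = runBlocking {
    val n = countUntilCancelled()
    println("Loop ran $n iterations before cancellation")
}
```

Checking isActive in the loop condition (while (isActive) { ... }) is an alternative that exits the loop without throwing.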
