When ConcurrentHashMap is not concurrent and runBlocking is not blocking

A (click)bait? Let me explain.

Take a look at this simple code. Can you predict what will be printed?

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

fun main() {
    val map = ConcurrentHashMap<String, String>()
    runBlocking {
        launch {
            map.computeIfAbsent("key") {
                runBlocking {
                    delay(100)
                    "value1"
                }
            }
        }
        launch {
            map.computeIfAbsent("key") {
                runBlocking {
                    delay(500)
                    "value2"
                }
            }
        }
    }
    println(map["key"])
}

Before we dive into what’s wrong here, let’s address an obvious concern:

Does this code even make sense? It looks weird. Surely this isn’t real-world code!

But it kind of is. Sure, it’s synthetic and simplified, but it represents a valid real-world scenario I encountered recently. The essence is straightforward: we have a ConcurrentHashMap and two concurrent computations, both trying to set the value for the same key.

In the real world, things were a bit more involved. We had to preload some data from a third-party service during the startup of a Spring Boot application. But Spring doesn’t support suspending functions in @PostConstruct, InitializingBean, or @EventListener. So we went with the simplest option: wrapping the logic in runBlocking.

To speed things up, we performed the calls in parallel (using async in the real code). While each request was loading different data, they all required service-scoped JWTs issued by another token service. These tokens are short-lived and cached using the service name as the key. And that cache (Caffeine) is, essentially, a ConcurrentHashMap.

So, if two parallel requests for the same service kick off and no token is cached yet, both will try to call the token service and cache the result. Since Java’s computeIfAbsent doesn’t accept suspend functions, we wrapped that in another runBlocking.

A code smell? Maybe. Nested runBlocking calls, expensive computations inside computeIfAbsent… But hey, ConcurrentHashMap is concurrent, right? At worst, we’d waste a call or two to the token service. One thread wins and sets the value; the other one moves on.

Except… The code is completely broken:

Exception in thread "main" java.lang.IllegalStateException: Recursive update
 at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1763)

Let’s dig in.

The exception is thrown here. There is a variable f, which is an instance of ReservationNode, and the docs for that class immediately suggest that we’re on the right track:

A place-holder node used in computeIfAbsent and compute.

Going back in the flow brings us to these lines. There’s a strange bit: it synchronizes on a local variable. Weird? Yes. But there’s a good reason: that’s how ConcurrentHashMap avoids locking the entire map. Simplifying a bit: it reserves a slot for the key and synchronizes on that specific entry. Only one thread may enter that block.
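The same check can be tripped without any coroutines at all. The sketch below is a minimal reproduction, assuming JDK 9 or newer (the java.base/ prefix in the stack trace above points to a modular JDK; older versions may behave differently). The mapping function calls computeIfAbsent for the same key on the same thread, runs into the reservation placed by the outer call, and fails. The function name recursiveUpdateMessage is made up for illustration.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: returns the exception message, or null if nothing was thrown.
fun recursiveUpdateMessage(): String? {
    val map = ConcurrentHashMap<String, String>()
    return try {
        map.computeIfAbsent("key") { _ ->
            // Same thread, same key: the slot is already reserved by the outer call.
            map.computeIfAbsent("key") { _ -> "inner" }
        }
        null
    } catch (e: IllegalStateException) {
        e.message
    }
}
```

Calling println(recursiveUpdateMessage()) from main should print the same "Recursive update" message as the stack trace above.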

Note

IntelliJ IDEA has a neat feature called “Analyze Data Flow” that helps you quickly understand how execution reached a particular point.

Let’s sprinkle in some logging to observe the threads in action:

fun printlnWithThreadName(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

Here’s the code again with logging:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

fun printlnWithThreadName(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

fun main() {
    printlnWithThreadName("Started main")
    val map = ConcurrentHashMap<String, String>()
    runBlocking {
        printlnWithThreadName("Entered runBlocking")
        launch {
            printlnWithThreadName("Started launch (1)")
            map.computeIfAbsent("key") {
                printlnWithThreadName("Computing key (1)")
                runBlocking {
                    printlnWithThreadName("Computing key (1), inside runBlocking")
                    delay(100)
                    "value1"
                }
            }
        }
        launch {
            printlnWithThreadName("Started launch (2)")
            map.computeIfAbsent("key") {
                printlnWithThreadName("Computing key (2)")
                runBlocking {
                    printlnWithThreadName("Computing key (2), inside runBlocking")
                    delay(500)
                    "value2"
                }
            }
        }
    }
    println(map["key"])
}

And the output:

[main] Started main
[main @coroutine#1] Entered runBlocking
[main @coroutine#2] Started launch (1)
[main @coroutine#2] Computing key (1)
[main @coroutine#3] Started launch (2)
[main @coroutine#4] Computing key (1), inside runBlocking

Turns out, runBlocking… isn’t truly blocking in the way one might expect. So how is it starting multiple computations? They’re not running in parallel; a single thread can’t do that. But they do run concurrently. No surprise there: that’s exactly how coroutines work. It’s cooperative multitasking: when a coroutine suspends, the thread can resume another coroutine that’s ready to continue. Multiple coroutines can take turns on a single thread.
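Here is a small sketch of that turn-taking, stripped of the map entirely. Two coroutines are launched inside a plain runBlocking; the helper records the order of events and how many distinct threads were involved. The name interleaveOnOneThread and the delay values are arbitrary choices for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the observed event order and the number of threads used.
fun interleaveOnOneThread(): Pair<List<String>, Int> {
    val events = mutableListOf<String>()
    val threads = mutableSetOf<Thread>()
    runBlocking {
        launch {
            threads += Thread.currentThread()
            events += "A start"
            delay(300) // A suspends here; the thread is free to run B
            events += "A end"
        }
        launch {
            threads += Thread.currentThread()
            events += "B start"
            delay(10)
            events += "B end"
        }
    }
    return events to threads.size
}
```

B starts and finishes while A is suspended, yet only one thread is ever recorded: the coroutines overlap in time without running in parallel.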

This is when the runBlocking documentation starts to make sense:

The default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine. See CoroutineDispatcher for the other implementations that are provided by kotlinx.coroutines.

When CoroutineDispatcher is explicitly specified in the context, then the new coroutine runs in the context of the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another runBlocking, then this invocation uses the outer event loop.
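The sharing of the outer event loop can be observed directly. In the sketch below (the function name nestedRunBlockingEvents is made up for illustration), a sibling coroutine is launched but not yet started when a nested runBlocking is called on the same thread. While the nested call waits, it drains the shared queue, so the sibling runs in the middle of a call that looks blocking.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records when the sibling coroutine gets to run.
fun nestedRunBlockingEvents(): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        launch { events += "sibling" } // queued on the event loop, not started yet
        events += "before nested"
        runBlocking {
            delay(50) // while waiting, the shared event loop runs queued coroutines
        }
        events += "after nested"
    }
    return events
}
```

The sibling lands between "before nested" and "after nested". That matches the log above, where launch (2) starts while launch (1) is still inside computeIfAbsent.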

So: runBlocking does block the calling thread until the coroutine completes. However, within that thread, it runs an event loop to dispatch and resume coroutines created inside the same runBlocking block. Since all these coroutines run on the same thread, what happens when they hit a synchronized block, like the one inside the computeIfAbsent? In other words: if a coroutine enters a synchronized block and then suspends, could another coroutine resume on the same thread while the monitor is still held? Is it just… let in?

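Turns out that the answer is "yes": a monitor is owned by a thread, not by a coroutine, so a second coroutine running on the same thread walks right in. And that’s the problem.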

ConcurrentHashMap is thread-safe, but not coroutine-safe.
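JVM monitors are reentrant: a thread that already holds a lock may acquire it again. A minimal sketch without any coroutines, using a throwaway lock object and a hypothetical function name:

```kotlin
// Hypothetical helper: returns true if the inner block ran while the lock was already held.
fun reentersHeldMonitor(): Boolean {
    val lock = Any()
    var innerEntered = false
    synchronized(lock) {
        // Same thread, same monitor: the JVM does not block here.
        synchronized(lock) {
            innerEntered = Thread.holdsLock(lock)
        }
    }
    return innerEntered
}
```

From the JVM's point of view, two coroutines multiplexed onto one thread are just that one thread entering the same synchronized block twice.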

Another finding: those launch blocks (or asyncs) are generally pointless. Of course IO operations like this benefit from suspension, but there’s no actual parallelism if everything is running on a single thread.

The code had been running like this for ages. What revealed the issue? I was updating our HTTP clients, introduced that exact token cache, and discovered a bug in the preload logic!

Let’s give the outer runBlocking a proper CoroutineDispatcher:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

fun printlnWithThreadName(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

fun main() {
    printlnWithThreadName("Started main")
    val map = ConcurrentHashMap<String, String>()
    runBlocking(Dispatchers.IO) {
        printlnWithThreadName("Entered runBlocking")
        launch {
            printlnWithThreadName("Started launch (1)")
            map.computeIfAbsent("key") {
                printlnWithThreadName("Computing key (1)")
                runBlocking {
                    printlnWithThreadName("Computing key (1), inside runBlocking")
                    delay(100)
                    "value1"
                }
            }
        }
        launch {
            printlnWithThreadName("Started launch (2)")
            map.computeIfAbsent("key") {
                printlnWithThreadName("Computing key (2)")
                runBlocking {
                    printlnWithThreadName("Computing key (2), inside runBlocking")
                    delay(500)
                    "value2"
                }
            }
        }
    }
    println(map["key"])
}

Now, everything works:

[main] Started main
[DefaultDispatcher-worker-2 @coroutine#1] Entered runBlocking
[DefaultDispatcher-worker-3 @coroutine#2] Started launch (1)
[DefaultDispatcher-worker-3 @coroutine#2] Computing key (1)
[DefaultDispatcher-worker-1 @coroutine#3] Started launch (2)
[DefaultDispatcher-worker-3 @coroutine#4] Computing key (1), inside runBlocking
value1

Even limiting parallelism to one (e.g., Dispatchers.IO.limitedParallelism(1), try it!) seems to fix the issue. Is the root cause in the event-loop nature of the default dispatcher in runBlocking?
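For anyone who wants to try it, here is the single-worker variant as a sketch, with the logging removed and the result returned instead of printed. The function name preloadWithSingleIoWorker is made up; depending on the kotlinx.coroutines version, limitedParallelism may still be marked experimental and ask for an opt-in.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: same two computations, but on a dispatcher limited to one worker.
fun preloadWithSingleIoWorker(): String? {
    val map = ConcurrentHashMap<String, String>()
    runBlocking(Dispatchers.IO.limitedParallelism(1)) {
        launch {
            map.computeIfAbsent("key") {
                runBlocking { // blocks the only worker; launch (2) stays in the dispatcher queue
                    delay(100)
                    "value1"
                }
            }
        }
        launch {
            map.computeIfAbsent("key") {
                runBlocking {
                    delay(500)
                    "value2"
                }
            }
        }
    }
    return map["key"]
}
```

A plausible reading of why this works: the nested runBlocking no longer shares an event loop with the second launch, so nothing else can sneak onto the blocked thread, and the second computeIfAbsent only runs once the value is already present.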

I still think that ConcurrentHashMap is not, in general, coroutine-safe. Even with multiple physical threads and a proper dispatcher, there’s still a chance that two computeIfAbsent operations could be scheduled on the same thread if they’re suspendable, although this would be very rare.
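One way to sidestep the question is to keep both blocking and suspension out of the mapping function altogether: cache a Deferred instead of the value, and await it outside computeIfAbsent. This is only a sketch of that idea, not what our code did. DeferredCache, loader, and loadOnceDemo are hypothetical names, and a real cache would also need to handle expiry and evict failed entries.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical cache: the map stores Deferred values, so the mapping function returns immediately.
class DeferredCache(
    private val scope: CoroutineScope,
    private val loader: suspend (String) -> String,
) {
    private val entries = ConcurrentHashMap<String, Deferred<String>>()

    suspend fun get(key: String): String =
        entries.computeIfAbsent(key) { k ->
            // Creating a lazy Deferred neither blocks nor suspends.
            scope.async(start = CoroutineStart.LAZY) { loader(k) }
        }.await() // suspension happens here, outside the map's synchronized block
}

// Hypothetical demo: two concurrent lookups for the same key; returns results and loader call count.
fun loadOnceDemo(): Pair<List<String>, Int> {
    val calls = AtomicInteger()
    return runBlocking {
        val cache = DeferredCache(this) { key ->
            calls.incrementAndGet()
            delay(100) // stands in for the call to the token service
            "token-for-$key"
        }
        val results = listOf(
            async { cache.get("svc") },
            async { cache.get("svc") },
        ).awaitAll()
        results to calls.get()
    }
}
```

Both callers get the same value and the loader runs once, even on a single-threaded event loop, because nothing suspends or blocks while the map's internal lock is held.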

What do you think?

P.S. Other folks have argued about ConcurrentHashMap's safety in the coroutine world in the Kotlin Slack here. There is also a discussion of a very similar case in the kotlinx.coroutines GitHub: #3982.
