When ConcurrentHashMap is not concurrent and runBlocking is not blocking
A (click)bait? Let me explain.
Take a look at this simple code. Can you predict what will be printed?
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
//sampleStart
fun main() {
    val map = ConcurrentHashMap<String, String>()
    runBlocking {
        launch {
            map.computeIfAbsent("key") {
                runBlocking {
                    delay(100)
                    "value1"
                }
            }
        }
        launch {
            map.computeIfAbsent("key") {
                runBlocking {
                    delay(500)
                    "value2"
                }
            }
        }
    }
    println(map["key"])
}
//sampleEnd
Before we dive into what’s wrong here, let’s address an obvious concern:
Does this code even make sense? It looks weird. Surely not real-world code!
But it kind of is.
Sure, it’s synthetic and simplified, but it represents a valid real-world scenario I encountered recently.
The essence is straightforward: we have a ConcurrentHashMap and two concurrent computations, both trying to set the value for the same key.
In the real world, things were a bit more involved.
We had to preload some data from a third-party service during the startup of a Spring Boot application.
But Spring doesn’t support suspending functions in @PostConstruct, InitializingBean, or @EventListener.
So we went with the simplest option: wrapping the logic in runBlocking.
To speed things up, we performed the calls in parallel (using async in the real code).
While each request was loading different data, they all required service-scoped JWTs issued by another token service.
These tokens are short-lived and cached using the service name as the key.
And that cache (Caffeine) is, essentially, a ConcurrentHashMap.
So, if two parallel requests for the same service kick off and no token is cached yet, both will try to call the token service and cache the result.
Since Java’s computeIfAbsent doesn’t accept suspend functions, we wrapped that in another runBlocking.
A code smell?
Maybe.
Nested runBlockings, expensive computations inside computeIfAbsent…
But hey, ConcurrentHashMap is concurrent, right?
At worst, we’d waste a call or two to the token service.
One thread wins and sets the value; the other one moves on.
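Here’s (roughly) the behaviour we were counting on, sketched with plain threads instead of coroutines (a toy example of mine, not the original code):

import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread

fun main() {
    val map = ConcurrentHashMap<String, String>()
    // Whichever thread reserves the entry first runs its computation;
    // the other one waits on that entry and then moves on with the winner's value.
    val t1 = thread {
        map.computeIfAbsent("key") { Thread.sleep(100); "value1" }
    }
    val t2 = thread {
        map.computeIfAbsent("key") { Thread.sleep(500); "value2" }
    }
    t1.join()
    t2.join()
    println(map["key"]) // a single value, no exception (most likely "value1")
}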
Except… The code is completely broken:
Exception in thread "main" java.lang.IllegalStateException: Recursive update
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1763)
Let’s dig in.
The exception is thrown here.
So there is a variable f, which is an instance of a ReservationNode.
And the docs for that class immediately prove that we’re on the right track:
A place-holder node used in computeIfAbsent and compute.
Going back in the flow brings us to these lines.
There’s a strange bit: it synchronizes on a local variable. Weird? Yes. But there’s a good reason: that’s how ConcurrentHashMap avoids locking the entire map.
Simplifying a bit: it reserves a slot for the key and synchronizes on that specific entry.
Only one thread may enter that block.
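Here is a rough Kotlin sketch of that idea (my own simplification, not the real JDK source, which also deals with hashing, collisions, resizing, and tree bins; error handling is omitted too). It assumes a single entry per slot, but it is enough to see where “Recursive update” comes from:

import java.util.concurrent.atomic.AtomicReferenceArray

// Simplified model: a slot is either null, a reservation placeholder,
// or a (key, value) pair.
fun <K, V> computeIfAbsentSketch(
    table: AtomicReferenceArray<Any?>,
    index: Int,
    key: K,
    mappingFunction: (K) -> V,
): V {
    while (true) {
        when (val first = table.get(index)) {
            null -> {
                val reservation = Any() // placeholder for the entry being computed
                synchronized(reservation) {
                    if (table.compareAndSet(index, null, reservation)) {
                        // We own the slot: run the (possibly expensive) function
                        // while everyone else blocks on synchronized(reservation).
                        val value = mappingFunction(key)
                        table.set(index, key to value)
                        return value
                    }
                    // Lost the race for the slot: retry the loop.
                }
            }
            is Pair<*, *> -> {
                @Suppress("UNCHECKED_CAST")
                val value = first.second as V // already computed
                return value
            }
            else -> synchronized(first) {
                // `first` is someone's reservation. A caller that had to wait here
                // usually finds the reservation already replaced and retries the loop.
                // But if the reservation is still in place once we are inside the
                // synchronized block, the map concludes that the mapping function
                // re-entered itself:
                if (table.get(index) === first) {
                    throw IllegalStateException("Recursive update")
                }
            }
        }
    }
}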
Note: IntelliJ IDEA has a neat feature called “Analyze Data Flow” that helps you quickly understand how the execution got to a particular point.
Let’s sprinkle in some logging to observe the threads in action:
fun printlnWithThreadName(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}
Here’s the code again with logging:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
fun printlnWithThreadName(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}
//sampleStart
fun main() {
    printlnWithThreadName("Started main")
    val map = ConcurrentHashMap<String, String>()
    runBlocking {
        printlnWithThreadName("Entered runBlocking")
        launch {
            printlnWithThreadName("Started launch (1)")
            map.computeIfAbsent("key") {
                printlnWithThreadName("Computing key (1)")
                runBlocking {
                    printlnWithThreadName("Computing key (1), inside runBlocking")
                    delay(100)
                    "value1"
                }
            }
        }
        launch {
            printlnWithThreadName("Started launch (2)")
            map.computeIfAbsent("key") {
                printlnWithThreadName("Computing key (2)")
                runBlocking {
                    printlnWithThreadName("Computing key (2), inside runBlocking")
                    delay(500)
                    "value2"
                }
            }
        }
    }
    println(map["key"])
}
//sampleEnd
And the output:
[main] Started main
[main @coroutine#1] Entered runBlocking
[main @coroutine#2] Started launch (1)
[main @coroutine#2] Computing key (1)
[main @coroutine#3] Started launch (2)
[main @coroutine#4] Computing key (1), inside runBlocking
Turns out, runBlocking… isn’t truly blocking in the way one might expect.
So how is it starting multiple computations?
They’re not running in parallel; after all, a single thread can’t do that.
But they do appear to be running concurrently.
No surprise there: that’s exactly how coroutines work.
It’s cooperative multitasking: when a coroutine suspends, the thread can resume another coroutine that’s ready to continue.
Multiple coroutines can take turns on a single thread.
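A tiny demo of that (my own snippet, unrelated to the map): two coroutines sharing the main thread and taking turns at their suspension points.

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    launch {
        println("[${Thread.currentThread().name}] A: before delay")
        delay(100)
        println("[${Thread.currentThread().name}] A: after delay")
    }
    launch {
        println("[${Thread.currentThread().name}] B: before delay")
        delay(50)
        println("[${Thread.currentThread().name}] B: after delay")
    }
}
// Every line reports the main thread; B finishes before A because
// the thread is free to resume B while A is suspended in delay.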
This is when runBlocking’s documentation starts to make sense:
The default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine. See CoroutineDispatcher for the other implementations that are provided by kotlinx.coroutines.
When CoroutineDispatcher is explicitly specified in the context, then the new coroutine runs in the context of the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another runBlocking, then this invocation uses the outer event loop.
So: runBlocking does block the calling thread until the coroutine completes.
However, within that thread, it runs an event loop to dispatch and resume coroutines created inside the same runBlocking block.
Since all these coroutines run on the same thread, what happens when they hit a synchronized block, like the one inside computeIfAbsent?
In other words: if a coroutine enters a synchronized block and then suspends, could another coroutine resume on the same thread while the monitor is still held?
Is it just… let in?
Turns out that the answer is "yes". And that’s the problem.
ConcurrentHashMap is thread-safe, but not coroutine-safe.
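Here’s the whole problem reduced to a few lines, without ConcurrentHashMap at all (again, a snippet of mine): a JVM monitor is reentrant per thread, not per coroutine, so a coroutine that resumes on the same thread walks straight into a critical section that another coroutine is still “inside”.

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val lock = Any()

fun main() = runBlocking {
    launch {
        synchronized(lock) {
            println("coroutine 1: entered the monitor")
            // Blocking here hands the thread back to the event loop,
            // which happily resumes another coroutine on this very thread.
            runBlocking { delay(100) }
            println("coroutine 1: leaving the monitor")
        }
    }
    launch {
        synchronized(lock) {
            // Same thread, monitor is reentrant: we are let straight in,
            // even though coroutine 1 has not finished its critical section.
            println("coroutine 2: entered the monitor too")
        }
    }
}
// Output: coroutine 1 enters, coroutine 2 enters (!), and only then coroutine 1 leaves.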
Another finding: those launch blocks (or asyncs) are generally pointless.
Of course, IO operations like this benefit from suspension, but there’s no actual parallelism if everything is running on a single thread.
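A quick way to see it (a rough measurement sketch of mine, numbers approximate): when the work inside launch is actually blocking, as it effectively was with the nested runBlocking, the single-threaded event loop runs the tasks one after another, while a multithreaded dispatcher runs them in parallel.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

fun main() {
    // Two 100 ms blocking tasks on runBlocking's single-threaded event loop:
    // they run sequentially, so roughly 200 ms in total.
    val singleThreaded = measureTimeMillis {
        runBlocking {
            repeat(2) { launch { Thread.sleep(100) } }
        }
    }
    // The same two tasks on a multithreaded dispatcher run in parallel: roughly 100 ms.
    val multiThreaded = measureTimeMillis {
        runBlocking(Dispatchers.IO) {
            repeat(2) { launch { Thread.sleep(100) } }
        }
    }
    println("default runBlocking:         ~$singleThreaded ms")
    println("runBlocking(Dispatchers.IO): ~$multiThreaded ms")
}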
The code had been running like this for ages. What revealed the issue? I was updating our HTTP clients, introducing that exact token cache, and discovered a bug in the preload logic!
Let’s give the outer runBlocking a proper CoroutineDispatcher:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
fun printlnWithThreadName(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}
//sampleStart
fun main() {
    printlnWithThreadName("Started main")
    val map = ConcurrentHashMap<String, String>()
    runBlocking(Dispatchers.IO) {
        printlnWithThreadName("Entered runBlocking")
        launch {
            printlnWithThreadName("Started launch (1)")
            map.computeIfAbsent("key") {
                printlnWithThreadName("Computing key (1)")
                runBlocking {
                    printlnWithThreadName("Computing key (1), inside runBlocking")
                    delay(100)
                    "value1"
                }
            }
        }
        launch {
            printlnWithThreadName("Started launch (2)")
            map.computeIfAbsent("key") {
                printlnWithThreadName("Computing key (2)")
                runBlocking {
                    printlnWithThreadName("Computing key (2), inside runBlocking")
                    delay(500)
                    "value2"
                }
            }
        }
    }
    println(map["key"])
}
//sampleEnd
Now, everything works:
[main] Started main
[DefaultDispatcher-worker-2 @coroutine#1] Entered runBlocking
[DefaultDispatcher-worker-3 @coroutine#2] Started launch (1)
[DefaultDispatcher-worker-3 @coroutine#2] Computing key (1)
[DefaultDispatcher-worker-1 @coroutine#3] Started launch (2)
[DefaultDispatcher-worker-3 @coroutine#4] Computing key (1), inside runBlocking
value1
Even limiting parallelism to one (e.g., Dispatchers.IO.limitedParallelism(1), try it!) seems to fix the issue.
Is the root cause in the event-loop nature of the default dispatcher in runBlocking?
I still think that ConcurrentHashMap is not, in general, coroutine-safe.
Even if you’re using multiple physical threads with a proper dispatcher, there’s still a chance that two suspending computeIfAbsent computations end up scheduled on the same thread, although such a case would be very rare.
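For what it’s worth, one coroutine-friendly direction is to cache a Deferred instead of the value, so that computeIfAbsent only creates a coroutine and never suspends or blocks while the map holds its per-entry lock. This is a sketch of mine, not production code (it ignores failed and cancelled computations, which a real cache would have to evict):

import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class SuspendingCache<K : Any, V>(
    // The cached Deferreds need a scope that outlives individual callers.
    private val scope: CoroutineScope = CoroutineScope(SupervisorJob())
) {
    private val map = ConcurrentHashMap<K, Deferred<V>>()

    suspend fun get(key: K, compute: suspend (K) -> V): V =
        map.computeIfAbsent(key) {
            // async only schedules the computation; the mapping function
            // returns immediately, so nothing suspends or blocks inside
            // the map's synchronized block.
            scope.async { compute(key) }
        }.await()
}

fun main() = runBlocking {
    val cache = SuspendingCache<String, String>()
    launch { println(cache.get("key") { delay(100); "value1" }) } // value1
    launch { println(cache.get("key") { delay(500); "value2" }) } // value1 again, computed once
}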
What do you think?