Introduction
Julia was designed with multi-threading in mind, at least to some extent, given the scientific computing roots of the language. For example, multi-threading was highlighted in the 1.3 release blog post and is widely used in the community [1].
Example: sum from 1 to 100
Let's first look at an example function that simply sums the integers from 1 to 100.
f(i) = (sleep(0.005); i);  # the sleep simulates work and creates a yield point
state = [0]
for x in 1:100
    state[1] += f(x)
end
@show state
state = [5050]
Later, we will write concurrent / multi-threaded versions of this function to demonstrate incorrect behavior (we will discuss the difference between "concurrency" and "multi-threading" in a moment).
So here's a version adapted from that blog post, except it no longer works:
states = [0 for i in 1:Threads.nthreads()]
Threads.@threads for x in 1:100
    states[Threads.threadid()] += f(x)
end
@show states
@show sum(states)
states = [925, 924, 1508, 1575]
sum(states) = 4932
What's the problem?
The problem is that since Julia 1.8, @threads no longer means "run each iteration of the loop body exclusively on one thread"; it means "schedule N tasks, where N is the number of OS threads you have". More importantly, it now defaults to @threads :dynamic (the old behavior is equivalent to :static). Under dynamic scheduling, each task you schedule can migrate between threads, causing race conditions.
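For comparison, here is a sketch of opting back into the old behavior with :static, which pins each task to one thread; this makes the per-thread slots exclusive again (at the cost of load balancing), and is shown for illustration rather than as a recommendation:
states2 = [0 for _ in 1:Threads.nthreads()]
Threads.@threads :static for x in 1:100
    # with :static, threadid() is stable for the lifetime of each task
    states2[Threads.threadid()] += f(x)
end
@show sum(states2)  # 5050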
This is why our dummy f(i) includes a sleep call: it's an easy way to induce yielding and thus cause tasks to migrate. As an example, here's one way the execution could have gone wrong:
1. task 1 starts running on thread 1, evaluates threadid() to be 1, and hits sleep()
2. when task 1 finishes sleeping, it is now running on thread 2
3. but it still tries to "write" to states[1]
4. however, another task at some point evaluated threadid() == 1 as well, and is now attempting to write to the same index 💥
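You can observe migration directly with a small experiment (a sketch; it requires starting Julia with multiple threads, and because migration is timing-dependent it may take a few runs to trigger):
Threads.@threads for i in 1:8
    before = Threads.threadid()
    sleep(0.01)  # a yield point; the scheduler may resume the task elsewhere
    after = Threads.threadid()
    before == after || println("task $i migrated: thread $before -> $after")
end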
Essentially, because each task no longer has exclusive rights to a thread, using threadid() is very risky business. In fact, fundamentally, this is a concurrency bug.
A working definition: concurrency involves multiple tasks, but you may or may not have multiple threads (think actual CPUs) working at the same time. You can see the same underlying problem in this construction:
async_state = [0]
@sync for x in 1:100
    # every task reads async_state[1] before any of them writes back
    # (f(x) yields at the sleep), so the last write wins
    @async async_state[1] += f(x)
end
# should be 5050
@show async_state
async_state = [100]
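One safe pattern (a sketch) is to avoid shared mutable state entirely: give each task its own result and combine them at the end:
tasks = [Threads.@spawn f(x) for x in 1:100]
@show sum(fetch.(tasks))  # 5050; since no state is shared, there is no race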
The pattern in UnROOT.jl
In ROOT file I/O, we have a slightly more complex problem at hand:
- each column is chunked into clusters, and each cluster is individually compressed on disk
- on getindex(col, idx), we need to find the cluster, decompress it, compute a local index, and return the data
- the user is very likely to immediately call getindex(col, idx+1) as well, so we cache the decompressed cluster
I have whipped up a lengthy Discourse post and an accompanying GitHub gist if you're interested, but this blog post will be self-contained.
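To that end, here is a mock setup so the snippets below actually run; the helper names (all_ranges, _findrange, _mock_io) match the later code, but their bodies here are illustrative assumptions rather than the real UnROOT.jl internals:
# 100 clusters of 1000 elements each (an assumed layout, for illustration)
const all_ranges = [(1000i + 1):(1000i + 1000) for i in 0:99]

# find the cluster (a UnitRange) that contains idx
_findrange(ranges, idx) = ranges[searchsortedlast(first.(ranges), idx)]

# pretend "decompression": each element of the cluster becomes a small
# Vector{Float64}; real code would inflate compressed bytes from disk
_mock_io(cluster) = [Float64[i, i + 0.5] for i in cluster]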
Naive implementation
Forget about caching for a second: what would a basic getindex() look like?
function no_cache_getindex(idx)
    # 1. find the cluster::UnitRange
    cluster = _findrange(all_ranges, idx)
    # 2. decompress it
    data = _mock_io(cluster)
    # 3. compute local index
    localidx = idx - first(cluster) + 1
    # 4. return user's data
    return data[localidx]
end
Again, this is way too slow, because data may have a few thousand elements and the user is most likely to call getindex(idx+1) next.
Btw, how do end users interact with this code? In reality it's something like:
for evt in mytree
    push!(histogram, evt.col1)
    ...
end
Here mytree.col1 is a column, and eventually this calls getindex(mytree.col1, idx); the no_cache_getindex function above is basically the essence of the full story.
The "illegal" implementation
As alluded to above, using threadid() to index a buffer is bad. Nevertheless, because it was once advertised by an official blog post, I had the genius idea of adopting the following approach:
# this is considered a bug due to task migration
const illegal_cache = [Vector{Vector{Float64}}() for _ in 1:Threads.nthreads()]
const illegal_cache_ranges = [0:-1 for _ in 1:Threads.nthreads()]
function illegal_getindex(idx)
    tid = Threads.threadid()
    cluster = illegal_cache_ranges[tid]
    local data
    if idx ∉ cluster
        # if idx is outside of the current cache, fetch new data
        cluster = _findrange(all_ranges, idx)
        data = illegal_cache[tid] = _mock_io(cluster)
        illegal_cache_ranges[tid] = cluster
    else
        data = illegal_cache[tid]
    end
    localidx = idx - first(cluster) + 1
    return data[localidx]
end
The idea is simple: if tasks don't migrate (before 1.8, or if you use @threads :static), then each tid slot is exclusive to one task or thread (without migration, there is no concurrency bug within each thread).
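With the mock setup from earlier, a quick sanity check (a sketch) confirms this works as long as tasks are pinned:
out = zeros(1000)
Threads.@threads :static for idx in 1:1000
    # the first element of the mock payload is the index itself
    out[idx] = first(illegal_getindex(idx))
end
@show out == 1.0:1000.0  # true: tid slots stay exclusive under :static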
This design was motivated by the fact that users can painlessly multi-thread their existing for-loop:
@threads for evt in mytree
    # e.g. FHist.jl is thread-safe
    push!(histogram, evt.col1)
    ...
end
In the old design, the equivalent of illegal_cache was attached to each column of mytree and used throughout UnROOT.jl. It turns out that just because of how fast getindex() is in reality, I could not observe any task migration, but theoretically it could happen [2].
The correct implementation
Depending on what your use case actually is, there might be a better solution (e.g. the FLoops.jl ecosystem). For UnROOT.jl, we face a tough combination of performance and flexibility requirements: the getindex() runs in a tight loop, and we have a two-level cluster-data system.
Many people correctly pointed out that one should use task_local_storage() instead. As the name suggests, it is simply an IdDict{Any, Any} local to each task. This avoids the pitfall in a trivial sense: since the task is the scheduling unit, a task has exclusive rights to its task-local storage.
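A tiny demonstration of that isolation (a sketch):
t1 = Threads.@spawn (task_local_storage(:who, 1); task_local_storage(:who))
t2 = Threads.@spawn (task_local_storage(:who, 2); task_local_storage(:who))
@show fetch(t1), fetch(t2)  # (1, 2): each task only ever sees its own dict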
However, almost everyone (including me) also thought this would be much slower than the threadid() slot approach, because it feels like an {Any, Any} dictionary would be too slow. This is a false assumption.
function tls_getindex(idx)
    tls = task_local_storage()
    # default to an empty range
    cluster = get(tls, :cluster, 0:-1)::UnitRange{Int}
    local data
    if idx ∉ cluster
        cluster = _findrange(all_ranges, idx)
        data = tls[:data] = _mock_io(cluster)
        tls[:cluster] = cluster
    else
        data = tls[:data]::Vector{Vector{Float64}}
    end
    localidx = idx - first(cluster) + 1
    return data[localidx]
end
Note: it is vital to have type assertions such as the ::UnitRange{Int} and the ::Vector{Vector{Float64}} on the values read back from the storage, because otherwise accessing the IdDict{Any, Any} would infest the entire call stack with type instability.
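You can check the speed claim yourself with BenchmarkTools.jl; a sketch (exact numbers will vary by machine, but the in-cache fast paths end up in the same ballpark):
using BenchmarkTools
# warm up so both versions hit their cached fast path
tls_getindex(500); illegal_getindex(500)
@btime tls_getindex(500)
@btime illegal_getindex(500)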
Caveat of task-local storage
If the user schedules the getindex() calls in a for ... @spawn fashion, it will immediately use up all memory, because peak resident memory grows linearly with the number of tasks. Before, it was bounded by nthreads() (it was a buffer pool with a finite number of slots). The problematic pattern looks like the sketch below.
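With the mock setup above (a sketch; the loop bound matches the mock's 100,000 elements):
# every spawned task builds its own task-local cache, so peak memory
# scales with the number of live tasks rather than with nthreads()
@sync for idx in 1:100_000
    Threads.@spawn tls_getindex(idx)
end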
In the end, the user has to know something about the suitable way to schedule tasks, so I think this is not a deal-breaker. In the old design, this pattern would cause a huge slowdown (because all caches get randomly evicted due to the random order of getindex calls); maybe it's better to crash as an extreme form of error.
Final thoughts
I think, in the end, a few things went wrong in the Julia ecosystem:
1. @threads was never a good API; it's nice to have, but it shouldn't be the predominant design pattern. It sits in a limbo between a low-level and a high-level API.
2. We shouldn't have changed @threads to :dynamic by default; we could have added a @tasks instead.
3. Communication was not sufficient; many libraries are not acutely aware of this even today.
4. Documentation was not sufficient; when we discussed this a week ago as of writing, fewer than 2 full people seemed to know the correct mental model and what's "actually correct".
Specifically on point 1, something more low-level for developers could be useful, or more high-level APIs (in the realm of structured parallelism), so users don't just reach for @threads all the time.
There's a new blog post in the making aiming to make people aware of the mess and to recommend how to migrate code with minimal changes. So hopefully the 100+ correctness bugs can be largely eliminated from the Julia ecosystem within the next year.
[1]
https://github.com/mkarikom/AdditiveCellCom.jl/blob/v0.3.11/src/features.jl#L71
https://github.com/pagnani/ArDCA.jl/blob/v0.5.1/src/dca.jl#L20
https://github.com/ivanslapnicar/Arrowhead.jl/blob/v1.0.0/src/arrowhead4.jl#L491
https://github.com/oxinabox/AutoPreallocation.jl/blob/v0.3.1/src/preallocate.jl#L13 (?)
https://github.com/ITensor/ITensorParallel.jl/blob/v0.1.0/src/references/ExtendedProjMPO.jl#L95
https://github.com/SciML/OrdinaryDiffEq.jl/blob/b7bb73f46669e6f35c33baf2b4a1a1dc9892a091/src/perform_step/extrapolation_perform_step.jl#L52-L55
[2] https://github.com/JuliaHEP/UnROOT.jl/issues/257