Introduction

Julia was designed with multi-threading in mind, at least somewhat, given the scientific computing roots of the language. For example, multi-threading was highlighted in the 1.3 release blog post, and the pattern shown there is widely used in the community [1].

Example: sum from 1 to 100

Let's first look at a simple example that sums the integers from 1 to 100.

f(i) = (sleep(0.005); i);

state = [0]

for x in 1:100
    state[1] += f(x)
end

@show state
state = [5050]

Later, we will write concurrent / multi-threaded versions of this loop to demonstrate incorrect behavior (we will discuss the difference between "concurrency" and "multi-threading" in a moment).

So here's a version adapted from that blog post, except it doesn't work anymore:

states = [0 for i in 1:Threads.nthreads()]

Threads.@threads for x in 1:100
    states[Threads.threadid()] += f(x)
end

@show states
@show sum(states)
states = [925, 924, 1508, 1575]
sum(states) = 4932

What's the problem?

The problem is that since Julia 1.8, @threads no longer means "run each iteration of the loop body exclusively on one thread"; it means "schedule N tasks, where N is the number of OS threads you have". More importantly, it now defaults to @threads :dynamic (the old behavior is equivalent to :static). Under dynamic scheduling, each scheduled task can migrate between threads, causing race conditions.

This is why our dummy f(i) includes a sleep call: it's an easy way to induce yielding and thus task migration. As an example, here's one way the execution could have gone wrong:

  1. task 1 starts running on thread 1, evaluates threadid() to be 1, and hits sleep()

  2. when task 1 finishes sleeping, it is now running on thread 2

  3. but it still tries to "write" to states[1]

  4. however, another task at some point evaluated threadid() == 1 as well, and is now attempting to write to the same index

  5. 💥💥💥💥

Essentially, because each task no longer has exclusive rights to a thread, using threadid() as an index is very risky business. In fact, fundamentally, this is a concurrency bug.
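By the way, you can still opt back into the old behavior explicitly. A minimal sketch (assuming Julia ≥ 1.8): :static pins each chunk of iterations to one thread, which fixes this particular demo, but keeps all the composability problems of indexing by threadid():

states = [0 for _ in 1:Threads.nthreads()]

# :static pins each contiguous chunk of iterations to one OS thread,
# so threadid() is stable for the whole loop body (the pre-1.8 behavior)
Threads.@threads :static for x in 1:100
    states[Threads.threadid()] += f(x)
end

@show sum(states)  # 5050 again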

Back to terminology: a working definition is that concurrency involves multiple tasks, while you may or may not have multiple threads (think actual CPU cores) working at the same time. You can see the same underlying problem, without any multi-threading, in this construction:

async_state = [0]

@sync for x in 1:100
    @async async_state[1] += f(x)
end

# should be 5050
@show async_state
async_state = [100]

What happened: each of the 100 tasks evaluates async_state[1] (still 0) before yielding inside f's sleep, so when they wake up, each one writes back 0 + x, and the last write, 0 + 100, wins. This is a classic lost update, with only a single thread involved.

The pattern in UnROOT.jl

In ROOT File I/O, we have a slightly more complex problem at hand:

  • each column is chunked into clusters, and each cluster is individually compressed on disk

  • on getindex(col, idx), we need to find the cluster, decompress it, compute a localidx, and return the data

  • the user is very likely to immediately call getindex(col, idx+1) as well, so we cache the decompressed cluster

I have whipped up a lengthy Discourse post and an accompanying GitHub gist if you're interested, but this blog post will be self-contained.

Naive implementation

Forget about the cache for a second: what would a basic getindex() look like?

function no_cache_getindex(idx)
    # 1. find the cluster::UnitRange
    cluster = _findrange(all_ranges, idx)
    # 2. decompress it
    data = _mock_io(cluster)
    # 3. compute local index
    localidx = idx - first(cluster) + 1
    # 4. return user's data
    return data[localidx]
end

Again, this is way too slow, because each cluster may hold a few thousand elements and the user is most likely to call getindex(idx+1) next.
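By the way, _findrange, all_ranges, and _mock_io above are stand-ins for the real UnROOT.jl internals. If you want to run the snippets in this post, here's a minimal mock (my assumption for illustration: 100 clusters of 1000 elements each; the real code bisects over cluster offsets and does actual decompression):

const CLUSTER_SIZE = 1000
# clusters covering 1:1000, 1001:2000, ..., 100 clusters in total
const all_ranges = [(i*CLUSTER_SIZE + 1):((i + 1)*CLUSTER_SIZE) for i in 0:99]

# find the cluster range containing idx (linear scan; real code bisects)
_findrange(ranges, idx) = ranges[findfirst(r -> idx in r, ranges)]

# pretend to decompress a cluster: one Vector{Float64} per element
function _mock_io(cluster::UnitRange)
    sleep(0.001)  # simulate decompression cost
    return [Float64[i] for i in cluster]
end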


Btw, how do end users interact with this code? In reality it's something like:

for evt in mytree
    push!(histogram, evt.col1)
    ...
end

where mytree.col1 is a column, and iterating eventually calls

getindex(mytree.col1, idx)

The no_cache_getindex function above is basically the essence of the full story.
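For a rough idea of the wiring (a hypothetical sketch, not UnROOT.jl's actual type), a lazy column only needs to forward indexing to this machinery:

# hypothetical; the real column also holds file handles, offsets, etc.
struct LazyColumn
    length::Int
end

Base.getindex(col::LazyColumn, idx::Int) = no_cache_getindex(idx)
Base.length(col::LazyColumn) = col.length
Base.iterate(col::LazyColumn, i=1) = i > col.length ? nothing : (col[i], i + 1)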

The "illegal" implementation

As alluded to above, using threadid() to index into a buffer is bad. Nevertheless, because it was once advertised in an official blog post, I had the genius idea of adopting the following approach:

# this is considered a bug due to task migration
const illegal_cache = [Vector{Vector{Float64}}() for _ in 1:Threads.nthreads()]
const illegal_cache_ranges = [0:-1 for _ in 1:Threads.nthreads()]

function illegal_getindex(idx)
    tid = Threads.threadid()
    cluster = illegal_cache_ranges[tid]
    local data
    if idx ∉ cluster
        # if idx is outside of current cache, fetch new data
        cluster = _findrange(all_ranges, idx)
        data = illegal_cache[tid] = _mock_io(cluster)
        illegal_cache_ranges[tid] = cluster
    else
        data = illegal_cache[tid]
    end
    localidx = idx - first(cluster) + 1
    return data[localidx]
end

The idea is simple: if tasks don't migrate (before 1.8, or if you use @threads :static), then each tid slot is exclusive to one task/thread, so without migration there is no concurrency bug within each thread.


This design was motivated by the fact that users can painlessly multi-thread their existing for-loop:

@threads for evt in mytree
    # e.g. FHist.jl is thread-safe
    push!(histogram, evt.col1)
    ...
end

In the old design, the equivalent of illegal_cache was attached to each column of mytree and used throughout UnROOT.jl. It turns out that, simply because of how fast getindex() is in reality, I could never observe a task migration in practice. But theoretically it could happen [2].
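If you want to catch a migration in the act, a sketch like this (run with multiple threads, e.g. julia -t 4, on Julia ≥ 1.8) will occasionally print one; there's no guarantee on any given run:

@sync for _ in 1:100
    Threads.@spawn begin
        before = Threads.threadid()
        sleep(0.01)  # yield point: the scheduler may resume us elsewhere
        after = Threads.threadid()
        before != after && println("task migrated: thread $before -> $after")
    end
end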

The correct implementation

Depending on what your use case actually is, there may be a better solution (e.g. the FLoops.jl ecosystem). For UnROOT.jl, we face the tough task of combining performance and flexibility requirements: getindex() runs in a tight loop, and we have a two-level cluster/data system.

Many people correctly pointed out that one should use task_local_storage() instead. As the name suggests, it's simply an IdDict{Any, Any} local to each task. This avoids the pitfall in a trivial sense: since the task is the scheduling unit, each task has exclusive rights to its task-local storage.
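A minimal demonstration of that exclusivity: each task's storage survives yields (and migrations) untouched by other tasks:

@sync for i in 1:10
    Threads.@spawn begin
        task_local_storage(:x, i)            # write to *our* storage
        sleep(0.01)                          # yield; the task may migrate
        @assert task_local_storage(:x) == i  # still ours afterwards
    end
end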

However, almost everyone (including me) also thought this would be much slower than the threadid() slot approach, because an {Any, Any} dictionary feels too slow. This is a false assumption.

function tls_getindex(idx)
    tls = task_local_storage()
    # default to an empty range
    cluster = get(tls, :cluster, 0:-1)::UnitRange{Int}
    local data

    if idx ∉ cluster
        cluster = _findrange(all_ranges, idx)
        data = tls[:data] = _mock_io(cluster)
        tls[:cluster] = cluster
    else
        data = tls[:data]::Vector{Vector{Float64}}
    end

    localidx = idx - first(cluster) + 1
    return data[localidx]
end

Note: it is vital to add type assertions such as ::UnitRange{Int} and ::Vector{Vector{Float64}} on reads from tls, because otherwise accessing the IdDict{Any, Any} would infest the entire call stack with type instability.
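If you don't believe the dictionary is cheap enough, measure it. A quick check with BenchmarkTools.jl against the mock helpers above (exact numbers will of course vary by machine):

using BenchmarkTools

# warm the caches first so we time the hot (cached) path
illegal_getindex(500); tls_getindex(500)

@btime illegal_getindex(500)
@btime tls_getindex(500)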


Caveat of task-local storage

If the user schedules the getindex() calls in a for ... @spawn fashion, it will immediately use up all memory, because peak resident memory grows linearly with the number of tasks. Before, it was bounded by nthreads() (the caches formed a buffer pool with a finite number of slots).
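The problematic pattern, sketched: one task, and therefore one task-local cluster cache, per element:

# anti-pattern: @sync keeps all 100_000 task objects (and their
# task-local caches) alive until every task has finished
@sync for idx in 1:100_000
    Threads.@spawn tls_getindex(idx)
end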

In the end the user has to know something about the suitable way to schedule tasks, so I think this is not a deal-breaker. In the old design, this pattern would cause a huge slowdown (because all caches are constantly evicted by the random order of getindex calls); maybe crashing is better, as an extreme form of error.
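One suitable way to schedule, sketched: one task per contiguous chunk, so the number of live caches stays bounded and access within a chunk stays in order:

@sync for chunk in Iterators.partition(1:100_000, 10_000)
    Threads.@spawn foreach(tls_getindex, chunk)
end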

Final thoughts

I think, in the end, a few things went wrong in the Julia ecosystem:

  1. @threads was never a good API. It's nice to have, but it shouldn't be the predominant design pattern; it sits in a limbo between a low-level and a high-level API.

  2. we shouldn't have changed @threads to default to :dynamic; we could have added a @tasks macro instead

  3. communication was not sufficient; many libraries are not acutely aware of this even today

  4. documentation was not sufficient; when we discussed this (a week ago as of writing), fewer than two full people seemed to know the correct mental model and what's "actually correct"

Specifically on point 1: something more low-level for developers could be useful, or more high-level APIs (in the realm of structured parallelism), so users don't just reach for @threads all the time.

There's a new blog post in the making that aims to make people aware of this mess and to recommend how to migrate code with minimal changes. Hopefully the 100+ correctness bugs can then be largely eliminated from the Julia ecosystem within the next year.


[1]
https://github.com/mkarikom/AdditiveCellCom.jl/blob/v0.3.11/src/features.jl#L71
https://github.com/pagnani/ArDCA.jl/blob/v0.5.1/src/dca.jl#L20
https://github.com/ivanslapnicar/Arrowhead.jl/blob/v1.0.0/src/arrowhead4.jl#L491
https://github.com/oxinabox/AutoPreallocation.jl/blob/v0.3.1/src/preallocate.jl#L13 (?)
https://github.com/ITensor/ITensorParallel.jl/blob/v0.1.0/src/references/ExtendedProjMPO.jl#L95
https://github.com/SciML/OrdinaryDiffEq.jl/blob/b7bb73f46669e6f35c33baf2b4a1a1dc9892a091/src/perform_step/extrapolation_perform_step.jl#L52-L55
[2] https://github.com/JuliaHEP/UnROOT.jl/issues/257