De-threadiding parallel loops #588

Closed
tkf opened this issue Feb 14, 2022 · 11 comments
@tkf

tkf commented Feb 14, 2022

Continuing https://discourse.julialang.org/t/behavior-of-threads-threads-for-loop/76042/20, here's how I'd implement the code you quoted (IIUC).

Mechanical transformation that almost always works

First of all, here's a trick you can use almost always. If you have this pattern

Threads.@threads for x in xs
    i = Threads.threadid()
    f(x, i)
end

you can mechanically convert this to

n = cld(length(xs), Threads.nthreads())
@sync for (i, chunk) in enumerate(Iterators.partition(xs, n))
    Threads.@spawn for x in chunk
        f(x, i)
    end
end

This is very likely correct if the loop body f uses threadid() only to index arrays allocated specifically for this parallel loop (e.g., the pre-1.3 reduction pattern).
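To make that concrete, here's a minimal toy sketch of the transformation (f and xs are placeholders, and the accumulators are plain Float64s rather than arrays):

# threadid()-indexed accumulators (the pattern being replaced); this is
# only safe as long as a task never migrates between threads mid-loop:
xs = 1:1_000
f(x) = abs2(x)
accs = zeros(Threads.nthreads())
Threads.@threads for x in xs
    accs[Threads.threadid()] += f(x)
end
total = sum(accs)

# chunk-indexed rewrite: each task owns slot i for its whole lifetime,
# so task migration cannot cause two tasks to share an accumulator:
n = cld(length(xs), Threads.nthreads())
accs = zeros(cld(length(xs), n))
@sync for (i, chunk) in enumerate(Iterators.partition(xs, n))
    Threads.@spawn for x in chunk
        accs[i] += f(x)
    end
end
total = sum(accs)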

Array reduction with @reduce acc .+= x

I think

DFTK.jl/src/densities.jl, lines 24 to 36 at a1791f6:

# Build the partial density ρk_real for this k-point
ρk_real = [zeros(eltype(basis), basis.fft_size) for it = 1:Threads.nthreads()]
ψnk_real = [zeros(complex(eltype(basis)), basis.fft_size) for it = 1:Threads.nthreads()]
Threads.@threads for n = 1:size(ψk, 2)
    ψnk = @views ψk[:, n]
    tid = Threads.threadid()
    G_to_r!(ψnk_real[tid], basis, kpt, ψnk)
    ρk_real[tid] .+= occupation[n] .* abs2.(ψnk_real[tid])
end
for it = 2:Threads.nthreads()
    ρk_real[1] .+= ρk_real[it]
end
ρk_real = ρk_real[1]

can be rewritten using

    @floop for n = 1:size(ψk, 2)
        @init ψnk_real = zeros(complex(eltype(basis)), basis.fft_size)
        ψnk = @views ψk[:, n]
        G_to_r!(ψnk_real, basis, kpt, ψnk)
        @reduce ρk_real .+= occupation[n] .* abs2.(ψnk_real)
    end

This requires FLoops 0.1.12 or above.
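For reference, here's a self-contained toy of the same @init/@reduce combination (the names here are illustrative, not from DFTK):

using FLoops  # needs FLoops ≥ 0.1.12 for `@reduce acc .+= x`

@floop for n in 1:100
    @init scratch = zeros(3)        # one scratch array per task, reused across iterations
    scratch .= n                    # stand-in for the in-place G_to_r! step
    @reduce acc .+= abs2.(scratch)  # per-task accumulators, combined after the loop
end
acc  # sums abs2.([n, n, n]) over all n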

Pre-allocated scratch space and TimerOutput

Threads.@threads for iband = 1:n_bands
    to = TimerOutput()  # Thread-local timer output
    tid = Threads.threadid()
    ψ_real = H.scratch.ψ_reals[tid]
    @timeit to "local+kinetic" begin
        G_to_r!(ψ_real, H.basis, H.kpoint, ψ[:, iband]; normalize=false)
        ψ_real .*= potential
        r_to_G!(Hψ[:, iband], H.basis, H.kpoint, ψ_real; normalize=false)  # overwrites ψ_real
        Hψ[:, iband] .+= H.fourier_op.multiplier .* ψ[:, iband]
    end
    if have_divAgrad
        @timeit to "divAgrad" begin
            apply!((fourier=Hψ[:, iband], real=nothing),
                   H.divAgrad_op,
                   (fourier=ψ[:, iband], real=nothing),
                   ψ_real)  # ψ_real used as scratch
        end
    end
    if tid == 1
        merge!(DFTK.timer, to; tree_point=[t.name for t in DFTK.timer.timer_stack])
    end
end

If you are OK with allocating about nthreads() arrays every time this code executes, ψ_real = H.scratch.ψ_reals[tid] can simply be rewritten as @init ψ_real = similar(H.scratch.ψ_reals[1]) (or something equivalent on the RHS). If you must reuse H.scratch.ψ_reals, the easiest approach is probably the mechanical transformation I noted above.
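For reference, here's a hedged sketch of that mechanical transformation applied to the loop above (timer and divAgrad parts omitted; it assumes H.scratch.ψ_reals holds at least as many arrays as there are chunks):

nb = cld(n_bands, Threads.nthreads())
@sync for (i, chunk) in enumerate(Iterators.partition(1:n_bands, nb))
    Threads.@spawn begin
        ψ_real = H.scratch.ψ_reals[i]  # indexed by chunk, not threadid()
        for iband in chunk
            G_to_r!(ψ_real, H.basis, H.kpoint, ψ[:, iband]; normalize=false)
            ψ_real .*= potential
            r_to_G!(Hψ[:, iband], H.basis, H.kpoint, ψ_real; normalize=false)
            Hψ[:, iband] .+= H.fourier_op.multiplier .* ψ[:, iband]
        end
    end
end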

I am not sure why you are throwing away the timer info from the non-primary threads, but if you want to merge all of them, you can do

@reduce to = merge!(TimerOutput(), to; tree_point=[t.name for t in DFTK.timer.timer_stack])

in the loop body and then

merge!(DFTK.timer, to; tree_point=[t.name for t in DFTK.timer.timer_stack])

outside to merge all the timer outputs, provided that merge! on TimerOutput acts like the method on Dict.

@antoine-levitt
Member

Thank you very much! I didn't think of the pattern of handling the range splitting manually and then using per-chunk rather than per-thread buffers.

I am not sure of the performance implications of re-allocating nthreads arrays in the inner loop; I'd rather avoid it if possible. I think doing the manual transform for that single very performance-critical part and using FLoops everywhere else is a good compromise.

@antoine-levitt
Member

Also, do I understand correctly that

@sync for (i, chunk) in enumerate(Iterators.partition(xs, n))
    Threads.@spawn f(x, i)
end

is equivalent to

@threads for (i, chunk) in enumerate(Iterators.partition(xs, n))
    f(x, i)
end

@tkf
Author

tkf commented Feb 14, 2022

Oops, sorry, the Iterators.partition version in the OP was wrong: I forgot the for x in chunk loop. I fixed the code in the OP.

It is equivalent to

chunks = collect(enumerate(Iterators.partition(xs, n)))
@threads for (i, chunk) in chunks
    for x in chunk
        f(x, i)
    end
end

if length(chunks) <= Threads.nthreads(). You also need the collect since @threads currently only supports arrays as input.

@antoine-levitt
Member

Thanks, I updated it in the discourse. One thing that confused me is that @spawn for doesn't actually do anything special to the for; it's just equivalent to @spawn begin for ...; ...; end end.
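That is, these two forms parse to the same thing; @spawn just wraps its whole argument expression (here, the entire for loop) in a single task:

Threads.@spawn for x in chunk
    f(x, i)
end

# is identical to

Threads.@spawn begin
    for x in chunk
        f(x, i)
    end
end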

@antoine-levitt
Member

So in our application we actually have two nested loops; this is currently in a PR but the final version will look like

    T = promote_type(eltype(basis), real(eltype(ψ[1])))
    ρ = similar(ψ[1], T, (basis.fft_size..., basis.model.n_spin_components))
    ρ .= 0
    ψnk_real = zeros(complex(T), basis.fft_size)
    for ik = 1:length(basis.kpoints)
        kpt = basis.kpoints[ik]
        for n = 1:size(ψ[ik], 2)
            ψnk = @views ψ[ik][:, n]
            G_to_r!(ψnk_real, basis, kpt, ψnk)
            ρ[:, :, :, kpt.spin] .+= occupation[ik][n] .* basis.kweights[ik] .* abs2.(ψnk_real)
        end
    end

Right now we don't really parallelize nested loops (we only parallelize the inner one and use MPI on the outer one), but it'd be nice to potentially parallelize at both levels. Of course we can flatten the nested loop into a flat one and apply the techniques you showed above, but that doesn't really compose well (this is a simple example, but in other places we have this kind of nested structure with more involved code). I guess there's not really any good way to get composability here without allocating nthreads()^2 arrays...

(the inner loop can also be BLAS/FFTW calls, but that's for later.)

@tkf
Author

tkf commented Feb 15, 2022

FWIW, FLoops.jl supports nested loops like @floop for ik = 1:length(basis.kpoints), n = 1:size(ψ[ik], 2). However, since the iteration space of the inner loop depends on the outer one (via ik here), you'd need to pass nestlevel = Val(2) and a basesize smaller than the length of the n loop. So, something like

@floop ThreadedEx(nestlevel = Val(2), basesize = bs) for ik = 1:length(basis.kpoints), n = 1:size(ψ[ik], 2)
    kpt = basis.kpoints[ik]
    ψnk = @views ψ[ik][:, n]
    # It's unclear if the following statements are data-race free:
    G_to_r!(ψnk_real, basis, kpt, ψnk)
    ρ[:, :, :, kpt.spin] .+= occupation[ik][n] .* basis.kweights[ik] .* abs2.(ψnk_real)
end

where bs is a number such that evaluating the loop body bs times typically takes (say) more than 50 microseconds.
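As a hedged rule of thumb (t_body here is a number you'd measure yourself, e.g. with BenchmarkTools):

t_body = 23e-6                          # measured time of one loop body, in seconds
bs = max(1, ceil(Int, 50e-6 / t_body))  # enough iterations per chunk to exceed ~50 µs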

Alternatively, you can also do

@floop ThreadedEx(basesize = bs1) for ik = 1:length(basis.kpoints)
    kpt = basis.kpoints[ik]
    @floop ThreadedEx(basesize = bs2) for n = 1:size(ψ[ik], 2)
        ...
    end
end

with some appropriate bs1 and bs2. The choice of these values depends on how size(ψ[ik], 2) varies across ik. The above nestlevel = Val(2) approach corresponds to bs1 = 1 and bs2 = bs.

But I don't know exactly how G_to_r! and ρ[:, :, :, kpt.spin] .+= mutate their outputs, and maybe that's what you meant by "nthreads()^2 arrays." However, you can always decrease the number of arrays allocated by increasing the basesize option. Also, I think it'd be better to profile the program first to see if allocating arrays really slows things down significantly when the code is compute-intensive enough that parallelism helps. That said, I see people asking for manual pre-allocation patterns a lot, and I'm hoping to support them via JuliaFolds/FLoops.jl#117.

@antoine-levitt
Member

antoine-levitt commented Feb 15, 2022

What I meant by composable is something like

for k=1:Nk
   diagonalize(x -> do_stuff(x,k))
end

function diagonalize(A)
    # solve A(x) = lambda x iteratively
end

function do_stuff(x,k)
    for n=1:N
        ...
    end
end

Then if we were to do the above spawn trick or FLoops on each of the for loops, we'd have to allocate nthreads()^2 temporaries, or do I misunderstand? Whereas currently, with the threadid() approach, we just allocate nthreads() temporaries as global variables.

> where bs is a number such that evaluating the loop body bs times typically takes (say) more than 50 microseconds.

For my future reference, a single 16x16x16 FFT is 23 microseconds. Even on very coarse examples we are well above this (silicon at Ecut 15 is 27), so overhead is not an issue (though that might not be the case elsewhere).

> Also, I think it'd be better to profile the program first to see if allocating arrays really slows things down significantly when the code is compute-intensive enough that parallelism helps

We did do some profiling before optimizing, and I think I recall preallocations turning out to be important (of course we'd love to get rid of them). That said, it was 2 years ago; things might have improved since then, and we should do that again. I also saw reports that allocations were very slow in a multithreaded context. In the nthreads()^2 case, though, I'd be more concerned about running out of memory (the temporary ψnk_real is much bigger than the data it operates on, and storing nthreads()^2 of them is probably too much).

> That said, I see people asking for manual pre-allocation patterns a lot, and I'm hoping to support them via JuliaFolds/FLoops.jl#117.

Cool, looking forward to trying it out!

@tkf
Author

tkf commented Feb 15, 2022

If you want to pre-allocate arrays and disallow overlapping uses in nested parallelism, you can use a Channel to manage the buffers:

nbuffers = Threads.nthreads()  # it doesn't have to be nthreads()
buffers = Channel{???}(nbuffers)
for _ in 1:nbuffers
    put!(buffers, allocate_buffer())
end

then, using the idiom in the OP:

n = cld(length(xs), Threads.nthreads())
@sync for (i, chunk) in enumerate(Iterators.partition(xs, n))
    Threads.@spawn begin
        b = take!(buffers)
        try
            for x in chunk
                f!(x, b)
            end
        finally
            put!(buffers, b)
        end
    end
end

If f! contains I/O (printing, logging, file read/write, etc.), this could waste a lot of stack space (though how much it matters is very case-dependent). On the other hand, if f! only does computation, it should work pretty well.

@antoine-levitt
Member

Thanks! That kind of low-level primitive always scares me, but perhaps there's no real reason for that, especially since it would maybe free us to use higher-level primitives (like FLoops) elsewhere. That looks like a good way to go!

@tkf
Author

tkf commented Feb 15, 2022

The Channel-based buffer recycling can cause a deadlock if the function containing the @sync loop can be called recursively. So, I think your aversion to this pattern is totally justified. That said, I think the pattern that would cause a deadlock with the Channel approach would typically corrupt the computation with the buffers[threadid()] approach. So, I'd say the Channel approach is "as low-level as" the buffers[threadid()] approach in a sense.
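To make the deadlock shape concrete, here's a hedged, hypothetical sketch (recurse! is invented for illustration): every live task holds a buffer slot across its @sync region, so once all nbuffers slots are checked out by ancestors, the inner take! can block forever.

function recurse!(xs, buffers, depth)
    n = max(1, cld(length(xs), Threads.nthreads()))
    @sync for chunk in Iterators.partition(xs, n)
        Threads.@spawn begin
            b = take!(buffers)      # this task now holds a slot...
            try
                if depth > 0        # ...while its recursive children below
                    recurse!(chunk, buffers, depth - 1)  # also need slots
                end
            finally
                put!(buffers, b)
            end
        end
    end
end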

@antoine-levitt
Member

Closing this for now: we've implemented the manual chunking with spawn; if we need more parallelism we'll do the Channel thing. Thank you very much @tkf!
