More illustrations in parallel processing tutorial
tkf committed Dec 15, 2019
1 parent d22f036 commit b719bb6
Showing 2 changed files with 166 additions and 22 deletions.
167 changes: 146 additions & 21 deletions examples/tutorial_parallel.jl
@@ -135,7 +135,12 @@ catch err; err; end # hide
# parallel](@ref parallel-word-count)).

# ## Example: parallel `collect`

#
# !!! note
#     This section explains the _implementation ideas_ of parallel
#     `collect`.  Pre-defined functions such as [`tcopy`](@ref) and
#     [`dcopy`](@ref) should cover many use-cases.
#
# Suppose (pretend) there is a compute-heavy transducer:

xf_compute = Filter(!ismissing) |> Map(x -> x^2)
@@ -156,40 +161,160 @@ nothing # hide
# However, it is easy to do this manually, too:

using BangBang: append!!

singleton_vector(x) = [x]
y2 = reduce(append!!, xf_compute |> Map(singleton_vector), xs)
@assert y1 == y2

# This code illustrates the common pattern in parallel processing:
#
# 1. Put a result from the transducer in a "singleton solution".
#    Here, it is `[x]`.
#
# 2. Then "merge" the (singleton) solution into the existing one.
#    This is done by `append!!` in the above example.
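#
# The two steps above can be sketched sequentially with `Base` alone.
# This is only an illustration of the pattern, not how Transducers.jl
# implements it: the names `singleton` and `merge_solutions` are
# hypothetical, and `vcat` stands in for `append!!`.

```julia
# Step 1: wrap each result in a "singleton solution".
singleton(x) = [x]

# Step 2: merge two solutions into one (non-mutating stand-in for `append!!`).
merge_solutions(a, b) = vcat(a, b)

inputs = [1, missing, 3, missing, 5]

# Filter out `missing`, square the rest, then apply singleton-then-merge.
squares = mapreduce(singleton, merge_solutions,
                    (x^2 for x in inputs if !ismissing(x)))

@assert squares == [1, 9, 25]
```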

# To illustrate how `reduce(append!!, ... |> Map(singleton_vector),
# xs)` works, let's create a reducing function that records the
# arguments and returned values of `append!!`:

chan = Channel(Inf)

function append_and_log!!(a, b)
    # As arguments and output may be mutated later, we use `copy` to
    # record the snapshots of their values at this moment:
    #+
    a0 = copy(a)
    b0 = copy(b)
    c = append!!(a, b)
    put!(chan, (a0, b0) => copy(c))
    return c
end
nothing # hide
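
# To see why the snapshots taken with `copy` matter, here is a small
# `Base`-only sketch (the variable names are hypothetical).  Logging a
# mutable vector without copying stores a reference, so every log entry
# ends up showing the final state:

```julia
log_without_copy = Vector{Int}[]
log_with_copy = Vector{Int}[]

acc = Int[]
for x in 1:3
    append!(acc, [x])                  # mutates `acc` in place
    push!(log_without_copy, acc)       # stores a reference to `acc`
    push!(log_with_copy, copy(acc))    # stores a snapshot of the current state
end

# The snapshots keep the intermediate states...
@assert log_with_copy == [[1], [1, 2], [1, 2, 3]]
# ...while the references all point to the same, fully mutated vector.
@assert log_without_copy == [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
```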

# This function can be used instead of `append!!`.  Let's try a simpler
# and shorter example.  This is equivalent to `collect(1:4)`:

reduce(append_and_log!!, Map(singleton_vector), 1:4; basesize = 1, init = Union{}[])

# (See below for why we are using `init = Union{}[]` here.)
#
# Due to symmetry in the signature of the reducing step function `op`
# used for `reduce` (e.g., `+` and `append!!`; when ignoring mutation
# for `append!!`), it can be used for merging the singleton solutions
# directly passed from transducers as well as the "chunks" of results
# (e.g., `a + b` and `c + d` in the illustration above). This is why
# `append!!` is used for parallel `collect` instead of `push!!`.
# Here is the list of arguments and return values of `append!!` in
# this reduction:

records = Pair[]
while isready(chan)
    push!(records, take!(chan))
end
if VERSION >= v"1.3"                       #src
    @test Set(records) == Set([            #src
        (Union{}[], [1]) => [1],           #src
        (Union{}[], [2]) => [2],           #src
        (Union{}[], [3]) => [3],           #src
        (Union{}[], [4]) => [4],           #src
        ([1], [2]) => [1, 2],              #src
        ([3], [4]) => [3, 4],              #src
        ([1, 2], [3, 4]) => [1, 2, 3, 4],  #src
    ])                                     #src
end                                        #src
records

# The recorded inputs and outputs of `append!!` show that its "call
# tree" is:
#
# ```
#              [1,2,3,4]              <---- append!!([1,2], [3,4]) == [1,2,3,4]
#             /         \
#        [1,2]           [3,4]        <---- append!!([3], [4]) == [3, 4]
#       /     \         /     \
#    [1]       [2]   [3]       [4]    <---- append!!([], [4]) == [4]
#   /   \     /   \ /   \     /   \
#  []   [1]  []  [2] [] [3]  []   [4]
# ```
#
# Compare this to the example `a + b + c + d` above.
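
# The call tree can be reproduced with a sequential divide-and-conquer
# sketch written in plain `Base`.  Everything here is an assumption made
# for illustration: `dac_reduce` and `logged_vcat` are hypothetical
# helpers, `vcat` stands in for `append!!`, and nothing runs in parallel.
# It is not the implementation used by `reduce` in Transducers.jl.

```julia
# Hypothetical helper: split `xs` in halves until a chunk has at most
# `basesize` elements, fold each chunk from a fresh copy of `init`, then
# merge the halves with the same `op`.
function dac_reduce(op, init, xs, basesize = 1)
    if length(xs) <= basesize
        acc = copy(init)
        for x in xs
            acc = op(acc, [x])   # merge a singleton solution
        end
        return acc
    end
    mid = length(xs) ÷ 2
    left = dac_reduce(op, init, xs[1:mid], basesize)
    right = dac_reduce(op, init, xs[mid+1:end], basesize)
    return op(left, right)       # merge two chunks with the same `op`
end

calls = Pair[]
function logged_vcat(a, b)
    c = vcat(a, b)               # non-mutating, so no snapshots are needed
    push!(calls, (a, b) => c)
    return c
end

result = dac_reduce(logged_vcat, Int[], 1:4)

@assert result == [1, 2, 3, 4]
@assert length(calls) == 7       # four leaves, two pair merges, one root merge
@assert last(calls) == (([1, 2], [3, 4]) => [1, 2, 3, 4])
```

# Because `op` receives both `(empty, singleton)` and `(chunk, chunk)`
# pairs, it has to treat its two arguments symmetrically.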

# ### Optimization and generic container handling
#
# The above usage of `reduce` is not very efficient, as `singleton_vector`
# allocates small objects on the heap.  Thus, it makes sense to use
# immutable objects for the singleton solutions so that the Julia compiler
# can eliminate the allocation of the intermediate singleton solutions.
# Here, this can be done by simply using `SVector` instead of
# `singleton_vector`:

using StaticArrays: SVector

y3 = #src
reduce(append!!, Map(SVector), 1:4)
@test y3 isa SVector #src
@test y3 == 1:4 #src
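
# As a loose analogy using only `Base` (an illustrative assumption, not
# a description of how `SVector` works internally), tuples are also
# immutable containers whose length is part of their type.  Merging
# singleton tuples with the hypothetical `merge_tuples` below therefore
# yields a statically sized result as well:

```julia
# Hypothetical merge function for tuples; plays the role that `append!!`
# plays for static vectors.
merge_tuples(a::Tuple, b::Tuple) = (a..., b...)

# `tuple(x)` creates the immutable singleton solution `(x,)`.
t = mapreduce(tuple, merge_tuples, 1:4)

@assert t === (1, 2, 3, 4)
@assert t isa NTuple{4,Int}   # the length 4 is encoded in the type
```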

# However, notice that the return value is a static vector. This is
# not ideal when the input collection is large. The output collection
# type can be specified by `init`. We can simply use `init =
# Union{}[]` in this case:

y4 = #src
reduce(append!!, Map(SVector), 1:4; init = Union{}[])
@test y4 isa Vector #src
@test y4 == 1:4 #src

# Note that passing a `Vector` to `init` of `reduce` is usually a wrong
# choice as it would mean that the same object is simultaneously
# mutated by different threads.  However, since a `Vector{Union{}}`
# cannot have any element (as there is no object of type `Union{}`),
# using `Union{}[]` for `init` is an exception, and it is a good way to
# indicate that the output vector should use the "smallest" `eltype`
# required.  That is to say, `append!!` widens the vector "just
# enough" to fit the resulting elements.
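#
# The properties of `Union{}[]` described above can be checked with
# `Base` alone.  In this sketch, `vcat` is used as a stand-in for the
# widening performed by `append!!`; that substitution is an assumption
# made to keep the example free of package dependencies.

```julia
empty_init = Union{}[]
@assert eltype(empty_init) === Union{}
@assert isempty(empty_init)

# No value has type `Union{}`, so nothing can be stored in this vector.
push_failed = try
    push!(Union{}[], 1)
    false
catch
    true
end
@assert push_failed

# Concatenation produces a new vector with an element type that is
# just wide enough for the elements.
widened = vcat(empty_init, [1])
@assert widened == [1]
@assert eltype(widened) === Int

wider = vcat(widened, [2.5])
@assert eltype(wider) === Float64
```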
#
# For generic containers (e.g., various table types), use
# [`BangBang.Empty`](https://tkf.github.io/BangBang.jl/dev/#BangBang.NoBang.singletonof-Union{Tuple{T},%20Tuple{Type{T},Any}}%20where%20T)
# as the empty initial value. This is useful for creating a table
# object such as
# [`DataFrame`](https://github.com/JuliaData/DataFrames.jl) as the
# result of parallel processing:

using BangBang: Empty
using DataFrames: DataFrame

y5 = # hide
reduce(append!!, Map(x -> SVector((a = x,))), 1:4; init = Empty(DataFrame))
Text(y5) # hide
@test y5 isa DataFrame #src
@test y5.a == 1:4 #src

# It is slightly more tricky to make this approach work with other
# table types such as
# [`StructArrays`](https://github.com/JuliaArrays/StructArrays.jl) and
# [`TypedTables`](https://github.com/JuliaData/TypedTables.jl). Use
# [`tcopy`](@ref) or [`dcopy`](@ref) to work with generic containers.

#src It doesn't work:
#src
#src # Similar approach works with
#src # [`StructArrays.StructVector`](https://github.com/JuliaArrays/StructArrays.jl):
#src
#src using StructArrays: StructVector
#src
#src y6 = #src
#src reduce(append!!, Map(x -> SVector((a = x,))), 1:4; init = Empty(StructVector))
#src @test y6 isa StructVector #src
#src @test y6.a == 1:4 #src
#src
#src # or [`TypedTables.Table`](https://github.com/JuliaData/TypedTables.jl):
#src
#src using TypedTables: Table
#src
#src y7 = #src
#src reduce(append!!, Map(x -> SVector((a = x,))), 1:4; init = Empty(Table))
#src @test y7 isa Table #src
#src @test y7.a == 1:4 #src
#src
#src # or any type that can be constructed from a
#src # [table](https://github.com/JuliaData/Tables.jl) and behaves like a
#src # vector (e.g., support `push` and `vcat`).

# ## Example: ad-hoc histogram

21 changes: 20 additions & 1 deletion src/reduce.jl
@@ -239,9 +239,28 @@ julia> @assert reduce(
           Map(x -> DataFrame(a = [x])),
           1:2;
           basesize = 1,
           # init = Empty(DataFrame),
       ) == DataFrame(a = [1, 2])
```

Note that the above snippet assumes that it is OK to mutate the dataframe
returned by the transducer.  Use `init = Empty(DataFrame)` if this is
not the case.

This approach of using `reduce` works with other containers; e.g.,
with `TypedTables.Table`:

```jldoctest; setup = :(using Transducers)
julia> using TypedTables

julia> @assert reduce(
           append!!,
           Map(x -> Table(a = [x])),
           1:2;
           basesize = 1,
           # init = Empty(Table),
       ) == Table(a = [1, 2])
```
"""
tcopy(xf, T, reducible; kwargs...) =
    reduce(append!!, xf |> Map(SingletonVector), reducible; init = Empty(T), kwargs...)