diff --git a/examples/tutorial_parallel.jl b/examples/tutorial_parallel.jl index e79a019746..a48af944bb 100644 --- a/examples/tutorial_parallel.jl +++ b/examples/tutorial_parallel.jl @@ -135,7 +135,12 @@ catch err; err; end # hide # parallel](@ref parallel-word-count)). # ## Example: parallel `collect` - +# +# !!! note +# This section explains the _implementation ideas_ of parallel +# `collect`. Pre-defined functions such as [`tcopy`](@ref) and +# [`dcopy`](@ref) should cover many use-cases. +# # Suppose (pretend) there is a compute-heavy transducer: xf_compute = Filter(!ismissing) |> Map(x -> x^2) @@ -156,40 +161,160 @@ nothing # hide # However, it is easy to do this manually, too: using BangBang: append!! -using StaticArrays: SVector -y2 = reduce(append!!, xf_compute |> Map(SVector), xs; init = Union{}[]) +singleton_vector(x) = [x] +y2 = reduce(append!!, xf_compute |> Map(singleton_vector), xs) @assert y1 == y2 # This code illustrates the common pattern in parallel processing: # # 1. Put a result from the transducer in a "singleton solution". -# Here, it is a `SVector`. +# Here, it is `[x]`. # # 2. Then "merge" the (singleton) solution into the exsiting one. # This is done by `append!!` in the above example. + +# To illustrate how `reduce(append!!, ... |> Map(singleton_vector), +# xs)` works, let's create a reducing function that records the +# arguments and returned values of `append!!`: + +chan = Channel(Inf) + +function append_and_log!!(a, b) + # As arguments and output may be mutated later, we use `copy` to + # record the snapshots of their values at this moment: + #+ + a0 = copy(a) + b0 = copy(b) + c = append!!(a, b) + put!(chan, (a0, b0) => copy(c)) + return c +end +nothing # hide + +# This function can be used instead of `append!!`. Let's try a simpler +# and shorter example. 
This is equivalent to `collect(1:4)`: + +reduce(append_and_log!!, Map(singleton_vector), 1:4; basesize = 1, init = Union{}[]) + +# (See below for why we are using `init = Union{}[]` here.) # -# Due to symmetry in the signature of the reducing step function `op` -# used for `reduce` (e.g., `+` and `append!!`; when ignoring mutation -# for `append!!`), it can be used for merging the singleton solutions -# directly passed from transducers as well as the "chunks" of results -# (e.g., `a + b` and `c + d` in the illustration above). This is why -# `append!!` is used for parallel `collect` instead of `push!!`. +# Here is the list of arguments and returned values of `append!!` in +# this reduction: + +records = Pair[] +while isready(chan) + push!(records, take!(chan)) +end +if VERSION >= v"1.3" #src + @test Set(records) == Set([ #src + (Union{}[], [1]) => [1], #src + (Union{}[], [2]) => [2], #src + (Union{}[], [3]) => [3], #src + (Union{}[], [4]) => [4], #src + ([1], [2]) => [1, 2], #src + ([3], [4]) => [3, 4], #src + ([1, 2], [3, 4]) => [1, 2, 3, 4], #src + ]) #src +end #src +records + +# These recorded inputs and outputs of `append!!` show that its "call +# tree" is: # -# Reducing step function `op` for `reduce` often requires explicit -# initial value (i.e., keyword argument `init`). In above example, -# omitting `init` would produce a long `SVector` that is impractical -# use; that's why `init = Union{}[]` is required. Note that passing -# `Vector` to `init` of `reduce` is usually a wrong choice. However, -# since `Vector{Union{}}` cannot have any element (as there is no -# object of type `Union{}`), `Union{}[]` is an exception and a good -# initial value to indicate that output vector should use the -# "smallest" `eltype` required. That is to say, `append!!` widens the -# vector "just enough" to fit the resulting elements. 
+# ``` +# [1,2,3,4] <------------- append!!([1,2], [3,4]) == [1,2,3,4] +# / \ +# [1,2] [3,4] <------- append!!([3], [4]) == [3, 4] +# / \ / \ +# [1] [2] [3] [4] <---- append!!([], [4]) == [4] +# / \ / \ / \ / \ +# [] [1] [] [2] [] [3] [] [4] +# ``` +# +# Compare this to the example `a + b + c + d` above. + +# ### Optimization and generic container handling +# +# The above usage of `reduce` is not quite efficient as `singleton_vector` +# allocates small objects in the heap. Thus, it makes sense to use +# immutable objects for the singleton solutions so that the Julia compiler +# can eliminate allocation of the intermediate singleton solutions. +# Here, this can be done by simply using `SVector` instead of +# `singleton_vector`: + +using StaticArrays: SVector + +y3 = #src +reduce(append!!, Map(SVector), 1:4) +@test y3 isa SVector #src +@test y3 == 1:4 #src + +# However, notice that the return value is a static vector. This is +# not ideal when the input collection is large. The output collection +# type can be specified by `init`. We can simply use `init = +# Union{}[]` in this case: + +y4 = #src +reduce(append!!, Map(SVector), 1:4; init = Union{}[]) +@test y4 isa Vector #src +@test y4 == 1:4 #src + +# Note that passing `Vector` to `init` of `reduce` is usually a wrong +# choice as it would mean that the same object is simultaneously +# mutated by different threads. However, since `Vector{Union{}}` +# cannot have any element (as there is no object of type `Union{}`), +# using `Union{}[]` for `init` is an exception and it is a good way to +# indicate that output vector should use the "smallest" `eltype` +# required. That is to say, `append!!` widens the vector "just +# enough" to fit the resulting elements. # # For generic containers (e.g., various table types), use # [`BangBang.Empty`](https://tkf.github.io/BangBang.jl/dev/#BangBang.NoBang.singletonof-Union{Tuple{T},%20Tuple{Type{T},Any}}%20where%20T) -# as the empty initial value. 
This is useful for creating a table +# object such as +# [`DataFrame`](https://github.com/JuliaData/DataFrames.jl) as the +# result of parallel processing: + +using BangBang: Empty +using DataFrames: DataFrame + +y5 = # hide +reduce(append!!, Map(x -> SVector((a = x,))), 1:4; init = Empty(DataFrame)) +Text(y5) # hide +@test y5 isa DataFrame #src +@test y5.a == 1:4 #src + +# It is slightly more tricky to make this approach work with other +# table types such as +# [`StructArrays`](https://github.com/JuliaArrays/StructArrays.jl) and +# [`TypedTables`](https://github.com/JuliaData/TypedTables.jl). Use +# [`tcopy`](@ref) or [`dcopy`](@ref) to work with generic containers. + +#src It doesn't work: +#src +#src # Similar approach works with +#src # [`StructArrays.StructVector`](https://github.com/JuliaArrays/StructArrays.jl): +#src +#src using StructArrays: StructVector +#src +#src y6 = #src +#src reduce(append!!, Map(x -> SVector((a = x,))), 1:4; init = Empty(StructVector)) +#src @test y6 isa StructVector #src +#src @test y6.a == 1:4 #src +#src +#src # or [`TypedTables.Table`](https://github.com/JuliaData/TypedTables.jl): +#src +#src using TypedTables: Table +#src +#src y7 = #src +#src reduce(append!!, Map(x -> SVector((a = x,))), 1:4; init = Empty(Table)) +#src @test y7 isa Table #src +#src @test y7.a == 1:4 #src +#src +#src # or any type that can be constructed from a +#src # [table](https://github.com/JuliaData/Tables.jl) and behaves like a +#src # vector (e.g., support `push` and `vcat`). # ## Example: ad-hoc histogram diff --git a/src/reduce.jl b/src/reduce.jl index 9e170158ad..1ad83178c9 100644 --- a/src/reduce.jl +++ b/src/reduce.jl @@ -239,9 +239,28 @@ julia> @assert reduce( Map(x -> DataFrame(a = [x])), 1:2; basesize = 1, - init = Empty(DataFrame), + # init = Empty(DataFrame), ) == DataFrame(a = [1, 2]) ``` + +Note that the above snippet assumes that it is OK to mutate the dataframe +returned by the transducer. Use `init = Empty(DataFrame)` if this is +not the case. 
+ +This approach of using `reduce` works with other containers; e.g., +with `TypedTables.Table`: + +```jldoctest; setup = :(using Transducers) +julia> using TypedTables + +julia> @assert reduce( + append!!, + Map(x -> Table(a = [x])), + 1:2; + basesize = 1, + # init = Empty(Table), + ) == Table(a = [1, 2]) +``` """ tcopy(xf, T, reducible; kwargs...) = reduce(append!!, xf |> Map(SingletonVector), reducible; init = Empty(T), kwargs...)