diff --git a/Project.toml b/Project.toml index 177bf56..d08690d 100644 --- a/Project.toml +++ b/Project.toml @@ -5,6 +5,7 @@ version = "0.1.4" [deps] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" +Folds = "41a02a25-b8f0-4f67-bc48-60067656b558" IterativeSolvers = "42fd0dbc-a981-5370-80f2-aaf504508153" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" LowRankApprox = "898213cb-b102-5a47-900c-97e73b919f73" @@ -13,6 +14,8 @@ ParallelDataTransfer = "2dcacdae-9679-587a-88bb-8b444fb7085b" ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca" SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383" StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" +ThreadedIterables = "11d239b0-c0b9-11e8-1935-d5cfa53abb03" +ThreadsX = "ac1d9e8a-700a-412c-b207-f0111f4b6c0d" [weakdeps] MLJ = "add582a8-e3ab-11e8-2d5e-e98b27df1bc7" @@ -26,7 +29,6 @@ ACEfit_MLJLinearModels_ext = [ "MLJ", "MLJLinearModels" ] ACEfit_MLJScikitLearnInterface_ext = ["MLJScikitLearnInterface", "PythonCall", "MLJ"] [compat] -julia = "1.9" IterativeSolvers = "0.9.2" MLJ = "0.19" MLJLinearModels = "0.9" @@ -37,9 +39,10 @@ ParallelDataTransfer = "0.5.0" ProgressMeter = "1.7" PythonCall = "0.9" StaticArrays = "1.5" +julia = "1.9" [extras] Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test", ] +test = ["Test"] diff --git a/src/assemble.jl b/src/assemble.jl index a3b189f..bef7b6b 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -1,7 +1,11 @@ using Distributed +using Folds using ParallelDataTransfer using ProgressMeter using SharedArrays +using Base.Threads: nthreads, @threads +using ThreadedIterables +using ThreadsX struct DataPacket{T <: AbstractData} rows::UnitRange @@ -13,7 +17,7 @@ Base.length(d::DataPacket) = count_observations(d.data) """ Assemble feature matrix and target vector for given data and basis. """ -function assemble(data::AbstractVector{<:AbstractData}, basis) +function assemble(data::AbstractVector{<:AbstractData}, basis; do_gc = true) @info "Assembling linear problem." rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data rows[1] = 1:count_observations(data[1]) @@ -30,6 +34,43 @@ function assemble(data::AbstractVector{<:AbstractData}, basis) @showprogress pmap(packets) do p A[p.rows, :] .= feature_matrix(p.data, basis) Y[p.rows] .= target_vector(p.data) + do_gc && GC.gc() + end + @info " - Assembly completed." + return Array(A), Array(Y), assemble_weights(data) +end + +""" +Assemble feature matrix and target vector for given data and basis. +""" +function assemble_mixed(data::AbstractVector{<:AbstractData}, basis; mode=:threaded) + @info "Assembling linear problem." + rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data + rows[1] = 1:count_observations(data[1]) + for i in 2:length(data) + rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) + end + packets = DataPacket.(rows, data) + sort!(packets, by = length, rev = true) + @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." + A = SharedArray(zeros(rows[end][end], length(basis))) + Y = SharedArray(zeros(size(A, 1))) + W = SharedArray(zeros(size(A, 1))) + if mode == :serial + @info " - Beginning serial assembly." + elseif mode == :threaded + @info " - Beginning threaded assembly with $(Threads.nthreads()) threads." + map = Folds.map + elseif mode == :distributed + @info " - Beginning distributed assembly with $(nprocs()) processes." + map = pmap + (nprocs() > 1) && sendto(workers(), basis = basis) + end + progress = Progress(length(data)) + map(packets) do p + A[p.rows,:] .= feature_matrix(p.data, basis) + Y[p.rows] .= target_vector(p.data) + next!(progress) GC.gc() end @info " - Assembly completed." @@ -51,6 +92,146 @@ function assemble_weights(data::AbstractVector{<:AbstractData}) W = SharedArray(zeros(rows[end][end])) @showprogress pmap(packets) do p W[p.rows] .= weight_vector(p.data) + GC.gc() end return Array(W) end + + +""" +Assemble feature matrix, target vector, and weight vector for given data and basis. +""" +function mt_assemble(data::AbstractVector{<:AbstractData}, basis; do_gc = true) + @info "Multi-threaded assembly of linear problem." + rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data + rows[1] = 1:count_observations(data[1]) + for i in 2:length(data) + rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) + end + packets = DataPacket.(rows, data) + sort!(packets, by = length, rev = true) + @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." + A = zeros(rows[end][end], length(basis)) + Y = zeros(size(A, 1)) + W = zeros(size(A, 1)) + @info " - Beginning assembly with $(Threads.nthreads()) threads." + _lock = ReentrantLock() + _prog = Progress(sum(length, rows)) + _prog_ctr = 0 + next = 1 + + failed = Int[] + + Threads.@threads for _i = 1:nthreads() + + while next <= length(packets) + # retrieve the next packet + if next > length(packets) + break + end + lock(_lock) + cur = next + next += 1 + unlock(_lock) + if cur > length(packets) + break + end + p = packets[cur] + + # assemble the corresponding data + try + Ap = feature_matrix(p.data, basis) + Yp = target_vector(p.data) + Wp = weight_vector(p.data) + + # write into global design matrix + lock(_lock) + A[p.rows, :] .= Ap + Y[p.rows] .= Yp + W[p.rows] .= Wp + _prog_ctr += length(p.rows) + ProgressMeter.update!(_prog, _prog_ctr) + unlock(_lock) + do_gc && GC.gc() + catch + @info("failed assembly: cur = $cur") + push!(failed, cur) + end + end + @info("thread $_i done") + end + @info " - Assembly completed." + @show failed + return Array(A), Array(Y), Array(W) +end + +function assemble_threadediterables(data::AbstractVector{<:AbstractData}, basis) + @info "Assembling linear problem." + rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data + rows[1] = 1:count_observations(data[1]) + for i in 2:length(data) + rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) + end + packets = DataPacket.(rows, data) + sort!(packets, by = length, rev = true) + @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." + A = Array(zeros(rows[end][end], length(basis))) + Y = Array(zeros(size(A, 1))) + W = Array(zeros(size(A, 1))) + @info " - Beginning assembly with thread count: $(Threads.nthreads())." + @time tmap(packets) do p + A[p.rows, :] .= feature_matrix(p.data, basis) + Y[p.rows] .= target_vector(p.data) + W[p.rows] .= weight_vector(p.data) + GC.gc() + end + @info " - Assembly completed." + return A, Y, W +end + +function assemble_threadsx(data::AbstractVector{<:AbstractData}, basis) + @info "Assembling linear problem." + rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data + rows[1] = 1:count_observations(data[1]) + for i in 2:length(data) + rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) + end + packets = DataPacket.(rows, data) + sort!(packets, by = length, rev = true) + @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." + A = Array(zeros(rows[end][end], length(basis))) + Y = Array(zeros(size(A, 1))) + W = Array(zeros(size(A, 1))) + @info " - Beginning assembly with thread count: $(Threads.nthreads())." + @time ThreadsX.map(packets) do p + A[p.rows, :] .= feature_matrix(p.data, basis) + Y[p.rows] .= target_vector(p.data) + W[p.rows] .= weight_vector(p.data) + GC.gc() + end + @info " - Assembly completed." + return A, Y, W +end + + +function assemble_new(data::AbstractArray, basis; batch_size=1, kwargs...) + W = Threads.@spawn ACEfit.assemble_weights_new(data; kwargs...) + raw_data = @showprogress desc="Assembly progress:" pmap( data; batch_size=batch_size ) do d + A = ACEfit.feature_matrix(d, basis; kwargs...) + Y = ACEfit.target_vector(d; kwargs...) + (A, Y) + end + A = [ a[1] for a in raw_data ] + Y = [ a[2] for a in raw_data ] + + A_final = reduce(vcat, A) + Y_final = reduce(vcat, Y) + return A_final, Y_final, fetch(W) +end + +function assemble_weights_new(data::AbstractArray; kwargs...) + w = map( data ) do d + ACEfit.weight_vector(d; kwargs...) + end + return reduce(vcat, w) +end