From 91939effea80803a35e386a5a3cf0f808039fec9 Mon Sep 17 00:00:00 2001 From: Christoph Ortner Date: Wed, 21 Jun 2023 08:21:21 -0700 Subject: [PATCH 01/15] multi-threaded assembly --- src/assemble.jl | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/assemble.jl b/src/assemble.jl index 4c60d72..afb5c8c 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -2,6 +2,7 @@ using Distributed using ParallelDataTransfer using ProgressMeter using SharedArrays +using Base.Threads: nthreads, @threads struct DataPacket{T <: AbstractData} rows::UnitRange @@ -36,3 +37,43 @@ function assemble(data::AbstractVector{<:AbstractData}, basis) @info " - Assembly completed." return Array(A), Array(Y), Array(W) end + + +""" +Assemble feature matrix, target vector, and weight vector for given data and basis. +""" +function mt_assemble(data::AbstractVector{<:AbstractData}, basis) + @info "Multi-threaded assembly of linear problem." + rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data + rows[1] = 1:count_observations(data[1]) + for i in 2:length(data) + rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) + end + packets = DataPacket.(rows, data) + sort!(packets, by = length, rev = true) + @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." + A = zeros(rows[end][end], length(basis)) + Y = zeros(size(A, 1)) + W = zeros(size(A, 1)) + @info " - Beginning assembly with $(Threads.nthreads()) threads." + _lock = ReentrantLock() + _prog = Progress(sum(length, rows)) + _prog_ctr = 0 + Threads.@threads for _i = 1:length(packets) + p = packets[_i] + Ap = feature_matrix(p.data, basis) + Yp = target_vector(p.data) + Wp = weight_vector(p.data) + # ----- syncronized communication block + lock(_lock) + A[p.rows, :] .= Ap + Y[p.rows] .= Yp + W[p.rows] .= Wp + _prog_ctr += length(p.rows) + ProgressMeter.update!(_prog, _prog_ctr) + unlock(_lock) + # ----- + end + @info " - Assembly completed." + return Array(A), Array(Y), Array(W) +end From 4c0cda2a3ff7e3e752dd8858d489a9c6cf8a3c16 Mon Sep 17 00:00:00 2001 From: Christoph Ortner Date: Thu, 22 Jun 2023 21:35:09 -0700 Subject: [PATCH 02/15] better scheduling of data packets --- src/assemble.jl | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/src/assemble.jl b/src/assemble.jl index afb5c8c..9f03eb2 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -59,20 +59,34 @@ function mt_assemble(data::AbstractVector{<:AbstractData}, basis) _lock = ReentrantLock() _prog = Progress(sum(length, rows)) _prog_ctr = 0 - Threads.@threads for _i = 1:length(packets) - p = packets[_i] - Ap = feature_matrix(p.data, basis) - Yp = target_vector(p.data) - Wp = weight_vector(p.data) - # ----- syncronized communication block - lock(_lock) - A[p.rows, :] .= Ap - Y[p.rows] .= Yp - W[p.rows] .= Wp - _prog_ctr += length(p.rows) - ProgressMeter.update!(_prog, _prog_ctr) - unlock(_lock) - # ----- + next = 1 + + Threads.@threads for _i = 1:nthreads() + + while next <= length(packets) + # retrieve the next packet + lock(_lock) + if next > length(packets) + break + end + p = packets[next] + next += 1 + unlock(_lock) + + # assemble the corresponding data + Ap = feature_matrix(p.data, basis) + Yp = target_vector(p.data) + Wp = weight_vector(p.data) + + # write into global design matrix + lock(_lock) + A[p.rows, :] .= Ap + Y[p.rows] .= Yp + W[p.rows] .= Wp + _prog_ctr += length(p.rows) + ProgressMeter.update!(_prog, _prog_ctr) + unlock(_lock) + end end @info " - Assembly completed." return Array(A), Array(Y), Array(W) From 31cfd8f98fcc357ba663b5ec9441c9083bd7e651 Mon Sep 17 00:00:00 2001 From: Christoph Ortner Date: Sat, 24 Jun 2023 09:10:39 -0700 Subject: [PATCH 03/15] lock bugfix --- src/assemble.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/assemble.jl b/src/assemble.jl index 9f03eb2..b60a8b6 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -65,10 +65,10 @@ function mt_assemble(data::AbstractVector{<:AbstractData}, basis) while next <= length(packets) # retrieve the next packet - lock(_lock) if next > length(packets) break end + lock(_lock) p = packets[next] next += 1 unlock(_lock) From f4b3b04e13c238a86789ca19d6dcad4cc0e4b885 Mon Sep 17 00:00:00 2001 From: Christoph Ortner Date: Sat, 24 Jun 2023 15:30:22 -0700 Subject: [PATCH 04/15] more bug squashing --- src/assemble.jl | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/src/assemble.jl b/src/assemble.jl index b60a8b6..a2bfd55 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -61,6 +61,8 @@ function mt_assemble(data::AbstractVector{<:AbstractData}, basis) _prog_ctr = 0 next = 1 + failed = Int[] + Threads.@threads for _i = 1:nthreads() while next <= length(packets) @@ -69,25 +71,36 @@ function mt_assemble(data::AbstractVector{<:AbstractData}, basis) break end lock(_lock) - p = packets[next] + cur = next next += 1 unlock(_lock) + if cur > length(packets) + break + end + p = packets[cur] # assemble the corresponding data - Ap = feature_matrix(p.data, basis) - Yp = target_vector(p.data) - Wp = weight_vector(p.data) + try + Ap = feature_matrix(p.data, basis) + Yp = target_vector(p.data) + Wp = weight_vector(p.data) - # write into global design matrix - lock(_lock) - A[p.rows, :] .= Ap - Y[p.rows] .= Yp - W[p.rows] .= Wp - _prog_ctr += length(p.rows) - ProgressMeter.update!(_prog, _prog_ctr) - unlock(_lock) + # write into global design matrix + lock(_lock) + A[p.rows, :] .= Ap + Y[p.rows] .= Yp + W[p.rows] .= Wp + _prog_ctr += length(p.rows) + ProgressMeter.update!(_prog, _prog_ctr) + unlock(_lock) + catch + @info("failed assembly: cur = $cur") + push!(failed, cur) + end end + @info("thread $_i done") end @info " - Assembly completed." + @show failed return Array(A), Array(Y), Array(W) end From 51922e7274d880ed73e98228278ce14d6a3b3fd3 Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Mon, 26 Jun 2023 11:47:44 +0100 Subject: [PATCH 05/15] first attempt at threaded map for assembly. --- src/assemble.jl | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/assemble.jl b/src/assemble.jl index 4c60d72..324d3da 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -2,6 +2,7 @@ using Distributed using ParallelDataTransfer using ProgressMeter using SharedArrays +using ThreadedIterables struct DataPacket{T <: AbstractData} rows::UnitRange @@ -36,3 +37,27 @@ function assemble(data::AbstractVector{<:AbstractData}, basis) @info " - Assembly completed." return Array(A), Array(Y), Array(W) end + +function assemble_threaded(data::AbstractVector{<:AbstractData}, basis) + @info "Assembling linear problem." + rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data + rows[1] = 1:count_observations(data[1]) + for i in 2:length(data) + rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) + end + packets = DataPacket.(rows, data) + sort!(packets, by = length, rev = true) + #(nprocs() > 1) && sendto(workers(), basis = basis) + @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." + A = SharedArray(zeros(rows[end][end], length(basis))) + Y = SharedArray(zeros(size(A, 1))) + W = SharedArray(zeros(size(A, 1))) + @info " - Beginning assembly with thread count: $(nthreads())." + @showprogress @threaded map(packets) do p + A[p.rows, :] .= feature_matrix(p.data, basis) + Y[p.rows] .= target_vector(p.data) + W[p.rows] .= weight_vector(p.data) + end + @info " - Assembly completed." + return Array(A), Array(Y), Array(W) +end From 2ea22b00fdce9bd367869216fde88e23fffff644 Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Mon, 26 Jun 2023 11:58:58 +0100 Subject: [PATCH 06/15] bugfix. --- Project.toml | 5 +++-- src/assemble.jl | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Project.toml b/Project.toml index b3a9815..073aa7a 100644 --- a/Project.toml +++ b/Project.toml @@ -13,6 +13,7 @@ ParallelDataTransfer = "2dcacdae-9679-587a-88bb-8b444fb7085b" ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca" SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383" StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" +ThreadedIterables = "11d239b0-c0b9-11e8-1935-d5cfa53abb03" [weakdeps] PythonCall = "6099a3de-0909-46bc-b1f4-468b9a2dfc0d" @@ -21,7 +22,6 @@ PythonCall = "6099a3de-0909-46bc-b1f4-468b9a2dfc0d" ACEfit_PythonCall_ext = "PythonCall" [compat] -julia = "1.9" IterativeSolvers = "0.9.2" LowRankApprox = "0.5.3" Optim = "1.7" @@ -29,9 +29,10 @@ ParallelDataTransfer = "0.5.0" ProgressMeter = "1.7" PythonCall = "0.9" StaticArrays = "1.5" +julia = "1.9" [extras] Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test", ] +test = ["Test"] diff --git a/src/assemble.jl b/src/assemble.jl index 324d3da..66c5ec6 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -53,7 +53,7 @@ function assemble_threaded(data::AbstractVector{<:AbstractData}, basis) Y = SharedArray(zeros(size(A, 1))) W = SharedArray(zeros(size(A, 1))) @info " - Beginning assembly with thread count: $(nthreads())." - @showprogress @threaded map(packets) do p + tmap(packets) do p A[p.rows, :] .= feature_matrix(p.data, basis) Y[p.rows] .= target_vector(p.data) W[p.rows] .= weight_vector(p.data) From ac3ef562174a7820ec1b0cf425586b8d902c799b Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Mon, 26 Jun 2023 12:04:48 +0100 Subject: [PATCH 07/15] bugfix. --- src/assemble.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/assemble.jl b/src/assemble.jl index 66c5ec6..7f46ea3 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -52,7 +52,7 @@ function assemble_threaded(data::AbstractVector{<:AbstractData}, basis) A = SharedArray(zeros(rows[end][end], length(basis))) Y = SharedArray(zeros(size(A, 1))) W = SharedArray(zeros(size(A, 1))) - @info " - Beginning assembly with thread count: $(nthreads())." + @info " - Beginning assembly with thread count: $(Threads.nthreads())." tmap(packets) do p A[p.rows, :] .= feature_matrix(p.data, basis) Y[p.rows] .= target_vector(p.data) From de7730273f3a5965c43ef5dc8c12e8732547bdd2 Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Mon, 26 Jun 2023 14:12:48 +0100 Subject: [PATCH 08/15] add threadsx options. --- Project.toml | 1 + src/assemble.jl | 60 +++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/Project.toml b/Project.toml index 073aa7a..5e6d547 100644 --- a/Project.toml +++ b/Project.toml @@ -14,6 +14,7 @@ ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca" SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383" StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" ThreadedIterables = "11d239b0-c0b9-11e8-1935-d5cfa53abb03" +ThreadsX = "ac1d9e8a-700a-412c-b207-f0111f4b6c0d" [weakdeps] PythonCall = "6099a3de-0909-46bc-b1f4-468b9a2dfc0d" diff --git a/src/assemble.jl b/src/assemble.jl index 7f46ea3..163eae8 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -3,6 +3,7 @@ using ParallelDataTransfer using ProgressMeter using SharedArrays using ThreadedIterables +using ThreadsX struct DataPacket{T <: AbstractData} rows::UnitRange @@ -38,7 +39,7 @@ function assemble(data::AbstractVector{<:AbstractData}, basis) return Array(A), Array(Y), Array(W) end -function assemble_threaded(data::AbstractVector{<:AbstractData}, basis) +function assemble_threadediterables(data::AbstractVector{<:AbstractData}, basis) @info "Assembling linear problem." rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data rows[1] = 1:count_observations(data[1]) @@ -47,17 +48,62 @@ function assemble_threaded(data::AbstractVector{<:AbstractData}, basis) end packets = DataPacket.(rows, data) sort!(packets, by = length, rev = true) - #(nprocs() > 1) && sendto(workers(), basis = basis) @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." - A = SharedArray(zeros(rows[end][end], length(basis))) - Y = SharedArray(zeros(size(A, 1))) - W = SharedArray(zeros(size(A, 1))) + A = Array(zeros(rows[end][end], length(basis))) + Y = Array(zeros(size(A, 1))) + W = Array(zeros(size(A, 1))) @info " - Beginning assembly with thread count: $(Threads.nthreads())." - tmap(packets) do p + @time tmap(packets) do p A[p.rows, :] .= feature_matrix(p.data, basis) Y[p.rows] .= target_vector(p.data) W[p.rows] .= weight_vector(p.data) end @info " - Assembly completed." - return Array(A), Array(Y), Array(W) + return A, Y, W +end + +function assemble_threadsx(data::AbstractVector{<:AbstractData}, basis) + @info "Assembling linear problem." + rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data + rows[1] = 1:count_observations(data[1]) + for i in 2:length(data) + rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) + end + packets = DataPacket.(rows, data) + sort!(packets, by = length, rev = true) + @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." + A = Array(zeros(rows[end][end], length(basis))) + Y = Array(zeros(size(A, 1))) + W = Array(zeros(size(A, 1))) + @info " - Beginning assembly with thread count: $(Threads.nthreads())." + @time ThreadsX.map(packets) do p + A[p.rows, :] .= feature_matrix(p.data, basis) + Y[p.rows] .= target_vector(p.data) + W[p.rows] .= weight_vector(p.data) + end + @info " - Assembly completed." + return A, Y, W +end + +function assemble_threadsx_v2(data::AbstractVector{<:AbstractData}, basis) + @info "Assembling linear problem." + rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data + rows[1] = 1:count_observations(data[1]) + for i in 2:length(data) + rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) + end + packets = DataPacket.(rows, data) + sort!(packets, by = length, rev = true) + @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." + A = Array(zeros(rows[end][end], length(basis))) + Y = Array(zeros(size(A, 1))) + W = Array(zeros(size(A, 1))) + @info " - Beginning assembly with thread count: $(Threads.nthreads())." + @time ThreadsX.map!(similar(packets), packets) do p + A[p.rows, :] .= feature_matrix(p.data, basis) + Y[p.rows] .= target_vector(p.data) + W[p.rows] .= weight_vector(p.data) + end + @info " - Assembly completed." + return A, Y, W end From 82ee211c70957d9c0dbeec68b07a4e3eac28fb18 Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Wed, 19 Jul 2023 11:20:56 +0100 Subject: [PATCH 09/15] wip. --- Project.toml | 5 +++-- src/assemble.jl | 22 +++++++++++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/Project.toml b/Project.toml index b3a9815..bee131b 100644 --- a/Project.toml +++ b/Project.toml @@ -5,6 +5,7 @@ version = "0.1.1" [deps] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" +Folds = "41a02a25-b8f0-4f67-bc48-60067656b558" IterativeSolvers = "42fd0dbc-a981-5370-80f2-aaf504508153" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" LowRankApprox = "898213cb-b102-5a47-900c-97e73b919f73" @@ -21,7 +22,6 @@ PythonCall = "6099a3de-0909-46bc-b1f4-468b9a2dfc0d" ACEfit_PythonCall_ext = "PythonCall" [compat] -julia = "1.9" IterativeSolvers = "0.9.2" LowRankApprox = "0.5.3" Optim = "1.7" @@ -29,9 +29,10 @@ ParallelDataTransfer = "0.5.0" ProgressMeter = "1.7" PythonCall = "0.9" StaticArrays = "1.5" +julia = "1.9" [extras] Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test", ] +test = ["Test"] diff --git a/src/assemble.jl b/src/assemble.jl index 4c60d72..f1a35b8 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -1,4 +1,5 @@ using Distributed +using Folds using ParallelDataTransfer using ProgressMeter using SharedArrays @@ -13,7 +14,7 @@ Base.length(d::DataPacket) = count_observations(d.data) """ Assemble feature matrix, target vector, and weight vector for given data and basis. """ -function assemble(data::AbstractVector{<:AbstractData}, basis) +function assemble(data::AbstractVector{<:AbstractData}, basis; mode=:threaded) @info "Assembling linear problem." rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data rows[1] = 1:count_observations(data[1]) @@ -22,16 +23,27 @@ function assemble(data::AbstractVector{<:AbstractData}, basis) end packets = DataPacket.(rows, data) sort!(packets, by = length, rev = true) - (nprocs() > 1) && sendto(workers(), basis = basis) @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." A = SharedArray(zeros(rows[end][end], length(basis))) Y = SharedArray(zeros(size(A, 1))) W = SharedArray(zeros(size(A, 1))) - @info " - Beginning assembly with processor count: $(nprocs())." - @showprogress pmap(packets) do p - A[p.rows, :] .= feature_matrix(p.data, basis) + if mode == :serial + @info " - Beginning serial assembly." + elseif mode == :threaded + @info " - Beginning threaded assembly with $(Threads.nthreads()) threads." + map = Folds.map + elseif mode == :distributed + @info " - Beginning distributed assembly with $(nprocs()) processes." + map = pmap + (nprocs() > 1) && sendto(workers(), basis = basis) + end + progress = Progress(length(data)) + map(packets) do p + A[p.rows,:] .= feature_matrix(p.data, basis) Y[p.rows] .= target_vector(p.data) W[p.rows] .= weight_vector(p.data) + next!(progress) + GC.gc() end @info " - Assembly completed." return Array(A), Array(Y), Array(W) From 7fe3b71de578c118383fe42ff34840d3d9892ca5 Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Wed, 19 Jul 2023 14:10:20 +0100 Subject: [PATCH 10/15] re-add garbage collection. --- src/assemble.jl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/assemble.jl b/src/assemble.jl index 7a2943f..0f99607 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -58,6 +58,7 @@ function assemble_threadediterables(data::AbstractVector{<:AbstractData}, basis) A[p.rows, :] .= feature_matrix(p.data, basis) Y[p.rows] .= target_vector(p.data) W[p.rows] .= weight_vector(p.data) + GC.gc() end @info " - Assembly completed." return A, Y, W @@ -81,6 +82,7 @@ function assemble_threadsx(data::AbstractVector{<:AbstractData}, basis) A[p.rows, :] .= feature_matrix(p.data, basis) Y[p.rows] .= target_vector(p.data) W[p.rows] .= weight_vector(p.data) + GC.gc() end @info " - Assembly completed." return A, Y, W @@ -104,6 +106,7 @@ function assemble_threadsx_v2(data::AbstractVector{<:AbstractData}, basis) A[p.rows, :] .= feature_matrix(p.data, basis) Y[p.rows] .= target_vector(p.data) W[p.rows] .= weight_vector(p.data) + GC.gc() end @info " - Assembly completed." return A, Y, W From 44f1a3a76a52dcd59461dc5fc302a726ea578c2a Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Wed, 19 Jul 2023 14:11:18 +0100 Subject: [PATCH 11/15] remove experimental threadsx_v2. --- src/assemble.jl | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/assemble.jl b/src/assemble.jl index 0f99607..83a0647 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -87,27 +87,3 @@ function assemble_threadsx(data::AbstractVector{<:AbstractData}, basis) @info " - Assembly completed." return A, Y, W end - -function assemble_threadsx_v2(data::AbstractVector{<:AbstractData}, basis) - @info "Assembling linear problem." - rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data - rows[1] = 1:count_observations(data[1]) - for i in 2:length(data) - rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) - end - packets = DataPacket.(rows, data) - sort!(packets, by = length, rev = true) - @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." - A = Array(zeros(rows[end][end], length(basis))) - Y = Array(zeros(size(A, 1))) - W = Array(zeros(size(A, 1))) - @info " - Beginning assembly with thread count: $(Threads.nthreads())." - @time ThreadsX.map!(similar(packets), packets) do p - A[p.rows, :] .= feature_matrix(p.data, basis) - Y[p.rows] .= target_vector(p.data) - W[p.rows] .= weight_vector(p.data) - GC.gc() - end - @info " - Assembly completed." - return A, Y, W -end From a0adea91024e670e87f8dab95b37194c1cdcbfe4 Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Thu, 20 Jul 2023 15:24:48 +0100 Subject: [PATCH 12/15] add gc flags. --- src/assemble.jl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/assemble.jl b/src/assemble.jl index ff4812e..f887b97 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -14,7 +14,7 @@ Base.length(d::DataPacket) = count_observations(d.data) """ Assemble feature matrix, target vector, and weight vector for given data and basis. """ -function assemble(data::AbstractVector{<:AbstractData}, basis) +function assemble(data::AbstractVector{<:AbstractData}, basis; do_gc = true) @info "Assembling linear problem." rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data rows[1] = 1:count_observations(data[1]) @@ -33,7 +33,7 @@ function assemble(data::AbstractVector{<:AbstractData}, basis) A[p.rows, :] .= feature_matrix(p.data, basis) Y[p.rows] .= target_vector(p.data) W[p.rows] .= weight_vector(p.data) - GC.gc() + do_gc && GC.gc() end @info " - Assembly completed." return Array(A), Array(Y), Array(W) @@ -43,7 +43,7 @@ end """ Assemble feature matrix, target vector, and weight vector for given data and basis. """ -function mt_assemble(data::AbstractVector{<:AbstractData}, basis) +function mt_assemble(data::AbstractVector{<:AbstractData}, basis; do_gc = true) @info "Multi-threaded assembly of linear problem." rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data rows[1] = 1:count_observations(data[1]) @@ -94,6 +94,7 @@ function mt_assemble(data::AbstractVector{<:AbstractData}, basis) _prog_ctr += length(p.rows) ProgressMeter.update!(_prog, _prog_ctr) unlock(_lock) + do_gc && GC.gc() catch @info("failed assembly: cur = $cur") push!(failed, cur) From c4c1aa0e15932c234715dfeafe07b1b74d5809e2 Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Fri, 22 Sep 2023 13:37:00 +0100 Subject: [PATCH 13/15] add copy of assemble routine in anticipation of merge. --- src/assemble.jl | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/assemble.jl b/src/assemble.jl index 9d259c3..6e626f9 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -39,6 +39,32 @@ function assemble(data::AbstractVector{<:AbstractData}, basis; do_gc = true) return Array(A), Array(Y), assemble_weights(data) end +""" +Assemble feature matrix and target vector for given data and basis. +""" +function assemble(data::AbstractVector{<:AbstractData}, basis; do_gc = true) + @info "Assembling linear problem." + rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data + rows[1] = 1:count_observations(data[1]) + for i in 2:length(data) + rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) + end + packets = DataPacket.(rows, data) + sort!(packets, by = length, rev = true) + (nprocs() > 1) && sendto(workers(), basis = basis) + @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." + A = SharedArray(zeros(rows[end][end], length(basis))) + Y = SharedArray(zeros(size(A, 1))) + @info " - Beginning assembly with processor count: $(nprocs())." + @showprogress pmap(packets) do p + A[p.rows, :] .= feature_matrix(p.data, basis) + Y[p.rows] .= target_vector(p.data) + do_gc && GC.gc() + end + @info " - Assembly completed." + return Array(A), Array(Y), assemble_weights(data) +end + """ Assemble full weight vector for vector of data elements. """ From 08cd3c9e2ad66ef34ae258badf9570638025cd2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teemu=20J=C3=A4rvinen?= Date: Fri, 22 Sep 2023 11:45:39 -0700 Subject: [PATCH 14/15] add new assemble --- src/assemble.jl | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/assemble.jl b/src/assemble.jl index cc4b7cd..8d56a88 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -213,3 +213,26 @@ function assemble_threadsx(data::AbstractVector{<:AbstractData}, basis) @info " - Assembly completed." return A, Y, W end + + +function assemble_new(data::AbstractArray, basis; batch_size=1, kwargs...) + W = Threads.@spawn ACEfit.assemble_weights_new(data; kwargs...) + raw_data = @showprogress desc="Assembly progress:" pmap( data; batch_size=batch_size ) do d + A = ACEfit.feature_matrix(d, basis; kwargs...) + Y = ACEfit.target_vector(d; kwargs...) + (A, Y) + end + A = [ a[1] for a in raw_data ] + Y = [ a[2] for a in raw_data ] + + A_final = reduce(vcat, A) + Y_final = reduce(vcat, Y) + return A_final, Y_final, fetch(W) +end + +function assemble_weights_new(data::AbstractArray; kwargs...) + w = map( data ) do d + ACEfit.weight_vector(d; kwargs...) + end + return reduce(vcat, w) +end \ No newline at end of file From e7b670e3b0245dd9d3d0fd71cdab992b7c9db477 Mon Sep 17 00:00:00 2001 From: Chuck Witt Date: Fri, 3 Nov 2023 18:45:55 +0000 Subject: [PATCH 15/15] bugfix. --- src/assemble.jl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/assemble.jl b/src/assemble.jl index 8d56a88..bef7b6b 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -92,7 +92,6 @@ function assemble_weights(data::AbstractVector{<:AbstractData}) W = SharedArray(zeros(rows[end][end])) @showprogress pmap(packets) do p W[p.rows] .= weight_vector(p.data) - next!(progress) GC.gc() end return Array(W) @@ -235,4 +234,4 @@ function assemble_weights_new(data::AbstractArray; kwargs...) ACEfit.weight_vector(d; kwargs...) end return reduce(vcat, w) -end \ No newline at end of file +end