From c0e7f029c0ff2d1c0db524cccd75e7cd87dd6d3c Mon Sep 17 00:00:00 2001 From: Juan Ignacio Polanco Date: Mon, 25 Nov 2024 21:36:34 +0100 Subject: [PATCH] Improve parallel performance of `set_points!` on CPU (#47) * Rename BlockData -> BlockDataCPU * More consistent BlockDataCPU/GPU * CPU: improve parallel performance of set_points! * Update CHANGELOG * Fix issue with small datasets * Try to fix tests * Convert points to the right type * Try to fix tests (again) * Update GPU blocking code as well --- CHANGELOG.md | 4 + src/blocking/blocking.jl | 13 +++ src/blocking/cpu.jl | 186 ++++++++++++++++++++----------- src/blocking/gpu.jl | 8 +- src/interpolation/cpu_blocked.jl | 2 +- src/plan.jl | 2 +- src/spreading/cpu_blocked.jl | 2 +- 7 files changed, 147 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fa6ea7..6b21e6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Changed + +- Improve parallel performance of `set_points!` with `CPU` backend. + ## [v0.6.5](https://github.com/jipolanco/NonuniformFFTs.jl/releases/tag/v0.6.5) - 2024-11-18 ### Fixed diff --git a/src/blocking/blocking.jl b/src/blocking/blocking.jl index 63a65a5..7050f62 100644 --- a/src/blocking/blocking.jl +++ b/src/blocking/blocking.jl @@ -24,6 +24,19 @@ end type_length(::Type{T}) where {T} = length(T) # usually for SVector type_length(::Type{<:NTuple{N}}) where {N} = N +# Get point from vector of points, converting it to a tuple of the wanted type T. +# The first argument is only used to determine the output type. +# The "unsafe" is because we apply @inbounds. +function unsafe_get_point_as_tuple( + ::Type{NTuple{D, T}}, + xp::AbstractVector, + i::Integer, + ) where {D, T <: AbstractFloat} + ntuple(Val(D)) do d + @inbounds T(xp[i][d]) + end +end + # Resize vector trying to avoid copy when N is larger than the original length. # In other words, we completely discard the original contents of x, which is not the # original behaviour of resize!. This can save us some device-to-device copies. diff --git a/src/blocking/cpu.jl b/src/blocking/cpu.jl index 174595f..24e7574 100644 --- a/src/blocking/cpu.jl +++ b/src/blocking/cpu.jl @@ -1,75 +1,148 @@ # CPU only -struct BlockData{ - T, N, Nc, - Tr, # = real(T) - Buffers <: AbstractVector{<:NTuple{Nc, AbstractArray{T, N}}}, +struct BlockDataCPU{ + Z, N, Nc, + I <: Integer, + T, # = real(Z) + Buffers <: AbstractVector{<:NTuple{Nc, AbstractArray{Z, N}}}, Indices <: CartesianIndices{N}, SortPoints <: StaticBool, } <: AbstractBlockData - block_dims :: Dims{N} # size of each block (in number of elements) - block_sizes :: NTuple{N, Tr} # size of each block (in units of length) - buffers :: Buffers # length = nthreads - blocks_per_thread :: Vector{Int} # maps a set of blocks i_start:i_end to a thread (length = nthreads + 1) - indices :: Indices # index associated to each block (length = num_blocks) + # The following fields are the same as in BlockDataGPU: + Δxs :: NTuple{N, T} # grid step; the size of a block in units of length is block_dims .* Δxs. + nblocks_per_dir :: NTuple{N, I} # number of blocks in each direction + block_dims :: NTuple{N, I} # size of each block (in number of elements) cumulative_npoints_per_block :: Vector{Int} # cumulative sum of number of points in each block (length = 1 + num_blocks, first value is 0) blockidx :: Vector{Int} # linear index of block associated to each point (length = Np) pointperm :: Vector{Int} # index permutation for sorting points according to their block (length = Np) sort_points :: SortPoints + + buffers :: Buffers # length = nthreads + blocks_per_thread :: Vector{Int} # maps a set of blocks i_start:i_end to a thread (length = nthreads + 1) + indices :: Indices # index associated to each block (length = num_blocks) + + function BlockDataCPU( + Δxs::NTuple{N, T}, + nblocks_per_dir::NTuple{N, I}, + block_dims::NTuple{N, I}, + npoints_per_block::Vector{Int}, + buffers::AbstractVector{<:NTuple{Nc, AbstractArray{Z, N}}}, + indices::Indices, + sort::S, + ) where {Z <: Number, Nc, N, I, T, Indices, S} + @assert T === real(Z) + Nt = length(buffers) + blockidx = similar(npoints_per_block, 0) + pointperm = similar(npoints_per_block, 0) + blocks_per_thread = similar(blockidx, Nt + 1) + new{Z, N, Nc, I, T, typeof(buffers), Indices, S}( + Δxs, nblocks_per_dir, block_dims, npoints_per_block, blockidx, pointperm, sort, + buffers, blocks_per_thread, indices, + ) + end end -function BlockData( - ::Type{T}, block_dims::Dims{D}, Ñs::Dims{D}, ::HalfSupport{M}, num_transforms::Val{Nc}, +function BlockDataCPU( + ::Type{Z}, block_dims_in::Dims{D}, Ñs::Dims{D}, ::HalfSupport{M}, num_transforms::Val{Nc}, sort_points::StaticBool, - ) where {T, D, M, Nc} + ) where {Z <: Number, D, M, Nc} @assert Nc > 0 - Nt = Threads.nthreads() - # Nt = ifelse(Nt == 1, zero(Nt), Nt) # this disables blocking if running on single thread - # Reduce block size if the total grid size is not sufficiently large in a given - # direction. This maximum block size is assumed in spreading and interpolation. - block_dims = map(Ñs, block_dims) do N, B - @assert N - M > 0 - min(B, N ÷ 2, N - M) + # Reduce block size if the actual dataset is too small. + # The block size must satisfy B ≤ N - M (this is assumed in spreading/interpolation). + block_dims = map(Ñs, block_dims_in) do N, B + min(B, N - M) end + nblocks_per_dir = map(cld, Ñs, block_dims) # basically equal to ceil(Ñ / block_dim) + T = real(Z) + L = T(2) * π # domain period + Δxs = map(N -> L / N, Ñs) # grid step (oversampled grid) + cumulative_npoints_per_block = Vector{Int}(undef, prod(nblocks_per_dir) + 1) dims = block_dims .+ 2M # include padding for values outside of block (TODO: include padding in original block_dims? requires minimal block_size in each direction) - Tr = real(T) - block_sizes = map(Ñs, block_dims) do N, B - @inline - Δx = Tr(2π) / N # grid step - B * Δx - end + Nt = Threads.nthreads() + # Nt = ifelse(Nt == 1, zero(Nt), Nt) # this disables blocking if running on single thread buffers = map(1:Nt) do _ - ntuple(_ -> Array{T}(undef, dims), num_transforms) # one buffer per transform (or "component") + ntuple(_ -> Array{Z}(undef, dims), num_transforms) # one buffer per transform (or "component") end indices_tup = map(Ñs, block_dims) do N, B range(0, N - 1; step = B) end indices = CartesianIndices(indices_tup) - nblocks = length(indices) # total number of blocks - cumulative_npoints_per_block = Vector{Int}(undef, nblocks + 1) - blockidx = Int[] - pointperm = Int[] - blocks_per_thread = zeros(Int, Nt + 1) - BlockData( - block_dims, block_sizes, buffers, blocks_per_thread, indices, - cumulative_npoints_per_block, blockidx, pointperm, + BlockDataCPU( + Δxs, nblocks_per_dir, block_dims, cumulative_npoints_per_block, + buffers, indices, sort_points, ) end +# This is similar to assign_blocks_kernel! (GPU implementation), but using `to_unit_cell` +# instead of `to_unit_cell_gpu` (which does seem to make a difference in performance). +function assign_blocks_cpu!( + blockidx::AbstractVector{<:Integer}, + cumulative_npoints_per_block::AbstractVector{<:Integer}, + points::NTuple, + xp::AbstractVector, + Δxs::NTuple, + block_dims::NTuple, + nblocks_per_dir::NTuple, + sort_points, + transform::F, + ) where {F} + Threads.@threads :static for I ∈ eachindex(points[1]) + x⃗ = unsafe_get_point_as_tuple(typeof(Δxs), xp, I) + y⃗ = to_unit_cell(transform(x⃗)) :: NTuple + n = block_index(y⃗, Δxs, block_dims, nblocks_per_dir) + + # Note: here index_within_block is the value *after* incrementing (≥ 1). + S = eltype(cumulative_npoints_per_block) + index_within_block = @inbounds (Atomix.@atomic cumulative_npoints_per_block[n + 1] += one(S))::S + @inbounds blockidx[I] = index_within_block + + # If points need to be sorted, then we fill `points` some time later (in permute_kernel!). + if sort_points === False() + for n ∈ eachindex(x⃗) + @inbounds points[n][I] = y⃗[n] + end + end + end + nothing +end + +function sortperm_cpu!( + pointperm::AbstractVector, + cumulative_npoints_per_block, + blockidx, + xp::AbstractVector, + Δxs::NTuple, + block_dims, + nblocks_per_dir, + transform::F, + ) where {F} + Threads.@threads :static for I ∈ eachindex(xp) + x⃗ = unsafe_get_point_as_tuple(typeof(Δxs), xp, I) + y⃗ = to_unit_cell(transform(x⃗)) :: NTuple + n = block_index(y⃗, Δxs, block_dims, nblocks_per_dir) + @inbounds J = cumulative_npoints_per_block[n] + blockidx[I] + @inbounds pointperm[J] = I + end + nothing +end + function set_points_impl!( - backend::CPU, bd::BlockData, points::StructVector, xp, timer; + backend::CPU, bd::BlockDataCPU, points::StructVector, xp, timer; transform::F = identity, synchronise, ) where {F <: Function} # This technically never happens, but we might use it as a way to disable blocking. isempty(bd.buffers) && return set_points_impl!(backend, NullBlockData(), points, xp, timer; transform, synchronise) - (; indices, cumulative_npoints_per_block, blockidx, pointperm, block_sizes,) = bd + (; + Δxs, cumulative_npoints_per_block, nblocks_per_dir, block_dims, + blockidx, pointperm, sort_points, + ) = bd + N = type_length(eltype(xp)) # = number of dimensions - @assert N == length(block_sizes) + @assert N == length(block_dims) @timeit timer "(0) Init arrays" begin - to_linear_index = LinearIndices(axes(indices)) # maps Cartesian to linear index of a block Np = length(xp) resize_no_copy!(blockidx, Np) resize_no_copy!(pointperm, Np) @@ -77,21 +150,16 @@ function set_points_impl!( fill!(cumulative_npoints_per_block, 0) end - @timeit timer "(1) Assign blocks" @inbounds for (i, x⃗) ∈ pairs(xp) - # Get index of block where point x⃗ is located. - y⃗ = to_unit_cell(transform(NTuple{N}(x⃗))) # converts `x⃗` to Tuple if it's an SVector - is = map(first ∘ Kernels.point_to_cell, y⃗, block_sizes) # we use first((i, r)) -> i - if bd.sort_points === False() - points[i] = y⃗ # copy folded point (doesn't need to be sorted) - end - # checkbounds(indices, CartesianIndex(is)) - n = to_linear_index[is...] # linear index of block - index_within_block = (cumulative_npoints_per_block[n + 1] += 1) # ≥ 1 - blockidx[i] = index_within_block + @timeit timer "(1) Assign blocks" let + points_comp = StructArrays.components(points) + assign_blocks_cpu!( + blockidx, cumulative_npoints_per_block, points_comp, xp, Δxs, + block_dims, nblocks_per_dir, sort_points, transform, + ) end # Compute cumulative sum (we don't use cumsum! due to aliasing warning in its docs). - for i ∈ eachindex(IndexLinear(), cumulative_npoints_per_block)[2:end] + @inbounds for i ∈ eachindex(IndexLinear(), cumulative_npoints_per_block)[2:end] cumulative_npoints_per_block[i] += cumulative_npoints_per_block[i - 1] end @assert cumulative_npoints_per_block[begin] == 0 @@ -103,18 +171,10 @@ function set_points_impl!( map_blocks_to_threads!(bd.blocks_per_thread, cumulative_npoints_per_block) @timeit timer "(2) Sort" begin - # Note: we don't use threading since it seems to be much slower. - # This is very likely due to false sharing (https://en.wikipedia.org/wiki/False_sharing), - # since all threads modify the same data in "random" order. - @inbounds for i ∈ eachindex(xp) - # We recompute the block index associated to this point. - x⃗ = xp[i] - y⃗ = to_unit_cell(transform(NTuple{N}(x⃗))) # converts `x⃗` to Tuple if it's an SVector - is = map(first ∘ Kernels.point_to_cell, y⃗, block_sizes) # we use first((i, r)) -> i - n = to_linear_index[is...] # linear index of block - j = cumulative_npoints_per_block[n] + blockidx[i] - pointperm[j] = i - end + sortperm_cpu!( + pointperm, cumulative_npoints_per_block, blockidx, xp, Δxs, + block_dims, nblocks_per_dir, transform, + ) end # Write sorted points into `points`. @@ -122,7 +182,7 @@ function set_points_impl!( # Note: we don't use threading since it seems to be much slower. # This is very likely due to false sharing (https://en.wikipedia.org/wiki/False_sharing), # since all threads modify the same data in "random" order. - if bd.sort_points === True() + if sort_points === True() @timeit timer "(3) Permute points" begin # TODO: combine this with Sort step? @inbounds for j ∈ eachindex(pointperm) diff --git a/src/blocking/gpu.jl b/src/blocking/gpu.jl index 7b74bd8..0c58662 100644 --- a/src/blocking/gpu.jl +++ b/src/blocking/gpu.jl @@ -170,8 +170,8 @@ end @Const(transform::F), ) where {F} I = @index(Global, Linear) - @inbounds x⃗ = xp[I] - y⃗ = to_unit_cell_gpu(transform(Tuple(x⃗))) :: NTuple + x⃗ = unsafe_get_point_as_tuple(typeof(Δxs), xp, I) + y⃗ = to_unit_cell_gpu(transform(x⃗)) :: NTuple n = block_index(y⃗, Δxs, block_dims, nblocks_per_dir) # Note: here index_within_block is the value *after* incrementing (≥ 1). @@ -200,8 +200,8 @@ end @Const(transform::F), ) where {F} I = @index(Global, Linear) - @inbounds x⃗ = xp[I] - y⃗ = to_unit_cell_gpu(transform(Tuple(x⃗))) :: NTuple + x⃗ = unsafe_get_point_as_tuple(typeof(Δxs), xp, I) + y⃗ = to_unit_cell_gpu(transform(x⃗)) :: NTuple n = block_index(y⃗, Δxs, block_dims, nblocks_per_dir) @inbounds J = cumulative_npoints_per_block[n] + blockidx[I] @inbounds pointperm[J] = I diff --git a/src/interpolation/cpu_blocked.jl b/src/interpolation/cpu_blocked.jl index cf89fa5..73d2c93 100644 --- a/src/interpolation/cpu_blocked.jl +++ b/src/interpolation/cpu_blocked.jl @@ -92,7 +92,7 @@ end function interpolate!( backend::CPU, - bd::BlockData, + bd::BlockDataCPU, gs, evalmode::EvaluationMode, vp_all::NTuple{C, AbstractVector}, diff --git a/src/plan.jl b/src/plan.jl index 3bae127..f13e6f1 100644 --- a/src/plan.jl +++ b/src/plan.jl @@ -378,7 +378,7 @@ function _PlanNUFFT( if backend isa GPU blocks = BlockDataGPU(T, backend, block_dims, Ñs, h, sort_points; method = gpu_method, batch_size = gpu_batch_size,) else - blocks = BlockData(T, block_dims, Ñs, h, num_transforms, sort_points) + blocks = BlockDataCPU(T, block_dims, Ñs, h, num_transforms, sort_points) FFTW.set_num_threads(Threads.nthreads()) end end diff --git a/src/spreading/cpu_blocked.jl b/src/spreading/cpu_blocked.jl index b7e892e..5daadee 100644 --- a/src/spreading/cpu_blocked.jl +++ b/src/spreading/cpu_blocked.jl @@ -83,7 +83,7 @@ end function spread_from_points!( ::CPU, - bd::BlockData, + bd::BlockDataCPU, gs, evalmode::EvaluationMode, us_all::NTuple{C, AbstractArray},