Skip to content

Commit

Permalink
Improve parallel performance of set_points! on CPU (#47)
Browse files Browse the repository at this point in the history
* Rename BlockData -> BlockDataCPU

* More consistent BlockDataCPU/GPU

* CPU: improve parallel performance of set_points!

* Update CHANGELOG

* Fix issue with small datasets

* Try to fix tests

* Convert points to the right type

* Try to fix tests (again)

* Update GPU blocking code as well
  • Loading branch information
jipolanco authored Nov 25, 2024
1 parent fd4ffc4 commit c0e7f02
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 70 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Changed

- Improve parallel performance of `set_points!` with `CPU` backend.

## [v0.6.5](https://github.com/jipolanco/NonuniformFFTs.jl/releases/tag/v0.6.5) - 2024-11-18

### Fixed
Expand Down
13 changes: 13 additions & 0 deletions src/blocking/blocking.jl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ end
type_length(::Type{T}) where {T} = length(T) # usually for SVector
type_length(::Type{<:NTuple{N}}) where {N} = N

# Get point from vector of points, converting it to a tuple of the wanted type T.
# The first argument is only used to determine the output type.
# The "unsafe" is because we apply @inbounds.
function unsafe_get_point_as_tuple(
::Type{NTuple{D, T}},
xp::AbstractVector,
i::Integer,
) where {D, T <: AbstractFloat}
ntuple(Val(D)) do d
@inbounds T(xp[i][d])
end
end

# Resize vector trying to avoid copy when N is larger than the original length.
# In other words, we completely discard the original contents of x, which is not the
# original behaviour of resize!. This can save us some device-to-device copies.
Expand Down
186 changes: 123 additions & 63 deletions src/blocking/cpu.jl
Original file line number Diff line number Diff line change
@@ -1,97 +1,165 @@
# CPU only
struct BlockData{
T, N, Nc,
Tr, # = real(T)
Buffers <: AbstractVector{<:NTuple{Nc, AbstractArray{T, N}}},
struct BlockDataCPU{
Z, N, Nc,
I <: Integer,
T, # = real(Z)
Buffers <: AbstractVector{<:NTuple{Nc, AbstractArray{Z, N}}},
Indices <: CartesianIndices{N},
SortPoints <: StaticBool,
} <: AbstractBlockData
block_dims :: Dims{N} # size of each block (in number of elements)
block_sizes :: NTuple{N, Tr} # size of each block (in units of length)
buffers :: Buffers # length = nthreads
blocks_per_thread :: Vector{Int} # maps a set of blocks i_start:i_end to a thread (length = nthreads + 1)
indices :: Indices # index associated to each block (length = num_blocks)
# The following fields are the same as in BlockDataGPU:
Δxs :: NTuple{N, T} # grid step; the size of a block in units of length is block_dims .* Δxs.
nblocks_per_dir :: NTuple{N, I} # number of blocks in each direction
block_dims :: NTuple{N, I} # size of each block (in number of elements)
cumulative_npoints_per_block :: Vector{Int} # cumulative sum of number of points in each block (length = 1 + num_blocks, first value is 0)
blockidx :: Vector{Int} # linear index of block associated to each point (length = Np)
pointperm :: Vector{Int} # index permutation for sorting points according to their block (length = Np)
sort_points :: SortPoints

buffers :: Buffers # length = nthreads
blocks_per_thread :: Vector{Int} # maps a set of blocks i_start:i_end to a thread (length = nthreads + 1)
indices :: Indices # index associated to each block (length = num_blocks)

function BlockDataCPU(
Δxs::NTuple{N, T},
nblocks_per_dir::NTuple{N, I},
block_dims::NTuple{N, I},
npoints_per_block::Vector{Int},
buffers::AbstractVector{<:NTuple{Nc, AbstractArray{Z, N}}},
indices::Indices,
sort::S,
) where {Z <: Number, Nc, N, I, T, Indices, S}
@assert T === real(Z)
Nt = length(buffers)
blockidx = similar(npoints_per_block, 0)
pointperm = similar(npoints_per_block, 0)
blocks_per_thread = similar(blockidx, Nt + 1)
new{Z, N, Nc, I, T, typeof(buffers), Indices, S}(
Δxs, nblocks_per_dir, block_dims, npoints_per_block, blockidx, pointperm, sort,
buffers, blocks_per_thread, indices,
)
end
end

function BlockData(
::Type{T}, block_dims::Dims{D}, Ñs::Dims{D}, ::HalfSupport{M}, num_transforms::Val{Nc},
function BlockDataCPU(
::Type{Z}, block_dims_in::Dims{D}, Ñs::Dims{D}, ::HalfSupport{M}, num_transforms::Val{Nc},
sort_points::StaticBool,
) where {T, D, M, Nc}
) where {Z <: Number, D, M, Nc}
@assert Nc > 0
Nt = Threads.nthreads()
# Nt = ifelse(Nt == 1, zero(Nt), Nt) # this disables blocking if running on single thread
# Reduce block size if the total grid size is not sufficiently large in a given
# direction. This maximum block size is assumed in spreading and interpolation.
block_dims = map(Ñs, block_dims) do N, B
@assert N - M > 0
min(B, N ÷ 2, N - M)
# Reduce block size if the actual dataset is too small.
# The block size must satisfy B ≤ N - M (this is assumed in spreading/interpolation).
block_dims = map(Ñs, block_dims_in) do N, B
min(B, N - M)
end
nblocks_per_dir = map(cld, Ñs, block_dims) # basically equal to ceil(Ñ / block_dim)
T = real(Z)
L = T(2) * π # domain period
Δxs = map(N -> L / N, Ñs) # grid step (oversampled grid)
cumulative_npoints_per_block = Vector{Int}(undef, prod(nblocks_per_dir) + 1)
dims = block_dims .+ 2M # include padding for values outside of block (TODO: include padding in original block_dims? requires minimal block_size in each direction)
Tr = real(T)
block_sizes = map(Ñs, block_dims) do N, B
@inline
Δx = Tr(2π) / N # grid step
B * Δx
end
Nt = Threads.nthreads()
# Nt = ifelse(Nt == 1, zero(Nt), Nt) # this disables blocking if running on single thread
buffers = map(1:Nt) do _
ntuple(_ -> Array{T}(undef, dims), num_transforms) # one buffer per transform (or "component")
ntuple(_ -> Array{Z}(undef, dims), num_transforms) # one buffer per transform (or "component")
end
indices_tup = map(Ñs, block_dims) do N, B
range(0, N - 1; step = B)
end
indices = CartesianIndices(indices_tup)
nblocks = length(indices) # total number of blocks
cumulative_npoints_per_block = Vector{Int}(undef, nblocks + 1)
blockidx = Int[]
pointperm = Int[]
blocks_per_thread = zeros(Int, Nt + 1)
BlockData(
block_dims, block_sizes, buffers, blocks_per_thread, indices,
cumulative_npoints_per_block, blockidx, pointperm,
BlockDataCPU(
Δxs, nblocks_per_dir, block_dims, cumulative_npoints_per_block,
buffers, indices,
sort_points,
)
end

# This is similar to assign_blocks_kernel! (GPU implementation), but using `to_unit_cell`
# instead of `to_unit_cell_gpu` (which does seem to make a difference in performance).
function assign_blocks_cpu!(
blockidx::AbstractVector{<:Integer},
cumulative_npoints_per_block::AbstractVector{<:Integer},
points::NTuple,
xp::AbstractVector,
Δxs::NTuple,
block_dims::NTuple,
nblocks_per_dir::NTuple,
sort_points,
transform::F,
) where {F}
Threads.@threads :static for I eachindex(points[1])
x⃗ = unsafe_get_point_as_tuple(typeof(Δxs), xp, I)
y⃗ = to_unit_cell(transform(x⃗)) :: NTuple
n = block_index(y⃗, Δxs, block_dims, nblocks_per_dir)

# Note: here index_within_block is the value *after* incrementing (≥ 1).
S = eltype(cumulative_npoints_per_block)
index_within_block = @inbounds (Atomix.@atomic cumulative_npoints_per_block[n + 1] += one(S))::S
@inbounds blockidx[I] = index_within_block

# If points need to be sorted, then we fill `points` some time later (in permute_kernel!).
if sort_points === False()
for n eachindex(x⃗)
@inbounds points[n][I] = y⃗[n]
end
end
end
nothing
end

function sortperm_cpu!(
pointperm::AbstractVector,
cumulative_npoints_per_block,
blockidx,
xp::AbstractVector,
Δxs::NTuple,
block_dims,
nblocks_per_dir,
transform::F,
) where {F}
Threads.@threads :static for I eachindex(xp)
x⃗ = unsafe_get_point_as_tuple(typeof(Δxs), xp, I)
y⃗ = to_unit_cell(transform(x⃗)) :: NTuple
n = block_index(y⃗, Δxs, block_dims, nblocks_per_dir)
@inbounds J = cumulative_npoints_per_block[n] + blockidx[I]
@inbounds pointperm[J] = I
end
nothing
end

function set_points_impl!(
backend::CPU, bd::BlockData, points::StructVector, xp, timer;
backend::CPU, bd::BlockDataCPU, points::StructVector, xp, timer;
transform::F = identity,
synchronise,
) where {F <: Function}
# This technically never happens, but we might use it as a way to disable blocking.
isempty(bd.buffers) && return set_points_impl!(backend, NullBlockData(), points, xp, timer; transform, synchronise)

(; indices, cumulative_npoints_per_block, blockidx, pointperm, block_sizes,) = bd
(;
Δxs, cumulative_npoints_per_block, nblocks_per_dir, block_dims,
blockidx, pointperm, sort_points,
) = bd

N = type_length(eltype(xp)) # = number of dimensions
@assert N == length(block_sizes)
@assert N == length(block_dims)

@timeit timer "(0) Init arrays" begin
to_linear_index = LinearIndices(axes(indices)) # maps Cartesian to linear index of a block
Np = length(xp)
resize_no_copy!(blockidx, Np)
resize_no_copy!(pointperm, Np)
resize_no_copy!(points, Np)
fill!(cumulative_npoints_per_block, 0)
end

@timeit timer "(1) Assign blocks" @inbounds for (i, x⃗) pairs(xp)
# Get index of block where point x⃗ is located.
y⃗ = to_unit_cell(transform(NTuple{N}(x⃗))) # converts `x⃗` to Tuple if it's an SVector
is = map(first Kernels.point_to_cell, y⃗, block_sizes) # we use first((i, r)) -> i
if bd.sort_points === False()
points[i] = y⃗ # copy folded point (doesn't need to be sorted)
end
# checkbounds(indices, CartesianIndex(is))
n = to_linear_index[is...] # linear index of block
index_within_block = (cumulative_npoints_per_block[n + 1] += 1) # ≥ 1
blockidx[i] = index_within_block
@timeit timer "(1) Assign blocks" let
points_comp = StructArrays.components(points)
assign_blocks_cpu!(
blockidx, cumulative_npoints_per_block, points_comp, xp, Δxs,
block_dims, nblocks_per_dir, sort_points, transform,
)
end

# Compute cumulative sum (we don't use cumsum! due to aliasing warning in its docs).
for i eachindex(IndexLinear(), cumulative_npoints_per_block)[2:end]
@inbounds for i eachindex(IndexLinear(), cumulative_npoints_per_block)[2:end]
cumulative_npoints_per_block[i] += cumulative_npoints_per_block[i - 1]
end
@assert cumulative_npoints_per_block[begin] == 0
Expand All @@ -103,26 +171,18 @@ function set_points_impl!(
map_blocks_to_threads!(bd.blocks_per_thread, cumulative_npoints_per_block)

@timeit timer "(2) Sort" begin
# Note: we don't use threading since it seems to be much slower.
# This is very likely due to false sharing (https://en.wikipedia.org/wiki/False_sharing),
# since all threads modify the same data in "random" order.
@inbounds for i eachindex(xp)
# We recompute the block index associated to this point.
x⃗ = xp[i]
y⃗ = to_unit_cell(transform(NTuple{N}(x⃗))) # converts `x⃗` to Tuple if it's an SVector
is = map(first Kernels.point_to_cell, y⃗, block_sizes) # we use first((i, r)) -> i
n = to_linear_index[is...] # linear index of block
j = cumulative_npoints_per_block[n] + blockidx[i]
pointperm[j] = i
end
sortperm_cpu!(
pointperm, cumulative_npoints_per_block, blockidx, xp, Δxs,
block_dims, nblocks_per_dir, transform,
)
end

# Write sorted points into `points`.
# (This should rather be called "permute" instead of "sort"...)
# Note: we don't use threading since it seems to be much slower.
# This is very likely due to false sharing (https://en.wikipedia.org/wiki/False_sharing),
# since all threads modify the same data in "random" order.
if bd.sort_points === True()
if sort_points === True()
@timeit timer "(3) Permute points" begin
# TODO: combine this with Sort step?
@inbounds for j eachindex(pointperm)
Expand Down
8 changes: 4 additions & 4 deletions src/blocking/gpu.jl
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ end
@Const(transform::F),
) where {F}
I = @index(Global, Linear)
@inbounds x⃗ = xp[I]
y⃗ = to_unit_cell_gpu(transform(Tuple(x⃗))) :: NTuple
x⃗ = unsafe_get_point_as_tuple(typeof(Δxs), xp, I)
y⃗ = to_unit_cell_gpu(transform(x⃗)) :: NTuple
n = block_index(y⃗, Δxs, block_dims, nblocks_per_dir)

# Note: here index_within_block is the value *after* incrementing (≥ 1).
Expand Down Expand Up @@ -200,8 +200,8 @@ end
@Const(transform::F),
) where {F}
I = @index(Global, Linear)
@inbounds x⃗ = xp[I]
y⃗ = to_unit_cell_gpu(transform(Tuple(x⃗))) :: NTuple
x⃗ = unsafe_get_point_as_tuple(typeof(Δxs), xp, I)
y⃗ = to_unit_cell_gpu(transform(x⃗)) :: NTuple
n = block_index(y⃗, Δxs, block_dims, nblocks_per_dir)
@inbounds J = cumulative_npoints_per_block[n] + blockidx[I]
@inbounds pointperm[J] = I
Expand Down
2 changes: 1 addition & 1 deletion src/interpolation/cpu_blocked.jl
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ end

function interpolate!(
backend::CPU,
bd::BlockData,
bd::BlockDataCPU,
gs,
evalmode::EvaluationMode,
vp_all::NTuple{C, AbstractVector},
Expand Down
2 changes: 1 addition & 1 deletion src/plan.jl
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ function _PlanNUFFT(
if backend isa GPU
blocks = BlockDataGPU(T, backend, block_dims, Ñs, h, sort_points; method = gpu_method, batch_size = gpu_batch_size,)
else
blocks = BlockData(T, block_dims, Ñs, h, num_transforms, sort_points)
blocks = BlockDataCPU(T, block_dims, Ñs, h, num_transforms, sort_points)
FFTW.set_num_threads(Threads.nthreads())
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/spreading/cpu_blocked.jl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ end

function spread_from_points!(
::CPU,
bd::BlockData,
bd::BlockDataCPU,
gs,
evalmode::EvaluationMode,
us_all::NTuple{C, AbstractArray},
Expand Down

0 comments on commit c0e7f02

Please sign in to comment.