From eee7cba231c8b45e19e4f5aee824caa211654198 Mon Sep 17 00:00:00 2001 From: Alexander Barth Date: Tue, 12 Sep 2023 16:12:13 +0200 Subject: [PATCH] load by chunks --- src/NCDatasets.jl | 1 - src/chunks.jl | 120 +++++++++++++++++++++++++++++++++++++++++++++ src/dataset.jl | 32 +----------- src/subvariable.jl | 8 --- 4 files changed, 122 insertions(+), 39 deletions(-) create mode 100644 src/chunks.jl diff --git a/src/NCDatasets.jl b/src/NCDatasets.jl index 3a5981a3..df7b346a 100644 --- a/src/NCDatasets.jl +++ b/src/NCDatasets.jl @@ -63,7 +63,6 @@ include("defer.jl") include("multifile.jl") include("ncgen.jl") include("select.jl") -include("chunks.jl") include("precompile.jl") export CatArrays diff --git a/src/chunks.jl b/src/chunks.jl new file mode 100644 index 00000000..50efab6e --- /dev/null +++ b/src/chunks.jl @@ -0,0 +1,120 @@ + +# tuple peeling for type-stability +_good_chunk_size(chunk_i) = () + +function _good_chunk_size(chunk_i,sz_i,sz...) + if chunk_i == 0 + chunk_size_i = 1 + elseif sz_i >= chunk_i + chunk_size_i = chunk_i + else + chunk_size_i = sz_i # take them all + end + + chunk_i = chunk_i ÷ sz_i + + return (chunk_size_i, _good_chunk_size(chunk_i,sz...)...) +end + +""" + chunk_size = good_chunk_size(sz,chunk_max_length) + chunk_size = good_chunk_size(array::AbstractArray,chunk_max_length) + +Return a tuple of indices representing a subset size (chunk) of an array with size +`sz` (assuming column-major storage order). The number of elements in this chunk is not +larger than `chunk_max_length`. + +If the `array` is provided, then `chunk_size` takes the storage order into +account. +""" +good_chunk_size(sz,chunk_max_length) = _good_chunk_size(chunk_max_length,sz...) + + + +function good_chunk_size(array::AbstractArray{T,N},chunk_max_length) where {T,N} + dim_permute = sortperm(collect(strides(array))) + dim_inv_permute = invperm(dim_permute) + good_chunk_size(size(array)[dim_permute],chunk_max_length)[dim_inv_permute] :: NTuple{N,Int} +end + + + +function each_chunk_index(ax::NTuple{N,<:AbstractRange},chunk_size::NTuple{N, <:Integer}) where N + cindices = CartesianIndices(ax) + ci_first = first(cindices) + ci_last = last(cindices) + ci_chunk_size = CartesianIndex(chunk_size) + ci_ones = CartesianIndex(ntuple(i->1,Val(N))) + + (ci:min(ci + ci_chunk_size - ci_ones, ci_last) + for ci = ci_first:ci_chunk_size:ci_last) +end + + +function each_chunk_index(sz::NTuple{N,<:Integer},chunk_size::NTuple{N, <:Integer}) where N + each_chunk_index(Base.OneTo.(sz),chunk_size) +end + +each_chunk_index(ax::NTuple{N,<:AbstractRange},chunk_max_length::Integer) where N = + each_chunk_index(ax,good_chunk_size(length.(ax),chunk_max_length)) + +each_chunk_index(sz::NTuple{N,<:Integer},chunk_max_length::Integer) where N = + each_chunk_index(sz,good_chunk_size(sz,chunk_max_length)) + +each_chunk_index(array::AbstractArray,chunk_max_length::Integer) = + each_chunk_index(axes(array),good_chunk_size(array,chunk_max_length)) + +each_chunk_index(array::AbstractArray,chunk_size::NTuple) = + each_chunk_index(axes(array),chunk_size) + + +""" + chunk_iterator = each_chunk(array,chunk_max_length::Integer) + chunk_iterator = each_chunk(array,chunk_size::NTuple) + +Return an iterator of non-overlapping subsets (chunks) for the the `array`. +Each element of the iterator is a view into the `array`. +The size of each chunk is equal to to the +tuple `chunk_size` if provided (the last chunk can also be smaller). +Instead of providing the chunk size, one can also provide the +maximum number of elements of a chunk with the parameter `chunk_max_length`. +The `array` parameter of `each_chunk` can also be an `OffsetArrays`. + +`chunk_max_length` is typically a large number, but it should not be +so large that a single chunk exceeds the system memory. +If a chunk should not exceed a twentieth of the memory one can for example +use `chunk_max_length` equal to `Sys.free_memory() ÷ (20 * sizeof(eltype(array)))` + + +Example: + +```julia +data = zeros(Int,(10,20,30)) + +for data_chunk in each_chunk(data,100) + # check that the chunk has not being processed + @assert all(data_chunk .== 0) + # check that the chunk is not larger than 100 + @assert length(data_chunk) <= 100 + # set corresponding elements to one. + data_chunk .= 1 +end + +# check that all the data array was processed +@assert all(data .== 1) +``` + +All `@assert`s statements are true in example above. + + + +See also `parentindices`. +""" +function each_chunk(array,chunk_or_max_length) + (view(array,ci) for ci in each_chunk_index(array,chunk_or_max_length)) +end + + + + +# LocalWords: sz OffsetArrays Sys sizeof eltype julia parentindices diff --git a/src/dataset.jl b/src/dataset.jl index 5a72a0e0..851c05b9 100644 --- a/src/dataset.jl +++ b/src/dataset.jl @@ -453,10 +453,7 @@ It is assumed that all the variable of the output file can be loaded in memory. function Base.write(dest::NCDataset, src::AbstractDataset; include = keys(src), exclude = String[], - idimensions = Dict(), - chunk_max_length = 10_000_000, - _ignore_checksum = false, - ) + idimensions = Dict()) torange(indices::Colon) = indices function torange(indices) @@ -503,34 +500,9 @@ function Base.write(dest::NCDataset, src::AbstractDataset; # indices for subset index = ntuple(i -> torange(get(idimensions,dimension_names[i],:)),length(dimension_names)) - var_slice = view(var,index...) - - destvar = defVar(dest, varname, eltype(var), dimension_names; attrib = attribs(cfvar)) - - if hasmethod(chunking,Tuple{typeof(var_slice)}) - storage,chunksizes = chunking(var_slice) - @debug "chunking " storage chunksizes - chunking(destvar,storage,chunksizes) - end - - if hasmethod(deflate,Tuple{typeof(var_slice)}) - isshuffled,isdeflated,deflate_level = deflate(var_slice) - @debug "compression" isshuffled isdeflated deflate_level - deflate(destvar,isshuffled,isdeflated,deflate_level) - end - - if hasmethod(checksum,Tuple{typeof(var_slice)}) && !_ignore_checksum - checksummethod = checksum(var_slice) - @debug "check-sum" checksummethod - checksum(destvar,checksummethod) - end - # copy data - for ci in each_chunk_index(var_slice,chunk_max_length) - @debug "indices" ci - destvar.var[ci] = var_slice[ci] - end + destvar.var[:] = cfvar.var[index...] end # loop over all global attributes diff --git a/src/subvariable.jl b/src/subvariable.jl index 488b4dec..aa6eeda9 100644 --- a/src/subvariable.jl +++ b/src/subvariable.jl @@ -174,11 +174,3 @@ function dataset(v::SubVariable) indices = (;((Symbol(d),i) for (d,i) in zip(dimnames(v),v.indices))...) return SubDataset(dataset(v.parent),indices) end - -function chunking(v::SubVariable) - storage, chunksizes = chunking(v.parent) - return storage, min.(chunksizes,collect(size(v))) -end - -deflate(v::SubVariable) = deflate(v.parent) -checksum(v::SubVariable) = checksum(v.parent)