Skip to content

Commit

Permalink
handle large array when copying
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander-Barth committed Sep 13, 2023
1 parent 08a4888 commit eb90a24
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ subdata = NCDataset("/tmp/test.nc")["temperature"][10:30,30:5:end]
```

This might be useful in an interactive session. However, the file `test.nc` is not closed, which can be a problem if you open many files. On Linux the number of opened files is often limited to 1024 (soft limit). If you write to a file, you should also always close the file to make sure that the data is properly written to the disk.
(Open files will eventually be closed when the dataset variable is finalized by Julia's garbage collector.)

An alternative way to ensure the file has been closed is to use a `do` block: the file will be closed automatically when leaving the block.

Expand Down
1 change: 1 addition & 0 deletions src/NCDatasets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ include("defer.jl")
include("multifile.jl")
include("ncgen.jl")
include("select.jl")
include("chunks.jl")
include("precompile.jl")

export CatArrays
Expand Down
120 changes: 120 additions & 0 deletions src/chunks.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@

# Tuple peeling for type-stability: process one dimension per call so that
# the result is an inferrable NTuple.

# base case: no dimensions left to chunk
_good_chunk_size(chunk_i) = ()

# Compute the chunk extent along the first remaining dimension (of size
# `sz_i`) given a budget of `chunk_i` elements, then recurse over the
# trailing dimension sizes `sz` with the budget that is left over.
function _good_chunk_size(chunk_i,sz_i,sz...)
    if chunk_i == 0
        # budget exhausted: minimal (but non-empty) extent
        chunk_size_i = 1
    elseif sz_i >= chunk_i
        # the dimension can absorb the whole remaining budget
        chunk_size_i = chunk_i
    else
        chunk_size_i = sz_i # take them all
    end

    # remaining element budget for the trailing dimensions;
    # max(sz_i,1) guards against a DivideError for empty dimensions (sz_i == 0)
    chunk_i = chunk_i ÷ max(sz_i,1)

    return (chunk_size_i, _good_chunk_size(chunk_i,sz...)...)
end

"""
    chunk_size = good_chunk_size(sz,chunk_max_length)
    chunk_size = good_chunk_size(array::AbstractArray,chunk_max_length)

Return a tuple describing the size of a subset (chunk) of an array with
size `sz`, assuming column-major storage order. The number of elements in
such a chunk does not exceed `chunk_max_length`.
If the `array` itself is provided, `chunk_size` takes its actual storage
order into account.
"""
function good_chunk_size(sz, chunk_max_length)
    return _good_chunk_size(chunk_max_length, sz...)
end



function good_chunk_size(array::AbstractArray{T,N},chunk_max_length) where {T,N}
    # permutation ordering the dimensions from fastest- to slowest-varying
    # in memory (based on the array's strides)
    perm = sortperm(collect(strides(array)))
    # compute the chunk size in storage order ...
    storage_order_chunk = good_chunk_size(size(array)[perm],chunk_max_length)
    # ... and undo the permutation so the result matches the array's
    # dimension order (asserted as NTuple{N,Int} for type-stability)
    return storage_order_chunk[invperm(perm)] :: NTuple{N,Int}
end



# Iterator over hyper-rectangular blocks (as CartesianIndices) of size
# `chunk_size` covering the index region `ax`; blocks at the upper edge
# may be smaller.
function each_chunk_index(ax::NTuple{N,<:AbstractRange},chunk_size::NTuple{N, <:Integer}) where N
    region = CartesianIndices(ax)
    start = first(region)
    stop = last(region)
    step = CartesianIndex(chunk_size)
    unit = oneunit(start)

    # clamp each block at the upper corner of the region
    return (corner:min(corner + step - unit, stop)
            for corner = start:step:stop)
end


# index region given by an array size `sz` (1-based)
each_chunk_index(sz::NTuple{N,<:Integer},chunk_size::NTuple{N, <:Integer}) where N =
    each_chunk_index(Base.OneTo.(sz),chunk_size)

# derive the chunk size from a maximum number of elements per chunk
function each_chunk_index(ax::NTuple{N,<:AbstractRange},chunk_max_length::Integer) where N
    return each_chunk_index(ax,good_chunk_size(length.(ax),chunk_max_length))
end

function each_chunk_index(sz::NTuple{N,<:Integer},chunk_max_length::Integer) where N
    return each_chunk_index(sz,good_chunk_size(sz,chunk_max_length))
end

# array-based entry points: use the array's axes (OffsetArray-safe) and,
# for chunk_max_length, its storage order
each_chunk_index(array::AbstractArray,chunk_max_length::Integer) =
    each_chunk_index(axes(array),good_chunk_size(array,chunk_max_length))

each_chunk_index(array::AbstractArray,chunk_size::NTuple) =
    each_chunk_index(axes(array),chunk_size)


"""
    chunk_iterator = each_chunk(array,chunk_max_length::Integer)
    chunk_iterator = each_chunk(array,chunk_size::NTuple)

Return an iterator of non-overlapping subsets (chunks) of the `array`.
Each element of the iterator is a view into the `array`.
The size of each chunk is equal to the tuple `chunk_size` if provided
(the last chunk can also be smaller). Instead of providing the chunk size,
one can also provide the maximum number of elements of a chunk with the
parameter `chunk_max_length`.
The `array` parameter of `each_chunk` can also be an `OffsetArray`.

`chunk_max_length` is typically a large number, but it should not be
so large that a single chunk exceeds the system memory.
If a chunk should not exceed a twentieth of the memory, one can for example
use `chunk_max_length` equal to `Sys.free_memory() ÷ (20 * sizeof(eltype(array)))`.

Example:

```julia
data = zeros(Int,(10,20,30))
for data_chunk in each_chunk(data,100)
    # check that the chunk has not been processed
    @assert all(data_chunk .== 0)
    # check that the chunk is not larger than 100
    @assert length(data_chunk) <= 100
    # set corresponding elements to one.
    data_chunk .= 1
end
# check that all the data array was processed
@assert all(data .== 1)
```

All `@assert` statements are true in the example above.
See also `parentindices`.
"""
function each_chunk(array,chunk_or_max_length)
    # each_chunk_index yields CartesianIndices covering `array` exactly once;
    # wrap each block in a view so no data is copied
    (view(array,ci) for ci in each_chunk_index(array,chunk_or_max_length))
end




# LocalWords: sz OffsetArrays Sys sizeof eltype julia parentindices
32 changes: 30 additions & 2 deletions src/dataset.jl
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,10 @@ It is assumed that all the variable of the output file can be loaded in memory.
function Base.write(dest::NCDataset, src::AbstractDataset;
include = keys(src),
exclude = String[],
idimensions = Dict())
idimensions = Dict(),
chunk_max_length = 10_000_000,
_ignore_checksum = false,
)

torange(indices::Colon) = indices
function torange(indices)
Expand Down Expand Up @@ -500,9 +503,34 @@ function Base.write(dest::NCDataset, src::AbstractDataset;
# indices for subset
index = ntuple(i -> torange(get(idimensions,dimension_names[i],:)),length(dimension_names))

var_slice = view(var,index...)


destvar = defVar(dest, varname, eltype(var), dimension_names; attrib = attribs(cfvar))

if hasmethod(chunking,Tuple{typeof(var_slice)})
storage,chunksizes = chunking(var_slice)
@debug "chunking " storage chunksizes
chunking(destvar,storage,chunksizes)
end

if hasmethod(deflate,Tuple{typeof(var_slice)})
isshuffled,isdeflated,deflate_level = deflate(var_slice)
@debug "compression" isshuffled isdeflated deflate_level
deflate(destvar,isshuffled,isdeflated,deflate_level)
end

if hasmethod(checksum,Tuple{typeof(var_slice)}) && !_ignore_checksum
checksummethod = checksum(var_slice)
@debug "check-sum" checksummethod
checksum(destvar,checksummethod)
end

# copy data
destvar.var[:] = cfvar.var[index...]
for ci in each_chunk_index(var_slice,chunk_max_length)
@debug "indices" ci
destvar.var[ci] = var_slice[ci]
end
end

# loop over all global attributes
Expand Down
8 changes: 8 additions & 0 deletions src/subvariable.jl
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,11 @@ function dataset(v::SubVariable)
indices = (;((Symbol(d),i) for (d,i) in zip(dimnames(v),v.indices))...)
return SubDataset(dataset(v.parent),indices)
end

# Chunking of a SubVariable: inherit the parent's storage mode and chunk
# sizes, but clamp every chunk extent so it does not exceed the view's size.
function chunking(v::SubVariable)
    parent_storage, parent_chunksizes = chunking(v.parent)
    view_size = collect(size(v))
    return parent_storage, min.(parent_chunksizes, view_size)
end

# compression settings of a SubVariable are those of its parent variable
deflate(v::SubVariable) = deflate(v.parent)
# checksum method of a SubVariable is that of its parent variable
checksum(v::SubVariable) = checksum(v.parent)

0 comments on commit eb90a24

Please sign in to comment.