Introduced an abstraction layer for all N5 dataset parameters and the option to expose only parts of the image (5D -> 3D for OME-ZARR). This allows us to use the N5ImageLoader directly, for example for reading OME-ZARR based datasets. It also supports pre-fetching of all dataset attributes; skipping this can slow down cloud processing.
StephanPreibisch committed Oct 31, 2024
1 parent 4d308f7 commit 44aaa01
Showing 4 changed files with 182 additions and 20 deletions.
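
In short, the commit routes every dataset path and attribute lookup through the new N5Properties interface, which N5ImageLoader obtains from an overridable factory method (createN5PropertiesInstance(), see below). As a minimal sketch of how a custom layout could hook in, a hypothetical subclass might look like this; Example5DImageLoader and Example5DProperties are illustrative names, not part of this commit (Example5DProperties is sketched after the interface at the bottom of this page):

import java.net.URI;

import bdv.img.n5.N5ImageLoader;
import bdv.img.n5.N5Properties;
import mpicbg.spim.data.generic.sequence.AbstractSequenceDescription;

// hypothetical loader for a custom (e.g. OME-ZARR-like) layout: all dataset
// parameters are resolved through the N5Properties instance returned below
public class Example5DImageLoader extends N5ImageLoader
{
	public Example5DImageLoader( final URI uri, final AbstractSequenceDescription< ?, ?, ? > seq )
	{
		super( uri, seq );
	}

	@Override
	public N5Properties createN5PropertiesInstance()
	{
		// instead of the default BdvN5Format
		return new Example5DProperties();
	}
}
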
5 changes: 4 additions & 1 deletion src/main/java/bdv/export/n5/WriteSequenceToN5.java
@@ -59,7 +59,9 @@
import bdv.export.ProgressWriterNull;
import bdv.export.SubTaskProgressWriter;
import bdv.img.cache.SimpleCacheArrayLoader;
import bdv.img.n5.BdvN5Format;
import bdv.img.n5.N5ImageLoader;
import bdv.img.n5.N5Properties;
import mpicbg.spim.data.generic.sequence.AbstractSequenceDescription;
import mpicbg.spim.data.generic.sequence.BasicImgLoader;
import mpicbg.spim.data.generic.sequence.BasicSetupImgLoader;
@@ -274,6 +276,7 @@ static class N5DatasetIO< T extends RealType< T > & NativeType< T > > implements
private final int timepointId;
private final DataType dataType;
private final T type;
private final N5Properties n5Properties = new BdvN5Format();

public N5DatasetIO( final N5Writer n5, final Compression compression, final int setupId, final int timepointId, final T type )
{
@@ -335,7 +338,7 @@ public RandomAccessibleInterval< T > getImage( final int level ) throws IOExcept
final long[] dimensions = attributes.getDimensions();
final int[] cellDimensions = attributes.getBlockSize();
final CellGrid grid = new CellGrid( dimensions, cellDimensions );
final SimpleCacheArrayLoader< ? > cacheArrayLoader = N5ImageLoader.createCacheArrayLoader( n5, pathName );
final SimpleCacheArrayLoader< ? > cacheArrayLoader = N5ImageLoader.createCacheArrayLoader( n5Properties, n5, pathName );
return new ReadOnlyCachedCellImgFactory().createWithCacheLoader(
dimensions, type,
key -> {
59 changes: 58 additions & 1 deletion src/main/java/bdv/img/n5/BdvN5Format.java
@@ -28,11 +28,68 @@
*/
package bdv.img.n5;

public class BdvN5Format
import org.janelia.saalfeldlab.n5.DataType;
import org.janelia.saalfeldlab.n5.DatasetAttributes;
import org.janelia.saalfeldlab.n5.N5Reader;

import bdv.img.cache.VolatileCachedCellImg;
import net.imglib2.RandomAccessibleInterval;
import net.imglib2.type.NativeType;

public class BdvN5Format implements N5Properties
{
public static final String DOWNSAMPLING_FACTORS_KEY = "downsamplingFactors";
public static final String DATA_TYPE_KEY = "dataType";

@Override
public String getPath( final int setupId )
{
return getPathName( setupId );
}

@Override
public String getPath( final int setupId, final int timepointId)
{
return getPathName( setupId, timepointId );
}

@Override
public String getPath( final int setupId, final int timepointId, final int level)
{
return getPathName( setupId, timepointId, level );
}

@Override
public DataType getDataType( final N5Reader n5, final int setupId )
{
// optionally cached as defined by N5ImageLoader.preFetchDatasetAttributes
return n5.getAttribute( getPath( setupId ), DATA_TYPE_KEY, DataType.class );
}

@Override
public double[][] getMipmapResolutions( final N5Reader n5, final int setupId )
{
// optionally cached as defined by N5ImageLoader.preFetchDatasetAttributes
return n5.getAttribute( getPath( setupId ), DOWNSAMPLING_FACTORS_KEY, double[][].class );
}

@Override
public DatasetAttributes getDatasetAttributes( final N5Reader n5, final String pathName )
{
// optionally cached as defined by N5ImageLoader.preFetchDatasetAttributes
return n5.getDatasetAttributes( pathName );
}

@Override
public <T extends NativeType<T>> RandomAccessibleInterval<T> extractImg(
final VolatileCachedCellImg<T, ?> img,
final int setupId,
final int timepointId)
{
return img;
}

// left the old code for compatibility
public static String getPathName( final int setupId )
{
return String.format( "setup%d", setupId );
114 changes: 96 additions & 18 deletions src/main/java/bdv/img/n5/N5ImageLoader.java
@@ -28,19 +28,19 @@
*/
package bdv.img.n5;

import static bdv.img.n5.BdvN5Format.DATA_TYPE_KEY;
import static bdv.img.n5.BdvN5Format.DOWNSAMPLING_FACTORS_KEY;
import static bdv.img.n5.BdvN5Format.getPathName;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.regex.Pattern;
import java.util.stream.IntStream;

import org.janelia.saalfeldlab.n5.DataBlock;
import org.janelia.saalfeldlab.n5.DataType;
@@ -54,6 +54,7 @@
import bdv.cache.CacheControl;
import bdv.cache.SharedQueue;
import bdv.img.cache.SimpleCacheArrayLoader;
import bdv.img.cache.VolatileCachedCellImg;
import bdv.img.cache.VolatileGlobalCellCache;
import bdv.util.ConstantRandomAccessible;
import bdv.util.MipmapTransforms;
@@ -62,6 +63,8 @@
import mpicbg.spim.data.generic.sequence.ImgLoaderHint;
import mpicbg.spim.data.sequence.MultiResolutionImgLoader;
import mpicbg.spim.data.sequence.MultiResolutionSetupImgLoader;
import mpicbg.spim.data.sequence.TimePoint;
import mpicbg.spim.data.sequence.ViewId;
import mpicbg.spim.data.sequence.VoxelDimensions;
import net.imglib2.Dimensions;
import net.imglib2.FinalDimensions;
@@ -82,11 +85,18 @@

public class N5ImageLoader implements ViewerImgLoader, MultiResolutionImgLoader
{
private final static Pattern FILE_SCHEME = Pattern.compile( "file", Pattern.CASE_INSENSITIVE );

// TODO: there are cases where one does not want to pre-fetch like distributed processing (or local filesystem?)
public static boolean preFetchDatasetAttributes = true;
public static int cloudThreads = 256;

private final URI n5URI;
protected final N5Properties n5properties;

// TODO: it would be good if this would not be needed
// find available setups from the n5
private final AbstractSequenceDescription< ?, ?, ? > seq;
protected final AbstractSequenceDescription< ?, ?, ? > seq;

/**
* Maps setup id to {@link SetupImgLoader}.
@@ -97,6 +107,7 @@ public N5ImageLoader( final URI n5URI, final AbstractSequenceDescription< ?, ?,
{
this.n5URI = n5URI;
this.seq = sequenceDescription;
this.n5properties = createN5PropertiesInstance();
}

public N5ImageLoader( final File n5File, final AbstractSequenceDescription< ?, ?, ? > sequenceDescription )
@@ -108,6 +119,10 @@ public N5ImageLoader( final N5Reader n5Reader, final URI n5URI, final AbstractSe
{
this( n5URI, sequenceDescription );
n5 = n5Reader;

// TODO: if we get something that's not a file and not relative set a different default for numFetcherThreads
if ( n5URI.getScheme() != null && !FILE_SCHEME.asPredicate().test( n5URI.getScheme() ) )
setNumFetcherThreads( cloudThreads );
}

public URI getN5URI()
@@ -120,10 +135,66 @@ public File getN5File()
return new File( n5URI );
}

/**
* this is only called from a synchronized open() context
*/
public void preFetch()
{
if ( preFetchDatasetAttributes && n5 != null )
{
try
{
// touch all metadata in advance in parallel so the N5-API caches them
final ForkJoinPool myPool = new ForkJoinPool( cloudThreads );

// prefetch all datatypes and MipmapResolutions
myPool.submit(() -> seq.getViewSetupsOrdered().parallelStream().forEach( setup -> {
n5properties.getDataType( n5, setup.getId() );
n5properties.getMipmapResolutions( n5, setup.getId() );
})).join();

// prefetch all DatasetAttributes for all views
final ArrayList< ViewId > views = new ArrayList<>();

for ( final TimePoint tp : seq.getTimePoints().getTimePointsOrdered() )
for ( final BasicViewSetup vs : seq.getViewSetupsOrdered() )
{
final ViewId v = new ViewId( tp.getId(), vs.getId() );

if ( !seq.getMissingViews().getMissingViews().contains( v ) )
views.add( v );
}

myPool.submit( () -> views.parallelStream().forEach( viewId ->
{
final int numLevels = n5properties.getMipmapResolutions( n5, viewId.getViewSetupId() ).length;

// TODO: nested parallel streams should be fine, tested it with a pool of 1 thread
// TODO: if you do not trust it we can create a List<Pair<ViewId,Level>> first
myPool.submit(() -> IntStream.range( 0, numLevels ).parallel().forEach( level ->
n5.getDatasetAttributes( n5properties.getPath( viewId.getViewSetupId(), viewId.getTimePointId(), level ) ) ) ).join();
})).join();

myPool.shutdown();
}
catch ( Exception e )
{
// TODO: no drama if this fails ... could be that some data is actually missing, one can still look at what's there
System.out.println( "Prefetching attributes failed: " + e);
}
}
}

/**
* @return a class that creates the pathnames for the setupId, timePointId and multiresolution levels
*/
public N5Properties createN5PropertiesInstance() { return new BdvN5Format(); }

private volatile boolean isOpen = false;
protected volatile boolean isPrefetched = false;
private SharedQueue createdSharedQueue;
private VolatileGlobalCellCache cache;
private N5Reader n5;
protected N5Reader n5;


private int requestedNumFetcherThreads = -1;
@@ -157,6 +228,13 @@ private void open()
n5 = new N5FSReader( getN5File().getAbsolutePath() );
}

if ( !isPrefetched )
{
// TODO: prefetching in the open() method once
preFetch();
isPrefetched = true;
}

int maxNumLevels = 0;
final List< ? extends BasicViewSetup > setups = seq.getViewSetupsOrdered();
for ( final BasicViewSetup setup : setups )
@@ -219,11 +297,10 @@ public void close()

private < T extends NativeType< T >, V extends Volatile< T > & NativeType< V > > SetupImgLoader< T, V > createSetupImgLoader( final int setupId ) throws IOException
{
final String pathName = getPathName( setupId );
final DataType dataType;
try
{
dataType = n5.getAttribute( pathName, DATA_TYPE_KEY, DataType.class );
dataType = n5properties.getDataType( n5, setupId );
}
catch ( final N5Exception e )
{
@@ -258,10 +335,9 @@ public SetupImgLoader( final int setupId, final T type, final V volatileType ) t
{
super( type, volatileType );
this.setupId = setupId;
final String pathName = getPathName( setupId );
try
{
mipmapResolutions = n5.getAttribute( pathName, DOWNSAMPLING_FACTORS_KEY, double[][].class );
mipmapResolutions = n5properties.getMipmapResolutions( n5, setupId );
}
catch ( final N5Exception e )
{
@@ -289,8 +365,8 @@ public Dimensions getImageSize( final int timepointId, final int level )
{
try
{
final String pathName = getPathName( setupId, timepointId, level );
final DatasetAttributes attributes = n5.getDatasetAttributes( pathName );
final String pathName = n5properties.getPath( setupId, timepointId, level );
final DatasetAttributes attributes = n5properties.getDatasetAttributes( n5, pathName );
return new FinalDimensions( attributes.getDimensions() );
}
catch( final RuntimeException e )
@@ -330,17 +406,19 @@ private < T extends NativeType< T > > RandomAccessibleInterval< T > prepareCache
{
try
{
final String pathName = getPathName( setupId, timepointId, level );
final DatasetAttributes attributes = n5.getDatasetAttributes( pathName );
final String pathName = n5properties.getPath( setupId, timepointId, level );
final DatasetAttributes attributes = n5properties.getDatasetAttributes( n5, pathName );
final long[] dimensions = attributes.getDimensions();
final int[] cellDimensions = attributes.getBlockSize();
final CellGrid grid = new CellGrid( dimensions, cellDimensions );

final int priority = numMipmapLevels() - 1 - level;
final CacheHints cacheHints = new CacheHints( loadingStrategy, priority, false );

final SimpleCacheArrayLoader< ? > loader = createCacheArrayLoader( n5, pathName );
return cache.createImg( grid, timepointId, setupId, level, cacheHints, loader, type );
final SimpleCacheArrayLoader< ? > loader = createCacheArrayLoader( n5properties, n5, pathName );
final VolatileCachedCellImg<T, ?> img = cache.createImg( grid, timepointId, setupId, level, cacheHints, loader, type );

return n5properties.extractImg( img, setupId, timepointId );
}
catch ( final IOException | N5Exception e )
{
@@ -417,12 +495,12 @@ public A loadArray( final long[] gridPosition, final int[] cellDimensions ) thro
}
}

public static SimpleCacheArrayLoader< ? > createCacheArrayLoader( final N5Reader n5, final String pathName ) throws IOException
public static SimpleCacheArrayLoader< ? > createCacheArrayLoader( final N5Properties n5Properties, final N5Reader n5, final String pathName ) throws IOException
{
final DatasetAttributes attributes;
try
{
attributes = n5.getDatasetAttributes( pathName );
attributes = n5Properties.getDatasetAttributes( n5, pathName );
}
catch ( final N5Exception e )
{
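
The prefetching added above is controlled by two public static fields on N5ImageLoader (preFetchDatasetAttributes and cloudThreads) and runs once from open(). For the distributed-processing case mentioned in the TODO, a caller could presumably switch it off before the loader is constructed and opened; a small sketch with illustrative values:

// skip touching all metadata up front, e.g. in a cluster job where each
// worker only ever reads a handful of views
N5ImageLoader.preFetchDatasetAttributes = false;

// optionally tune the thread count used for cloud reads and prefetching
N5ImageLoader.cloudThreads = 64;

// ... then construct / open the N5ImageLoader as usual; both fields are read
// when the loader is created and first opened
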
24 changes: 24 additions & 0 deletions src/main/java/bdv/img/n5/N5Properties.java
@@ -0,0 +1,24 @@
package bdv.img.n5;

import org.janelia.saalfeldlab.n5.DataType;
import org.janelia.saalfeldlab.n5.DatasetAttributes;
import org.janelia.saalfeldlab.n5.N5Reader;

import bdv.img.cache.VolatileCachedCellImg;
import net.imglib2.RandomAccessibleInterval;
import net.imglib2.type.NativeType;

public interface N5Properties
{
// in case of OME-ZARR, it has an underlying 5D container
public < T extends NativeType<T> > RandomAccessibleInterval<T> extractImg( final VolatileCachedCellImg<T, ?> img, final int setupId, final int timepointId );

// give the option to pre-fetch the attributes or store them in the XML
public DatasetAttributes getDatasetAttributes( final N5Reader n5, final String pathName );

public DataType getDataType( final N5Reader n5, final int setupId );
public double[][] getMipmapResolutions( final N5Reader n5, final int setupId );
public String getPath( final int setupId );
public String getPath( final int setupId, final int timepointId );
public String getPath( final int setupId, final int timepointId, final int level );
}
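
For illustration, this is roughly what an implementation of the interface could look like for a hypothetical container that stores each setup as a single 5D (x, y, z, channel, time) multiscale pyramid; the paths, the attribute key and the axis order are assumptions, not something this commit defines:

import org.janelia.saalfeldlab.n5.DataType;
import org.janelia.saalfeldlab.n5.DatasetAttributes;
import org.janelia.saalfeldlab.n5.N5Reader;

import bdv.img.cache.VolatileCachedCellImg;
import bdv.img.n5.N5Properties;
import net.imglib2.RandomAccessibleInterval;
import net.imglib2.type.NativeType;
import net.imglib2.view.Views;

public class Example5DProperties implements N5Properties
{
	@Override
	public String getPath( final int setupId )
	{
		return String.format( "setup%d", setupId );
	}

	@Override
	public String getPath( final int setupId, final int timepointId )
	{
		// all timepoints live in the same 5D dataset
		return getPath( setupId );
	}

	@Override
	public String getPath( final int setupId, final int timepointId, final int level )
	{
		return String.format( "setup%d/s%d", setupId, level );
	}

	@Override
	public DataType getDataType( final N5Reader n5, final int setupId )
	{
		return n5.getDatasetAttributes( getPath( setupId, 0, 0 ) ).getDataType();
	}

	@Override
	public double[][] getMipmapResolutions( final N5Reader n5, final int setupId )
	{
		// assumed attribute key; could also be derived from multiscales metadata
		return n5.getAttribute( getPath( setupId ), "downsamplingFactors", double[][].class );
	}

	@Override
	public DatasetAttributes getDatasetAttributes( final N5Reader n5, final String pathName )
	{
		return n5.getDatasetAttributes( pathName );
	}

	@Override
	public < T extends NativeType< T > > RandomAccessibleInterval< T > extractImg(
			final VolatileCachedCellImg< T, ? > img, final int setupId, final int timepointId )
	{
		// drop the time (dim 4) and channel (dim 3) axes so BDV sees a plain 3D image;
		// assumes timepoint ids map directly to indices along the time axis
		return Views.hyperSlice( Views.hyperSlice( img, 4, timepointId ), 3, 0 );
	}
}
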
