diff --git a/src/main/java/de/mpg/biochem/mars/n5/commands/MarsOpenN5asImagePlusCommand.java b/src/main/java/de/mpg/biochem/mars/n5/commands/MarsOpenN5asImagePlusCommand.java index d2db595..3d8c778 100644 --- a/src/main/java/de/mpg/biochem/mars/n5/commands/MarsOpenN5asImagePlusCommand.java +++ b/src/main/java/de/mpg/biochem/mars/n5/commands/MarsOpenN5asImagePlusCommand.java @@ -39,7 +39,6 @@ import net.imglib2.img.planar.PlanarImgFactory; import org.apache.commons.io.IOUtils; import org.janelia.saalfeldlab.n5.imglib2.N5Utils; -import org.janelia.saalfeldlab.n5.s3.N5AmazonS3Reader; import org.janelia.saalfeldlab.n5.universe.metadata.N5DatasetMetadata; import org.janelia.saalfeldlab.n5.universe.metadata.ome.ngff.v04.OmeNgffMetadataParser; import org.scijava.app.StatusService; @@ -56,6 +55,8 @@ import java.io.IOException; import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -84,6 +85,15 @@ import org.janelia.saalfeldlab.n5.universe.metadata.N5SingleScaleMetadataParser; import org.janelia.saalfeldlab.n5.universe.metadata.canonical.CanonicalMetadataParser; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.client.builder.AwsClientBuilder; + import net.imglib2.loops.LoopBuilder; import net.imglib2.util.Util; import net.imglib2.parallel.DefaultTaskExecutor; @@ -91,6 +101,47 @@ import io.scif.Metadata; import de.mpg.biochem.mars.scifio.MarsMicromanagerFormat; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.io.Writer; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.NonReadableChannelException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import com.amazonaws.services.s3.AmazonS3URI; +import org.janelia.saalfeldlab.n5.KeyValueAccess; +import org.janelia.saalfeldlab.n5.LockedChannel; +import org.janelia.saalfeldlab.n5.N5Exception; +import org.janelia.saalfeldlab.n5.N5URI; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.CreateBucketRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.Region; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.waiters.HeadObjectFunction; + @Plugin(type = Command.class, label = "Open N5 as ImagePlus", menu = { @Menu( label = MenuConstants.PLUGINS_LABEL, weight = MenuConstants.PLUGINS_WEIGHT, mnemonic = MenuConstants.PLUGINS_MNEMONIC), @Menu(label = "Mars", @@ -124,6 +175,9 @@ public class MarsOpenN5asImagePlusCommand extends DynamicCommand implements Comm new N5GenericSingleScaleMetadataParser() }; + private AmazonS3 s3; + private String bucketName; + @Override public void run() { DatasetSelectorDialog selectionDialog = new DatasetSelectorDialog( @@ -151,7 +205,7 @@ public void run() { N5Reader n5 = new MarsN5ViewerReaderFun().apply(rootPath); try { - InputStream inputStream = ((N5AmazonS3Reader) n5).getKeyValueAccess().lockForReading(datasetPath + "/metadata.txt").newInputStream(); + InputStream inputStream = getMetadataInputStream(rootPath, datasetPath); String result = IOUtils.toString(inputStream, StandardCharsets.UTF_8); String[] jsonData = new String[1]; jsonData[0] = result; @@ -214,4 +268,211 @@ private & NativeType> Dataset getImage(final N5Read return datasetService.create((ImgPlus) imgPlus); } + + private InputStream getMetadataInputStream(String rootPath, String datasetPath) { + try { + final URI uri = new URI(rootPath); + final String scheme = uri.getScheme(); + String[] parts = uri.getHost().split("\\.",3); + bucketName = parts[0]; + String path = uri.getPath(); + //ensures a single slash remains when no path is provided when opened by N5AmazonS3Reader. + if (path.equals("/")) path = "//"; + //String s3Url = "s3://" + bucket + path; + String endpointUrl = uri.getScheme() + "://" + parts[2] + ":" + uri.getPort(); + String key = path + datasetPath + "/metadata.txt"; + + AWSCredentials credentials = null; + try { + credentials = new DefaultAWSCredentialsProviderChain().getCredentials(); + } catch(final Exception e) { + System.out.println( "Could not load AWS credentials, falling back to anonymous." ); + } + final AWSStaticCredentialsProvider credentialsProvider = + new AWSStaticCredentialsProvider(credentials == null ? new AnonymousAWSCredentials() : credentials); + + //US_EAST_2 is used as a dummy region. + s3 = AmazonS3ClientBuilder.standard() + .withPathStyleAccessEnabled(true) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointUrl, Regions.US_EAST_2.getName())) + .withCredentials(credentialsProvider) + .build(); + + return new LocalS3ObjectChannel(key, true).newInputStream(); + + } catch (final URISyntaxException e) { + return null; + } + } + + private class LocalS3ObjectChannel implements LockedChannel { + + protected final String path; + final boolean readOnly; + private final ArrayList resources = new ArrayList<>(); + + protected LocalS3ObjectChannel(final String path, final boolean readOnly) { + + this.path = path; + this.readOnly = readOnly; + } + + private void checkWritable() { + + if (readOnly) { + throw new NonReadableChannelException(); + } + } + + @Override + public InputStream newInputStream() { + + final S3ObjectInputStream in = s3.getObject(bucketName, path).getObjectContent(); + final S3ObjectInputStreamDrain s3in = new S3ObjectInputStreamDrain(in); + synchronized (resources) { + resources.add(s3in); + } + return s3in; + } + + @Override + public Reader newReader() { + + final InputStreamReader reader = new InputStreamReader(newInputStream(), StandardCharsets.UTF_8); + synchronized (resources) { + resources.add(reader); + } + return reader; + } + + @Override + public OutputStream newOutputStream() { + + checkWritable(); + final S3OutputStream s3Out = new S3OutputStream(); + synchronized (resources) { + resources.add(s3Out); + } + return s3Out; + } + + @Override + public Writer newWriter() throws IOException { + + checkWritable(); + final OutputStreamWriter writer = new OutputStreamWriter(newOutputStream(), StandardCharsets.UTF_8); + synchronized (resources) { + resources.add(writer); + } + return writer; + } + + @Override + public void close() throws IOException { + + synchronized (resources) { + for (final Closeable resource : resources) + resource.close(); + resources.clear(); + } + } + + final class S3OutputStream extends OutputStream { + private final ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + private boolean closed = false; + + @Override + public void write(final byte[] b, final int off, final int len) { + + buf.write(b, off, len); + } + + @Override + public void write(final int b) { + + buf.write(b); + } + + @Override + public synchronized void close() throws IOException { + + if (!closed) { + closed = true; + final byte[] bytes = buf.toByteArray(); + final ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setContentLength(bytes.length); + try (final InputStream data = new ByteArrayInputStream(bytes)) { + s3.putObject(bucketName, path, data, objectMetadata); + } + buf.close(); + } + } + } + } + + private static class S3ObjectInputStreamDrain extends InputStream { + + private final S3ObjectInputStream in; + private boolean closed; + + public S3ObjectInputStreamDrain(final S3ObjectInputStream in) { + + this.in = in; + } + + @Override + public int read() throws IOException { + + return in.read(); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + + return in.read(b, off, len); + } + + @Override + public boolean markSupported() { + + return in.markSupported(); + } + + @Override + public void mark(final int readlimit) { + + in.mark(readlimit); + } + + @Override + public void reset() throws IOException { + + in.reset(); + } + + @Override + public int available() throws IOException { + + return in.available(); + } + + @Override + public long skip(final long n) throws IOException { + + return in.skip(n); + } + + @Override + public void close() throws IOException { + + if (!closed) { + do { + in.skip(in.available()); + } while (read() != -1); + in.close(); + closed = true; + } + } + } }