Skip to content

Commit

Permalink
Version 2.3.0-preview1, see CHANGES.md for list of changes
Browse files Browse the repository at this point in the history
  • Loading branch information
asikaria-msft committed Sep 13, 2017
1 parent 75c47b4 commit 47d3eaa
Show file tree
Hide file tree
Showing 12 changed files with 605 additions and 17 deletions.
28 changes: 19 additions & 9 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes to the SDK

### Version 2.3.0-preview1
1. ADLFileInputStream can now optionally do read-ahead (on by default, with queue depth=4). Added
configuration option to ADLStoreOptions to set the queue depth for read-ahead.

### Version 2.2.3
1. Made port number and tenant Guid optional for MsiTokenProvider.

Expand All @@ -9,28 +13,34 @@
### Version 2.2.1
1. Added support for DeviceCode auth flow
2. Added support for acquiring token using Azure VM's MSI service
3. Switched all internal TokenProviders to use https://datalake.azure.net/ as "resource" in AAD Tokens instead of https://management.core.windows.net/
3. Switched all internal TokenProviders to use https://datalake.azure.net/ as "resource" in AAD Tokens instead of
https://management.core.windows.net/
4. Misc robustness fixes in ADLStoreClient.GetContentSummary

### Version 2.1.5
1. Fixed bug in ADLFileOutputStream, where calling close() right after calling flush() would not release the lease on the file, locking the file out for 10 mins
1. Fixed bug in ADLFileOutputStream, where calling close() right after calling flush() would not release the lease
on the file, locking the file out for 10 mins
2. Added server trace ID to exception messages, so failures are easier to troubleshoot for customer-support calls
3. Changed exception handling for token fetching; exceptions from the token fetching process were previously getting replaced with a generic (unhelpful) error message
3. Changed exception handling for token fetching; exceptions from the token fetching process were previously getting
replaced with a generic (unhelpful) error message

### Version 2.1.4
1. fixed bug in Core.listStatus() for expiryTime parsing
1. Fixed bug in Core.listStatus() for expiryTime parsing

### Version 2.1.2
1. Changed implementation of ADLStoreClient.getContentSummary to do the directory enumeration on client side rather than server side. This
makes the call more performant and reliable.
1. Changed implementation of ADLStoreClient.getContentSummary to do the directory enumeration on client side rather than
server side. This makes the call more performant and reliable.
2. Removed short-circuit check in Core.concurrentAppend, which bypassed sending append to server for a 0-length append.

### Version 2.1.1
1. Added setExpiry method
2. Core.concat has an additional parameter called deleteSourceDirectory, to address specific needs for tool-writers
3. enumerateDirectory and getDirectoryEntry (liststatus and getfilestatus in Core) now return two additional fields: aclBit and expiryTime
4. enumerateDirectory, getDirectoryEntry and getAclStatus now take additional parameter for UserGroupRepresentation (OID or UPN)
5. enumerateDirectory now does paging on the client side, to avoid timeouts on large directories (no change to API interface)
3. enumerateDirectory and getDirectoryEntry (liststatus and getfilestatus in Core) now return two additional
fields: aclBit and expiryTime
4. enumerateDirectory, getDirectoryEntry and getAclStatus now take additional parameter for
UserGroupRepresentation (OID or UPN)
5. enumerateDirectory now does paging on the client side, to avoid timeouts on large directories (no change
to API interface)

### Version 2.0.11
- Initial Release
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.microsoft.azure</groupId>
<artifactId>azure-data-lake-store-sdk</artifactId>
<version>2.2.3</version>
<version>2.3.0-preview1</version>
<packaging>jar</packaging>

<name>Azure Data Lake Store - Java client SDK</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ public class ADLFileInputStream extends InputStream {
private final ADLStoreClient client;
private final DirectoryEntry directoryEntry;
private final String sessionId = UUID.randomUUID().toString();
private static final int defaultQueueDepth = 4; // need to experiment to see what is a good number

private int blocksize = 4 * 1024 * 1024; // 4MB default buffer size
private byte[] buffer = null; // will be initialized on first use
private byte[] buffer = null; // will be initialized on first use
private int readAheadQueueDepth; // initialized in constructor

private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
Expand All @@ -51,6 +53,8 @@ public class ADLFileInputStream extends InputStream {
this.filename = filename;
this.client = client;
this.directoryEntry = de;
int requestedQD = client.getReadAheadQueueDepth();
this.readAheadQueueDepth = (requestedQD >= 0) ? requestedQD : defaultQueueDepth;
if (log.isTraceEnabled()) {
log.trace("ADLFIleInputStream created for client {} for file {}", client.getClientId(), filename);
}
Expand Down Expand Up @@ -124,7 +128,7 @@ protected long readFromService() throws IOException {
limit = 0;
if (buffer == null) buffer = new byte[blocksize];

int bytesRead = readInternal(fCursor, buffer, 0, blocksize);
int bytesRead = readInternal(fCursor, buffer, 0, blocksize, false);
limit += bytesRead;
fCursor += bytesRead;
return bytesRead;
Expand Down Expand Up @@ -155,7 +159,7 @@ protected long slurpFullFile() throws IOException {

// if one OPEN request doesnt get full file, then read again at fCursor
while (fCursor < directoryEntry.length) {
int bytesRead = readInternal(fCursor, buffer, limit, blocksize - limit);
int bytesRead = readInternal(fCursor, buffer, limit, blocksize - limit, true);
limit += bytesRead;
fCursor += bytesRead;

Expand Down Expand Up @@ -185,11 +189,42 @@ public int read(long position, byte[] b, int offset, int length)
if (log.isTraceEnabled()) {
log.trace("ADLFileInputStream positioned read() - at offset {} using client {} from file {}", position, client.getClientId(), filename);
}
return readInternal(position, b, offset, length, true);
}

/**
 * Reads up to {@code length} bytes at {@code position} into {@code b}, going through the
 * read-ahead machinery unless the caller opts out or the server has rejected speculative
 * reads for this client.
 *
 * @param position        offset in the remote file to read from
 * @param b               destination buffer; read-ahead requires {@code offset == 0}
 * @param offset          offset within {@code b} to start writing at
 * @param length          maximum number of bytes to read
 * @param bypassReadAhead if true, skip the read-ahead queue and read directly from the service
 * @return number of bytes read, or -1 at EOF (propagated from {@link #readRemote})
 * @throws IOException if the remote read fails
 */
private int readInternal(long position, byte[] b, int offset, int length, boolean bypassReadAhead) throws IOException {
    if (bypassReadAhead || client.disableReadAheads) {
        // Caller asked for a direct read, or the server previously signaled that
        // speculative reads are unsupported; go straight to the service.
        return readRemote(position, b, offset, length, false);
    }

    // Read-ahead buffers are always filled from the start of the destination array.
    if (offset != 0) throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");

    // Queue up to readAheadQueueDepth sequential block-sized reads starting at position,
    // stopping at end-of-file.
    int numReadAheads = this.readAheadQueueDepth;
    long nextOffset = position;
    while (numReadAheads > 0 && nextOffset < directoryEntry.length) {
        long nextSize = Math.min((long) blocksize, directoryEntry.length - nextOffset);
        if (log.isTraceEnabled())
            log.trace("Queueing readAhead for file " + filename + " offset " + nextOffset + " thread " + Thread.currentThread().getName());
        ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
        nextOffset = nextOffset + nextSize;
        numReadAheads--;
    }

    // Try to satisfy the read from a completed (or in-flight) read-ahead buffer first.
    int receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
    if (receivedBytes > 0) return receivedBytes;

    // got nothing from read-ahead, do our own read now
    return readRemote(position, b, offset, length, false);
}

private int readInternal(long position, byte[] b, int offset, int length) throws IOException {
int readRemote(long position, byte[] b, int offset, int length, boolean speculative) throws IOException {
if (position < 0) throw new IllegalArgumentException("attempting to read from negative offset");
if (position >= directoryEntry.length) return -1; // Hadoop prefers -1 to EOFException
if (b == null) throw new IllegalArgumentException("null byte array passed in to read() method");
Expand All @@ -202,11 +237,16 @@ private int readInternal(long position, byte[] b, int offset, int length) throws
RequestOptions opts = new RequestOptions();
opts.retryPolicy = new ExponentialBackoffPolicy();
OperationResponse resp = new OperationResponse();
InputStream inStream = Core.open(filename, position, length, sessionId, client, opts, resp);
InputStream inStream = Core.open(filename, position, length, sessionId, speculative, client, opts, resp);
if (speculative && !resp.successful && resp.httpResponseCode == 400 && resp.remoteExceptionName.equals("SpeculativeReadNotSupported")) {
client.disableReadAheads = true;
return 0;
}
if (!resp.successful) throw client.getExceptionFromResponse(resp, "Error reading from file " + filename);
if (resp.responseContentLength == 0 && !resp.responseChunked) return 0; //Got nothing
int bytesRead;
int totalBytesRead = 0;
long start = System.nanoTime();
try {
do {
bytesRead = inStream.read(b, offset + totalBytesRead, length - totalBytesRead);
Expand All @@ -221,6 +261,17 @@ private int readInternal(long position, byte[] b, int offset, int length) throws
throw new ADLException("Error reading data from response stream in positioned read() for file " + filename, ex);
} finally {
if (inStream != null) inStream.close();
long timeTaken=(System.nanoTime() - start)/1000000;
if (log.isDebugEnabled()) {
String logline ="HTTPRequestRead," + (resp.successful?"Succeeded":"Failed") +
",cReqId:" + opts.requestid +
",lat:" + Long.toString(resp.lastCallLatency+timeTaken) +
",Reqlen:" + totalBytesRead +
",sReqId:" + resp.requestId +
",path:" + filename +
",offset:" + position;
log.debug(logline);
}
}
return totalBytesRead;
}
Expand Down Expand Up @@ -294,6 +345,16 @@ public void setBufferSize(int newSize) throws IOException {
buffer = null;
}

/**
 * Sets the queue depth to be used for read-aheads in this stream.
 *
 * @param queueDepth the desired queue depth, set to 0 to disable read-ahead
 */
public void setReadAheadQueueDepth(int queueDepth) {
    if (queueDepth < 0) {
        throw new IllegalArgumentException("Queue depth has to be 0 or more");
    }
    readAheadQueueDepth = queueDepth;
}

/**
* returns the remaining number of bytes available to read from the buffer, without having to call
* the server
Expand Down Expand Up @@ -352,6 +413,11 @@ public void close() throws IOException {
buffer = null; // de-reference the buffer so it can be GC'ed sooner
}


/**
 * Gets the full path of the file this stream was opened against.
 *
 * @return the filename (full path) of this stream's file
 */
public String getFilename() {
    return this.filename;
}

/**
* Not supported by this stream. Throws {@link UnsupportedOperationException}
* @param readlimit ignored
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class ADLStoreClient {
private String proto = "https";
private boolean enableRemoteExceptions = false;
private String pathPrefix = null;
private int readAheadQueueDepth = -1; // no preference set by caller, use default in ADLFileInputStream
volatile boolean disableReadAheads = false;


private static String sdkVersion = null;
Expand Down Expand Up @@ -974,6 +976,7 @@ public synchronized void setOptions(ADLStoreOptions o) throws IOException {
if (o.isUsingInsecureTransport()) this.setInsecureTransport();
if (o.isThrowingRemoteExceptionsEnabled()) this.enableThrowingRemoteExceptions();
if (o.getUserAgentSuffix() != null) this.setUserAgentSuffix(o.getUserAgentSuffix());
if (o.getReadAheadQueueDepth() >= 0 ) this.readAheadQueueDepth = o.getReadAheadQueueDepth();
}


Expand Down Expand Up @@ -1003,6 +1006,14 @@ public synchronized void updateToken(String accessToken) {
/* Private and internal methods */
/* ----------------------------------------------------------------------------------------------------------*/

/**
 * Gets the queue depth used for read-aheads in {@link ADLFileInputStream}.
 * A value of -1 indicates that no preference was set by the caller, in which
 * case the stream applies its own default.
 *
 * @return the queue depth
 */
synchronized int getReadAheadQueueDepth() {
    return readAheadQueueDepth;
}


/**
* gets the AAD access token associated with this client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class ADLStoreOptions {
private boolean insecureTransport = false;
private boolean enableRemoteExceptions = false;
private String pathPrefix = null;
private int readAheadQueueDepth = -1; // no preference set by caller, use default in ADLFileInputStream

public ADLStoreOptions() {
}
Expand Down Expand Up @@ -110,4 +111,25 @@ public ADLStoreOptions setFilePathPrefix(String prefix) {
String getFilePathPrefix() {
return this.pathPrefix;
}

/**
 * Sets the default queue depth to be used for read-aheads in {@link ADLFileInputStream}.
 *
 * @param queueDepth the desired queue depth, set to 0 to disable read-ahead
 * @return {@code this}
 */
public ADLStoreOptions setReadAheadQueueDepth(int queueDepth) {
    if (queueDepth < 0) {
        throw new IllegalArgumentException("Queue depth has to be 0 or more");
    }
    readAheadQueueDepth = queueDepth;
    return this;
}

/**
 * Gets the default queue depth used for read-aheads in {@link ADLFileInputStream}.
 * Returns -1 when no preference has been set, letting the stream use its own default.
 *
 * @return the queue depth
 */
int getReadAheadQueueDepth() {
    return readAheadQueueDepth;
}

}
29 changes: 29 additions & 0 deletions src/main/java/com/microsoft/azure/datalake/store/Core.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,34 @@ public static InputStream open(String path,
ADLStoreClient client,
RequestOptions opts,
OperationResponse resp) {
return open(path, offset, length, sessionId, false, client, opts, resp);
}




/**
* read from a file. This is the stateless read method, that reads bytes from an offset in a file.
*
*
* @param path the full path of the file to read. The file must already exist.
* @param offset the offset within the ADL file to read from
* @param length the number of bytes to read from file
* @param sessionId a String containing the session ID (generated by client). Can be null.
* @param speculativeRead indicates if the read is speculative. Currently fails the read - so dont use.
* @param client the {@link ADLStoreClient}
* @param opts options to change the behavior of the call
* @param resp response from the call, and any error info generated by the call
* @return returns an {@link com.microsoft.azure.datalake.store.ADLFileInputStream}
*/
public static InputStream open(String path,
long offset,
long length,
String sessionId,
boolean speculativeRead,
ADLStoreClient client,
RequestOptions opts,
OperationResponse resp) {
QueryParams qp = new QueryParams();
qp.add("read", "true");
if (offset < 0) {
Expand All @@ -246,6 +274,7 @@ public static InputStream open(String path,

if (offset > 0) qp.add("offset", Long.toString(offset));
if (length > 0) qp.add("length", Long.toString(length));
if (speculativeRead) qp.add("speculative", "true");
if (sessionId != null && !sessionId.equals("")) {
qp.add("filesessionid", sessionId);
}
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/com/microsoft/azure/datalake/store/ReadBuffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.microsoft.azure.datalake.store;


import java.util.concurrent.CountDownLatch;

/**
 * This object represents the buffer state as it is going through its lifecycle.
 * The buffer (the byte array) itself is assigned to this object from a free pool,
 * so we do not create tons of objects in large object heap.
 */
class ReadBuffer {
    ADLFileInputStream file;    // the stream this buffer holds data for
    long offset;                // offset within the file for the buffer
    int length;                 // actual length, set after the buffer is filled
    int requestedLength;        // requested length of the read
    byte[] buffer;              // the buffer itself
    int bufferindex = -1;       // index in the buffers array in Buffer manager
    ReadBufferStatus status;    // status of the buffer
    CountDownLatch latch = null;    // signaled when the buffer is done reading, so any client
                                    // waiting on this buffer gets unblocked

    // fields to help with eviction logic
    long birthday = 0;                  // tick at which buffer became available to read
    boolean firstByteConsumed = false;  // whether the first byte of the buffer has been returned to a reader
    boolean lastByteConsumed = false;   // whether the last byte of the buffer has been returned to a reader
    boolean anyByteConsumed = false;    // whether any byte of the buffer has been returned to a reader
}
Loading

0 comments on commit 47d3eaa

Please sign in to comment.