Skip to content

Commit

Permalink
Expose configuration options on the session context
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreeve committed Sep 13, 2023
1 parent 4f56a9a commit ae38e99
Show file tree
Hide file tree
Showing 10 changed files with 862 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public final long getPointer() {

abstract void doClose(long pointer) throws Exception;

// Ensure native library is loaded before any proxy object is used
static {
JNILoader.load();
}

@Override
public final void close() throws Exception {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.apache.arrow.datafusion;

/** Configures options related to query execution */
@SuppressWarnings("UnusedReturnValue")
public class ExecutionOptions {
private final SessionConfig config;

ExecutionOptions(SessionConfig config) {
this.config = config;
}

/**
* Get execution options related to reading Parquet data
*
* @return {@link ParquetOptions} instance for this config
*/
public ParquetOptions parquet() {
return new ParquetOptions(config);
}

/**
* Get the batch size
*
* @return batch size
*/
public long batchSize() {
return SessionConfig.getExecutionOptionsBatchSize(config.getPointer());
}

/**
* Set the size of batches to use when creating new data batches
*
* @param batchSize the batch size to set
* @return the modified {@link ExecutionOptions} instance
*/
public ExecutionOptions withBatchSize(long batchSize) {
SessionConfig.setExecutionOptionsBatchSize(config.getPointer(), batchSize);
return this;
}

/**
* Get whether batch coalescing is enabled
*
* @return whether batch coalescing is enabled
*/
public boolean coalesceBatches() {
return SessionConfig.getExecutionOptionsCoalesceBatches(config.getPointer());
}

/**
* Set whether to enable batch coalescing
*
* @param enabled whether to enable batch coalescing
* @return the modified {@link ExecutionOptions} instance
*/
public ExecutionOptions withCoalesceBatches(boolean enabled) {
SessionConfig.setExecutionOptionsCoalesceBatches(config.getPointer(), enabled);
return this;
}

/**
* Get whether statistics collection is enabled
*
* @return whether statistics collection is enabled
*/
public boolean collectStatistics() {
return SessionConfig.getExecutionOptionsCollectStatistics(config.getPointer());
}

/**
* Set whether to enable statistics collection
*
* @param enabled whether to enable statistics collection
* @return the modified {@link ExecutionOptions} instance
*/
public ExecutionOptions withCollectStatistics(boolean enabled) {
SessionConfig.setExecutionOptionsCollectStatistics(config.getPointer(), enabled);
return this;
}

/**
* Get the target number of partitions
*
* @return number of partitions
*/
public long targetPartitions() {
return SessionConfig.getExecutionOptionsTargetPartitions(config.getPointer());
}

/**
* Set the target number of partitions
*
* @param targetPartitions the number of partitions to set
* @return the modified {@link ExecutionOptions} instance
*/
public ExecutionOptions withTargetPartitions(long targetPartitions) {
SessionConfig.setExecutionOptionsTargetPartitions(config.getPointer(), targetPartitions);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package org.apache.arrow.datafusion;

import java.util.Optional;

/** Configures options specific to reading Parquet data */
@SuppressWarnings("UnusedReturnValue")
public class ParquetOptions {
private final SessionConfig config;

ParquetOptions(SessionConfig config) {
this.config = config;
}

/**
* Get whether parquet data page level metadata (Page Index) statistics are used
*
* @return whether using the page index is enabled
*/
public boolean enablePageIndex() {
return SessionConfig.getParquetOptionsEnablePageIndex(config.getPointer());
}

/**
* Set whether to use parquet data page level metadata (Page Index) statistics to reduce the
* number of rows decoded.
*
* @param enabled whether using the page index is enabled
* @return the modified {@link ParquetOptions} instance
*/
public ParquetOptions withEnablePageIndex(boolean enabled) {
SessionConfig.setParquetOptionsEnablePageIndex(config.getPointer(), enabled);
return this;
}

/**
* Get whether pruning is enabled, meaning reading row groups will be skipped based on metadata
*
* @return whether pruning is enabled
*/
public boolean pruning() {
return SessionConfig.getParquetOptionsPruning(config.getPointer());
}

/**
* Set whether pruning is enabled, meaning reading row groups will be skipped based on metadata
*
* @param enabled whether to enable pruning
* @return the modified {@link ParquetOptions} instance
*/
public ParquetOptions withPruning(boolean enabled) {
SessionConfig.setParquetOptionsPruning(config.getPointer(), enabled);
return this;
}

/**
* Get whether file metadata is skipped, to avoid schema conflicts
*
* @return whether metadata is skipped
*/
public boolean skipMetadata() {
return SessionConfig.getParquetOptionsSkipMetadata(config.getPointer());
}

/**
* Set whether file metadata is skipped, to avoid schema conflicts
*
* @param enabled whether to skip metadata
* @return the modified {@link ParquetOptions} instance
*/
public ParquetOptions withSkipMetadata(boolean enabled) {
SessionConfig.setParquetOptionsSkipMetadata(config.getPointer(), enabled);
return this;
}

/**
* Get the metadata size hint
*
* @return metadata size hint value
*/
public Optional<Long> metadataSizeHint() {
long sizeHint = SessionConfig.getParquetOptionsMetadataSizeHint(config.getPointer());
return sizeHint < 0 ? Optional.empty() : Optional.of(sizeHint);
}

/**
* Set the metadata size hint, which is used to attempt to read the full metadata at once rather
* than needing one read to get the metadata size and then a second read for the metadata itself.
*
* @param metadataSizeHint the metadata size hint
* @return the modified {@link ParquetOptions} instance
*/
public ParquetOptions withMetadataSizeHint(Optional<Long> metadataSizeHint) {
long value = -1L;
if (metadataSizeHint.isPresent()) {
value = metadataSizeHint.get();
if (value < 0) {
throw new RuntimeException("metadataSizeHint cannot be negative");
}
}
SessionConfig.setParquetOptionsMetadataSizeHint(config.getPointer(), value);
return this;
}

/**
* Get whether filter pushdown is enabled, so filters are applied during parquet decoding
*
* @return whether filter pushdown is enabled
*/
public boolean pushdownFilters() {
return SessionConfig.getParquetOptionsPushdownFilters(config.getPointer());
}

/**
* Set whether filter pushdown is enabled, so filters are applied during parquet decoding
*
* @param enabled whether to pushdown filters
* @return the modified {@link ParquetOptions} instance
*/
public ParquetOptions withPushdownFilters(boolean enabled) {
SessionConfig.setParquetOptionsPushdownFilters(config.getPointer(), enabled);
return this;
}

/**
* Get whether filter reordering is enabled to minimize evaluation cost
*
* @return whether filter reordering is enabled
*/
public boolean reorderFilters() {
return SessionConfig.getParquetOptionsReorderFilters(config.getPointer());
}

/**
* Set whether filter reordering is enabled to minimize evaluation cost
*
* @param enabled whether to reorder filters
* @return the modified {@link ParquetOptions} instance
*/
public ParquetOptions withReorderFilters(boolean enabled) {
SessionConfig.setParquetOptionsReorderFilters(config.getPointer(), enabled);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.apache.arrow.datafusion;

import java.util.function.Consumer;

/** Configuration for creating a {@link SessionContext} using {@link SessionContexts#withConfig} */
public class SessionConfig extends AbstractProxy implements AutoCloseable {
/** Create a new default {@link SessionConfig} */
public SessionConfig() {
super(create());
}

/**
* Get options related to query execution
*
* @return {@link ExecutionOptions} instance for this config
*/
public ExecutionOptions executionOptions() {
return new ExecutionOptions(this);
}

/**
* Get options specific to parsing SQL queries
*
* @return {@link SqlParserOptions} instance for this config
*/
public SqlParserOptions sqlParserOptions() {
return new SqlParserOptions(this);
}

/**
* Modify this session configuration and then return it, to simplify use in a try-with-resources
* statement
*
* @param configurationCallback Callback used to update the configuration
* @return This {@link SessionConfig} instance after being updated
*/
public SessionConfig withConfiguration(Consumer<SessionConfig> configurationCallback) {
configurationCallback.accept(this);
return this;
}

@Override
void doClose(long pointer) {
destroy(pointer);
}

private static native long create();

private static native void destroy(long pointer);

// ExecutionOptions native methods

static native long getExecutionOptionsBatchSize(long pointer);

static native void setExecutionOptionsBatchSize(long pointer, long batchSize);

static native boolean getExecutionOptionsCoalesceBatches(long pointer);

static native void setExecutionOptionsCoalesceBatches(long pointer, boolean enabled);

static native boolean getExecutionOptionsCollectStatistics(long pointer);

static native void setExecutionOptionsCollectStatistics(long pointer, boolean enabled);

static native long getExecutionOptionsTargetPartitions(long pointer);

static native void setExecutionOptionsTargetPartitions(long pointer, long batchSize);

// ParquetOptions native methods

static native boolean getParquetOptionsEnablePageIndex(long pointer);

static native void setParquetOptionsEnablePageIndex(long pointer, boolean enabled);

static native boolean getParquetOptionsPruning(long pointer);

static native void setParquetOptionsPruning(long pointer, boolean enabled);

static native boolean getParquetOptionsSkipMetadata(long pointer);

static native void setParquetOptionsSkipMetadata(long pointer, boolean enabled);

static native long getParquetOptionsMetadataSizeHint(long pointer);

static native void setParquetOptionsMetadataSizeHint(long pointer, long value);

static native boolean getParquetOptionsPushdownFilters(long pointer);

static native void setParquetOptionsPushdownFilters(long pointer, boolean enabled);

static native boolean getParquetOptionsReorderFilters(long pointer);

static native void setParquetOptionsReorderFilters(long pointer, boolean enabled);

// SqlParserOptions native methods

static native boolean getSqlParserOptionsParseFloatAsDecimal(long pointer);

static native void setSqlParserOptionsParseFloatAsDecimal(long pointer, boolean enabled);

static native boolean getSqlParserOptionsEnableIdentNormalization(long pointer);

static native void setSqlParserOptionsEnableIdentNormalization(long pointer, boolean enabled);

static native String getSqlParserOptionsDialect(long pointer);

static native void setSqlParserOptionsDialect(long pointer, String dialect);
}
Loading

0 comments on commit ae38e99

Please sign in to comment.