Skip to content

Commit

Permalink
DRILL-8504: Add Schema Caching to Splunk Plugin (apache#2929)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgivre authored Nov 18, 2024
1 parent 827f4c4 commit 70d6a95
Show file tree
Hide file tree
Showing 17 changed files with 241 additions and 59 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
# added to the Runner and memory hungry tests are run separately.
run: |
sudo sh -c "
fallocate -l 4G /tmp/swapfile
fallocate -l 2G /tmp/swapfile
chmod 0600 /tmp/swapfile
mkswap /tmp/swapfile
swapon /tmp/swapfile
Expand All @@ -67,6 +67,7 @@ jobs:
- name: Remove swap space
run : |
sudo sh -c "
free -h
swapoff /tmp/swapfile
rm /tmp/swapfile
"
Expand Down
19 changes: 13 additions & 6 deletions contrib/storage-splunk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This plugin enables Drill to query Splunk.
## Configuration

| Option | Default | Description | Since |
|-----------------------| --------- | --------------------------------------------------------------- | ----- |
|-----------------------| --------- | --------------------------------------------------------------- |-------|
| type | (none) | Set to "splunk" to use this plugin | 1.19 |
| username | null | Splunk username to be used by Drill | 1.19 |
| password | null | Splunk password to be used by Drill | 1.19 |
Expand All @@ -13,12 +13,15 @@ This plugin enables Drill to query Splunk.
| port | 8089 | TCP port over which Drill will connect to Splunk | 1.19 |
| earliestTime | null | Global earliest record timestamp default | 1.19 |
| latestTime | null | Global latest record timestamp default | 1.19 |
| app | null | The application context of the service[^1] | 2.0 |
| owner | null | The owner context of the service[^1] | 2.0 |
| token | null | A Splunk authentication token to use for the session[^2] | 2.0 |
| cookie | null | A valid login cookie | 2.0 |
| validateCertificates | true | Whether the Splunk client will validates the server's SSL cert | 2.0 |
| app | null | The application context of the service[^1] | 1.21 |
| owner | null | The owner context of the service[^1] | 1.21 |
| token | null | A Splunk authentication token to use for the session[^2] | 1.21 |
| cookie | null | A valid login cookie | 1.21 |
| validateCertificates | true | Whether the Splunk client will validates the server's SSL cert | 1.21 |
| validateHostname | true | Whether the Splunk client will validate the server's host name | 1.22 |
| maxColumns | 1024 | The maximum number of columns Drill will accept from Splunk | 1.22 |
| maxCacheSize | 10000 | The size (in bytes) of Splunk's schema cache. | 1.22 |
| cacheExpiration | 1024 | The number of minutes for Drill to persist the schema cache. | 1.22 |

[^1]: See [this Splunk documentation](https://docs.splunk.com/Documentation/Splunk/latest/Admin/Apparchitectureandobjectownership) for more information.
[^2]: See [this Splunk documentation](https://docs.splunk.com/Documentation/Splunk/latest/Security/CreateAuthTokens) for more information.
Expand Down Expand Up @@ -46,6 +49,10 @@ To bypass it by Drill please specify "reconnectRetries": 3. It allows you to ret
### User Translation
The Splunk plugin supports user translation. Simply set the `authMode` parameter to `USER_TRANSLATION` and use either the plain or vault credential provider for credentials.

## Schema Caching
For every query that you send to Splunk from Drill, Drill will have to pull schema information from Splunk. If you have a lot of indexes, this process can cause slow planning time. To improve planning time, you can configure Drill to cache the index names so that it does not need to make additional calls to Splunk.

There are two configuration parameters for the schema caching: `maxCacheSize` and `cacheExpiration`. The maxCacheSize defaults to 10k bytes and the `cacheExpiration` defaults to 1024 minutes. To disable schema caching simply set the `cacheExpiration` parameter to a value less than zero.

## Understanding Splunk's Data Model
Splunk's primary use case is analyzing event logs with a timestamp. As such, data is indexed by the timestamp, with the most recent data being indexed first. By default, Splunk
Expand Down
6 changes: 6 additions & 0 deletions contrib/storage-splunk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.drill.exec</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
private static final Logger logger = LoggerFactory.getLogger(SplunkBatchReader.class);
private static final List<String> INT_COLS = new ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", "date_second", "date_year", "linecount"));
private static final List<String> TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time"));
private static final String EARLIEST_TIME_COLUMN = "earliestTime";
private static final String LATEST_TIME_COLUMN = "latestTime";

private final SplunkPluginConfig config;
private final SplunkSubScan subScan;
private final List<SchemaPath> projectedColumns;
Expand Down Expand Up @@ -88,6 +85,8 @@ public SplunkBatchReader(SplunkPluginConfig config, SplunkSubScan subScan) {
RowListProcessor rowProcessor = new RowListProcessor();
csvSettings.setProcessor(rowProcessor);
csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE);
// Splunk can produce a lot of columns. The SDK default maximum is 512.
csvSettings.setMaxColumns(config.getMaxColumns());
}

@Override
Expand Down Expand Up @@ -174,7 +173,7 @@ private TupleMetadata buildSchema() {
}
}
}
logger.debug("Time to build schmea: {} milliseconds", timer.elapsed().getNano() / 100000);
logger.debug("Time to build schema: {} milliseconds", timer.elapsed().getNano() / 100000);
return builder.buildSchema();
}

Expand Down Expand Up @@ -241,18 +240,18 @@ private String buildQueryString () {

// Splunk searches perform best when they are time bound. This allows the user to set
// default time boundaries in the config. These will be overwritten in filter pushdowns
if (filters != null && filters.containsKey(EARLIEST_TIME_COLUMN)) {
earliestTime = filters.get(EARLIEST_TIME_COLUMN).value.value.toString();
if (filters != null && filters.containsKey(SplunkUtils.EARLIEST_TIME_COLUMN)) {
earliestTime = filters.get(SplunkUtils.EARLIEST_TIME_COLUMN).value.value.toString();

// Remove from map
filters.remove(EARLIEST_TIME_COLUMN);
filters.remove(SplunkUtils.EARLIEST_TIME_COLUMN);
}

if (filters != null && filters.containsKey(LATEST_TIME_COLUMN)) {
latestTime = filters.get(LATEST_TIME_COLUMN).value.value.toString();
if (filters != null && filters.containsKey(SplunkUtils.LATEST_TIME_COLUMN)) {
latestTime = filters.get(SplunkUtils.LATEST_TIME_COLUMN).value.value.toString();

// Remove from map so they are not pushed down into the query
filters.remove(LATEST_TIME_COLUMN);
filters.remove(SplunkUtils.LATEST_TIME_COLUMN);
}

if (earliestTime == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,63 @@ public GroupScan clone(List<SchemaPath> columns) {
return new SplunkGroupScan(this, columns);
}

/**
* Generates the query which will be sent to Splunk. This method exists for debugging purposes so
* that the actual SPL will be recorded in the query plan.
*/
private String generateQuery() {
String earliestTime = null;
String latestTime = null;

// Splunk searches perform best when they are time bound. This allows the user to set
// default time boundaries in the config. These will be overwritten in filter pushdowns
if (filters != null && filters.containsKey(SplunkUtils.EARLIEST_TIME_COLUMN)) {
earliestTime = filters.get(SplunkUtils.EARLIEST_TIME_COLUMN).value.value.toString();
}

if (filters != null && filters.containsKey(SplunkUtils.LATEST_TIME_COLUMN)) {
latestTime = filters.get(SplunkUtils.LATEST_TIME_COLUMN).value.value.toString();
}

if (earliestTime == null) {
earliestTime = config.getEarliestTime();
}

if (latestTime == null) {
latestTime = config.getLatestTime();
}

// Special case: If the user wishes to send arbitrary SPL to Splunk, the user can use the "SPL"
// Index and spl filter
if (splunkScanSpec.getIndexName().equalsIgnoreCase("spl")) {
if (filters != null && filters.containsKey("spl")) {
return filters.get("spl").value.value.toString();
}
}

SplunkQueryBuilder builder = new SplunkQueryBuilder(splunkScanSpec.getIndexName());

// Set the sourcetype
if (filters != null && filters.containsKey("sourcetype")) {
String sourcetype = filters.get("sourcetype").value.value.toString();
builder.addSourceType(sourcetype);
}

// Add projected columns, skipping star and specials.
for (SchemaPath projectedColumn: columns) {
builder.addField(projectedColumn.getAsUnescapedPath());
}

// Apply filters
builder.addFilters(filters);

// Apply limits
if (maxRecords > 0) {
builder.addLimit(maxRecords);
}
return builder.build();
}

@Override
public int hashCode() {

Expand Down Expand Up @@ -344,6 +401,7 @@ public String toString() {
.field("scan spec", splunkScanSpec)
.field("columns", columns)
.field("maxRecords", maxRecords)
.field("spl", generateQuery())
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public class SplunkPluginConfig extends StoragePluginConfig {
public static final String NAME = "splunk";
public static final int DISABLED_RECONNECT_RETRIES = 1;
public static final int DEFAULT_WRITER_BATCH_SIZE = 1000;
public static final int DEFAULT_MAX_READER_COLUMNS = 1024;
public static final int DEFAULT_MAX_CACHE_SIZE = 10000;
public static final int DEFAULT_CACHE_EXPIRATION = 1024;

private final String scheme;
private final String hostname;
Expand All @@ -55,6 +58,9 @@ public class SplunkPluginConfig extends StoragePluginConfig {
private final Integer reconnectRetries;
private final boolean writable;
private final Integer writerBatchSize;
private final Integer maxColumns;
private final Integer maxCacheSize;
private final Integer cacheExpiration;

@JsonCreator
public SplunkPluginConfig(@JsonProperty("username") String username,
Expand All @@ -74,7 +80,11 @@ public SplunkPluginConfig(@JsonProperty("username") String username,
@JsonProperty("reconnectRetries") Integer reconnectRetries,
@JsonProperty("authMode") String authMode,
@JsonProperty("writable") boolean writable,
@JsonProperty("writableBatchSize") Integer writerBatchSize) {
@JsonProperty("writableBatchSize") Integer writerBatchSize,
@JsonProperty("maxColumns") Integer maxColumns,
@JsonProperty("maxCacheSize") Integer maxCacheSize,
@JsonProperty("cacheExpiration") Integer cacheExpiration
) {
super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER));
this.scheme = scheme;
Expand All @@ -91,6 +101,9 @@ public SplunkPluginConfig(@JsonProperty("username") String username,
this.latestTime = latestTime == null ? "now" : latestTime;
this.reconnectRetries = reconnectRetries;
this.writerBatchSize = writerBatchSize;
this.maxColumns = maxColumns;
this.maxCacheSize = maxCacheSize;
this.cacheExpiration = cacheExpiration;
}

private SplunkPluginConfig(SplunkPluginConfig that, CredentialsProvider credentialsProvider) {
Expand All @@ -109,6 +122,9 @@ private SplunkPluginConfig(SplunkPluginConfig that, CredentialsProvider credenti
this.latestTime = that.latestTime;
this.reconnectRetries = that.reconnectRetries;
this.writerBatchSize = that.writerBatchSize;
this.maxColumns = that.maxColumns;
this.maxCacheSize = that.maxCacheSize;
this.cacheExpiration = that.cacheExpiration;
}

/**
Expand Down Expand Up @@ -225,6 +241,21 @@ public int getWriterBatchSize() {
return writerBatchSize != null ? writerBatchSize : DEFAULT_WRITER_BATCH_SIZE;
}

@JsonProperty("maxColumns")
public int getMaxColumns() {
return maxColumns != null ? maxColumns : DEFAULT_MAX_READER_COLUMNS;
}

@JsonProperty("maxCacheSize")
public int getMaxCacheSize() {
return maxCacheSize != null ? maxCacheSize : DEFAULT_MAX_CACHE_SIZE;
}

@JsonProperty("cacheExpiration")
public int getCacheExpiration() {
return cacheExpiration != null ? cacheExpiration : DEFAULT_CACHE_EXPIRATION;
}

private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
}
Expand All @@ -250,26 +281,32 @@ public boolean equals(Object that) {
Objects.equals(validateHostname, thatConfig.validateHostname) &&
Objects.equals(earliestTime, thatConfig.earliestTime) &&
Objects.equals(latestTime, thatConfig.latestTime) &&
Objects.equals(authMode, thatConfig.authMode);
Objects.equals(authMode, thatConfig.authMode) &&
Objects.equals(maxCacheSize, thatConfig.maxCacheSize) &&
Objects.equals(maxColumns, thatConfig.maxColumns) &&
Objects.equals(cacheExpiration, thatConfig.cacheExpiration);
}

@Override
public int hashCode() {
return Objects.hash(
credentialsProvider,
scheme,
hostname,
port,
app,
owner,
token,
cookie,
writable,
validateCertificates,
validateHostname,
earliestTime,
latestTime,
authMode
credentialsProvider,
scheme,
hostname,
port,
app,
owner,
token,
cookie,
writable,
validateCertificates,
validateHostname,
earliestTime,
latestTime,
authMode,
cacheExpiration,
maxCacheSize,
maxColumns
);
}

Expand All @@ -290,6 +327,9 @@ public String toString() {
.field("earliestTime", earliestTime)
.field("latestTime", latestTime)
.field("Authentication Mode", authMode)
.field("maxColumns", maxColumns)
.field("maxCacheSize", maxCacheSize)
.field("cacheExpiration", cacheExpiration)
.toString();
}

Expand Down
Loading

0 comments on commit 70d6a95

Please sign in to comment.