Storage Scan
We have now helped Calcite resolve a table name. Next we begin to define what we will do with the table, which is to scan (read) it. Some systems are fancier than others: some read a single, contiguous data source, some read "splits" of a data source (such as file blocks in HDFS), some arbitrarily break up a scan into chunks, others "push" the scan to an external system, and so forth. Calcite provides a great deal of flexibility to define these cases. For our needs, we assume we scan a single data source from start to end.
Drill uses a three-stage process to convert a table into a scan operator definition:
- Resolve the table to create a scan specification.
- Build up the scan as a group scan. The group scan handles project push-down (columns), optional filter push-down, parallelization, and more.
- Determine the information needed to perform the physical scan and create a sub (specific) scan specification.
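The three stages can be pictured as a chain of progressively more specific objects. The sketch below is a minimal, Drill-free model of that chain. StageSpec, StageGroupScan, StageSubScan, and ThreeStages are hypothetical names, not Drill classes, and the group scan here records only the projected columns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stage 1 (hypothetical): the table a query refers to, nothing more.
final class StageSpec {
  final String tableName;

  StageSpec(String tableName) { this.tableName = tableName; }
}

// Stage 2 (hypothetical): the spec plus planner decisions, here just the
// projected columns.
final class StageGroupScan {
  final StageSpec spec;
  final List<String> columns;

  StageGroupScan(StageSpec spec, List<String> columns) {
    this.spec = spec;
    this.columns = Collections.unmodifiableList(new ArrayList<>(columns));
  }
}

// Stage 3 (hypothetical): everything a single reader needs at run time.
final class StageSubScan {
  final String tableName;
  final List<String> columns;

  StageSubScan(String tableName, List<String> columns) {
    this.tableName = tableName;
    this.columns = columns;
  }
}

final class ThreeStages {
  // Stage 1: resolve a table name to a spec.
  static StageSpec resolve(String tableName) {
    return new StageSpec(tableName);
  }

  // Stage 2: wrap the spec in a group scan with the projected columns.
  static StageGroupScan group(StageSpec spec, List<String> columns) {
    return new StageGroupScan(spec, columns);
  }

  // Stage 3: flatten the group scan into a concrete scan description.
  static StageSubScan subScan(StageGroupScan group) {
    return new StageSubScan(group.spec.tableName, group.columns);
  }
}
```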
We start by defining a "scan specification": a description of the overall logical scan, before we've started worrying about the details. Our class just needs the table name:
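import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class ExampleScanSpec {
  private final String tableName;

  // Jackson uses this constructor to rebuild the spec from a JSON plan.
  @JsonCreator
  public ExampleScanSpec(@JsonProperty("tableName") String tableName) {
    this.tableName = tableName;
  }

  @JsonProperty("tableName")
  public String getTableName() { return tableName; }
}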
You can add any additional information needed by your plugin (schema path, table metadata, etc.). For this reason, the scan spec is unique to each plugin; there is no common base class.
Tell Drill about the scan spec by attaching it to the Drill table created in the schema:
class DefaultSchema extends AbstractSchema {
...
@Override
public Table getTable(String tableName) {
...
return registerTable(name,
new DynamicDrillTable(plugin, plugin.getName(),
new ExampleScanSpec(name)));
...
We can now repeat our test, stepping through the code to verify that the scan spec and dynamic table are created correctly.
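Calcite's convention for Schema.getTable() is to return null when a table does not exist. The sketch below models the lookup with plain Java: TableSpec, SpecTable (standing in for DynamicDrillTable), and SketchSchema are hypothetical classes, and caching one table per name is an assumption of this sketch, a reasonable choice if the planner asks for the same table more than once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the plugin's scan spec.
final class TableSpec {
  final String tableName;

  TableSpec(String tableName) { this.tableName = tableName; }
}

// Hypothetical stand-in for DynamicDrillTable: a table that carries a spec.
final class SpecTable {
  final String pluginName;
  final TableSpec spec;

  SpecTable(String pluginName, TableSpec spec) {
    this.pluginName = pluginName;
    this.spec = spec;
  }
}

final class SketchSchema {
  private final String pluginName;
  private final Set<String> knownTables;
  private final Map<String, SpecTable> cache = new HashMap<>();

  SketchSchema(String pluginName, Set<String> knownTables) {
    this.pluginName = pluginName;
    this.knownTables = knownTables;
  }

  // Follows the Calcite convention: null means "no such table".
  SpecTable getTable(String tableName) {
    if (!knownTables.contains(tableName)) {
      return null;
    }
    // Create the table and its scan spec once, then reuse it.
    return cache.computeIfAbsent(tableName,
        name -> new SpecTable(pluginName, new TableSpec(name)));
  }
}
```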
Drill is a member of the Hadoop ecosystem, and so supports the idea that a table may be represented by a directory of files, where each file consists of blocks distributed across storage nodes. A "group scan" is the term for the logical scan of the pieces that make up a table, and represents one additional layer of refinement from the scan specification.
As we have noted, Drill uses Calcite for query planning, and so the scan objects fit into the Calcite structure. As a result, the group (and later, "sub") scans are a bit complex.
In our case, our table consists of a single chunk of data, so the "group" consists of a single physical scan.
The group scan brings together several layers of information:
- The configuration of the storage plugin (the "storage engine"),
- The (optional) schema and table,
- The set of columns,
- The physical query plan (distribution, definition of scan operators), and
- Plugin-specific implementation details for the scan.
The group scan itself has two entirely different lifecycles:
- The logical plan lifecycle, where fields must be Jackson-serializable to JSON.
- The physical plan lifecycle, where all work is done in-memory and no serialization is needed.
It is unfortunate that both phases happen in the same class: you need to know which of your fields to serialize and which can be transient (and you need to tell Jackson not to serialize the transient fields). For now, we will focus only on the logical plan aspects and rush through the physical plan bits to get something running.
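To make the split concrete, here is a hypothetical class (LifecycleScan is not a Drill type) with one kind of field from each lifecycle. A hand-written toJson() stands in for Jackson so the example runs with only the JDK; in a real plugin, Jackson annotations such as @JsonProperty and @JsonIgnore would draw the same line.

```java
import java.util.List;

// Hypothetical group scan showing which fields belong to which lifecycle.
final class LifecycleScan {
  // Logical-plan fields: these must round-trip through the JSON plan.
  // (In Drill they would be exposed to Jackson with @JsonProperty.)
  final String tableName;
  final List<String> columns;

  // Physical-plan-only state: built in memory when needed, never written out.
  // (In Drill you would hide such a field from Jackson, e.g. with @JsonIgnore.)
  private Object connection;

  LifecycleScan(String tableName, List<String> columns) {
    this.tableName = tableName;
    this.columns = columns;
  }

  // Lazily creates the runtime-only resource.
  Object connection() {
    if (connection == null) {
      connection = new Object(); // stand-in for an expensive resource
    }
    return connection;
  }

  // Stand-in for Jackson: emits only the logical fields (no escaping;
  // illustration only).
  String toJson() {
    StringBuilder cols = new StringBuilder();
    for (String c : columns) {
      if (cols.length() > 0) {
        cols.append(',');
      }
      cols.append('"').append(c).append('"');
    }
    return "{\"tableName\":\"" + tableName + "\",\"columns\":[" + cols + "]}";
  }
}
```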
The group scan extends the scan spec with the list of columns the query needs. Compare these two queries:
SELECT * FROM example.myTable
SELECT a, b, c FROM example.myTable
Drill uses schema-on-read, so we will assume that we can figure out the column names and types at runtime. However, if we know the available columns and types at plan time, we can tell Calcite to use that information. See the existing storage plugins for how that is done.
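Under schema-on-read, the group scan carries either "all columns" or an explicit list, and the reader resolves "all" against whatever columns it finds in the data. The sketch below assumes a hypothetical ALL_COLUMNS sentinel compared by identity, similar in spirit to the BaseGroupScan.ALL_COLUMNS check in computeScanStats(); ProjectionSketch and its method are illustrative names only.

```java
import java.util.Collections;
import java.util.List;

final class ProjectionSketch {
  // Hypothetical sentinel meaning "the query asked for every column"
  // (SELECT *). It is compared by identity, so callers must pass this
  // exact instance.
  static final List<String> ALL_COLUMNS = Collections.singletonList("*");

  // Columns the reader should return, given the columns it discovers
  // in the data at run time (schema-on-read).
  static List<String> columnsToRead(List<String> projected, List<String> discovered) {
    if (projected == ALL_COLUMNS) {
      return discovered; // SELECT *: whatever the data turns out to contain
    }
    return projected;    // SELECT a, b, c: exactly what was asked for
  }
}
```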
We need the group scan class and a variety of constructors:
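import java.util.List;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.store.StoragePluginRegistry;

public class ExampleGroupScan extends BaseGroupScan {

  private final ExampleScanSpec scanSpec;

  // Initial constructor
  public ExampleGroupScan(ExampleStoragePlugin storagePlugin, String userName,
      ExampleScanSpec scanSpec) {
    super(storagePlugin, userName, null);
    this.scanSpec = scanSpec;
  }

  // Copy with columns
  public ExampleGroupScan(ExampleGroupScan from, List<SchemaPath> columns) {
    super(from.storagePlugin, from.getUserName(), columns);
    this.scanSpec = from.scanSpec;
  }

  // Deserialization constructor
  @JsonCreator
  public ExampleGroupScan(
      @JsonProperty("config") ExampleStoragePluginConfig config,
      @JsonProperty("userName") String userName,
      @JsonProperty("columns") List<SchemaPath> columns,
      @JsonProperty("scanSpec") ExampleScanSpec scanSpec,
      @JacksonInject StoragePluginRegistry engineRegistry) {
    super(config, userName, columns, engineRegistry);
    this.scanSpec = scanSpec;
  }

  // Exposes the spec to Jackson so that it is written into the JSON plan.
  @JsonProperty("scanSpec")
  public ExampleScanSpec getScanSpec() { return scanSpec; }
}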
As mentioned above, this class is part of the "logical plan" and thus must be Jackson-serializable. The config, userName, columns, and engineRegistry fields are all handled by the base class. The scanSpec field stands in for whatever plugin-specific information you choose to store in your class.
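The "copy with columns" constructor returns a new object instead of modifying the existing one. As we understand Calcite's planner, it may keep both the original scan and the projected copy as competing alternatives, so mutating the original would be unsafe. A minimal sketch of the pattern, using the hypothetical class CopyingGroupScan:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, Drill-free model of the "copy with columns" constructor.
final class CopyingGroupScan {
  final String userName;
  final String tableName;
  final List<String> columns; // null means "all columns" in this sketch

  // Initial constructor: no projection yet.
  CopyingGroupScan(String userName, String tableName) {
    this(userName, tableName, null);
  }

  // Copy with columns: everything else is carried over unchanged.
  CopyingGroupScan(CopyingGroupScan from, List<String> columns) {
    this(from.userName, from.tableName, columns);
  }

  private CopyingGroupScan(String userName, String tableName, List<String> columns) {
    this.userName = userName;
    this.tableName = tableName;
    this.columns = columns == null ? null
        : Collections.unmodifiableList(new ArrayList<>(columns));
  }
}
```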
The process to create a group scan from a scan spec is a bit complex. The base class tries to simplify this using a "scan factory." Back in the storage plugin class:
public class ExampleStoragePlugin extends BaseStoragePlugin<ExampleStoragePluginConfig> {

  private static class ExampleScanFactory extends
      BaseScanFactory<ExampleStoragePlugin, ExampleScanSpec,
          ExampleGroupScan, ExampleSubScan> {

    @Override
    public ExampleGroupScan newGroupScan(ExampleStoragePlugin storagePlugin,
        String userName, ExampleScanSpec scanSpec,
        SessionOptionManager sessionOptions,
        MetadataProviderManager metadataProviderManager) {
      return new ExampleGroupScan(storagePlugin, userName, scanSpec);
    }

    @Override
    public ExampleGroupScan groupWithColumns(ExampleGroupScan group,
        List<SchemaPath> columns) {
      return new ExampleGroupScan(group, columns);
    }

    @Override
    public ScanFrameworkBuilder scanBuilder(ExampleStoragePlugin storagePlugin,
        OptionManager options, ExampleSubScan subScan) {
      return null; // TODO: filled in once the sub scan and reader exist
    }
  }

  private static StoragePluginOptions buildOptions() {
    ...
    options.scanSpecType = new TypeReference<ExampleScanSpec>() { };
    options.scanFactory = new ExampleScanFactory();
    return options;
  }
  ...
}
The scan factory class provides a type-safe way to create instances, in place of the somewhat obscure mechanism other plugins use for the same work. Notice that it uses two of the group scan constructors we just created.
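The value of the generic parameters is that the compiler checks the plugin, spec, and group scan classes against each other. The sketch below is a simplified, hypothetical analogue of that idea: GroupScanFactory and the Demo* classes are made up for illustration, and the real BaseScanFactory has more type parameters and methods.

```java
import java.util.List;

// Hypothetical, simplified typed factory (not Drill's BaseScanFactory):
// P = plugin, S = scan spec, G = group scan.
interface GroupScanFactory<P, S, G> {
  G newGroupScan(P plugin, String userName, S scanSpec);

  G groupWithColumns(G group, List<String> columns);
}

final class DemoPlugin {}

final class DemoSpec {
  final String tableName;

  DemoSpec(String tableName) { this.tableName = tableName; }
}

final class DemoGroupScan {
  final DemoPlugin plugin;
  final String userName;
  final DemoSpec spec;
  final List<String> columns; // null means "all columns" in this sketch

  DemoGroupScan(DemoPlugin plugin, String userName, DemoSpec spec, List<String> columns) {
    this.plugin = plugin;
    this.userName = userName;
    this.spec = spec;
    this.columns = columns;
  }
}

final class DemoScanFactory implements GroupScanFactory<DemoPlugin, DemoSpec, DemoGroupScan> {
  @Override
  public DemoGroupScan newGroupScan(DemoPlugin plugin, String userName, DemoSpec scanSpec) {
    return new DemoGroupScan(plugin, userName, scanSpec, null);
  }

  @Override
  public DemoGroupScan groupWithColumns(DemoGroupScan group, List<String> columns) {
    // Copy everything except the column list.
    return new DemoGroupScan(group.plugin, group.userName, group.spec, columns);
  }
}
```

Wiring such a factory to the wrong group scan class then fails at compile time rather than during planning.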
The Base plugin framework helps us get started by assuming some basic functionality:
- The plugin supports project push-down via the Enhanced Vector Framework. (If you look at existing plugins, you'll see that they implement canPushdownProjects(); here you get it for free.)
- The plugin supports a single reader (no parallelism). Once the basics work, you can go back and implement parallelism by overriding applyAssignments() and getMaxParallelizationWidth().
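Parallelism is out of scope for now, but the idea behind those two methods can be sketched as arithmetic: the width is bounded by how many independent pieces (splits) the scan has, and the splits are then dealt out to slices. AssignmentSketch is hypothetical and does not reproduce Drill's method signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of spreading a scan across slices (minor fragments).
// Not Drill's API: it only shows the arithmetic behind the two methods.
final class AssignmentSketch {
  // More slices than splits would leave slices idle; a single,
  // non-splittable source gives a width of 1.
  static int maxParallelizationWidth(int splitCount) {
    return Math.max(1, splitCount);
  }

  // Deal splits out round-robin: slice i gets splits i, i + width, ...
  static List<List<Integer>> assign(int splitCount, int width) {
    if (width < 1) {
      throw new IllegalArgumentException("width must be at least 1");
    }
    List<List<Integer>> slices = new ArrayList<>();
    for (int i = 0; i < width; i++) {
      slices.add(new ArrayList<>());
    }
    for (int split = 0; split < splitCount; split++) {
      slices.get(split % width).add(split);
    }
    return slices;
  }
}
```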
Calcite is a cost-based planner: it uses cost information to choose among plans, so we must provide a cost estimate, however crude. As it turns out, the actual row count is less critical than adjusting the cost in reaction to projection and filter push-down. You can start by doing much the same as the "test mule" DummyGroupScan does:
@Override
public ScanStats computeScanStats() {
  // No good estimates at all, just make up something.
  int estRowCount = 10_000;

  // If doing filter push-down, apply filter selectivity
  // here to reduce row count.

  // If this is an adapter to an external system, assume no disk
  // I/O. Instead, we have to explain costs by reducing CPU.
  double cpuRatio = 1.0;

  // If columns provided, then assume this saves data transfer
  if (getColumns() != BaseGroupScan.ALL_COLUMNS) {
    cpuRatio = 0.75;
  }

  // Would like to reduce network costs, but not easy to do here since Drill
  // scans assume we read from disk, not network.
  return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, cpuRatio, 0);
}
If you are reading from a file, you can estimate the data size from the file size, and estimate the row count by dividing the file size by an assumed average row width (say 50, 100, or 200 bytes, depending on the data). You can also include an estimated disk I/O cost. If you are reading from a database or some other system, you may be able to get accurate estimates. If reading from, say, a REST service, you may have no data at all and must simply make something up.
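For the file case, the estimate is a single division. A minimal sketch, assuming the plugin can learn the file size (or passes -1 when it cannot) and picks an average row width for its data; SizeEstimate and its constant are hypothetical:

```java
// Hypothetical helper for a file-based plugin: turn a file size into a
// crude row-count estimate for computeScanStats().
final class SizeEstimate {
  // Made-up fallback when the size is unknown, as in the example above.
  static final long UNKNOWN_SIZE_ROWS = 10_000;

  // avgRowWidthBytes is a guess about the data, for example 50, 100, or 200.
  static long estimateRows(long fileSizeBytes, int avgRowWidthBytes) {
    if (avgRowWidthBytes <= 0) {
      throw new IllegalArgumentException("row width must be positive");
    }
    if (fileSizeBytes < 0) {
      return UNKNOWN_SIZE_ROWS; // size not available: fall back to a guess
    }
    // Round up so a small non-empty file is never estimated at zero rows.
    return (fileSizeBytes + avgRowWidthBytes - 1) / avgRowWidthBytes;
  }
}
```

For example, a 1,000,000-byte file at an assumed 100 bytes per row gives an estimate of 10,000 rows.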
At a minimum, the estimate should land on the right side of small versus large: small if you know it is safe for Drill to broadcast the data to all nodes (for a hash join, say), and large if Drill should never make copies of the data.
In the code above, we reduce the effective CPU cost when we have accepted project push-down (a set of columns). Without this, Calcite sees no benefit from project push-down and may go ahead and include a Project operator in the query, which we don't need.
Later, if you add filter push-down, then you must emulate the same "selectivity" (percentage of rows that remain after the filter) as Drill so that, again, Calcite chooses our alternative over the built-in Drill Filter operator. We'll discuss that more later.
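The two adjustments can be kept in one place. The sketch below (PushDownCost is hypothetical) reuses the 0.75 CPU ratio from the computeScanStats() example and scales the row count by a selectivity supplied by the caller; choosing that value to match Drill's own estimate is left to the plugin.

```java
// Hypothetical cost sketch extending computeScanStats() above with filter
// push-down. The selectivity is an input; it should mirror what Drill would
// assume for the same filter.
final class PushDownCost {
  final double rowCount;
  final double cpuRatio;

  private PushDownCost(double rowCount, double cpuRatio) {
    this.rowCount = rowCount;
    this.cpuRatio = cpuRatio;
  }

  static PushDownCost estimate(double baseRows, boolean projectPushed,
      boolean filterPushed, double selectivity) {
    if (selectivity < 0.0 || selectivity > 1.0) {
      throw new IllegalArgumentException("selectivity must be in [0, 1]");
    }
    // A pushed filter means fewer rows leave the scan.
    double rows = filterPushed ? baseRows * selectivity : baseRows;
    // Same 0.75 ratio the example above uses for project push-down.
    double cpu = projectPushed ? 0.75 : 1.0;
    return new PushDownCost(rows, cpu);
  }
}
```

With a selectivity of 0.25, a pushed-down filter turns 10,000 estimated rows into 2,500, which gives Calcite a reason to prefer the scan that absorbs the filter.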
Once physical planning is complete (which, so far, kind of happens in the background), the logical group scan class must produce a "physical" scan operator definition. (The terminology is awkward: the run-time ScanBatch or ScanOperatorExec is the physical operator; the SubScan class described here is the definition of that physical operator.)
The group scan creates the specific (sub) scan: one for each execution slice (which Drill calls a "minor fragment"). For now, we've told Drill that our plugin supports only a single slice, so we create a single sub scan:
@Override
public SubScan getSpecificScan(int minorFragmentId) {
  // TODO: create the sub scan once the sub scan class exists.
  return null;
}
We'll fill in the actual sub scan class later.
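Since the plugin reports a width of one, Drill should ask only for minor fragment 0 (assuming 0-based fragment IDs). One way to fill in the method later is to check that assumption and fail loudly instead of returning null. SliceSubScan and SingleSliceGroupScan are hypothetical stand-ins, not Drill classes.

```java
import java.util.List;

// Hypothetical stand-in for the sub scan definition (not Drill's SubScan).
final class SliceSubScan {
  final String tableName;
  final List<String> columns;

  SliceSubScan(String tableName, List<String> columns) {
    this.tableName = tableName;
    this.columns = columns;
  }
}

// Hypothetical group scan that, like our plugin, runs in exactly one slice.
final class SingleSliceGroupScan {
  final String tableName;
  final List<String> columns;

  SingleSliceGroupScan(String tableName, List<String> columns) {
    this.tableName = tableName;
    this.columns = columns;
  }

  int getMaxParallelizationWidth() {
    return 1;
  }

  // With a width of 1, the only valid minor fragment ID is 0; anything else
  // indicates a planning bug, so fail loudly rather than return null.
  SliceSubScan getSpecificScan(int minorFragmentId) {
    if (minorFragmentId != 0) {
      throw new IllegalStateException(
          "Single-slice scan asked for minor fragment " + minorFragmentId);
    }
    return new SliceSubScan(tableName, columns);
  }
}
```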
Finally, we need some boilerplate required by Drill. The Base framework provides much of the plumbing (via the scan factory class we defined earlier). Again, if you look at existing group scan implementations, you will see the boilerplate that Base handles for you.
The one method you should provide converts your plugin-specific fields to a string. This string appears as part of the EXPLAIN PLAN FOR output, so we want to use the standard format:
@Override
public void buildPlanString(PlanStringBuilder builder) {
super.buildPlanString(builder);
builder.field("scanSpec", scanSpec);
}
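The sketch below imitates the effect with a hypothetical SketchPlanStringBuilder that renders Name [key=value, ...]; Drill's PlanStringBuilder may format values differently (for example, quoting strings). One point likely carries over if values are rendered with toString(), as here: the real scanSpec is an ExampleScanSpec object, so it needs a readable toString() to show up usefully in EXPLAIN output.

```java
import java.util.List;

// Hypothetical, simplified plan-string builder (not Drill's PlanStringBuilder).
final class SketchPlanStringBuilder {
  private final StringBuilder buf = new StringBuilder();
  private boolean first = true;

  SketchPlanStringBuilder(Object node) {
    buf.append(node.getClass().getSimpleName()).append(" [");
  }

  // Appends key=value, rendering the value with toString().
  SketchPlanStringBuilder field(String key, Object value) {
    if (!first) {
      buf.append(", ");
    }
    first = false;
    buf.append(key).append('=').append(value);
    return this;
  }

  @Override
  public String toString() {
    return buf + "]";
  }
}

final class ExplainDemoScan {
  final String scanSpec;
  final List<String> columns;

  ExplainDemoScan(String scanSpec, List<String> columns) {
    this.scanSpec = scanSpec;
    this.columns = columns;
  }

  // Base-class fields first, then plugin-specific ones, as in buildPlanString().
  @Override
  public String toString() {
    return new SketchPlanStringBuilder(this)
        .field("columns", columns)
        .field("scanSpec", scanSpec)
        .toString();
  }
}
```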
We can now run our test again. First set a breakpoint in getSpecificScan, then run the test. Reaching the breakpoint verifies that things work up to this point.