Merge pull request #5 from ydb-platform/feature_columnar
Preparing for columnar tables support
zinal authored Nov 16, 2024
2 parents 396c6c3 + e364627 commit a5952f1
Showing 14 changed files with 131 additions and 43 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -121,6 +121,7 @@ The following Spark configuration properties are supported by the YDB connector

* `batchsize` - the maximum number of rows ingested in a single batch, default 500. Use with care: the value typically should not exceed 1000;
* `primary_key` - list of comma-separated column names to define the YDB table's primary key (only supported for `CREATE TABLE` operations);
* `table_type` - one of the values `ROW` (default) or `COLUMN`, determining the store type of the tables being created (only supported for `CREATE TABLE` operations);
* `truncate` - a boolean value (`true` or `false`) specifying whether the connector should truncate the existing table before writing to it.

## Using the SQL statements with YDB catalog defined
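To illustrate the new `table_type` option, here is a minimal sketch of creating a column-store table through a registered YDB catalog; the catalog name `ydb`, the table name, and the schema are assumptions for the example, not taken from this diff.

```java
import org.apache.spark.sql.SparkSession;

public class CreateColumnTableSketch {
    public static void main(String[] args) {
        // Assumes a YDB catalog was registered beforehand, e.g. with
        // spark.sql.catalog.ydb=tech.ydb.spark.connector.YdbCatalog
        // plus the connection properties the README describes.
        SparkSession spark = SparkSession.builder()
                .appName("ydb-columnar-sketch")
                .getOrCreate();

        // table_type=COLUMN selects the column store type for the new table;
        // the table and column names here are hypothetical.
        spark.sql("CREATE TABLE ydb.sales (id BIGINT NOT NULL, amount DOUBLE) "
                + "TBLPROPERTIES ('primary_key'='id', 'table_type'='COLUMN')");
        spark.stop();
    }
}
```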
21 changes: 19 additions & 2 deletions pom.xml
@@ -6,7 +6,24 @@
<version>1.3-SNAPSHOT</version>
<!-- <version>X.Y[-SNAPSHOT]</version> -->
<packaging>jar</packaging>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>https://www.apache.org/licenses/LICENSE-2.0</url>
</license>
</licenses>
<scm>
<url>https://github.com/ydb-platform/ydb-spark-connector</url>
<connection>scm:git:https://github.com/ydb-platform/ydb-spark-connector.git</connection>
<developerConnection>scm:git:https://github.com/ydb-platform/ydb-spark-connector.git</developerConnection>
</scm>
<developers>
<developer>
<name>Aleksandr Gorshenin</name>
<email>[email protected]</email>
<organization>YDB</organization>
<organizationUrl>https://ydb.tech/</organizationUrl>
</developer>
<developer>
<name>Maksim Zinal</name>
<email>[email protected]</email>
@@ -27,11 +44,11 @@
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
<version>2.2.9</version>
<version>2.3.6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<dependency> <!-- for manual dependency injection in shaded mode -->
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>1.59.1</version>
2 changes: 2 additions & 0 deletions src/main/java/tech/ydb/spark/connector/YdbCatalog.java
@@ -129,6 +129,8 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti
if (listIndexes) {
listIndexes(namespace, retval, e);
}
} else if (SchemeOperationProtos.Entry.Type.COLUMN_TABLE.equals(e.getType())) {
retval.add(Identifier.of(namespace, e.getName()));
}
}
return retval.toArray(new Identifier[0]);
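The effect of this addition is that column-store tables are returned by catalog listings alongside row tables. A quick hedged check, again assuming a catalog registered as `ydb`:

```java
import org.apache.spark.sql.SparkSession;

public class ListTablesSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        // COLUMN_TABLE scheme entries are now added by listTables(), so
        // column-store tables should show up in this listing as well.
        spark.sql("SHOW TABLES FROM ydb").show(false);
    }
}
```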
@@ -7,7 +7,7 @@
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;

import tech.ydb.spark.connector.impl.YdbScanImpl;
import tech.ydb.spark.connector.impl.YdbScanReadTable;

/**
* Partition reader factory delivers the scan options to partition reader instances.
@@ -36,7 +36,7 @@ static class YdbReader implements PartitionReader<InternalRow> {

private final YdbScanOptions options;
private final YdbTablePartition partition;
private YdbScanImpl scan;
private YdbScanReadTable scan;

YdbReader(YdbScanOptions options, YdbTablePartition partition) {
this.options = options;
@@ -48,7 +48,7 @@ public boolean next() throws IOException {
if (scan == null) {
LOG.debug("Preparing scan for table {} at partition {}",
options.getTablePath(), partition);
scan = new YdbScanImpl(options, partition.getRange());
scan = new YdbScanReadTable(options, partition.getRange());
scan.prepare();
LOG.debug("Scan prepared, ready to fetch...");
}
10 changes: 7 additions & 3 deletions src/main/java/tech/ydb/spark/connector/YdbScanBatch.java
@@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
@@ -39,16 +40,19 @@ public InputPartition[] planInputPartitions() {
new YdbKeyRange.Limit(options.getRangeBegin(), true),
new YdbKeyRange.Limit(options.getRangeEnd(), true)
);
InputPartition[] out = partitions.stream()
final Random random = new Random();
YdbTablePartition[] out = partitions.stream()
.map(kr -> kr.intersect(predicates))
.filter(kr -> !kr.isEmpty())
.map(kr -> new YdbTablePartition(kr))
.toArray(InputPartition[]::new);
.map(kr -> new YdbTablePartition(random.nextInt(999999999), kr))
.toArray(YdbTablePartition[]::new);
if (LOG.isDebugEnabled()) {
LOG.debug("Input partitions count {}, filtered partitions count {}",
partitions.size(), out.length);
LOG.debug("Filtered partition ranges: {}", Arrays.toString(out));
}
// Random ordering is better for multiple concurrent scans with limited parallelism.
Arrays.sort(out, (p1, p2) -> Integer.compare(p1.getOrderingKey(), p2.getOrderingKey()));
return out;
}

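A note on the design choice above: giving each partition a random ordering key and sorting by it is effectively a shuffle of the scan ranges, so tasks picked up by a limited number of executors start at scattered points of the key space instead of walking it in order. A standalone sketch of the same idea, with plain strings standing in for key ranges:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ShuffleSketch {
    public static void main(String[] args) {
        // Collections.shuffle gives the same effect as sorting by a
        // freshly generated random ordering key.
        List<String> ranges = new ArrayList<>(
                List.of("range-01", "range-02", "range-03", "range-04"));
        Collections.shuffle(ranges);
        System.out.println(ranges); // e.g. [range-03, range-01, range-04, range-02]
    }
}
```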
6 changes: 3 additions & 3 deletions src/main/java/tech/ydb/spark/connector/YdbScanOptions.java
@@ -41,11 +41,11 @@ public class YdbScanOptions extends YdbTableOperationOptions implements Serializ
public YdbScanOptions(YdbTable table) {
super(table);
this.actualSchema = table.schema();
this.keyColumns = new ArrayList<>(table.keyColumns()); // ensure serializable list
this.keyTypes = table.keyTypes();
this.keyColumns = new ArrayList<>(table.getKeyColumns()); // ensure serializable list
this.keyTypes = table.getKeyTypes();
this.rangeBegin = new ArrayList<>();
this.rangeEnd = new ArrayList<>();
this.partitions = table.partitions();
this.partitions = table.getPartitions();
this.scanQueueDepth = table.getConnector().getScanQueueDepth();
this.scanSessionSeconds = table.getConnector().getScanSessionSeconds();
this.rowLimit = -1;
12 changes: 12 additions & 0 deletions src/main/java/tech/ydb/spark/connector/YdbStoreType.java
@@ -0,0 +1,12 @@
package tech.ydb.spark.connector;

/**
*
* @author zinal
*/
public enum YdbStoreType {
UNSPECIFIED,
ROW,
COLUMN,
INDEX
}
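A plausible sketch of how the `table_type` option could be mapped onto this enum; the helper below is hypothetical and not part of this diff:

```java
final class StoreTypeOptions {
    // Hypothetical helper: parses the table_type option value, defaulting
    // to ROW when the option is absent, as the README describes.
    static YdbStoreType parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            return YdbStoreType.ROW;
        }
        return YdbStoreType.valueOf(value.trim().toUpperCase());
    }
}
```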
57 changes: 39 additions & 18 deletions src/main/java/tech/ydb/spark/connector/YdbTable.java
@@ -62,7 +62,7 @@ public class YdbTable implements Table,
private final ArrayList<YdbFieldType> keyTypes;
private final ArrayList<YdbKeyRange> partitions;
private final Map<String, String> properties;
private final boolean indexPseudoTable;
private final YdbStoreType storeType;
private StructType schema;

/**
@@ -85,7 +85,7 @@ public class YdbTable implements Table,
this.keyTypes = new ArrayList<>();
this.partitions = new ArrayList<>();
this.properties = new HashMap<>();
this.indexPseudoTable = false;
this.storeType = convertStoreType(td);
Map<String, TableColumn> cm = buildColumnsMap(td);
for (String cname : td.getPrimaryKeys()) {
TableColumn tc = cm.get(cname);
@@ -104,10 +104,7 @@
}
}
}
this.properties.put(YdbOptions.TABLE_TYPE, "table"); // TODO: columnshard support
this.properties.put(YdbOptions.TABLE_PATH, tablePath);
this.properties.put(YdbOptions.PRIMARY_KEY,
this.keyColumns.stream().collect(Collectors.joining(",")));
fillProperties(this.properties, this.tablePath, this.storeType, this.keyColumns);
convertPartitioningSettings(td, this.properties);
LOG.debug("Loaded table {} with {} columns and {} partitions",
this.tablePath, this.columns.size(), this.partitions.size());
@@ -136,7 +133,7 @@ public class YdbTable implements Table,
this.keyTypes = new ArrayList<>();
this.partitions = new ArrayList<>();
this.properties = new HashMap<>();
this.indexPseudoTable = true;
this.storeType = YdbStoreType.INDEX;
HashSet<String> known = new HashSet<>();
Map<String, TableColumn> cm = buildColumnsMap(tdMain);
// Add index key columns
@@ -166,10 +163,7 @@
partitions.add(new YdbKeyRange(kr, connector.getDefaultTypes()));
}
}
this.properties.put(YdbOptions.TABLE_TYPE, "index");
this.properties.put(YdbOptions.TABLE_PATH, tablePath);
this.properties.put(YdbOptions.PRIMARY_KEY,
this.keyColumns.stream().collect(Collectors.joining(",")));
fillProperties(this.properties, this.tablePath, this.storeType, this.keyColumns);
LOG.debug("Loaded index {} with {} columns and {} partitions",
this.tablePath, this.columns.size(), this.partitions.size());
}
@@ -212,6 +206,29 @@ public static Result<YdbTable> lookup(YdbConnector connector, YdbTypes types,
}).join();
}

static YdbStoreType convertStoreType(TableDescription td) {
/* TODO: implement store type detection
switch (td.getStoreType()) {
case COLUMN:
return YdbStoreType.COLUMN;
case ROW:
return YdbStoreType.ROW;
default:
return YdbStoreType.UNSPECIFIED;
}
*/
return YdbStoreType.ROW;
}

static void fillProperties(Map<String, String> props, String tablePath,
YdbStoreType storeType, List<String> keyColumns) {
props.clear();
props.put(YdbOptions.TABLE_TYPE, storeType.name());
props.put(YdbOptions.TABLE_PATH, tablePath);
props.put(YdbOptions.PRIMARY_KEY,
keyColumns.stream().collect(Collectors.joining(",")));
}

static void convertPartitioningSettings(TableDescription td, Map<String, String> properties) {
PartitioningSettings ps = td.getPartitioningSettings();
if (ps != null) {
@@ -269,7 +286,7 @@ public Set<TableCapability> capabilities() {
final Set<TableCapability> c = new HashSet<>();
c.add(TableCapability.BATCH_READ);
c.add(TableCapability.ACCEPT_ANY_SCHEMA); // allow YDB to check the schema
if (!indexPseudoTable) {
if (!YdbStoreType.INDEX.equals(storeType)) {
// tables support writes, while indexes do not
c.add(TableCapability.BATCH_WRITE);
c.add(TableCapability.TRUNCATE);
@@ -322,30 +339,34 @@ public RowLevelOperationBuilder newRowLevelOperationBuilder(RowLevelOperationInf
return new YdbRowLevelBuilder();
}

final YdbConnector getConnector() {
public YdbConnector getConnector() {
return connector;
}

final YdbTypes getTypes() {
public YdbTypes getTypes() {
return types;
}

final String tablePath() {
public String getTablePath() {
return tablePath;
}

final List<String> keyColumns() {
public List<String> getKeyColumns() {
return keyColumns;
}

final ArrayList<YdbFieldType> keyTypes() {
public ArrayList<YdbFieldType> getKeyTypes() {
return keyTypes;
}

final ArrayList<YdbKeyRange> partitions() {
public ArrayList<YdbKeyRange> getPartitions() {
return partitions;
}

public YdbStoreType getStoreType() {
return storeType;
}

private StructField[] mapFields(List<TableColumn> columns) {
final List<StructField> fields = new ArrayList<>();
for (TableColumn tc : columns) {
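Since fillProperties now reports the store type under the TABLE_TYPE key, the populated properties should be observable from Spark SQL. A hedged sketch, with the catalog name `ydb` and the table name assumed:

```java
import org.apache.spark.sql.SparkSession;

public class ShowPropertiesSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        // Expected to include the store type, table path and primary key
        // populated by YdbTable.fillProperties().
        spark.sql("SHOW TBLPROPERTIES ydb.sales").show(false);
    }
}
```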
@@ -35,7 +35,7 @@ public YdbTableOperationOptions(YdbTable table) {
this.connectOptions = new HashMap<>(table.getConnector().getConnectOptions());
this.types = table.getTypes();
this.tableName = table.name();
this.tablePath = table.tablePath();
this.tablePath = table.getTablePath();
this.fieldsList = table.makeColumns();
this.fieldsMap = new HashMap<>();
for (YdbFieldInfo yfi : fieldsList) {
10 changes: 8 additions & 2 deletions src/main/java/tech/ydb/spark/connector/YdbTablePartition.java
@@ -11,14 +11,20 @@
*/
public class YdbTablePartition implements InputPartition {

private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 2L;

private final int orderingKey;
private final YdbKeyRange range;

public YdbTablePartition(YdbKeyRange range) {
public YdbTablePartition(int orderingKey, YdbKeyRange range) {
this.orderingKey = orderingKey;
this.range = range;
}

public int getOrderingKey() {
return orderingKey;
}

public YdbKeyRange getRange() {
return range;
}
4 changes: 2 additions & 2 deletions src/main/java/tech/ydb/spark/connector/YdbWriteBuilder.java
@@ -34,14 +34,14 @@ public YdbWriteBuilder(YdbTable table, LogicalWriteInfo info, boolean truncate)

@Override
public Write build() {
LOG.debug("Creating YdbWrite for table {}", table.tablePath());
LOG.debug("Creating YdbWrite for table {}", table.getTablePath());
boolean mapByNames = validateSchemas(table.schema(), info.schema());
return new YdbWrite(table, info, mapByNames, truncate);
}

@Override
public WriteBuilder truncate() {
LOG.debug("Truncation requested for table {}", table.tablePath());
LOG.debug("Truncation requested for table {}", table.getTablePath());
return new YdbWriteBuilder(table, info, true);
}

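Finally, a hedged sketch of a write that exercises the truncate() path shown above: in the Spark DataFrameWriterV2 API, overwrite with an always-true condition is how a truncate-then-append request reaches connectors that support it. The table `ydb.sales` and the source data are again assumptions.

```java
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TruncatingWriteSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().getOrCreate();
        // Hypothetical single-row source; ydb.sales is assumed to exist.
        Dataset<Row> df = spark.sql("SELECT 1L AS id, 9.99D AS amount");
        // overwrite(lit(true)) lets Spark route the request through
        // WriteBuilder.truncate() on connectors that support truncation.
        df.writeTo("ydb.sales").overwrite(lit(true));
    }
}
```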