diff --git a/README.md b/README.md
index 8e8985d..a6d5a03 100644
--- a/README.md
+++ b/README.md
@@ -121,6 +121,7 @@ The following Spark configuration properties are supported by the YDB connector
* `batchsize` - the maximum number of rows ingested in a single portion, default 500. Use with care; it typically should not exceed 1000;
* `primary_key` - list of comma-separated column names to define the YDB table's primary key (only supported for `CREATE TABLE` operations);
+* `table_type` - one of the values `ROW` (default) or `COLUMN`, determining the store type of the tables being created (only supported for `CREATE TABLE` operations; see the example below);
* `truncate` - a boolean value (`true` or `false`) specifying whether the connector should truncate the existing table before writing to it.
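+
+For example, a minimal sketch of creating a column-store table via Spark SQL (assuming a SparkSession `spark` with a YDB catalog `ydb1` configured; the table and column names are illustrative):
+
+```java
+spark.sql("CREATE TABLE ydb1.test_table(id BIGINT NOT NULL, value STRING) "
+        + "TBLPROPERTIES('primary_key'='id', 'table_type'='COLUMN')");
+```
+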
## Using SQL statements with a YDB catalog defined
diff --git a/pom.xml b/pom.xml
index e82cfb3..414ff9a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,24 @@
<version>1.3-SNAPSHOT</version>
<packaging>jar</packaging>
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0</url>
+        </license>
+    </licenses>
+    <scm>
+        <url>https://github.com/ydb-platform/ydb-spark-connector</url>
+        <connection>scm:git:https://github.com/ydb-platform/ydb-spark-connector.git</connection>
+        <developerConnection>scm:git:https://github.com/ydb-platform/ydb-spark-connector.git</developerConnection>
+    </scm>
<developers>
+        <developer>
+            <name>Aleksandr Gorshenin</name>
+            <email>alexandr268@ydb.tech</email>
+            <organization>YDB</organization>
+            <organizationUrl>https://ydb.tech/</organizationUrl>
+        </developer>
<developer>
<name>Maksim Zinal</name>
<email>zinal@ydb.tech</email>
@@ -27,11 +44,11 @@
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
-                <version>2.2.9</version>
+                <version>2.3.6</version>
<type>pom</type>
<scope>import</scope>
-
+
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>1.59.1</version>
diff --git a/src/main/java/tech/ydb/spark/connector/YdbCatalog.java b/src/main/java/tech/ydb/spark/connector/YdbCatalog.java
index c2b47ba..b9b722a 100644
--- a/src/main/java/tech/ydb/spark/connector/YdbCatalog.java
+++ b/src/main/java/tech/ydb/spark/connector/YdbCatalog.java
@@ -129,6 +129,8 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti
if (listIndexes) {
listIndexes(namespace, retval, e);
}
+ } else if (SchemeOperationProtos.Entry.Type.COLUMN_TABLE.equals(e.getType())) {
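+ // Column-store (OLAP) tables are listed alongside regular row-organized tables.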
+ retval.add(Identifier.of(namespace, e.getName()));
}
}
return retval.toArray(new Identifier[0]);
diff --git a/src/main/java/tech/ydb/spark/connector/YdbPartitionReaderFactory.java b/src/main/java/tech/ydb/spark/connector/YdbPartitionReaderFactory.java
index 871167d..28e3f58 100644
--- a/src/main/java/tech/ydb/spark/connector/YdbPartitionReaderFactory.java
+++ b/src/main/java/tech/ydb/spark/connector/YdbPartitionReaderFactory.java
@@ -7,7 +7,7 @@
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
-import tech.ydb.spark.connector.impl.YdbScanImpl;
+import tech.ydb.spark.connector.impl.YdbScanReadTable;
/**
* Partition reader factory delivers the scan options to partition reader instances.
@@ -36,7 +36,7 @@ static class YdbReader implements PartitionReader<InternalRow> {
private final YdbScanOptions options;
private final YdbTablePartition partition;
- private YdbScanImpl scan;
+ private YdbScanReadTable scan;
YdbReader(YdbScanOptions options, YdbTablePartition partition) {
this.options = options;
@@ -48,7 +48,7 @@ public boolean next() throws IOException {
if (scan == null) {
LOG.debug("Preparing scan for table {} at partition {}",
options.getTablePath(), partition);
- scan = new YdbScanImpl(options, partition.getRange());
+ scan = new YdbScanReadTable(options, partition.getRange());
scan.prepare();
LOG.debug("Scan prepared, ready to fetch...");
}
diff --git a/src/main/java/tech/ydb/spark/connector/YdbScanBatch.java b/src/main/java/tech/ydb/spark/connector/YdbScanBatch.java
index 44457c4..bf85f6a 100644
--- a/src/main/java/tech/ydb/spark/connector/YdbScanBatch.java
+++ b/src/main/java/tech/ydb/spark/connector/YdbScanBatch.java
@@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Random;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
@@ -39,16 +40,19 @@ public InputPartition[] planInputPartitions() {
new YdbKeyRange.Limit(options.getRangeBegin(), true),
new YdbKeyRange.Limit(options.getRangeEnd(), true)
);
- InputPartition[] out = partitions.stream()
+ final Random random = new Random();
+ YdbTablePartition[] out = partitions.stream()
.map(kr -> kr.intersect(predicates))
.filter(kr -> !kr.isEmpty())
- .map(kr -> new YdbTablePartition(kr))
- .toArray(InputPartition[]::new);
+ .map(kr -> new YdbTablePartition(random.nextInt(999999999), kr))
+ .toArray(YdbTablePartition[]::new);
if (LOG.isDebugEnabled()) {
LOG.debug("Input partitions count {}, filtered partitions count {}",
partitions.size(), out.length);
LOG.debug("Filtered partition ranges: {}", Arrays.toString(out));
}
+ // Random ordering is better for multiple concurrent scans with limited parallelism.
+ Arrays.sort(out, (p1, p2) -> Integer.compare(p1.getOrderingKey(), p2.getOrderingKey()));
return out;
}
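
The randomized ordering introduced above is effectively a shuffle of the partition array: each partition gets an independent random key, and sorting by those keys yields a near-uniform permutation. A standalone sketch of the same idea, using only JDK types (names are illustrative, not connector code):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Random;

public class PartitionShuffleSketch {
    public static void main(String[] args) {
        Integer[] partitions = {0, 1, 2, 3, 4, 5, 6, 7};
        // Equivalent effect to assigning random ordering keys and sorting by them,
        // as planInputPartitions() does above:
        Collections.shuffle(Arrays.asList(partitions), new Random());
        System.out.println(Arrays.toString(partitions));
    }
}
```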
diff --git a/src/main/java/tech/ydb/spark/connector/YdbScanOptions.java b/src/main/java/tech/ydb/spark/connector/YdbScanOptions.java
index 8b5cc38..8b132e8 100644
--- a/src/main/java/tech/ydb/spark/connector/YdbScanOptions.java
+++ b/src/main/java/tech/ydb/spark/connector/YdbScanOptions.java
@@ -41,11 +41,11 @@ public class YdbScanOptions extends YdbTableOperationOptions implements Serializ
public YdbScanOptions(YdbTable table) {
super(table);
this.actualSchema = table.schema();
- this.keyColumns = new ArrayList<>(table.keyColumns()); // ensure serializable list
- this.keyTypes = table.keyTypes();
+ this.keyColumns = new ArrayList<>(table.getKeyColumns()); // ensure serializable list
+ this.keyTypes = table.getKeyTypes();
this.rangeBegin = new ArrayList<>();
this.rangeEnd = new ArrayList<>();
- this.partitions = table.partitions();
+ this.partitions = table.getPartitions();
this.scanQueueDepth = table.getConnector().getScanQueueDepth();
this.scanSessionSeconds = table.getConnector().getScanSessionSeconds();
this.rowLimit = -1;
diff --git a/src/main/java/tech/ydb/spark/connector/YdbStoreType.java b/src/main/java/tech/ydb/spark/connector/YdbStoreType.java
new file mode 100644
index 0000000..8f36232
--- /dev/null
+++ b/src/main/java/tech/ydb/spark/connector/YdbStoreType.java
@@ -0,0 +1,12 @@
+package tech.ydb.spark.connector;
+
+/**
+ * Store type of a YDB table, as exposed through the Spark table properties.
+ *
+ * @author zinal
+ */
+public enum YdbStoreType {
+    UNSPECIFIED, // store type not specified or not detected
+    ROW,         // row-organized table (regular OLTP tables)
+    COLUMN,      // column-organized table (columnshard, OLAP)
+    INDEX        // pseudo-table representing a secondary index
+}
diff --git a/src/main/java/tech/ydb/spark/connector/YdbTable.java b/src/main/java/tech/ydb/spark/connector/YdbTable.java
index 8792760..2ffcee3 100644
--- a/src/main/java/tech/ydb/spark/connector/YdbTable.java
+++ b/src/main/java/tech/ydb/spark/connector/YdbTable.java
@@ -62,7 +62,7 @@ public class YdbTable implements Table,
private final ArrayList<YdbFieldType> keyTypes;
private final ArrayList<YdbKeyRange> partitions;
private final Map<String, String> properties;
- private final boolean indexPseudoTable;
+ private final YdbStoreType storeType;
private StructType schema;
/**
@@ -85,7 +85,7 @@ public class YdbTable implements Table,
this.keyTypes = new ArrayList<>();
this.partitions = new ArrayList<>();
this.properties = new HashMap<>();
- this.indexPseudoTable = false;
+ this.storeType = convertStoreType(td);
Map<String, TableColumn> cm = buildColumnsMap(td);
for (String cname : td.getPrimaryKeys()) {
TableColumn tc = cm.get(cname);
@@ -104,10 +104,7 @@ public class YdbTable implements Table,
}
}
}
- this.properties.put(YdbOptions.TABLE_TYPE, "table"); // TODO: columnshard support
- this.properties.put(YdbOptions.TABLE_PATH, tablePath);
- this.properties.put(YdbOptions.PRIMARY_KEY,
- this.keyColumns.stream().collect(Collectors.joining(",")));
+ fillProperties(this.properties, this.tablePath, this.storeType, this.keyColumns);
convertPartitioningSettings(td, this.properties);
LOG.debug("Loaded table {} with {} columns and {} partitions",
this.tablePath, this.columns.size(), this.partitions.size());
@@ -136,7 +133,7 @@ public class YdbTable implements Table,
this.keyTypes = new ArrayList<>();
this.partitions = new ArrayList<>();
this.properties = new HashMap<>();
- this.indexPseudoTable = true;
+ this.storeType = YdbStoreType.INDEX;
HashSet<String> known = new HashSet<>();
Map<String, TableColumn> cm = buildColumnsMap(tdMain);
// Add index key columns
@@ -166,10 +163,7 @@ public class YdbTable implements Table,
partitions.add(new YdbKeyRange(kr, connector.getDefaultTypes()));
}
}
- this.properties.put(YdbOptions.TABLE_TYPE, "index");
- this.properties.put(YdbOptions.TABLE_PATH, tablePath);
- this.properties.put(YdbOptions.PRIMARY_KEY,
- this.keyColumns.stream().collect(Collectors.joining(",")));
+ fillProperties(this.properties, this.tablePath, this.storeType, this.keyColumns);
LOG.debug("Loaded index {} with {} columns and {} partitions",
this.tablePath, this.columns.size(), this.partitions.size());
}
@@ -212,6 +206,29 @@ public static Result<YdbTable> lookup(YdbConnector connector, YdbTypes types,
}).join();
}
+ static YdbStoreType convertStoreType(TableDescription td) {
+ /* TODO: implement store type detection
+ switch (td.getStoreType()) {
+ case COLUMN:
+ return YdbStoreType.COLUMN;
+ case ROW:
+ return YdbStoreType.ROW;
+ default:
+ return YdbStoreType.UNSPECIFIED;
+ }
+ */
+ return YdbStoreType.ROW;
+ }
+
+ static void fillProperties(Map<String, String> props, String tablePath,
+ YdbStoreType storeType, List<String> keyColumns) {
+ props.clear();
+ props.put(YdbOptions.TABLE_TYPE, storeType.name());
+ props.put(YdbOptions.TABLE_PATH, tablePath);
+ props.put(YdbOptions.PRIMARY_KEY,
+ keyColumns.stream().collect(Collectors.joining(",")));
+ }
+
static void convertPartitioningSettings(TableDescription td, Map properties) {
PartitioningSettings ps = td.getPartitioningSettings();
if (ps != null) {
@@ -269,7 +286,7 @@ public Set<TableCapability> capabilities() {
final Set<TableCapability> c = new HashSet<>();
c.add(TableCapability.BATCH_READ);
c.add(TableCapability.ACCEPT_ANY_SCHEMA); // allow YDB to check the schema
- if (!indexPseudoTable) {
+ if (!YdbStoreType.INDEX.equals(storeType)) {
// tables support writes, while indexes do not
c.add(TableCapability.BATCH_WRITE);
c.add(TableCapability.TRUNCATE);
@@ -322,30 +339,34 @@ public RowLevelOperationBuilder newRowLevelOperationBuilder(RowLevelOperationInf
return new YdbRowLevelBuilder();
}
- final YdbConnector getConnector() {
+ public YdbConnector getConnector() {
return connector;
}
- final YdbTypes getTypes() {
+ public YdbTypes getTypes() {
return types;
}
- final String tablePath() {
+ public String getTablePath() {
return tablePath;
}
- final List<String> keyColumns() {
+ public List<String> getKeyColumns() {
return keyColumns;
}
- final ArrayList<YdbFieldType> keyTypes() {
+ public ArrayList<YdbFieldType> getKeyTypes() {
return keyTypes;
}
- final ArrayList<YdbKeyRange> partitions() {
+ public ArrayList<YdbKeyRange> getPartitions() {
return partitions;
}
+ public YdbStoreType getStoreType() {
+ return storeType;
+ }
+
private StructField[] mapFields(List<TableColumn> columns) {
final List<StructField> fields = new ArrayList<>();
for (TableColumn tc : columns) {
diff --git a/src/main/java/tech/ydb/spark/connector/YdbTableOperationOptions.java b/src/main/java/tech/ydb/spark/connector/YdbTableOperationOptions.java
index b153518..107a114 100644
--- a/src/main/java/tech/ydb/spark/connector/YdbTableOperationOptions.java
+++ b/src/main/java/tech/ydb/spark/connector/YdbTableOperationOptions.java
@@ -35,7 +35,7 @@ public YdbTableOperationOptions(YdbTable table) {
this.connectOptions = new HashMap<>(table.getConnector().getConnectOptions());
this.types = table.getTypes();
this.tableName = table.name();
- this.tablePath = table.tablePath();
+ this.tablePath = table.getTablePath();
this.fieldsList = table.makeColumns();
this.fieldsMap = new HashMap<>();
for (YdbFieldInfo yfi : fieldsList) {
diff --git a/src/main/java/tech/ydb/spark/connector/YdbTablePartition.java b/src/main/java/tech/ydb/spark/connector/YdbTablePartition.java
index 5ef518b..acfc42f 100644
--- a/src/main/java/tech/ydb/spark/connector/YdbTablePartition.java
+++ b/src/main/java/tech/ydb/spark/connector/YdbTablePartition.java
@@ -11,14 +11,20 @@
*/
public class YdbTablePartition implements InputPartition {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
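+ // Randomized ordering key, used at planning time to spread concurrent scans across partitions.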
+ private final int orderingKey;
private final YdbKeyRange range;
- public YdbTablePartition(YdbKeyRange range) {
+ public YdbTablePartition(int orderingKey, YdbKeyRange range) {
+ this.orderingKey = orderingKey;
this.range = range;
}
+ public int getOrderingKey() {
+ return orderingKey;
+ }
+
public YdbKeyRange getRange() {
return range;
}
diff --git a/src/main/java/tech/ydb/spark/connector/YdbWriteBuilder.java b/src/main/java/tech/ydb/spark/connector/YdbWriteBuilder.java
index 4762718..7276ce3 100644
--- a/src/main/java/tech/ydb/spark/connector/YdbWriteBuilder.java
+++ b/src/main/java/tech/ydb/spark/connector/YdbWriteBuilder.java
@@ -34,14 +34,14 @@ public YdbWriteBuilder(YdbTable table, LogicalWriteInfo info, boolean truncate)
@Override
public Write build() {
- LOG.debug("Creating YdbWrite for table {}", table.tablePath());
+ LOG.debug("Creating YdbWrite for table {}", table.getTablePath());
boolean mapByNames = validateSchemas(table.schema(), info.schema());
return new YdbWrite(table, info, mapByNames, truncate);
}
@Override
public WriteBuilder truncate() {
- LOG.debug("Truncation requested for table {}", table.tablePath());
+ LOG.debug("Truncation requested for table {}", table.getTablePath());
return new YdbWriteBuilder(table, info, true);
}
diff --git a/src/main/java/tech/ydb/spark/connector/impl/YdbCreateTable.java b/src/main/java/tech/ydb/spark/connector/impl/YdbCreateTable.java
index 3fe03a4..b54855e 100644
--- a/src/main/java/tech/ydb/spark/connector/impl/YdbCreateTable.java
+++ b/src/main/java/tech/ydb/spark/connector/impl/YdbCreateTable.java
@@ -14,6 +14,7 @@
import tech.ydb.spark.connector.YdbFieldInfo;
import tech.ydb.spark.connector.YdbFieldType;
import tech.ydb.spark.connector.YdbOptions;
+import tech.ydb.spark.connector.YdbStoreType;
import tech.ydb.spark.connector.YdbTypes;
import tech.ydb.table.Session;
import tech.ydb.table.description.TableDescription;
@@ -32,6 +33,7 @@ public class YdbCreateTable extends YdbPropertyHelper {
private final String tablePath;
private final List<YdbFieldInfo> fields;
private final List<String> primaryKey;
+ private final YdbStoreType storeType;
public YdbCreateTable(String tablePath, List<YdbFieldInfo> fields,
List<String> primaryKey, Map<String, String> properties) {
@@ -39,6 +41,7 @@ public YdbCreateTable(String tablePath, List<YdbFieldInfo> fields,
this.tablePath = tablePath;
this.fields = fields;
this.primaryKey = primaryKey;
+ this.storeType = getStoreType(properties);
}
public YdbCreateTable(String tablePath, List<YdbFieldInfo> fields,
@@ -47,6 +50,7 @@ public YdbCreateTable(String tablePath, List<YdbFieldInfo> fields,
this.tablePath = tablePath;
this.fields = fields;
this.primaryKey = makePrimaryKey(fields, properties);
+ this.storeType = getStoreType(properties);
}
public CompletableFuture<Status> createTable(Session session) {
@@ -78,13 +82,26 @@ public CompletableFuture<Status> createTable(Session session) {
}
ps.setMinPartitionsCount(minPartitions);
ps.setMaxPartitionsCount(maxPartitions);
- long minSizeMb = getLongOption(YdbOptions.AP_PART_SIZE_MB, 2000L);
+ long minSizeMb = getLongOption(YdbOptions.AP_PART_SIZE_MB, 1000L);
if (minSizeMb < 1L) {
minSizeMb = 10L;
}
ps.setPartitionSize(minSizeMb);
tdb.setPartitioningSettings(ps);
+ /* TODO: implement store type configuration
+ switch (storeType) {
+ case ROW:
+ tdb.setStoreType(TableDescription.StoreType.ROW);
+ break;
+ case COLUMN:
+ tdb.setStoreType(TableDescription.StoreType.COLUMN);
+ break;
+ default:
+ break;
+ }
+ */
+
return session.createTable(tablePath, tdb.build());
}
@@ -101,7 +118,7 @@ public static List<YdbFieldInfo> convert(YdbTypes types, StructType st) {
return fields;
}
- private static List<String> makePrimaryKey(List<YdbFieldInfo> fields, Map<String, String> properties) {
+ static List<String> makePrimaryKey(List<YdbFieldInfo> fields, Map<String, String> properties) {
String value = properties.get(YdbOptions.PRIMARY_KEY);
if (value == null) {
String autoPk = grabAutoPk(fields);
@@ -110,7 +127,7 @@ private static List<String> makePrimaryKey(List<YdbFieldInfo> fields, Map<Strin
- private static String grabAutoPk(List<YdbFieldInfo> fields) {
+ static String grabAutoPk(List<YdbFieldInfo> fields) {
for (YdbFieldInfo yfi : fields) {
if (YdbOptions.AUTO_PK.equalsIgnoreCase(yfi.getName())) {
return yfi.getName();
@@ -120,4 +137,12 @@ private static String grabAutoPk(List<YdbFieldInfo> fields) {
return YdbOptions.AUTO_PK;
}
+ private YdbStoreType getStoreType(Map<String, String> properties) {
+ String value = properties.get(YdbOptions.TABLE_TYPE);
+ if (value == null) {
+ return YdbStoreType.UNSPECIFIED;
+ }
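+ // Note: valueOf() throws IllegalArgumentException for values other than ROW, COLUMN, INDEX or UNSPECIFIED.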
+ return YdbStoreType.valueOf(value.toUpperCase());
+ }
+
}
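
For reference, a hedged, self-contained sketch of how the creation-time properties map to a store type and primary key (it mirrors getStoreType() and the comma-separated `primary_key` convention from the README; `StoreTypeSketch` and its helper are illustrative, not connector code):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StoreTypeSketch {
    enum YdbStoreType { UNSPECIFIED, ROW, COLUMN, INDEX }

    // Missing property -> UNSPECIFIED; otherwise parsed case-insensitively, as in YdbCreateTable.
    static YdbStoreType storeTypeOf(Map<String, String> props) {
        String value = props.get("table_type");
        return value == null ? YdbStoreType.UNSPECIFIED : YdbStoreType.valueOf(value.toUpperCase());
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("table_type", "column");
        props.put("primary_key", "id,created_at");
        System.out.println(storeTypeOf(props)); // COLUMN
        List<String> primaryKey = Arrays.asList(props.get("primary_key").split(","));
        System.out.println(primaryKey);         // [id, created_at]
    }
}
```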
diff --git a/src/main/java/tech/ydb/spark/connector/impl/YdbScanImpl.java b/src/main/java/tech/ydb/spark/connector/impl/YdbScanReadTable.java
similarity index 98%
rename from src/main/java/tech/ydb/spark/connector/impl/YdbScanImpl.java
rename to src/main/java/tech/ydb/spark/connector/impl/YdbScanReadTable.java
index f11ea6e..87e3b65 100644
--- a/src/main/java/tech/ydb/spark/connector/impl/YdbScanImpl.java
+++ b/src/main/java/tech/ydb/spark/connector/impl/YdbScanReadTable.java
@@ -28,10 +28,10 @@
*
* @author zinal
*/
-public class YdbScanImpl implements AutoCloseable {
+public class YdbScanReadTable implements AutoCloseable {
private static final org.slf4j.Logger LOG
- = org.slf4j.LoggerFactory.getLogger(YdbScanImpl.class);
+ = org.slf4j.LoggerFactory.getLogger(YdbScanReadTable.class);
private static final QueueItem END_OF_SCAN = new QueueItem(null);
@@ -48,7 +48,7 @@ public class YdbScanImpl implements AutoCloseable {
private volatile GrpcReadStream stream;
private ResultSetReader current;
- public YdbScanImpl(YdbScanOptions options, YdbKeyRange keyRange) {
+ public YdbScanReadTable(YdbScanOptions options, YdbKeyRange keyRange) {
this.options = options;
this.keyRange = keyRange;
this.queue = new ArrayBlockingQueue<>(options.getScanQueueDepth());
diff --git a/src/test/java/tech/ydb/spark/connector/integration/IntegrationTest.java b/src/test/java/tech/ydb/spark/connector/integration/IntegrationTest.java
index 5981e84..01ab37a 100644
--- a/src/test/java/tech/ydb/spark/connector/integration/IntegrationTest.java
+++ b/src/test/java/tech/ydb/spark/connector/integration/IntegrationTest.java
@@ -34,7 +34,8 @@ public class IntegrationTest {
public static final String CATALOG = "spark.sql.catalog.ydb1";
public static final String TEST_TABLE = "test_table";
- public static GenericContainer<?> ydb = new GenericContainer<>("cr.yandex/yc/yandex-docker-local-ydb:latest")
+ public static final GenericContainer<?> YDB =
+ new GenericContainer<>("cr.yandex/yc/yandex-docker-local-ydb:latest")
.withCreateContainerCmdModifier(cmd -> cmd.withHostName("localhost"))
.withNetworkMode("host")
.withEnv("GRPC_TLS_PORT", "2135")
@@ -43,7 +44,7 @@ public class IntegrationTest {
.withEnv("YDB_USE_IN_MEMORY_PDISKS", "true");
static {
- ydb.start();
+ YDB.start();
}
protected SparkSession spark;
@@ -132,7 +133,6 @@ public void testOverwrite() {
.load()
.collectAsList();
assertThat(rows1).hasSize(1);
-
}
@Test