From 396c6c3351e69555dd85b3aa214b2632c428d847 Mon Sep 17 00:00:00 2001
From: Maksim Zinal
Date: Fri, 30 Aug 2024 21:35:14 +0300
Subject: [PATCH] minor adjustments

---
 pom.xml                                        |  11 +-
 .../tech/ydb/spark/connector/YdbTable.java     |  36 +---
 .../integration/IntegrationTest.java           | 154 +++++++++---------
 3 files changed, 85 insertions(+), 116 deletions(-)

diff --git a/pom.xml b/pom.xml
index e4cfcba..e82cfb3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,7 @@
             Maksim Zinal
-            mzinal@ydb.tech
+            zinal@ydb.tech
             YDB
             https://ydb.tech/
@@ -27,7 +27,7 @@
                 tech.ydb
                 ydb-sdk-bom
-                2.2.8
+                2.2.9
                 pom
                 import
@@ -135,7 +135,6 @@
             3.26.3
             test
-
@@ -181,7 +180,7 @@
                 org.apache.maven.plugins
                 maven-checkstyle-plugin
-                3.4.0
+                3.5.0
                     config/ydb.checkstyle.xml
                     config/ydb.suppressions.xml
@@ -199,12 +198,12 @@
                 org.apache.maven.plugins
                 maven-javadoc-plugin
-                3.7.0
+                3.10.0
                 org.apache.maven.plugins
                 maven-deploy-plugin
-                3.1.2
+                3.1.3
                 org.apache.maven.plugins
diff --git a/src/main/java/tech/ydb/spark/connector/YdbTable.java b/src/main/java/tech/ydb/spark/connector/YdbTable.java
index cb977df..8792760 100644
--- a/src/main/java/tech/ydb/spark/connector/YdbTable.java
+++ b/src/main/java/tech/ydb/spark/connector/YdbTable.java
@@ -71,10 +71,10 @@ public class YdbTable implements Table,
      * @param connector YDB connector
      * @param types YDB type convertor
      * @param logicalName Table logical name
-     * @param actualPath Table path
+     * @param tablePath Table path
      * @param td Table description object obtained from YDB
      */
-    public YdbTable(YdbConnector connector, YdbTypes types,
+    YdbTable(YdbConnector connector, YdbTypes types,
             String logicalName, String tablePath, TableDescription td) {
         this.connector = connector;
         this.types = types;
@@ -124,7 +124,7 @@ public YdbTable(YdbConnector connector, YdbTypes types,
      * @param ix Index information entry
      * @param tdIx Table description object for the index table
      */
-    public YdbTable(YdbConnector connector, YdbTypes types,
+    YdbTable(YdbConnector connector, YdbTypes types,
             String logicalName, String tablePath, TableDescription tdMain,
             TableIndex ix, TableDescription tdIx) {
         this.connector = connector;
@@ -212,21 +212,7 @@ public static Result lookup(YdbConnector connector, YdbTypes types,
         }).join();
     }
 
-    private YdbTable() {
-        connector = null;
-        types = null;
-        logicalName = null;
-        tablePath = null;
-        columns = null;
-        keyColumns = null;
-        keyTypes = null;
-        partitions = null;
-        properties = null;
-        indexPseudoTable = false;
-    }
-
-    private static void convertPartitioningSettings(TableDescription td,
-            Map properties) {
+    static void convertPartitioningSettings(TableDescription td, Map properties) {
         PartitioningSettings ps = td.getPartitioningSettings();
         if (ps != null) {
             Boolean bv = ps.getPartitioningBySize();
@@ -252,7 +238,7 @@ private static void convertPartitioningSettings(TableDescription td,
         }
     }
 
-    private static Map buildColumnsMap(TableDescription td) {
+    static Map buildColumnsMap(TableDescription td) {
         Map m = new HashMap<>();
         for (TableColumn tc : td.getColumns()) {
             m.put(tc.getName(), tc);
@@ -390,16 +376,4 @@ public String toString() {
         return "YdbTable:" + connector.getCatalogName() + ":" + tablePath;
     }
 
-    public boolean isActualTable() {
-        return true;
-    }
-
-    public static YdbTable buildShell() {
-        return new YdbTable() {
-            @Override
-            public boolean isActualTable() {
-                return false;
-            }
-        };
-    }
 }
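Note on the YdbTable changes above: the no-argument constructor and the buildShell()/isActualTable() pair are removed, the remaining constructors become package-private, and convertPartitioningSettings/buildColumnsMap drop their private modifier. Both helpers follow a simple pattern: flatten the optional PartitioningSettings of a TableDescription into string properties, and index the column list by name. A minimal standalone sketch of that pattern, assuming a Map<String, String> target and an invented property key (neither is taken from this patch); only SDK calls that appear in the hunks above are used:

import java.util.HashMap;
import java.util.Map;

import tech.ydb.table.description.TableColumn;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.settings.PartitioningSettings;

final class TableMetaSketch {
    // Flatten partitioning flags into string properties; the key name here is illustrative only.
    static Map<String, String> partitioningProperties(TableDescription td) {
        Map<String, String> props = new HashMap<>();
        PartitioningSettings ps = td.getPartitioningSettings();
        if (ps != null && ps.getPartitioningBySize() != null) {
            props.put("auto_partitioning_by_size", ps.getPartitioningBySize().toString());
        }
        return props;
    }

    // Index the table columns by name, mirroring buildColumnsMap above.
    static Map<String, TableColumn> columnsByName(TableDescription td) {
        Map<String, TableColumn> m = new HashMap<>();
        for (TableColumn tc : td.getColumns()) {
            m.put(tc.getName(), tc);
        }
        return m;
    }
}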
diff --git a/src/test/java/tech/ydb/spark/connector/integration/IntegrationTest.java b/src/test/java/tech/ydb/spark/connector/integration/IntegrationTest.java
index 4335ee2..5981e84 100644
--- a/src/test/java/tech/ydb/spark/connector/integration/IntegrationTest.java
+++ b/src/test/java/tech/ydb/spark/connector/integration/IntegrationTest.java
@@ -1,5 +1,9 @@
 package tech.ydb.spark.connector.integration;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
@@ -15,6 +19,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.testcontainers.containers.GenericContainer;
+
 import tech.ydb.core.Result;
 import tech.ydb.core.Status;
 import tech.ydb.spark.connector.impl.YdbConnector;
@@ -22,15 +27,11 @@
 import tech.ydb.table.query.Params;
 import tech.ydb.table.transaction.TxControl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class IntegrationTest {
-    public static final String CATALOG = "spark.sql.catalog.ydb";
+
+    public static final String CATALOG = "spark.sql.catalog.ydb1";
     public static final String TEST_TABLE = "test_table";
 
     public static GenericContainer ydb = new GenericContainer<>("cr.yandex/yc/yandex-docker-local-ydb:latest")
@@ -103,7 +104,6 @@ public void testViaJavaApi() {
         assertThat(rows1).hasSize(1);
     }
 
-
     @Test
     public void testOverwrite() {
         schemaQuery("create table " + TEST_TABLE + "(id Uint64, value Text, PRIMARY KEY(id))").expectSuccess();
@@ -141,9 +141,9 @@ public void testCreateBySparkSQL() {
         schemaQuery("create table " + original + "(id Uint64, value Text, PRIMARY KEY(id))").expectSuccess();
         dataQuery("upsert into " + original + "(id, value) values (1, 'asdf'), (2, 'zxcv'), (3, 'fghj')");
 
-        spark.sql("create table ydb." + TEST_TABLE + "(id bigint, value string) ").queryExecution();
-        spark.sql("insert into ydb." + TEST_TABLE + " select * from ydb." + original).queryExecution();
-        List rows = spark.sql("select * from ydb." + TEST_TABLE)
+        spark.sql("create table ydb1." + TEST_TABLE + "(id bigint, value string) ").queryExecution();
+        spark.sql("insert into ydb1." + TEST_TABLE + " select * from ydb1." + original).queryExecution();
+        List rows = spark.sql("select * from ydb1." + TEST_TABLE)
                 .collectAsList();
         assertThat(rows).hasSize(3);
     }
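The Spark SQL statements in these tests address tables through the catalog name configured above: CATALOG is now spark.sql.catalog.ydb1, so the table prefix changes from ydb. to ydb1. A rough sketch of how such a catalog is registered on a Spark session; the connector's catalog class name and the connection option used here are assumptions, not taken from this patch:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public final class YdbCatalogSetupSketch {
    public static SparkSession buildSession() {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("ydb-integration-test")
                // Register a catalog named "ydb1"; the implementation class name is assumed.
                .set("spark.sql.catalog.ydb1", "tech.ydb.spark.connector.YdbCatalog")
                // The connection string option name ("url") is assumed as well.
                .set("spark.sql.catalog.ydb1.url", "grpc://localhost:2136/local");
        return SparkSession.builder().config(conf).getOrCreate();
    }
}

With that configuration in place, spark.sql("select * from ydb1.test_table") resolves test_table through the connector rather than the default catalog.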
@@ -154,77 +154,76 @@ public void testCreateWithSelect() {
         schemaQuery("create table " + original + "(id Uint64, value Text, PRIMARY KEY(id))").expectSuccess();
         dataQuery("upsert into " + original + "(id, value) values (1, 'asdf'), (2, 'zxcv'), (3, 'fghj')");
 
-        spark.sql("create table ydb." + TEST_TABLE + " as select * from ydb." + original)
+        spark.sql("create table ydb1." + TEST_TABLE + " as select * from ydb1." + original)
                 .queryExecution();
-        List rows = spark.sql("select * from ydb." + TEST_TABLE)
+        List rows = spark.sql("select * from ydb1." + TEST_TABLE)
                 .collectAsList();
         assertThat(rows).hasSize(3);
     }
 
-
     @Test
     public void testCatalogAccess() {
-        String createToster = "CREATE TABLE toster(\n" +
-                " a Uint64 NOT NULL,\n" +
-                " b Uint32,\n" +
-                " c Int32,\n" +
-                " d Int64,\n" +
-                " e Text,\n" +
-                " f Bytes,\n" +
-                " g Timestamp,\n" +
-                " h Datetime,\n" +
-                " i Date,\n" +
-                " j Json,\n" +
-                " k JsonDocument,\n" +
-                " l Bool,\n" +
-                " m Uint8,\n" +
-                " n Float,\n" +
-                " o Double,\n" +
-                " p Decimal(22,9),\n" +
-                " PRIMARY KEY(a)\n" +
-                ")";
+        String createToster = "CREATE TABLE toster(\n"
+                + " a Uint64 NOT NULL,\n"
+                + " b Uint32,\n"
+                + " c Int32,\n"
+                + " d Int64,\n"
+                + " e Text,\n"
+                + " f Bytes,\n"
+                + " g Timestamp,\n"
+                + " h Datetime,\n"
+                + " i Date,\n"
+                + " j Json,\n"
+                + " k JsonDocument,\n"
+                + " l Bool,\n"
+                + " m Uint8,\n"
+                + " n Float,\n"
+                + " o Double,\n"
+                + " p Decimal(22,9),\n"
+                + " PRIMARY KEY(a)\n"
+                + ")";
         schemaQuery(createToster).expectSuccess();
 
-        String upsertToster = "UPSERT INTO toster(a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p) VALUES (\n" +
-                " 1001,\n" +
-                " 2002,\n" +
-                " 3003,\n" +
-                " 4004,\n" +
-                " \"Text string\"u,\n" +
-                " \"Bytes string\",\n" +
-                " Timestamp(\"2023-01-07T11:05:32.123456Z\"),\n" +
-                " Datetime(\"2023-01-07T11:05:32Z\"),\n" +
-                " Date(\"2023-01-07\"),\n" +
-                " Json(@@{\"x\": 1, \"y\": \"test\"}@@),\n" +
-                " JsonDocument(@@{\"x\": 1, \"y\": \"test\"}@@),\n" +
-                " True,\n" +
-                " 7,\n" +
-                " 123.456f,\n" +
-                " 123.456789,\n" +
-                " Decimal(\"123.456789\", 22, 9)\n" +
-                "), (\n" +
-                " 10001,\n" +
-                " 20002,\n" +
-                " 30003,\n" +
-                " 40004,\n" +
-                " \"New Text string\"u,\n" +
-                " \"New Bytes string\",\n" +
-                " Timestamp(\"2020-01-07T11:05:32.123456Z\"),\n" +
-                " Datetime(\"2020-01-07T11:05:32Z\"),\n" +
-                " Date(\"2020-01-07\"),\n" +
-                " Json(@@{\"x\": 2, \"y\": \"dust\"}@@),\n" +
-                " JsonDocument(@@{\"x\": 2, \"y\": \"dust\"}@@),\n" +
-                " False,\n" +
-                " 8,\n" +
-                " 1023.456f,\n" +
-                " 1023.456789,\n" +
-                " Decimal(\"1023.456789\", 22, 9)\n" +
-                ")";
+        String upsertToster = "UPSERT INTO toster(a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p) VALUES (\n"
+                + " 1001,\n"
+                + " 2002,\n"
+                + " 3003,\n"
+                + " 4004,\n"
+                + " \"Text string\"u,\n"
+                + " \"Bytes string\",\n"
+                + " Timestamp(\"2023-01-07T11:05:32.123456Z\"),\n"
+                + " Datetime(\"2023-01-07T11:05:32Z\"),\n"
+                + " Date(\"2023-01-07\"),\n"
+                + " Json(@@{\"x\": 1, \"y\": \"test\"}@@),\n"
+                + " JsonDocument(@@{\"x\": 1, \"y\": \"test\"}@@),\n"
+                + " True,\n"
+                + " 7,\n"
+                + " 123.456f,\n"
+                + " 123.456789,\n"
+                + " Decimal(\"123.456789\", 22, 9)\n"
+                + "), (\n"
+                + " 10001,\n"
+                + " 20002,\n"
+                + " 30003,\n"
+                + " 40004,\n"
+                + " \"New Text string\"u,\n"
+                + " \"New Bytes string\",\n"
+                + " Timestamp(\"2020-01-07T11:05:32.123456Z\"),\n"
+                + " Datetime(\"2020-01-07T11:05:32Z\"),\n"
+                + " Date(\"2020-01-07\"),\n"
+                + " Json(@@{\"x\": 2, \"y\": \"dust\"}@@),\n"
+                + " JsonDocument(@@{\"x\": 2, \"y\": \"dust\"}@@),\n"
+                + " False,\n"
+                + " 8,\n"
+                + " 1023.456f,\n"
+                + " 1023.456789,\n"
+                + " Decimal(\"1023.456789\", 22, 9)\n"
+                + ")";
         dataQuery(upsertToster);
 
-        List rows = spark.sql("SELECT * FROM ydb.toster")
+        List rows = spark.sql("SELECT * FROM ydb1.toster")
                 .collectAsList();
         assertThat(rows).hasSize(2);
     }
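The toster table above exercises most YDB column types end to end; the test only asserts the row count, but the same catalog table can be inspected from Spark to see how those types surface. A small usage sketch, assuming the ydb1 catalog has already been registered on the session as in the test setup:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class TosterReadSketch {
    public static void main(String[] args) {
        // Assumes the "ydb1" catalog is already configured for this session (see the test setup).
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
        Dataset<Row> toster = spark.table("ydb1.toster");
        toster.printSchema();              // shows how the YDB column types surface as Spark SQL types
        toster.filter("a > 1000").show();  // predicate on the Uint64 key column
    }
}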
@@ -232,27 +231,24 @@ public void testCatalogAccess() {
 
     private Dataset sampleDataset() {
         ArrayList rows = new ArrayList<>();
         StructType schema = new StructType(new StructField[]{
-                new StructField("id", DataTypes.LongType, false, Metadata.empty()),
-                new StructField("value", DataTypes.StringType, false, Metadata.empty()),
-        });
+            new StructField("id", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("value", DataTypes.StringType, false, Metadata.empty()),});
         rows.add(new GenericRowWithSchema(new Object[]{1L, "some value"}, schema));
         return spark.createDataFrame(rows, schema);
     }
 
     private void dataQuery(String query) {
         connector.getRetryCtx().supplyStatus(session -> session.executeDataQuery(
-                query,
-                TxControl.serializableRw().setCommitTx(true),
-                Params.empty())
-                .thenApply(Result::getStatus))
+                        query,
+                        TxControl.serializableRw().setCommitTx(true),
+                        Params.empty())
+                .thenApply(Result::getStatus))
                 .join().expectSuccess();
     }
 
     private Status schemaQuery(String query) {
-        return connector.getRetryCtx().supplyStatus(session -> session.executeSchemeQuery(
-                query
-        ))
-                .join();
+        return connector.getRetryCtx().supplyStatus(
+                session -> session.executeSchemeQuery(query)).join();
     }
 }
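The two helpers reformatted at the end run YQL directly through the connector's retry context: schemaQuery issues DDL via executeSchemeQuery, while dataQuery runs DML inside a serializable read-write transaction that commits immediately. A self-contained sketch of the same pattern, assuming getRetryCtx() exposes a tech.ydb.table.SessionRetryContext as it does in this test:

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.query.Params;
import tech.ydb.table.transaction.TxControl;

final class QueryHelperSketch {
    private final SessionRetryContext retryCtx;

    QueryHelperSketch(SessionRetryContext retryCtx) {
        this.retryCtx = retryCtx;
    }

    // DDL: executeSchemeQuery yields a Status future; retries are handled by the context.
    Status schemeQuery(String query) {
        return retryCtx.supplyStatus(session -> session.executeSchemeQuery(query)).join();
    }

    // DML: run in a serializable read-write transaction that is committed immediately.
    void dataQuery(String query) {
        retryCtx.supplyStatus(session -> session.executeDataQuery(
                        query, TxControl.serializableRw().setCommitTx(true), Params.empty())
                .thenApply(Result::getStatus))
                .join().expectSuccess();
    }
}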