Commit

minor adjustments
zinal committed Aug 30, 2024
1 parent 97687a9 commit 396c6c3
Showing 3 changed files with 85 additions and 116 deletions.
11 changes: 5 additions & 6 deletions pom.xml
@@ -9,7 +9,7 @@
<developers>
<developer>
<name>Maksim Zinal</name>
<email>mzinal@ydb.tech</email>
<email>zinal@ydb.tech</email>
<organization>YDB</organization>
<organizationUrl>https://ydb.tech/</organizationUrl>
</developer>
@@ -27,7 +27,7 @@
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
<version>2.2.8</version>
<version>2.2.9</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@@ -135,7 +135,6 @@
<version>3.26.3</version>
<scope>test</scope>
</dependency>

</dependencies>
<profiles>
<profile>
@@ -181,7 +180,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.4.0</version>
<version>3.5.0</version>
<configuration>
<configLocation>config/ydb.checkstyle.xml</configLocation>
<suppressionsFileExpression>config/ydb.suppressions.xml</suppressionsFileExpression>
@@ -199,12 +198,12 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.7.0</version>
<version>3.10.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>3.1.2</version>
<version>3.1.3</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
36 changes: 5 additions & 31 deletions src/main/java/tech/ydb/spark/connector/YdbTable.java
@@ -71,10 +71,10 @@ public class YdbTable implements Table,
* @param connector YDB connector
* @param types YDB type convertor
* @param logicalName Table logical name
* @param actualPath Table path
* @param tablePath Table path
* @param td Table description object obtained from YDB
*/
public YdbTable(YdbConnector connector, YdbTypes types,
YdbTable(YdbConnector connector, YdbTypes types,
String logicalName, String tablePath, TableDescription td) {
this.connector = connector;
this.types = types;
@@ -124,7 +124,7 @@ public YdbTable(YdbConnector connector, YdbTypes types,
* @param ix Index information entry
* @param tdIx Table description object for the index table
*/
public YdbTable(YdbConnector connector, YdbTypes types,
YdbTable(YdbConnector connector, YdbTypes types,
String logicalName, String tablePath,
TableDescription tdMain, TableIndex ix, TableDescription tdIx) {
this.connector = connector;
@@ -212,21 +212,7 @@ public static Result<YdbTable> lookup(YdbConnector connector, YdbTypes types,
}).join();
}

private YdbTable() {
connector = null;
types = null;
logicalName = null;
tablePath = null;
columns = null;
keyColumns = null;
keyTypes = null;
partitions = null;
properties = null;
indexPseudoTable = false;
}

private static void convertPartitioningSettings(TableDescription td,
Map<String, String> properties) {
static void convertPartitioningSettings(TableDescription td, Map<String, String> properties) {
PartitioningSettings ps = td.getPartitioningSettings();
if (ps != null) {
Boolean bv = ps.getPartitioningBySize();
@@ -252,7 +238,7 @@
}
}

private static Map<String, TableColumn> buildColumnsMap(TableDescription td) {
static Map<String, TableColumn> buildColumnsMap(TableDescription td) {
Map<String, TableColumn> m = new HashMap<>();
for (TableColumn tc : td.getColumns()) {
m.put(tc.getName(), tc);
@@ -390,16 +376,4 @@ public String toString() {
return "YdbTable:" + connector.getCatalogName() + ":" + tablePath;
}

public boolean isActualTable() {
return true;
}

public static YdbTable buildShell() {
return new YdbTable() {
@Override
public boolean isActualTable() {
return false;
}
};
}
}
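With buildShell() and isActualTable() removed and both constructors narrowed to package-private, the static lookup(...) factory shown above becomes the intended way for outside code to obtain a table instance. A minimal caller sketch, assuming the remaining lookup parameters mirror the constructor's (logical name and table path) and that Result exposes isSuccess()/getValue() as elsewhere in the YDB Java SDK:

    // Hypothetical caller; the parameter list and accessors here are assumptions, not taken from this diff.
    Result<YdbTable> found = YdbTable.lookup(connector, types, logicalName, tablePath);
    if (found.isSuccess()) {
        YdbTable table = found.getValue(); // a real table; no placeholder instances anymore
        // ... hand the table over to Spark
    } else {
        found.getStatus().expectSuccess(); // turn the failed status into an exception, as the tests below do
    }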
154 changes: 75 additions & 79 deletions src/test/java/tech/ydb/spark/connector/integration/IntegrationTest.java
@@ -1,5 +1,9 @@
package tech.ydb.spark.connector.integration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
@@ -15,22 +19,19 @@
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.spark.connector.impl.YdbConnector;
import tech.ydb.spark.connector.impl.YdbRegistry;
import tech.ydb.table.query.Params;
import tech.ydb.table.transaction.TxControl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

public class IntegrationTest {
public static final String CATALOG = "spark.sql.catalog.ydb";

public static final String CATALOG = "spark.sql.catalog.ydb1";
public static final String TEST_TABLE = "test_table";

public static GenericContainer<?> ydb = new GenericContainer<>("cr.yandex/yc/yandex-docker-local-ydb:latest")
@@ -103,7 +104,6 @@ public void testViaJavaApi() {
assertThat(rows1).hasSize(1);
}


@Test
public void testOverwrite() {
schemaQuery("create table " + TEST_TABLE + "(id Uint64, value Text, PRIMARY KEY(id))").expectSuccess();
@@ -141,9 +141,9 @@ public void testCreateBySparkSQL() {
schemaQuery("create table " + original + "(id Uint64, value Text, PRIMARY KEY(id))").expectSuccess();
dataQuery("upsert into " + original + "(id, value) values (1, 'asdf'), (2, 'zxcv'), (3, 'fghj')");

spark.sql("create table ydb." + TEST_TABLE + "(id bigint, value string) ").queryExecution();
spark.sql("insert into ydb." + TEST_TABLE + " select * from ydb." + original).queryExecution();
List<Row> rows = spark.sql("select * from ydb." + TEST_TABLE)
spark.sql("create table ydb1." + TEST_TABLE + "(id bigint, value string) ").queryExecution();
spark.sql("insert into ydb1." + TEST_TABLE + " select * from ydb1." + original).queryExecution();
List<Row> rows = spark.sql("select * from ydb1." + TEST_TABLE)
.collectAsList();
assertThat(rows).hasSize(3);
}
@@ -154,105 +154,101 @@ public void testCreateWithSelect() {
schemaQuery("create table " + original + "(id Uint64, value Text, PRIMARY KEY(id))").expectSuccess();
dataQuery("upsert into " + original + "(id, value) values (1, 'asdf'), (2, 'zxcv'), (3, 'fghj')");

spark.sql("create table ydb." + TEST_TABLE + " as select * from ydb." + original)
spark.sql("create table ydb1." + TEST_TABLE + " as select * from ydb1." + original)
.queryExecution();
List<Row> rows = spark.sql("select * from ydb." + TEST_TABLE)
List<Row> rows = spark.sql("select * from ydb1." + TEST_TABLE)
.collectAsList();
assertThat(rows).hasSize(3);
}


@Test
public void testCatalogAccess() {
String createToster = "CREATE TABLE toster(\n" +
" a Uint64 NOT NULL,\n" +
" b Uint32,\n" +
" c Int32,\n" +
" d Int64,\n" +
" e Text,\n" +
" f Bytes,\n" +
" g Timestamp,\n" +
" h Datetime,\n" +
" i Date,\n" +
" j Json,\n" +
" k JsonDocument,\n" +
" l Bool,\n" +
" m Uint8,\n" +
" n Float,\n" +
" o Double,\n" +
" p Decimal(22,9),\n" +
" PRIMARY KEY(a)\n" +
")";
String createToster = "CREATE TABLE toster(\n"
+ " a Uint64 NOT NULL,\n"
+ " b Uint32,\n"
+ " c Int32,\n"
+ " d Int64,\n"
+ " e Text,\n"
+ " f Bytes,\n"
+ " g Timestamp,\n"
+ " h Datetime,\n"
+ " i Date,\n"
+ " j Json,\n"
+ " k JsonDocument,\n"
+ " l Bool,\n"
+ " m Uint8,\n"
+ " n Float,\n"
+ " o Double,\n"
+ " p Decimal(22,9),\n"
+ " PRIMARY KEY(a)\n"
+ ")";

schemaQuery(createToster).expectSuccess();

String upsertToster = "UPSERT INTO toster(a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p) VALUES (\n" +
" 1001,\n" +
" 2002,\n" +
" 3003,\n" +
" 4004,\n" +
" \"Text string\"u,\n" +
" \"Bytes string\",\n" +
" Timestamp(\"2023-01-07T11:05:32.123456Z\"),\n" +
" Datetime(\"2023-01-07T11:05:32Z\"),\n" +
" Date(\"2023-01-07\"),\n" +
" Json(@@{\"x\": 1, \"y\": \"test\"}@@),\n" +
" JsonDocument(@@{\"x\": 1, \"y\": \"test\"}@@),\n" +
" True,\n" +
" 7,\n" +
" 123.456f,\n" +
" 123.456789,\n" +
" Decimal(\"123.456789\", 22, 9)\n" +
"), (\n" +
" 10001,\n" +
" 20002,\n" +
" 30003,\n" +
" 40004,\n" +
" \"New Text string\"u,\n" +
" \"New Bytes string\",\n" +
" Timestamp(\"2020-01-07T11:05:32.123456Z\"),\n" +
" Datetime(\"2020-01-07T11:05:32Z\"),\n" +
" Date(\"2020-01-07\"),\n" +
" Json(@@{\"x\": 2, \"y\": \"dust\"}@@),\n" +
" JsonDocument(@@{\"x\": 2, \"y\": \"dust\"}@@),\n" +
" False,\n" +
" 8,\n" +
" 1023.456f,\n" +
" 1023.456789,\n" +
" Decimal(\"1023.456789\", 22, 9)\n" +
")";
String upsertToster = "UPSERT INTO toster(a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p) VALUES (\n"
+ " 1001,\n"
+ " 2002,\n"
+ " 3003,\n"
+ " 4004,\n"
+ " \"Text string\"u,\n"
+ " \"Bytes string\",\n"
+ " Timestamp(\"2023-01-07T11:05:32.123456Z\"),\n"
+ " Datetime(\"2023-01-07T11:05:32Z\"),\n"
+ " Date(\"2023-01-07\"),\n"
+ " Json(@@{\"x\": 1, \"y\": \"test\"}@@),\n"
+ " JsonDocument(@@{\"x\": 1, \"y\": \"test\"}@@),\n"
+ " True,\n"
+ " 7,\n"
+ " 123.456f,\n"
+ " 123.456789,\n"
+ " Decimal(\"123.456789\", 22, 9)\n"
+ "), (\n"
+ " 10001,\n"
+ " 20002,\n"
+ " 30003,\n"
+ " 40004,\n"
+ " \"New Text string\"u,\n"
+ " \"New Bytes string\",\n"
+ " Timestamp(\"2020-01-07T11:05:32.123456Z\"),\n"
+ " Datetime(\"2020-01-07T11:05:32Z\"),\n"
+ " Date(\"2020-01-07\"),\n"
+ " Json(@@{\"x\": 2, \"y\": \"dust\"}@@),\n"
+ " JsonDocument(@@{\"x\": 2, \"y\": \"dust\"}@@),\n"
+ " False,\n"
+ " 8,\n"
+ " 1023.456f,\n"
+ " 1023.456789,\n"
+ " Decimal(\"1023.456789\", 22, 9)\n"
+ ")";

dataQuery(upsertToster);

List<Row> rows = spark.sql("SELECT * FROM ydb.toster")
List<Row> rows = spark.sql("SELECT * FROM ydb1.toster")
.collectAsList();
assertThat(rows).hasSize(2);
}

private Dataset<Row> sampleDataset() {
ArrayList<Row> rows = new ArrayList<>();
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.LongType, false, Metadata.empty()),
new StructField("value", DataTypes.StringType, false, Metadata.empty()),
});
new StructField("id", DataTypes.LongType, false, Metadata.empty()),
new StructField("value", DataTypes.StringType, false, Metadata.empty()),});
rows.add(new GenericRowWithSchema(new Object[]{1L, "some value"}, schema));
return spark.createDataFrame(rows, schema);
}

private void dataQuery(String query) {
connector.getRetryCtx().supplyStatus(session -> session.executeDataQuery(
query,
TxControl.serializableRw().setCommitTx(true),
Params.empty())
.thenApply(Result::getStatus))
query,
TxControl.serializableRw().setCommitTx(true),
Params.empty())
.thenApply(Result::getStatus))
.join().expectSuccess();
}

private Status schemaQuery(String query) {
return connector.getRetryCtx().supplyStatus(session -> session.executeSchemeQuery(
query
))
.join();
return connector.getRetryCtx().supplyStatus(
session -> session.executeSchemeQuery(query)).join();
}

}
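The ydb → ydb1 renames throughout this file all follow from the CATALOG constant: in Spark SQL, the suffix of a spark.sql.catalog.<name> property names the catalog, and that name is the prefix used to address its tables in queries. A minimal sketch of the wiring, assuming a hypothetical catalog class name and connection option key (neither appears in this diff):

    SparkConf conf = new SparkConf()
            // the suffix after "spark.sql.catalog." ("ydb1") becomes the prefix used in SQL below
            .set("spark.sql.catalog.ydb1", "tech.ydb.spark.connector.YdbCatalog") // class name assumed
            .set("spark.sql.catalog.ydb1.url", "grpc://localhost:2136/local");    // option key assumed

    SparkSession spark = SparkSession.builder().config(conf).master("local[2]").getOrCreate();

    // tables registered in YDB are then addressed through the matching catalog prefix
    List<Row> rows = spark.sql("select * from ydb1." + TEST_TABLE).collectAsList();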
