From f246a2f006377c3127f009ba6289a3649a585195 Mon Sep 17 00:00:00 2001
From: Christoph Pirkl
Date: Fri, 6 Oct 2023 17:14:37 +0200
Subject: [PATCH] Add test for multiple Parquet columns

---
 .../exasol/cloudetl/it/BaseDataImporter.scala | 14 +++++---
 .../it/parquet/ParquetDataImporterIT.scala    | 36 +++++++++++++++++++
 2 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala b/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
index 6f0c4f54..700d8fae 100644
--- a/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
+++ b/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
@@ -36,13 +36,19 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
     super.afterAll()
   }
 
-  abstract class AbstractChecker(exaColumnType: String, tableName: String) {
+  abstract class AbstractChecker(exaColumnType: String, tableName: String)
+    extends AbstractMultiColChecker(Map("COLUMN" -> exaColumnType), tableName)
+
+  abstract class AbstractMultiColChecker(columns: Map[String, String], tableName: String) {
     def withResultSet(block: ResultSet => Unit): this.type = {
       uploadFileToS3(bucketName, path)
-      val table = schema
+      val tableBuilder = schema
         .createTableBuilder(tableName.toUpperCase(java.util.Locale.ENGLISH))
-        .column("COLUMN", exaColumnType)
-        .build()
+      columns.foreach { case (colName, colType) =>
+        tableBuilder.column(colName, colType)
+      }
+
+      val table = tableBuilder.build()
       importFromS3IntoExasol(schemaName, table, bucketName, path.getName(), dataFormat)
       val rs = executeQuery(s"SELECT * FROM ${table.getFullyQualifiedName()}")
       block(rs)
diff --git a/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala b/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala
index 400ad93e..bfe9e480 100644
--- a/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala
+++ b/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala
@@ -476,6 +476,24 @@ class ParquetDataImporterIT extends BaseDataImporter {
     )
   }
 
+  test("imports multiple columns") {
+    MultiParquetChecker(
+      "required binary name (UTF8); required int32 age;",
+      Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
+      "multi_col"
+    )
+      .withWriter { case (writer, schema) =>
+        writer.write(new SimpleGroup(schema).append("name", "John").append("age", 24))
+        writer.write(new SimpleGroup(schema).append("name", "Jane").append("age", 22))
+      }
+      .assertResultSet(
+        table()
+          .row("John", 24)
+          .row("Jane", 22)
+          .matches()
+      )
+  }
+
   case class ParquetChecker(parquetColumn: String, exaColumn: String, tableName: String)
     extends AbstractChecker(exaColumn, tableName)
     with ParquetTestDataWriter {
@@ -494,4 +512,22 @@ class ParquetDataImporterIT extends BaseDataImporter {
     }
   }
 
+  case class MultiParquetChecker(parquetColumn: String, exaColumns: Map[String, String], tableName: String)
+    extends AbstractMultiColChecker(exaColumns, tableName)
+    with ParquetTestDataWriter {
+    private val parquetSchema = MessageTypeParser.parseMessageType(s"message test { $parquetColumn }")
+
+    def withWriter(block: (ParquetWriter[Group], MessageType) => Unit): MultiParquetChecker = {
+      val writer = getParquetWriter(path, parquetSchema, true)
+      block(writer, parquetSchema)
+      writer.close()
+      this
+    }
+
+    def withInputValues[T](values: List[T]): MultiParquetChecker = {
+      writeDataValues(values, path, parquetSchema)
+      this
+    }
+  }
+
 }