Skip to content

Commit

Permalink
Add test for multiple Parquet columns
Browse files Browse the repository at this point in the history
  • Loading branch information
kaklakariada committed Oct 6, 2023
1 parent 558970e commit f246a2f
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,19 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit
super.afterAll()
}

abstract class AbstractChecker(exaColumnType: String, tableName: String) {
abstract class AbstractChecker(exaColumnType: String, tableName: String)
extends AbstractMultiColChecker(Map("COLUMN" -> exaColumnType), tableName)

abstract class AbstractMultiColChecker(columns: Map[String, String], tableName: String) {
def withResultSet(block: ResultSet => Unit): this.type = {
uploadFileToS3(bucketName, path)
val table = schema
val tableBuilder = schema
.createTableBuilder(tableName.toUpperCase(java.util.Locale.ENGLISH))
.column("COLUMN", exaColumnType)
.build()
columns.foreach { case (colName, colType) =>
tableBuilder.column(colName, colType)
}

val table = tableBuilder.build()
importFromS3IntoExasol(schemaName, table, bucketName, path.getName(), dataFormat)
val rs = executeQuery(s"SELECT * FROM ${table.getFullyQualifiedName()}")
block(rs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,24 @@ class ParquetDataImporterIT extends BaseDataImporter {
)
}

test("imports multiple columns") {
  // Import a two-column Parquet file (name: UTF8, age: int32) and verify
  // both columns round-trip into the Exasol table.
  val checker = MultiParquetChecker(
    "required binary name (UTF8); required int32 age;",
    Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"),
    "multi_col"
  )
  checker
    .withWriter { (writer, schema) =>
      // Write one row per person; append order must match the Parquet schema.
      Seq(("John", 24), ("Jane", 22)).foreach { case (name, age) =>
        writer.write(new SimpleGroup(schema).append("name", name).append("age", age))
      }
    }
    .assertResultSet(
      table()
        .row("John", 24)
        .row("Jane", 22)
        .matches()
    )
}

case class ParquetChecker(parquetColumn: String, exaColumn: String, tableName: String)
extends AbstractChecker(exaColumn, tableName)
with ParquetTestDataWriter {
Expand All @@ -494,4 +512,22 @@ class ParquetDataImporterIT extends BaseDataImporter {
}
}

/**
 * Checker for importing a Parquet file with multiple columns into an Exasol table.
 *
 * @param parquetColumn Parquet field definitions placed inside the message schema,
 *                      e.g. "required binary name (UTF8); required int32 age;"
 * @param exaColumns    mapping of Exasol column name to column type for the target table
 * @param tableName     name of the table to create in the test schema
 */
case class MultiParquetChecker(parquetColumn: String, exaColumns: Map[String, String], tableName: String)
    extends AbstractMultiColChecker(exaColumns, tableName)
    with ParquetTestDataWriter {
  private val parquetSchema = MessageTypeParser.parseMessageType(s"message test { $parquetColumn }")

  /**
   * Runs the given block with an open Parquet writer and the parsed schema.
   *
   * The writer is closed in a `finally` so the file handle is not leaked
   * when the block throws (the previous version skipped `close()` on failure).
   *
   * @param block callback receiving the writer and the message schema
   * @return this checker, for fluent chaining
   */
  def withWriter(block: (ParquetWriter[Group], MessageType) => Unit): MultiParquetChecker = {
    val writer = getParquetWriter(path, parquetSchema, true)
    try {
      block(writer, parquetSchema)
    } finally {
      writer.close()
    }
    this
  }

  /**
   * Writes the given values to the Parquet file using the shared test-data writer.
   *
   * @param values values to write, one row per element
   * @return this checker, for fluent chaining
   */
  def withInputValues[T](values: List[T]): MultiParquetChecker = {
    writeDataValues(values, path, parquetSchema)
    this
  }
}

}

0 comments on commit f246a2f

Please sign in to comment.