From f97397efb892312e36b828e6b125c349fa58d1d5 Mon Sep 17 00:00:00 2001 From: Christoph Pirkl Date: Tue, 10 Oct 2023 13:44:09 +0200 Subject: [PATCH] #280: Improve log messages for import (#278) --- dependencies.md | 2 +- doc/changes/changelog.md | 1 + doc/changes/changes_2.7.5.md | 27 ++++ doc/user_guide/user_guide.md | 20 +-- pk_generated_parent.pom | 2 +- pom.xml | 20 +-- .../cloudetl/emitter/FilesDataEmitter.scala | 22 ++- .../scriptclasses/FilesDataImporter.scala | 14 +- .../scriptclasses/ExaIteratorMock.java | 150 ++++++++++++++++++ src/test/java/org/apache/log4j/MDC.java | 108 +++++++------ .../apache/log4j/helpers/ThreadLocalMap.java | 36 ++--- .../com/exasol/cloudetl/DataRecords.scala | 4 +- .../exasol/cloudetl/it/BaseDataImporter.scala | 51 ++++-- .../cloudetl/it/BaseIntegrationTest.scala | 9 +- .../cloudetl/it/BaseS3IntegrationTest.scala | 20 ++- .../cloudetl/it/avro/AvroDataImporterIT.scala | 1 + .../cloudetl/it/orc/OrcDataImporterIT.scala | 20 +-- .../it/parquet/ParquetDataImporterIT.scala | 110 +++++++++++++ .../scriptclasses/FilesDataImporterTest.scala | 103 ++++++++++-- .../scriptclasses/TableDataExporterTest.scala | 2 +- .../transform/DefaultTransformationTest.scala | 4 +- 21 files changed, 582 insertions(+), 144 deletions(-) create mode 100644 doc/changes/changes_2.7.5.md create mode 100644 src/test/java/com/exasol/cloudetl/scriptclasses/ExaIteratorMock.java diff --git a/dependencies.md b/dependencies.md index 1bd67e5c..c79c6f66 100644 --- a/dependencies.md +++ b/dependencies.md @@ -47,7 +47,7 @@ | ------------------------------------------ | ----------------------------------------- | | [scalatest][40] | [the Apache License, ASL Version 2.0][27] | | [scalatestplus-mockito][41] | [Apache-2.0][27] | -| [mockito-core][42] | [The MIT License][43] | +| [mockito-core][42] | [MIT][43] | | [Hamcrest][44] | [BSD License 3][45] | | [testcontainers-scala-scalatest][46] | [The MIT License (MIT)][47] | | [Testcontainers :: Localstack][48] | [MIT][49] | diff --git a/doc/changes/changelog.md b/doc/changes/changelog.md index 5534c403..044e0353 100644 --- a/doc/changes/changelog.md +++ b/doc/changes/changelog.md @@ -1,5 +1,6 @@ # Changes +* [2.7.5](changes_2.7.5.md) * [2.7.4](changes_2.7.4.md) * [2.7.3](changes_2.7.3.md) * [2.7.2](changes_2.7.2.md) diff --git a/doc/changes/changes_2.7.5.md b/doc/changes/changes_2.7.5.md new file mode 100644 index 00000000..aa23e655 --- /dev/null +++ b/doc/changes/changes_2.7.5.md @@ -0,0 +1,27 @@ +# Cloud Storage Extension 2.7.5, released 2023-10-10 + +Code name: Improved log messages + +## Summary + +This release adds log messages to allow debugging issues during import. + +## Features + +* #280: Improved log messages for import +## Dependency Updates + +### Cloud Storage Extension + +#### Compile Dependency Updates + +* Updated `com.google.protobuf:protobuf-java:3.24.3` to `3.24.4` + +#### Test Dependency Updates + +* Updated `org.mockito:mockito-core:5.5.0` to `5.6.0` +* Updated `org.testcontainers:localstack:1.19.0` to `1.19.1` + +#### Plugin Dependency Updates + +* Updated `com.diffplug.spotless:spotless-maven-plugin:2.39.0` to `2.40.0` diff --git a/doc/user_guide/user_guide.md b/doc/user_guide/user_guide.md index ad40a9b6..0447263d 100644 --- a/doc/user_guide/user_guide.md +++ b/doc/user_guide/user_guide.md @@ -150,7 +150,7 @@ downloaded jar file is the same as the checksum provided in the releases. 
To check the SHA256 result of the local jar, run the command: ```sh -sha256sum exasol-cloud-storage-extension-2.7.4.jar +sha256sum exasol-cloud-storage-extension-2.7.5.jar ``` ### Building From Source @@ -180,7 +180,7 @@ mvn clean package -DskipTests=true ``` The assembled jar file should be located at -`target/exasol-cloud-storage-extension-2.7.4.jar`. +`target/exasol-cloud-storage-extension-2.7.5.jar`. ### Create an Exasol Bucket @@ -202,7 +202,7 @@ for the HTTP protocol. Upload the jar file using curl command: ```sh -curl -X PUT -T exasol-cloud-storage-extension-2.7.4.jar \ +curl -X PUT -T exasol-cloud-storage-extension-2.7.5.jar \ http://w:@exasol.datanode.domain.com:2580// ``` @@ -234,7 +234,7 @@ OPEN SCHEMA CLOUD_STORAGE_EXTENSION; CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS %scriptclass com.exasol.cloudetl.scriptclasses.FilesImportQueryGenerator; - %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.4.jar; + %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.5.jar; / CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...) EMITS ( @@ -244,12 +244,12 @@ CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...) EMITS ( end_index DECIMAL(36, 0) ) AS %scriptclass com.exasol.cloudetl.scriptclasses.FilesMetadataReader; - %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.4.jar; + %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.5.jar; / CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) AS %scriptclass com.exasol.cloudetl.scriptclasses.FilesDataImporter; - %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.4.jar; + %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.5.jar; / ``` @@ -268,12 +268,12 @@ OPEN SCHEMA CLOUD_STORAGE_EXTENSION; CREATE OR REPLACE JAVA SET SCRIPT EXPORT_PATH(...) EMITS (...) AS %scriptclass com.exasol.cloudetl.scriptclasses.TableExportQueryGenerator; - %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.4.jar; + %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.5.jar; / CREATE OR REPLACE JAVA SET SCRIPT EXPORT_TABLE(...) EMITS (ROWS_AFFECTED INT) AS %scriptclass com.exasol.cloudetl.scriptclasses.TableDataExporter; - %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.4.jar; + %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.5.jar; / ``` @@ -407,13 +407,13 @@ CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...) EMITS ( ) AS %jvmoption -DHTTPS_PROXY=http://username:password@10.10.1.10:1180 %scriptclass com.exasol.cloudetl.scriptclasses.FilesMetadataReader; - %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.4.jar; + %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.5.jar; / CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) 
AS %jvmoption -DHTTPS_PROXY=http://username:password@10.10.1.10:1180 %scriptclass com.exasol.cloudetl.scriptclasses.FilesDataImporter; - %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.4.jar; + %jar /buckets/bfsdefault//exasol-cloud-storage-extension-2.7.5.jar; / ``` diff --git a/pk_generated_parent.pom b/pk_generated_parent.pom index 1e3ce210..a2595a1f 100644 --- a/pk_generated_parent.pom +++ b/pk_generated_parent.pom @@ -3,7 +3,7 @@ 4.0.0 com.exasol cloud-storage-extension-generated-parent - 2.7.4 + 2.7.5 pom UTF-8 diff --git a/pom.xml b/pom.xml index baa37c2f..041826d7 100644 --- a/pom.xml +++ b/pom.xml @@ -3,14 +3,14 @@ 4.0.0 com.exasol cloud-storage-extension - 2.7.4 + 2.7.5 Cloud Storage Extension Exasol Cloud Storage Import And Export Extension https://github.com/exasol/cloud-storage-extension/ cloud-storage-extension-generated-parent com.exasol - 2.7.4 + 2.7.5 pk_generated_parent.pom @@ -369,7 +369,7 @@ com.google.protobuf protobuf-java - 3.24.3 + 3.24.4 com.google.cloud.bigdataoss @@ -424,7 +424,7 @@ org.apache.spark spark-sql_${scala.compat.version} - 3.4.1 + 3.4.1 org.spark-project.spark @@ -529,7 +529,7 @@ org.mockito mockito-core - 5.5.0 + 5.6.0 test @@ -547,7 +547,7 @@ org.testcontainers localstack - 1.19.0 + 1.19.1 test @@ -746,10 +746,6 @@ CVE-2023-33546 - - - - CVE-2020-21485 @@ -838,7 +834,7 @@ com.diffplug.spotless spotless-maven-plugin - 2.39.0 + 2.40.0 @@ -862,7 +858,7 @@ com.geirsson metaconfig-pprint_${scala.compat.version} - 0.11.1 + 0.12.0 com.github.liancheng diff --git a/src/main/scala/com/exasol/cloudetl/emitter/FilesDataEmitter.scala b/src/main/scala/com/exasol/cloudetl/emitter/FilesDataEmitter.scala index faa91eed..ea7f968a 100644 --- a/src/main/scala/com/exasol/cloudetl/emitter/FilesDataEmitter.scala +++ b/src/main/scala/com/exasol/cloudetl/emitter/FilesDataEmitter.scala @@ -57,31 +57,49 @@ final case class FilesDataEmitter(properties: StorageProperties, files: Map[Stri case _ => emitRegularData(context) } - private[this] def emitRegularData(context: ExaIterator): Unit = + private[this] def emitRegularData(context: ExaIterator): Unit = { + var totalRowCount = 0 files.foreach { case (filename, _) => Using(Source(fileFormat, new Path(filename), bucket.getConfiguration(), bucket.fileSystem)) { source => + var rowCount = 0 source.stream().foreach { row => val values = defaultTransformation.transform(transformRegularRowValues(row)) context.emit(values: _*) + rowCount += 1 } + totalRowCount += rowCount + logger.info(s"Imported file $filename with $rowCount rows") } } + logger.info(s"Imported ${files.size} files with $totalRowCount rows in total") + } private[this] def transformRegularRowValues(row: RegularRow): Array[Object] = row.getValues().map(_.asInstanceOf[Object]).toArray - private[this] def emitParquetData(context: ExaIterator): Unit = + private[this] def emitParquetData(context: ExaIterator): Unit = { + var totalRowCount = 0 + var totalIntervalCount = 0 files.foreach { case (filename, intervals) => val inputFile = getInputFile(filename) val converter = ParquetValueConverter(RowParquetReader.getSchema(inputFile)) val source = new RowParquetChunkReader(inputFile, intervals) + var rowCount = 0 source.read(new Consumer[Row] { override def accept(row: Row): Unit = { val values = defaultTransformation.transform(converter.convert(row)) context.emit(values: _*) + rowCount += 1 } }) + totalRowCount += rowCount + totalIntervalCount += intervals.size() + logger.info( + s"Imported file $inputFile with $rowCount rows and ${intervals.size()} intervals" + ) } 
+ logger.info(s"Imported ${files.size} files with $totalIntervalCount intervals and $totalRowCount rows in total") + } private[this] def getInputFile(filename: String): InputFile = try { diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/FilesDataImporter.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/FilesDataImporter.scala index 1e8ec60c..5d9f53ff 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/FilesDataImporter.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/FilesDataImporter.scala @@ -33,13 +33,18 @@ object FilesDataImporter extends LazyLogging { val files = collectFiles(iterator) val nodeId = metadata.getNodeId() val vmId = metadata.getVmId() + var intervalCount = 0 files.foreach { case (filename, intervals) => - logger.info(s"Intervals '${getIntervalString(intervals)}' for file $filename on node '$nodeId' and vm '$vmId'.") + logger.info( + s"Importing intervals '${getIntervalString(intervals)}' for file $filename on node '$nodeId' and vm '$vmId'." + ) + intervalCount += intervals.size() } + logger.info(s"Importing ${files.size} files with $intervalCount intervals") FilesDataEmitter(storageProperties, files).emit(iterator) } - private[this] def collectFiles(iterator: ExaIterator): Map[String, List[ChunkInterval]] = { + def collectFiles(iterator: ExaIterator): Map[String, List[ChunkInterval]] = { val files = new HashMap[String, List[ChunkInterval]]() do { val filename = iterator.getString(FILENAME_STARTING_INDEX) @@ -60,11 +65,14 @@ object FilesDataImporter extends LazyLogging { private[this] def getIntervalString(intervals: List[ChunkInterval]): String = { val sb = new StringBuilder() for { i <- 0 until intervals.size() } { + if (i > 0) { + sb.append(", ") + } sb.append("[") .append(intervals.get(i).getStartPosition()) .append(",") .append(intervals.get(i).getEndPosition()) - .append("), ") + .append(")") } sb.toString() } diff --git a/src/test/java/com/exasol/cloudetl/scriptclasses/ExaIteratorMock.java b/src/test/java/com/exasol/cloudetl/scriptclasses/ExaIteratorMock.java new file mode 100644 index 00000000..db8c64d9 --- /dev/null +++ b/src/test/java/com/exasol/cloudetl/scriptclasses/ExaIteratorMock.java @@ -0,0 +1,150 @@ +package com.exasol.cloudetl.scriptclasses; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; + +import com.exasol.ExaIterator; + +public class ExaIteratorMock implements ExaIterator { + + private int currentRow = 0; + private final Object[][] values; + private final List emittedRows = new ArrayList<>(); + + public static ExaIteratorMock empty() { + return new ExaIteratorMock(new Object[0][0]); + } + + public ExaIteratorMock(final Object[]... values) { + this.values = values; + } + + @Override + public void emit(final Object... 
args) { + emittedRows.add(args); + } + + public List getEmittedRows() { + return emittedRows; + } + + @Override + public BigDecimal getBigDecimal(final int col) { + throw new UnsupportedOperationException("Unimplemented method 'getBigDecimal'"); + } + + @Override + public BigDecimal getBigDecimal(final String col) { + throw new UnsupportedOperationException("Unimplemented method 'getBigDecimal'"); + } + + @Override + public Boolean getBoolean(final int col) { + throw new UnsupportedOperationException("Unimplemented method 'getBoolean'"); + } + + @Override + public Boolean getBoolean(final String col) { + throw new UnsupportedOperationException("Unimplemented method 'getBoolean'"); + } + + @Override + public Date getDate(final int col) { + throw new UnsupportedOperationException("Unimplemented method 'getDate'"); + } + + @Override + public Date getDate(final String col) { + throw new UnsupportedOperationException("Unimplemented method 'getDate'"); + } + + @Override + public Double getDouble(final int col) { + throw new UnsupportedOperationException("Unimplemented method 'getDouble'"); + } + + @Override + public Double getDouble(final String col) { + throw new UnsupportedOperationException("Unimplemented method 'getDouble'"); + } + + @Override + public Integer getInteger(final int col) { + throw new UnsupportedOperationException("Unimplemented method 'getInteger'"); + } + + @Override + public Integer getInteger(final String col) { + throw new UnsupportedOperationException("Unimplemented method 'getInteger'"); + } + + @Override + public Long getLong(final int col) { + return get(Long.class, col); + } + + @Override + public Long getLong(final String col) { + throw new UnsupportedOperationException("Unimplemented method 'getLong'"); + } + + @Override + public Object getObject(final int col) { + throw new UnsupportedOperationException("Unimplemented method 'getObject'"); + } + + @Override + public Object getObject(final String col) { + throw new UnsupportedOperationException("Unimplemented method 'getObject'"); + } + + @Override + public String getString(final int col) { + return get(String.class, col); + } + + private T get(final Class type, final int col) { + if (values.length > currentRow) { + return type.cast(values[currentRow][col]); + } else { + return null; + } + } + + @Override + public String getString(final String col) { + throw new UnsupportedOperationException("Unimplemented method 'getString'"); + } + + @Override + public Timestamp getTimestamp(final String col) { + throw new UnsupportedOperationException("Unimplemented method 'getTimestamp'"); + } + + @Override + public Timestamp getTimestamp(final int col) { + throw new UnsupportedOperationException("Unimplemented method 'getTimestamp'"); + } + + @Override + public boolean next() { + if (currentRow + 1 >= values.length) { + return false; + } + currentRow++; + return true; + } + + @Override + public void reset() { + throw new UnsupportedOperationException("Unimplemented method 'reset'"); + } + + @Override + public long size() { + throw new UnsupportedOperationException("Unimplemented method 'size'"); + } +} diff --git a/src/test/java/org/apache/log4j/MDC.java b/src/test/java/org/apache/log4j/MDC.java index d344e585..6c49e6ee 100644 --- a/src/test/java/org/apache/log4j/MDC.java +++ b/src/test/java/org/apache/log4j/MDC.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,42 +17,44 @@ package org.apache.log4j; -import org.apache.log4j.helpers.ThreadLocalMap; - import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Hashtable; +import org.apache.log4j.helpers.ThreadLocalMap; + /** - * The MDC class is similar to the {@link NDC} class except that it is - * based on a map instead of a stack. It provides mapped - * diagnostic contexts. A Mapped Diagnostic Context, or - * MDC in short, is an instrument for distinguishing interleaved log - * output from different sources. Log output is typically interleaved - * when a server handles multiple clients near-simultaneously. + * The MDC class is similar to the {@link NDC} class except that it is based on a map instead of a stack. It provides + * mapped diagnostic contexts. A Mapped Diagnostic Context, or MDC in short, is an instrument for + * distinguishing interleaved log output from different sources. Log output is typically interleaved when a server + * handles multiple clients near-simultaneously. *

- * <p>The MDC is managed on a per thread basis. A
- * child thread automatically inherits a copy of the mapped
- * diagnostic context of its parent.
+ * <p>
+ * The MDC is managed on a per thread basis. A child thread automatically inherits a copy of
+ * the mapped diagnostic context of its parent.
  * <p>
- * <p>The MDC class requires JDK 1.2 or above. Under JDK 1.1 the MDC
- * will always return empty values but otherwise will not affect or
- * harm your application.
+ * <p>
+ * The MDC class requires JDK 1.2 or above. Under JDK 1.1 the MDC will always return empty values but otherwise will not
+ * affect or harm your application.
+ * <p>
  *
- * <p>Attention: the application is required to clean up. In web applications
- * this can happen with creating a Servlet Filter and overriding the
- * onFilter method like:
+ * <p>
+ * Attention: the application is required to clean up. In web applications this can happen with creating a Servlet
+ * Filter and overriding the onFilter method like:
+ * <p>
  *
  * <pre>
  * try {
- *    MDC.put(myKey);
- *    chain.doFilter(request, response);
+ *     MDC.put(myKey);
+ *     chain.doFilter(request, response);
  * } finally {
- *    MDC.remove(myKey);
+ *     MDC.remove(myKey);
  * }
  * </pre>
  *
- * <p>Please also see: {@link http://logging.apache.org/log4j/1.2/faq.html#mdcmemoryleak}
+ * <p>
+ * Please also see: {@link http://logging.apache.org/log4j/1.2/faq.html#mdcmemoryleak}

* * @author Ceki Gülcü * @since 1.2 @@ -75,21 +77,20 @@ private MDC() { } try { - removeMethod = ThreadLocal.class.getMethod("remove", null); - } catch (NoSuchMethodException e) { + removeMethod = ThreadLocal.class.getMethod("remove"); + } catch (final NoSuchMethodException e) { // don't do anything - java prior 1.5 } } /** - * Put a context value (the o parameter) as identified - * with the key parameter into the current thread's - * context map. + * Put a context value (the o parameter) as identified with the key parameter into the + * current thread's context map. *

- * <p>If the current thread does not have a context map it is
- * created as a side effect.
+ * <p>
+ * If the current thread does not have a context map it is created as a side effect. */ - public static void put(String key, Object o) { + public static void put(final String key, final Object o) { if (mdc != null) { mdc.put0(key, o); } @@ -98,9 +99,10 @@ public static void put(String key, Object o) { /** * Get the context identified by the key parameter. *

- * <p>This method has no side effects.
+ * <p>
+ * This method has no side effects. */ - public static Object get(String key) { + public static Object get(final String key) { if (mdc != null) { return mdc.get0(key); } @@ -108,21 +110,18 @@ public static Object get(String key) { } /** - * Remove the the context identified by the key - * parameter. + * Remove the the context identified by the key parameter. */ - public static void remove(String key) { + public static void remove(final String key) { if (mdc != null) { mdc.remove0(key); } } - /** - * Get the current thread's MDC as a hashtable. This method is - * intended to be used internally. + * Get the current thread's MDC as a hashtable. This method is intended to be used internally. */ - public static Hashtable getContext() { + public static Hashtable getContext() { if (mdc != null) { return mdc.getContext0(); } else { @@ -141,25 +140,24 @@ public static void clear() { } } - - private void put0(String key, Object o) { + private void put0(final String key, final Object o) { if (java1 || tlm == null) { return; } else { - Hashtable ht = (Hashtable) ((ThreadLocalMap) tlm).get(); + Hashtable ht = (Hashtable) ((ThreadLocalMap) tlm).get(); if (ht == null) { - ht = new Hashtable(HT_SIZE); + ht = new Hashtable<>(HT_SIZE); ((ThreadLocalMap) tlm).set(ht); } ht.put(key, o); } } - private Object get0(String key) { + private Object get0(final String key) { if (java1 || tlm == null) { return null; } else { - Hashtable ht = (Hashtable) ((ThreadLocalMap) tlm).get(); + final Hashtable ht = (Hashtable) ((ThreadLocalMap) tlm).get(); if (ht != null && key != null) { return ht.get(key); } else { @@ -168,40 +166,40 @@ private Object get0(String key) { } } - private void remove0(String key) { + private void remove0(final String key) { if (!java1 && tlm != null) { - Hashtable ht = (Hashtable) ((ThreadLocalMap) tlm).get(); + final Hashtable ht = (Hashtable) ((ThreadLocalMap) tlm).get(); if (ht != null) { ht.remove(key); // clean up if this was the last key if (ht.isEmpty()) { - clear0(); + clear0(); } } } } - private Hashtable getContext0() { + private Hashtable getContext0() { if (java1 || tlm == null) { return null; } else { - return (Hashtable) ((ThreadLocalMap) tlm).get(); + return (Hashtable) ((ThreadLocalMap) tlm).get(); } } private void clear0() { if (!java1 && tlm != null) { - Hashtable ht = (Hashtable) ((ThreadLocalMap) tlm).get(); + final Hashtable ht = (Hashtable) ((ThreadLocalMap) tlm).get(); if (ht != null) { ht.clear(); } if (removeMethod != null) { // java 1.3/1.4 does not have remove - will suffer from a memory leak try { - removeMethod.invoke(tlm, null); - } catch (IllegalAccessException e) { + removeMethod.invoke(tlm); + } catch (final IllegalAccessException e) { // should not happen - } catch (InvocationTargetException e) { + } catch (final InvocationTargetException e) { // should not happen } } diff --git a/src/test/java/org/apache/log4j/helpers/ThreadLocalMap.java b/src/test/java/org/apache/log4j/helpers/ThreadLocalMap.java index da60c86f..b7b20338 100644 --- a/src/test/java/org/apache/log4j/helpers/ThreadLocalMap.java +++ b/src/test/java/org/apache/log4j/helpers/ThreadLocalMap.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,23 +20,21 @@ import java.util.Hashtable; /** - ThreadLocalMap extends {@link InheritableThreadLocal} - to bequeath a copy of the hashtable of the MDC of the parent - thread. - - @author Ceki Gülcü - @since 1.2 -*/ -final public class ThreadLocalMap extends InheritableThreadLocal { + * ThreadLocalMap extends {@link InheritableThreadLocal} to bequeath a copy of the hashtable of the MDC of + * the parent thread. + * + * @author Ceki Gülcü + * @since 1.2 + */ +final public class ThreadLocalMap extends InheritableThreadLocal { - public - final - Object childValue(Object parentValue) { - Hashtable ht = (Hashtable) parentValue; - if(ht != null) { - return ht.clone(); - } else { - return null; + @Override + public final Object childValue(final Object parentValue) { + final Hashtable ht = (Hashtable) parentValue; + if (ht != null) { + return ht.clone(); + } else { + return null; + } } - } } diff --git a/src/test/scala/com/exasol/cloudetl/DataRecords.scala b/src/test/scala/com/exasol/cloudetl/DataRecords.scala index 18df0305..92a662af 100644 --- a/src/test/scala/com/exasol/cloudetl/DataRecords.scala +++ b/src/test/scala/com/exasol/cloudetl/DataRecords.scala @@ -17,8 +17,8 @@ trait DataRecords { val TIMESTAMP_VALUE2: Timestamp = new Timestamp(System.currentTimeMillis()) val rawRecords: Seq[Seq[Object]] = Seq( - Seq(1, 3L, BIG_DECIMAL_VALUE1, 3.14d, "xyz", true, DATE_VALUE1, TIMESTAMP_VALUE1), - Seq(2, 4L, BIG_DECIMAL_VALUE2, 0.13d, "abc", false, DATE_VALUE2, TIMESTAMP_VALUE2) + Seq[Any](1, 3L, BIG_DECIMAL_VALUE1, 3.14d, "xyz", true, DATE_VALUE1, TIMESTAMP_VALUE1), + Seq[Any](2, 4L, BIG_DECIMAL_VALUE2, 0.13d, "abc", false, DATE_VALUE2, TIMESTAMP_VALUE2) ).map { seq => seq.map(_.asInstanceOf[AnyRef]) } diff --git a/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala b/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala index 6f0c4f54..aa49ed89 100644 --- a/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala +++ b/src/test/scala/com/exasol/cloudetl/it/BaseDataImporter.scala @@ -14,19 +14,22 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit val dataFormat: String var outputDirectory: Path = _ - var path: HPath = _ + var paths: List[HPath] = List() + val baseFileName = "part-" override final def beforeEach(): Unit = { outputDirectory = createTemporaryFolder(s"$dataFormat-tests-") - path = new HPath(outputDirectory.toUri.toString, s"part-00000.$dataFormat") () } - override final def afterEach(): Unit = + override final def afterEach(): Unit = { + paths = List() deletePathFiles(outputDirectory) + } override def beforeAll(): Unit = { super.beforeAll() + createBucket(bucketName) prepareExasolDatabase(schemaName) createS3ConnectionObject() } @@ -36,14 +39,27 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit super.afterAll() } - abstract class AbstractChecker(exaColumnType: String, tableName: String) { + def addFile(): HPath = { + val fileCounter = String.format("%04d", paths.length) + val newPath = new HPath(outputDirectory.toUri.toString, s"$baseFileName$fileCounter.$dataFormat") + paths = paths.appended(newPath) + newPath + } + + abstract class AbstractChecker(exaColumnType: String, tableName: 
String) + extends AbstractMultiColChecker(Map("COLUMN" -> exaColumnType), tableName) + + abstract class AbstractMultiColChecker(columns: Map[String, String], tableName: String) { def withResultSet(block: ResultSet => Unit): this.type = { - uploadFileToS3(bucketName, path) - val table = schema + paths.foreach(path => uploadFileToS3(bucketName, path)) + val tableBuilder = schema .createTableBuilder(tableName.toUpperCase(java.util.Locale.ENGLISH)) - .column("COLUMN", exaColumnType) - .build() - importFromS3IntoExasol(schemaName, table, bucketName, path.getName(), dataFormat) + columns.foreach { case (colName, colType) => + tableBuilder.column(colName, colType) + } + + val table = tableBuilder.build() + importFromS3IntoExasol(schemaName, table, bucketName, s"$baseFileName*", dataFormat) val rs = executeQuery(s"SELECT * FROM ${table.getFullyQualifiedName()}") block(rs) rs.close() @@ -54,6 +70,21 @@ trait BaseDataImporter extends BaseS3IntegrationTest with BeforeAndAfterEach wit withResultSet(assertThat(_, matcher)) () } - } + def assertFails(errorMessageMatcher: Matcher[String]): Unit = { + paths.foreach(path => uploadFileToS3(bucketName, path)) + val tableBuilder = schema + .createTableBuilder(tableName.toUpperCase(java.util.Locale.ENGLISH)) + columns.foreach { case (colName, colType) => + tableBuilder.column(colName, colType) + } + + val table = tableBuilder.build() + val exception = intercept[IllegalStateException] { + importFromS3IntoExasol(schemaName, table, bucketName, s"$baseFileName*", dataFormat) + } + assertThat(exception.getCause().getMessage(), errorMessageMatcher) + () + } + } } diff --git a/src/test/scala/com/exasol/cloudetl/it/BaseIntegrationTest.scala b/src/test/scala/com/exasol/cloudetl/it/BaseIntegrationTest.scala index 75dddbc9..b4df9ac7 100644 --- a/src/test/scala/com/exasol/cloudetl/it/BaseIntegrationTest.scala +++ b/src/test/scala/com/exasol/cloudetl/it/BaseIntegrationTest.scala @@ -41,6 +41,7 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll with LazyLo def prepareExasolDatabase(schemaName: String): Unit = { executeStmt(s"DROP SCHEMA IF EXISTS $schemaName CASCADE;") factory = new ExasolObjectFactory(getConnection()) + logger.info("Creating schema " + schemaName) schema = factory.createSchema(schemaName) createImportDeploymentScripts() createExportDeploymentScripts() @@ -48,7 +49,12 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll with LazyLo } def executeStmt(sql: String): Unit = { - getConnection().createStatement().execute(sql) + try { + getConnection().createStatement().execute(sql) + } catch { + case exception: Exception => + throw new IllegalStateException(s"Failed executing SQL '$sql': ${exception.getMessage()}", exception) + } () } @@ -117,6 +123,7 @@ trait BaseIntegrationTest extends AnyFunSuite with BeforeAndAfterAll with LazyLo private[this] def uploadJarToBucket(): Unit = { val jarPath = Paths.get("target", assembledJarName) + logger.info("Uploading JAR " + jarPath + " to bucket...") exasolContainer.getDefaultBucket.uploadFile(jarPath, assembledJarName) } diff --git a/src/test/scala/com/exasol/cloudetl/it/BaseS3IntegrationTest.scala b/src/test/scala/com/exasol/cloudetl/it/BaseS3IntegrationTest.scala index c1dd6c9a..699cb43f 100644 --- a/src/test/scala/com/exasol/cloudetl/it/BaseS3IntegrationTest.scala +++ b/src/test/scala/com/exasol/cloudetl/it/BaseS3IntegrationTest.scala @@ -83,7 +83,10 @@ trait BaseS3IntegrationTest extends BaseIntegrationTest { def getAWSSecretKey(): String = s3Container.getSecretKey() def 
uploadFileToS3(bucket: String, file: HPath): Unit = { - createBucket(bucket) + if (!s3.doesBucketExistV2(bucket)) { + createBucket(bucket) + } + logger.debug(s"Uploading file $file to bucket $bucket") val request = new PutObjectRequest(bucket, file.getName(), new File(file.toUri())) s3.putObject(request) () @@ -94,19 +97,28 @@ trait BaseS3IntegrationTest extends BaseIntegrationTest { () } - def importFromS3IntoExasol(schemaName: String, table: Table, bucket: String, file: String, dataFormat: String): Unit = + def importFromS3IntoExasol( + schemaName: String, + table: Table, + bucket: String, + file: String, + dataFormat: String + ): Unit = { + val bucketPath = s"s3a://$bucket/$file" + logger.debug(s"Importing $bucketPath of format $dataFormat into table ${table.getFullyQualifiedName()}...") executeStmt( s"""|IMPORT INTO ${table.getFullyQualifiedName()} |FROM SCRIPT $schemaName.IMPORT_PATH WITH - |BUCKET_PATH = 's3a://$bucket/$file' + |BUCKET_PATH = '$bucketPath' |DATA_FORMAT = '$dataFormat' |S3_ENDPOINT = '$s3Endpoint' |S3_CHANGE_DETECTION_MODE = 'none' |TRUNCATE_STRING = 'true' |CONNECTION_NAME = 'S3_CONNECTION' |PARALLELISM = 'nproc()'; - """.stripMargin + """.stripMargin ) + } def exportIntoS3(schemaName: String, tableName: String, bucket: String): Unit = executeStmt( diff --git a/src/test/scala/com/exasol/cloudetl/it/avro/AvroDataImporterIT.scala b/src/test/scala/com/exasol/cloudetl/it/avro/AvroDataImporterIT.scala index 2812fb48..ffbc5db2 100644 --- a/src/test/scala/com/exasol/cloudetl/it/avro/AvroDataImporterIT.scala +++ b/src/test/scala/com/exasol/cloudetl/it/avro/AvroDataImporterIT.scala @@ -331,6 +331,7 @@ class AvroDataImporterIT extends BaseDataImporter { val avroSchema = new Schema.Parser().parse(avroSchemaStr) def withInputValues[T](values: List[T]): AvroChecker = { + val path = addFile() writeDataValues(values, path, avroSchema) this } diff --git a/src/test/scala/com/exasol/cloudetl/it/orc/OrcDataImporterIT.scala b/src/test/scala/com/exasol/cloudetl/it/orc/OrcDataImporterIT.scala index e9649574..f9559098 100644 --- a/src/test/scala/com/exasol/cloudetl/it/orc/OrcDataImporterIT.scala +++ b/src/test/scala/com/exasol/cloudetl/it/orc/OrcDataImporterIT.scala @@ -23,7 +23,7 @@ class OrcDataImporterIT extends BaseDataImporter { test("imports boolean") { OrcChecker("struct", "BOOLEAN", "boolean_table") - .withInputValues(List(true, false, null)) + .withInputValues(List[Any](true, false, null)) .assertResultSet( table() .row(java.lang.Boolean.TRUE) @@ -35,7 +35,7 @@ class OrcDataImporterIT extends BaseDataImporter { test("imports byte") { OrcChecker("struct", "DECIMAL(3,0)", "byte_table") - .withInputValues(List(11, null)) + .withInputValues(List[Any](11, null)) .assertResultSet( table().row(java.lang.Byte.valueOf("11")).row(null).matches(NO_JAVA_TYPE_CHECK) ) @@ -43,7 +43,7 @@ class OrcDataImporterIT extends BaseDataImporter { test("imports short") { OrcChecker("struct", "DECIMAL(9,0)", "short_table") - .withInputValues(List(13, null)) + .withInputValues(List[Any](13, null)) .assertResultSet( table().row(java.lang.Short.valueOf("13")).row(null).matches(NO_JAVA_TYPE_CHECK) ) @@ -51,7 +51,7 @@ class OrcDataImporterIT extends BaseDataImporter { test("imports int") { OrcChecker("struct", "DECIMAL(10,0)", "int_table") - .withInputValues(List(INT_MIN, 999, null, INT_MAX)) + .withInputValues(List[Any](INT_MIN, 999, null, INT_MAX)) .assertResultSet( table() .row(java.lang.Integer.valueOf(INT_MIN)) @@ -64,7 +64,7 @@ class OrcDataImporterIT extends BaseDataImporter { test("imports long") { 
OrcChecker("struct", "DECIMAL(19,0)", "long_table") - .withInputValues(List(LONG_MIN, 1234L, null, LONG_MAX)) + .withInputValues(List[Any](LONG_MIN, 1234L, null, LONG_MAX)) .assertResultSet( table() .row(java.lang.Long.valueOf(LONG_MIN)) @@ -78,7 +78,7 @@ class OrcDataImporterIT extends BaseDataImporter { test("imports float") { val EPS = java.math.BigDecimal.valueOf(0.0001) OrcChecker("struct", "FLOAT", "float_table") - .withInputValues(List(3.14f, null)) + .withInputValues(List[Any](3.14f, null)) .assertResultSet( table() .row(CellMatcherFactory.cellMatcher(3.14, STRICT, EPS)) @@ -89,7 +89,7 @@ class OrcDataImporterIT extends BaseDataImporter { test("imports double") { OrcChecker("struct", "DOUBLE", "double_table") - .withInputValues(List(2.71, null)) + .withInputValues(List[Any](2.71, null)) .assertResultSet(table().row(java.lang.Double.valueOf(2.71)).row(null).matches()) } @@ -135,7 +135,7 @@ class OrcDataImporterIT extends BaseDataImporter { test("imports date") { OrcChecker("struct", "DATE", "date_table") - .withInputValues(List(0, 1, null)) + .withInputValues(List[Any](0, 1, null)) .assertResultSet( table() .row(java.sql.Date.valueOf("1970-01-01")) @@ -172,7 +172,7 @@ class OrcDataImporterIT extends BaseDataImporter { test("imports list of doubles") { OrcChecker("struct>", "VARCHAR(20)", "list_doubles") - .withInputValues(List(Seq(1.11, 2.22, null))) + .withInputValues(List(Seq[Any](1.11, 2.22, null))) .assertResultSet(table().row("[1.11,2.22,null]").matches()) } @@ -230,6 +230,7 @@ class OrcDataImporterIT extends BaseDataImporter { test("imports union") { val orcType = "struct>" val orcSchema = TypeDescription.fromString(orcType) + val path = addFile() val writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf).setSchema(orcSchema)) val batch = orcSchema.createRowBatch() batch.size = 3 @@ -262,6 +263,7 @@ class OrcDataImporterIT extends BaseDataImporter { val orcSchema = TypeDescription.fromString(orcColumn) def withInputValues[T](values: List[T]): OrcChecker = { + val path = addFile() writeDataValues(values, path, orcSchema) this } diff --git a/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala b/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala index 400ad93e..deec0edb 100644 --- a/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala +++ b/src/test/scala/com/exasol/cloudetl/it/parquet/ParquetDataImporterIT.scala @@ -19,6 +19,7 @@ import org.apache.parquet.example.data.simple.SimpleGroup import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.io.api.Binary import org.apache.parquet.schema._ +import org.hamcrest.Matchers class ParquetDataImporterIT extends BaseDataImporter { @@ -476,12 +477,94 @@ class ParquetDataImporterIT extends BaseDataImporter { ) } + test("imports multiple columns from single file") { + MultiParquetChecker( + "required binary name (UTF8); required int32 age;", + Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"), + "multi_col_single_file" + ) + .addParquetFile { case (writer, schema) => + writer.write(new SimpleGroup(schema).append("name", "John").append("age", 24)) + writer.write(new SimpleGroup(schema).append("name", "Jane").append("age", 22)) + } + .assertResultSet( + table("VARCHAR", "BIGINT") + .row("John", 24L) + .row("Jane", 22L) + .matches() + ) + } + + test("imports from multiple files") { + MultiParquetChecker( + "required binary name (UTF8); required int32 age;", + Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"), + "multi_col_multi_file" + ) + 
.addParquetFile { case (writer, schema) => + writer.write(new SimpleGroup(schema).append("name", "John").append("age", 24)) + } + .addParquetFile { case (writer, schema) => + writer.write(new SimpleGroup(schema).append("name", "Jane").append("age", 22)) + } + .assertResultSet( + table("VARCHAR", "BIGINT") + .row("John", 24L) + .row("Jane", 22L) + .matches() + ) + } + + test("import with missing field fails") { + MultiParquetChecker( + "required binary name (UTF8); required int32 age;", + Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"), + "missing_field" + ) + .addParquetFile { case (writer, schema) => + writer.write(new SimpleGroup(schema).append("name", "John")) + } + .assertFails( + Matchers.containsString( + "ParquetDecodingException: Can't read value in column [age] required int32 age" + ) + ) + } + + test("importing from files with different schema fails") { + MultiParquetChecker( + "required binary name (UTF8);", + Map("NAME" -> "VARCHAR(60)", "AGE" -> "INTEGER"), + "missing_column" + ) + .addParquetFileWithSchema( + MessageTypeParser.parseMessageType("message test { required binary name (UTF8); required int32 age; }"), + { case (writer, schema) => + writer.write(new SimpleGroup(schema).append("name", "John").append("age", 24)) + } + ) + .addParquetFileWithSchema( + MessageTypeParser.parseMessageType("message test { required binary name (UTF8); }"), + { case (writer, schema) => + writer.write( + new SimpleGroup(schema).append("name", "Jane") + ) + } + ) + .assertFails( + Matchers.containsString( + "ExaIterationException: E-UDF-CL-SL-JAVA-1107: emit() takes exactly 2 arguments (1 given)" + ) + ) + } + case class ParquetChecker(parquetColumn: String, exaColumn: String, tableName: String) extends AbstractChecker(exaColumn, tableName) with ParquetTestDataWriter { private val parquetSchema = MessageTypeParser.parseMessageType(s"message test { $parquetColumn }") def withWriter(block: (ParquetWriter[Group], MessageType) => Unit): ParquetChecker = { + val path = addFile() val writer = getParquetWriter(path, parquetSchema, true) block(writer, parquetSchema) writer.close() @@ -489,6 +572,33 @@ class ParquetDataImporterIT extends BaseDataImporter { } def withInputValues[T](values: List[T]): ParquetChecker = { + val path = addFile() + writeDataValues(values, path, parquetSchema) + this + } + } + + case class MultiParquetChecker(parquetColumn: String, exaColumns: Map[String, String], tableName: String) + extends AbstractMultiColChecker(exaColumns, tableName) + with ParquetTestDataWriter { + private val parquetSchema = MessageTypeParser.parseMessageType(s"message test { $parquetColumn }") + + def addParquetFile(block: (ParquetWriter[Group], MessageType) => Unit): MultiParquetChecker = + addParquetFileWithSchema(parquetSchema, block) + + def addParquetFileWithSchema( + customParquetSchema: MessageType, + block: (ParquetWriter[Group], MessageType) => Unit + ): MultiParquetChecker = { + val path = addFile() + val writer = getParquetWriter(path, customParquetSchema, true) + block(writer, customParquetSchema) + writer.close() + this + } + + def withInputValues[T](values: List[T]): MultiParquetChecker = { + val path = addFile() writeDataValues(values, path, parquetSchema) this } diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/FilesDataImporterTest.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/FilesDataImporterTest.scala index 79ddb521..b716d9b7 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/FilesDataImporterTest.scala +++ 
b/src/test/scala/com/exasol/cloudetl/scriptclasses/FilesDataImporterTest.scala @@ -2,10 +2,14 @@ package com.exasol.cloudetl.scriptclasses import com.exasol.ExaIterator import com.exasol.ExaMetadata +import com.exasol.cloudetl.storage.StorageProperties +import com.exasol.parquetio.data.ChunkInterval +import com.exasol.parquetio.data.ChunkIntervalImpl import org.mockito.Mockito._ +import org.scalatest.matchers.should._ -class FilesDataImporterTest extends StorageTest { +class FilesDataImporterTest extends StorageTest with Matchers { private[this] val properties = Map( "BUCKET_PATH" -> testResourceParquetPath, @@ -27,14 +31,29 @@ class FilesDataImporterTest extends StorageTest { val file2 = s"$testResourceDir/import/parquet/sales_positions2.snappy.parquet" val expectedNumberOfRecords = 1000 - val iter = mockExasolIterator(properties) - when(iter.next()).thenReturn(true, false) - when(iter.getString(2)).thenReturn(file1, file2) - when(iter.getLong(3)).thenReturn(0L, 0L) - when(iter.getLong(4)).thenReturn(1L, 1L) + val storageProperties = StorageProperties(properties) + val iter = + new ExaIteratorMock(iteratorRow(storageProperties, file1, 0, 1), iteratorRow(storageProperties, file2, 0, 1)) FilesDataImporter.run(mock[ExaMetadata], iter) - verify(iter, times(expectedNumberOfRecords)).emit(anyObjects()) + assert(iter.getEmittedRows().size() == expectedNumberOfRecords) + } + + test("run emits records for single interval") { + val file1 = s"$testResourceDir/import/parquet/sales_positions1.snappy.parquet" + val storageProperties = StorageProperties(properties) + val iter = new ExaIteratorMock(iteratorRow(storageProperties, file1, 0, 1)) + FilesDataImporter.run(mock[ExaMetadata], iter) + assert(iter.getEmittedRows().size() == 500) + } + + test("run emits records for duplicate intervals") { + val file = s"$testResourceDir/import/parquet/sales_positions1.snappy.parquet" + val storageProperties = StorageProperties(properties) + val iter = + new ExaIteratorMock(iteratorRow(storageProperties, file, 0, 1), iteratorRow(storageProperties, file, 0, 1)) + FilesDataImporter.run(mock[ExaMetadata], iter) + assert(iter.getEmittedRows().size() == 500) } /** @@ -62,6 +81,66 @@ class FilesDataImporterTest extends StorageTest { verifySmallFilesImport(iter) } + test("collectFiles for empty iterator (will never happen in a UDF)") { + val result = FilesDataImporter.collectFiles(ExaIteratorMock.empty()) + assert(result.size == 1) + result.get(null).get.should(contain).theSameElementsAs(List(chunk(0, 0))) + } + + test("collectFiles for iterator with single entry") { + val result = + FilesDataImporter.collectFiles(new ExaIteratorMock(iteratorRow("file1.parquet", 17L, 42L))) + result.get("file1.parquet").get.should(contain).theSameElementsAs(List(chunk(17, 42))) + } + + test("collectFiles for iterator with single file but multiple chunks") { + val result = + FilesDataImporter.collectFiles( + new ExaIteratorMock( + iteratorRow("file1.parquet", 17L, 42L), + iteratorRow("file1.parquet", 1L, 2L) + ) + ) + assert(result.size == 1) + result.get("file1.parquet").get.should(contain).theSameElementsAs(List(chunk(17, 42), chunk(1, 2))) + } + + test("collectFiles for iterator with multiple files and multiple chunks") { + val result = + FilesDataImporter.collectFiles( + new ExaIteratorMock( + iteratorRow("file1.parquet", 17L, 42L), + iteratorRow("file1.parquet", 1L, 2L), + iteratorRow("file2.parquet", 0L, 1L) + ) + ) + assert(result.size == 2) + result.get("file1.parquet").get.should(contain).theSameElementsAs(List(chunk(17, 42), 
chunk(1, 2))) + result.get("file2.parquet").get.should(contain).theSameElementsAs(List(chunk(0, 1))) + } + + test("collectFiles for iterator with two files") { + val result = + FilesDataImporter.collectFiles( + new ExaIteratorMock( + iteratorRow("file1.parquet", 17L, 42L), + iteratorRow("file2.parquet", 1L, 2L) + ) + ) + assert(result.size == 2) + result.get("file1.parquet").get.should(contain).theSameElementsAs(List(chunk(17, 42))) + result.get("file2.parquet").get.should(contain).theSameElementsAs(List(chunk(1, 2))) + } + + private def iteratorRow(file: String, intervalStart: Long, intervalEnd: Long) = + Array[Any](null, null, file, intervalStart, intervalEnd) + + private def iteratorRow(storageProperties: StorageProperties, file: String, intervalStart: Long, intervalEnd: Long) = + Array[Any](storageProperties.getStoragePath(), storageProperties.mkString(), file, intervalStart, intervalEnd) + + private[this] def chunk(start: Long, end: Long): ChunkInterval = + new ChunkIntervalImpl(start, end) + private[this] def mockFileIterator(fileFormat: String, filename: String): ExaIterator = { val iter = mockExasolIterator(properties ++ Map("DATA_FORMAT" -> fileFormat)) when(iter.next()).thenReturn(false) @@ -74,11 +153,11 @@ class FilesDataImporterTest extends StorageTest { private[this] def verifySmallFilesImport(iter: ExaIterator): Unit = { val totalRecords = 5 val records: Seq[Seq[Object]] = Seq( - Seq(582244536L, 2, 96982, 1, 0.56, null, null), - Seq(582177839L, 6, 96982, 2, 0.56, null, null), - Seq(582370207L, 0, 96982, 1, 0.56, null, null), - Seq(582344312L, 0, 96982, 5, 0.56, null, null), - Seq(582344274L, 1, 96982, 1, 0.56, null, null) + Seq[Any](582244536L, 2, 96982, 1, 0.56, null, null), + Seq[Any](582177839L, 6, 96982, 2, 0.56, null, null), + Seq[Any](582370207L, 0, 96982, 1, 0.56, null, null), + Seq[Any](582344312L, 0, 96982, 5, 0.56, null, null), + Seq[Any](582344274L, 1, 96982, 1, 0.56, null, null) ).map { seq => seq.map(_.asInstanceOf[AnyRef]) } diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/TableDataExporterTest.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/TableDataExporterTest.scala index 3f88e151..327cb0cf 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/TableDataExporterTest.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/TableDataExporterTest.scala @@ -123,7 +123,7 @@ class TableDataExporterTest extends StorageTest with BeforeAndAfterEach with Dat FilesDataImporter.run(mock[ExaMetadata], importIter) - verify(importIter, times(2)).emit(anyObjects) + verify(importIter, times(2)).emit(anyObjects()) verify(iterator, times(1)).emit(Long.valueOf(2)) } diff --git a/src/test/scala/com/exasol/cloudetl/transform/DefaultTransformationTest.scala b/src/test/scala/com/exasol/cloudetl/transform/DefaultTransformationTest.scala index 6c04cb7c..acb2e0c3 100644 --- a/src/test/scala/com/exasol/cloudetl/transform/DefaultTransformationTest.scala +++ b/src/test/scala/com/exasol/cloudetl/transform/DefaultTransformationTest.scala @@ -9,11 +9,11 @@ import org.scalatest.funsuite.AnyFunSuite class DefaultTransformationTest extends AnyFunSuite { val longString = getRandomString(2000010) - val values = Array(1, 3.14, "abc", longString).map(_.asInstanceOf[Object]) + val values = Array[Any](1, 3.14, "abc", longString).map(_.asInstanceOf[Object]) test("truncates strings larger than maximum varchar size") { val properties = StorageProperties(Map(TRUNCATE_STRING -> "true")) - val expected = Array(1, 3.14, "abc", longString.substring(0, 
2000000)).map(_.asInstanceOf[Object]) + val expected = Array[Any](1, 3.14, "abc", longString.substring(0, 2000000)).map(_.asInstanceOf[Object]) assert(new DefaultTransformation(properties).transform(Array.from(values)) === expected) }