diff --git a/pom.xml b/pom.xml index 43b5e3fec..ef97610df 100644 --- a/pom.xml +++ b/pom.xml @@ -209,31 +209,31 @@ org.apache.spark - spark-core_2.12 + spark-core_2.13 3.5.1 provided org.apache.spark - spark-sql_2.12 + spark-sql_2.13 3.5.1 provided org.apache.spark - spark-mllib_2.12 + spark-mllib_2.13 3.5.1 provided org.apache.hadoop - hadoop-aws + hadoop-aws 3.3.6 provided org.apache.spark - spark-avro_2.12 + spark-avro_2.13 3.5.1 @@ -277,37 +277,37 @@ commons-io 2.11.0 - - org.apache.httpcomponents - httpclient - 4.5.13 - - - - org.jgrapht - jgrapht-core - 1.4.0 - - - joda-time - joda-time - 2.10.6 - - - com.fasterxml.jackson.dataformat - jackson-dataformat-yaml - 2.15.2 - - - io.delta - delta-spark_2.12 - 3.2.1 - - + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + org.jgrapht + jgrapht-core + 1.4.0 + + + joda-time + joda-time + 2.10.6 + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + 2.15.2 + + + io.delta + delta-spark_2.13 + 3.2.1 + + diff --git a/src/main/java/org/rumbledb/items/parsing/ItemParser.java b/src/main/java/org/rumbledb/items/parsing/ItemParser.java index 434e9a43b..4c83d2047 100644 --- a/src/main/java/org/rumbledb/items/parsing/ItemParser.java +++ b/src/main/java/org/rumbledb/items/parsing/ItemParser.java @@ -46,13 +46,14 @@ import org.rumbledb.types.BuiltinTypesCatalogue; import org.rumbledb.types.FieldDescriptor; import org.rumbledb.types.ItemType; -import scala.collection.mutable.WrappedArray; +import scala.collection.immutable.ArraySeq; +import scala.collection.Iterator; + import sparksoniq.spark.SparkSessionManager; import java.io.IOException; import java.io.Serializable; import java.io.StringReader; -import java.lang.reflect.Array; import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Date; @@ -589,9 +590,15 @@ private static Item convertValueToItem( } } else { @SuppressWarnings("unchecked") - Object arrayObject = ((WrappedArray) o).array(); - for (int index = 0; index < Array.getLength(arrayObject); index++) { - Object value = Array.get(arrayObject, index); + Iterator iterator = null; + if (o instanceof scala.collection.mutable.ArraySeq) { + iterator = ((scala.collection.mutable.ArraySeq) o).iterator(); + } else { + iterator = ((ArraySeq) o).iterator(); + } + while (iterator.hasNext()) { + Object value = iterator.next(); + members.add(convertValueToItem(value, dataType, metadata, memberType)); } } diff --git a/src/main/java/org/rumbledb/runtime/flwor/FlworDataFrameUtils.java b/src/main/java/org/rumbledb/runtime/flwor/FlworDataFrameUtils.java index 614404396..b8cfb797b 100644 --- a/src/main/java/org/rumbledb/runtime/flwor/FlworDataFrameUtils.java +++ b/src/main/java/org/rumbledb/runtime/flwor/FlworDataFrameUtils.java @@ -86,7 +86,8 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import scala.collection.mutable.WrappedArray; +import scala.collection.immutable.ArraySeq; +import scala.collection.Iterator; import sparksoniq.spark.SparkSessionManager; public class FlworDataFrameUtils { @@ -807,19 +808,20 @@ private static Object deserializeByteArray(byte[] toDeserialize, Kryo kryo, Inpu } public static void deserializeWrappedParameters( - WrappedArray wrappedParameters, + ArraySeq wrappedParameters, List> deserializedParams, Kryo kryo, Input input ) { - Object[] serializedParams = (Object[]) wrappedParameters.array(); - for (Object serializedParam : serializedParams) { - if (serializedParam == null) { + Iterator iterator = wrappedParameters.iterator(); + while (iterator.hasNext()) { + byte[] bytes = iterator.next(); + if (bytes == null) { deserializedParams.add(Collections.emptyList()); continue; } @SuppressWarnings("unchecked") - List deserializedParam = (List) deserializeByteArray((byte[]) serializedParam, kryo, input); + List deserializedParam = (List) deserializeByteArray((byte[]) bytes, kryo, input); deserializedParams.add(deserializedParam); } } diff --git a/src/main/java/org/rumbledb/runtime/flwor/udfs/GroupClauseArrayMergeAggregateResultsUDF.java b/src/main/java/org/rumbledb/runtime/flwor/udfs/GroupClauseArrayMergeAggregateResultsUDF.java index 2054b1519..e1c3e189f 100644 --- a/src/main/java/org/rumbledb/runtime/flwor/udfs/GroupClauseArrayMergeAggregateResultsUDF.java +++ b/src/main/java/org/rumbledb/runtime/flwor/udfs/GroupClauseArrayMergeAggregateResultsUDF.java @@ -25,12 +25,14 @@ import org.rumbledb.api.Item; import org.rumbledb.exceptions.OurBadException; -import scala.collection.mutable.WrappedArray; +import scala.collection.immutable.ArraySeq; +import scala.collection.Iterator; + import java.util.ArrayList; import java.util.List; -public class GroupClauseArrayMergeAggregateResultsUDF implements UDF1, Object[]> { +public class GroupClauseArrayMergeAggregateResultsUDF implements UDF1, Object[]> { private static final long serialVersionUID = 1L; @@ -43,22 +45,23 @@ public GroupClauseArrayMergeAggregateResultsUDF() { } @Override - public Object[] call(WrappedArray wrappedParameters) { + public Object[] call(ArraySeq wrappedParameters) { this.nextResult.clear(); this.deserializedParams.clear(); List result = new ArrayList(); - Object[] insideArrays = (Object[]) wrappedParameters.array(); - for (Object o : insideArrays) { + Iterator iterator = wrappedParameters.iterator(); + while (iterator.hasNext()) { + Object o = iterator.next(); if (o instanceof Row) { Row row = (Row) o; result.add(row); } - if (o instanceof WrappedArray) { - @SuppressWarnings("rawtypes") - WrappedArray wrappedArray = (WrappedArray) o; - Object[] insideArrays2 = (Object[]) wrappedArray.array(); - for (Object p : insideArrays2) - result.add(p); + if (o instanceof ArraySeq) { + @SuppressWarnings("unchecked") + ArraySeq arraySeq = (ArraySeq) o; + Iterator iterator2 = arraySeq.iterator(); + while (iterator2.hasNext()) + result.add(iterator2.next()); } else { throw new OurBadException("We cannot process " + o.getClass().getCanonicalName()); } diff --git a/src/main/java/org/rumbledb/runtime/flwor/udfs/GroupClauseSerializeAggregateResultsUDF.java b/src/main/java/org/rumbledb/runtime/flwor/udfs/GroupClauseSerializeAggregateResultsUDF.java index 4bd53e232..bcbcb0c0c 100644 --- a/src/main/java/org/rumbledb/runtime/flwor/udfs/GroupClauseSerializeAggregateResultsUDF.java +++ b/src/main/java/org/rumbledb/runtime/flwor/udfs/GroupClauseSerializeAggregateResultsUDF.java @@ -23,12 +23,12 @@ import org.apache.spark.sql.api.java.UDF1; import org.rumbledb.api.Item; import org.rumbledb.runtime.flwor.FlworDataFrameUtils; -import scala.collection.mutable.WrappedArray; +import scala.collection.immutable.ArraySeq; import java.util.ArrayList; import java.util.List; -public class GroupClauseSerializeAggregateResultsUDF implements UDF1, byte[]> { +public class GroupClauseSerializeAggregateResultsUDF implements UDF1, byte[]> { private static final long serialVersionUID = 1L; @@ -43,7 +43,7 @@ public GroupClauseSerializeAggregateResultsUDF() { } @Override - public byte[] call(WrappedArray wrappedParameters) { + public byte[] call(ArraySeq wrappedParameters) { this.nextResult.clear(); this.deserializedParams.clear(); FlworDataFrameUtils.deserializeWrappedParameters( diff --git a/src/test/java/iq/Bugs.java b/src/test/java/iq/Bugs.java index 704916331..21ec03f0a 100644 --- a/src/test/java/iq/Bugs.java +++ b/src/test/java/iq/Bugs.java @@ -21,6 +21,7 @@ package iq; import iq.base.AnnotationsTestsBase; +import scala.Function0; import scala.util.Properties; import org.apache.spark.SparkConf; @@ -51,7 +52,12 @@ public class Bugs extends AnnotationsTestsBase { public static final String javaVersion = System.getProperty("java.version"); public static final String scalaVersion = - Properties.scalaPropOrElse("version.number", "unknown"); + Properties.scalaPropOrElse("version.number", new Function0() { + @Override + public String apply() { + return "unknown"; + } + }); protected static List _testFiles = new ArrayList<>(); protected final File testFile; diff --git a/src/test/java/iq/DeltaUpdateRuntimeTests.java b/src/test/java/iq/DeltaUpdateRuntimeTests.java index 7341472bd..990151dc6 100644 --- a/src/test/java/iq/DeltaUpdateRuntimeTests.java +++ b/src/test/java/iq/DeltaUpdateRuntimeTests.java @@ -38,6 +38,7 @@ import org.rumbledb.api.Item; import org.rumbledb.api.SequenceOfItems; import scala.util.Properties; +import scala.Function0; import sparksoniq.spark.SparkSessionManager; import utils.FileManager; @@ -58,7 +59,12 @@ public class DeltaUpdateRuntimeTests extends AnnotationsTestsBase { public static final String javaVersion = System.getProperty("java.version"); public static final String scalaVersion = - Properties.scalaPropOrElse("version.number", "unknown"); + Properties.scalaPropOrElse("version.number", new Function0() { + @Override + public String apply() { + return "unknown"; + } + }); public RumbleRuntimeConfiguration getConfiguration() { return new RumbleRuntimeConfiguration( diff --git a/src/test/java/iq/RuntimeTests.java b/src/test/java/iq/RuntimeTests.java index 7dbbffed8..71674f4ff 100644 --- a/src/test/java/iq/RuntimeTests.java +++ b/src/test/java/iq/RuntimeTests.java @@ -36,6 +36,9 @@ import scala.util.Properties; import sparksoniq.spark.SparkSessionManager; import utils.FileManager; +import scala.Function0; +import scala.util.Properties; + import java.io.File; import java.util.*; @@ -51,7 +54,13 @@ public class RuntimeTests extends AnnotationsTestsBase { public static final String javaVersion = System.getProperty("java.version"); public static final String scalaVersion = - Properties.scalaPropOrElse("version.number", "unknown"); + Properties.scalaPropOrElse("version.number", new Function0() { + @Override + public String apply() { + return "unknown"; + } + }); + protected static List _testFiles = new ArrayList<>(); protected final File testFile; diff --git a/src/test/java/iq/StaticTypeTests.java b/src/test/java/iq/StaticTypeTests.java index edec1671c..7ce351bff 100644 --- a/src/test/java/iq/StaticTypeTests.java +++ b/src/test/java/iq/StaticTypeTests.java @@ -3,6 +3,7 @@ import org.rumbledb.config.RumbleRuntimeConfiguration; import iq.base.AnnotationsTestsBase; +import scala.Function0; import scala.util.Properties; import org.apache.spark.SparkConf; @@ -37,7 +38,13 @@ public class StaticTypeTests extends AnnotationsTestsBase { public static final String javaVersion = System.getProperty("java.version"); public static final String scalaVersion = - Properties.scalaPropOrElse("version.number", "unknown"); + Properties.scalaPropOrElse("version.number", new Function0() { + @Override + public String apply() { + return "unknown"; + } + }); + protected static List _testFiles = new ArrayList<>(); protected final File testFile; diff --git a/src/test/java/iq/UpdatesForRumbleBenchmark.java b/src/test/java/iq/UpdatesForRumbleBenchmark.java index da2a41317..5697fd008 100644 --- a/src/test/java/iq/UpdatesForRumbleBenchmark.java +++ b/src/test/java/iq/UpdatesForRumbleBenchmark.java @@ -12,6 +12,7 @@ import org.rumbledb.exceptions.ExceptionMetadata; import org.rumbledb.runtime.functions.input.FileSystemUtil; import scala.util.Properties; +import scala.Function0; import sparksoniq.spark.SparkSessionManager; import java.io.BufferedWriter; @@ -28,7 +29,13 @@ public class UpdatesForRumbleBenchmark { public static final String javaVersion = System.getProperty("java.version"); public static final String scalaVersion = - Properties.scalaPropOrElse("version.number", "unknown"); + Properties.scalaPropOrElse("version.number", new Function0() { + @Override + public String apply() { + return "unknown"; + } + }); + public List benchmarkFiles; diff --git a/src/test/resources/test_files/RumbleML/RumbleML/EstimatorTests/MLEstimator-FPGrowth.jq b/src/test/resources/test_files/RumbleML/RumbleML/EstimatorTests/MLEstimator-FPGrowth.jq index 6302bdcf4..d6c435cf4 100644 --- a/src/test/resources/test_files/RumbleML/RumbleML/EstimatorTests/MLEstimator-FPGrowth.jq +++ b/src/test/resources/test_files/RumbleML/RumbleML/EstimatorTests/MLEstimator-FPGrowth.jq @@ -1,4 +1,4 @@ -(:JIQS: ShouldRun; Output="({ "label" : 0, "name" : "a", "prediction" : [ "3", "4", "2", "6", "5" ] }, { "label" : 1, "name" : "b", "prediction" : [ "3", "4", "6", "5", "1" ] }, { "label" : 2, "name" : "c", "prediction" : [ "4", "2", "6", "5", "1" ] }, { "label" : 3, "name" : "d", "prediction" : [ "3", "2", "6", "5", "1" ] }, { "label" : 4, "name" : "e", "prediction" : [ "3", "4", "2", "6", "1" ] }, { "label" : 5, "name" : "f", "prediction" : [ "3", "4", "2", "5", "1" ] })" :) +(:JIQS: ShouldRun; Output="({ "label" : 0, "name" : "a", "prediction" : [ "4", "2", "3", "6", "5" ] }, { "label" : 1, "name" : "b", "prediction" : [ "4", "3", "6", "5", "1" ] }, { "label" : 2, "name" : "c", "prediction" : [ "4", "2", "6", "5", "1" ] }, { "label" : 3, "name" : "d", "prediction" : [ "2", "3", "6", "5", "1" ] }, { "label" : 4, "name" : "e", "prediction" : [ "4", "2", "3", "6", "1" ] }, { "label" : 5, "name" : "f", "prediction" : [ "4", "2", "3", "5", "1" ] })" :) let $data := annotate( json-file("../../../../queries/rumbleML/sample-ml-data-flat.json"), { "label": "integer", "binaryLabel": "integer", "name": "string", "age": "double", "weight": "double", "booleanCol": "boolean", "nullCol": "null", "stringCol": "string", "stringArrayCol": ["string"], "intArrayCol": ["integer"], "doubleArrayCol": ["double"], "doubleArrayArrayCol": [["double"]] }