Spark 3.5 (Scala 2.13) #1247

Open · wants to merge 35 commits into base: master
35 commits
1c97b40
Merge branch 'master' into Spark3.3
ghislainfourny May 4, 2023
1b41e02
Merge branch 'master' into Spark3.3
ghislainfourny May 16, 2023
3bebc9e
Fix parsing.
May 16, 2023
b73a128
Update to Spark 3.5.
Dec 21, 2023
9d0bfc4
Upgrade to AntLR 4.9.3.
Dec 21, 2023
570b548
Change jackson version to align with Spark.
Dec 21, 2023
c9931b3
Fix test.
Dec 21, 2023
1803a5c
Fix tests.
Dec 21, 2023
1d1d9f6
Fix tests.
Dec 21, 2023
524667d
Fix tests.
Dec 21, 2023
f745e31
Merge branch 'master' into Spark3.5.0
ghislainfourny Feb 27, 2024
c52ff1f
Fix test.
Feb 27, 2024
4aef769
Fix test.
Feb 27, 2024
debaef5
Merge branch 'master' into Spark3.5.0
ghislainfourny Feb 27, 2024
4b619b4
Merge branch 'master' into Spark3.5.0
ghislainfourny Feb 27, 2024
5521e0f
Merge branch 'master' into Spark3.5.0
ghislainfourny Jul 9, 2024
c4a0e34
Minor update.
Jul 10, 2024
ce49b4e
Merge branch 'master' of github.com:RumbleDB/rumble into Spark3.5.0
Jul 10, 2024
287869a
Merge branch 'master' of github.com:RumbleDB/rumble into Spark3.5.0
Jul 10, 2024
fb040cf
Merge branch 'master' into Spark3.5.0
ghislainfourny Jul 10, 2024
8e4460f
Merge branch 'master' into Spark3.5.0
ghislainfourny Sep 26, 2024
3ce80c1
Update pom.xml
ghislainfourny Oct 17, 2024
eeba107
Merge branch 'master' into Spark3.5.0
ghislainfourny Oct 24, 2024
e151cc8
Merge pull request #1270 from RumbleDB/delta-lake-functions
ghislainfourny Oct 24, 2024
023abbd
Merge branch 'master' into Spark3.5.0
ghislainfourny Oct 24, 2024
bb5270d
upgrade to scala 2.13 to fix tests
mschoeb Oct 25, 2024
c03ee2e
maven spotless
mschoeb Oct 25, 2024
dbcd1d7
update tests again
mschoeb Oct 25, 2024
673578d
Merge pull request #1271 from RumbleDB/spark3.5-fix-tests
ghislainfourny Oct 28, 2024
f9fb6d3
Execution mode for program.
Oct 28, 2024
4a6e6cb
Merge branch 'Spark3.5.0' of github.com:RumbleDB/rumble into Spark3.5.0
Oct 28, 2024
5c4be94
Fix tests.
Oct 28, 2024
ad37f76
Merge branch 'master' into Spark3.5.0
ghislainfourny Oct 28, 2024
626d0d3
Merge branch 'master' into Spark3.5.0
ghislainfourny Oct 28, 2024
ea8d455
Merge branch 'master' into Spark3.5.0
ghislainfourny Oct 28, 2024
36 changes: 18 additions & 18 deletions pom.xml
@@ -209,32 +209,32 @@
 		</dependency>
 		<dependency>
 			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-core_2.12</artifactId>
-			<version>3.4.2</version>
+			<artifactId>spark-core_2.13</artifactId>
+			<version>3.5.1</version>
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-sql_2.12</artifactId>
-			<version>3.4.2</version>
+			<artifactId>spark-sql_2.13</artifactId>
+			<version>3.5.1</version>
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-mllib_2.12</artifactId>
-			<version>3.4.2</version>
+			<artifactId>spark-mllib_2.13</artifactId>
+			<version>3.5.1</version>
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-aws</artifactId>
-			<version>3.3.2</version>
+			<artifactId>hadoop-aws</artifactId>
+			<version>3.3.6</version>
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-avro_2.12</artifactId>
-			<version>3.4.2</version>
+			<artifactId>spark-avro_2.13</artifactId>
+			<version>3.5.1</version>
 		</dependency>
 		<dependency>
 			<groupId>org.antlr</groupId>
@@ -282,11 +282,11 @@
 			<artifactId>httpclient</artifactId>
 			<version>4.5.13</version>
 		</dependency>
-		<!--<dependency>
-			<groupId>edu.vanderbilt.accre</groupId>
-			<artifactId>laurelin</artifactId>
-			<version>1.0.1</version>
-		</dependency>-->
+		<!--<dependency>
+			<groupId>edu.vanderbilt.accre</groupId>
+			<artifactId>laurelin</artifactId>
+			<version>1.0.1</version>
+		</dependency>-->
 		<dependency>
 			<groupId>org.jgrapht</groupId>
 			<artifactId>jgrapht-core</artifactId>
@@ -300,12 +300,12 @@
 		<dependency>
 			<groupId>com.fasterxml.jackson.dataformat</groupId>
 			<artifactId>jackson-dataformat-yaml</artifactId>
-			<version>2.13.4</version>
+			<version>2.15.2</version>
 		</dependency>
 		<dependency>
 			<groupId>io.delta</groupId>
-			<artifactId>delta-core_2.12</artifactId>
-			<version>2.4.0</version>
+			<artifactId>delta-spark_2.13</artifactId>
+			<version>3.2.1</version>
 		</dependency>
 	</dependencies>

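Spark artifacts encode the Scala binary version in their artifactIds, so the 2.12 to 2.13 move has to update every suffixed coordinate in lockstep (and Delta Lake's artifact was renamed from delta-core to delta-spark in its 3.x line). A sketch of how such coordinates could be centralized with Maven properties; the property names are illustrative, the PR itself hard-codes the `_2.13` suffix in each artifactId:

```xml
<!-- Sketch only: centralizes the Scala suffix so all artifacts move together. -->
<properties>
  <scala.binary.version>2.13</scala.binary.version>
  <spark.version>3.5.1</spark.version>
</properties>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>io.delta</groupId>
  <!-- renamed from delta-core_2.12 in Delta Lake 3.x -->
  <artifactId>delta-spark_${scala.binary.version}</artifactId>
  <version>3.2.1</version>
</dependency>
```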
@@ -1048,7 +1048,7 @@ public StaticContext visitExitStatement(ExitStatement exitStatement, StaticConte
         return argument;
     }
 
-    private ExecutionMode getHighestExecutionMode(ExecutionMode firstExecMode, ExecutionMode secondExecMode) {
+    private static ExecutionMode getHighestExecutionMode(ExecutionMode firstExecMode, ExecutionMode secondExecMode) {
         if (firstExecMode == ExecutionMode.UNSET || secondExecMode == ExecutionMode.UNSET) {
             return ExecutionMode.UNSET;
         }
17 changes: 12 additions & 5 deletions src/main/java/org/rumbledb/items/parsing/ItemParser.java
@@ -46,13 +46,14 @@
 import org.rumbledb.types.BuiltinTypesCatalogue;
 import org.rumbledb.types.FieldDescriptor;
 import org.rumbledb.types.ItemType;
-import scala.collection.mutable.WrappedArray;
+import scala.collection.immutable.ArraySeq;
+import scala.collection.Iterator;
 
 import sparksoniq.spark.SparkSessionManager;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.StringReader;
-import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.Date;
@@ -589,9 +590,15 @@ private static Item convertValueToItem(
             }
         } else {
             @SuppressWarnings("unchecked")
-            Object arrayObject = ((WrappedArray<Object>) o).array();
-            for (int index = 0; index < Array.getLength(arrayObject); index++) {
-                Object value = Array.get(arrayObject, index);
+            Iterator<Object> iterator = null;
+            if (o instanceof scala.collection.mutable.ArraySeq) {
+                iterator = ((scala.collection.mutable.ArraySeq<Object>) o).iterator();
+            } else {
+                iterator = ((ArraySeq<Object>) o).iterator();
+            }
+            while (iterator.hasNext()) {
+                Object value = iterator.next();
 
                 members.add(convertValueToItem(value, dataType, metadata, memberType));
             }
         }
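The recurring pattern in this PR: Scala 2.12's `WrappedArray.array()` exposed a raw array that the old code walked reflectively, while Scala 2.13's `ArraySeq` does not guarantee raw-array access, so the new code traverses it through its iterator. A minimal JDK-only sketch of that migration; the actual diff uses `scala.collection.Iterator`, but `java.util.Iterator` stands in here so the sketch compiles without scala-library on the classpath:

```java
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Iterator;

public class IterationMigration {

    // Spark 3.4 / Scala 2.12 shape: WrappedArray.array() handed back the raw
    // array, which was then read reflectively with java.lang.reflect.Array.
    static int sumReflectively(Object arrayObject) {
        int sum = 0;
        for (int index = 0; index < Array.getLength(arrayObject); index++) {
            sum += (Integer) Array.get(arrayObject, index);
        }
        return sum;
    }

    // Spark 3.5 / Scala 2.13 shape: no raw array is exposed, so the elements
    // are consumed through the collection's iterator instead.
    static int sumViaIterator(Iterator<Integer> iterator) {
        int sum = 0;
        while (iterator.hasNext()) {
            sum += iterator.next();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumReflectively(new int[] { 1, 2, 3 }));
        System.out.println(sumViaIterator(Arrays.asList(1, 2, 3).iterator()));
    }
}
```

The iterator form also avoids the per-element boxing lookup of `Array.get`, though the main motivation here is that `ArraySeq` simply has no public `array()` accessor to call.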
4 changes: 2 additions & 2 deletions src/main/java/org/rumbledb/parser/XQueryLexer.java
@@ -1,4 +1,4 @@
-// Generated from ./src/main/java/org/rumbledb/parser/XQueryLexer.g4 by ANTLR 4.8
+// Generated from ./src/main/java/org/rumbledb/parser/XQueryLexer.g4 by ANTLR 4.9.3
 
 // Java header
 package org.rumbledb.parser;
@@ -18,7 +18,7 @@
 
 @SuppressWarnings({"all", "warnings", "unchecked", "unused", "cast"})
 public class XQueryLexer extends Lexer {
-	static { RuntimeMetaData.checkVersion("4.8", RuntimeMetaData.VERSION); }
+	static { RuntimeMetaData.checkVersion("4.9.3", RuntimeMetaData.VERSION); }
 
 	protected static final DFA[] _decisionToDFA;
 	protected static final PredictionContextCache _sharedContextCache =
4 changes: 2 additions & 2 deletions src/main/java/org/rumbledb/parser/XQueryParser.java
@@ -1,4 +1,4 @@
-// Generated from ./src/main/java/org/rumbledb/parser/XQueryParser.g4 by ANTLR 4.8
+// Generated from ./src/main/java/org/rumbledb/parser/XQueryParser.g4 by ANTLR 4.9.3
 
 // Java header
 package org.rumbledb.parser;
@@ -25,7 +25,7 @@
 
 @SuppressWarnings({"all", "warnings", "unchecked", "unused", "cast"})
 public class XQueryParser extends Parser {
-	static { RuntimeMetaData.checkVersion("4.8", RuntimeMetaData.VERSION); }
+	static { RuntimeMetaData.checkVersion("4.9.3", RuntimeMetaData.VERSION); }
 
 	protected static final DFA[] _decisionToDFA;
 	protected static final PredictionContextCache _sharedContextCache =
@@ -1,4 +1,4 @@
-// Generated from ./src/main/java/org/rumbledb/parser/XQueryParser.g4 by ANTLR 4.8
+// Generated from ./src/main/java/org/rumbledb/parser/XQueryParser.g4 by ANTLR 4.9.3
 
 // Java header
 package org.rumbledb.parser;
@@ -1,4 +1,4 @@
-// Generated from ./src/main/java/org/rumbledb/parser/XQueryParser.g4 by ANTLR 4.8
+// Generated from ./src/main/java/org/rumbledb/parser/XQueryParser.g4 by ANTLR 4.9.3
 
 // Java header
 package org.rumbledb.parser;
@@ -86,7 +86,8 @@
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
-import scala.collection.mutable.WrappedArray;
+import scala.collection.immutable.ArraySeq;
+import scala.collection.Iterator;
 import sparksoniq.spark.SparkSessionManager;
 
 public class FlworDataFrameUtils {
@@ -807,19 +808,20 @@ private static Object deserializeByteArray(byte[] toDeserialize, Kryo kryo, Inpu
     }
 
     public static void deserializeWrappedParameters(
-            WrappedArray<byte[]> wrappedParameters,
+            ArraySeq<byte[]> wrappedParameters,
             List<List<Item>> deserializedParams,
             Kryo kryo,
             Input input
     ) {
-        Object[] serializedParams = (Object[]) wrappedParameters.array();
-        for (Object serializedParam : serializedParams) {
-            if (serializedParam == null) {
+        Iterator<byte[]> iterator = wrappedParameters.iterator();
+        while (iterator.hasNext()) {
+            byte[] bytes = iterator.next();
+            if (bytes == null) {
                 deserializedParams.add(Collections.emptyList());
                 continue;
             }
             @SuppressWarnings("unchecked")
-            List<Item> deserializedParam = (List<Item>) deserializeByteArray((byte[]) serializedParam, kryo, input);
+            List<Item> deserializedParam = (List<Item>) deserializeByteArray((byte[]) bytes, kryo, input);
             deserializedParams.add(deserializedParam);
         }
     }
@@ -25,12 +25,14 @@
 import org.rumbledb.api.Item;
 import org.rumbledb.exceptions.OurBadException;
 
-import scala.collection.mutable.WrappedArray;
+import scala.collection.immutable.ArraySeq;
+import scala.collection.Iterator;
 
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class GroupClauseArrayMergeAggregateResultsUDF implements UDF1<WrappedArray<Object>, Object[]> {
+public class GroupClauseArrayMergeAggregateResultsUDF implements UDF1<ArraySeq<Object>, Object[]> {
 
 
     private static final long serialVersionUID = 1L;
@@ -43,22 +45,23 @@ public GroupClauseArrayMergeAggregateResultsUDF() {
     }
 
     @Override
-    public Object[] call(WrappedArray<Object> wrappedParameters) {
+    public Object[] call(ArraySeq<Object> wrappedParameters) {
         this.nextResult.clear();
         this.deserializedParams.clear();
         List<Object> result = new ArrayList<Object>();
-        Object[] insideArrays = (Object[]) wrappedParameters.array();
-        for (Object o : insideArrays) {
+        Iterator<Object> iterator = wrappedParameters.iterator();
+        while (iterator.hasNext()) {
+            Object o = iterator.next();
             if (o instanceof Row) {
                 Row row = (Row) o;
                 result.add(row);
             }
-            if (o instanceof WrappedArray) {
-                @SuppressWarnings("rawtypes")
-                WrappedArray wrappedArray = (WrappedArray) o;
-                Object[] insideArrays2 = (Object[]) wrappedArray.array();
-                for (Object p : insideArrays2)
-                    result.add(p);
+            if (o instanceof ArraySeq) {
+                @SuppressWarnings("unchecked")
+                ArraySeq<Object> arraySeq = (ArraySeq<Object>) o;
+                Iterator<Object> iterator2 = arraySeq.iterator();
+                while (iterator2.hasNext())
+                    result.add(iterator2.next());
             } else {
                 throw new OurBadException("We cannot process " + o.getClass().getCanonicalName());
             }
@@ -23,12 +23,12 @@
 import org.apache.spark.sql.api.java.UDF1;
 import org.rumbledb.api.Item;
 import org.rumbledb.runtime.flwor.FlworDataFrameUtils;
-import scala.collection.mutable.WrappedArray;
+import scala.collection.immutable.ArraySeq;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class GroupClauseSerializeAggregateResultsUDF implements UDF1<WrappedArray<byte[]>, byte[]> {
+public class GroupClauseSerializeAggregateResultsUDF implements UDF1<ArraySeq<byte[]>, byte[]> {
 
 
     private static final long serialVersionUID = 1L;
@@ -43,7 +43,7 @@ public GroupClauseSerializeAggregateResultsUDF() {
     }
 
     @Override
-    public byte[] call(WrappedArray<byte[]> wrappedParameters) {
+    public byte[] call(ArraySeq<byte[]> wrappedParameters) {
         this.nextResult.clear();
         this.deserializedParams.clear();
         FlworDataFrameUtils.deserializeWrappedParameters(
8 changes: 7 additions & 1 deletion src/test/java/iq/Bugs.java
@@ -21,6 +21,7 @@
 package iq;
 
 import iq.base.AnnotationsTestsBase;
+import scala.Function0;
 import scala.util.Properties;
 
 import org.apache.spark.SparkConf;
@@ -51,7 +52,12 @@ public class Bugs extends AnnotationsTestsBase {
     public static final String javaVersion =
         System.getProperty("java.version");
     public static final String scalaVersion =
-        Properties.scalaPropOrElse("version.number", "unknown");
+        Properties.scalaPropOrElse("version.number", new Function0<String>() {
+            @Override
+            public String apply() {
+                return "unknown";
+            }
+        });
     protected static List<File> _testFiles = new ArrayList<>();
     protected final File testFile;
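The `Function0` wrappers added to the test classes exist because in Scala 2.13 `Properties.scalaPropOrElse` takes its fallback as a by-name parameter, which Java sees as `scala.Function0<String>` rather than a plain `String`. A JDK-only sketch of that lazy-default shape, with `Supplier` standing in for `Function0`; the method and property values below are illustrative, not taken from the PR:

```java
import java.util.Properties;
import java.util.function.Supplier;

public class LazyDefault {

    // Mirrors the shape of scala.util.Properties.scalaPropOrElse in 2.13:
    // the fallback is only evaluated when the key is absent.
    static String propOrElse(Properties props, String key, Supplier<String> alt) {
        String value = props.getProperty(key);
        return value != null ? value : alt.get();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("version.number", "2.13.14"); // illustrative value
        System.out.println(propOrElse(props, "version.number", () -> "unknown"));
        System.out.println(propOrElse(props, "missing.key", () -> "unknown"));
    }
}
```

Since `scala.Function0` is compiled as a functional interface in recent Scala versions, a Java lambda `() -> "unknown"` would likely also satisfy it in place of the anonymous classes in this diff, though that is an assumption about the exact Scala/javac combination rather than something the PR verifies.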
8 changes: 7 additions & 1 deletion src/test/java/iq/DeltaUpdateRuntimeTests.java
@@ -38,6 +38,7 @@
 import org.rumbledb.api.Item;
 import org.rumbledb.api.SequenceOfItems;
 import scala.util.Properties;
+import scala.Function0;
 import sparksoniq.spark.SparkSessionManager;
 import utils.FileManager;
 
@@ -58,7 +59,12 @@ public class DeltaUpdateRuntimeTests extends AnnotationsTestsBase {
     public static final String javaVersion =
         System.getProperty("java.version");
     public static final String scalaVersion =
-        Properties.scalaPropOrElse("version.number", "unknown");
+        Properties.scalaPropOrElse("version.number", new Function0<String>() {
+            @Override
+            public String apply() {
+                return "unknown";
+            }
+        });
 
     public RumbleRuntimeConfiguration getConfiguration() {
         return new RumbleRuntimeConfiguration(
11 changes: 10 additions & 1 deletion src/test/java/iq/RuntimeTests.java
@@ -36,6 +36,9 @@
 import scala.util.Properties;
 import sparksoniq.spark.SparkSessionManager;
 import utils.FileManager;
+import scala.Function0;
+import scala.util.Properties;
+
 
 import java.io.File;
 import java.util.*;
@@ -51,7 +54,13 @@ public class RuntimeTests extends AnnotationsTestsBase {
     public static final String javaVersion =
         System.getProperty("java.version");
     public static final String scalaVersion =
-        Properties.scalaPropOrElse("version.number", "unknown");
+        Properties.scalaPropOrElse("version.number", new Function0<String>() {
+            @Override
+            public String apply() {
+                return "unknown";
+            }
+        });
+
     protected static List<File> _testFiles = new ArrayList<>();
     protected final File testFile;
9 changes: 8 additions & 1 deletion src/test/java/iq/StaticTypeTests.java
@@ -3,6 +3,7 @@
 import org.rumbledb.config.RumbleRuntimeConfiguration;
 
 import iq.base.AnnotationsTestsBase;
+import scala.Function0;
 import scala.util.Properties;
 
 import org.apache.spark.SparkConf;
@@ -37,7 +38,13 @@ public class StaticTypeTests extends AnnotationsTestsBase {
     public static final String javaVersion =
         System.getProperty("java.version");
     public static final String scalaVersion =
-        Properties.scalaPropOrElse("version.number", "unknown");
+        Properties.scalaPropOrElse("version.number", new Function0<String>() {
+            @Override
+            public String apply() {
+                return "unknown";
+            }
+        });
+
     protected static List<File> _testFiles = new ArrayList<>();
     protected final File testFile;
9 changes: 8 additions & 1 deletion src/test/java/iq/UpdatesForRumbleBenchmark.java
@@ -12,6 +12,7 @@
 import org.rumbledb.exceptions.ExceptionMetadata;
 import org.rumbledb.runtime.functions.input.FileSystemUtil;
 import scala.util.Properties;
+import scala.Function0;
 import sparksoniq.spark.SparkSessionManager;
 
 import java.io.BufferedWriter;
@@ -28,7 +29,13 @@ public class UpdatesForRumbleBenchmark {
     public static final String javaVersion =
         System.getProperty("java.version");
     public static final String scalaVersion =
-        Properties.scalaPropOrElse("version.number", "unknown");
+        Properties.scalaPropOrElse("version.number", new Function0<String>() {
+            @Override
+            public String apply() {
+                return "unknown";
+            }
+        });
+
 
     public List<FileTuple> benchmarkFiles;
@@ -1,4 +1,4 @@
-(:JIQS: ShouldRun; Output="({ "label" : 0, "name" : "a", "prediction" : [ "3", "4", "2", "6", "5" ] }, { "label" : 1, "name" : "b", "prediction" : [ "3", "4", "6", "5", "1" ] }, { "label" : 2, "name" : "c", "prediction" : [ "4", "2", "6", "5", "1" ] }, { "label" : 3, "name" : "d", "prediction" : [ "3", "2", "6", "5", "1" ] }, { "label" : 4, "name" : "e", "prediction" : [ "3", "4", "2", "6", "1" ] }, { "label" : 5, "name" : "f", "prediction" : [ "3", "4", "2", "5", "1" ] })" :)
+(:JIQS: ShouldRun; Output="({ "label" : 0, "name" : "a", "prediction" : [ "4", "2", "3", "6", "5" ] }, { "label" : 1, "name" : "b", "prediction" : [ "4", "3", "6", "5", "1" ] }, { "label" : 2, "name" : "c", "prediction" : [ "4", "2", "6", "5", "1" ] }, { "label" : 3, "name" : "d", "prediction" : [ "2", "3", "6", "5", "1" ] }, { "label" : 4, "name" : "e", "prediction" : [ "4", "2", "3", "6", "1" ] }, { "label" : 5, "name" : "f", "prediction" : [ "4", "2", "3", "5", "1" ] })" :)
 let $data := annotate(
 	json-file("../../../../queries/rumbleML/sample-ml-data-flat.json"),
 	{ "label": "integer", "binaryLabel": "integer", "name": "string", "age": "double", "weight": "double", "booleanCol": "boolean", "nullCol": "null", "stringCol": "string", "stringArrayCol": ["string"], "intArrayCol": ["integer"], "doubleArrayCol": ["double"], "doubleArrayArrayCol": [["double"]] }