Skip to content

Commit

Permalink
Merge branch 'master' into Spark3.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ghislainfourny authored Sep 26, 2024
2 parents fb040cf + ae77e46 commit 8e4460f
Show file tree
Hide file tree
Showing 381 changed files with 7,954 additions and 2,181 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,26 @@ jobs:
- name: MLTestsNativeDeactivated
run: mvn -Dtest=MLTestsNativeDeactivated test

tests4:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- name: Set up Java 11
uses: actions/setup-java@v3
with:
java-version: 11
distribution: adopt
- name: Cache Maven packages
uses: actions/cache@v3
with:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
- name: Install with Maven
run: mvn install -DskipTests -Dgpg.skip --quiet
- name: Compile with Maven
run: mvn clean compile assembly:single
- name: DeltaUpdateRuntimeTests
run: mvn -Dtest=DeltaUpdateRuntimeTests test
9 changes: 7 additions & 2 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,13 @@ NativeFLWORRuntimeTestsParallelismDeactivated:
script:
- mvn -Dtest=NativeFLWORRuntimeTestsParallelismDeactivated test

# NOTE(review): this span contained diff residue (the pre-merge job name
# "StaticTypingTest"/"stage: tests3" interleaved with the post-merge lines);
# reconstructed as the two post-merge jobs. Verify against the repository.
updatedeltaruntime-test:
  stage: test
  script:
    - mvn -Dtest=DeltaUpdateRuntimeTests test

statictyping-test:
  stage: test
  script:
    - mvn -Dtest=StaticTypeTests test

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ The documentation also contains an introduction specific to RumbleDB and how you

[The documentation of the current master (for the adventurous and curious) is available here.](http://sparksoniq.readthedocs.io/en/latest/)

RumbleDB is an effort involving many researchers and ETH Zurich students: code and support by Stefan Irimescu, Ghislain Fourny, Gustavo Alonso, Renato Marroquin, Rodrigo Bruno, Falko Noé, Ioana Stefan, Andrea Rinaldi, Stevan Mihajlovic, Mario Arduini, Can Berker Çıkış, Elwin Stephan, David Dao, Zirun Wang, Ingo Müller, Dan-Ovidiu Graur, Thomas Zhou, Olivier Goerens, Alexandru Meterez, Remo Röthlisberger, Dominik Bruggisser, David Loughlin, David Buzatu.
24 changes: 22 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,16 @@
</build>

<dependencies>
<!-- JMH micro-benchmark harness: core API plus the annotation processor
     that generates the benchmark scaffolding at compile time. -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.37</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.37</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
Expand Down Expand Up @@ -257,7 +267,12 @@
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!-- NOTE(review): the extracted diff showed a duplicated <dependency> open
     tag here (old/new residue); collapsed to a single element. -->
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
Expand Down Expand Up @@ -287,7 +302,12 @@
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Delta Lake support for the update runtime (DeltaFile-backed items).
     NOTE(review): the extracted diff showed </dependencies> both before and
     after this element (old/new residue); kept the single closing tag. -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>

<distributionManagement>
<snapshotRepository>
Expand Down
102 changes: 99 additions & 3 deletions src/main/java/org/rumbledb/api/Item.java
Original file line number Diff line number Diff line change
Expand Up @@ -692,17 +692,113 @@ default boolean isNaN() {
* @return an int representing nestedness of the item inside transform expressions.
*/
default int getMutabilityLevel() {
    // NOTE(review): the extracted diff showed both "return 0;" and
    // "return -1;" here (unreachable second return = compile error).
    // Kept -1, consistent with the -1 "no value" sentinel used by
    // getTopLevelID() — verify against the merged source.
    return -1;
}

/**
 * Sets the mutability level of the item to a supplied value.
 *
 * <p>Default is a no-op; implementations that track nestedness inside
 * transform expressions override this.
 *
 * @param mutabilityLevel new mutability level.
 */
default void setMutabilityLevel(int mutabilityLevel) {
}

/**
 * Returns the top level ID of the item.
 *
 * <p>The sentinel -1 (this default) means the item is not backed by a
 * DeltaFile row; physicalEquals falls back to reference identity in
 * that case.
 *
 * @return a long representing the rowID of the item within a DeltaFile.
 */
default long getTopLevelID() {
return -1;
}

/**
 * Sets the top level ID of the item to a supplied value.
 *
 * <p>Default is a no-op; only DeltaFile-backed items store the ID.
 *
 * @param topLevelID new top level ID.
 */
default void setTopLevelID(long topLevelID) {
}

/**
 * Returns the path from the top level object of a DeltaFile for the item.
 *
 * <p>NOTE(review): the default is the literal string "null", not a null
 * reference — presumably so physicalEquals' equals() comparison never
 * throws an NPE. Confirm callers do not treat "null" as a real path.
 *
 * @return String representing the path of the item from the top level within a DeltaFile.
 */
default String getPathIn() {
return "null";
}

/**
 * Sets the path from the top level object of a DeltaFile for the item to a supplied value.
 *
 * <p>Default is a no-op; only DeltaFile-backed items store the path.
 *
 * @param pathIn new path from top level.
 */
default void setPathIn(String pathIn) {
}

/**
 * Returns the location of the DeltaFile for the item.
 *
 * <p>Default is null: the item is not associated with any Delta table.
 *
 * @return String representing the location of the DeltaFile for the item.
 */
default String getTableLocation() {
return null;
}


/**
* Sets the location of the DeltaFile for the item to a supplied value.
*
* @param location new location of the DeltaFile for the item.
*/
default void setTableLocation(String location) {
}

/**
 * Returns the SparkSQL value of the item for use in a query.
 *
 * <p>Unsupported by default; only item types with a SparkSQL literal
 * representation override this.
 *
 * @return String representing the SparkSQL value of the item.
 */
default String getSparkSQLValue() {
    String message = "Operation not defined for type " + this.getDynamicType();
    throw new UnsupportedOperationException(message);
}

/**
 * Returns the SparkSQL value of the item for use in a query, rendered
 * for a specific target item type.
 *
 * <p>Unsupported by default; only item types with a SparkSQL literal
 * representation override this.
 *
 * @return String representing the SparkSQL value of the item.
 */
default String getSparkSQLValue(ItemType itemType) {
    String message = "Operation not defined for type " + this.getDynamicType();
    throw new UnsupportedOperationException(message);
}

/**
 * Returns the SparkSQL type of the item for use in a query.
 *
 * <p>Unsupported by default; only item types with a SparkSQL type
 * mapping override this.
 *
 * @return String representing the SparkSQL type of the item.
 */
default String getSparkSQLType() {
    String message = "Operation not defined for type " + this.getDynamicType();
    throw new UnsupportedOperationException(message);
}

/**
 * Tests for physical equality. The semantics are that of the eq operator.
 *
 * <p>Two DeltaFile-backed items (both with a real top-level ID) are
 * physically equal when they denote the same row and path. If either item
 * has no top-level ID (sentinel -1), only reference identity counts.
 *
 * @param other another item.
 * @return true if it is equal to other, false otherwise.
 */
default boolean physicalEquals(Object other) {
    if (!(other instanceof Item)) {
        return false;
    }
    Item otherItem = (Item) other;
    if (this.getTopLevelID() == -1 || otherItem.getTopLevelID() == -1) {
        // Bug fix: the previous comparison of System.identityHashCode values
        // is not a valid identity test — identity hash codes are not unique,
        // so two distinct items could collide and be reported as physically
        // equal. Reference comparison is the correct check.
        return this == otherItem;
    }
    return this.getTopLevelID() == otherItem.getTopLevelID() && this.getPathIn().equals(otherItem.getPathIn());
}

/**
* Tests for logical equality. The semantics are that of the eq operator.
*
Expand Down
12 changes: 0 additions & 12 deletions src/main/java/org/rumbledb/api/Rumble.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.rumbledb.context.DynamicContext;
import org.rumbledb.expressions.module.MainModule;
import org.rumbledb.runtime.RuntimeIterator;
import org.rumbledb.runtime.update.PendingUpdateList;
import sparksoniq.spark.SparkSessionManager;

import java.io.IOException;
Expand Down Expand Up @@ -52,11 +51,6 @@ public SequenceOfItems runQuery(String query) {
this.configuration
);

if (iterator.isUpdating()) {
PendingUpdateList pul = iterator.getPendingUpdateList(dynamicContext);
pul.applyUpdates(iterator.getMetadata());
}

return new SequenceOfItems(iterator, dynamicContext, this.configuration);
}

Expand All @@ -78,12 +72,6 @@ public SequenceOfItems runQuery(URI location) throws IOException {
this.configuration
);

if (iterator.isUpdating()) {
PendingUpdateList pul = iterator.getPendingUpdateList(dynamicContext);
pul.applyUpdates(iterator.getMetadata());
}

System.err.println("final iterator is: " + iterator.isUpdating());
return new SequenceOfItems(iterator, dynamicContext, this.configuration);
}

Expand Down
55 changes: 52 additions & 3 deletions src/main/java/org/rumbledb/api/SequenceOfItems.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import org.apache.spark.sql.Row;
import org.rumbledb.config.RumbleRuntimeConfiguration;
import org.rumbledb.context.DynamicContext;
import org.rumbledb.items.ItemFactory;
import org.rumbledb.runtime.RuntimeIterator;

import org.rumbledb.runtime.update.PendingUpdateList;
import sparksoniq.spark.SparkSessionManager;

/**
Expand Down Expand Up @@ -51,7 +53,9 @@ public SequenceOfItems(
* Opens the iterator.
*/
public void open() {
    // A PUL-only (non-materialisable) sequence yields no items, so the
    // underlying iterator is left untouched; the sequence is still marked
    // open so the open/close protocol stays consistent for callers.
    boolean evaluate = this.isMaterialisable();
    if (evaluate) {
        this.iterator.open(this.dynamicContext);
    }
    this.isOpen = true;
}

Expand All @@ -68,7 +72,9 @@ public boolean isOpen() {
* Closes the iterator.
*/
public void close() {
    // Closing a sequence that was never opened is a no-op; isOpen is
    // already false in that case.
    if (!this.isOpen) {
        return;
    }
    this.iterator.close();
    this.isOpen = false;
}

Expand All @@ -78,6 +84,9 @@ public void close() {
* @return true if there are more items, false otherwise.
*/
public boolean hasNext() {
    // A non-materialisable (PUL-only) sequence never has items; otherwise
    // delegate to the underlying iterator.
    return this.isMaterialisable() && this.iterator.hasNext();
}

Expand All @@ -88,6 +97,9 @@ public boolean hasNext() {
* @return the next item.
*/
public Item next() {
    // A non-materialisable (PUL-only) sequence has no real items to
    // return, so a null item stands in; otherwise delegate.
    return this.isMaterialisable()
        ? this.iterator.next()
        : ItemFactory.getInstance().createNullItem();
}

Expand All @@ -109,13 +121,34 @@ public boolean availableAsDataFrame() {
return this.iterator.isDataFrame();
}

/**
 * Returns whether the iterator is updating, i.e. whether the result of the
 * query is a pending update list (PUL) rather than a sequence of items.
 *
 * @return true if updating; otherwise false.
 */
public boolean availableAsPUL() {
return this.iterator.isUpdating();
}

/**
 * Return whether the iterator of the sequence should be evaluated to
 * materialise the sequence of items.
 *
 * <p>Only a purely-updating, non-sequential iterator (a PUL with no item
 * output) is not materialisable.
 *
 * @return true if materialisable; otherwise false
 */
private boolean isMaterialisable() {
    // De Morgan form of: !(availableAsPUL() && !iterator.isSequential())
    return !this.availableAsPUL() || this.iterator.isSequential();
}

/**
* Returns the sequence of items as an RDD of Items rather than iterating over them locally.
* It is not possible to do so if the iterator is open.
*
* @return an RDD of Items.
*/
public JavaRDD<Item> getAsRDD() {
if (!this.isMaterialisable()) {
return SparkSessionManager.getInstance().getJavaSparkContext().emptyRDD();
}
if (this.isOpen) {
throw new RuntimeException("Cannot obtain an RDD if the iterator is open.");
}
Expand All @@ -129,19 +162,33 @@ public JavaRDD<Item> getAsRDD() {
* @return a data frame.
*/
public Dataset<Row> getAsDataFrame() {
    // A PUL-only sequence has no tabular output.
    if (!this.isMaterialisable()) {
        return SparkSessionManager.getInstance().getOrCreateSession().emptyDataFrame();
    }
    if (this.isOpen) {
        // Bug fix: the message previously said "Cannot obtain an RDD",
        // copy-pasted from getAsRDD; this method produces a data frame.
        throw new RuntimeException("Cannot obtain a data frame if the iterator is open.");
    }
    return this.iterator.getDataFrame(this.dynamicContext).getDataFrame();
}

/**
 * Applies the PUL available when the iterator is updating.
 */
public void applyPUL() {
    // Build the pending update list from the current dynamic context and
    // apply it immediately, attributing errors to the iterator's metadata.
    this.iterator.getPendingUpdateList(this.dynamicContext)
        .applyUpdates(this.iterator.getMetadata());
}

/*
* Populates a list of items with the output.
*
* @return -1 if successful. Returns Long.MAX_VALUE if there were more items beyond the materialization cap.
*/
public long populateList(List<Item> resultList) {
resultList.clear();
if (!this.isMaterialisable()) {
return -1;
}
this.iterator.open(this.dynamicContext);
Item result = null;
if (this.iterator.hasNext()) {
Expand Down Expand Up @@ -176,12 +223,14 @@ public long populateList(List<Item> resultList) {

public long populateListWithWarningOnlyIfCapReached(List<Item> resultList) {
    // Local (non-RDD) sequences go through the ordinary materialisation
    // path, which performs its own materialisability check.
    if (!this.availableAsRDD()) {
        return populateList(resultList);
    }
    // A PUL-only sequence has nothing to collect.
    if (!this.isMaterialisable()) {
        return -1;
    }
    JavaRDD<Item> rdd = this.iterator.getRDD(this.dynamicContext);
    return SparkSessionManager.collectRDDwithLimitWarningOnly(rdd, resultList);
}


}
Loading

0 comments on commit 8e4460f

Please sign in to comment.