Skip to content

Commit

Permalink
Fix test.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ghislain Fourny committed Mar 5, 2024
1 parent 7696551 commit 52adf34
Show file tree
Hide file tree
Showing 13 changed files with 552 additions and 482 deletions.
8 changes: 8 additions & 0 deletions src/main/java/org/rumbledb/context/DynamicContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,21 @@ public VariableValues getVariableValues() {
@Override
public void write(Kryo kryo, Output output) {
kryo.writeObjectOrNull(output, this.parent, DynamicContext.class);
kryo.writeObject(output, this.conf);
kryo.writeObject(output, this.variableValues);
// kryo.writeObject(output, this.namedFunctions);
kryo.writeObject(output, this.inScopeSchemaTypes);
kryo.writeObject(output, this.currentDateTime.getMillis());
}

@Override
public void read(Kryo kryo, Input input) {
this.parent = kryo.readObjectOrNull(input, DynamicContext.class);
this.conf = kryo.readObject(input, RumbleRuntimeConfiguration.class);
this.variableValues = kryo.readObject(input, VariableValues.class);
this.namedFunctions = new NamedFunctions(this.conf);
this.inScopeSchemaTypes= kryo.readObject(input, InScopeSchemaTypes.class);
this.currentDateTime = new DateTime(kryo.readObject(input, Long.class));
}

public enum VariableDependency {
Expand Down
30 changes: 29 additions & 1 deletion src/main/java/org/rumbledb/context/RuntimeStaticContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,26 @@
import org.rumbledb.expressions.ExecutionMode;
import org.rumbledb.types.SequenceType;

public class RuntimeStaticContext implements Serializable {
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class RuntimeStaticContext implements Serializable, KryoSerializable {
private static final long serialVersionUID = 1L;

private RumbleRuntimeConfiguration configuration;
private SequenceType staticType;
private ExecutionMode executionMode;
private ExceptionMetadata metadata;

public RuntimeStaticContext() {
this.configuration = null;
this.staticType = null;
this.executionMode = null;
this.metadata = null;
}

public RuntimeStaticContext(
RumbleRuntimeConfiguration configuration,
SequenceType staticType,
Expand Down Expand Up @@ -62,4 +74,20 @@ public ExceptionMetadata getMetadata() {
return this.metadata;
}

@Override
public void write(Kryo kryo, Output output) {
kryo.writeObject(output, this.configuration);
kryo.writeObject(output, this.staticType);
kryo.writeObject(output, this.executionMode);
kryo.writeObject(output, this.metadata);
}

@Override
public void read(Kryo kryo, Input input) {
this.configuration = kryo.readObject(input, RumbleRuntimeConfiguration.class);
this.staticType = kryo.readObject(input, SequenceType.class);
this.executionMode = kryo.readObject(input, ExecutionMode.class);
this.metadata = kryo.readObject(input, ExceptionMetadata.class);
}

}
42 changes: 37 additions & 5 deletions src/main/java/org/rumbledb/exceptions/ExceptionMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,36 @@

import java.io.Serializable;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

/**
* Metadata for error reporting (line and column number)
*
* @author Stefan Irimescu, Ghislain Fourny
*/
public class ExceptionMetadata implements Serializable {
public class ExceptionMetadata implements Serializable, KryoSerializable {

private String location;
private int tokenLineNumber;
private int tokenColumnNumber;
private String code;

private static final long serialVersionUID = 1L;
private final String location;
private final int tokenLineNumber;
private final int tokenColumnNumber;
private final String code;
public static final ExceptionMetadata EMPTY_METADATA = new ExceptionMetadata("none", 1, 0, "");

/**
* Builds a new empty metadata object (for serialization and deserialization only)
*/
public ExceptionMetadata() {
this.location = "";
this.tokenLineNumber = -1;
this.tokenColumnNumber = -1;
this.code = "";
}

/**
* Builds a new metadata object
*
Expand Down Expand Up @@ -118,4 +134,20 @@ public String toString() {
+ getTokenColumnNumber()
+ ":";
}

@Override
public void write(Kryo kryo, Output output) {
kryo.writeObject(output, this.location);
kryo.writeObject(output, this.tokenLineNumber);
kryo.writeObject(output, this.tokenColumnNumber);
kryo.writeObject(output, this.code);
}

@Override
public void read(Kryo kryo, Input input) {
this.location = kryo.readObject(input, String.class);
this.tokenLineNumber= kryo.readObject(input, Integer.class);
this.tokenColumnNumber = kryo.readObject(input, Integer.class);
this.code = kryo.readObject(input, String.class);
}
}
35 changes: 5 additions & 30 deletions src/main/java/org/rumbledb/items/FunctionItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@
public class FunctionItem implements Item {

private static final long serialVersionUID = 1L;

private FunctionIdentifier identifier;
private List<Name> parameterNames;

// signature contains type information for all parameters and the return value
private FunctionSignature signature;
private RuntimeIterator bodyIterator;
private DynamicContext dynamicModuleContext;
Expand Down Expand Up @@ -242,20 +241,8 @@ public void write(Kryo kryo, Output output) {
"We do not support serializing DataFrames in function closures."
);
}
// kryo.writeObject(output, this.dynamicModuleContext);

// convert RuntimeIterator to byte[] data
/*
* try {
* byte[] data = SerializationUtils.serialize(this.bodyIterator);
* output.writeInt(data.length);
* output.writeBytes(data);
* } catch (Exception e) {
* throw new OurBadException(
* "Error converting functionItem-bodyRuntimeIterator to byte[]:" + e.getMessage()
* );
* }
*/
kryo.writeObject(output, this.dynamicModuleContext);
kryo.writeClassAndObject(output, this.bodyIterator);
}

@SuppressWarnings("unchecked")
Expand All @@ -267,20 +254,8 @@ public void read(Kryo kryo, Input input) {
this.localVariablesInClosure = kryo.readObject(input, HashMap.class);
this.RDDVariablesInClosure = new HashMap<>();
this.dataFrameVariablesInClosure = new HashMap<>();
// this.dynamicModuleContext = kryo.readObject(input, DynamicContext.class);
// this.bodyIterator = kryo.readObject(input, RuntimeIterator.class);

/*
* try {
* int dataLength = input.readInt();
* byte[] data = input.readBytes(dataLength);
* this.bodyIterator = SerializationUtils.deserialize(data);
* } catch (Exception e) {
* throw new OurBadException(
* "Error converting functionItem-bodyRuntimeIterator to functionItem:" + e.getMessage()
* );
* }
*/
this.dynamicModuleContext = kryo.readObject(input, DynamicContext.class);
this.bodyIterator = (RuntimeIterator) kryo.readClassAndObject(input);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
import org.rumbledb.expressions.comparison.ComparisonExpression.ComparisonOperator;
import org.rumbledb.types.BuiltinTypesCatalogue;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import sparksoniq.spark.SparkSessionManager;

import org.rumbledb.runtime.misc.ComparisonIterator;
Expand All @@ -47,6 +51,11 @@ public abstract class AtMostOneItemLocalRuntimeIterator extends RuntimeIterator
private static final long serialVersionUID = 1L;
private Item result;

protected AtMostOneItemLocalRuntimeIterator() {
super();
this.result = null;
}

protected AtMostOneItemLocalRuntimeIterator(
List<RuntimeIterator> children,
RuntimeStaticContext staticContext
Expand Down Expand Up @@ -186,4 +195,14 @@ public boolean getEffectiveBooleanValueOrCheckPosition(DynamicContext dynamicCon
getMetadata()
);
}

@Override
public void write(Kryo kryo, Output output) {
super.write(kryo, output);
}

@Override
public void read(Kryo kryo, Input input) {
super.read(kryo, input);
}
}
6 changes: 6 additions & 0 deletions src/main/java/org/rumbledb/runtime/HybridRuntimeIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public abstract class HybridRuntimeIterator extends RuntimeIterator {
protected List<Item> result = null;
private int currentResultIndex = 0;

protected HybridRuntimeIterator() {
super();
this.result = null;
this.currentResultIndex = 0;
}

protected HybridRuntimeIterator(
List<RuntimeIterator> children,
RuntimeStaticContext staticContext
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/rumbledb/runtime/RuntimeIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ public abstract class RuntimeIterator implements RuntimeIteratorInterface, KryoS
protected URI staticURI;
// private StaticContext staticContext;

public RuntimeIterator() {
this.hasNext = false;
this.isOpen = false;
this.children = null;
this.currentDynamicContextForLocalExecution = null;
this.staticContext = null;
this.staticURI = null;
}

protected RuntimeIterator(List<RuntimeIterator> children, RuntimeStaticContext staticContext) {
this.staticContext = staticContext;
if (this.staticContext.getStaticType() == null) {
Expand Down Expand Up @@ -220,6 +229,7 @@ public void reset(DynamicContext context) {

@Override
public void write(Kryo kryo, Output output) {
kryo.writeObject(output, this.staticContext);
kryo.writeObject(output, this.children);
// TODO serializer other fields
}
Expand All @@ -230,6 +240,7 @@ public void read(Kryo kryo, Input input) {
this.hasNext = false;
this.isOpen = false;
this.currentDynamicContextForLocalExecution = null;
this.staticContext = kryo.readObject(input, RuntimeStaticContext.class);
this.children = kryo.readObject(input, ArrayList.class);
// TODO serializer other fields
}
Expand Down
Loading

0 comments on commit 52adf34

Please sign in to comment.