Skip to content

Commit

Permalink
Fix function serialization.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ghislain Fourny committed Feb 28, 2024
1 parent 1720afe commit 89fba0d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 26 deletions.
53 changes: 31 additions & 22 deletions src/main/java/org/rumbledb/items/FunctionItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.Estimator;
import org.apache.spark.ml.Transformer;
Expand Down Expand Up @@ -230,21 +231,24 @@ public int hashCode() {
public void write(Kryo kryo, Output output) {
kryo.writeObject(output, this.identifier);
kryo.writeObject(output, this.parameterNames);
kryo.writeObject(output, this.signature.getParameterTypes());
kryo.writeObject(output, this.signature.getReturnType());
// kryo.writeObject(output, this.bodyIterator);
kryo.writeObject(output, this.localVariablesInClosure);
kryo.writeObject(output, this.RDDVariablesInClosure);
kryo.writeObject(output, this.dataFrameVariablesInClosure);
kryo.writeObject(output, this.dynamicModuleContext);
try {
byte[] data = SerializationUtils.serialize(this.signature);
output.writeInt(data.length);
output.writeBytes(data);
} catch (Exception e) {
throw new OurBadException(
"Error serializing signature:" + e.getMessage()
);
}

// kryo.writeObject(output, this.localVariablesInClosure);
// kryo.writeObject(output, this.RDDVariablesInClosure);
// kryo.writeObject(output, this.dataFrameVariablesInClosure);
// kryo.writeObject(output, this.dynamicModuleContext);

// convert RuntimeIterator to byte[] data
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(this.bodyIterator);
oos.flush();
byte[] data = bos.toByteArray();
byte[] data = SerializationUtils.serialize(this.bodyIterator);
output.writeInt(data.length);
output.writeBytes(data);
} catch (Exception e) {
Expand All @@ -259,21 +263,26 @@ public void write(Kryo kryo, Output output) {
public void read(Kryo kryo, Input input) {
this.identifier = kryo.readObject(input, FunctionIdentifier.class);
this.parameterNames = kryo.readObject(input, ArrayList.class);
List<SequenceType> parameters = kryo.readObject(input, ArrayList.class);
SequenceType returnType = kryo.readObject(input, SequenceType.class);
this.signature = new FunctionSignature(parameters, returnType);
try {
int dataLength = input.readInt();
byte[] data = input.readBytes(dataLength);
this.signature = SerializationUtils.deserialize(data);
} catch (Exception e) {
e.printStackTrace();
throw new OurBadException(
"Error deserializing parameter types:" + e.getMessage()
);
}
// this.bodyIterator = kryo.readObject(input, RuntimeIterator.class);
this.localVariablesInClosure = kryo.readObject(input, HashMap.class);
this.RDDVariablesInClosure = kryo.readObject(input, HashMap.class);
this.dataFrameVariablesInClosure = kryo.readObject(input, HashMap.class);
this.dynamicModuleContext = kryo.readObject(input, DynamicContext.class);
// this.localVariablesInClosure = kryo.readObject(input, HashMap.class);
// this.RDDVariablesInClosure = kryo.readObject(input, HashMap.class);
// this.dataFrameVariablesInClosure = kryo.readObject(input, HashMap.class);
// this.dynamicModuleContext = kryo.readObject(input, DynamicContext.class);

try {
int dataLength = input.readInt();
byte[] data = input.readBytes(dataLength);
ByteArrayInputStream bis = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bis);
this.bodyIterator = (RuntimeIterator) ois.readObject();
this.bodyIterator = SerializationUtils.deserialize(data);
} catch (Exception e) {
throw new OurBadException(
"Error converting functionItem-bodyRuntimeIterator to functionItem:" + e.getMessage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,12 @@ public void setFromRow(Row row, ItemType itemType) {
}
if (!column.isCount()) {
List<Item> i = readColumnAsSequenceOfItems(row, itemType, columnIndex);
for (Item j : i) {
System.err.println(j.getDynamicType());
System.err.println(j.serialize());
}
/*
* for (Item j : i) {
* System.err.println(j.getDynamicType());
* System.err.println(j.serialize());
* }
*/
this.context.getVariableValues()
.addVariableValue(
column.getVariableName(),
Expand Down

0 comments on commit 89fba0d

Please sign in to comment.