Skip to content

Commit

Permalink
Kryo serialization for sequence types and signatures.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ghislain Fourny committed Feb 29, 2024
1 parent 89fba0d commit e931509
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 26 deletions.
38 changes: 14 additions & 24 deletions src/main/java/org/rumbledb/items/FunctionItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,19 +231,18 @@ public int hashCode() {
public void write(Kryo kryo, Output output) {
kryo.writeObject(output, this.identifier);
kryo.writeObject(output, this.parameterNames);
try {
byte[] data = SerializationUtils.serialize(this.signature);
output.writeInt(data.length);
output.writeBytes(data);
} catch (Exception e) {
kryo.writeObject(output, this.signature);
kryo.writeObject(output, this.localVariablesInClosure);
if (!this.RDDVariablesInClosure.isEmpty()) {
throw new OurBadException(
"Error serializing signature:" + e.getMessage()
"We do not support serializing RDDs in function closures."
);
}
if (!this.dataFrameVariablesInClosure.isEmpty()) {
throw new OurBadException(
"We do not support serializing DataFrames in function closures."
);
}

// kryo.writeObject(output, this.localVariablesInClosure);
// kryo.writeObject(output, this.RDDVariablesInClosure);
// kryo.writeObject(output, this.dataFrameVariablesInClosure);
// kryo.writeObject(output, this.dynamicModuleContext);

// convert RuntimeIterator to byte[] data
Expand All @@ -263,21 +262,12 @@ public void write(Kryo kryo, Output output) {
public void read(Kryo kryo, Input input) {
this.identifier = kryo.readObject(input, FunctionIdentifier.class);
this.parameterNames = kryo.readObject(input, ArrayList.class);
try {
int dataLength = input.readInt();
byte[] data = input.readBytes(dataLength);
this.signature = SerializationUtils.deserialize(data);
} catch (Exception e) {
e.printStackTrace();
throw new OurBadException(
"Error deserializing parameter types:" + e.getMessage()
);
}
// this.bodyIterator = kryo.readObject(input, RuntimeIterator.class);
// this.localVariablesInClosure = kryo.readObject(input, HashMap.class);
// this.RDDVariablesInClosure = kryo.readObject(input, HashMap.class);
// this.dataFrameVariablesInClosure = kryo.readObject(input, HashMap.class);
this.signature = kryo.readObject(input, FunctionSignature.class);
this.localVariablesInClosure = kryo.readObject(input, HashMap.class);
this.RDDVariablesInClosure = new HashMap<>();
this.dataFrameVariablesInClosure = new HashMap<>();
// this.dynamicModuleContext = kryo.readObject(input, DynamicContext.class);
// this.bodyIterator = kryo.readObject(input, RuntimeIterator.class);

try {
int dataLength = input.readInt();
Expand Down
27 changes: 26 additions & 1 deletion src/main/java/org/rumbledb/types/FunctionSignature.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@
package org.rumbledb.types;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class FunctionSignature implements Serializable {
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class FunctionSignature implements Serializable, KryoSerializable {
private List<SequenceType> parameterTypes;
private SequenceType returnType;
private static final long serialVersionUID = 1L;
Expand All @@ -36,6 +42,11 @@ public FunctionSignature(
this.returnType = returnType;
}

public FunctionSignature() {
this.parameterTypes = new ArrayList<>();
this.returnType = SequenceType.ITEM_STAR;
}


public List<SequenceType> getParameterTypes() {
return this.parameterTypes;
Expand Down Expand Up @@ -91,4 +102,18 @@ public String toString() {
sb.append(this.returnType);
return sb.toString();
}


@Override
public void write(Kryo kryo, Output output) {
kryo.writeObject(output, this.parameterTypes);
kryo.writeObject(output, this.returnType);
}

@SuppressWarnings("unchecked")
@Override
public void read(Kryo kryo, Input input) {
this.parameterTypes = kryo.readObject(input, ArrayList.class);
this.returnType = kryo.readObject(input, SequenceType.class);
}
}
19 changes: 18 additions & 1 deletion src/main/java/org/rumbledb/types/SequenceType.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@
import org.rumbledb.exceptions.ExceptionMetadata;
import org.rumbledb.exceptions.OurBadException;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SequenceType implements Serializable {
public class SequenceType implements Serializable, KryoSerializable {

private static final long serialVersionUID = 1L;
private ItemType itemType;
Expand Down Expand Up @@ -738,5 +743,17 @@ public static SequenceType createSequenceType(String userFriendlyName) {
}


@Override
public void write(Kryo kryo, Output output) {
// kryo.writeObject(output, this.itemType);
kryo.writeObject(output, this.arity);
}

@SuppressWarnings("unchecked")
@Override
public void read(Kryo kryo, Input input) {
// this.itemType = kryo.readObject(input, ItemType.class);
this.arity = kryo.readObject(input, Arity.class);
}

}

0 comments on commit e931509

Please sign in to comment.