Skip to content

Commit

Permalink
Merge branch 'genomix/fullstack_genomix' into nanzhang/ray-genomix
Browse files Browse the repository at this point in the history
# By buyingyi (16) and others
# Via [email protected] (8) and others
* genomix/fullstack_genomix: (41 commits)
  update to new pregelix aggregator interface
  fix application lifecyle mgmt in hyracks nc
  fix the pinned page issue during a node failure
  Make sure the validty bit in the metadata page is flushed to disk when marking a component to be valid.
  NodeControllers clean up appEntryPoints on shutdown (2nd try)
  fix an issue found by Sattam
  Fix for issue 127.
  minor fix for heartbeat state population
  support multiple user-defined global aggregators
  reverted the change of removing adjacent exchange operators
  updated hivestrix test case for running aggregation fix
  Fixed a bug on unclosed running aggregation runtime; fixed an issue on two adjacent exchange operators (connectors) when duplicate sort operator is removed.
  Fixed the incorrect exchange merging introduced by the previous commit; updated the IntroHashPartitionMergeExchange rule to handle the hash-merge-exchange operator.
  Fixed a bug on omitted order by columns when added an exchange operator to enforce the group-by property.
  Fixed a bug on unclosed running aggregation runtime; fixed an issue on two adjacent exchange operators (connectors) when duplicate sort operator is removed.
  revert a minor change
  revert a minor change
  fix IIndexAccessor interface, add a boolean exclusiveMode parameter for the createSearchCursor method
  fix file write race condition
  Revert changes to InlineVariablesRule.
  ...
  • Loading branch information
Nan Zhang committed Nov 13, 2013
2 parents 547603f + d0b563f commit 8522c18
Show file tree
Hide file tree
Showing 154 changed files with 2,641 additions and 1,537 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
bin
target
.classpath
.settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
Expand Down Expand Up @@ -60,7 +62,6 @@ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {

return emptyUnaryRequirements();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext
foundTarget = false;
break;
}
if(child.getOperatorTag() == LogicalOperatorTag.GROUP){
foundTarget = false;
break;
}
if (orderSensitiveOps.contains(child.getOperatorTag())) {
orderSensitive = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropert
// Now, transfer annotations from the original sort op. to this one.
AbstractLogicalOperator transferTo = nextOp;
if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
//
// remove duplicate exchange operator
transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
}
transferTo.getAnnotations().putAll(op.getAnnotations());
Expand Down Expand Up @@ -345,6 +347,13 @@ private List<OrderColumn> getOrderColumnsFromGroupingProperties(List<ILocalStruc
LocalOrderProperty orderProp = (LocalOrderProperty) dlvd.get(j);
returnedProperties.add(new OrderColumn(orderProp.getColumn(), orderProp.getOrder()));
}
// maintain other order columns after the required order columns
if(returnedProperties.size() != 0){
for(int j = prefix + 1; j < dlvdCols.size(); j++){
LocalOrderProperty orderProp = (LocalOrderProperty) dlvd.get(j);
returnedProperties.add(new OrderColumn(orderProp.getColumn(), orderProp.getOrder()));
}
}
return returnedProperties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext
throws AlgebricksException {
AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
if (op1.getPhysicalOperator() == null
|| op1.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_EXCHANGE) {
|| (op1.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_EXCHANGE && op1
.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_MERGE_EXCHANGE)) {
return false;
}
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
if (op2.getPhysicalOperator() == null
|| op2.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
return false;
}
if (op1.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.HASH_PARTITION_MERGE_EXCHANGE) {
// if it is a hash_partition_merge_exchange, the sort_merge_exchange can be simply removed
op1.getInputs().get(0).setValue(op2.getInputs().get(0).getValue());
op1.computeDeliveredPhysicalProperties(context);
return true;
}
HashPartitionExchangePOperator hpe = (HashPartitionExchangePOperator) op1.getPhysicalOperator();
SortMergeExchangePOperator sme = (SortMergeExchangePOperator) op2.getPhysicalOperator();
List<OrderColumn> ocList = new ArrayList<OrderColumn>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();

if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN
&& op.getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) {
if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN) {
return false;
}
AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,14 @@ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAc

@Override
public void close() {

for (int i = 0; i < pipelines.length; ++i) {
try {
outputWriter.setInputIdx(i);
pipelines[i].close();
} catch (HyracksDataException e) {
throw new IllegalStateException(e);
}
}
}
};
}
Expand Down Expand Up @@ -232,7 +239,10 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {

@Override
public void close() throws HyracksDataException {
// clearFrame();
if(outputAppender.getTupleCount() > 0){
FrameUtils.flushFrame(outputFrame, outputWriter);
outputAppender.reset(outputFrame, true);
}
}

public void setInputIdx(int inputIdx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(f

@Override
public void open() throws HyracksDataException {
if(!first){
if (!first) {
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(f
private IUnnestingEvaluator agg;
private ArrayTupleBuilder tupleBuilder;

private int tupleCount;
private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);

@Override
Expand All @@ -98,7 +97,6 @@ public void open() throws HyracksDataException {
throw new HyracksDataException(ae);
}
tupleBuilder = new ArrayTupleBuilder(projectionList.length);
tupleCount = 1;
writer.open();
}

Expand All @@ -120,6 +118,10 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {

try {
agg.init(tRef);
// assume that when unnesting the tuple, each step() call for each element
// in the tuple will increase the positionIndex, and the positionIndex will
// be reset when a new tuple is to be processed.
int positionIndex = 1;
boolean goon = true;
do {
tupleBuilder.reset();
Expand All @@ -146,7 +148,7 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (hasPositionalVariable) {
// Write the positional variable as an INT32
tupleBuilder.getDataOutput().writeByte(3);
tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
tupleBuilder.getDataOutput().writeInt(offset + positionIndex++);
tupleBuilder.addFieldEndOffset();
}
appendToFrameFromTupleBuilder(tupleBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void incrementCounter(byte counterName) {
public static HashMapWritable<ByteWritable, VLongWritable> readStatisticsCounterResult(Configuration conf) {
try {
VertexValueWritable value = (VertexValueWritable) IterationUtils.readGlobalAggregateValue(conf,
BspUtils.getJobId(conf));
BspUtils.getJobId(conf), StatisticsAggregator.class.getName());
return value.getCounters();
} catch (IOException e) {
throw new IllegalStateException(e);
Expand All @@ -169,7 +169,7 @@ public static PregelixJob getConfiguredJob(
else
job = new PregelixJob(conf, vertexClass.getSimpleName());
job.setVertexClass(vertexClass);
job.setGlobalAggregatorClass(StatisticsAggregator.class);
job.addGlobalAggregatorClass(StatisticsAggregator.class);
job.setVertexInputFormatClass(NodeToVertexInputFormat.class);
job.setVertexOutputFormatClass(VertexToNodeOutputFormat.class);
job.setOutputKeyClass(VKmer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class JobGenerator {
public static String outputBase = "src/test/resources/jobs/";

private static void configureJob(PregelixJob job) {
job.setGlobalAggregatorClass(StatisticsAggregator.class);
job.addGlobalAggregatorClass(StatisticsAggregator.class);
job.setVertexInputFormatClass(NodeToVertexInputFormat.class);
job.setVertexOutputFormatClass(VertexToNodeOutputFormat.class);
job.setDynamicVertexValueSize(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ write [%0->$$21, %0->$$24, %0->$$23]
}
-- PRE_CLUSTERED_GROUP_BY[$$17] |PARTITIONED|
exchange
-- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC)] HASH:[$$17] |PARTITIONED|
-- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC), $$18(ASC)] HASH:[$$17] |PARTITIONED|
group by ([$$17 := %0->$$1; $$18 := %0->$$3]) decor ([]) {
aggregate [$$20] <- [function-call: hive:max(PARTIAL1), Args:[%0->$$3]]
-- AGGREGATE |LOCAL|
Expand Down Expand Up @@ -50,7 +50,7 @@ write [%0->$$21, %0->$$24, %0->$$23]
}
-- PRE_CLUSTERED_GROUP_BY[$$17] |PARTITIONED|
exchange
-- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC)] HASH:[$$17] |PARTITIONED|
-- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC), $$18(ASC)] HASH:[$$17] |PARTITIONED|
group by ([$$17 := %0->$$1; $$18 := %0->$$3]) decor ([]) {
aggregate [$$20] <- [function-call: hive:max(PARTIAL1), Args:[%0->$$3]]
-- AGGREGATE |LOCAL|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface IJobSerializerDeserializerContainer {
* @param deploymentId
* @return
*/
public IJobSerializerDeserializer getJobSerializerDeerializer(DeploymentId deploymentId);
public IJobSerializerDeserializer getJobSerializerDeserializer(DeploymentId deploymentId);

/**
* Add a deployment with the job serializer deserializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@

package edu.uci.ics.hyracks.api.job;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import edu.uci.ics.hyracks.api.deployment.DeploymentId;

public class JobSerializerDeserializerContainer implements IJobSerializerDeserializerContainer {

private IJobSerializerDeserializer defaultJobSerDe = new JobSerializerDeserializer();
private Map<DeploymentId, IJobSerializerDeserializer> jobSerializerDeserializerMap = new HashMap<DeploymentId, IJobSerializerDeserializer>();
private Map<DeploymentId, IJobSerializerDeserializer> jobSerializerDeserializerMap = new ConcurrentHashMap<DeploymentId, IJobSerializerDeserializer>();

@Override
public synchronized IJobSerializerDeserializer getJobSerializerDeerializer(DeploymentId deploymentId) {
public synchronized IJobSerializerDeserializer getJobSerializerDeserializer(DeploymentId deploymentId) {
if (deploymentId == null) {
return defaultJobSerDe;
}
Expand All @@ -44,4 +44,9 @@ public synchronized void removeJobSerializerDeserializer(DeploymentId deployment
jobSerializerDeserializerMap.remove(deploymentId);
}

@Override
public String toString() {
return jobSerializerDeserializerMap.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ public static final class Config {

private final List<ILifeCycleComponent> components;
private boolean stopInitiated;
private boolean stopped;
private String dumpPath;
private boolean configured;

private LifeCycleComponentManager() {
components = new ArrayList<ILifeCycleComponent>();
stopInitiated = false;
configured = false;
stopped = false;
}

@Override
Expand Down Expand Up @@ -76,6 +78,12 @@ public synchronized void stopAll(boolean dumpState) throws IOException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.severe("Attempting to stop " + this);
}
if (stopped) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.severe("Lifecycle management was already stopped");
}
return;
}
if (stopInitiated) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.severe("Stop already in progress");
Expand Down Expand Up @@ -124,7 +132,7 @@ public synchronized void stopAll(boolean dumpState) throws IOException {
}
}
stopInitiated = false;

stopped = true;
}

@Override
Expand All @@ -142,4 +150,8 @@ public void configure(Map<String, String> configuration) {
configured = true;
}

public boolean stoppedAll() {
return stopped;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ public void notifyHeartbeat(HeartbeatData hbData) {
ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
rrdPtr = (rrdPtr + 1) % RRD_SIZE;
}
rrdPtr = (rrdPtr + 1) % RRD_SIZE;
}

public int incrementLastHeartbeatDuration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ public abstract class ApplicationContext implements IApplicationContext {
protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer();
protected ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r);
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* This is the IJobSerializerDeserializer implementation for jobs with dynamic deployed jars.
*
* @author yingyib
*
*/
public class ClassLoaderJobSerializerDeserializer implements IJobSerializerDeserializer {

Expand Down Expand Up @@ -99,4 +98,9 @@ public Class<?> loadClass(String className) throws HyracksException {
public ClassLoader getClassLoader() throws HyracksException {
return classLoader;
}

@Override
public String toString() {
return classLoader.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static void undeploy(DeploymentId deploymentId, IJobSerializerDeserialize
*/
public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
ServerContext ctx, boolean isNC) throws HyracksException {
IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeerializer(deploymentId);
IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeserializer(deploymentId);
if (jobSerDe == null) {
jobSerDe = new ClassLoaderJobSerializerDeserializer();
container.addJobSerializerDeserializer(deploymentId, jobSerDe);
Expand All @@ -116,7 +116,7 @@ public static Object deserialize(byte[] bytes, DeploymentId deploymentId, IAppli
try {
IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
.getJobSerializerDeerializer(deploymentId);
.getJobSerializerDeserializer(deploymentId);
Object obj = jobSerDe == null ? JavaSerializationUtils.deserialize(bytes) : jobSerDe.deserialize(bytes);
return obj;
} catch (Exception e) {
Expand All @@ -138,7 +138,7 @@ public static Class<?> loadClass(String className, DeploymentId deploymentId, IA
try {
IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
.getJobSerializerDeerializer(deploymentId);
.getJobSerializerDeserializer(deploymentId);
Class<?> cl = jobSerDe == null ? JavaSerializationUtils.loadClass(className) : jobSerDe
.loadClass(className);
return cl;
Expand All @@ -160,7 +160,7 @@ public static ClassLoader getClassLoader(DeploymentId deploymentId, IApplication
try {
IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
.getJobSerializerDeerializer(deploymentId);
.getJobSerializerDeserializer(deploymentId);
ClassLoader cl = jobSerDe == null ? DeploymentUtils.class.getClassLoader() : jobSerDe.getClassLoader();
return cl;
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 8522c18

Please sign in to comment.