
Commit

Merge remote-tracking branch 'origin/genomix/fullstack_genomix' into wbiesing/optimizations-from-profiling-BUILD_HYRACKS

Conflicts:
	genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/io/VertexValueWritable.java
	genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/DeBruijnGraphCleanVertex.java
jakebiesinger committed Dec 2, 2013
2 parents f3fb261 + 5cdf31f commit 17b8e0d
Showing 215 changed files with 2,322 additions and 499 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -22,5 +22,4 @@ output
tmp
dist
.idea
bin
graphviz
9 changes: 9 additions & 0 deletions README.md
@@ -5,3 +5,12 @@ Parallel genome assembly using Hyracks


[![Build Status](https://travis-ci.org/uci-cbcl/genomix.png?branch=genomix/fullstack_genomix)](https://travis-ci.org/uci-cbcl/genomix)

Acknowledgement
---------
YourKit is supporting Genomix open source project with its full-featured Java Profiler.
YourKit, LLC is the creator of innovative and intelligent tools for profiling
Java and .NET applications. Take a look at YourKit's leading software products:
<a href="http://www.yourkit.com/java/profiler/index.jsp">YourKit Java Profiler</a> and
<a href="http://www.yourkit.com/.net/profiler/index.jsp">YourKit .NET Profiler</a>.

@@ -57,13 +57,8 @@
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;

/**
* <<<<<<< .working
* Left input is broadcast and preserves its local properties. Right input can
* be partitioned in any way.
* =======
* Left input is broadcast and preserves its local properties.
* Right input can be partitioned in any way.
* >>>>>>> .merge-right.r3014
*/
public class NLJoinPOperator extends AbstractJoinPOperator {

@@ -290,4 +285,4 @@ public int getTupleIndex() {
}

}
}
}
@@ -128,8 +128,8 @@ public void reset() {
}

@Override
public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
int tIndex, AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}

@@ -239,7 +239,7 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {

@Override
public void close() throws HyracksDataException {
if(outputAppender.getTupleCount() > 0){
if (outputAppender.getTupleCount() > 0) {
FrameUtils.flushFrame(outputFrame, outputWriter);
outputAppender.reset(outputFrame, true);
}
@@ -119,8 +119,8 @@ public void reset() {
}

@Override
public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
int tIndex, AggregateState state) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}

@@ -57,8 +57,8 @@ public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHy
@Override
public void open() throws HyracksDataException {
if (frameSorter == null) {
frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
outputRecordDesc);
frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory,
comparatorFactories, outputRecordDesc);
}
frameSorter.reset();
writer.open();
@@ -41,11 +41,11 @@
public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
public static int NO_DEFAULT_BRANCH = -1;

private final ICopyEvaluatorFactory[] evalFactories;
private final IBinaryBooleanInspector boolInspector;
private final int defaultBranchIndex;

public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec, ICopyEvaluatorFactory[] evalFactories,
IBinaryBooleanInspector boolInspector, int defaultBranchIndex, RecordDescriptor rDesc) {
super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
@@ -66,14 +66,15 @@ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
private final ByteBuffer[] writeBuffers = new ByteBuffer[outputArity];
private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
0);
private final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inOutRecDesc);
private final FrameTupleReference frameTuple = new FrameTupleReference();

private final FrameTupleAppender tupleAppender = new FrameTupleAppender(ctx.getFrameSize());
private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
private final DataOutput tupleDos = tupleBuilder.getDataOutput();

@Override
public void close() throws HyracksDataException {
// Flush (possibly not full) buffers that have data, and close writers.
@@ -102,28 +103,28 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
boolean found = false;
for (int j = 0; j < evals.length; j++) {
try {
evalBuf.reset();
evalBuf.reset();
evals[j].evaluate(frameTuple);
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
found = boolInspector.getBooleanValue(evalBuf.getByteArray(), 0, 1);
if (found) {
copyAndAppendTuple(j);
break;
copyAndAppendTuple(j);
break;
}
}
// Optionally write to default output branch.
if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
copyAndAppendTuple(defaultBranchIndex);
copyAndAppendTuple(defaultBranchIndex);
}
}
}

private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
// Copy tuple into tuple builder.
// Copy tuple into tuple builder.
try {
tupleBuilder.reset();
tupleBuilder.reset();
for (int i = 0; i < frameTuple.getFieldCount(); i++) {
tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
frameTuple.getFieldLength(i));
@@ -134,15 +135,17 @@ private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
}
// Append to frame.
tupleAppender.reset(writeBuffers[outputIndex], false);
if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
FrameUtils.flushFrame(writeBuffers[outputIndex], writers[outputIndex]);
tupleAppender.reset(writeBuffers[outputIndex], true);
if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
throw new IllegalStateException();
}
}
}

@Override
public void open() throws HyracksDataException {
for (IFrameWriter writer : writers) {
@@ -155,13 +158,13 @@ public void open() throws HyracksDataException {
tupleAppender.reset(writeBuffers[i], true);
}
// Create evaluators for partitioning.
try {
for (int i = 0; i < evalFactories.length; i++) {
evals[i] = evalFactories[i].createEvaluator(evalBuf);
}
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
try {
for (int i = 0; i < evalFactories.length; i++) {
evals[i] = evalFactories[i].createEvaluator(evalBuf);
}
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
}

@Override
@@ -171,4 +174,3 @@ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescripto
};
}
}

@@ -742,6 +742,64 @@ public int editDistance(VKmer other) {
return editDistance(this, other);
}

public int indexOf(VKmer pattern) {
return indexOf(this, pattern);
}

/**
 * Use the KMP algorithm to quickly detect whether the master VKmer contains the pattern,
 * reporting only the first position at which the pattern matches.
 *
 * @param master the kmer to search within
 * @param pattern the kmer to search for
 * @return the index of the first occurrence of pattern in master, or -1 if it does not occur
 */
public static int indexOf(VKmer master, VKmer pattern) {
int patternSize = pattern.getKmerLetterLength();
int strSize = master.getKmerLetterLength();
int[] failureSet = computeFailureSet(pattern, patternSize);
int p = 0;
int m = 0;
while (p < patternSize && m < strSize) {
if (pattern.getGeneCodeAtPosition(p) == master.getGeneCodeAtPosition(m)) {
p++;
m++;
} else if (p == 0) {
m++;
} else {
p = failureSet[p - 1] + 1;
}
}
if (p < patternSize) {
return -1;
} else {
return m - patternSize;
}
}

/**
 * Compute the failure function of the KMP algorithm: failureSet[j] is the index of the last
 * letter of the longest proper border of pattern[0..j], or -1 if no such border exists.
 *
 * @param pattern the pattern kmer
 * @param patternSize the pattern length in letters
 * @return the failure table for the pattern
 */
protected static int[] computeFailureSet(VKmer pattern, int patternSize) {
int[] failureSet = new int[patternSize];
int i = 0;
failureSet[0] = -1;
for (int j = 1; j < pattern.getKmerLetterLength(); j++) {
i = failureSet[j - 1];
while (i >= 0 && pattern.getGeneCodeAtPosition(j) != pattern.getGeneCodeAtPosition(i + 1)) {
i = failureSet[i];
}
if (pattern.getGeneCodeAtPosition(j) == pattern.getGeneCodeAtPosition(i + 1)) {
failureSet[j] = i + 1;
} else
failureSet[j] = -1;
}
return failureSet;
}

/**
* return the fractional difference between the given kmers. This is the edit distance divided by the smaller length.
* Note: the fraction may be larger than 1 (when the edit distance is larger than the kmer)
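For reference, the same scheme can be sketched over plain char arrays (this standalone class and its names are illustrative only, not part of the commit): failure[j] holds the end index of the longest proper border of pattern[0..j], or -1 when there is none, and the matcher falls back through that table on a mismatch instead of restarting from the beginning of the pattern.

public class KmpSketch {

    // Build the failure table: failure[j] = end index of the longest proper border of pattern[0..j], or -1.
    static int[] computeFailure(char[] pattern) {
        int[] failure = new int[pattern.length];
        failure[0] = -1; // a single letter has no proper border
        for (int j = 1; j < pattern.length; j++) {
            int i = failure[j - 1];
            while (i >= 0 && pattern[j] != pattern[i + 1]) {
                i = failure[i]; // fall back to the next-shorter border
            }
            failure[j] = (pattern[j] == pattern[i + 1]) ? i + 1 : -1;
        }
        return failure;
    }

    // Return the index of the first occurrence of pattern in master, or -1.
    static int indexOf(char[] master, char[] pattern) {
        int[] failure = computeFailure(pattern);
        int p = 0, m = 0;
        while (p < pattern.length && m < master.length) {
            if (pattern[p] == master[m]) {
                p++; // extend the current partial match
                m++;
            } else if (p == 0) {
                m++; // nothing matched yet; advance the text
            } else {
                p = failure[p - 1] + 1; // reuse the longest border, keep m in place
            }
        }
        return (p == pattern.length) ? m - pattern.length : -1;
    }

    public static void main(String[] args) {
        // Mirrors the unit test below: "TGCGT" first occurs at index 7 of "ACTATCCTGCGTACGC".
        System.out.println(indexOf("ACTATCCTGCGTACGC".toCharArray(), "TGCGT".toCharArray())); // prints 7
    }
}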
@@ -18,6 +18,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import junit.framework.Assert;
@@ -609,5 +610,67 @@ public void TestLargeKmerMergeRR() {
kmer1.mergeWithKmerInDir(EDGETYPE.RR, 9, kmer2);
Assert.assertEquals("Invalid RR merge!!!", "TTCACATACTATCCTGCGTACGC", kmer1.toString());
}

private static final char[] symbols = new char[4];
static {
symbols[0] = 'A';
symbols[1] = 'C';
symbols[2] = 'G';
symbols[3] = 'T';
}

public static String generateString(int length) {
Random random = new Random();
char[] buf = new char[length];
for (int idx = 0; idx < buf.length; idx++) {
buf[idx] = symbols[random.nextInt(4)];
}
return new String(buf);
}

@Test
public void TestIndexOfForShortRead(){
VKmer kmer1 = new VKmer("ACTATCCTGCGTACGC");
VKmer kmer2 = new VKmer("TGCGT");
Assert.assertEquals(7, kmer1.indexOf(kmer2));
VKmer kmer3 = new VKmer("ACTATCCTGCGTACGC");
VKmer kmer4 = new VKmer("TGCGA");
Assert.assertEquals(-1, kmer3.indexOf(kmer4));
VKmer kmer5 = new VKmer("ACTATCCTGCGTACGC");
VKmer kmer6 = new VKmer("ACGC");
Assert.assertEquals(12, kmer5.indexOf(kmer6));
VKmer kmer7 = new VKmer("ACTATCCTGCGTACGC");
VKmer kmer8 = new VKmer("ACTAC");
Assert.assertEquals(-1, kmer7.indexOf(kmer8));
}

@Test
public void TestIndexOfForLongRead(){
String testStr1 = generateString(100);
VKmer testKmer1 = new VKmer(testStr1);
String subStr1 = testStr1.substring(25, 80);
VKmer subKmer1 = new VKmer(subStr1);
Assert.assertEquals(25, testKmer1.indexOf(subKmer1));

String testStr2 = generateString(200);
VKmer testKmer2 = new VKmer(testStr2);
String subStr2 = testStr2.substring(100, 200);
VKmer subKmer2 = new VKmer(subStr2);
Assert.assertEquals(100, testKmer2.indexOf(subKmer2));

String testStr3 = generateString(300);
VKmer testKmer3 = new VKmer(testStr3);
VKmer subKmer3 = new VKmer();
for(int i = 0; i < 10; i++){
String subStr3 = testStr3.substring(40 + i * 3, 40 + i * 3 + 55);
subKmer3.setAsCopy(subStr3);
Assert.assertEquals(40 + i * 3, testKmer3.indexOf(subKmer3));
}

String testStr4 = generateString(55);
if(!testStr3.contains(testStr4)){
VKmer testKmer4 = new VKmer(testStr4);
Assert.assertEquals(-1, testKmer3.indexOf(testKmer4));
}
}
}
4 changes: 2 additions & 2 deletions genomix/genomix-driver/pom.xml
@@ -79,7 +79,7 @@
<repositoryName>../lib</repositoryName>
<configurationDirectory>etc:"$HADOOP_HOME"/conf:/etc/hadoop/conf</configurationDirectory>
<includeConfigurationDirectoryInClasspath>true</includeConfigurationDirectoryInClasspath>
<extraJvmArguments>-Djava.util.logging.config.file=@BASEDIR@/../conf/driver.logging.properties</extraJvmArguments>
<extraJvmArguments>-Djava.util.logging.config.file=@BASEDIR@/../conf/worker.logging.properties</extraJvmArguments>
</configuration>
<phase>package</phase>
<goals>
@@ -112,7 +112,7 @@
<repositoryName>../lib</repositoryName>
<configurationDirectory>etc:"$HADOOP_HOME"/conf:/etc/hadoop/conf</configurationDirectory>
<includeConfigurationDirectoryInClasspath>true</includeConfigurationDirectoryInClasspath>
<extraJvmArguments>-Djava.util.logging.config.file=@BASEDIR@/../conf/driver.logging.properties</extraJvmArguments>
<extraJvmArguments>-Djava.util.logging.config.file=@BASEDIR@/../conf/worker.logging.properties</extraJvmArguments>
</configuration>
<phase>package</phase>
<goals>
@@ -31,7 +31,7 @@ CLASSPATH="${HADOOP_HOME}:${CLASSPATH}:."
# The FRAME_SIZE is the size in bytes of a single "page" used in an external merge-sort.
# The FRAME_LIMIT is the number of such pages to use in an in-memory sort buffer.
# By default, we use 64k * 4096 = 268MB buffer.
FRAME_SIZE=65535
FRAME_SIZE=1048560
FRAME_LIMIT=4096

# The number of jobs to keep logs for (logs are kept in-memory)
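With the new setting, the default in-memory sort buffer works out to roughly 1,048,560 bytes × 4,096 frames ≈ 4.3 GB, compared with 65,535 × 4,096 ≈ 268 MB under the old value; the "64k * 4096 = 268MB" figure in the surrounding comment describes the previous default.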
