Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
APEXMALHAR-2369 S3 output-tuple-based
Browse files Browse the repository at this point in the history
  • Loading branch information
yogidevendra committed Feb 24, 2017
1 parent dd5341f commit ec7b480
Show file tree
Hide file tree
Showing 8 changed files with 1,259 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;
import com.google.common.collect.Queues;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator.CheckpointNotificationListener;
Expand Down Expand Up @@ -76,6 +80,10 @@ public void process(INPUT input)
private transient volatile boolean execute;
private transient AtomicReference<Throwable> cause;

@AutoMetric
private long queueLength;


@Override
public void setup(Context.OperatorContext context)
{
Expand Down Expand Up @@ -175,6 +183,7 @@ public void run()
}
QUEUETUPLE output = waitingTuples.remove();
processCommittedData(output);
--queueLength;
doneTuples.add(output);
}
} catch (Throwable e) {
Expand All @@ -195,6 +204,7 @@ public void run()
protected void enqueueForProcessing(QUEUETUPLE queueTuple)
{
currentWindowTuples.get(currentWindowId).add(queueTuple);
++queueLength;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
private long nextMillis;
private long partitionNextMillis;
private boolean repartition;
private transient HashMap<Integer, BatchedOperatorStats> partitionedInstanceStatus = new HashMap<Integer, BatchedOperatorStats>();
protected transient HashMap<Integer, BatchedOperatorStats> partitionedInstanceStatus = new HashMap<Integer, BatchedOperatorStats>();
@Min(1)
private int initialPartitionCount = 1;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.apex.malhar.lib.fs;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.validation.constraints.NotNull;

import org.apache.hadoop.fs.Path;

import com.google.common.base.Preconditions;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;

/**
* This operator writes incoming tuples to files.
* MetaData about the files is emitted on the output port for downstream processing (if any)
*
* @param <INPUT>
* Type for incoming tuples. Converter needs to be defined which
* converts these tuples to byte[]. Default converters for String,
* byte[] tuples are provided in S3TupleOutputModule.
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class FSRecordCompactionOperator<INPUT> extends GenericFileOutputOperator<INPUT>
{

/**
* Output port for emitting metadata for finalized files.
*/
public transient DefaultOutputPort<OutputMetaData> output = new DefaultOutputPort<OutputMetaData>();

/**
* Queue for holding finalized files for emitting on output port
*/
private Queue<OutputMetaData> emitQueue = new LinkedBlockingQueue<OutputMetaData>();

@NotNull
String outputDirectoryName = "COMPACTION_OUTPUT_DIR";

@NotNull
String outputFileNamePrefix = "tuples-";

public FSRecordCompactionOperator()
{
filePath = "";
outputFileName = outputFileNamePrefix;
maxLength = 128 * 1024 * 1024L;
}

@Override
public void setup(Context.OperatorContext context)
{
filePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + outputDirectoryName;
outputFileName = outputFileNamePrefix + context.getValue(DAG.APPLICATION_ID);
super.setup(context);
}

@Override
protected void finalizeFile(String fileName) throws IOException
{
super.finalizeFile(fileName);

String src = filePath + Path.SEPARATOR + fileName;
Path srcPath = new Path(src);
long offset = fs.getFileStatus(srcPath).getLen();

//Add finalized files to the queue
OutputMetaData metaData = new OutputMetaData(src, fileName, offset);
//finalizeFile is called from committed callback.
//Tuples should be emitted only between beginWindow to endWindow. Thus using emitQueue.
emitQueue.add(metaData);
}

@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
//Emit finalized files from the queue
while (!emitQueue.isEmpty()) {
output.emit(emitQueue.poll());
}
}

public String getOutputDirectoryName()
{
return outputDirectoryName;
}

public void setOutputDirectoryName(@NotNull String outputDirectoryName)
{
this.outputDirectoryName = Preconditions.checkNotNull(outputDirectoryName);
}

public String getOutputFileNamePrefix()
{
return outputFileNamePrefix;
}

public void setOutputFileNamePrefix(@NotNull String outputFileNamePrefix)
{
this.outputFileNamePrefix = Preconditions.checkNotNull(outputFileNamePrefix);
}

/**
* Metadata for output file for downstream processing
*/
public static class OutputMetaData
{
private String path;
private String fileName;
private long size;

public OutputMetaData()
{
}

public OutputMetaData(String path, String fileName, long size)
{
this.path = path;
this.fileName = fileName;
this.size = size;
}

public String getPath()
{
return path;
}

public void setPath(String path)
{
this.path = path;
}

public String getFileName()
{
return fileName;
}

public void setFileName(String fileName)
{
this.fileName = fileName;
}

public long getSize()
{
return size;
}

public void setSize(long size)
{
this.size = size;
}
}

}
Loading

0 comments on commit ec7b480

Please sign in to comment.