diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index 0d67c317c7..78f340e453 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -25,10 +25,14 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
+
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.Operator.CheckpointNotificationListener;
@@ -76,6 +80,10 @@ public void process(INPUT input)
   private transient volatile boolean execute;
   private transient AtomicReference<Throwable> cause;
 
+  @AutoMetric
+  private long queueLength;
+
+
   @Override
   public void setup(Context.OperatorContext context)
   {
@@ -175,6 +183,7 @@ public void run()
         }
         QUEUETUPLE output = waitingTuples.remove();
         processCommittedData(output);
+        --queueLength;
         doneTuples.add(output);
       }
     } catch (Throwable e) {
@@ -195,6 +204,7 @@ public void run()
   protected void enqueueForProcessing(QUEUETUPLE queueTuple)
   {
     currentWindowTuples.get(currentWindowId).add(queueTuple);
+    ++queueLength;
   }
 
   /**
diff --git a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java b/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
index ed571b40a9..bf648c7739 100644
--- a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
+++ b/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
@@ -64,7 +64,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
   private long nextMillis;
   private long partitionNextMillis;
   private boolean repartition;
-  private transient HashMap<Integer, BatchedOperatorStats> partitionedInstanceStatus = new HashMap<Integer, BatchedOperatorStats>();
+  protected transient HashMap<Integer, BatchedOperatorStats> partitionedInstanceStatus = new HashMap<Integer, BatchedOperatorStats>();
 
   @Min(1)
   private int initialPartitionCount = 1;
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java
new file mode 100644
index 0000000000..1dd12c4d42
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.apex.malhar.lib.fs; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.validation.constraints.NotNull; + +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; + +/** + * This operator writes incoming tuples to files. + * MetaData about the files is emitted on the output port for downstream processing (if any) + * + * @param + * Type for incoming tuples. Converter needs to be defined which + * converts these tuples to byte[]. Default converters for String, + * byte[] tuples are provided in S3TupleOutputModule. + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class FSRecordCompactionOperator extends GenericFileOutputOperator +{ + + /** + * Output port for emitting metadata for finalized files. + */ + public transient DefaultOutputPort output = new DefaultOutputPort(); + + /** + * Queue for holding finalized files for emitting on output port + */ + private Queue emitQueue = new LinkedBlockingQueue(); + + @NotNull + String outputDirectoryName = "COMPACTION_OUTPUT_DIR"; + + @NotNull + String outputFileNamePrefix = "tuples-"; + + public FSRecordCompactionOperator() + { + filePath = ""; + outputFileName = outputFileNamePrefix; + maxLength = 128 * 1024 * 1024L; + } + + @Override + public void setup(Context.OperatorContext context) + { + filePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + outputDirectoryName; + outputFileName = outputFileNamePrefix + context.getValue(DAG.APPLICATION_ID); + super.setup(context); + } + + @Override + protected void finalizeFile(String fileName) throws IOException + { + super.finalizeFile(fileName); + + String src = filePath + Path.SEPARATOR + fileName; + Path srcPath = new Path(src); + long offset = fs.getFileStatus(srcPath).getLen(); + + //Add finalized files to the queue + OutputMetaData metaData = new OutputMetaData(src, fileName, offset); + //finalizeFile is called from committed callback. + //Tuples should be emitted only between beginWindow to endWindow. Thus using emitQueue. 
+ emitQueue.add(metaData); + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + //Emit finalized files from the queue + while (!emitQueue.isEmpty()) { + output.emit(emitQueue.poll()); + } + } + + public String getOutputDirectoryName() + { + return outputDirectoryName; + } + + public void setOutputDirectoryName(@NotNull String outputDirectoryName) + { + this.outputDirectoryName = Preconditions.checkNotNull(outputDirectoryName); + } + + public String getOutputFileNamePrefix() + { + return outputFileNamePrefix; + } + + public void setOutputFileNamePrefix(@NotNull String outputFileNamePrefix) + { + this.outputFileNamePrefix = Preconditions.checkNotNull(outputFileNamePrefix); + } + + /** + * Metadata for output file for downstream processing + */ + public static class OutputMetaData + { + private String path; + private String fileName; + private long size; + + public OutputMetaData() + { + } + + public OutputMetaData(String path, String fileName, long size) + { + this.path = path; + this.fileName = fileName; + this.size = size; + } + + public String getPath() + { + return path; + } + + public void setPath(String path) + { + this.path = path; + } + + public String getFileName() + { + return fileName; + } + + public void setFileName(String fileName) + { + this.fileName = fileName; + } + + public long getSize() + { + return size; + } + + public void setSize(long size) + { + this.size = size; + } + } + +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java new file mode 100644 index 0000000000..5fd19f993d --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.io.fs.AbstractReconciler; + +/** + * This operator uploads files to Amazon S3 after files are finalized and + * frozen by the committed callback. + * + * S3TupleOutputModule uses this operator in conjunction with S3CompactionOperator + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class S3Reconciler extends AbstractReconciler +{ + /** + * Access key id for Amazon S3 + */ + @NotNull + private String accessKey; + + /** + * Secret key for Amazon S3 + */ + @NotNull + private String secretKey; + + /** + * Bucket name for data upload + */ + @NotNull + private String bucketName; + + /** + * S3 End point + */ + private String endPoint; + + /** + * Directory name under S3 bucket + */ + @NotNull + private String directoryName; + + /** + * Client instance for connecting to Amazon S3 + */ + protected transient AmazonS3 s3client; + + /** + * FileSystem instance for reading intermediate directory + */ + protected transient FileSystem fs; + + protected transient String filePath; + + private static final String TMP_EXTENSION = ".tmp"; + + @Override + public void setup(Context.OperatorContext context) + { + s3client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey)); + filePath = context.getValue(DAG.APPLICATION_PATH); + try { + fs = FileSystem.newInstance(new Path(filePath).toUri(), new Configuration()); + } catch (IOException e) { + logger.error("Unable to create FileSystem: {}", e.getMessage()); + } + super.setup(context); + } + + /** + * Enques the tuple for processing after committed callback + */ + @Override + protected void processTuple(FSRecordCompactionOperator.OutputMetaData outputMetaData) + { + logger.debug("enque : {}", outputMetaData); + enqueueForProcessing(outputMetaData); + } + + /** + * Uploads the file on Amazon S3 using putObject API from S3 client + */ + @Override + protected void processCommittedData(FSRecordCompactionOperator.OutputMetaData outputMetaData) + { + try { + Path path = new Path(outputMetaData.getPath()); + if (fs.exists(path) == false) { + logger.debug("Ignoring non-existent path assuming replay : {}", path); + return; + } + FSDataInputStream fsinput = fs.open(path); + ObjectMetadata omd = new ObjectMetadata(); + omd.setContentLength(outputMetaData.getSize()); + String keyName = directoryName + Path.SEPARATOR + outputMetaData.getFileName(); + PutObjectRequest request = new PutObjectRequest(bucketName, keyName, fsinput, omd); + if (outputMetaData.getSize() < Integer.MAX_VALUE) { + request.getRequestClientOptions().setReadLimit((int)outputMetaData.getSize()); + } else { + throw 
new RuntimeException("PutRequestSize greater than Integer.MAX_VALUE"); + } + if (fs.exists(path)) { + logger.debug("Trying to upload : {}", path); + s3client.putObject(request); + logger.debug("Uploading : {}", keyName); + } + } catch (FileNotFoundException e) { + logger.debug("Ignoring non-existent path assuming replay : {}", outputMetaData.getPath()); + } catch (IOException e) { + logger.error("Unable to create Stream: {}", e.getMessage()); + } + } + + /** + * Clears intermediate/temporary files if any + */ + @Override + public void endWindow() + { + logger.info("in endWindow()"); + while (doneTuples.peek() != null) { + FSRecordCompactionOperator.OutputMetaData metaData = doneTuples.poll(); + logger.debug("found metaData = {}", metaData); + committedTuples.remove(metaData); + try { + Path dest = new Path(metaData.getPath()); + //Deleting the intermediate files and when writing to tmp files + // there can be vagrant tmp files which we have to clean + FileStatus[] statuses = fs.listStatus(dest.getParent()); + + for (FileStatus status : statuses) { + String statusName = status.getPath().getName(); + if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(metaData.getFileName())) { + //a tmp file has tmp extension always preceded by timestamp + String actualFileName = statusName.substring(0, + statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1)); + logger.debug("actualFileName = {}", actualFileName); + if (metaData.getFileName().equals(actualFileName)) { + logger.debug("deleting stray file {}", statusName); + fs.delete(status.getPath(), true); + } + } else if (statusName.equals(metaData.getFileName())) { + logger.info("deleting s3-compaction file {}", statusName); + fs.delete(status.getPath(), true); + } + } + } catch (IOException e) { + logger.error("Unable to Delete a file: {}", metaData.getFileName()); + } + } + } + + /** + * Get access key id + * + * @return Access key id for Amazon S3 + */ + public String getAccessKey() + { + return accessKey; + } + + /** + * Set access key id + * + * @param accessKey + * Access key id for Amazon S3 + */ + public void setAccessKey(@NotNull String accessKey) + { + this.accessKey = Preconditions.checkNotNull(accessKey); + } + + /** + * Get secret key + * + * @return Secret key for Amazon S3 + */ + public String getSecretKey() + { + return secretKey; + } + + /** + * Set secret key + * + * @param secretKey + * Secret key for Amazon S3 + */ + public void setSecretKey(@NotNull String secretKey) + { + this.secretKey = Preconditions.checkNotNull(secretKey); + } + + /** + * Get bucket name + * + * @return Bucket name for data upload + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Set bucket name + * + * @param bucketName + * Bucket name for data upload + */ + public void setBucketName(@NotNull String bucketName) + { + this.bucketName = Preconditions.checkNotNull(bucketName); + } + + /** + * Get directory name + * + * @return Directory name under S3 bucket + */ + public String getDirectoryName() + { + return directoryName; + } + + /** + * Set directory name + * + * @param directoryName + * Directory name under S3 bucket + */ + public void setDirectoryName(@NotNull String directoryName) + { + this.directoryName = Preconditions.checkNotNull(directoryName); + } + + /** + * Return the S3 End point + * + * @return S3 End point + */ + public String getEndPoint() + { + return endPoint; + } + + /** + * Set the S3 End point + * + * @param endPoint + * S3 end point + */ + public void setEndPoint(String endPoint) + { + 
this.endPoint = endPoint; + } + + /** + * Set Amazon S3 client + * + * @param s3client + * Client for Amazon S3 + */ + @VisibleForTesting + void setS3client(AmazonS3 s3client) + { + this.s3client = s3client; + } + + private static final Logger logger = LoggerFactory.getLogger(S3Reconciler.class); + +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java new file mode 100644 index 0000000000..edd0054a7f --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs.s3; + +import java.util.List; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Operator; +import com.datatorrent.api.Stats; +import com.datatorrent.lib.partitioner.StatsAwareStatelessPartitioner; + +/** + * This partitioner looks at Reconciler queue size to decide no. of partitions. + * This partitioner is used for S3Reconciler Operator. 
+ * @param + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class S3ReconcilerQueuePartitioner extends StatsAwareStatelessPartitioner +{ + private static final long serialVersionUID = -4407806429128758992L; + + private int maxPartitions = 16; + private int minPartitions = 1; + + private int maxQueueSizePerPartition = 4; + + @Override + protected int getLoad(BatchedOperatorStats stats) + { + double totalBacklog = 0; + double statsPartitionCount = 0; + for (Map.Entry partitionStatus : partitionedInstanceStatus.entrySet()) { + BatchedOperatorStats batchedOperatorStats = partitionStatus.getValue(); + if (batchedOperatorStats != null) { + List lastWindowedStats = batchedOperatorStats.getLastWindowedStats(); + if (lastWindowedStats != null && lastWindowedStats.size() > 0) { + Stats.OperatorStats lastStats = lastWindowedStats.get(lastWindowedStats.size() - 1); + Long queueLength = (Long)lastStats.metrics.get("queueLength"); + totalBacklog += queueLength; + statsPartitionCount += 1; + logger.debug("queueLength : {}, totalBacklog {},statsPartitionCount{}", queueLength, totalBacklog, + statsPartitionCount); + } + } + } + + double backlogPerPartition = totalBacklog / statsPartitionCount; + logger.debug("backlogPerPartition : {}", backlogPerPartition); + logger.debug("maxQueueSizePerPartition : {}, partitionedInstanceStatus.size():{}" + ", maxPartitions:{}", + maxQueueSizePerPartition, partitionedInstanceStatus.size(), maxPartitions); + + if (backlogPerPartition > maxQueueSizePerPartition && partitionedInstanceStatus.size() < maxPartitions) { + return 1; + } + logger.debug("minPartitions:{}", minPartitions); + + if (backlogPerPartition < 1.1 && partitionedInstanceStatus.size() > minPartitions) { + return -1; + } + + return 0; + } + + public int getMaxPartitions() + { + return maxPartitions; + } + + public void setMaxPartitions(int maxPartitions) + { + this.maxPartitions = maxPartitions; + } + + public int getMinPartitions() + { + return minPartitions; + } + + public void setMinPartitions(int minPartitions) + { + this.minPartitions = minPartitions; + } + + public int getMaxQueueSizePerPartition() + { + return maxQueueSizePerPartition; + } + + public void setMaxQueueSizePerPartition(int maxQueueSizePerPartition) + { + this.maxQueueSizePerPartition = maxQueueSizePerPartition; + } + + private static final Logger logger = LoggerFactory.getLogger(S3ReconcilerQueuePartitioner.class); +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java new file mode 100644 index 0000000000..59cd04644a --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java @@ -0,0 +1,398 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs.s3; + +import java.util.Arrays; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator; +import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.NoOpConverter; +import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringToBytesConverter; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.api.StatsListener; +import com.datatorrent.lib.converter.Converter; +import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner; + +/** + * S3TupleOutputModule writes incoming tuples into files and uploads these files on Amazon S3. + * + * @param Type of incoming Tuple.Converter needs to be defined which converts these tuples to byte[]. + * Default converters for String, byte[] tuples are provided in + * S3TupleOutputModule.S3BytesOutputModule, S3TupleOutputModule.S3StringOutputModule + * + * @displayName S3 Tuple Output Module + * @tags S3, Output + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public abstract class S3TupleOutputModule implements Module +{ + public final transient ProxyInputPort input = new ProxyInputPort(); + + /** + * AWS access key + */ + @NotNull + private String accessKey; + /** + * AWS secret access key + */ + @NotNull + private String secretAccessKey; + + /** + * S3 End point + */ + private String endPoint; + /** + * Name of the bucket in which to upload the files + */ + @NotNull + private String bucketName; + + /** + * Path of the output directory. Relative path of the files copied will be + * maintained w.r.t. source directory and output directory + */ + @NotNull + private String outputDirectoryPath; + + /** + * Max number of idle windows for which no new data is added to current part + * file. Part file will be finalized after these many idle windows after last + * new data. + */ + private long maxIdleWindows = 30; + + /** + * The maximum length in bytes of a rolling file. The default value of this is + * 1MB. + */ + @Min(1) + protected Long maxLength = 128 * 1024 * 1024L; + + /** + * Maximum number of tuples per sec per partition for HDFS write. + */ + private long maxTuplesPerSecPerPartition = 300000; + + /** + * Minimum number of tuples per sec per partition for HDFS write. 
+ */ + private long minTuplesPerSecPerPartition = 30000; + + /** + * Time interval in milliseconds to check for repartitioning + */ + private long coolDownMillis = 1 * 60 * 1000; + + /** + * Maximum number of S3 upload partitions + */ + private int maxS3UploadPartitions = 16; + + /** + * Minimum number of S3 upload partitions + */ + private int minS3UploadPartitions = 1; + + /** + * Maximum queue size for S3 upload + */ + private int maxQueueSizeS3Upload = 4; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + FSRecordCompactionOperator s3compaction = dag.addOperator("S3Compaction", new FSRecordCompactionOperator()); + s3compaction.setConverter(getConverter()); + s3compaction.setMaxIdleWindows(maxIdleWindows); + s3compaction.setMaxLength(maxLength); + + StatelessThroughputBasedPartitioner> partitioner = new StatelessThroughputBasedPartitioner>(); + partitioner.setMaximumEvents(maxTuplesPerSecPerPartition); + partitioner.setMinimumEvents(minTuplesPerSecPerPartition); + partitioner.setCooldownMillis(coolDownMillis); + dag.setAttribute(s3compaction, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[] {partitioner})); + dag.setAttribute(s3compaction, OperatorContext.PARTITIONER, partitioner); + + S3Reconciler s3Reconciler = dag.addOperator("S3Reconciler", new S3Reconciler()); + s3Reconciler.setAccessKey(accessKey); + s3Reconciler.setSecretKey(secretAccessKey); + s3Reconciler.setBucketName(bucketName); + s3Reconciler.setEndPoint(endPoint); + s3Reconciler.setDirectoryName(outputDirectoryPath); + + S3ReconcilerQueuePartitioner reconcilerPartitioner = new S3ReconcilerQueuePartitioner(); + reconcilerPartitioner.setCooldownMillis(coolDownMillis); + reconcilerPartitioner.setMinPartitions(minS3UploadPartitions); + reconcilerPartitioner.setMaxPartitions(maxS3UploadPartitions); + reconcilerPartitioner.setMaxQueueSizePerPartition(maxQueueSizeS3Upload); + + dag.setAttribute(s3Reconciler, OperatorContext.STATS_LISTENERS, + Arrays.asList(new StatsListener[] {reconcilerPartitioner})); + dag.setAttribute(s3Reconciler, OperatorContext.PARTITIONER, reconcilerPartitioner); + + if (endPoint != null) { + s3Reconciler.setEndPoint(endPoint); + } + dag.addStream("write-to-s3", s3compaction.output, s3Reconciler.input); + input.set(s3compaction.input); + } + + /** + * Get the AWS access key + * + * @return AWS access key + */ + public String getAccessKey() + { + return accessKey; + } + + /** + * Set the AWS access key + * + * @param accessKey + * access key + */ + public void setAccessKey(@NotNull String accessKey) + { + this.accessKey = Preconditions.checkNotNull(accessKey); + } + + /** + * Return the AWS secret access key + * + * @return AWS secret access key + */ + public String getSecretAccessKey() + { + return secretAccessKey; + } + + /** + * Set the AWS secret access key + * + * @param secretAccessKey + * AWS secret access key + */ + public void setSecretAccessKey(@NotNull String secretAccessKey) + { + this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey); + } + + /** + * Get the name of the bucket in which to upload the files + * + * @return bucket name + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Set the name of the bucket in which to upload the files + * + * @param bucketName + * name of the bucket + */ + public void setBucketName(@NotNull String bucketName) + { + this.bucketName = Preconditions.checkNotNull(bucketName); + } + + /** + * Return the S3 End point + * + * @return S3 End point + */ + public String getEndPoint() + { 
+ return endPoint; + } + + /** + * Set the S3 End point + * + * @param endPoint + * S3 end point + */ + public void setEndPoint(String endPoint) + { + this.endPoint = Preconditions.checkNotNull(endPoint); + } + + /** + * Get the path of the output directory. + * + * @return path of output directory + */ + public String getOutputDirectoryPath() + { + return outputDirectoryPath; + } + + /** + * Set the path of the output directory. + * + * @param outputDirectoryPath + * path of output directory + */ + public void setOutputDirectoryPath(@NotNull String outputDirectoryPath) + { + this.outputDirectoryPath = Preconditions.checkNotNull(outputDirectoryPath); + } + + /** + * No. of idle window after which file should be rolled over + * + * @return max number of idle windows for rollover + */ + public long getMaxIdleWindows() + { + return maxIdleWindows; + } + + /** + * No. of idle window after which file should be rolled over + * + * @param maxIdleWindows + * max number of idle windows for rollover + */ + public void setMaxIdleWindows(long maxIdleWindows) + { + this.maxIdleWindows = maxIdleWindows; + } + + /** + * Get max length of file after which file should be rolled over + * + * @return max length of file + */ + public Long getMaxLength() + { + return maxLength; + } + + /** + * Set max length of file after which file should be rolled over + * + * @param maxLength + * max length of file + */ + public void setMaxLength(Long maxLength) + { + this.maxLength = maxLength; + } + + public long getMaxTuplesPerSecPerPartition() + { + return maxTuplesPerSecPerPartition; + } + + public void setMaxTuplesPerSecPerPartition(long maxTuplesPerSecPerPartition) + { + this.maxTuplesPerSecPerPartition = maxTuplesPerSecPerPartition; + } + + public long getMinTuplesPerSecPerPartition() + { + return minTuplesPerSecPerPartition; + } + + public void setMinTuplesPerSecPerPartition(long minTuplesPerSecPerPartition) + { + this.minTuplesPerSecPerPartition = minTuplesPerSecPerPartition; + } + + public long getCoolDownMillis() + { + return coolDownMillis; + } + + public void setCoolDownMillis(long coolDownMillis) + { + this.coolDownMillis = coolDownMillis; + } + + public int getMaxS3UploadPartitions() + { + return maxS3UploadPartitions; + } + + public void setMaxS3UploadPartitions(int maxS3UploadPartitions) + { + this.maxS3UploadPartitions = maxS3UploadPartitions; + } + + public int getMinS3UploadPartitions() + { + return minS3UploadPartitions; + } + + public void setMinS3UploadPartitions(int minS3UploadPartitions) + { + this.minS3UploadPartitions = minS3UploadPartitions; + } + + public int getMaxQueueSizeS3Upload() + { + return maxQueueSizeS3Upload; + } + + public void setMaxQueueSizeS3Upload(int maxQueueSizeS3Upload) + { + this.maxQueueSizeS3Upload = maxQueueSizeS3Upload; + } + + /** + * Converter for conversion of input tuples to byte[] + * + * @return converter + */ + protected abstract Converter getConverter(); + + public static class S3BytesOutputModule extends S3TupleOutputModule + { + @Override + protected Converter getConverter() + { + return new NoOpConverter(); + } + } + + public static class S3StringOutputModule extends S3TupleOutputModule + { + @Override + protected Converter getConverter() + { + return new StringToBytesConverter(); + } + } +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java new file mode 100644 index 0000000000..43a8f6c3df --- /dev/null +++ 
b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs; + +import java.io.File; +import java.io.IOException; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.testbench.CollectorTestSink; + +public class FSRecordCompactionOperatorTest +{ + + private class TestMeta extends TestWatcher + { + FSRecordCompactionOperator underTest; + Context.OperatorContext context; + String outputPath; + + @Override + protected void starting(Description description) + { + super.starting(description); + outputPath = new File("target/" + description.getClassName() + "/" + description.getMethodName()).getPath(); + + Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.DAGContext.APPLICATION_ID, description.getClassName()); + attributes.put(DAG.DAGContext.APPLICATION_PATH, outputPath); + context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); + + underTest = new FSRecordCompactionOperator(); + underTest.setConverter(new GenericFileOutputOperator.NoOpConverter()); + underTest.setup(context); + underTest.setMaxIdleWindows(10); + } + + @Override + protected void finished(Description description) + { + this.underTest.teardown(); + try { + FileUtils.deleteDirectory(new File("target" + Path.SEPARATOR + description.getClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testRotate() throws Exception + { + CollectorTestSink sink = new CollectorTestSink(); + testMeta.underTest.output.setSink((CollectorTestSink)sink); + + for (int i = 0; i < 60; i++) { + testMeta.underTest.beginWindow(i); + if (i < 10) { + testMeta.underTest.input.process(("Record" + Integer.toString(i)).getBytes()); + } + testMeta.underTest.endWindow(); + } + testMeta.underTest.committed(59); + for (int i = 60; i < 70; i++) { + testMeta.underTest.beginWindow(i); + testMeta.underTest.endWindow(); + } + + Assert.assertEquals("tuples-" + testMeta.context.getAttributes().get(DAG.DAGContext.APPLICATION_ID) + + "_1.0", sink.collectedTuples.get(0).getFileName()); + } +} diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java 
b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java new file mode 100644 index 0000000000..f1acb9b013 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs.s3; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.Path; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.PutObjectRequest; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +public class S3ReconcilerTest +{ + + private class TestMeta extends TestWatcher + { + S3Reconciler underTest; + Context.OperatorContext context; + + @Mock + AmazonS3 s3clientMock; + String outputPath; + + @Override + protected void starting(Description description) + { + super.starting(description); + outputPath = new File( + "target" + Path.SEPARATOR + description.getClassName() + Path.SEPARATOR + description.getMethodName()) + .getPath(); + + Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.DAGContext.APPLICATION_ID, description.getClassName()); + attributes.put(DAG.DAGContext.APPLICATION_PATH, outputPath); + context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); + + underTest = new S3Reconciler(); + underTest.setAccessKey(""); + underTest.setSecretKey(""); + + underTest.setup(context); + + MockitoAnnotations.initMocks(this); + when(s3clientMock.putObject((PutObjectRequest)any())).thenReturn(null); + underTest.setS3client(s3clientMock); + } + + @Override + protected void finished(Description description) + { + this.underTest.teardown(); + try { + FileUtils.deleteDirectory(new File("target" + Path.SEPARATOR + description.getClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testFileClearing() throws Exception + { + String fileName = "s3-compaction_1.0"; + String path = testMeta.outputPath + Path.SEPARATOR + fileName; + long size = 80; + + File file = new File(path); + File tmpFile = new File(path + "." 
+ System.currentTimeMillis() + ".tmp"); + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < 10; i++) { + sb.append("Record" + i + "\n"); + if (i == 5) { + FileUtils.write(tmpFile, sb.toString()); + } + } + FileUtils.write(file, sb.toString()); + + FSRecordCompactionOperator.OutputMetaData outputMetaData = new FSRecordCompactionOperator.OutputMetaData(path, fileName, size); + testMeta.underTest.beginWindow(0); + testMeta.underTest.input.process(outputMetaData); + testMeta.underTest.endWindow(); + + for (int i = 1; i < 60; i++) { + testMeta.underTest.beginWindow(i); + testMeta.underTest.endWindow(); + } + testMeta.underTest.committed(59); + for (int i = 60; i < 70; i++) { + testMeta.underTest.beginWindow(i); + Thread.sleep(10); + testMeta.underTest.endWindow(); + } + Collection files = + FileUtils.listFiles(new File(testMeta.outputPath), null, true); + Assert.assertEquals(0, files.size()); + } +}
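A minimal wiring sketch (illustrative only, not part of this patch): one way the new S3TupleOutputModule.S3StringOutputModule could be plugged into an application DAG. The DummyLineEmitter source, the application name, and the credential/bucket/path values are hypothetical placeholders; only the module's ports and setters shown in the patch above are assumed.

package org.apache.apex.examples.s3;  // hypothetical example package

import org.apache.apex.malhar.lib.fs.s3.S3TupleOutputModule;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.util.BaseOperator;

@ApplicationAnnotation(name = "S3OutputExample")
public class S3OutputExampleApp implements StreamingApplication
{
  /**
   * Hypothetical source operator; emits one String record per emitTuples() call.
   */
  public static class DummyLineEmitter extends BaseOperator implements InputOperator
  {
    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
    private long count;

    @Override
    public void emitTuples()
    {
      output.emit("record-" + count++);
    }
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    DummyLineEmitter lines = dag.addOperator("Lines", new DummyLineEmitter());

    // Module introduced by this patch: compacts incoming tuples into rolling files
    // (FSRecordCompactionOperator) and uploads the finalized files to S3 (S3Reconciler).
    S3TupleOutputModule.S3StringOutputModule s3Output =
        dag.addModule("S3Output", new S3TupleOutputModule.S3StringOutputModule());
    s3Output.setAccessKey("<aws-access-key>");          // placeholder; normally supplied via properties
    s3Output.setSecretAccessKey("<aws-secret-key>");    // placeholder
    s3Output.setBucketName("my-bucket");                // placeholder bucket name
    s3Output.setOutputDirectoryPath("apex/s3-output");  // placeholder key prefix under the bucket

    dag.addStream("linesToS3", lines.output, s3Output.input);
  }
}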