Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

APEXMALHAR-2468 Move FileIO related examples from datatorrent examples to apex-malhar examples #609

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions examples/fileIO/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
## Sample application for File Read-Write implementation

This example shows how to extend the `AbstractFileInputOperator` and `AbstractFileOutputOperator` from [Malhar library](https://github.com/apache/apex-malhar) to create a high performance application to copy files. There are two examples in the project:

1. FileIO: This application copies text file contents line by like to specified destination. The required properties for application are specified in src/main/resources/META-INF/properties-FileIO.xml.

2. ThroughputBasedFileIO: This application copies file contents block by block to specified destination. This application makes use of `AbstractThroughputFileInputOperator` from [Malhar library](https://github.com/apache/apex-malhar). The required properties for application are specified in src/main/resources/META-INF/properties-ThroughputBasedFileIO.xml.

#### Follow these steps to run this application

**Step 1**: Update input and output file(s)/folder properties in your application properties file.

For Input location update property `dt.operator.read.prop.directory`

For Output location update property `dt.operator.write.prop.filePath`

**Step 2**: Build the code:

shell> mvn clean install

Upload the `target/fileIO-1.0-SNAPSHOT.apa` to the UI console if available or launch it from the commandline using `apexcli`.

**Step 3**: During launch use `src/main/resources/META-INF/properties-*.xml` as a custom configuration file; then verify
that the output directory has the expected output.

**Note**: The application can be run in local mode within your IDE by simply running tests available in `src/test/java` folder.

## Sample application for Multi Directory File Read-Write Implementation

This example is very similar to the fileIO example with one difference: it shows how
create a set of partitions separated into slices where each slice monitors a different
input directory. A custom partitioner and directory scanner are used.

## Sample application for File Input & output operators

Sample application to show how to use the file input and output operators.

During a typical run on a Hadoop cluster, when input files are dropped into the
configured input directory (e.g. `/tmp/SimpleFileIO/input-dir`), the application
will create temporary files like this at the configured output location in
HDFS (e.g. `/tmp/SimpleFileIO/output-dir`) and copy all input file data to it:

/tmp/SimpleFileIO/output-dir/myfile_p2.0.1465929407447.tmp

When the file size exceeds the configured limit of 100000 bytes, a new file with
a name like `myfile_p2.1.1465929407447.tmp` will be opened and, a minute or two
later, the old file will be renamed to `myfile_p2.0`.

## Sample application for file output operator with partitioning and rolling file output.

Sample application to show how to use the file output operator along with
partitioning and rolling file output.

A typical run on a Hadoop cluster will create files like this at the configured
output location in HDFS (e.g. `/tmp/fileOutput`) where the numeric extension is
the sequnce number of rolling output files and the number following 'p' is the
operator id of the partition that generated the file:

/tmp/fileOutput/sequence_p3.0
/tmp/fileOutput/sequence_p3.1
/tmp/fileOutput/sequence_p4.0
/tmp/fileOutput/sequence_p4.1

Each file should contain lines like this where the second value is the number
produced by the generator operator and the first is the corresponding operator id:

[1, 1075]
[1, 1095]
[2, 1110]
[2, 1120]

Please note that there are no guarantees about the way operator ids are assigned
to operators by the platform.

51 changes: 51 additions & 0 deletions examples/fileIO/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-examples</artifactId>
<version>3.8.0-SNAPSHOT</version>
</parent>

<artifactId>malhar-examples-fileIO</artifactId>
<packaging>jar</packaging>

<!-- change these to the appropriate values -->
<!--name>File Input and Output</name>
<description>Simple application illustrating use of file input and output operators</description-->

<name>FileIO examples</name>
<description>Demostrates the FileIO related examples</description>

<dependencies>
<!-- add your dependencies here -->
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-engine</artifactId>
<version>${apex.core.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
63 changes: 63 additions & 0 deletions examples/fileIO/src/assemble/appPackage.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<!--

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>appPackage</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>${basedir}/target/</directory>
<outputDirectory>/app</outputDirectory>
<includes>
<include>${project.artifactId}-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}/target/deps</directory>
<outputDirectory>/lib</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/src/site/conf</directory>
<outputDirectory>/conf</outputDirectory>
<includes>
<include>*.xml</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/resources/META-INF</directory>
<outputDirectory>/META-INF</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/resources/app</directory>
<outputDirectory>/app</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/resources/resources</directory>
<outputDirectory>/resources</outputDirectory>
</fileSet>
</fileSets>

</assembly>

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.apex.examples.fileIO;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;

import static com.datatorrent.api.Context.PortContext.PARTITION_PARALLEL;

@ApplicationAnnotation(name = "FileIO")
public class Application implements StreamingApplication
{

@Override
public void populateDAG(DAG dag, Configuration conf)
{
// create operators
FileReader reader = dag.addOperator("read", FileReader.class);
FileWriter writer = dag.addOperator("write", FileWriter.class);

// using parallel partitioning ensures that lines from a single file are handled
// by the same writer
//
dag.setInputPortAttribute(writer.input, PARTITION_PARALLEL, true);
dag.setInputPortAttribute(writer.control, PARTITION_PARALLEL, true);

dag.addStream("data", reader.output, writer.input);
dag.addStream("ctrl", reader.control, writer.control);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.apex.examples.fileIO;

import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class BytesFileWriter extends AbstractFileOutputOperator<byte[]>
{
private static final transient Logger LOG = LoggerFactory.getLogger(BytesFileWriter.class);
private static final char START_FILE = ThroughputBasedReader.START_FILE;
private static final char FINISH_FILE = ThroughputBasedReader.FINISH_FILE;
private String fileName; // current file name
private boolean eof;
private transient ArrayList<byte[]> savedLines = new ArrayList<>();

public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
{
@Override
public void process(String tuple)
{
processControlTuple(tuple);
}
};

private void processControlTuple(final String tuple)
{
if (START_FILE == tuple.charAt(0)) {
// sanity check
if (null != fileName) {
throw new RuntimeException(String.format("Error: fileName = %s, expected null", fileName));
}

fileName = tuple.substring(1);
if (!savedLines.isEmpty()) {
LOG.debug("Processing {} saved lines", savedLines.size());
for (byte[] line : savedLines) {
processTuple(line);
}
savedLines.clear();
}
return;
}

final int last = tuple.length() - 1;
if (FINISH_FILE == tuple.charAt(last)) { // end of file
String name = tuple.substring(0, last);
LOG.info("Closing file: " + name);
if (null == fileName || !fileName.equals(name)) {
throw new RuntimeException(String.format("Error: fileName = %s != %s = tuple", fileName, tuple));
}
eof = true;
return;
}
}

@Override
public void endWindow()
{
if (!eof) {
return;
}

// got an EOF, so must have a file name
if (null == fileName) {
throw new RuntimeException("Error: fileName empty");
}
requestFinalize(fileName);
super.endWindow();

eof = false;
fileName = null;
}

@Override
protected String getFileName(byte[] tuple)
{
return fileName;
}

@Override
protected byte[] getBytesForTuple(byte[] tuple)
{
return tuple;
}

@Override
public void processTuple(byte[] tuple)
{
if (null == fileName) {
savedLines.add(tuple);
return;
}
super.processTuple(tuple);
}

}
Loading