Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Fixed python implementation for Apache Apex
Browse files Browse the repository at this point in the history
  • Loading branch information
patilvikram committed Jun 29, 2017
1 parent 4df6858 commit 151413c
Show file tree
Hide file tree
Showing 46 changed files with 3,142 additions and 11 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ nb-configuration.xml
hadoop.log
site/
.checkstyle
*.pyc
*.out
38 changes: 38 additions & 0 deletions docs/python/main.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#Developing Streaming Application in Python#

Currently we have exposed basic support for Stateless Support.

##Requirements:##
* Python 2.7
* py4j
Please install py4j on your machine.
```
pip install py4j
```


Once you have pulled Apache Malhar project, go to python project and follow next steps:

* Compile all projects under Apache Malhar and make sure you have hadoop installed on local node.
* Once compilation finish, go to python/script directory and launch ./pyshell
* This will launch python shell. Now you can develop your application using python shell.

You can write simpler application using available apis as well provide custom functions written in python.

```
def filter_function(a):
input_data=a.split(',')
if float(input_data[2])> 100:
return True
return False
from pyapex import createApp
a=createApp('python_app').from_kafka09('localhost:2181','test_topic') \
.filter('filter_operator',filter_function) \
.to_console(name='endConsole') \
.launch(False)
```


Note: Currently developer need to ensure that required python dependencies are installed on Hadoop cluster.
96 changes: 96 additions & 0 deletions examples/python/resources/stock_data.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
symbol,timestamp,open
MSFT,2014-10-31T00:00:00-04:00,46.31618
MSFT,2014-11-03T00:00:00-05:00,46.26685
MSFT,2014-11-04T00:00:00-05:00,46.6714
MSFT,2014-11-05T00:00:00-05:00,47.16475
MSFT,2014-11-06T00:00:00-05:00,47.22396
MSFT,2014-11-07T00:00:00-05:00,48.26987
MSFT,2014-11-10T00:00:00-05:00,48.04292
MSFT,2014-11-11T00:00:00-05:00,48.2008
MSFT,2014-11-12T00:00:00-05:00,47.91465
MSFT,2014-11-13T00:00:00-05:00,48.16133
MSFT,2014-11-14T00:00:00-05:00,49.07897
MSFT,2014-11-17T00:00:00-05:00,48.75336
MSFT,2014-11-18T00:00:00-05:00,48.78283
MSFT,2014-11-19T00:00:00-05:00,48.31615
MSFT,2014-11-20T00:00:00-05:00,47.66082
MSFT,2014-11-21T00:00:00-05:00,48.67361
MSFT,2014-11-24T00:00:00-05:00,47.65089
MSFT,2014-11-25T00:00:00-05:00,47.32322
MSFT,2014-11-26T00:00:00-05:00,47.15442
MSFT,2014-11-28T00:00:00-05:00,47.61117
MSFT,2014-12-01T00:00:00-05:00,47.54167
MSFT,2014-12-02T00:00:00-05:00,48.49488
MSFT,2014-12-03T00:00:00-05:00,48.09771
MSFT,2014-12-04T00:00:00-05:00,48.04806
MSFT,2014-12-05T00:00:00-05:00,48.47502
MSFT,2014-12-08T00:00:00-05:00,47.97855
MSFT,2014-12-09T00:00:00-05:00,46.77711
MSFT,2014-12-10T00:00:00-05:00,47.24379
MSFT,2014-12-11T00:00:00-05:00,46.74732
MSFT,2014-12-12T00:00:00-05:00,46.35014
MSFT,2014-12-15T00:00:00-05:00,46.86647
MSFT,2014-12-16T00:00:00-05:00,45.57566
MSFT,2014-12-17T00:00:00-05:00,44.73166
MSFT,2014-12-18T00:00:00-05:00,46.25085
MSFT,2014-12-19T00:00:00-05:00,47.27357
MSFT,2014-12-22T00:00:00-05:00,47.44237
MSFT,2014-12-23T00:00:00-05:00,48.0282
MSFT,2014-12-24T00:00:00-05:00,48.2963
MSFT,2014-12-26T00:00:00-05:00,48.06792
MSFT,2014-12-29T00:00:00-05:00,47.36294
MSFT,2014-12-30T00:00:00-05:00,47.10477
MSFT,2014-12-31T00:00:00-05:00,46.39979
MSFT,2015-01-02T00:00:00-05:00,46.33028
MSFT,2015-01-05T00:00:00-05:00,46.04234
MSFT,2015-01-06T00:00:00-05:00,46.05227
MSFT,2015-01-07T00:00:00-05:00,45.65509
MSFT,2015-01-08T00:00:00-05:00,46.41965
MSFT,2015-01-09T00:00:00-05:00,47.27357
MSFT,2015-01-12T00:00:00-05:00,47.08492
MSFT,2015-01-13T00:00:00-05:00,46.6381
MSFT,2015-01-14T00:00:00-05:00,45.63523
MSFT,2015-01-15T00:00:00-05:00,45.8934
MSFT,2015-01-16T00:00:00-05:00,44.98983
MSFT,2015-01-20T00:00:00-05:00,45.97283
MSFT,2015-01-21T00:00:00-05:00,45.61537
MSFT,2015-01-22T00:00:00-05:00,46.16149
MSFT,2015-01-23T00:00:00-05:00,47.02534
MSFT,2015-01-26T00:00:00-05:00,46.66788
MSFT,2015-01-27T00:00:00-05:00,42.6465
MSFT,2015-01-28T00:00:00-05:00,42.43799
MSFT,2015-01-29T00:00:00-05:00,40.64078
MSFT,2015-01-30T00:00:00-05:00,41.25639
MSFT,2015-02-02T00:00:00-05:00,40.30318
MSFT,2015-02-03T00:00:00-05:00,41.33583
MSFT,2015-02-04T00:00:00-05:00,41.64364
MSFT,2015-02-05T00:00:00-05:00,41.92166
MSFT,2015-02-06T00:00:00-05:00,42.37841
MSFT,2015-02-09T00:00:00-05:00,41.94152
MSFT,2015-02-10T00:00:00-05:00,42.43799
MSFT,2015-02-11T00:00:00-05:00,42.34863
MSFT,2015-02-12T00:00:00-05:00,42.38834
MSFT,2015-02-13T00:00:00-05:00,43.07346
MSFT,2015-02-17T00:00:00-05:00,43.97
MSFT,2015-02-18T00:00:00-05:00,43.63
MSFT,2015-02-19T00:00:00-05:00,43.27
MSFT,2015-02-20T00:00:00-05:00,43.5
MSFT,2015-02-23T00:00:00-05:00,43.7
MSFT,2015-02-24T00:00:00-05:00,44.15
MSFT,2015-02-25T00:00:00-05:00,43.95
MSFT,2015-02-26T00:00:00-05:00,43.99
MSFT,2015-02-27T00:00:00-05:00,44.14
MSFT,2015-03-02T00:00:00-05:00,43.67
MSFT,2015-03-03T00:00:00-05:00,43.56
MSFT,2015-03-04T00:00:00-05:00,43.01
MSFT,2015-03-05T00:00:00-05:00,43.07
MSFT,2015-03-06T00:00:00-05:00,43
MSFT,2015-03-09T00:00:00-04:00,42.19
MSFT,2015-03-10T00:00:00-04:00,42.35
MSFT,2015-03-11T00:00:00-04:00,42.32
MSFT,2015-03-12T00:00:00-04:00,41.33
MSFT,2015-03-13T00:00:00-04:00,40.7
MSFT,2015-03-16T00:00:00-04:00,41.47
MSFT,2015-03-17T00:00:00-04:00,41.37
MSFT,2015-03-18T00:00:00-04:00,41.43
MSFT,2015-03-19T00:00:00-04:00,42.25
30 changes: 30 additions & 0 deletions examples/python/stock_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#input_data="apex-malhar/examples/python/resources/stock_data.csv"
input_data="/home/vikram/Documents/src/apex-malhar/examples/python/resources/stock_data.csv"
data = []
with open( input_data, "r") as outfile:
outfile.readline()
for line in outfile:
data.append(line)

def filter_func(a):
input_data=a.split(",")
if float(input_data[2])> 30:
return True
return False


from pyapex import createApp

def filter_func(a):
input_data=a.split(",")
if float(input_data[2])> 30:
return True
return False


from pyapex import createApp
a=createApp('python_app').from_data(data) \
.filter('filter_operator',filter_func) \
.to_console(name='endConsole') \
.launch(False)

2 changes: 2 additions & 0 deletions library/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
<version>3.8.0-SNAPSHOT</version>
</parent>



<artifactId>malhar-library</artifactId>
<packaging>jar</packaging>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.datatorrent.lib.io.fs;

import java.util.List;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;

public class InMemoryDataInputOperator<T> implements InputOperator
{
private List<T> inputData = null;
private boolean emissionCompleted = false;
public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();

public InMemoryDataInputOperator()
{
inputData = null;
}

public InMemoryDataInputOperator(List<T> data)
{
inputData = data;
}

@Override
public void emitTuples()
{
if (emissionCompleted) {
return;
}
for (T data : inputData) {
outputPort.emit(data);
}
emissionCompleted = true;
}

@Override
public void beginWindow(long l)
{
}

@Override
public void endWindow()
{
}

@Override
public void setup(Context.OperatorContext context)
{
}

@Override
public void teardown()
{
}
}
2 changes: 2 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pages:
- Apache Apex Malhar: index.md
- APIs:
- SQL: apis/calcite.md
- Python Support:
- Pyshell: python/main.md
- Operators:
- Block Reader: operators/block_reader.md
- CSV Formatter: operators/csvformatter.md
Expand Down
26 changes: 26 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
<apex.core.version>3.6.0</apex.core.version>
<semver.plugin.skip>false</semver.plugin.skip>
<surefire.args>-Xmx2048m</surefire.args>
<powermock.version>1.4.9</powermock.version>
</properties>

<build>
Expand Down Expand Up @@ -233,6 +234,31 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</exclusion>
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

</project>
1 change: 1 addition & 0 deletions python/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pyapex/jars/*.*
Loading

0 comments on commit 151413c

Please sign in to comment.