Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Fixed python implementation for Apache Apex
Browse files Browse the repository at this point in the history
  • Loading branch information
patilvikram committed Jul 11, 2017
1 parent 7862fb3 commit a043f96
Show file tree
Hide file tree
Showing 68 changed files with 4,845 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ nb-configuration.xml
hadoop.log
site/
.checkstyle
*.pyc
*.out
examples/python/output
38 changes: 38 additions & 0 deletions docs/python/main.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#Developing Streaming Application in Python#

Currently we have exposed basic support for Stateless Support.

##Requirements:##
* Python 2.7
* py4j
Please install py4j on your machine.
```
pip install py4j
```


Once you have pulled Apache Malhar project, go to python project and follow next steps:

* Compile all projects under Apache Malhar and make sure you have hadoop installed on local node.
* Once compilation finish, go to python/script directory and launch ./pyshell
* This will launch python shell. Now you can develop your application using python shell.

You can write simpler application using available apis as well provide custom functions written in python.

```
def filter_function(a):
input_data=a.split(',')
if float(input_data[2])> 100:
return True
return False
from pyapex import createApp
a=createApp('python_app').from_kafka09('localhost:2181','test_topic') \
.filter('filter_operator',filter_function) \
.to_console(name='endConsole') \
.launch(False)
```


Note: Currently developer need to ensure that required python dependencies are installed on Hadoop cluster.
15 changes: 15 additions & 0 deletions examples/python/resources/hadoop_word_count.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Hadoop is the Elephant King!
A yellow and elegant thing.
He never forgets
Useful data, or lets
An extraneous element cling!
A wonderful king is Hadoop.
The elephant plays well with Sqoop.
But what helps him to thrive
Are Impala, and Hive,
And HDFS in the group.
Hadoop is an elegant fellow.
An elephant gentle and mellow.
He never gets mad,
Or does anything bad,
Because, at his core, he is yellow.
96 changes: 96 additions & 0 deletions examples/python/resources/stock_data.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
symbol,timestamp,open
MSFT,2014-10-31T00:00:00-04:00,46.31618
MSFT,2014-11-03T00:00:00-05:00,46.26685
MSFT,2014-11-04T00:00:00-05:00,46.6714
MSFT,2014-11-05T00:00:00-05:00,47.16475
MSFT,2014-11-06T00:00:00-05:00,47.22396
MSFT,2014-11-07T00:00:00-05:00,48.26987
MSFT,2014-11-10T00:00:00-05:00,48.04292
MSFT,2014-11-11T00:00:00-05:00,48.2008
MSFT,2014-11-12T00:00:00-05:00,47.91465
MSFT,2014-11-13T00:00:00-05:00,48.16133
MSFT,2014-11-14T00:00:00-05:00,49.07897
MSFT,2014-11-17T00:00:00-05:00,48.75336
MSFT,2014-11-18T00:00:00-05:00,48.78283
MSFT,2014-11-19T00:00:00-05:00,48.31615
MSFT,2014-11-20T00:00:00-05:00,47.66082
MSFT,2014-11-21T00:00:00-05:00,48.67361
MSFT,2014-11-24T00:00:00-05:00,47.65089
MSFT,2014-11-25T00:00:00-05:00,47.32322
MSFT,2014-11-26T00:00:00-05:00,47.15442
MSFT,2014-11-28T00:00:00-05:00,47.61117
MSFT,2014-12-01T00:00:00-05:00,47.54167
MSFT,2014-12-02T00:00:00-05:00,48.49488
MSFT,2014-12-03T00:00:00-05:00,48.09771
MSFT,2014-12-04T00:00:00-05:00,48.04806
MSFT,2014-12-05T00:00:00-05:00,48.47502
MSFT,2014-12-08T00:00:00-05:00,47.97855
MSFT,2014-12-09T00:00:00-05:00,46.77711
MSFT,2014-12-10T00:00:00-05:00,47.24379
MSFT,2014-12-11T00:00:00-05:00,46.74732
MSFT,2014-12-12T00:00:00-05:00,46.35014
MSFT,2014-12-15T00:00:00-05:00,46.86647
MSFT,2014-12-16T00:00:00-05:00,45.57566
MSFT,2014-12-17T00:00:00-05:00,44.73166
MSFT,2014-12-18T00:00:00-05:00,46.25085
MSFT,2014-12-19T00:00:00-05:00,47.27357
MSFT,2014-12-22T00:00:00-05:00,47.44237
MSFT,2014-12-23T00:00:00-05:00,48.0282
MSFT,2014-12-24T00:00:00-05:00,48.2963
MSFT,2014-12-26T00:00:00-05:00,48.06792
MSFT,2014-12-29T00:00:00-05:00,47.36294
MSFT,2014-12-30T00:00:00-05:00,47.10477
MSFT,2014-12-31T00:00:00-05:00,46.39979
MSFT,2015-01-02T00:00:00-05:00,46.33028
MSFT,2015-01-05T00:00:00-05:00,46.04234
MSFT,2015-01-06T00:00:00-05:00,46.05227
MSFT,2015-01-07T00:00:00-05:00,45.65509
MSFT,2015-01-08T00:00:00-05:00,46.41965
MSFT,2015-01-09T00:00:00-05:00,47.27357
MSFT,2015-01-12T00:00:00-05:00,47.08492
MSFT,2015-01-13T00:00:00-05:00,46.6381
MSFT,2015-01-14T00:00:00-05:00,45.63523
MSFT,2015-01-15T00:00:00-05:00,45.8934
MSFT,2015-01-16T00:00:00-05:00,44.98983
MSFT,2015-01-20T00:00:00-05:00,45.97283
MSFT,2015-01-21T00:00:00-05:00,45.61537
MSFT,2015-01-22T00:00:00-05:00,46.16149
MSFT,2015-01-23T00:00:00-05:00,47.02534
MSFT,2015-01-26T00:00:00-05:00,46.66788
MSFT,2015-01-27T00:00:00-05:00,42.6465
MSFT,2015-01-28T00:00:00-05:00,42.43799
MSFT,2015-01-29T00:00:00-05:00,40.64078
MSFT,2015-01-30T00:00:00-05:00,41.25639
MSFT,2015-02-02T00:00:00-05:00,40.30318
MSFT,2015-02-03T00:00:00-05:00,41.33583
MSFT,2015-02-04T00:00:00-05:00,41.64364
MSFT,2015-02-05T00:00:00-05:00,41.92166
MSFT,2015-02-06T00:00:00-05:00,42.37841
MSFT,2015-02-09T00:00:00-05:00,41.94152
MSFT,2015-02-10T00:00:00-05:00,42.43799
MSFT,2015-02-11T00:00:00-05:00,42.34863
MSFT,2015-02-12T00:00:00-05:00,42.38834
MSFT,2015-02-13T00:00:00-05:00,43.07346
MSFT,2015-02-17T00:00:00-05:00,43.97
MSFT,2015-02-18T00:00:00-05:00,43.63
MSFT,2015-02-19T00:00:00-05:00,43.27
MSFT,2015-02-20T00:00:00-05:00,43.5
MSFT,2015-02-23T00:00:00-05:00,43.7
MSFT,2015-02-24T00:00:00-05:00,44.15
MSFT,2015-02-25T00:00:00-05:00,43.95
MSFT,2015-02-26T00:00:00-05:00,43.99
MSFT,2015-02-27T00:00:00-05:00,44.14
MSFT,2015-03-02T00:00:00-05:00,43.67
MSFT,2015-03-03T00:00:00-05:00,43.56
MSFT,2015-03-04T00:00:00-05:00,43.01
MSFT,2015-03-05T00:00:00-05:00,43.07
MSFT,2015-03-06T00:00:00-05:00,43
MSFT,2015-03-09T00:00:00-04:00,42.19
MSFT,2015-03-10T00:00:00-04:00,42.35
MSFT,2015-03-11T00:00:00-04:00,42.32
MSFT,2015-03-12T00:00:00-04:00,41.33
MSFT,2015-03-13T00:00:00-04:00,40.7
MSFT,2015-03-16T00:00:00-04:00,41.47
MSFT,2015-03-17T00:00:00-04:00,41.37
MSFT,2015-03-18T00:00:00-04:00,41.43
MSFT,2015-03-19T00:00:00-04:00,42.25
51 changes: 51 additions & 0 deletions examples/python/stock_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os
dir_path = os.path.dirname(os.path.realpath(__file__))
print dir_path
input_data=dir_path+"/"+"/resources/stock_data.csv"
data = []
with open( input_data, "r") as outfile:
outfile.readline()
for line in outfile:
data.append(line)

def filter_func(a):
input_data=a.split(",")
if float(input_data[2])> 30:
return True
return False


from pyapex import createApp

def filter_func(a):
input_data=a.split(",")
if float(input_data[2])> 30:
return True
return False


from pyapex import createApp
a=createApp('python_app').from_data(data) \
.filter('filter_operator',filter_func) \
.to_console(name='endConsole') \
.launch(False)

51 changes: 51 additions & 0 deletions examples/python/word_count.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os
dir_path = os.path.dirname(os.path.realpath(__file__))
print dir_path
input_data=dir_path+"/"+"/resources/hadoop_word_count.txt"
data = []
with open( input_data, "r") as outfile:
outfile.readline()
for line in outfile:
for d in line.split(' '):
if len(d):
data.append(d)


from pyapex import createApp
from pyapex.functions import ReduceFunction
from pyapex.functions.window import TriggerType,Trigger,TriggerOption
class MyReduceFunction(ReduceFunction):
def reduce(self, ip1, ip2):
return ip1+ip2

t=TriggerOption.at_watermark()
t.firingOnlyUpdatedPanes()
t.accumulatingFiredPanes()
t.withEarlyFiringsAtEvery(count=4)


from pyapex import createApp
a=createApp('reduce_app2').from_data(data) \
.window(window='TIME', duration=110, trigger=t,allowed_lateness=100) \
.countByKey("countByKey") \
.to_console(name='endConsole') \
.launch(False)
2 changes: 2 additions & 0 deletions library/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
<version>3.8.0-SNAPSHOT</version>
</parent>



<artifactId>malhar-library</artifactId>
<packaging>jar</packaging>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.fs;

import java.util.List;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;

public class InMemoryDataInputOperator<T> implements InputOperator
{
private List<T> inputData = null;
private boolean emissionCompleted = false;
public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();

public InMemoryDataInputOperator()
{
inputData = null;
}

public InMemoryDataInputOperator(List<T> data)
{
inputData = data;
}

@Override
public void emitTuples()
{
if (emissionCompleted) {
return;
}
for (T data : inputData) {
outputPort.emit(data);
}
emissionCompleted = true;
}

@Override
public void beginWindow(long l)
{
}

@Override
public void endWindow()
{
}

@Override
public void setup(Context.OperatorContext context)
{
}

@Override
public void teardown()
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window.accumulation;

import org.apache.apex.malhar.lib.window.Accumulation;

public interface Reduce<T> extends Accumulation<T,T,T>
{
public T reduce(T input1, T input2);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
*/
package org.apache.apex.malhar.lib.window.accumulation;

import org.apache.apex.malhar.lib.window.Accumulation;

/**
* An easy to use reduce Accumulation
* @param <INPUT>
*
* @since 3.5.0
*/
public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
public abstract class ReduceFn<INPUT> implements Reduce<INPUT>
{
@Override
public INPUT defaultAccumulatedValue()
Expand Down
2 changes: 2 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pages:
- Apache Apex Malhar: index.md
- APIs:
- SQL: apis/calcite.md
- Python Support:
- Pyshell: python/main.md
- Operators:
- Block Reader: operators/block_reader.md
- CSV Formatter: operators/csvformatter.md
Expand Down
Loading

0 comments on commit a043f96

Please sign in to comment.