diff --git a/.gitignore b/.gitignore
index d69387d9f9..138cc7bfd4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,6 @@ nb-configuration.xml
hadoop.log
site/
.checkstyle
+*.pyc
+*.out
+examples/python/output
diff --git a/docs/python/main.md b/docs/python/main.md
new file mode 100644
index 0000000000..0e4f379d8b
--- /dev/null
+++ b/docs/python/main.md
@@ -0,0 +1,38 @@
+#Developing Streaming Applications in Python#
+
+Currently, only basic support for stateless operations is exposed.
+
+##Requirements:##
+* Python 2.7
+* py4j
+ Please install py4j on your machine.
+ ```
+ pip install py4j
+ ```
+
+
+Once you have pulled the Apache Malhar project, go to the python project and follow these steps:
+
+* Compile all projects under Apache Malhar and make sure Hadoop is installed on the local node.
+* Once compilation finishes, go to the python/script directory and launch ./pyshell
+* This launches a Python shell in which you can develop your application.
+
+You can write simple applications using the available APIs, as well as provide custom functions written in Python.
+
+```
+
+def filter_function(a):
+    input_data=a.split(',')
+    if float(input_data[2])> 100:
+        return True
+    return False
+
+from pyapex import createApp
+a=createApp('python_app').from_kafka09('localhost:2181','test_topic') \
+ .filter('filter_operator',filter_function) \
+ .to_console(name='endConsole') \
+ .launch(False)
+```
+
+
+Note: Currently, developers need to ensure that the required Python dependencies are installed on the Hadoop cluster.
diff --git a/examples/python/resources/hadoop_word_count.txt b/examples/python/resources/hadoop_word_count.txt
new file mode 100644
index 0000000000..da9cd3af3e
--- /dev/null
+++ b/examples/python/resources/hadoop_word_count.txt
@@ -0,0 +1,15 @@
+Hadoop is the Elephant King!
+A yellow and elegant thing.
+He never forgets
+Useful data, or lets
+An extraneous element cling!
+A wonderful king is Hadoop.
+The elephant plays well with Sqoop.
+But what helps him to thrive
+Are Impala, and Hive,
+And HDFS in the group.
+Hadoop is an elegant fellow.
+An elephant gentle and mellow.
+He never gets mad,
+Or does anything bad,
+Because, at his core, he is yellow.
diff --git a/examples/python/resources/stock_data.csv b/examples/python/resources/stock_data.csv
new file mode 100644
index 0000000000..7b4a77f8f0
--- /dev/null
+++ b/examples/python/resources/stock_data.csv
@@ -0,0 +1,96 @@
+symbol,timestamp,open
+MSFT,2014-10-31T00:00:00-04:00,46.31618
+MSFT,2014-11-03T00:00:00-05:00,46.26685
+MSFT,2014-11-04T00:00:00-05:00,46.6714
+MSFT,2014-11-05T00:00:00-05:00,47.16475
+MSFT,2014-11-06T00:00:00-05:00,47.22396
+MSFT,2014-11-07T00:00:00-05:00,48.26987
+MSFT,2014-11-10T00:00:00-05:00,48.04292
+MSFT,2014-11-11T00:00:00-05:00,48.2008
+MSFT,2014-11-12T00:00:00-05:00,47.91465
+MSFT,2014-11-13T00:00:00-05:00,48.16133
+MSFT,2014-11-14T00:00:00-05:00,49.07897
+MSFT,2014-11-17T00:00:00-05:00,48.75336
+MSFT,2014-11-18T00:00:00-05:00,48.78283
+MSFT,2014-11-19T00:00:00-05:00,48.31615
+MSFT,2014-11-20T00:00:00-05:00,47.66082
+MSFT,2014-11-21T00:00:00-05:00,48.67361
+MSFT,2014-11-24T00:00:00-05:00,47.65089
+MSFT,2014-11-25T00:00:00-05:00,47.32322
+MSFT,2014-11-26T00:00:00-05:00,47.15442
+MSFT,2014-11-28T00:00:00-05:00,47.61117
+MSFT,2014-12-01T00:00:00-05:00,47.54167
+MSFT,2014-12-02T00:00:00-05:00,48.49488
+MSFT,2014-12-03T00:00:00-05:00,48.09771
+MSFT,2014-12-04T00:00:00-05:00,48.04806
+MSFT,2014-12-05T00:00:00-05:00,48.47502
+MSFT,2014-12-08T00:00:00-05:00,47.97855
+MSFT,2014-12-09T00:00:00-05:00,46.77711
+MSFT,2014-12-10T00:00:00-05:00,47.24379
+MSFT,2014-12-11T00:00:00-05:00,46.74732
+MSFT,2014-12-12T00:00:00-05:00,46.35014
+MSFT,2014-12-15T00:00:00-05:00,46.86647
+MSFT,2014-12-16T00:00:00-05:00,45.57566
+MSFT,2014-12-17T00:00:00-05:00,44.73166
+MSFT,2014-12-18T00:00:00-05:00,46.25085
+MSFT,2014-12-19T00:00:00-05:00,47.27357
+MSFT,2014-12-22T00:00:00-05:00,47.44237
+MSFT,2014-12-23T00:00:00-05:00,48.0282
+MSFT,2014-12-24T00:00:00-05:00,48.2963
+MSFT,2014-12-26T00:00:00-05:00,48.06792
+MSFT,2014-12-29T00:00:00-05:00,47.36294
+MSFT,2014-12-30T00:00:00-05:00,47.10477
+MSFT,2014-12-31T00:00:00-05:00,46.39979
+MSFT,2015-01-02T00:00:00-05:00,46.33028
+MSFT,2015-01-05T00:00:00-05:00,46.04234
+MSFT,2015-01-06T00:00:00-05:00,46.05227
+MSFT,2015-01-07T00:00:00-05:00,45.65509
+MSFT,2015-01-08T00:00:00-05:00,46.41965
+MSFT,2015-01-09T00:00:00-05:00,47.27357
+MSFT,2015-01-12T00:00:00-05:00,47.08492
+MSFT,2015-01-13T00:00:00-05:00,46.6381
+MSFT,2015-01-14T00:00:00-05:00,45.63523
+MSFT,2015-01-15T00:00:00-05:00,45.8934
+MSFT,2015-01-16T00:00:00-05:00,44.98983
+MSFT,2015-01-20T00:00:00-05:00,45.97283
+MSFT,2015-01-21T00:00:00-05:00,45.61537
+MSFT,2015-01-22T00:00:00-05:00,46.16149
+MSFT,2015-01-23T00:00:00-05:00,47.02534
+MSFT,2015-01-26T00:00:00-05:00,46.66788
+MSFT,2015-01-27T00:00:00-05:00,42.6465
+MSFT,2015-01-28T00:00:00-05:00,42.43799
+MSFT,2015-01-29T00:00:00-05:00,40.64078
+MSFT,2015-01-30T00:00:00-05:00,41.25639
+MSFT,2015-02-02T00:00:00-05:00,40.30318
+MSFT,2015-02-03T00:00:00-05:00,41.33583
+MSFT,2015-02-04T00:00:00-05:00,41.64364
+MSFT,2015-02-05T00:00:00-05:00,41.92166
+MSFT,2015-02-06T00:00:00-05:00,42.37841
+MSFT,2015-02-09T00:00:00-05:00,41.94152
+MSFT,2015-02-10T00:00:00-05:00,42.43799
+MSFT,2015-02-11T00:00:00-05:00,42.34863
+MSFT,2015-02-12T00:00:00-05:00,42.38834
+MSFT,2015-02-13T00:00:00-05:00,43.07346
+MSFT,2015-02-17T00:00:00-05:00,43.97
+MSFT,2015-02-18T00:00:00-05:00,43.63
+MSFT,2015-02-19T00:00:00-05:00,43.27
+MSFT,2015-02-20T00:00:00-05:00,43.5
+MSFT,2015-02-23T00:00:00-05:00,43.7
+MSFT,2015-02-24T00:00:00-05:00,44.15
+MSFT,2015-02-25T00:00:00-05:00,43.95
+MSFT,2015-02-26T00:00:00-05:00,43.99
+MSFT,2015-02-27T00:00:00-05:00,44.14
+MSFT,2015-03-02T00:00:00-05:00,43.67
+MSFT,2015-03-03T00:00:00-05:00,43.56
+MSFT,2015-03-04T00:00:00-05:00,43.01
+MSFT,2015-03-05T00:00:00-05:00,43.07
+MSFT,2015-03-06T00:00:00-05:00,43
+MSFT,2015-03-09T00:00:00-04:00,42.19
+MSFT,2015-03-10T00:00:00-04:00,42.35
+MSFT,2015-03-11T00:00:00-04:00,42.32
+MSFT,2015-03-12T00:00:00-04:00,41.33
+MSFT,2015-03-13T00:00:00-04:00,40.7
+MSFT,2015-03-16T00:00:00-04:00,41.47
+MSFT,2015-03-17T00:00:00-04:00,41.37
+MSFT,2015-03-18T00:00:00-04:00,41.43
+MSFT,2015-03-19T00:00:00-04:00,42.25
diff --git a/examples/python/stock_filter.py b/examples/python/stock_filter.py
new file mode 100644
index 0000000000..c40313a9a5
--- /dev/null
+++ b/examples/python/stock_filter.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+
+from pyapex import createApp
+
+# Resolve the sample data relative to this script so the example works from
+# any working directory.
+dir_path = os.path.dirname(os.path.realpath(__file__))
+print dir_path
+input_data = os.path.join(dir_path, "resources", "stock_data.csv")
+
+# Load every record as one raw CSV line. The first line of the file is the
+# column header (symbol,timestamp,open) and is skipped.
+data = []
+with open(input_data, "r") as infile:
+    infile.readline()
+    for line in infile:
+        data.append(line)
+
+
+def filter_func(a):
+    """Return True when the third CSV column (the 'open' price) is above 30.
+
+    a -- one raw CSV line of the form "symbol,timestamp,open".
+    """
+    fields = a.split(",")
+    return float(fields[2]) > 30
+
+
+# Feed the in-memory lines through the filter and print the survivors.
+a = createApp('python_app').from_data(data) \
+    .filter('filter_operator', filter_func) \
+    .to_console(name='endConsole') \
+    .launch(False)
+
diff --git a/examples/python/word_count.py b/examples/python/word_count.py
new file mode 100644
index 0000000000..703ed14298
--- /dev/null
+++ b/examples/python/word_count.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+
+from pyapex import createApp
+from pyapex.functions import ReduceFunction
+from pyapex.functions.window import TriggerType,Trigger,TriggerOption
+
+# Resolve the sample text relative to this script so the example works from
+# any working directory.
+dir_path = os.path.dirname(os.path.realpath(__file__))
+print dir_path
+input_data = os.path.join(dir_path, "resources", "hadoop_word_count.txt")
+
+# Split the text into words. The file has no header line, so every line is
+# read. split() without an argument also strips the trailing newline, which
+# split(' ') would leave attached to the last word of each line.
+data = []
+with open(input_data, "r") as infile:
+    for line in infile:
+        for word in line.split():
+            data.append(word)
+
+
+class MyReduceFunction(ReduceFunction):
+    """Example reduce function that adds two values.
+
+    Not wired into the pipeline below, which uses countByKey.
+    """
+
+    def reduce(self, ip1, ip2):
+        return ip1+ip2
+
+
+# Trigger configuration handed to window() below.
+t=TriggerOption.at_watermark()
+t.firingOnlyUpdatedPanes()
+t.accumulatingFiredPanes()
+t.withEarlyFiringsAtEvery(count=4)
+
+
+a = createApp('reduce_app2').from_data(data) \
+    .window(window='TIME', duration=110, trigger=t, allowed_lateness=100) \
+    .countByKey("countByKey") \
+    .to_console(name='endConsole') \
+    .launch(False)
diff --git a/library/pom.xml b/library/pom.xml
index 17908ddce2..c334c6e32c 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -29,6 +29,8 @@
3.8.0-SNAPSHOT
+
+
malhar-libraryjar
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/InMemoryDataInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/InMemoryDataInputOperator.java
new file mode 100644
index 0000000000..b1900901a8
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/InMemoryDataInputOperator.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.util.List;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+/**
+ * Input operator that emits the elements of an in-memory list.
+ * <p>
+ * The whole list is emitted during the first {@link #emitTuples()} call; every
+ * later call returns without emitting anything.
+ *
+ * @param <T> type of the tuples emitted on {@link #outputPort}
+ */
+public class InMemoryDataInputOperator<T> implements InputOperator
+{
+  private List<T> inputData;
+  private boolean emissionCompleted = false;
+  public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
+
+  /**
+   * Creates an operator without data; {@link #emitTuples()} then emits nothing.
+   */
+  public InMemoryDataInputOperator()
+  {
+  }
+
+  /**
+   * Creates an operator that emits the given data.
+   *
+   * @param data the tuples to emit, in iteration order
+   */
+  public InMemoryDataInputOperator(List<T> data)
+  {
+    inputData = data;
+  }
+
+  /**
+   * Emits all tuples on the first invocation and does nothing afterwards.
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (emissionCompleted) {
+      return;
+    }
+    // The no-arg constructor leaves the list unset; treat that as "no data"
+    // instead of failing with a NullPointerException.
+    if (inputData != null) {
+      for (T data : inputData) {
+        outputPort.emit(data);
+      }
+    }
+    emissionCompleted = true;
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+}
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Reduce.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Reduce.java
new file mode 100644
index 0000000000..b6f51cec17
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Reduce.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * An {@link Accumulation} whose input, accumulated value and output all share
+ * one type and are combined pairwise by {@link #reduce(Object, Object)}.
+ *
+ * @param <T> type of the values being reduced
+ */
+public interface Reduce<T> extends Accumulation<T, T, T>
+{
+  /**
+   * Combines two values into one.
+   *
+   * @param input1 the first value
+   * @param input2 the second value
+   * @return the combined value
+   */
+  T reduce(T input1, T input2);
+}
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
index 2b1b635450..f52674b971 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java
@@ -18,7 +18,6 @@
*/
package org.apache.apex.malhar.lib.window.accumulation;
-import org.apache.apex.malhar.lib.window.Accumulation;
/**
* An easy to use reduce Accumulation
@@ -26,7 +25,7 @@
*
* @since 3.5.0
*/
-public abstract class ReduceFn implements Accumulation
+public abstract class ReduceFn implements Reduce
{
@Override
public INPUT defaultAccumulatedValue()
diff --git a/mkdocs.yml b/mkdocs.yml
index 75a862a9aa..c6ecb16841 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -5,6 +5,8 @@ pages:
- Apache Apex Malhar: index.md
- APIs:
- SQL: apis/calcite.md
+- Python Support:
+ - Pyshell: python/main.md
- Operators:
- Block Reader: operators/block_reader.md
- CSV Formatter: operators/csvformatter.md
diff --git a/pom.xml b/pom.xml
index dc9ed8b6b6..811432e709 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
3.6.0false-Xmx2048m
+ 1.4.9
@@ -209,6 +210,7 @@
library
+ pythoncontribkafkaexamples
@@ -233,6 +235,31 @@
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ test-jar
+ test
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ commons-beanutils
+ commons-beanutils
+
+
+ commons-beanutils
+ commons-beanutils-core
+
+
+
diff --git a/python/.gitignore b/python/.gitignore
new file mode 100644
index 0000000000..d37a857dba
--- /dev/null
+++ b/python/.gitignore
@@ -0,0 +1,2 @@
+apex-python/deps/*.*
+apex-python/deps/pyapex-0.0.4-src.zip
diff --git a/python/apex-python/.gitignore b/python/apex-python/.gitignore
new file mode 100644
index 0000000000..2723de6f7c
--- /dev/null
+++ b/python/apex-python/.gitignore
@@ -0,0 +1 @@
+jars
diff --git a/python/apex-python/README b/python/apex-python/README
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/python/apex-python/deps/py4j-0.10.4-src.zip b/python/apex-python/deps/py4j-0.10.4-src.zip
new file mode 100644
index 0000000000..72bb34d0fa
Binary files /dev/null and b/python/apex-python/deps/py4j-0.10.4-src.zip differ
diff --git a/python/apex-python/setup.py b/python/apex-python/setup.py
new file mode 100644
index 0000000000..f75dc526ab
--- /dev/null
+++ b/python/apex-python/setup.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from setuptools import setup, find_packages
+
+
+def read(fname):
+    """Return the text of ``fname`` located next to this setup script.
+
+    Used for ``long_description`` so the top-level README stays the single
+    place where the description is written.
+    """
+    # Close the handle deterministically instead of relying on garbage collection.
+    with open(os.path.join(os.path.dirname(__file__), fname)) as f:
+        return f.read()
+
+
+DOC_DIR = "doc"
+DIST_DIR = "dist"
+
+setup(
+    name="pyapex",
+    version="0.0.4",
+    description="Python Source code for Apache Apex",
+    license="Apache License 2.0",
+    packages=['pyapex', 'pyapex.runtime', 'pyapex.functions'],
+    package_dir={"": "src"},
+    long_description=read('README'),
+    python_requires='~=2.7',
+    # Only registered trove classifiers are used here.
+    classifiers=[
+        "Development Status :: 4 - Beta",
+        "Programming Language :: Python :: 2.7",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+        "License :: OSI Approved :: Apache Software License",
+    ],
+)
diff --git a/python/apex-python/src/MANIFEST b/python/apex-python/src/MANIFEST
new file mode 100644
index 0000000000..44effe21c1
--- /dev/null
+++ b/python/apex-python/src/MANIFEST
@@ -0,0 +1,3 @@
+# file GENERATED by distutils, do NOT edit
+setup.py
+pyapex/__init__.py
diff --git a/python/apex-python/src/pyapex/__init__.py b/python/apex-python/src/pyapex/__init__.py
new file mode 100644
index 0000000000..e1c34f16d4
--- /dev/null
+++ b/python/apex-python/src/pyapex/__init__.py
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from .apexapp import createApp
+from .apexapp import getApp
+from runtime.worker_accum import ReduceWorkerImpl
diff --git a/python/apex-python/src/pyapex/apexapp.py b/python/apex-python/src/pyapex/apexapp.py
new file mode 100644
index 0000000000..42ef27b912
--- /dev/null
+++ b/python/apex-python/src/pyapex/apexapp.py
@@ -0,0 +1,210 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import types
+
+import dill
+from uuid import uuid1
+from py4j.protocol import Py4JJavaError
+from shellconn import ShellConnector
+
+def createApp(name):
+    """Create a new streaming application called ``name``.
+
+    A fresh ShellConnector is opened and handed to the returned
+    ApexStreamingApp wrapper.
+    """
+    return ApexStreamingApp(name, ShellConnector())
+
+
+def getApp(name):
+    """Return a wrapper around the existing application called ``name``.
+
+    Raises Exception when the entry point has no application with that name.
+    """
+    shell_connector = ShellConnector()
+    java_app = shell_connector.get_entry_point().getAppByName(name)
+    if java_app is None:
+        # Same failure as before; without this check, passing the connector
+        # below would make the wrapper create a brand new application.
+        raise Exception("Invalid App initialization")
+    # Hand the connector over as well: from_data(), window() and to_kafka_08()
+    # reach the JVM through self.shell_connector.
+    return ApexStreamingApp(name, shell_connector=shell_connector, java_app=java_app)
+
+class ApexStreamingApp():
+    """Python wrapper around the Java ApexStreamingApp.
+
+    Each builder method forwards to the Java object reached through py4j,
+    stores the Java object returned by that call and returns ``self`` so that
+    calls can be chained. If the Java streaming app is not found, no API can
+    be called on this wrapper.
+    """
+    app_id = None
+    streaming_factory = None
+    java_streaming_app = None
+    instance_id = None
+    shell_connector = None
+    serialized_file_list = []
+
+    def __init__(self, name, shell_connector=None, java_app=None):
+        """Wrap ``java_app`` or, when it is None, create a new Java app.
+
+        name            -- application name passed to the Java createApp call.
+        shell_connector -- connector to the JVM; required when java_app is None.
+        java_app        -- existing Java application object to wrap.
+
+        Raises Exception when neither shell_connector nor java_app is given.
+        """
+        if shell_connector is None and java_app is None:
+            raise Exception("Invalid App initialization")
+        self.shell_connector = shell_connector
+        # Per-instance list so instances do not share the class-level default.
+        self.serialized_file_list = []
+        if java_app is None:
+            self.java_streaming_app = shell_connector.get_entry_point().createApp(name)
+        else:
+            self.java_streaming_app = java_app
+        # uuid1().urn looks like "urn:uuid:<hex>"; keep only the uuid part.
+        self.instance_id = uuid1().urn[9:]
+
+    def from_directory(self, folder_path):
+        """Initialize the input adapter to read from ``folder_path``."""
+        self.java_streaming_app = self.java_streaming_app.fromFolder(folder_path)
+        return self
+
+    def from_kafka08(self, zoopkeepers, topic):
+        """Read input from ``topic`` through the Java fromKafka08 adapter."""
+        self.java_streaming_app = self.java_streaming_app.fromKafka08(zoopkeepers, topic)
+        return self
+
+    def from_kafka09(self, zoopkeepers, topic):
+        """Read input from ``topic`` through the Java fromKafka09 adapter."""
+        self.java_streaming_app = self.java_streaming_app.fromKafka09(zoopkeepers, topic)
+        return self
+
+    def from_data(self, data):
+        """Use an in-memory Python list as the application input.
+
+        The data can be typed on the shell or built directly in a Python file.
+        Only elements whose exact type is int, float, str, bool, tuple or dict
+        are forwarded; elements of any other type are skipped.
+
+        Raises Exception when ``data`` is not a list.
+        """
+        if not isinstance(data, list):
+            raise Exception("data must be a list")
+        data_for_java = self.shell_connector.get_jvm_gateway().jvm.java.util.ArrayList()
+        types_data = [int, float, str, bool, tuple, dict]
+        for d in data:
+            if type(d) in types_data:
+                data_for_java.append(d)
+        self.java_streaming_app = self.java_streaming_app.fromData(data_for_java)
+        return self
+
+    def to_console(self, name=None):
+        """Attach the Java console output under the operator name ``name``."""
+        self.java_streaming_app = self.java_streaming_app.toConsole(name)
+        return self
+
+    def to_kafka_08(self, name=None, topic=None, brokerList=None, **kwargs):
+        """Write output to ``topic`` through the Java toKafka08 adapter.
+
+        brokerList -- stored as the 'bootstrap.servers' property when given.
+        kwargs     -- additional properties, copied into the property map.
+        """
+        properties = {}
+        if brokerList is not None:
+            properties['bootstrap.servers'] = brokerList
+        properties.update(kwargs)
+        property_map = self.shell_connector.get_jvm_gateway().jvm.java.util.HashMap()
+        for key in properties.keys():
+            property_map.put(key, properties[key])
+        self.java_streaming_app = self.java_streaming_app.toKafka08(name, topic, property_map)
+        return self
+
+    def to_directory(self, name,file_name, directory_name, **kwargs):
+        """Write output to ``file_name`` in ``directory_name``.
+
+        Raises Exception when file_name or directory_name is None.
+        """
+        if file_name is None or directory_name is None:
+            raise Exception("Directory Name or File name should be specified")
+        self.java_streaming_app = self.java_streaming_app.toFolder(name, file_name, directory_name)
+        return self
+
+    def map(self, name, func):
+        """Add a map operator ``name`` running the plain Python function ``func``.
+
+        Raises Exception when ``func`` is not a plain function.
+        """
+        if not isinstance(func, types.FunctionType):
+            raise Exception("func must be a plain Python function")
+        serialized_func = self.serialize_function(name, func)
+        self.java_streaming_app = self.java_streaming_app.map(name, serialized_func)
+        return self
+
+    def flatmap(self, name, func):
+        """Add a flat-map operator ``name`` running the plain Python function ``func``.
+
+        Raises Exception when ``func`` is not a plain function.
+        """
+        if not isinstance(func, types.FunctionType):
+            raise Exception("func must be a plain Python function")
+        serialized_func = self.serialize_function(name, func)
+        self.java_streaming_app = self.java_streaming_app.flatMap(name, serialized_func)
+        return self
+
+    def filter(self, name, func):
+        """Add a filter operator ``name`` running the plain Python function ``func``.
+
+        Raises Exception when ``func`` is not a plain function.
+        """
+        if not isinstance(func, types.FunctionType):
+            raise Exception("func must be a plain Python function")
+        serialized_func = self.serialize_function(name, func)
+        self.java_streaming_app = self.java_streaming_app.filter(name, serialized_func)
+        return self
+
+    def window(self,*args,**kwargs):
+        """Apply a window to the stream.
+
+        Keyword arguments:
+        window           -- 'GLOBAL' (default), 'TIME', 'SLIDING' or 'SESSION'.
+        duration         -- required for 'TIME' and 'SLIDING'.
+        sliding_time     -- required for 'SLIDING'.
+        mingap           -- required for 'SESSION'.
+        trigger          -- optional TriggerOption instance.
+        allowed_lateness -- optional lateness.
+
+        Numeric values are converted with long() and wrapped in a JVM Duration.
+        Raises Exception for an unknown or incomplete window specification
+        and for a trigger that is not a TriggerOption.
+        """
+        gateway = self.shell_connector.get_jvm_gateway()
+        kind = kwargs.get('window', 'GLOBAL')
+        if kind == 'GLOBAL':
+            _jwindow = gateway.jvm.WindowOption.GlobalWindow()
+        elif kind == 'TIME' and 'duration' in kwargs:
+            _jduration = gateway.jvm.Duration(long(kwargs['duration']))
+            _jwindow = gateway.jvm.WindowOption.TimeWindows(_jduration)
+        elif kind == 'SLIDING' and 'duration' in kwargs and 'sliding_time' in kwargs:
+            _jduration = gateway.jvm.Duration(long(kwargs['duration']))
+            _jslideby = gateway.jvm.Duration(long(kwargs['sliding_time']))
+            # NOTE(review): resolved on gateway.jvm directly, unlike GlobalWindow and
+            # TimeWindows which go through WindowOption -- confirm this name is imported.
+            _jwindow = gateway.jvm.SlidingTimeWindows(_jduration,_jslideby)
+        elif kind == 'SESSION' and 'mingap' in kwargs:
+            _jmingap = gateway.jvm.Duration(long(kwargs['mingap']))
+            # NOTE(review): same remark as for SlidingTimeWindows above.
+            _jwindow = gateway.jvm.SessionWindows(_jmingap)
+        else:
+            raise Exception("Invalid Window Options are provided")
+
+        _jtrigger = None
+        if 'trigger' in kwargs:
+            from pyapex.functions.window import TriggerOption
+            if not isinstance(kwargs['trigger'], TriggerOption):
+                raise Exception("Incorrect Trigger Option")
+            _jtrigger = TriggerOption.get_java_trigger_options(kwargs['trigger'],gateway)
+
+        _jallowed_lateness = None
+        if 'allowed_lateness' in kwargs:
+            _jallowed_lateness = gateway.jvm.Duration(long(kwargs['allowed_lateness']))
+        self.java_streaming_app = self.java_streaming_app.window(_jwindow,_jtrigger,_jallowed_lateness)
+        return self
+
+    def count(self,count="counter"):
+        """Add a count operator; ``count`` is the operator name passed to Java."""
+        # The argument is called ``count``; the previous body referenced an
+        # undefined ``name`` and raised NameError on every call.
+        self.java_streaming_app = self.java_streaming_app.count(count)
+        return self
+
+    def countByKey(self,name="counter"):
+        """Add a count-by-key operator named ``name``."""
+        self.java_streaming_app = self.java_streaming_app.countByKey(name)
+        return self
+
+    def reduce(self,name, reduceFn):
+        """Add a reduce operator ``name`` using the serialized ``reduceFn``."""
+        serialized_func = self.serialize_function(name, reduceFn)
+        self.java_streaming_app = self.java_streaming_app.reduce(name,serialized_func)
+        return self
+
+    def reduceByKey(self,name, reduceFn):
+        """Add a reduce-by-key operator ``name`` using the serialized ``reduceFn``."""
+        serialized_func = self.serialize_function(name, reduceFn)
+        self.java_streaming_app = self.java_streaming_app.reduceByKey(name,serialized_func)
+        return self
+
+    def launch(self, local_mode=False):
+        """Launch the application and return its id.
+
+        On a Py4JJavaError the traceback is printed and the Java exception
+        message is returned instead of the id.
+        """
+        try:
+            self.app_id = self.java_streaming_app.launch(local_mode)
+            return self.app_id
+        except Py4JJavaError as e:
+            import traceback
+            traceback.print_exc()
+            return e.java_exception.getMessage()
+
+    def kill(self):
+        """Kill the wrapped Java application and return the Java call's result."""
+        return self.java_streaming_app.kill()
+
+    def set_config(self, key, value):
+        """Set the configuration entry ``key`` to ``value`` on the Java app."""
+        self.java_streaming_app = self.java_streaming_app.setConfig(key, value)
+        return self
+
+    def serialize_function(self, name, func):
+        """Return ``func`` serialized with dill as a bytearray (``name`` is unused)."""
+        serialized_func = bytearray()
+        serialized_func.extend(dill.dumps(func))
+        return serialized_func
diff --git a/python/apex-python/src/pyapex/commands.py b/python/apex-python/src/pyapex/commands.py
new file mode 100644
index 0000000000..3d0111f0d8
--- /dev/null
+++ b/python/apex-python/src/pyapex/commands.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from pyapex import getApp
+import getopt
+import sys
+
+def kill_app(app_name):
+    """Look up the application called ``app_name`` and kill it."""
+    app = getApp(app_name)
+    if not app:
+        print "Application not found for name " + str(app_name)
+        return
+    print "KILLING APP " + str(app_name)
+    app.kill()
+
+
+def shutdown_app(app_name):
+    """Look up the application called ``app_name`` and stop it.
+
+    The application is stopped through app.kill(), the same call kill_app uses.
+    """
+    app = getApp(app_name)
+    if not app:
+        print "Application not found for name " + str(app_name)
+        return
+    print "SHUTTING DOWN APP NOW"
+    app.kill()
+
+
+# Maps a command mode to the function that handles it; parse_argument() looks handlers up here.
+func_map = {"KILL_MODE": kill_app, "SHUTDOWN_MODE": shutdown_app}
+
+
+def parse_argument():
+    """Parse the command line and run the requested action.
+
+    -k <app name>  kill the application
+    -s <app name>  shut the application down
+
+    Exits with status 2 when the options cannot be parsed.
+    """
+    print sys.argv
+    try:
+        opts, args = getopt.getopt(sys.argv[1:], "k:s:")
+    except getopt.GetoptError as err:
+        # print help information and exit:
+        print str(err)  # will print something like "option -a not recognized"
+        sys.exit(2)
+    for opt, value in opts:
+        if opt == '-k':
+            func_map['KILL_MODE'](value)
+        elif opt == '-s':
+            # -s selects the shutdown handler; it used to dispatch to KILL_MODE.
+            func_map['SHUTDOWN_MODE'](value)
+        else:
+            assert False, "Invalid Option"
+
+
+# Run the command-line handling only when this file is executed as a script.
+if __name__ == "__main__":
+ parse_argument()
diff --git a/python/apex-python/src/pyapex/functions/__init__.py b/python/apex-python/src/pyapex/functions/__init__.py
new file mode 100644
index 0000000000..08bc8a08f0
--- /dev/null
+++ b/python/apex-python/src/pyapex/functions/__init__.py
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from .functions import ReduceFunction
+from .transforms import WorkerImpl
diff --git a/python/apex-python/src/pyapex/functions/functions.py b/python/apex-python/src/pyapex/functions/functions.py
new file mode 100644
index 0000000000..e0e47d3dea
--- /dev/null
+++ b/python/apex-python/src/pyapex/functions/functions.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from abc import ABCMeta, abstractmethod
+
+class AbstractAccumulatorPythonWorker(object):
+    """Abstract contract for Python-side accumulator workers.
+
+    Subclasses must implement every method below before they can be
+    instantiated.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def setObject(self, obj, opType):
+        """Receive the serialized accumulator object and its operation type."""
+        pass
+
+    @abstractmethod
+    def defaultAccumulatedValue(self):
+        """Return the initial accumulated value."""
+        pass
+
+    # getOutput and getRetraction take ``self`` like every other method here;
+    # without it, calling them on an instance with one argument raises TypeError.
+    @abstractmethod
+    def getOutput(self, accumulated):
+        """Return the output derived from ``accumulated``."""
+        pass
+
+    @abstractmethod
+    def getRetraction(self, output):
+        """Return the retraction for ``output``."""
+        pass
+
+    @abstractmethod
+    def accumulate(self, accumulated, input):
+        """Fold ``input`` into ``accumulated`` and return the result."""
+        pass
+
+    @abstractmethod
+    def merge(self, input1, input2):
+        """Merge two accumulated values and return the result."""
+        pass
+
+
+class AccumulatorWorkerImpl(AbstractAccumulatorPythonWorker):
+    """Base accumulator worker that loads a serialized accumulator object.
+
+    Only setObject is implemented here; the remaining abstract methods are
+    left to subclasses.
+    """
+
+    accum_obj = None
+    opType = None
+    counter = 0
+
+    def __init__(self, gateway, opType):
+        self.gateway = gateway
+        self.opType = opType
+
+    # Concrete implementation: it no longer carries @abstractmethod, which
+    # forced every subclass to override it again.
+    def setObject(self, obj, opType):
+        """Deserialize ``obj`` into ``accum_obj`` and remember ``opType``.
+
+        Errors are printed rather than raised; the method always returns the
+        string "RETURN VALUE".
+        """
+        try:
+            # NOTE(review): apexapp.serialize_function writes with dill while this
+            # reads with cloudpickle -- confirm the two formats are compatible.
+            import cloudpickle
+            self.accum_obj = cloudpickle.loads(obj)
+            self.opType = opType
+        except ValueError as e:
+            print str(e)
+            from traceback import print_exc
+            print_exc()
+        except Exception:
+            from traceback import print_exc
+            print_exc()
+        return "RETURN VALUE"
+
+    def getConfirmed(self):
+        """Return the operation type currently stored on this worker."""
+        return self.opType
+
+
+class ReduceFunction(AbstractAccumulatorPythonWorker):
+    """Accumulator defined by a single pairwise ``reduce`` operation.
+
+    Subclasses implement reduce(self, input1, input2); accumulate and merge
+    both delegate to it.
+    """
+
+    def accumulate(self, accumulated, data):
+        """Fold ``data`` into ``accumulated`` using reduce."""
+        return self.reduce(accumulated, data)
+
+    def merge(self, input1, input2):
+        """Merge two accumulated values using reduce."""
+        return self.reduce(input1, input2)
+
+    @abstractmethod
+    def reduce(self, input1, input2):
+        """Combine two values into one; must be provided by subclasses."""
+        pass
+
+    def defaultAccumulatedValue(self, data=None):
+        """Return ``data`` unchanged as the initial accumulated value.
+
+        ``data`` is optional so the method can also be called without
+        arguments, as the abstract base class declares it.
+        """
+        return data
+
+    def getOutput(self, accumulated):
+        """Return the accumulated value itself as the output."""
+        return accumulated
+
+    def getRetraction(self, output):
+        """Return None: this accumulator produces no retraction."""
+        return None
+
+    def setObject(self, obj, opType):
+        """No-op."""
+        pass
+
+    class Java:
+        implements = ["org.apache.apex.malhar.python.operator.interfaces.PythonReduceWorker"]
\ No newline at end of file
diff --git a/python/apex-python/src/pyapex/functions/transforms.py b/python/apex-python/src/pyapex/functions/transforms.py
new file mode 100644
index 0000000000..f3e3764887
--- /dev/null
+++ b/python/apex-python/src/pyapex/functions/transforms.py
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from abc import ABCMeta, abstractmethod
+
+gateway = None
+
+class AbstractPythonWorker(object):
+  """Abstract base for the Python workers that are registered with the Java side."""
+
+  __metaclass__ = ABCMeta
+
+  # py4j reads `Java.implements` to learn which Java interfaces this object implements.
+  class Java:
+    implements = ["org.apache.apex.malhar.python.operator.interfaces.PythonWorker"]
+
+  @abstractmethod
+  def setFunction(self, f, opType):
+    """Receive the function payload `f` for the operation type `opType`."""
+    return None
+
+  @abstractmethod
+  def execute(self, tupleIn):
+    """Process one incoming tuple."""
+    return None
+
+class WorkerImpl(AbstractPythonWorker):
+  """Common state and function loading shared by the map/flat-map/filter workers."""
+  serialized_f = None
+  callable_f = None
+  opType = None
+  dataMap = None
+  counter = 0
+
+  def __init__(self, gateway, opType):
+    self.gateway = gateway
+    self.opType = opType
+
+  def setFunction(self, f, opType):
+    """Deserialize `f` with dill into `callable_f` and remember `opType`.
+
+    Failures are printed (message for ValueError, traceback always) and
+    otherwise ignored. Always returns None.
+    """
+    try:
+      import dill
+      # NOTE(review): unpickling runs arbitrary code; `f` must come from a trusted source.
+      self.callable_f = dill.loads(f)
+      self.opType = opType
+    except Exception as err:
+      from traceback import print_exc
+      if isinstance(err, ValueError):
+        print(str(err))
+      print_exc()
+    return None
+
+  @staticmethod
+  def factory(gateway, type):
+    """Create the worker for operation `type`; returns None for any other type."""
+    if type == "MAP":
+      return MapWorkerImpl(gateway, type)
+    if type == "FLAT_MAP":
+      return FlatMapWorkerImpl(gateway, type)
+    if type == "FILTER":
+      return FilterWorkerImpl(gateway, type)
+    return None
+
+class MapWorkerImpl(WorkerImpl):
+  """Worker that applies the loaded function to each tuple and returns its result."""
+
+  def execute(self, tupleIn):
+    """Return `callable_f(tupleIn)`; on any error print diagnostics and return None."""
+    try:
+      return self.callable_f(tupleIn)
+    except Exception as err:
+      from traceback import print_exc
+      if isinstance(err, ValueError):
+        print(str(err))
+      print_exc()
+    return None
+
+class FlatMapWorkerImpl(WorkerImpl):
+  """Worker whose function yields a list per tuple, handed to Java as a Java list."""
+
+  def execute(self, tupleIn):
+    """Apply `callable_f` to `tupleIn` and convert the result with py4j's ListConverter.
+
+    On any error, diagnostics are printed and None is returned.
+    """
+    try:
+      result = self.callable_f(tupleIn)
+      from py4j.java_collections import ListConverter
+      return ListConverter().convert(result, self.gateway._gateway_client)
+    except Exception as err:
+      from traceback import print_exc
+      if isinstance(err, ValueError):
+        print(str(err))
+      print_exc()
+    return None
+
+class FilterWorkerImpl(WorkerImpl):
+  """Worker that evaluates the loaded function as a predicate."""
+
+  def execute(self, tupleIn):
+    """Return the predicate verdict for `tupleIn`.
+
+    A bool result is returned as is; any other result counts as True unless
+    it is None. On any error, diagnostics are printed and None is returned.
+    """
+    try:
+      verdict = self.callable_f(tupleIn)
+      if isinstance(verdict, bool):
+        return verdict
+      return verdict is not None
+    except Exception as err:
+      from traceback import print_exc
+      if isinstance(err, ValueError):
+        print(str(err))
+      print_exc()
+    return None
diff --git a/python/apex-python/src/pyapex/functions/window.py b/python/apex-python/src/pyapex/functions/window.py
new file mode 100644
index 0000000000..7e0427805e
--- /dev/null
+++ b/python/apex-python/src/pyapex/functions/window.py
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class TriggerType:
+  """Integer codes for the supported trigger kinds."""
+  EARLY = 1
+  ON_TIME = 2
+  LATE = 3
+
+  @staticmethod
+  def isValid(trigger_type):
+    """Return True for a code within [EARLY, LATE]; raise Exception otherwise."""
+    below = trigger_type < TriggerType.EARLY
+    if below or trigger_type > TriggerType.LATE:
+      raise Exception("Incorrect Trigger Type")
+    return True
+
+class AccumulationMode:
+  """Integer codes for the supported accumulation modes."""
+  DISCARDING = 1
+  ACCUMULATING = 2
+  ACCUMULATING_AND_RETRACTING = 3
+
+  @staticmethod
+  def isValid(type):
+    """Return True for a code within [DISCARDING, ACCUMULATING_AND_RETRACTING].
+
+    Raises Exception("Incorrect Accumulation Mode") for anything outside it.
+    """
+    # Bounds come from this class; TriggerType has no such members.
+    if type < AccumulationMode.DISCARDING:
+      raise Exception("Incorrect Accumulation Mode")
+    if type > AccumulationMode.ACCUMULATING_AND_RETRACTING:
+      raise Exception("Incorrect Accumulation Mode")
+    return True
+
+class TriggerOption(object):
+  """Python-side description of trigger settings, convertible to the Java TriggerOption."""
+  accumulation_mode = None
+
+  def __init__(self):
+    # Per-instance list: a class-level list would be shared by every TriggerOption.
+    self.triggers = []
+    self.accumulation_mode = None
+    # Stored under its own name so it cannot replace the firingOnlyUpdatedPanes() method.
+    self.fire_only_updated_panes = False
+
+  @staticmethod
+  def at_watermark():
+    """Create an option holding a single ON_TIME trigger."""
+    option = TriggerOption()
+    option.triggers.append(Trigger(TriggerType.ON_TIME))
+    return option
+
+  def _add_firing(self, trigger_type, kwargs):
+    """Append a count- or duration-based trigger of `trigger_type`; `duration` wins if both are given."""
+    trigger = None
+    if 'count' in kwargs:
+      trigger = CountTrigger(trigger_type, kwargs['count'])
+    if 'duration' in kwargs:
+      trigger = TimeTrigger(trigger_type, kwargs['duration'])
+    if trigger is None:
+      raise Exception("Unsufficent for trigger")
+    self.triggers.append(trigger)
+    return self
+
+  def withEarlyFiringsAtEvery(self, *args, **kwargs):
+    """Add an EARLY trigger; requires keyword `count` or `duration`. Returns self."""
+    return self._add_firing(TriggerType.EARLY, kwargs)
+
+  def withLateFiringsAtEvery(self, *args, **kwargs):
+    """Add a LATE trigger; requires keyword `count` or `duration`. Returns self."""
+    return self._add_firing(TriggerType.LATE, kwargs)
+
+  def discardingFiredPanes(self):
+    self.accumulation_mode = AccumulationMode.DISCARDING
+    return self
+
+  def accumulatingFiredPanes(self):
+    self.accumulation_mode = AccumulationMode.ACCUMULATING
+    return self
+
+  def accumulatingAndRetractingFiredPanes(self):
+    self.accumulation_mode = AccumulationMode.ACCUMULATING_AND_RETRACTING
+    return self
+
+  def firingOnlyUpdatedPanes(self):
+    self.fire_only_updated_panes = True
+    return self
+
+  @staticmethod
+  def get_java_trigger_options(trigger_option, gateway):
+    """Build the Java TriggerOption equivalent of `trigger_option` through `gateway`.
+
+    Returns None when `trigger_option` holds no triggers.
+    """
+    _jtrigger_option = None
+    for trigger in trigger_option.triggers:
+      if trigger.trigger_type == TriggerType.ON_TIME:
+        _jtrigger_option = gateway.jvm.TriggerOption.AtWatermark()
+        continue
+      if _jtrigger_option is None:
+        # No watermark trigger came first: start from a plain Java TriggerOption
+        # instead of failing on None.
+        _jtrigger_option = gateway.jvm.TriggerOption()
+      if isinstance(trigger, TimeTrigger):
+        _jfiring = gateway.jvm.Duration(trigger.duration)
+      else:
+        # Count-based firings take the plain count, not a Duration.
+        _jfiring = trigger.count
+      if trigger.trigger_type == TriggerType.EARLY:
+        _jtrigger_option = _jtrigger_option.withEarlyFiringsAtEvery(_jfiring)
+      elif trigger.trigger_type == TriggerType.LATE:
+        _jtrigger_option = _jtrigger_option.withLateFiringsAtEvery(_jfiring)
+    if _jtrigger_option is None:
+      return None
+    # Carry over the settings that were previously recorded in Python but never sent to Java.
+    mode = trigger_option.accumulation_mode
+    if mode == AccumulationMode.DISCARDING:
+      _jtrigger_option = _jtrigger_option.discardingFiredPanes()
+    elif mode == AccumulationMode.ACCUMULATING:
+      _jtrigger_option = _jtrigger_option.accumulatingFiredPanes()
+    elif mode == AccumulationMode.ACCUMULATING_AND_RETRACTING:
+      _jtrigger_option = _jtrigger_option.accumulatingAndRetractingFiredPanes()
+    if getattr(trigger_option, 'fire_only_updated_panes', False) is True:
+      _jtrigger_option = _jtrigger_option.firingOnlyUpdatedPanes()
+    return _jtrigger_option
+
+class Trigger(object):
+  """Base trigger: carries only the trigger type code."""
+
+  # Class-level default, replaced per instance in __init__.
+  trigger_type = None
+
+  def __init__(self, trigger_type):
+    """Remember `trigger_type` on the instance."""
+    self.trigger_type = trigger_type
+
+class TimeTrigger(Trigger):
+  """Trigger that additionally carries a duration."""
+
+  # Class-level default, replaced per instance in __init__.
+  duration = None
+
+  def __init__(self, trigger_type, duration):
+    """Initialise the base trigger type, then remember `duration`."""
+    super(TimeTrigger, self).__init__(trigger_type)
+    self.duration = duration
+
+class CountTrigger(Trigger):
+  """Trigger that additionally carries a count."""
+
+  # Class-level default, replaced per instance in __init__.
+  count = None
+
+  def __init__(self, trigger_type, count):
+    """Initialise the base trigger type, then remember `count`."""
+    super(CountTrigger, self).__init__(trigger_type)
+    self.count = count
+
+
+
+
+
diff --git a/python/apex-python/src/pyapex/runtime/__init__.py b/python/apex-python/src/pyapex/runtime/__init__.py
new file mode 100644
index 0000000000..d321c94c9b
--- /dev/null
+++ b/python/apex-python/src/pyapex/runtime/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
\ No newline at end of file
diff --git a/python/apex-python/src/pyapex/runtime/worker.py b/python/apex-python/src/pyapex/runtime/worker.py
new file mode 100755
index 0000000000..72f3788250
--- /dev/null
+++ b/python/apex-python/src/pyapex/runtime/worker.py
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+'''
+Worker.py is responsible for instantiating the specific workers such as MapWorkerImpl, FlatMapWorkerImpl and FilterWorkerImpl.
+
+Worker.py is run with Python; it then registers the WorkerImpl back with the Java process, which uses it for each call.
+'''
+import sys
+import site
+from py4j.java_gateway import JavaGateway, CallbackServerParameters, GatewayParameters, java_import
+
+# TODO this may cause race condition
+def find_free_port():
+  """Return a TCP port number that was free at the time of the call.
+
+  The socket is bound to port 0 so the OS assigns an ephemeral port, and is
+  closed before returning; another process may take the port afterwards.
+  """
+  import socket
+  s = socket.socket()
+  try:
+    # Bind explicitly: listen() on an unbound socket is not portable.
+    s.bind(('', 0))
+    return s.getsockname()[1]
+  finally:
+    s.close()
+
+
+def main(argv):
+  """Start the Python worker process and register its worker with the Java gateway.
+
+  argv[0] is the port of the Java gateway on 127.0.0.1.
+  argv[1] is the operation type (for example MAP, FLAT_MAP, FILTER, REDUCE, REDUCE_BY_KEY).
+  """
+  import os, getpass
+
+  user_site = site.getusersitepackages().replace('/home/.local/',
+                                                 '/home/' + getpass.getuser() + '/.local/') + '/'
+  # PYTHONPATH may be unset; only prepend it when it has a value.
+  existing_path = os.environ.get('PYTHONPATH')
+  os.environ['PYTHONPATH'] = existing_path + ':' + user_site if existing_path else user_site
+  sys.path.extend(os.environ['PYTHONPATH'].split(':'))
+  print("PYTHONPATH " + str(os.environ['PYTHONPATH']))
+  gateway_params = GatewayParameters(address='127.0.0.1', port=int(argv[0]), auto_convert=True)
+  callback_params = CallbackServerParameters(daemonize=False, eager_load=True, port=0)
+  gateway = JavaGateway(gateway_parameters=gateway_params, callback_server_parameters=callback_params)
+
+  # Retrieve the port on which the python callback server was bound to.
+  python_port = gateway.get_callback_server().get_listening_port()
+
+  # Register python callback server with java gateway server
+  # Note that we use the java_gateway_server attribute that
+  # retrieves the GatewayServer instance.
+  gateway.java_gateway_server.resetCallbackClient(
+    gateway.java_gateway_server.getCallbackClient().getAddress(),
+    python_port)
+
+  # Instantiate WorkerImpl for PythonWorker java interface and register with PythonWorkerProxy in Java.
+  from pyapex.functions import WorkerImpl
+  print("Registering Python Worker ")
+  workerImpl = WorkerImpl.factory(gateway, argv[1])
+  if argv[1] in ['REDUCE', 'REDUCE_BY_KEY']:
+    # Reduce workers are shipped fully serialized by the Java side instead of being built by the factory.
+    serialized_object = gateway.entry_point.getSerializedData()
+    import dill
+    # NOTE(review): unpickling runs arbitrary code; the payload must come from a trusted source.
+    workerImpl = dill.loads(serialized_object)
+    print(type(workerImpl))
+  gateway.entry_point.register(workerImpl)
+
+  print("Python process started with type: " + argv[1])
+
+
+# Script entry point: pass the command-line arguments (without the program name) to main().
+if __name__ == "__main__":
+ main(sys.argv[1:])
diff --git a/python/apex-python/src/pyapex/shellconn.py b/python/apex-python/src/pyapex/shellconn.py
new file mode 100644
index 0000000000..47dfa61513
--- /dev/null
+++ b/python/apex-python/src/pyapex/shellconn.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from py4j.java_gateway import JavaGateway,java_import
+
+
+class ShellConnector(object):
+  """Process-wide singleton holding the py4j gateway used by the shell."""
+  gateway = None
+  entry_point = None
+
+  def __new__(cls):
+    # Always hand back the same instance.
+    if not hasattr(cls, 'instance'):
+      cls.instance = super(ShellConnector, cls).__new__(cls)
+    return cls.instance
+
+  def __init__(self):
+    # __init__ runs on every ShellConnector() call even though __new__ returns
+    # the shared instance; connect only once instead of opening a new gateway each time.
+    if self.gateway is not None:
+      return
+    self.gateway = JavaGateway()
+    java_import(self.gateway.jvm, 'org.joda.time.*')
+    java_import(self.gateway.jvm, 'org.apache.apex.malhar.lib.window.*')
+
+  def get_jvm_gateway(self):
+    """Return the shared JavaGateway."""
+    return self.gateway
+
+  def get_entry_point(self):
+    """Return the entry point object exposed by the Java gateway."""
+    return self.gateway.entry_point
diff --git a/python/apex-python/src/setup.py b/python/apex-python/src/setup.py
new file mode 100644
index 0000000000..135b93497e
--- /dev/null
+++ b/python/apex-python/src/setup.py
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from distutils.core import setup
+# pyapex, pyapex.runtime and pyapex.functions are packages (directories with
+# __init__.py), so they are declared with `packages`; `py_modules` is for
+# single-file modules.
+setup(name='pyapex',
+      version='0.0.4',
+      packages=['pyapex', 'pyapex.runtime', 'pyapex.functions'],
+      )
+
diff --git a/python/create_zip.sh b/python/create_zip.sh
new file mode 100755
index 0000000000..517b329072
--- /dev/null
+++ b/python/create_zip.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Package apex-python/src/pyapex as pyapex-<version>-src.zip and place it in apex-python/deps.
+# Usage: create_zip.sh <version>
+
+# Stop at the first failing step instead of continuing in the wrong directory.
+set -e
+
+VERSION=$1
+if [ -z "$VERSION" ]; then
+  echo "Usage: $0 <version>" 1>&2
+  exit 1
+fi
+echo "$VERSION"
+cd apex-python/src
+zip -r "pyapex-$VERSION-src.zip" pyapex
+# Without the directory, mv would rename the archive to a file called "deps".
+mkdir -p ../deps
+mv "pyapex-$VERSION-src.zip" ../deps
+cd ../../
diff --git a/python/pom.xml b/python/pom.xml
new file mode 100644
index 0000000000..0127cb578e
--- /dev/null
+++ b/python/pom.xml
@@ -0,0 +1,349 @@
+
+
+ 4.0.0
+
+
+ org.apache.apex
+ malhar
+ 3.8.0-SNAPSHOT
+
+
+ malhar-python
+ Apache Apex Malhar Python Support
+ jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 2.4
+
+
+
+ test-jar
+
+ package
+
+
+
+
+ maven-assembly-plugin
+
+
+
+ org.apache.apex.malhar.python.PyApex
+
+
+
+ jar-with-dependencies
+
+
+
+
+
+ maven-dependency-plugin
+
+
+ create-client-mvn-generated-classpath
+ generate-resources
+
+ build-classpath
+
+
+
+ ${project.build.directory}/mvn-generated-runtime-classpath
+
+
+
+
+
+ create-client-mvn-generated-classpath-no-hadoop
+ generate-resources
+
+ build-classpath
+
+
+
+ ${project.build.directory}/mvn-generated-runtime-classpath-no-hadoop
+
+ org.apache.hadoop
+
+
+
+ create-mvn-generated-classpath
+ generate-test-resources
+
+ build-classpath
+
+
+
+ ${project.build.directory}/test-classes/mvn-generated-classpath
+
+
+
+
+
+ maven-dependency-plugin
+ 2.8
+
+
+ copy-dependencies-for-launch-time
+ prepare-package
+
+ copy-dependencies
+
+
+ ${project.build.directory}/libs
+
+
+
+
+
+ maven-dependency-plugin
+ 2.8
+
+
+ copy-dependencies-for-run-time
+ prepare-package
+
+ copy-dependencies
+
+
+ ${project.build.directory}/runtime_libs
+ provided
+ org.apache.hadoop
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ exec-maven-plugin
+ org.codehaus.mojo
+
+
+ Python Zip Build
+ generate-sources
+
+ exec
+
+
+ create_zip.sh
+
+ 0.0.4
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-client
+ ${hadoop.version}
+ provided
+
+
+ joda-time
+ joda-time
+ 2.3
+ provided
+
+
+ org.apache.apex
+ apex-engine
+ ${apex.core.version}
+
+
+ org.apache.apex
+ apex-api
+ ${apex.core.version}
+
+
+ org.apache.apex
+ apex-common
+ ${apex.core.version}
+
+
+
+ net.sf.py4j
+ py4j
+ 0.10.4
+
+
+
+ org.apache.apex
+ malhar-stream
+ ${project.version}
+ provided
+
+
+
+ org.apache.apex
+ malhar-contrib
+ ${project.version}
+
+
+
+
+ org.apache.kafka
+ kafka_2.10
+ 0.8.2.1
+ provided
+
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 0.8.2.1
+ provided
+
+
+ org.apache.kafka
+ kafka_2.10
+ 0.8.2.1
+ provided
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ org.slf4j
+ slf4j-simple
+
+
+ log4j
+ log4j
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+ org.apache.apex
+ malhar-library
+ ${project.version}
+ provided
+
+
+ org.powermock
+ powermock-module-junit4
+ ${powermock.version}
+ test
+
+
+ org.powermock
+ powermock-api-mockito
+ ${powermock.version}
+ test
+
+
+
+
diff --git a/python/scripts/log4j.properties b/python/scripts/log4j.properties
new file mode 100644
index 0000000000..a57fbc06e6
--- /dev/null
+++ b/python/scripts/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=TRACE,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=TRACE
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=trace
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=trace
+log4j.logger.org.apache.apex=trace
diff --git a/python/scripts/pyshell b/python/scripts/pyshell
new file mode 100755
index 0000000000..e2366975fb
--- /dev/null
+++ b/python/scripts/pyshell
@@ -0,0 +1,197 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# Support functions
+
+# TODO :
+# ACCEPTED ARGUMENTS:
+# shell
+# launch input_python.py
+# kill app-id
+# shutdown app-id
+
+# Print the command-line synopsis to stderr and exit with status 1.
+usage() { echo "Usage: $0 [-sh] [-l ] [-m HADOOP|LOCAL ] [-k ] [-t ]" 1>&2; exit 1; }
+# Defaults: interactive shell, running against Hadoop, no file to launch.
+COMMAND_MODE='SHELL_MODE'
+RUNNING_MODE='HADOOP'
+EXECUTABLE_FILE=false
+# Every option takes an argument; the leading ':' lets the '*' branch handle errors.
+while getopts ":s:l:k:t:m:" o; do
+  case "${o}" in
+    s)
+      s=${OPTARG}
+      COMMAND_MODE="SHELL_MODE"
+      ;;
+    l)
+      EXECUTABLE_FILE=${OPTARG}
+      COMMAND_MODE="LAUNCH_MODE"
+      ;;
+    k)
+      APP_NAME=${OPTARG}
+      COMMAND_MODE="KILL_MODE"
+      ;;
+    m)
+      RUNNING_MODE=${OPTARG}
+      ;;
+    t)
+      # usage exits, so the notice has to be printed first.
+      echo "NOT IMPLEMENTTED YET"
+      usage
+      ;;
+    *)
+      usage
+      ;;
+  esac
+done
+# Print all arguments to stderr.
+echoerr() { echo "$@" 1>&2; }
+# Remove a leftover "pyapex" entry in the current directory, if there is one.
+if [ -e "pyapex" ]
+then
+ rm pyapex
+fi
+
+# Print the physical directory that contains $1 (default: this script),
+# following symbolic links until a real file is reached.
+real_dir() {
+  local SOURCE="${1:-${BASH_SOURCE[0]}}"
+  local SOURCE_DIR
+  while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
+    SOURCE_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
+    SOURCE="$(readlink "$SOURCE")"
+    # a relative symlink is resolved against the directory that holds the link
+    [[ $SOURCE != /* ]] && SOURCE="$SOURCE_DIR/$SOURCE"
+  done
+  SOURCE_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
+  # Quoted so that whitespace inside the path is printed unchanged.
+  echo "$SOURCE_DIR"
+}
+script_dir=$(real_dir "${BASH_SOURCE[0]}")
+# Create missing clirc file for current user
+if [ ! -f "${HOME}/.dt/clirc" ]; then
+ mkdir -p "${HOME}/.dt"
+ cat >${HOME}/.dt/clirc </dev/null`
+fi
+
+# Use default JVM options for the client unless DT_CLIENT_OPTS is already set.
+if [ "$DT_CLIENT_OPTS" = "" ]; then
+# DT_CLIENT_OPTS="-Xmx1024m -XX:MaxPermSize=512m -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled"
+ DT_CLIENT_OPTS="-Xmx1024m -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled"
+fi
+
+# Export the same options as HADOOP_CLIENT_OPTS.
+export HADOOP_CLIENT_OPTS="$DT_CLIENT_OPTS"
+
+# PYTHON_BUILD and BUILD_DIR are resolved relative to the script's own path ($0),
+# while PYAPEX_HOME is resolved relative to the current working directory.
+PYTHON_BUILD="$( dirname "$0" )/../dist"
+BUILD_DIR="$( dirname "$0" )/../target"
+PYAPEX_HOME="`pwd`/../apex-python"
+
+# Pick the maven-generated classpath file: the full one when Hadoop is absent
+# or LOCAL mode was requested, the Hadoop-free one otherwise.
+if [[ ( -z "$DT_HADOOP" ) || ( "$RUNNING_MODE" == "LOCAL" ) ]]; then
+  echo "Development Mode without Hadoop Installation: Using jars from mvn generated path"
+  MVN_GENERATED_PATH="$BUILD_DIR/mvn-generated-runtime-classpath"
+else
+  echo "Development Mode with Hadoop Installation: Using jars to be provided externally "
+  MVN_GENERATED_PATH="$BUILD_DIR/mvn-generated-runtime-classpath-no-hadoop"
+fi
+# BASEDIR is needed by both branches below, so compute it before either one uses it.
+BASEDIR=$( cd "${script_dir}/.."; pwd -P )
+if [ -f "$MVN_GENERATED_PATH" ]; then
+  # development launch mode
+  DT_CORE_JAR="$BUILD_DIR/malhar-python-3.8.0-SNAPSHOT.jar"
+  if [ ! -f "$DT_CORE_JAR" ]; then
+    echoerr "Error: Cannot find $DT_CORE_JAR";
+    exit 1;
+  fi
+  DT_CLASSPATH="$DT_CLASSPATH:$DT_CORE_JAR"
+  DT_CLASSPATH=$BASEDIR/libs'/*'":${DT_CLASSPATH}"
+  DT_CLASSPATH="$DT_CLASSPATH:`cat "$MVN_GENERATED_PATH"`"
+else
+  # running from installation
+  if [ -z "$DT_HADOOP" ]; then
+    echoerr "Hadoop installation not found. Please include hadoop in PATH."
+    exit 1;
+  fi
+fi
+
+# Append DT_CLASSPATH to HADOOP_CLASSPATH (or use it alone when HADOOP_CLASSPATH is empty);
+# nothing happens when DT_CLASSPATH is empty.
+if [ -n "$DT_CLASSPATH" ]; then
+  export HADOOP_CLASSPATH="${HADOOP_CLASSPATH:+$HADOOP_CLASSPATH:}$DT_CLASSPATH"
+fi
+# PID of the background Apex runner; 0 until it has been started.
+APEXRUNNERPID=0
+APEXRUNNERLOG="$( dirname "$0")/ApexRunner.out"
+rm -f "$APEXRUNNERLOG"
+export PYAPEX_HOME="$PYAPEX_HOME"
+# Stage the runtime jars next to the Python sources.
+cp "$BUILD_DIR"/runtime_libs/* "$PYAPEX_HOME/deps"
+# DT_CORE_JAR is only set in development launch mode.
+if [ -n "$DT_CORE_JAR" ]; then
+  cp "$DT_CORE_JAR" "$PYAPEX_HOME/deps"
+fi
+echo "PYAPEX_HOME $PYAPEX_HOME"
+
+echo "Intiating PYSHELL now "
+
+#ln -sf $PYAPEX_HOME "$script_dir/pyapex"
+# Put the pyapex source zip on PYTHONPATH, after any entries that are already there.
+export PYTHONPATH="${PYTHONPATH:+$PYTHONPATH:}$PYAPEX_HOME/deps/pyapex-0.0.4-src.zip"
+# Start the PyApex Java process in the background, with all output redirected to
+# $APEXRUNNERLOG, and remember its PID. Plain java is used when DT_HADOOP is
+# empty or LOCAL mode was requested; otherwise the process is started through $DT_HADOOP.
+if [[ ( -z "$DT_HADOOP" ) || ( "$RUNNING_MODE" == "LOCAL" ) ]]; then
+ echo "Warning: hadoop executable not found. Running standalone with ${DT_JAVA:-java}."
+ echoerr "Warning: hadoop executable not found. Running standalone with ${DT_JAVA:-java}."
+ echo "Starting Apex Runner without hadoop"
+ export CLASSPATH=$DT_CLASSPATH
+ # "$@" is the script's full, unshifted argument list.
+ "${DT_JAVA:-java}" $DT_CLIENT_OPTS org.apache.apex.malhar.python.PyApex "$@" >$APEXRUNNERLOG 2>&1 &
+ APEXRUNNERPID="$!"
+ echo $APEXRUNNERPID
+else
+ echo "Warning: hadoop found. Running with $DT_HADOOP"
+ export HADOOP_USER_CLASSPATH_FIRST=1
+ # remove hadoop and duplicate slf4j binding (bash replace is too slow)
+ export HADOOP_CLASSPATH=$(echo -n "$HADOOP_CLASSPATH" | tr ":" "\n" | sed "/slf4j-log4j/d" | sed "/org\/apache\/hadoop/d" | tr "\n" ":")
+ echo "Starting Apex Runner with hadoop $@"
+ "$DT_HADOOP" org.apache.apex.malhar.python.PyApex "$@" >$APEXRUNNERLOG 2>&1 &
+ APEXRUNNERPID="$!"
+fi
+echo "Apex Runner is started as process id: " $APEXRUNNERPID
+# Run the Python front end that matches the requested command mode.
+if [ "$COMMAND_MODE" = "SHELL_MODE" ]
+then
+  PYTHONPATH=$PYTHONPATH:"python" ipython "$@"
+elif [ "$COMMAND_MODE" = "LAUNCH_MODE" ]
+then
+  PYTHONPATH=$PYTHONPATH:"python" ipython "$EXECUTABLE_FILE"
+elif [ "$COMMAND_MODE" = "KILL_MODE" ]
+then
+  # Option and value are passed as two arguments so the name carries no leading space.
+  PYTHONPATH=$PYTHONPATH:"python" python "$PYAPEX_HOME/commands.py" -k "$APP_NAME"
+fi
+if [[ "$RUNNING_MODE" == "LOCAL" ]]; then
+  sleep 60
+fi
+# Stop the background runner: SIGTERM first, SIGKILL if that cannot be delivered.
+if ! kill "$APEXRUNNERPID" > /dev/null 2>&1; then
+  echo "Could not send SIGTERM to process $APEXRUNNERPID. Force killing" 1>&2
+  kill -9 "$APEXRUNNERPID" >/dev/null 2>&1
+fi
+if [ -e pyapex ]
+then
+  rm pyapex
+fi
diff --git a/python/src/main/java/org/apache/apex/malhar/PythonConstants.java b/python/src/main/java/org/apache/apex/malhar/PythonConstants.java
new file mode 100644
index 0000000000..8aad2af8ec
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/PythonConstants.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar;
+
+/**
+ * Constants shared by the Python integration: the supported operation types
+ * and the names of the bundled Python artifacts.
+ */
+public class PythonConstants
+{
+
+  /**
+   * Operation types a Python worker can be started for.
+   */
+  public enum OpType
+  {
+    MAP("MAP"),
+    FLAT_MAP("FLAT_MAP"),
+    FILTER("FILTER"),
+    REDUCE("REDUCE"),
+    REDUCE_BY_KEY("REDUCE_BY_KEY");
+
+    private final String operationName;
+
+    OpType(String name)
+    {
+      this.operationName = name;
+    }
+
+    /**
+     * @return the operation name this constant was created with
+     */
+    public String getType()
+    {
+      return operationName;
+    }
+  }
+
+  // Declared final so the names cannot be reassigned at runtime.
+  public static final String PY4J_SRC_ZIP_FILE_NAME = "py4j-0.10.4-src.zip";
+  public static final String PYTHON_WORKER_FILE_NAME = "worker.py";
+  public static final String PYTHON_APEX_ZIP_NAME = "pyapex-0.0.4-src.zip";
+
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/PyApex.java b/python/src/main/java/org/apache/apex/malhar/python/PyApex.java
new file mode 100644
index 0000000000..4bc920001e
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/PyApex.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import py4j.GatewayServer;
+import py4j.Py4JException;
+
+/**
+ * Entry point object exposed to Python through a py4j {@link GatewayServer}.
+ * It creates {@link PythonApp} instances or looks them up by name.
+ */
+public class PyApex
+{
+
+  private static final Logger LOG = LoggerFactory.getLogger(PyApex.class);
+
+  private PythonApp streamApp = null;
+  private PythonAppManager.LaunchMode mode = null;
+
+  /**
+   * @param mode launch mode applied to applications created by {@link #createApp(String)}; may be null
+   */
+  public PyApex(PythonAppManager.LaunchMode mode)
+  {
+    this.mode = mode;
+  }
+
+  /**
+   * Returns the application held by this entry point, creating it on first use.
+   *
+   * @param name name for a newly created application; ignored when one already exists
+   * @return the held application
+   */
+  public PythonApp createApp(String name)
+  {
+    if (streamApp == null) {
+      streamApp = new PythonApp(name);
+      if (this.mode != null) {
+        streamApp.setMode(this.mode);
+      }
+    }
+    return streamApp;
+  }
+
+  /**
+   * Returns the held application if there is one. Otherwise the resource manager is asked for
+   * an application called {@code name}; a match is returned as a new {@link PythonApp} bound to
+   * its application id, and when nothing matches a fresh application is created and held.
+   *
+   * @param name application name to look for
+   * @return the matching or newly created application
+   * @throws Py4JException if the application list cannot be obtained
+   */
+  public PythonApp getAppByName(String name)
+  {
+    if (streamApp == null) {
+      YarnClient client = YarnClient.createYarnClient();
+      try {
+        // The client is a service: it has to be initialised and started before it can be queried.
+        client.init(new YarnConfiguration());
+        client.start();
+        List<ApplicationReport> apps = client.getApplications();
+        for (ApplicationReport appReport : apps) {
+          if (appReport.getName().equals(name)) {
+            LOG.debug("Application Name: {} Application ID: {} Application State: {}", appReport.getName(), appReport.getApplicationId().toString(), appReport.getYarnApplicationState());
+            return new PythonApp(name, appReport.getApplicationId());
+          }
+        }
+      } catch (Exception e) {
+        throw new Py4JException("Error getting application list from resource manager", e);
+      } finally {
+        // Release the connection to the resource manager on every path.
+        client.stop();
+      }
+      streamApp = new PythonApp(name);
+    }
+    return streamApp;
+  }
+
+  /**
+   * Parses the command line and starts the py4j gateway server with a {@link PyApex} entry point.
+   *
+   * @param args supports {@code -m/--mode <launch mode>} and {@code -l/--launch-file <file>}
+   */
+  public static void main(String[] args)
+  {
+
+    LOG.info("Starting PYAPEX with {}", StringUtils.join(args, ' '));
+    Options options = new Options();
+
+    Option input = new Option("m", "mode", true, "Launch Mode");
+    input.setRequired(false);
+    options.addOption(input);
+
+    Option pyfile = new Option("l", "launch-file", true, "Launch file");
+    pyfile.setRequired(false);
+    options.addOption(pyfile);
+
+    CommandLineParser parser = new BasicParser();
+    HelpFormatter formatter = new HelpFormatter();
+    CommandLine cmd;
+
+    try {
+      cmd = parser.parse(options, args);
+    } catch (ParseException e) {
+      LOG.error("Parsing Exception while parsing arguments",e);
+      formatter.printHelp("utility-name", options);
+      System.exit(1);
+      return;
+    }
+
+    String launchModeValue = cmd.getOptionValue("mode");
+    PythonAppManager.LaunchMode mode = null;
+    if (launchModeValue != null) {
+      try {
+        mode = PythonAppManager.LaunchMode.valueOf(launchModeValue);
+      } catch (IllegalArgumentException e) {
+        // An unknown mode is a usage error, reported like any other bad argument.
+        LOG.error("Unknown launch mode {}", launchModeValue, e);
+        formatter.printHelp("utility-name", options);
+        System.exit(1);
+        return;
+      }
+    }
+    LOG.info("Starting PYAPEX with {}", mode);
+    PyApex pythonEntryPoint = new PyApex(mode);
+    GatewayServer gatewayServer = new GatewayServer(pythonEntryPoint);
+    gatewayServer.start();
+    LOG.debug("Gateway Server Started");
+  }
+
+}
diff --git a/python/src/main/java/org/apache/apex/malhar/python/PythonApp.java b/python/src/main/java/org/apache/apex/malhar/python/PythonApp.java
new file mode 100644
index 0000000000..778afd6561
--- /dev/null
+++ b/python/src/main/java/org/apache/apex/malhar/python/PythonApp.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.python;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.PythonConstants;
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator;
+import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+
+import org.apache.apex.malhar.python.operator.PythonGenericOperator;
+import org.apache.apex.malhar.python.operator.proxy.PythonReduceProxy;
+import org.apache.apex.malhar.python.runtime.PythonApexStreamImpl;
+import org.apache.apex.malhar.python.runtime.PythonWorkerContext;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.apex.malhar.stream.api.PythonApexStream;
+import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.stram.client.StramAppLauncher;
+
+public class PythonApp implements StreamingApplication
+{
+
+ // Stream definition backing this application; assigned by fromFolder/fromKafka08/setApexStream
+ // and used by populateDAG, launch and runLocal.
+ private PythonApexStream apexStream = null;
+ // Created on first call to getStreamFactory().
+ private StreamFactory streamFactory;
+ // Application id handed to the (name, appId) constructor; null for the other constructors.
+ private ApplicationId appId = null;
+ private static final Logger LOG = LoggerFactory.getLogger(PythonApp.class);
+
+ // Instantiated in launch() with this app and the effective launch mode.
+ private PythonAppManager manager = null;
+ // Application name as passed to the constructor (may be null).
+ private String name;
+ // Configuration created in the constructor; extendExistingConfig() appends file lists to it.
+ private Configuration conf;
+
+ // Value of the PYAPEX_HOME environment variable, read once in the constructor.
+ private String apexDirectoryPath = null;
+ // Defaults to LOCAL; launch() recomputes it from its 'local' argument and the current value.
+ private PythonAppManager.LaunchMode mode = PythonAppManager.LaunchMode.LOCAL;
+
+  /**
+   * Creates an application that has neither a name nor an application id.
+   * Delegates to {@link #PythonApp(String, ApplicationId)} with two nulls.
+   */
+  public PythonApp()
+  {
+    this(null, null);
+  }
+
+  /**
+   * Creates a named application without an application id.
+   *
+   * @param name application name; forwarded unchanged to the two-argument constructor
+   */
+  public PythonApp(String name)
+  {
+    this(name, null);
+  }
+
+  /**
+   * Creates an application bound to the given name and application id.
+   *
+   * <p>Also reads the {@code PYAPEX_HOME} environment variable into the apex directory
+   * path and builds a fresh {@link Configuration} with the {@code dt.loggers.level}
+   * property preset.
+   *
+   * @param name  application name (may be null)
+   * @param appId application id to associate with this app (may be null)
+   */
+  public PythonApp(String name, ApplicationId appId)
+  {
+    this.name = name;
+    this.appId = appId;
+    this.apexDirectoryPath = System.getenv("PYAPEX_HOME");
+
+    Configuration configuration = new Configuration(true);
+    configuration.set("dt.loggers.level", "com.datatorrent.*:INFO,org.apache.*:DEBUG");
+    this.conf = configuration;
+  }
+
+  /**
+   * @return the directory path captured from the {@code PYAPEX_HOME} environment
+   *         variable at construction time; null when the variable was not set
+   */
+  public String getApexDirectoryPath()
+  {
+    return this.apexDirectoryPath;
+  }
+
+  /**
+   * @return the application name given to the constructor (may be null)
+   */
+  public String getName()
+  {
+    return this.name;
+  }
+
+  /**
+   * Fills the supplied DAG by delegating to the current stream's {@code populateDag}.
+   *
+   * @param dag  DAG to populate
+   * @param conf not used by this implementation
+   */
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    LOG.trace("Populating DAG in python app");
+    apexStream.populateDag(dag);
+  }
+
+  /**
+   * Appends the files that have to ship with the application to the
+   * {@code StramAppLauncher.LIBJARS_CONF_KEY_NAME} entry of this app's configuration.
+   *
+   * <p>Every file found directly under {@code <PYAPEX_HOME>/deps/} is added, followed
+   * by the py4j source zip, the Python worker script and the pyapex zip. Entries are
+   * not de-duplicated.
+   *
+   * @throws IllegalStateException if the deps directory cannot be listed, e.g. because
+   *         PYAPEX_HOME is unset or does not point at a pyapex installation
+   */
+  public void setRequiredJARFiles()
+  {
+    String home = getApexDirectoryPath();
+    LOG.debug("PYAPEX_HOME: {}", home);
+
+    File depsDir = new File(home + "/deps/");
+    // listFiles() returns null (not an empty array) when the path is not a readable directory.
+    File[] files = depsDir.listFiles();
+    if (files == null) {
+      throw new IllegalStateException("Cannot list dependency directory " + depsDir.getAbsolutePath()
+          + "; check that PYAPEX_HOME is set correctly");
+    }
+
+    ArrayList<String> jarFiles = new ArrayList<>();
+    for (File jarFile : files) {
+      LOG.info("FOUND FILES {}", jarFile.getAbsolutePath());
+      jarFiles.add(jarFile.getAbsolutePath());
+    }
+    jarFiles.add(home + "/deps/" + PythonConstants.PY4J_SRC_ZIP_FILE_NAME);
+    jarFiles.add(home + "/src/pyapex/runtime/" + PythonConstants.PYTHON_WORKER_FILE_NAME);
+    jarFiles.add(home + "/deps/" + PythonConstants.PYTHON_APEX_ZIP_NAME);
+
+    extendExistingConfig(StramAppLauncher.LIBJARS_CONF_KEY_NAME, jarFiles);
+  }
+
+  /**
+   * Collects the entries of the JVM's system class path.
+   *
+   * <p>When the system class loader is a {@link URLClassLoader} (Java 8 and earlier) the
+   * file part of each of its URLs is returned. On newer JVMs, where the system class
+   * loader is no longer a {@code URLClassLoader}, the {@code java.class.path} system
+   * property is split on the platform path separator instead.
+   * NOTE(review): entries from the two branches may differ in formatting (URL file
+   * component vs. raw path string); confirm callers do not depend on the exact form.
+   *
+   * @return class path entries in class-loader order; empty if none are found
+   */
+  public List<String> getClassPaths()
+  {
+    LOG.info("PROCESSING CLASSPATH");
+    List<String> paths = new ArrayList<>();
+    ClassLoader cl = ClassLoader.getSystemClassLoader();
+    if (cl instanceof URLClassLoader) {
+      for (URL url : ((URLClassLoader)cl).getURLs()) {
+        LOG.info("FOUND FILE PATH {}", url.getFile());
+        paths.add(url.getFile());
+      }
+    } else {
+      // An unconditional cast would throw ClassCastException here; use the class path property.
+      String classPath = System.getProperty("java.class.path", "");
+      for (String entry : classPath.split(java.util.regex.Pattern.quote(File.pathSeparator))) {
+        if (!entry.isEmpty()) {
+          LOG.info("FOUND FILE PATH {}", entry);
+          paths.add(entry);
+        }
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Appends the py4j source zip, the Python worker script and the pyapex zip (all
+   * resolved against the apex directory path) to the
+   * {@code StramAppLauncher.FILES_CONF_KEY_NAME} entry of this app's configuration.
+   */
+  public void setRequiredRuntimeFiles()
+  {
+    String home = getApexDirectoryPath();
+    String depsDir = home + "/deps/";
+
+    ArrayList runtimeFiles = new ArrayList();
+    runtimeFiles.add(depsDir + PythonConstants.PY4J_SRC_ZIP_FILE_NAME);
+    runtimeFiles.add(home + "/src/pyapex/runtime/" + PythonConstants.PYTHON_WORKER_FILE_NAME);
+    runtimeFiles.add(depsDir + PythonConstants.PYTHON_APEX_ZIP_NAME);
+
+    extendExistingConfig(StramAppLauncher.FILES_CONF_KEY_NAME, runtimeFiles);
+  }
+
+  /**
+   * Appends the given entries, comma separated, to a configuration property.
+   *
+   * <p>If the property is currently unset or empty it is set to the joined list;
+   * otherwise the joined list is added after the existing value and a comma.
+   *
+   * @param fileVariable name of the configuration property to extend
+   * @param fileList     entries to append
+   */
+  public void extendExistingConfig(String fileVariable, ArrayList fileList)
+  {
+    Configuration target = getConf();
+    String addition = StringUtils.join(fileList, ",");
+    String current = target.get(fileVariable);
+    String merged = StringUtils.isEmpty(current) ? addition : current + "," + addition;
+    target.set(fileVariable, merged);
+  }
+
+  /**
+   * @return the configuration currently held by this application
+   */
+  public Configuration getConf()
+  {
+    return this.conf;
+  }
+
+  /**
+   * Replaces the configuration held by this application.
+   *
+   * @param conf configuration to use from now on
+   */
+  public void setConf(Configuration conf)
+  {
+    this.conf = conf;
+  }
+
+  /**
+   * Returns the stream factory, creating it on first use.
+   *
+   * @return the stream factory instance held by this application
+   */
+  public StreamFactory getStreamFactory()
+  {
+    if (streamFactory != null) {
+      return streamFactory;
+    }
+    streamFactory = new StreamFactory();
+    return streamFactory;
+  }
+
+  /**
+   * Prepares the configuration and the Python operators, then launches the application
+   * through a new {@link PythonAppManager}.
+   *
+   * <p>Launch mode resolution: {@code local == true} forces LOCAL; otherwise a mode that
+   * is already LOCAL is kept and any other value becomes HADOOP. In LOCAL mode every
+   * {@link PythonGenericOperator} in the DAG receives the paths of the worker script,
+   * the py4j zip and the pyapex zip; in HADOOP mode it receives an empty environment map.
+   *
+   * @param local true to force local launch
+   * @return the value returned by {@code PythonAppManager.launch()}
+   * @throws IllegalStateException if no stream has been set on this application
+   * @throws Exception declared by the original signature
+   */
+  public String launch(boolean local) throws Exception
+  {
+    LOG.debug("Already set Launching mode option : {}", mode);
+
+    if (apexStream == null) {
+      throw new IllegalStateException("No stream defined for application '" + name
+          + "'; set one (e.g. via fromFolder) before calling launch()");
+    }
+
+    if (local || mode == PythonAppManager.LaunchMode.LOCAL) {
+      mode = PythonAppManager.LaunchMode.LOCAL;
+    } else {
+      mode = PythonAppManager.LaunchMode.HADOOP;
+    }
+
+    String home = getApexDirectoryPath();
+    LOG.debug("Launching mode: {} ApexDirectoryPath: {}", mode, home);
+    setRequiredJARFiles();
+    setRequiredRuntimeFiles();
+    manager = new PythonAppManager(this, mode);
+
+    DAG dag = apexStream.createDag();
+
+    Map<String, String> pythonOperatorEnv = new HashMap<>();
+    if (mode == PythonAppManager.LaunchMode.LOCAL) {
+      pythonOperatorEnv.put(PythonWorkerContext.PYTHON_WORKER_PATH, home + "/src/pyapex/runtime/" + PythonConstants.PYTHON_WORKER_FILE_NAME);
+      pythonOperatorEnv.put(PythonWorkerContext.PY4J_DEPENDENCY_PATH, home + "/deps/" + PythonConstants.PY4J_SRC_ZIP_FILE_NAME);
+      // The apex path must reference the pyapex archive, not the py4j one used on the line above.
+      pythonOperatorEnv.put(PythonWorkerContext.PYTHON_APEX_PATH, home + "/deps/" + PythonConstants.PYTHON_APEX_ZIP_NAME);
+    }
+
+    Collection<DAG.OperatorMeta> operators = dag.getAllOperatorsMeta();
+    for (DAG.OperatorMeta operatorMeta : operators) {
+      if (operatorMeta.getOperator() instanceof PythonGenericOperator) {
+        LOG.debug("Updating python operator: {}", operatorMeta.getName());
+        PythonGenericOperator operator = ((PythonGenericOperator)operatorMeta.getOperator());
+        operator.getServer().setPythonOperatorEnv(pythonOperatorEnv);
+      }
+    }
+    return manager.launch();
+  }
+
+  /**
+   * Runs the current stream in embedded mode by calling
+   * {@code runEmbedded(true, 0, null)} on it.
+   *
+   * @return the controller returned by the stream
+   */
+  public LocalMode.Controller runLocal()
+  {
+    return apexStream.runEmbedded(true, 0, null);
+  }
+
+  /**
+   * @return the stream currently backing this application, or null if none has been set
+   */
+  public ApexStream getApexStream()
+  {
+    return this.apexStream;
+  }
+
+  /**
+   * Replaces the stream backing this application.
+   *
+   * @param apexStream stream to use from now on
+   */
+  public void setApexStream(PythonApexStreamImpl apexStream)
+  {
+    this.apexStream = apexStream;
+  }
+
+  /**
+   * Starts this application's stream from a folder source created by
+   * {@code StreamFactory.fromFolder}.
+   *
+   * <p>The created stream is wrapped in a {@link PythonApexStreamImpl} only when it is
+   * an {@link ApexStreamImpl}; otherwise the current stream is left untouched.
+   *
+   * @param directoryPath folder path handed to the stream factory
+   * @return this application, for call chaining
+   */
+  public PythonApp fromFolder(String directoryPath)
+  {
+    ApexStream source = StreamFactory.fromFolder(directoryPath);
+    if (!(source instanceof ApexStreamImpl)) {
+      return this;
+    }
+    apexStream = new PythonApexStreamImpl((ApexStreamImpl)source);
+    return this;
+  }
+
+  /**
+   * Starts this application's stream from a Kafka source created by
+   * {@code StreamFactory.fromKafka08}.
+   *
+   * <p>The created stream is wrapped in a {@link PythonApexStreamImpl} only when it is
+   * an {@link ApexStreamImpl}; otherwise the current stream is left untouched.
+   *
+   * @param zookeepers zookeeper connection string handed to the stream factory
+   * @param topic      topic name handed to the stream factory
+   * @return this application, for call chaining
+   */
+  public PythonApp fromKafka08(String zookeepers, String topic)
+  {
+    ApexStream source = StreamFactory.fromKafka08(zookeepers, topic);
+    if (!(source instanceof ApexStreamImpl)) {
+      return this;
+    }
+    apexStream = new PythonApexStreamImpl((ApexStreamImpl)source);
+    return this;
+  }
+
+ public PythonApp fromData(List