diff --git a/.gitignore b/.gitignore
index d69387d9f9..9ce7235f7e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,5 @@ nb-configuration.xml
 hadoop.log
 site/
 .checkstyle
+*.pyc
+*.out
diff --git a/docs/python/main.md b/docs/python/main.md
new file mode 100644
index 0000000000..0e4f379d8b
--- /dev/null
+++ b/docs/python/main.md
@@ -0,0 +1,38 @@
+# Developing Streaming Applications in Python #
+
+Currently, only basic support for stateless operations is exposed.
+
+## Requirements ##
+* Python 2.7
+* py4j
+  Please install py4j on your machine:
+  ```
+  pip install py4j
+  ```
+
+
+Once you have cloned the Apache Malhar project, go to the python project and follow these steps:
+
+* Compile all projects under Apache Malhar and make sure Hadoop is installed on the local node.
+* Once compilation finishes, go to the python/scripts directory and launch ./pyshell
+* This launches a Python shell in which you can develop your application.
+
+You can write simple applications using the available APIs and also provide custom functions written in Python, for example:
+
+```
+
+def filter_function(a):
+    input_data = a.split(',')
+    if float(input_data[2]) > 100:
+        return True
+    return False
+
+from pyapex import createApp
+a = createApp('python_app').from_kafka09('localhost:2181', 'test_topic') \
+    .filter('filter_operator', filter_function) \
+    .to_console(name='endConsole') \
+    .launch(False)
+```
+
+
+Note: Currently, the developer needs to ensure that the required Python dependencies are installed on the Hadoop cluster.
diff --git a/examples/python/resources/stock_data.csv b/examples/python/resources/stock_data.csv
new file mode 100644
index 0000000000..7b4a77f8f0
--- /dev/null
+++ b/examples/python/resources/stock_data.csv
@@ -0,0 +1,96 @@
+symbol,timestamp,open
+MSFT,2014-10-31T00:00:00-04:00,46.31618
+MSFT,2014-11-03T00:00:00-05:00,46.26685
+MSFT,2014-11-04T00:00:00-05:00,46.6714
+MSFT,2014-11-05T00:00:00-05:00,47.16475
+MSFT,2014-11-06T00:00:00-05:00,47.22396
+MSFT,2014-11-07T00:00:00-05:00,48.26987
+MSFT,2014-11-10T00:00:00-05:00,48.04292
+MSFT,2014-11-11T00:00:00-05:00,48.2008
+MSFT,2014-11-12T00:00:00-05:00,47.91465
+MSFT,2014-11-13T00:00:00-05:00,48.16133
+MSFT,2014-11-14T00:00:00-05:00,49.07897
+MSFT,2014-11-17T00:00:00-05:00,48.75336
+MSFT,2014-11-18T00:00:00-05:00,48.78283
+MSFT,2014-11-19T00:00:00-05:00,48.31615
+MSFT,2014-11-20T00:00:00-05:00,47.66082
+MSFT,2014-11-21T00:00:00-05:00,48.67361
+MSFT,2014-11-24T00:00:00-05:00,47.65089
+MSFT,2014-11-25T00:00:00-05:00,47.32322
+MSFT,2014-11-26T00:00:00-05:00,47.15442
+MSFT,2014-11-28T00:00:00-05:00,47.61117
+MSFT,2014-12-01T00:00:00-05:00,47.54167
+MSFT,2014-12-02T00:00:00-05:00,48.49488
+MSFT,2014-12-03T00:00:00-05:00,48.09771
+MSFT,2014-12-04T00:00:00-05:00,48.04806
+MSFT,2014-12-05T00:00:00-05:00,48.47502
+MSFT,2014-12-08T00:00:00-05:00,47.97855
+MSFT,2014-12-09T00:00:00-05:00,46.77711
+MSFT,2014-12-10T00:00:00-05:00,47.24379
+MSFT,2014-12-11T00:00:00-05:00,46.74732
+MSFT,2014-12-12T00:00:00-05:00,46.35014
+MSFT,2014-12-15T00:00:00-05:00,46.86647
+MSFT,2014-12-16T00:00:00-05:00,45.57566
+MSFT,2014-12-17T00:00:00-05:00,44.73166
+MSFT,2014-12-18T00:00:00-05:00,46.25085
+MSFT,2014-12-19T00:00:00-05:00,47.27357
+MSFT,2014-12-22T00:00:00-05:00,47.44237
+MSFT,2014-12-23T00:00:00-05:00,48.0282
+MSFT,2014-12-24T00:00:00-05:00,48.2963
+MSFT,2014-12-26T00:00:00-05:00,48.06792
+MSFT,2014-12-29T00:00:00-05:00,47.36294
+MSFT,2014-12-30T00:00:00-05:00,47.10477
+MSFT,2014-12-31T00:00:00-05:00,46.39979
+MSFT,2015-01-02T00:00:00-05:00,46.33028
+MSFT,2015-01-05T00:00:00-05:00,46.04234
+MSFT,2015-01-06T00:00:00-05:00,46.05227 +MSFT,2015-01-07T00:00:00-05:00,45.65509 +MSFT,2015-01-08T00:00:00-05:00,46.41965 +MSFT,2015-01-09T00:00:00-05:00,47.27357 +MSFT,2015-01-12T00:00:00-05:00,47.08492 +MSFT,2015-01-13T00:00:00-05:00,46.6381 +MSFT,2015-01-14T00:00:00-05:00,45.63523 +MSFT,2015-01-15T00:00:00-05:00,45.8934 +MSFT,2015-01-16T00:00:00-05:00,44.98983 +MSFT,2015-01-20T00:00:00-05:00,45.97283 +MSFT,2015-01-21T00:00:00-05:00,45.61537 +MSFT,2015-01-22T00:00:00-05:00,46.16149 +MSFT,2015-01-23T00:00:00-05:00,47.02534 +MSFT,2015-01-26T00:00:00-05:00,46.66788 +MSFT,2015-01-27T00:00:00-05:00,42.6465 +MSFT,2015-01-28T00:00:00-05:00,42.43799 +MSFT,2015-01-29T00:00:00-05:00,40.64078 +MSFT,2015-01-30T00:00:00-05:00,41.25639 +MSFT,2015-02-02T00:00:00-05:00,40.30318 +MSFT,2015-02-03T00:00:00-05:00,41.33583 +MSFT,2015-02-04T00:00:00-05:00,41.64364 +MSFT,2015-02-05T00:00:00-05:00,41.92166 +MSFT,2015-02-06T00:00:00-05:00,42.37841 +MSFT,2015-02-09T00:00:00-05:00,41.94152 +MSFT,2015-02-10T00:00:00-05:00,42.43799 +MSFT,2015-02-11T00:00:00-05:00,42.34863 +MSFT,2015-02-12T00:00:00-05:00,42.38834 +MSFT,2015-02-13T00:00:00-05:00,43.07346 +MSFT,2015-02-17T00:00:00-05:00,43.97 +MSFT,2015-02-18T00:00:00-05:00,43.63 +MSFT,2015-02-19T00:00:00-05:00,43.27 +MSFT,2015-02-20T00:00:00-05:00,43.5 +MSFT,2015-02-23T00:00:00-05:00,43.7 +MSFT,2015-02-24T00:00:00-05:00,44.15 +MSFT,2015-02-25T00:00:00-05:00,43.95 +MSFT,2015-02-26T00:00:00-05:00,43.99 +MSFT,2015-02-27T00:00:00-05:00,44.14 +MSFT,2015-03-02T00:00:00-05:00,43.67 +MSFT,2015-03-03T00:00:00-05:00,43.56 +MSFT,2015-03-04T00:00:00-05:00,43.01 +MSFT,2015-03-05T00:00:00-05:00,43.07 +MSFT,2015-03-06T00:00:00-05:00,43 +MSFT,2015-03-09T00:00:00-04:00,42.19 +MSFT,2015-03-10T00:00:00-04:00,42.35 +MSFT,2015-03-11T00:00:00-04:00,42.32 +MSFT,2015-03-12T00:00:00-04:00,41.33 +MSFT,2015-03-13T00:00:00-04:00,40.7 +MSFT,2015-03-16T00:00:00-04:00,41.47 +MSFT,2015-03-17T00:00:00-04:00,41.37 +MSFT,2015-03-18T00:00:00-04:00,41.43 +MSFT,2015-03-19T00:00:00-04:00,42.25 diff --git a/examples/python/stock_filter.py b/examples/python/stock_filter.py new file mode 100644 index 0000000000..63f53c8ca9 --- /dev/null +++ b/examples/python/stock_filter.py @@ -0,0 +1,30 @@ +#input_data="apex-malhar/examples/python/resources/stock_data.csv" +input_data="/home/vikram/Documents/src/apex-malhar/examples/python/resources/stock_data.csv" +data = [] +with open( input_data, "r") as outfile: + outfile.readline() + for line in outfile: + data.append(line) + +def filter_func(a): + input_data=a.split(",") + if float(input_data[2])> 30: + return True + return False + + +from pyapex import createApp + +def filter_func(a): + input_data=a.split(",") + if float(input_data[2])> 30: + return True + return False + + +from pyapex import createApp +a=createApp('python_app').from_data(data) \ + .filter('filter_operator',filter_func) \ + .to_console(name='endConsole') \ + .launch(False) + diff --git a/library/pom.xml b/library/pom.xml index 17908ddce2..c334c6e32c 100644 --- a/library/pom.xml +++ b/library/pom.xml @@ -29,6 +29,8 @@ 3.8.0-SNAPSHOT + + malhar-library jar diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/InMemoryDataInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/InMemoryDataInputOperator.java new file mode 100644 index 0000000000..7f8a82f954 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/io/fs/InMemoryDataInputOperator.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datatorrent.lib.io.fs; + +import java.util.List; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; + +public class InMemoryDataInputOperator implements InputOperator +{ + private List inputData = null; + private boolean emissionCompleted = false; + public final transient DefaultOutputPort outputPort = new DefaultOutputPort(); + + public InMemoryDataInputOperator() + { + inputData = null; + } + + public InMemoryDataInputOperator(List data) + { + inputData = data; + } + + @Override + public void emitTuples() + { + if (emissionCompleted) { + return; + } + for (T data : inputData) { + outputPort.emit(data); + } + emissionCompleted = true; + } + + @Override + public void beginWindow(long l) + { + } + + @Override + public void endWindow() + { + } + + @Override + public void setup(Context.OperatorContext context) + { + } + + @Override + public void teardown() + { + } +} diff --git a/mkdocs.yml b/mkdocs.yml index 75a862a9aa..c6ecb16841 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -5,6 +5,8 @@ pages: - Apache Apex Malhar: index.md - APIs: - SQL: apis/calcite.md +- Python Support: + - Pyshell: python/main.md - Operators: - Block Reader: operators/block_reader.md - CSV Formatter: operators/csvformatter.md diff --git a/pom.xml b/pom.xml index dc9ed8b6b6..0a16fe3322 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ 3.6.0 false -Xmx2048m + 1.4.9 @@ -233,6 +234,31 @@ + + org.apache.hadoop + hadoop-common + ${hadoop.version} + test-jar + test + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + commons-beanutils + commons-beanutils + + + commons-beanutils + commons-beanutils-core + + + diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 0000000000..8531876944 --- /dev/null +++ b/python/.gitignore @@ -0,0 +1 @@ +pyapex/jars/*.* diff --git a/python/pom.xml b/python/pom.xml new file mode 100644 index 0000000000..9e82af9736 --- /dev/null +++ b/python/pom.xml @@ -0,0 +1,301 @@ + + + 4.0.0 + + + org.apache.apex + malhar + 3.8.0-SNAPSHOT + + + malhar-python + Apache Apex Malhar Python Support + jar + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + test-jar + + package + + + + + maven-assembly-plugin + + + + org.apache.apex.malhar.python.PyApex + + + + jar-with-dependencies + + + + + + maven-dependency-plugin + + + create-client-mvn-generated-classpath + generate-resources + + build-classpath + + + + ${project.build.directory}/mvn-generated-runtime-classpath + + + + + + create-client-mvn-generated-classpath-no-hadoop + generate-resources + + build-classpath + + + + ${project.build.directory}/mvn-generated-runtime-classpath-no-hadoop + + org.apache.hadoop + + + + create-mvn-generated-classpath + generate-test-resources + + 
build-classpath + + + + ${project.build.directory}/test-classes/mvn-generated-classpath + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies-for-launch-time + prepare-package + + copy-dependencies + + + ${project.build.directory}/libs + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies-for-run-time + prepare-package + + copy-dependencies + + + ${project.build.directory}/runtime_libs + provided + org.apache.hadoop + + + + + + + + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + provided + + + org.apache.apex + apex-engine + ${apex.core.version} + + + org.apache.apex + apex-api + ${apex.core.version} + + + org.apache.apex + apex-common + ${apex.core.version} + + + + net.sf.py4j + py4j + 0.10.4 + + + + org.apache.apex + malhar-stream + ${project.version} + + + + org.apache.apex + malhar-contrib + ${project.version} + + + + + org.apache.kafka + kafka_2.10 + 0.8.2.1 + provided + + + + + + org.apache.kafka + kafka-clients + 0.8.2.1 + provided + + + org.apache.kafka + kafka_2.10 + 0.8.2.1 + provided + + + org.slf4j + slf4j-log4j12 + + + org.slf4j + slf4j-simple + + + log4j + log4j + + + org.apache.zookeeper + zookeeper + + + + + org.apache.apex + malhar-library + ${project.version} + + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + + + org.powermock + powermock-api-mockito + ${powermock.version} + test + + + + diff --git a/python/pyapex/.gitignore b/python/pyapex/.gitignore new file mode 100644 index 0000000000..2723de6f7c --- /dev/null +++ b/python/pyapex/.gitignore @@ -0,0 +1 @@ +jars diff --git a/python/pyapex/__init__.py b/python/pyapex/__init__.py new file mode 100644 index 0000000000..39bea93789 --- /dev/null +++ b/python/pyapex/__init__.py @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from .apexapp import createApp +from .apexapp import getApp diff --git a/python/pyapex/apexapp.py b/python/pyapex/apexapp.py new file mode 100644 index 0000000000..709df8c246 --- /dev/null +++ b/python/pyapex/apexapp.py @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import types + +import cloudpickle +from tempfile import TemporaryFile, NamedTemporaryFile +from uuid import uuid1 +from py4j.protocol import Py4JJavaError +from shellconn import ShellConnector + +def createApp(name): + shellConnector = ShellConnector() + return ApexStreamingApp(name, shellConnector) + + +def getApp(name): + shellConnector = ShellConnector() + java_app = shellConnector.get_entry_point().getAppByName(name) + return ApexStreamingApp(name, java_app=java_app) + +''' +This is Python Wrapper Around ApexStreamingApp +If java streaming app is not found then no apis can be called on this wrapper. +''' +class ApexStreamingApp(): + app_id = None + streaming_factory = None + java_streaming_app = None + instance_id = None + shell_connector = None + serialized_file_list = [] + + def __init__(self, name, shell_connector=None, java_app=None): + if shell_connector is None and java_app is None: + raise Exception("Invalid App initialization") + if java_app is None: + self.shell_connector = shell_connector + self.java_streaming_app = shell_connector.get_entry_point().createApp(name) + else: + self.java_streaming_app = java_app + self.shell_connector = shell_connector + self.instance_id = uuid1().urn[9:] + + ''' + This fuction will initialize input adapter to read from hdfs adapter + ''' + def from_directory(self, folder_path): + self.java_streaming_app = self.java_streaming_app.fromFolder(folder_path) + return self + + def from_kafka08(self, zoopkeepers, topic): + self.java_streaming_app = self.java_streaming_app.fromKafka08(zoopkeepers, topic) + return self + + def from_kafka09(self, zoopkeepers, topic): + self.java_streaming_app = self.java_streaming_app.fromKafka09(zoopkeepers, topic) + return self + + ''' + Allow Data To be provided as input on shell or directly in python file such as + + ''' + def from_data(self, data): + if not isinstance(data, list): + raise Exception + data_for_java = self.shell_connector.get_jvm_gateway().jvm.java.util.ArrayList() + types_data = [int, float, str, bool, tuple, dict] + for d in data: + if type(d) in types_data: + data_for_java.append(d) + self.java_streaming_app = self.java_streaming_app.fromData(data_for_java) + return self + + def to_console(self, name=None): + self.java_streaming_app = self.java_streaming_app.toConsole(name) + return self + + def to_kafka_08(self, name=None, topic=None, brokerList=None, **kwargs): + properties = {} + if brokerList is not None: + properties['bootstrap.servers'] = brokerList + for key in kwargs.keys(): + properties[key] = kwargs[key] + property_map = self.shell_connector.get_jvm_gateway().jvm.java.util.HashMap() + for key in properties.keys(): + property_map.put(key, properties[key]) + self.java_streaming_app = self.java_streaming_app.toKafka08(name, topic, property_map) + return self + + def to_directory(self, name,file_name, directory_name, **kwargs): + properties = {} + if file_name is None or directory_name is None: + raise Exception("Directory Name or File name should be specified") + self.java_streaming_app = self.java_streaming_app.toFolder(name, file_name, directory_name) + return self + + def map(self, name, func): + if not isinstance(func, types.FunctionType): + raise Exception + + serialized_func = self.serialize_function(name, func) + self.java_streaming_app = self.java_streaming_app.map(name, serialized_func) + return self + + def flatmap(self, name, func): + if not isinstance(func, 
types.FunctionType): + raise Exception + serialized_func = self.serialize_function(name, func) + self.java_streaming_app = self.java_streaming_app.flatMap(name, serialized_func) + return self + + def filter(self, name, func): + if not isinstance(func, types.FunctionType): + raise Exception + serialized_func = self.serialize_function(name, func) + self.java_streaming_app = self.java_streaming_app.filter(name, serialized_func) + return self + + def launch(self, local_mode=False): + try: + self.app_id = self.java_streaming_app.launch(local_mode) + return self.app_id + except Py4JJavaError as e: + import traceback + traceback.print_exc() + return e.java_exception.getMessage() + + def kill(self): + return self.java_streaming_app.kill() + + def set_config(self, key, value): + self.java_streaming_app = self.java_streaming_app.setConfig(key, value) + return self + + def serialize_function(self, name, func): + serialized_func = bytearray() + serialized_func.extend(cloudpickle.dumps(func)) + return serialized_func diff --git a/python/pyapex/commands.py b/python/pyapex/commands.py new file mode 100644 index 0000000000..3d0111f0d8 --- /dev/null +++ b/python/pyapex/commands.py @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+#
+
+from pyapex import getApp
+import getopt
+import sys
+
+def kill_app(app_name):
+    app = getApp(app_name)
+    if app:
+        print "KILLING APP " + str(app_name)
+        app.kill()
+    else:
+        print "Application not found for name " + str(app_name)
+
+
+def shutdown_app(app_name):
+    app = getApp(app_name)
+    if app:
+        print "SHUTTING DOWN APP NOW"
+        app.kill()
+    else:
+        print "Application not found for name " + str(app_name)
+
+
+func_map = {"KILL_MODE": kill_app, "SHUTDOWN_MODE": shutdown_app}
+
+
+def parse_argument():
+    print sys.argv
+    try:
+        opts, args = getopt.getopt(sys.argv[1:], "k:s:")
+    except getopt.GetoptError as err:
+        # print help information and exit:
+        print str(err)  # will print something like "option -a not recognized"
+        sys.exit(2)
+    for opt, args in opts:
+        if opt == '-k':
+            func_map['KILL_MODE'](args)
+        elif opt == '-s':
+            func_map['SHUTDOWN_MODE'](args)
+        else:
+            assert False, "Invalid Option"
+
+
+if __name__ == "__main__":
+    parse_argument()
diff --git a/python/pyapex/jars/py4j-0.10.4.jar b/python/pyapex/jars/py4j-0.10.4.jar
new file mode 100644
index 0000000000..3d298a106b
Binary files /dev/null and b/python/pyapex/jars/py4j-0.10.4.jar differ
diff --git a/python/pyapex/runtime/__init__.py b/python/pyapex/runtime/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/python/pyapex/runtime/py4j-0.10.4-src.zip b/python/pyapex/runtime/py4j-0.10.4-src.zip
new file mode 100644
index 0000000000..72bb34d0fa
Binary files /dev/null and b/python/pyapex/runtime/py4j-0.10.4-src.zip differ
diff --git a/python/pyapex/runtime/py4j-0.10.4.tar.gz b/python/pyapex/runtime/py4j-0.10.4.tar.gz
new file mode 100644
index 0000000000..e24c5ff88e
Binary files /dev/null and b/python/pyapex/runtime/py4j-0.10.4.tar.gz differ
diff --git a/python/pyapex/runtime/worker.py b/python/pyapex/runtime/worker.py
new file mode 100755
index 0000000000..14decca513
--- /dev/null
+++ b/python/pyapex/runtime/worker.py
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+
+'''
+worker.py is responsible for instantiating a specific worker such as MapWorkerImpl, FlatMapWorkerImpl or FilterWorkerImpl.
+
+worker.py is run as a separate Python process, and the selected WorkerImpl is then registered back with the Java process to serve each call.
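+
+The flow implemented in main() below is roughly: connect to the Java-side Py4J GatewayServer on the port passed as the first argument, start a Python callback server and report its port back to the Java gateway, then build a WorkerImpl of the type named by the second argument (MAP, FLATMAP or FILTER, see WorkerImpl.factory) and register it with the Java entry point via gateway.entry_point.register().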
+ + +''' +import sys +import site +import pip +from py4j.java_gateway import JavaGateway, CallbackServerParameters, GatewayParameters, java_import +import logging +from abc import ABCMeta, abstractmethod + +gateway = None + + +class AbstractPythonWorker(object): + __metaclass__ = ABCMeta + + @abstractmethod + def setFunction(self, f, opType): + pass + + @abstractmethod + def execute(self, tupleIn): + pass + + @abstractmethod + def setState(self, dataMap): + pass + + class Java: + implements = ["org.apache.apex.malhar.python.runtime.PythonWorker"] + + +class WorkerImpl(AbstractPythonWorker): + serialized_f = None + callable_f = None + opType = None + dataMap = None + counter = 0; + + def __init__(self, gateway, opType): + self.gateway = gateway + self.opType = opType + + def setFunction(self, f, opType): + try: + import os, imp + import cloudpickle + self.callable_f = cloudpickle.loads(f) + self.opType = opType + except ValueError as e: + print str(e) + from traceback import print_exc + print_exc() + except Exception: + from traceback import print_exc + print_exc() + return "RETURN VALUE" + + def setState(self, dataMap): + self.dataMap = dataMap + print "Python : setting state correctly" + return True + + def factory(gateway, type): + # return eval(type + "()") + if type == "MAP": return MapWorkerImpl(gateway, type) + if type == "FLATMAP": return FlatMapWorkerImpl(gateway, type) + if type == "FILTER": return FilterWorkerImpl(gateway, type) + assert 0, "Bad shape creation: " + type + + factory = staticmethod(factory) + + +class MapWorkerImpl(WorkerImpl): + def execute(self, tupleIn): + try: + result = self.callable_f(tupleIn) + return result + except ValueError as e: + print str(e) + from traceback import print_exc + print_exc() + except Exception: + from traceback import print_exc + print_exc() + return None + + +class FlatMapWorkerImpl(WorkerImpl): + def execute(self, tupleIn): + try: + result = self.callable_f(tupleIn) + from py4j.java_collections import SetConverter, MapConverter, ListConverter + return ListConverter().convert(result, self.gateway._gateway_client) + return result + except ValueError as e: + print str(e) + from traceback import print_exc + print_exc() + except Exception: + from traceback import print_exc + print_exc() + return None + + +class FilterWorkerImpl(WorkerImpl): + def execute(self, tupleIn): + try: + result = self.callable_f(tupleIn) + if type(result) != bool: + result = True if result is not None else False + return result + except ValueError as e: + print str(e) + from traceback import print_exc + print_exc() + except Exception: + from traceback import print_exc + print_exc() + return None + + +# TODO this may cause race condition +def find_free_port(): + import socket + s = socket.socket() + s.listen(0) + addr, found_port = s.getsockname() # Return the port number assigned. 
+ s.shutdown(socket.SHUT_RDWR) + s.close() + return found_port + + +def main(argv): + import os, getpass + # print argv + # print [f for f in sys.path if f.endswith('packages')] + + PYTHON_PATH = os.environ['PYTHONPATH'] if 'PYTHONPATH' in os.environ else None + os.environ['PYTHONPATH'] = PYTHON_PATH + ':' + site.getusersitepackages().replace('/home/.local/', + '/home/' + getpass.getuser() + '/.local/') + '/' + sys.path.extend(os.environ['PYTHONPATH'].split(':')) + print "PYTHONPATH " + str(os.environ['PYTHONPATH']) + import pickle + gp = GatewayParameters(address='127.0.0.1', port=int(argv[0]), auto_convert=True) + cb = CallbackServerParameters(daemonize=False, eager_load=True, port=0) + gateway = JavaGateway(gateway_parameters=gp, callback_server_parameters=cb) + + # Retrieve the port on which the python callback server was bound to. + python_port = gateway.get_callback_server().get_listening_port() + + # Register python callback server with java gateway server + # Note that we use the java_gateway_server attribute that + # retrieves the GatewayServer instance. + gateway.java_gateway_server.resetCallbackClient( + gateway.java_gateway_server.getCallbackClient().getAddress(), + python_port) + + # Instantiate WorkerImpl for PythonWorker java interface and regsiter with PythonWorkerProxy in Java. + workerImpl = WorkerImpl.factory(gateway, argv[1]) + gateway.entry_point.register(workerImpl) + print "Python process started with type" + argv[1] + + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/python/pyapex/shellconn.py b/python/pyapex/shellconn.py new file mode 100644 index 0000000000..94a1e838e5 --- /dev/null +++ b/python/pyapex/shellconn.py @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from py4j.java_gateway import JavaGateway + + +class ShellConnector(object): + gateway = None + entry_point = None + + def __init__(self): + self.gateway = JavaGateway() + + def __new__(cls): + if not hasattr(cls, 'instance'): + cls.instance = super(ShellConnector, cls).__new__(cls) + return cls.instance + + def get_jvm_gateway(self): + return self.gateway + + def get_entry_point(self): + return self.gateway.entry_point diff --git a/python/scripts/pyshell b/python/scripts/pyshell new file mode 100755 index 0000000000..1d9c9895dd --- /dev/null +++ b/python/scripts/pyshell @@ -0,0 +1,183 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# Support functions + +# TODO : +# ACCEPTED ARGUMENTS: +# shell +# launch input_python.py +# kill app-id +# shutdown app-id + +usage() { echo "Usage: $0 [-sh] [-l ] [-m HADOOP|LOCAL ] [-k ] [-t ]" 1>&2; exit 1; } +COMMAND_MODE='SHELL_MODE' +RUNNING_MODE='HADOOP' +EXECUTABLE_FILE=false +while getopts ":s:l:k:t:m:" o; do + case "${o}" in + s) + s=${OPTARG} + COMMAND_MODE="SHELL_MODE" + ;; + l) + EXECUTABLE_FILE=${OPTARG} + COMMAND_MODE="LAUNCH_MODE" + ;; + k) + APP_NAME=${OPTARG} + COMMAND_MODE="KILL_MODE" + ;; + m) + RUNNING_MODE=${OPTARG} + ;; + t) + usage + echo "NOT IMPLEMENTTED YET" + ;; + *) + usage + ;; + esac +done +echoerr() { echo "$@" 1>&2; } +if [ -e "pyapex" ] +then + rm pyapex +fi + +real_dir() { + SOURCE="${1:-${BASH_SOURCE[0]}}" + while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink + SOURCE_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" + SOURCE="$(readlink "$SOURCE")" + [[ $SOURCE != /* ]] && SOURCE="$SOURCE_DIR/$SOURCE" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located + done + SOURCE_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" + echo $SOURCE_DIR +} +script_dir=$(real_dir "${BASH_SOURCE[0]}") +# Create missing clirc file for current user +if [ ! -f "${HOME}/.dt/clirc" ]; then + mkdir -p "${HOME}/.dt" + cat >${HOME}/.dt/clirc </dev/null` +fi + +if [ "$DT_CLIENT_OPTS" = "" ]; then +# DT_CLIENT_OPTS="-Xmx1024m -XX:MaxPermSize=512m -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled" + DT_CLIENT_OPTS="-Xmx1024m -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled" +fi + +export HADOOP_CLIENT_OPTS="$DT_CLIENT_OPTS" + +BUILD_DIR="$( dirname "$0" )/../target" +PYAPEX_HOME="`pwd`/../pyapex" +if [ -z "$DT_HADOOP" ]; then + echo "Development Mode without Hadoop Installation: Using jars from mvn generated path" + MVN_GENERATED_PATH="$BUILD_DIR/mvn-generated-runtime-classpath" +else + echo "Development Mode with Hadoop Installation: Using jars to be provided externally " + MVN_GENERATED_PATH="$BUILD_DIR/mvn-generated-runtime-classpath-no-hadoop" +fi +if [ -f "$MVN_GENERATED_PATH" ]; then + # development launch mode + DT_CORE_JAR="$BUILD_DIR/malhar-python-3.8.0-SNAPSHOT.jar" + if [ ! -f "$DT_CORE_JAR" ]; then + echoerr "Error: Cannot find $DT_CORE_JAR"; + exit 1; + fi + DT_CLASSPATH="$DT_CLASSPATH:$DT_CORE_JAR" + DT_CLASSPATH=$BASEDIR/libs'/*'":${DT_CLASSPATH}" + DT_CLASSPATH="$DT_CLASSPATH:`cat $MVN_GENERATED_PATH`" +else + # running from installation + if [ -z "$DT_HADOOP" ]; then + echoerr "Hadoop installation not found. Please include hadoop in PATH." 
+ exit 1; + fi + BASEDIR=$( cd ${script_dir}/..; pwd -P ) +fi + +if [ -n "$DT_CLASSPATH" ]; then + if [ -z "$HADOOP_CLASSPATH" ]; then + export HADOOP_CLASSPATH="$DT_CLASSPATH" + else + export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:$DT_CLASSPATH" + fi +fi +APEXRUNERPID=0 +APEXRUNNERLOG="$( dirname "$0")/ApexRunner.out" +rm -f $APEXRUNNERLOG +export PYAPEX_HOME=$PYAPEX_HOME +cp $BUILD_DIR/runtime_libs/* $PYAPEX_HOME/jars +cp $DT_CORE_JAR $PYAPEX_HOME/jars +echo "Intiating PYSHELL now " + +ln -sf $PYAPEX_HOME "$script_dir/pyapex" +if [[ ( -z "$DT_HADOOP" ) || ( "$RUNNING_MODE" == "LOCAL" ) ]]; then + echo "Warning: hadoop executable not found. Running standalone with ${DT_JAVA:-java}." + echoerr "Warning: hadoop executable not found. Running standalone with ${DT_JAVA:-java}." + echo "Starting Apex Runner without hadoop" + export CLASSPATH=$DT_CLASSPATH + "${DT_JAVA:-java}" $DT_CLIENT_OPTS org.apache.apex.malhar.python.PyApex "$@" >$APEXRUNNERLOG 2>&1 & + APEXRUNNERPID="$!" +else + echo "Warning: hadoop found. Running with $DT_HADOOP" + export HADOOP_USER_CLASSPATH_FIRST=1 + # remove hadoop and duplicate slf4j binding (bash replace is too slow) + export HADOOP_CLASSPATH=$(echo -n "$HADOOP_CLASSPATH" | tr ":" "\n" | sed "/slf4j-log4j/d" | sed "/org\/apache\/hadoop/d" | tr "\n" ":") + echo "Starting Apex Runner with hadoop $@" + "$DT_HADOOP" org.apache.apex.malhar.python.PyApex "$@" "-l log.properties" >$APEXRUNNERLOG 2>&1 & + APEXRUNNERPID="$!" +fi +sleep 5 +echo "Apex Runner is started as process id: " $APEXRUNNERPID +if [ "$COMMAND_MODE" = "SHELL_MODE" ] +then + PYTHONPATH=$PYTHONPATH:"python" ipython "$@" +elif [ "$COMMAND_MODE" = "LAUNCH_MODE" ] +then + PYTHONPATH=$PYTHONPATH:"python" ipython "$EXECUTABLE_FILE" + +elif [ "$COMMAND_MODE" = "KILL_MODE" ] +then + PYTHONPATH=$PYTHONPATH:"python" python "$PYAPEX_HOME/commands.py" "-k $APP_NAME" +fi +if ! kill $APEXRUNNERPID > /dev/null 2>&1; then + echo "Could not send SIGTERM to process $PID. Force killing" >/dev/null >&2 + kill -9 $APEXRUNNERPID >/dev/null 2>&1 +fi +if [ -e pyapex ] +then + rm pyapex +fi diff --git a/python/src/main/java/org/apache/apex/malhar/python/PyApex.java b/python/src/main/java/org/apache/apex/malhar/python/PyApex.java new file mode 100644 index 0000000000..b62fe687d5 --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/PyApex.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.python; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.log4j.PropertyConfigurator; + +import py4j.GatewayServer; +import py4j.Py4JException; + +public class PyApex +{ + + private PythonApp streamApp = null; + private static final Logger LOG = LoggerFactory.getLogger(PyApex.class); + + public PythonApp createApp(String name) + { + if (streamApp == null) { + streamApp = new PythonApp(name); + } + return streamApp; + } + + public PythonApp getAppByName(String name) + { + if (streamApp == null) { + try { + + YarnClient client = YarnClient.createYarnClient(); + List apps = client.getApplications(); + for (ApplicationReport appReport : apps) { + if (appReport.getName().equals(name)) { + LOG.debug("Application Name: {} Application ID: {} Application State: {}", appReport.getName(), appReport.getApplicationId().toString(), appReport.getYarnApplicationState()); + return new PythonApp(name, appReport.getApplicationId()); + } + } + } catch (Exception e) { + throw new Py4JException("Error getting application list from resource manager", e); + } + streamApp = new PythonApp(name); + } + return streamApp; + } + + public static void main(String[] args) + { + + LOG.info("Starting PYAPEX with {}" + StringUtils.join(args, ' ')); + PropertyConfigurator.configure("./log.properties"); + PyApex pythonEntryPoint = new PyApex(); + GatewayServer gatewayServer = new GatewayServer(pythonEntryPoint); + gatewayServer.start(); + LOG.debug("Gateway Server Started"); + } + +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/PythonApp.java b/python/src/main/java/org/apache/apex/malhar/python/PythonApp.java new file mode 100644 index 0000000000..8069c247b1 --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/PythonApp.java @@ -0,0 +1,348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.python; + +import java.io.File; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.python.operator.PythonGenericOperator; +import org.apache.apex.malhar.python.runtime.PythonApexStreamImpl; +import org.apache.apex.malhar.python.runtime.PythonWorkerContext; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.Option; +import org.apache.apex.malhar.stream.api.PythonApexStream; +import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl; +import org.apache.apex.malhar.stream.api.impl.ApexWindowedStreamImpl; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator; +import com.datatorrent.stram.client.StramAppLauncher; + +public class PythonApp implements StreamingApplication +{ + + private PythonApexStream apexStream = null; + private StreamFactory streamFactory; + private ApplicationId appId = null; + private static final Logger LOG = LoggerFactory.getLogger(PythonApp.class); + private String py4jSrcZip = "py4j-0.10.4-src.zip"; + private PythonAppManager manager = null; + private String name; + private Configuration conf; + + private String apexDirectoryPath = null; + + public PythonApp() + { + this(null, null); + + } + + public PythonApp(String name) + { + + this(name, null); + } + + public PythonApp(String name, ApplicationId appId) + { + this.appId = appId; + this.name = name; + this.conf = new Configuration(true); + this.apexDirectoryPath = System.getenv("PYAPEX_HOME"); + this.conf.set("dt.loggers.level", "com.datatorrent.*:INFO,org.apache.*:DEBUG"); + } + + public String getApexDirectoryPath() + { + return apexDirectoryPath; + } + + public String getName() + { + return name; + } + + public void populateDAG(DAG dag, Configuration conf) + { + + LOG.trace("Populating DAG in python app"); + this.apexStream.populateDag(dag); + + } + + public void setRequiredJARFiles() + { + + LOG.debug("PYAPEX_HOME: {}" + getApexDirectoryPath()); + File dir = new File(this.getApexDirectoryPath() + "/jars/"); + File[] files = dir.listFiles(); + ArrayList jarFiles = new ArrayList(); + for (File jarFile : files) { + LOG.info("FOUND FILES {}" + jarFile.getAbsolutePath()); + jarFiles.add(jarFile.getAbsolutePath()); + + } + jarFiles.add(this.getApexDirectoryPath() + "/runtime/" + py4jSrcZip); + jarFiles.add(this.getApexDirectoryPath() + "/runtime/worker.py"); + extendExistingConfig(StramAppLauncher.LIBJARS_CONF_KEY_NAME, jarFiles); +// this.getClassPaths(); + } + + public List getClassPaths() + { + LOG.info("PROCESSING CLASSPATH"); + List paths = new ArrayList<>(); + ClassLoader cl = ClassLoader.getSystemClassLoader(); + URL[] urls = ((URLClassLoader)cl).getURLs(); + for (URL url : urls) { + LOG.info("FOUND FILE PATH {}" + url.getFile()); + paths.add(url.getFile()); + } + return paths; + } + + public void setRequiredRuntimeFiles() + { + String 
APEX_DIRECTORY_PATH = System.getenv("PYAPEX_HOME"); + ArrayList files = new ArrayList(); + files.add(APEX_DIRECTORY_PATH + "/runtime/" + py4jSrcZip); + files.add(APEX_DIRECTORY_PATH + "/runtime/worker.py"); + extendExistingConfig(StramAppLauncher.FILES_CONF_KEY_NAME, files); + + } + + public void extendExistingConfig(String fileVariable, ArrayList fileList) + { + Configuration configuration = this.getConf(); + String fileCSV = configuration.get(fileVariable); + String filesCSVToAppend = StringUtils.join(fileList, ","); + + if (StringUtils.isEmpty(fileCSV)) { + fileCSV = filesCSVToAppend; + + } else { + fileCSV = fileCSV + "," + filesCSVToAppend; + } + + configuration.set(fileVariable, fileCSV); + } + + public Configuration getConf() + { + return conf; + } + + public void setConf(Configuration conf) + { + this.conf = conf; + } + + public StreamFactory getStreamFactory() + { + if (streamFactory == null) { + streamFactory = new StreamFactory(); + } + return streamFactory; + } + + public String launch(boolean local) throws Exception + { + PythonAppManager.LaunchMode mode = PythonAppManager.LaunchMode.HADOOP; + if (local) { + mode = PythonAppManager.LaunchMode.LOCAL; + + } + LOG.debug("Launching mode: {}" + mode); + this.setRequiredJARFiles(); + this.setRequiredRuntimeFiles(); + this.manager = new PythonAppManager(this, mode); + + DAG dag = this.apexStream.createDag(); + + Map pythonOperatorEnv = new HashMap<>(); + if (local) { + pythonOperatorEnv.put(PythonWorkerContext.PYTHON_WORKER_PATH, this.getApexDirectoryPath() + "/runtime/worker.py"); + pythonOperatorEnv.put(PythonWorkerContext.PY4J_DEPENDENCY_PATH, this.getApexDirectoryPath() + "/runtime/" + py4jSrcZip); + } + + Collection operators = dag.getAllOperatorsMeta(); + for (DAG.OperatorMeta operatorMeta : operators) { + if (operatorMeta.getOperator() instanceof PythonGenericOperator) { + LOG.debug("Updating python operator: {}" + operatorMeta.getName()); + PythonGenericOperator operator = ((PythonGenericOperator)operatorMeta.getOperator()); + operator.setPythonOperatorEnv(pythonOperatorEnv); + } + } + return manager.launch(); + } + + public LocalMode.Controller runLocal() + { + return this.apexStream.runEmbedded(true, 0, null); + } + + public ApexStream getApexStream() + { + return apexStream; + } + + public void setApexStream(PythonApexStream apexStream) + { + this.apexStream = apexStream; + + } + + public PythonApp fromFolder(String directoryPath) + { + ApexStream currentStream = StreamFactory.fromFolder(directoryPath); + if (currentStream instanceof ApexStreamImpl) { + apexStream = new PythonApexStreamImpl((ApexStreamImpl)currentStream); + } + return this; + } + + public PythonApp fromKafka08(String zookeepers, String topic) + { + ApexStream currentStream = StreamFactory.fromKafka08(zookeepers, topic); + if (currentStream instanceof ApexStreamImpl) { + apexStream = new PythonApexStreamImpl((ApexStreamImpl)currentStream); + } + return this; + } + + public PythonApp fromData(List inputs) + { + + ApexStream currentStream = StreamFactory.fromData(inputs); + if (currentStream instanceof ApexStreamImpl) { + apexStream = new PythonApexStreamImpl((ApexStreamImpl)currentStream); + } + return this; + } + + public PythonApp fromKafka09(String zookeepers, String topic) + { + ApexStream currentStream = StreamFactory.fromKafka09(zookeepers, topic); + if (currentStream instanceof ApexStreamImpl) { + apexStream = new PythonApexStreamImpl((ApexStreamImpl)currentStream); + } + return this; + } + + public PythonApp map(String name, byte[] 
searializedFunction) + { + apexStream = (PythonApexStream)apexStream.map(searializedFunction, Option.Options.name(name)); + + return this; + } + + public PythonApp flatMap(String name, byte[] searializedFunction) + { + apexStream = (PythonApexStream)apexStream.flatMap(searializedFunction, Option.Options.name(name)); + return this; + } + + public PythonApp filter(String name, byte[] searializedFunction) + { + apexStream = (PythonApexStream)apexStream.filter(searializedFunction, Option.Options.name(name)); + return this; + } + + public PythonApp window() + { + apexStream = (PythonApexStream)apexStream.window(new WindowOption.GlobalWindow()); + + return this; + } + + public PythonApp count(String name) + { + if (apexStream instanceof ApexWindowedStreamImpl) { + apexStream = (PythonApexStream)((ApexWindowedStreamImpl)apexStream).count(Option.Options.name(name)); + return this; + } + return this; + } + + public PythonApp toConsole(String name) + { + apexStream = (PythonApexStream)apexStream.print(Option.Options.name(name)); + return this; + } + + public PythonApp toKafka08(String name, String topic, Map properties) + { + KafkaSinglePortOutputOperator kafkaOutputOperator = new KafkaSinglePortOutputOperator(); + kafkaOutputOperator.setTopic(topic); + List propertyList = new ArrayList(); + for (String key : properties.keySet()) { + propertyList.add(key + "=" + properties.get(key)); + } + + String producerConfigs = StringUtils.join(propertyList, ","); + LOG.debug("PropertyList for kafka producer {}" + producerConfigs); + kafkaOutputOperator.setProducerProperties(producerConfigs); + apexStream = (PythonApexStream)apexStream.endWith(kafkaOutputOperator, kafkaOutputOperator.inputPort, Option.Options.name(name)); + return this; + } + + public PythonApp toFolder(String name, String fileName, String directoryName) + { + + GenericFileOutputOperator outputOperator = new GenericFileOutputOperator<>(); + outputOperator.setFilePath(directoryName); + outputOperator.setOutputFileName(fileName); + outputOperator.setConverter(new GenericFileOutputOperator.StringToBytesConverter()); + apexStream = (PythonApexStream)apexStream.endWith(outputOperator, outputOperator.input, Option.Options.name(name)); + return this; + } + + public PythonApp setConfig(String key, String value) + { + getConf().set(key, value); + return this; + } + + public void kill() throws Exception + { + if (manager == null) { + throw new Exception("Application is not running yet"); + + } + manager.shutdown(); + } +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/PythonAppFactory.java b/python/src/main/java/org/apache/apex/malhar/python/PythonAppFactory.java new file mode 100644 index 0000000000..944ca2288f --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/PythonAppFactory.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.python; + +import com.datatorrent.stram.client.StramAppLauncher; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; + + +public class PythonAppFactory implements StramAppLauncher.AppFactory +{ + private PythonApp streamingApp; + private String name; + + public PythonAppFactory(String name, PythonApp app) + { + this.name = name; + this.streamingApp = app; + } + + public LogicalPlan createApp(LogicalPlanConfiguration logicalPlanConfiguration) + { + LogicalPlan logicalPlan = new LogicalPlan(); + logicalPlanConfiguration.prepareDAG(logicalPlan, streamingApp, getName()); + return logicalPlan; + } + + public String getName() + { + return name; + } + + public String getDisplayName() + { + return name; + } +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/PythonAppManager.java b/python/src/main/java/org/apache/apex/malhar/python/PythonAppManager.java new file mode 100644 index 0000000000..1928d8006e --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/PythonAppManager.java @@ -0,0 +1,86 @@ +package org.apache.apex.malhar.python; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.stram.client.StramAppLauncher; + +public class PythonAppManager +{ + private LaunchMode mode; + private Object appIdentifier; + private PythonApp app = null; + private static final Logger LOG = LoggerFactory.getLogger(PythonApp.class); + + public enum LaunchMode + { + LOCAL, HADOOP; + } + + public PythonAppManager(PythonApp app, LaunchMode mode) + { + this.app = app; + this.mode = mode; + } + + public String launch() throws Exception + { + + LOG.info("Launching app in python app"); + + try { + if (mode == LaunchMode.LOCAL) { + appIdentifier = app.runLocal(); + return "LocalMode"; + } else { + StramAppLauncher appLauncher = null; + appLauncher = new StramAppLauncher(app.getName(), app.getConf()); + appLauncher.loadDependencies(); + + PythonAppFactory appFactory = new PythonAppFactory(app.getName(), app); + + this.appIdentifier = appLauncher.launchApp(appFactory); + return this.appIdentifier.toString(); + + } + + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Failed TO Launch PYTHON Streaming Application"); + LOG.error("Encountered Exception " + e.getMessage()); + throw e; + } + + } + + public void shutdown() + { + if (mode == LaunchMode.LOCAL) { + ((LocalMode.Controller)this.appIdentifier).shutdown(); + } else { + try { + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(app.getConf()); + yarnClient.start(); + yarnClient.killApplication((ApplicationId)this.appIdentifier); + yarnClient.stop(); + } catch (YarnException e) { + e.printStackTrace(); + LOG.error("FAILED TO SHUTDOWN PYTHON STREAMING APPLICATION "); + LOG.error("Encountered Exception {}" + e.getMessage()); + } catch (IOException e) { + LOG.error("FAILED TO SHUTDOWN PYTHON STREAMING APPLICATION "); + LOG.error("Encountered Exception {} " + e.getMessage()); + } + + } + } +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/operator/PythonFilterOperator.java 
b/python/src/main/java/org/apache/apex/malhar/python/operator/PythonFilterOperator.java new file mode 100644 index 0000000000..49a4ac3944 --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/operator/PythonFilterOperator.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.python.operator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultOutputPort; + +public class PythonFilterOperator extends PythonGenericOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(PythonFilterOperator.class); + + DefaultOutputPort falsePort = new DefaultOutputPort<>(); + DefaultOutputPort truePort = new DefaultOutputPort(); + + public PythonFilterOperator() + { + operationType = OpType.FILTER; + } + + public PythonFilterOperator(byte[] serializedFunc) + { + super(serializedFunc); + operationType = OpType.FILTER; + } + + @Override + protected void processTuple(T tuple) + { + LOG.trace("Received Tuple: {}", tuple); + Object result = pythonWorkerProxy.execute(tuple); + if (result instanceof Boolean) { + Boolean b = (Boolean)result; + LOG.trace("Filter response received: {}" + b); + if (b.booleanValue()) { + out.emit(tuple); + } + } + + } +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/operator/PythonFlatMapOperator.java b/python/src/main/java/org/apache/apex/malhar/python/operator/PythonFlatMapOperator.java new file mode 100644 index 0000000000..34a52fb399 --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/operator/PythonFlatMapOperator.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.python.operator; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PythonFlatMapOperator extends PythonGenericOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(PythonFlatMapOperator.class); + + public PythonFlatMapOperator() + { + operationType = OpType.FLAT_MAP; + } + + + + public PythonFlatMapOperator(byte[] serializedFunc) + { + super(serializedFunc); + operationType = OpType.FLAT_MAP; + } + + @Override + protected void processTuple(T tuple) + { + LOG.trace("Received Tuple: {}" + tuple); + + List result = (List)pythonWorkerProxy.execute(tuple); + if (result != null) { + LOG.trace("List response received: {}" + result); + if (result instanceof List) { + for (T item : result) { + out.emit(item); + } + }else{ + LOG.warn("Returned response is not list: {}" + result); + } + } + } + +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/operator/PythonGenericOperator.java b/python/src/main/java/org/apache/apex/malhar/python/operator/PythonGenericOperator.java new file mode 100644 index 0000000000..c975514d7c --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/operator/PythonGenericOperator.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.python.operator; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.python.runtime.PythonWorkerContext; +import org.apache.apex.malhar.python.runtime.PythonWorkerProxy; +import org.apache.apex.malhar.python.util.LoggerUtils; +import org.apache.apex.malhar.python.util.NetworkUtils; + +import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +import py4j.GatewayConnection; +import py4j.GatewayServer; +import py4j.GatewayServerListener; +import py4j.Py4JServerConnection; + +public abstract class PythonGenericOperator extends BaseOperator +{ + + protected transient GatewayServer server = null; + protected transient PythonGatewayServerListenser py4jListener = null; + protected transient PythonWorkerProxy pythonWorkerProxy = null; + protected byte[] serializedFunction = null; + private static final Logger LOG = LoggerFactory.getLogger(PythonGenericOperator.class); + protected transient OpType operationType = null; + private PythonWorkerContext pythonWorkerContext = null; + protected Map environementData = new HashMap(); + + public enum OpType + { + MAP("MAP"), + FLAT_MAP("FLAT_MAP"), + FILTER("FILTER"); + + private String operationName = null; + + OpType(String name) + { + this.operationName = name; + } + + public String getType() + { + return operationName; + } + } + + public final transient DefaultInputPort in = new DefaultInputPort() + { + @Override + public void process(T tuple) + { + processTuple(tuple); + } + }; + + public final transient DefaultOutputPort out = new DefaultOutputPort(); + + public PythonGenericOperator() + { + this(null); + } + + public PythonGenericOperator(byte[] serializedFunc) + { + this.serializedFunction = serializedFunc; + this.pythonWorkerContext = new PythonWorkerContext(this.operationType, serializedFunc, environementData); + } + + @Override + public void setup(OperatorContext context) + { + LOG.debug("Application path from Python Operator: {} ", (String)context.getValue(DAGContext.APPLICATION_PATH)); + // Setting up the context path explicitly to handle local as well as Hadoop-based application development + this.pythonWorkerContext.setup(); + + this.pythonWorkerProxy = new PythonWorkerProxy<>(this.serializedFunction); + + // Instantiating the Py4j gateway server for the Python worker process to connect back to + boolean gatewayServerLaunchSuccess = false; + int serverStartAttempts = 5; + while (!gatewayServerLaunchSuccess && serverStartAttempts > 0) { + --serverStartAttempts; + try { + this.server = new GatewayServer(this.pythonWorkerProxy, NetworkUtils.findAvaliablePort()); + this.py4jListener = new PythonGenericOperator.PythonGatewayServerListenser(this.server, this.pythonWorkerContext); + this.server.addListener(this.py4jListener); + this.server.start(true); + gatewayServerLaunchSuccess = true; + } catch (Exception ex) { + LOG.error("Gateway server failed to launch due to: {}", ex.getMessage()); + gatewayServerLaunchSuccess = false; + } + } + + if (!gatewayServerLaunchSuccess) { + throw new RuntimeException("Failed to launch Gateway Server"); + } + + serverStartAttempts = 5; + + while (!this.py4jListener.isPythonServerStarted() && !this.pythonWorkerProxy.isFunctionEnabled() && serverStartAttempts > 0) { + try {
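+ // Wait for worker.py to connect back over the Py4J gateway and register itself; the loop + // below polls every five seconds and gives up once the remaining start attempts are exhausted. +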
Thread.sleep(5000L); + LOG.debug("Waiting for Python Worker Registration"); + --serverStartAttempts; + } catch (InterruptedException ex) { + LOG.error("Python Callback server failed to launch to due: {}" + ex.getMessage()); + } + } + if( !pythonWorkerProxy.isWorkerRegistered()){ + this.server.shutdown(); + throw new RuntimeException("Failed to launch Call Back Server"); + } + + // Transferring serialized function to Python Worker. + if (this.pythonWorkerProxy.isWorkerRegistered()) { + this.pythonWorkerProxy.setFunction(this.operationType.getType()); + } + + } + + public void teardown() + { + if (server != null) { + server.shutdown(); + } + } + + public static class PythonGatewayServerListenser implements GatewayServerListener + { + + private GatewayServer server = null; + private Process pyProcess = null; + private boolean pythonServerStarted = false; + private OpType operationType = OpType.MAP; + private static final Logger LOG = LoggerFactory.getLogger(PythonGatewayServerListenser.class); + private PythonWorkerContext context = null; + + public boolean isPythonServerStarted() + { + return this.pythonServerStarted; + } + + public PythonGatewayServerListenser(GatewayServer startedServer, PythonWorkerContext context) + { + this.server = startedServer; + this.context = context; + } + + public void connectionError(Exception e) + { + LOG.debug("Python Connection error : {}", e.getMessage()); + + } + + @Override + public void connectionStarted(Py4JServerConnection py4JServerConnection) + { + + } + + @Override + public void connectionStopped(Py4JServerConnection py4JServerConnection) + { + + } + + public void connectionStarted(GatewayConnection gatewayConnection) + { + LOG.debug("Python Connection started: {}", gatewayConnection.getSocket().getPort()); + + } + + public void connectionStopped(GatewayConnection gatewayConnection) + { + LOG.debug("Python Connection stoppped"); + } + + public void serverError(Exception e) + { + LOG.debug("Gateway Server error: {}", e.getMessage()); + } + + public void serverPostShutdown() + { + + LOG.debug("Gateway server shut down"); + } + + public void serverPreShutdown() + { + LOG.debug("Gateway server shutting down"); + + if (this.pyProcess != null) { + this.pyProcess.destroy(); + LOG.debug("Destroyed python worker process"); + } + } + + public void serverStarted() + { + LOG.debug("Gateway server started: {}", this.server.getPort()); + this.startPythonWorker(this.server.getPort()); + } + + public void serverStopped() + { + LOG.debug("Gateway server stopped"); + if (this.pyProcess != null) { + this.pyProcess.destroy(); + LOG.debug("Destroyed python worker process"); + } + + } + + private void startPythonWorker(int gatewayServerPort) + { + ProcessBuilder pb = new ProcessBuilder(new String[0]); + try { + LOG.info("Starting python worker process using context: {}", this.context); + LOG.info("Worker File Path: {}", this.context.getWorkerFilePath()); + LOG.info("Python Environment Path: {}", this.context.getPythonEnvPath()); + Map processEnvironment = pb.environment(); + processEnvironment.put("PYTHONPATH", this.context.getPythonEnvPath()); + this.pyProcess = pb.command(new String[]{"/usr/bin/python", "-u", this.context.getWorkerFilePath(), "" + gatewayServerPort, operationType.getType()}).start(); + LoggerUtils.captureProcessStreams(this.pyProcess); + this.pythonServerStarted = true; + PythonGenericOperator.LOG.info("Python worker started: {}", this.pyProcess); + } catch (IOException exception) { + + PythonGenericOperator.LOG.error("Failed to start python server: {}" 
+ exception.getMessage()); + } + } + } + + protected abstract void processTuple(T tuple); + + public void setPythonOperatorEnv(Map environementData) + { + this.environementData = environementData; + if (pythonWorkerContext == null) { + this.pythonWorkerContext = new PythonWorkerContext(this.operationType, serializedFunction, environementData); + } else { + this.pythonWorkerContext.setEnvironmentData(environementData); + } + } + +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/operator/PythonMapOperator.java b/python/src/main/java/org/apache/apex/malhar/python/operator/PythonMapOperator.java new file mode 100644 index 0000000000..fd5a9da9b8 --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/operator/PythonMapOperator.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.python.operator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PythonMapOperator extends PythonGenericOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(PythonMapOperator.class); + + public PythonMapOperator() + { + operationType = OpType.MAP; + } + + public PythonMapOperator(byte[] serializedFunc) + { + super(serializedFunc); + operationType = OpType.MAP; + } + + @Override + protected void processTuple(T tuple) + { + + LOG.trace("Received Tuple: {} ", tuple); + Object result = pythonWorkerProxy.execute(tuple); + if (result != null) { + LOG.trace("Response received: {} ", result); + out.emit((T)result); + } + } +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonApexStreamImpl.java b/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonApexStreamImpl.java new file mode 100644 index 0000000000..9769d08545 --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonApexStreamImpl.java @@ -0,0 +1,63 @@ +package org.apache.apex.malhar.python.runtime; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.python.operator.PythonFilterOperator; +import org.apache.apex.malhar.python.operator.PythonFlatMapOperator; +import org.apache.apex.malhar.python.operator.PythonGenericOperator; +import org.apache.apex.malhar.python.operator.PythonMapOperator; +import org.apache.apex.malhar.stream.api.Option; +import org.apache.apex.malhar.stream.api.PythonApexStream; +import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Operator; + +/** + * Created by vikram on 7/6/17. 
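+ * + * Extends ApexStreamImpl so that map, flatMap and filter transformations can also be supplied as + * a serialized (pickled) Python callable passed in as a byte[]; each overload wires the payload + * into the DAG through the corresponding PythonMapOperator, PythonFlatMapOperator or + * PythonFilterOperator. A minimal usage sketch, assuming serializedFunc already holds such a + * payload (the variable names here are illustrative only): + * <pre> + * PythonApexStreamImpl stream = new PythonApexStreamImpl<>(plainStream); + * stream.filter(serializedFunc, Option.Options.name("pythonFilter")); + * </pre>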
+ */ + +@InterfaceStability.Evolving +public class PythonApexStreamImpl extends ApexStreamImpl implements PythonApexStream +{ + private static final Logger LOG = LoggerFactory.getLogger(PythonApexStreamImpl.class); + + + public PythonApexStreamImpl() + { + super(); + } + + public PythonApexStreamImpl(ApexStreamImpl apexStream) + { + super(apexStream); + } + + @Override + public ApexStreamImpl map(byte[] serializedFunction, Option... opts) + { + LOG.debug("Adding Python map operator"); + PythonGenericOperator operator = new PythonMapOperator(serializedFunction); + return addOperator(operator, (Operator.InputPort)operator.in, (Operator.OutputPort)operator.out, opts); + + } + + @Override + public ApexStreamImpl flatMap(byte[] serializedFunction, Option... opts) + { + LOG.debug("Adding Python flatmap operator"); + PythonGenericOperator operator = new PythonFlatMapOperator(serializedFunction); + return addOperator(operator, (Operator.InputPort)operator.in, (Operator.OutputPort)operator.out, opts); + + } + + @Override + public ApexStreamImpl filter(byte[] serializedFunction, Option... opts) + { + LOG.debug("Adding Python filter operator"); + PythonFilterOperator operator = new PythonFilterOperator<>(serializedFunction); + return addOperator(operator, (Operator.InputPort)operator.in, (Operator.OutputPort)operator.out, opts); + + } +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonWorker.java b/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonWorker.java new file mode 100644 index 0000000000..a6e299908a --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonWorker.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.python.runtime; + +import java.util.Map; + +public interface PythonWorker +{ + public Object setFunction(byte[] func, String opType); + + public Object execute(T tuple); + + public boolean setState(Map map); + +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonWorkerContext.java b/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonWorkerContext.java new file mode 100644 index 0000000000..b216a63941 --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonWorkerContext.java @@ -0,0 +1,101 @@ +package org.apache.apex.malhar.python.runtime; + +import java.io.File; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.python.operator.PythonGenericOperator; + +public class PythonWorkerContext implements Serializable +{ + public static String PY4J_DEPENDENCY_PATH = "PY4J_DEPENDENCY_PATH"; + public static String PYTHON_WORKER_PATH = "PYTHON_WORKER_PATH"; + public static String PY4J_SRC_ZIP_FILE_NAME = "py4j-0.10.4-src.zip"; + public static String PYTHON_WORKER_FILE_NAME = "worker.py"; + public static String ENV_VAR_PYTHONPATH = "PYTHONPATH"; + + private String dependencyPath = null; + private String workerFilePath = null; + private String pythonEnvPath = null; + private static final Logger LOG = LoggerFactory.getLogger(PythonWorkerContext.class); + + private byte[] serializedFunction = null; + private PythonGenericOperator.OpType opType = null; + + // environment data is set explicitly with local paths for managing local mode execution + private Map environmentData = new HashMap(); + + public PythonWorkerContext() + { + + } + + public PythonWorkerContext(PythonGenericOperator.OpType operationType, byte[] serializedFunction, Map environmentData) + { + this(); + this.opType = operationType; + this.serializedFunction = serializedFunction; + this.environmentData = environmentData; + } + + public void setup() + { + LOG.info("Setting up worker context: {}", this); + File py4jDependencyFile = new File("./" + PY4J_SRC_ZIP_FILE_NAME); + pythonEnvPath = System.getenv(ENV_VAR_PYTHONPATH); + LOG.info("Found python environment path: {}", pythonEnvPath); + if (pythonEnvPath != null) { + pythonEnvPath = py4jDependencyFile.getAbsolutePath() + ":" + pythonEnvPath; + } else { + pythonEnvPath = py4jDependencyFile.getAbsolutePath(); + } + LOG.debug("Final python environment path with Py4j depenency path: {}", pythonEnvPath); + LOG.info("FINAL DEPENDENCY PATH: {}", environmentData.get(PY4J_DEPENDENCY_PATH)); + if ((this.dependencyPath = environmentData.get(PY4J_DEPENDENCY_PATH)) == null) { + this.dependencyPath = py4jDependencyFile.getAbsolutePath(); + } + + LOG.info("FINAL WORKER PATH: {}", environmentData.get(PYTHON_WORKER_PATH)); + if ((this.workerFilePath = environmentData.get(PYTHON_WORKER_PATH)) == null) { + File pythonWorkerFile = new File("./" + PYTHON_WORKER_FILE_NAME); + this.workerFilePath = pythonWorkerFile.getAbsolutePath(); + } + + LOG.info("Python dependency Path {} worker Path {}" , this.dependencyPath, this.workerFilePath); + } + + public synchronized String getDependencyPath() + { + return this.dependencyPath; + } + + public synchronized String getWorkerFilePath() + { + return this.workerFilePath; + } + + public synchronized String getPythonEnvPath() + { + return this.pythonEnvPath; + } + + public synchronized byte[] getSerializedFunction() + { + return 
this.serializedFunction; + } + + public synchronized Map getEnvironmentData() + { + return this.environmentData; + } + + public synchronized void setEnvironmentData(Map environmentData) + { + this.environmentData = environmentData; + } +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonWorkerProxy.java b/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonWorkerProxy.java new file mode 100644 index 0000000000..cd66d1f4fc --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/runtime/PythonWorkerProxy.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.python.runtime; + +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import py4j.Py4JException; + +public class PythonWorkerProxy +{ + private static final Logger LOG = LoggerFactory.getLogger(PythonWorkerProxy.class); + private PythonWorker registeredPythonWorker = null; + private boolean functionEnabled = false; + private byte[] serializedFunction = null; + private boolean workerRegistered = false; + + protected Map tupleList = new HashMap(); + + private int counter = 0; + + public PythonWorkerProxy(byte[] serializedFunc) + { + this.serializedFunction = serializedFunc; + } + + public Object execute(T tuple) + { + if (registeredPythonWorker != null) { + Object result = null; + LOG.trace("Processing tuple: {}", tuple); + try { + result = registeredPythonWorker.execute(tuple); + LOG.trace("Processed tuple: {}", result); + } catch (Py4JException ex) { + LOG.error("Exception encountered while executing operation for tuple: {} Message: {}", tuple, ex.getMessage()); + } + for (String key : tupleList.keySet()) { + LOG.trace("Updated key status: {} {}", key, tupleList.get(key)); + } + LOG.trace("Tuple list count: {}", tupleList.size()); + counter++; + return result; + } + return null; + } + + public void register(PythonWorker pythonWorker) + { + LOG.debug("Registering python worker now"); + this.registeredPythonWorker = pythonWorker; + this.workerRegistered = true; + LOG.debug("Python worker registered"); + } + + public void setFunction(String opType) + { + if (this.isWorkerRegistered() && !isFunctionEnabled()) { + LOG.debug("Setting serialized function"); + this.registeredPythonWorker.setFunction(this.serializedFunction, opType); + this.registeredPythonWorker.setState(tupleList); + this.functionEnabled = true; + LOG.debug("Set serialized function"); + } + } + + public boolean isWorkerRegistered() + { + return this.workerRegistered; + } + + public boolean isFunctionEnabled() + { + return this.functionEnabled; + } +}
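PythonWorkerProxy is the Java half of the Py4J callback contract: worker.py calls register() once it has connected to the gateway, the proxy then pushes the pickled function to it a single time through setFunction(), and every tuple afterwards makes the round trip through execute(). The sketch below exercises that contract against a stub worker, mirroring what PythonWorkerProxyTest further down does; the WorkerProxySketch and EchoWorker classes are illustrative only and not part of this change, and in a real run the registration call is made by worker.py over the gateway.

```
import java.util.Map;

import org.apache.apex.malhar.python.runtime.PythonWorker;
import org.apache.apex.malhar.python.runtime.PythonWorkerProxy;

public class WorkerProxySketch
{
  // Stub standing in for the Python worker process; the real one unpickles the
  // function in setFunction() and applies it to every tuple in execute().
  static class EchoWorker implements PythonWorker
  {
    @Override
    public Object setFunction(byte[] func, String opType)
    {
      return opType;
    }

    @Override
    public Object execute(Object tuple)
    {
      return tuple;
    }

    @Override
    public boolean setState(Map map)
    {
      return true;
    }
  }

  public static void main(String[] args)
  {
    PythonWorkerProxy proxy = new PythonWorkerProxy("pickledFunction".getBytes());
    proxy.register(new EchoWorker());            // normally invoked by worker.py over the gateway
    proxy.setFunction("MAP");                    // pushes the serialized function exactly once
    System.out.println(proxy.execute("tuple"));  // per-tuple round trip used by processTuple()
  }
}
```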
diff --git a/python/src/main/java/org/apache/apex/malhar/python/util/LoggerUtils.java b/python/src/main/java/org/apache/apex/malhar/python/util/LoggerUtils.java new file mode 100644 index 0000000000..2d42e30918 --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/util/LoggerUtils.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.python.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoggerUtils +{ + + public static class InputStreamConsumer extends Thread + { + private static final Logger LOG = LoggerFactory.getLogger(InputStreamConsumer.class); + private InputStream is; + private String name; + private StreamType streamType; + + public enum StreamType + { + ERROR, OUTPUT + } + + public InputStreamConsumer(String name, InputStream is, StreamType streamType) + { + this.is = is; + this.name = name; + this.streamType = streamType; + } + + @Override + public void run() + { + LOG.info("Starting stream gobbler {}", this.name); + try { + InputStreamReader isr = new InputStreamReader(this.is); + BufferedReader br = new BufferedReader(isr); + String line; + while ((line = br.readLine()) != null) { + if (this.streamType == StreamType.ERROR) { + LOG.error("From {}: {}", this.name, line); + } else { + LOG.info("From {}: {}", this.name, line); + } + } + } catch (IOException exp) { + LOG.error("Error while reading stream {}", this.name, exp); + } + + LOG.info("Exiting stream gobbler {}", this.name); + } + } + + public static void captureProcessStreams(Process process) + { + InputStreamConsumer stdoutConsumer = new InputStreamConsumer("outputStream", process.getInputStream(), InputStreamConsumer.StreamType.OUTPUT); + InputStreamConsumer errorConsumer = new InputStreamConsumer("errorStream", process.getErrorStream(), InputStreamConsumer.StreamType.ERROR); + errorConsumer.start(); + stdoutConsumer.start(); + } +} diff --git a/python/src/main/java/org/apache/apex/malhar/python/util/NetworkUtils.java b/python/src/main/java/org/apache/apex/malhar/python/util/NetworkUtils.java new file mode 100644 index 0000000000..28511a5ede --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/python/util/NetworkUtils.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.python.util; + +import java.io.IOException; +import java.net.ServerSocket; + +import com.google.common.base.Throwables; + +public class NetworkUtils +{ + + public static int findAvaliablePort() + { + // Find port + int foundPort = -1; + try { + ServerSocket serverSocket = new ServerSocket(0); + foundPort = serverSocket.getLocalPort(); + serverSocket.close(); + return foundPort; + } catch (IOException e) { + throw Throwables.propagate(e); + } + + } +} diff --git a/python/src/main/java/org/apache/apex/malhar/stream/api/PythonApexStream.java b/python/src/main/java/org/apache/apex/malhar/stream/api/PythonApexStream.java new file mode 100644 index 0000000000..6d36e63021 --- /dev/null +++ b/python/src/main/java/org/apache/apex/malhar/stream/api/PythonApexStream.java @@ -0,0 +1,30 @@ +package org.apache.apex.malhar.stream.api; + +/** + * Created by vikram on 7/6/17. + */ +public interface PythonApexStream extends ApexStream +{ + + /** + * Add custom serialized Python Function along with options + * @param serializedFunction stores Serialized Function data + * @return new stream of type T + */ + > STREAM map(byte[] serializedFunction, Option... opts); + + /** + * Add custom serialized Python Function along with options + * @param serializedFunction stores Serialized Function data + * @return new stream of type T + */ + > STREAM flatMap(byte[] serializedFunction, Option... opts); + + /** + * Add custom serialized Python Function along with options + * @param serializedFunction stores Serialized Function data + * @return new stream of type T + */ + > STREAM filter(byte[] serializedFunction, Option... opts); + +} diff --git a/python/src/test/java/org/apache/apex/malhar/python/operator/runtime/PythonOperatorTest.java b/python/src/test/java/org/apache/apex/malhar/python/operator/runtime/PythonOperatorTest.java new file mode 100644 index 0000000000..3be51fb0c2 --- /dev/null +++ b/python/src/test/java/org/apache/apex/malhar/python/operator/runtime/PythonOperatorTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.python.operator.runtime; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.python.operator.PythonMapOperator; +import org.apache.apex.malhar.python.runtime.PythonWorkerContext; +import org.apache.commons.codec.binary.Base64; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.stram.StramLocalCluster; + +public class PythonOperatorTest +{ + private static int TupleCount; + private static List lengthList = new ArrayList<>(); + private static int tupleSumInResultCollector = 0; + private static final int NumTuples = 10; + private static final Logger LOG = LoggerFactory.getLogger(PythonOperatorTest.class); + + public static class NumberGenerator extends BaseOperator implements InputOperator + { + private int num; + + public final transient DefaultOutputPort output = new DefaultOutputPort(); + + @Override + public void setup(Context.OperatorContext context) + { + num = 0; + } + + @Override + public void emitTuples() + { + if (num < NumTuples) { + output.emit(new Integer(num)); + num++; + } + } + } + + + public static class ResultCollector extends BaseOperator + { + + public final transient DefaultInputPort input = new DefaultInputPort() + { + + @Override + public void process(Integer in) + { + TupleCount++; + lengthList.add(in); + tupleSumInResultCollector += in; + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + TupleCount = 0; + lengthList = new ArrayList<>(); + } + + } + + @Test + public void testPythonMapOperator() + { + + LocalMode lma = LocalMode.newInstance(); + DAG dag = lma.getDAG(); + + NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator()); + + ResultCollector collector = dag.addOperator("collector", new ResultCollector()); +// Base64 string for map function written and serialized using pickle +// Function : +// lambda(a): int(a)*2 + String funcAsBase64String = "gAJjY2xvdWRwaWNrbGUuY2xvdWRwaWNrbGUKX2ZpbGxfZnVuY3Rpb24KcQAoY2Nsb3VkcGlja2xlLmNsb3VkcGlja2xlCl9tYWtlX3NrZWxfZnVuYwpxAWNjbG91ZHBpY2tsZS5jbG91ZHBpY2tsZQpfYnVpbHRpbl90eXBlCnECVQhDb2RlVHlwZXEDhXEEUnEFKEsBSwFLAktDVQ50AAB8AACDAQBkAQAUU3EGTksChnEHVQNpbnRxCIVxCVUBZnEKhXELVR48aXB5dGhvbi1pbnB1dC00LTU4NDRlZWUwNzQ4Zj5xDFUIPGxhbWJkYT5xDUsBVQBxDikpdHEPUnEQXXERfXESh3ETUnEUfXEVTn1xFnRSLg=="; + + byte[] decoded = Base64.decodeBase64(funcAsBase64String); + + Map environmentData = new HashMap<>(); + + final String cwd = System.getProperty("user.dir"); + String pythonRuntimeDirectory = cwd + "/../python/pyapex/runtime"; + LOG.debug("Current working directory:" + pythonRuntimeDirectory); + environmentData.put(PythonWorkerContext.PYTHON_WORKER_PATH, pythonRuntimeDirectory + "/" + PythonWorkerContext.PYTHON_WORKER_FILE_NAME); + environmentData.put(PythonWorkerContext.PY4J_DEPENDENCY_PATH, pythonRuntimeDirectory + "/" + PythonWorkerContext.PY4J_SRC_ZIP_FILE_NAME); + PythonMapOperator mapOperator = new PythonMapOperator(decoded); + mapOperator.setPythonOperatorEnv(environmentData); + + dag.addOperator("mapOperator", mapOperator); + + dag.addStream("raw numbers", 
numGen.output, mapOperator.in); + dag.addStream("mapped results", mapOperator.out, collector.input); + + // Create local cluster + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + + ((StramLocalCluster)lc).setExitCondition(new Callable() + { + @Override + public Boolean call() throws Exception + { + return TupleCount == NumTuples; + } + }); + + lc.run(50000); + + Assert.assertEquals(NumTuples, TupleCount); + Assert.assertEquals(90, tupleSumInResultCollector); + } +} diff --git a/python/src/test/java/org/apache/apex/malhar/python/operator/runtime/PythonWorkerContextTest.java b/python/src/test/java/org/apache/apex/malhar/python/operator/runtime/PythonWorkerContextTest.java new file mode 100644 index 0000000000..7b474ce7fc --- /dev/null +++ b/python/src/test/java/org/apache/apex/malhar/python/operator/runtime/PythonWorkerContextTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.python.operator.runtime; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.apex.malhar.python.runtime.PythonWorkerContext; + +//import org.apache.apex.malhar.python.operator.PythonGenericOperator; + +public class PythonWorkerContextTest +{ + + @Test + public void testPythonWorkerContextTest() + { + PythonWorkerContext context = new PythonWorkerContext(); + String currentWorkingDirectory = "/home/data"; + Map environmentData = new HashMap<>(); + environmentData.put(PythonWorkerContext.PY4J_DEPENDENCY_PATH, currentWorkingDirectory + "/./" + PythonWorkerContext.PY4J_SRC_ZIP_FILE_NAME); + environmentData.put(PythonWorkerContext.PYTHON_WORKER_PATH, currentWorkingDirectory + "/./" + PythonWorkerContext.PYTHON_WORKER_FILE_NAME); + + context.setEnvironmentData(environmentData); + context.setup(); + Assert.assertEquals(currentWorkingDirectory + "/./" + PythonWorkerContext.PY4J_SRC_ZIP_FILE_NAME, context.getDependencyPath()); + Assert.assertEquals(currentWorkingDirectory + "/./" + PythonWorkerContext.PYTHON_WORKER_FILE_NAME, context.getWorkerFilePath()); + + } + + @Test + public void testPythonWorkerContextWithDeafaultTest() + { + + PythonWorkerContext context = new PythonWorkerContext(); + String currentWorkingDirectory = System.getProperty("user.dir"); + context.setup(); + Assert.assertEquals(currentWorkingDirectory + "/./" + PythonWorkerContext.PY4J_SRC_ZIP_FILE_NAME, context.getDependencyPath()); + Assert.assertEquals(currentWorkingDirectory + "/./" + PythonWorkerContext.PYTHON_WORKER_FILE_NAME, context.getWorkerFilePath()); + + + } + +} diff --git a/python/src/test/java/org/apache/apex/malhar/python/operator/runtime/PythonWorkerProxyTest.java 
b/python/src/test/java/org/apache/apex/malhar/python/operator/runtime/PythonWorkerProxyTest.java new file mode 100644 index 0000000000..64ace04a93 --- /dev/null +++ b/python/src/test/java/org/apache/apex/malhar/python/operator/runtime/PythonWorkerProxyTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.python.operator.runtime; + +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.python.runtime.PythonWorker; +import org.apache.apex.malhar.python.runtime.PythonWorkerProxy; + +import py4j.Py4JException; + +import static org.mockito.Matchers.any; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({PythonWorkerProxy.class, LoggerFactory.class}) +public class PythonWorkerProxyTest +{ + + public static class PythonTestWorker implements PythonWorker + { + + @Override + public Object setFunction(byte[] func, String opType) + { + return opType; + } + + @Override + public Object execute(Object tuple) + { + return tuple; + } + + @Override + public boolean setState(Map map) + { + return false; + } + } + + @Test + public void testPythonWorkerRegisterAndExecute() + { + mockStatic(LoggerFactory.class); + Logger logger = mock(Logger.class); + when(LoggerFactory.getLogger(PythonWorkerProxy.class)).thenReturn(logger); + + String functionData = new String("TestFunction"); + PythonWorkerProxy workerProxy = new PythonWorkerProxy(functionData.getBytes()); + PythonTestWorker worker = new PythonTestWorker(); + workerProxy.register(worker); + workerProxy.setFunction("DUMMY_OPERATION"); + Assert.assertEquals("TUPLE", worker.execute("TUPLE")); + } + + @Test() + public void testPythonFailureWhileProcessingTuple() + { + mockStatic(LoggerFactory.class); + Logger logger = mock(Logger.class); + when(LoggerFactory.getLogger(any(Class.class))).thenReturn(logger); + + String exceptionString = "DUMMY EXCEPTION"; + String functionData = new String("TestFunction"); + PythonWorker failingMockWorker = mock(PythonWorker.class); + when(failingMockWorker.execute("TUPLE")).thenThrow(new Py4JException(exceptionString)); + + PythonWorkerProxy workerProxy = new PythonWorkerProxy(functionData.getBytes()); + workerProxy.register(failingMockWorker); +// verify(logger).debug("Registering python worker now"); +// verify(logger).debug("Python worker registered"); + String 
tupleValue = "TUPLE"; + Assert.assertEquals(null, workerProxy.execute(tupleValue)); + +// verify(logger).trace("Processing tuple:" + tupleValue); +// verify(logger).error("Exception encountered while executing operation for tuple:" + tupleValue + " Message:" + exceptionString); + + } +} diff --git a/python/src/test/resources/log4j.properties b/python/src/test/resources/log4j.properties new file mode 100644 index 0000000000..c89382e30a --- /dev/null +++ b/python/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=INFO + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug diff --git a/stream/pom.xml b/stream/pom.xml index 4389c4690c..f81cc812bf 100755 --- a/stream/pom.xml +++ b/stream/pom.xml @@ -94,6 +94,5 @@ cglib 3.2.1 - diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java index c09efa5d7d..837b01d7ef 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java @@ -32,6 +32,7 @@ import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; import com.datatorrent.api.Operator; /** @@ -156,7 +157,6 @@ public interface ApexStream */ > STREAM with(String propName, Object value); - /** * Create dag from stream * @return {@see DAG} @@ -169,7 +169,6 @@ public interface ApexStream */ void populateDag(DAG dag); - /** * Run the stream application in local mode * In Async mode, the method would return immediately and the dag would run for "duration" milliseconds @@ -178,8 +177,7 @@ public interface ApexStream * @param async true if 
run in Async mode * false if run in sync mode */ - void runEmbedded(boolean async, long duration, Callable exitCondition); - + LocalMode.Controller runEmbedded(boolean async, long duration, Callable exitCondition); /** * Submit the application to cluster diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java index bb0f78169b..f4983ffccb 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -28,6 +36,8 @@ import java.util.concurrent.Callable; import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.apex.malhar.lib.function.Function; import org.apache.apex.malhar.lib.function.Function.FlatMapFunction; @@ -111,6 +121,8 @@ public class ApexStreamImpl implements ApexStream } + private static final Logger LOG = LoggerFactory.getLogger(ApexStreamImpl.class); + /** * The extension point of the stream * @@ -144,8 +156,8 @@ public Pair getLastStream() { return lastStream; } - } + } /** * Graph behind the stream @@ -319,8 +331,7 @@ public ApexStreamImpl print(Option...
opts) public ApexStreamImpl print() { ConsoleOutputOperator consoleOutputOperator = new ConsoleOutputOperator(); - addOperator(consoleOutputOperator, - (Operator.InputPort)consoleOutputOperator.input, null, Option.Options.name(IDGenerator.generateOperatorIDWithUUID(consoleOutputOperator.getClass()))); + addOperator(consoleOutputOperator, (Operator.InputPort)consoleOutputOperator.input, null, Option.Options.name(IDGenerator.generateOperatorIDWithUUID(consoleOutputOperator.getClass()))); return this; } @@ -422,7 +433,7 @@ public void populateDag(DAG dag) } @Override - public void runEmbedded(boolean async, long duration, Callable exitCondition) + public LocalMode.Controller runEmbedded(boolean async, long duration, Callable exitCondition) { LocalMode lma = LocalMode.newInstance(); populateDag(lma.getDAG()); @@ -440,6 +451,7 @@ public void runEmbedded(boolean async, long duration, Callable exitCond lc.run(); } } + return lc; } diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java index d6201ad5d3..b5ef73b262 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java @@ -18,6 +18,8 @@ */ package org.apache.apex.malhar.stream.api.impl; +import java.util.List; + import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; import org.apache.apex.malhar.kafka.PartitionStrategy; import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; @@ -28,6 +30,7 @@ import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator; +import com.datatorrent.lib.io.fs.InMemoryDataInputOperator; import static org.apache.apex.malhar.stream.api.Option.Options.name; @@ -53,6 +56,19 @@ public static ApexStream fromFolder(String folderName, Option... opts) return newStream.addOperator(fileLineInputOperator, null, fileLineInputOperator.output, opts); } + /** + * Allow you to provide data as in-memory list and various options to configure in-memory data input operator + * @param input + * @param opts + * @return + */ + public static ApexStream fromData(List input, Option... 
opts) + { + InMemoryDataInputOperator inMemoryDataInputOperator = new InMemoryDataInputOperator(input); + ApexStreamImpl newStream = new ApexStreamImpl<>(); + return newStream.addOperator(inMemoryDataInputOperator, null, inMemoryDataInputOperator.outputPort, opts); + } + public static ApexStream fromFolder(String folderName) { return fromFolder(folderName, name("FolderScanner")); @@ -123,5 +139,4 @@ public static ApexStream fromKafka09(String brokers, String topic, Parti return newStream.addOperator(kafkaInput, null, kafkaInput.outputPort, opts); } - } diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java index 5cc5d98528..924ec715d8 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Date; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.junit.Assert; @@ -86,6 +87,7 @@ public void testAddOperator() Assert.assertEquals("second", inputPortMeta.getOperatorMeta().getName()); } + // Assert the stream is thread local Assert.assertTrue(stream.getLocality() == DAG.Locality.THREAD_LOCAL); } diff --git a/stream/src/test/resources/log4j.properties b/stream/src/test/resources/log4j.properties new file mode 100644 index 0000000000..1c9776b5d8 --- /dev/null +++ b/stream/src/test/resources/log4j.properties @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug +log4j.logger.org=info
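Taken together, the new StreamFactory.fromData() source and the runEmbedded() change above let a small pipeline be fed from an in-memory list and stopped through the returned controller. A sketch of that flow under a few assumptions: the class name and sample rows are illustrative, fromData() is fed a List of String as InMemoryDataInputOperator suggests, print() from the fluent ApexStream API serves as the sink, and the trivial exit condition stands in for a real one such as the tuple-count check used in PythonOperatorTest.

```
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;

import com.datatorrent.api.LocalMode;

public class InMemoryStreamSketch
{
  public static void main(String[] args) throws Exception
  {
    // Two rows in the same CSV shape as examples/python/resources/stock_data.csv
    List<String> lines = Arrays.asList(
        "MSFT,2015-03-18T00:00:00-04:00,41.43",
        "MSFT,2015-03-19T00:00:00-04:00,42.25");

    ApexStream<String> stream = StreamFactory.fromData(lines).print();

    // runEmbedded() now returns the LocalMode.Controller, so the caller can stop the run itself.
    LocalMode.Controller controller = stream.runEmbedded(true, 30000, new Callable<Boolean>()
    {
      @Override
      public Boolean call() throws Exception
      {
        return false; // replace with a real exit condition
      }
    });

    Thread.sleep(5000);   // let the asynchronous local run drain the two tuples to the console
    controller.shutdown();
  }
}
```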