-
Notifications
You must be signed in to change notification settings - Fork 148
[REVIEW-ONLY] Python binding for high level apis #613
base: master
Are you sure you want to change the base?
Conversation
@@ -21,12 +21,13 @@ | |||
import java.io.IOException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this change here? Can you rebase?
</executions> | ||
</plugin> | ||
<plugin> | ||
<artifactId>maven-dependency-plugin</artifactId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it need to be redefined? Can it not be inherited from the parent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed this one
python/pom.xml
Outdated
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-client</artifactId> | ||
<version>${hadoop.version}</version> | ||
<!--<scope>provided</scope>--> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are these not provided?
python/examples/stock_filter.py
Outdated
@@ -0,0 +1,14 @@ | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this go into standard malhar/examples?
There are some binary files checked in, example py4j-0.10.4.jar, py4j-0.10.4-src.zip, py4j-0.10.4.tar.gz. Are these needed? |
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.apex.malhar.python.operator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Package path doesn't have stream. What is the benefit of having this class here instead of in python package.
int pythonServerStartAttempts = 5; | ||
while (!this.py4jListener.isPythonServerStarted() && !this.pythonWorkerProxy.isFunctionEnabled() && pythonServerStartAttempts > 0) { | ||
try { | ||
Thread.sleep(5000L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Timeout should be configurable
Thread.sleep(5000L); | ||
LOG.debug("Waiting for Python Worker Registration"); | ||
--pythonServerStartAttempts; | ||
} catch (InterruptedException var9) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Variable name.. sounds generated
this.pythonWorkerProxy.setFunction(this.operationType.getType()); | ||
} | ||
if (!this.py4jListener.isPythonServerStarted()) { | ||
LOG.error("Python server could not be started"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this throw an exception and fail
@@ -77,7 +78,7 @@ | |||
* @param <O> type of the output | |||
* @return new stream of type O | |||
*/ | |||
<O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... opts); | |||
<O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... opts); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid formatting change if there is no other change here.
* @param serializedFunction stores Serialized Function data | ||
* @return new stream of type T | ||
*/ | ||
<STREAM extends ApexStream<T>> STREAM map_func(byte[] serializedFunction, Option... opts); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function name not in java style
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formatting change may be unnecessary
foundPort = serverSocket.getLocalPort(); | ||
serverSocket.close(); | ||
return foundPort; | ||
} catch (IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
finally to close socket
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are few binaries files present in the pull request, need to discuss with community about which types of binary files are acceptable in repository.
|
||
public class InMemoryDataInputOperator<T> implements InputOperator | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra space
docs/python/main.md
Outdated
|
||
from pyapex import createApp | ||
a=createApp('python_app').fromKafka08('localhost:2181','test_topic') \ | ||
.setFilter('filter_operator',f) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer functions names such as map/filter/flatmap rather than setMap, setFilter, setFlatmap.
docs/python/main.md
Outdated
|
||
``` | ||
|
||
def f(a): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Give meaningful names to functions in documentations. how about filterCondition?
@Override | ||
public void beginWindow(long l) | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra space
pom.xml
Outdated
@@ -25,7 +25,7 @@ | |||
<parent> | |||
<groupId>org.apache.apex</groupId> | |||
<artifactId>apex</artifactId> | |||
<version>3.4.0</version> | |||
<version>3.6.0</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apache Apex dependency is already moved to 3.6, you can undo this change.
} else { | ||
pythonEnvPath = py4jDependencyFile.getAbsolutePath(); | ||
} | ||
LOG.debug("Final python environment path with Py4j depenency path: " + pythonEnvPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use {} in LOG statements
|
||
import java.util.Map; | ||
|
||
public interface PythonWorker<T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add java documentation for public interfaces / classes and methods
stream/pom.xml
Outdated
@@ -94,6 +94,35 @@ | |||
<artifactId>cglib</artifactId> | |||
<version>3.2.1</version> | |||
</dependency> | |||
|
|||
<dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove these dependencies from stream package.
@@ -212,4 +210,6 @@ | |||
*/ | |||
WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption, Duration allowLateness); | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra lines added
@@ -80,7 +81,7 @@ public void testAddOperator() | |||
// Assert the stream is from first operator to second operator | |||
Assert.assertEquals("first", stream.getSource().getOperatorMeta().getName()); | |||
Assert.assertTrue(1 == stream.getSinks().size()); | |||
Assert.assertEquals("second", stream.getSinks().get(0).getOperatorWrapper().getName()); | |||
Assert.assertEquals("second", ((List<LogicalPlan.InputPortMeta>)(stream.getSinks())).get(0).getOperatorMeta().getName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this part of your code change?
@patilvikram please make sure that jenkins / travis build passes. |
92a1f1b
to
cf79a17
Compare
cf79a17
to
151413c
Compare
5336cd4
to
a043f96
Compare
def __init__(self, trigger_type, count): | ||
super(CountTrigger, self).__init__(trigger_type) | ||
self.count = count | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove Extra lines from bottom of this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add documentation to code
python/apex-python/src/setup.py
Outdated
setup(name='pyapex', | ||
version='0.0.4', | ||
py_modules=['pyapex','pyapex.runtime','pyapex.functions'], | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove
fi | ||
if [ -f "$MVN_GENERATED_PATH" ]; then | ||
# development launch mode | ||
DT_CORE_JAR="$BUILD_DIR/malhar-python-3.8.0-SNAPSHOT.jar" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this versions dyanmically stamped
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check how I can include current artifact in the classpath files like MVN_GENERATED_PATH
#ln -sf $PYAPEX_HOME "$script_dir/pyapex" | ||
if [ -z "$PYTHONPATH" ] | ||
then | ||
export PYTHONPATH="$PYAPEX_HOME/deps/pyapex-0.0.4-src.zip" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add versions dynamically here
{ | ||
private static final Logger LOG = LoggerFactory.getLogger(PythonGenericOperator.class); | ||
protected byte[] serializedFunction = null; | ||
private PythonServer server = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make Python Server transient
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed this one
{ | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(PythonWindowedOperator.class); | ||
private PythonServer server = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make PythonServer server transient
assert False, "Invalid Option" | ||
|
||
|
||
if __name__ == "__main__": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update this one in pyshell to call apexcli commands directly for kill / shutdown or any other updates
9a7f129
to
8d06aa2
Compare
Did this branch die? Has it been superceded by another? I got here from this ticket which was linked from the roadmap. |
@darrengarvey Right now its still at the dangling stage, the review is not yet completed. Let me know if you have any questions about this issue or PR. |
@PramodSSImmaneni @chinmaykolhatkar
I have created this REVIEW-ONLY PR for extending Python support for extending High Level APIs. Currently this is in early stage so I will add documentations which will help you setup environment to launch python app from your machine. Also I will include simple example in this PR.