From d50fa0d9f6e798dcb3b5d15d3fb5b563d332678d Mon Sep 17 00:00:00 2001 From: Yann Diorcet Date: Thu, 15 Jun 2017 21:21:56 +0200 Subject: [PATCH 1/3] Regenerate project.xml --- CMakeLists.txt | 15 ++++++++++- Makefile.am | 1 + bindings/python/malamute/_malamute_ctypes.py | 17 ++++++------ ci_build.sh | 27 ++++++++++++-------- configure.ac | 20 +++++++++++++++ packaging/obs/_service | 4 +++ 6 files changed, 65 insertions(+), 19 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index cf1372ff..f880deab 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -66,6 +66,13 @@ if (MSVC) set(MORE_LIBRARIES ws2_32 Rpcrt4 Iphlpapi) endif() +# specific case of windows UWP +if( ${CMAKE_SYSTEM_NAME} STREQUAL "WindowsStore" AND ${CMAKE_SYSTEM_VERSION} STREQUAL "10.0") + ADD_DEFINITIONS(-DZMQ_HAVE_WINDOWS_UWP) + ADD_DEFINITIONS(-D_WIN32_WINNT=_WIN32_WINNT_WIN10) +endif() + + # required libraries for mingw if (MINGW) set(MORE_LIBRARIES -lws2_32 -lrpcrt4 -liphlpapi) @@ -163,7 +170,13 @@ set_target_properties(mlm PROPERTIES DEFINE_SYMBOL "MLM_EXPORTS" ) set_target_properties (mlm - PROPERTIES SOVERSION "1.0.1" + PROPERTIES SOVERSION "1" +) +target_link_libraries(mlm + ${ZEROMQ_LIBRARIES} ${MORE_LIBRARIES} +) +set_target_properties (mlm + PROPERTIES VERSION "1.1.0" ) target_link_libraries(mlm ${ZEROMQ_LIBRARIES} ${MORE_LIBRARIES} diff --git a/Makefile.am b/Makefile.am index d1c1529b..392df9f2 100644 --- a/Makefile.am +++ b/Makefile.am @@ -41,6 +41,7 @@ EXTRA_DIST += \ endif EXTRA_DIST += \ + bindings \ src/mlm_server_engine.inc \ src/mlm_client_engine.inc \ src/mlm_client_custom.xml \ diff --git a/bindings/python/malamute/_malamute_ctypes.py b/bindings/python/malamute/_malamute_ctypes.py index 5604b1da..e816eb73 100644 --- a/bindings/python/malamute/_malamute_ctypes.py +++ b/bindings/python/malamute/_malamute_ctypes.py @@ -11,19 +11,20 @@ # malamute lib = None -try: - # check to see if the shared object was embedded locally, attempt to load it - # if not, try to load it using the default system paths... - # we need to use os.chdir instead of trying to modify $LD_LIBRARY_PATH and reloading the interpreter - t = os.getcwd() - p = os.path.join(os.path.dirname(__file__), '..') # find the path to our $project_ctypes.py - os.chdir(p) # change directories briefly +# check to see if the shared object was embedded locally, attempt to load it +# if not, try to load it using the default system paths... +# we need to use os.chdir instead of trying to modify $LD_LIBRARY_PATH and reloading the interpreter +t = os.getcwd() +p = os.path.join(os.path.dirname(__file__), '..') # find the path to our $project_ctypes.py +os.chdir(p) # change directories briefly +try: from malamute import libmlm # attempt to import the shared lib if it exists lib = CDLL(libmlm.__file__) # if it exists try to load the shared lib - os.chdir(t) # switch back to orig dir except ImportError: pass +finally: + os.chdir(t) # switch back to orig dir if not lib: try: diff --git a/ci_build.sh b/ci_build.sh index 456ee3b2..0ed04b39 100755 --- a/ci_build.sh +++ b/ci_build.sh @@ -25,7 +25,8 @@ case "$CI_TRACE" in set -x ;; esac -if [ "$BUILD_TYPE" == "default" ] || [ "$BUILD_TYPE" == "default-Werror" ] || [ "$BUILD_TYPE" == "valgrind" ]; then +case "$BUILD_TYPE" in +default|default-Werror|default-with-docs|valgrind) LANG=C LC_ALL=C export LANG LC_ALL @@ -111,7 +112,6 @@ if [ "$BUILD_TYPE" == "default" ] || [ "$BUILD_TYPE" == "default-Werror" ] || [ CONFIG_OPTS+=("LDFLAGS=-L${BUILD_PREFIX}/lib") CONFIG_OPTS+=("PKG_CONFIG_PATH=${BUILD_PREFIX}/lib/pkgconfig") CONFIG_OPTS+=("--prefix=${BUILD_PREFIX}") - CONFIG_OPTS+=("--with-docs=no") if [ -z "${CI_CONFIG_QUIET-}" ] || [ "${CI_CONFIG_QUIET-}" = yes ] || [ "${CI_CONFIG_QUIET-}" = true ]; then CONFIG_OPTS+=("--quiet") fi @@ -167,10 +167,8 @@ if [ "$BUILD_TYPE" == "default" ] || [ "$BUILD_TYPE" == "default-Werror" ] || [ CONFIG_OPTS+=("CPP=${CPP}") fi - if [ -n "$ADDRESS_SANITIZER" ] && [ "$ADDRESS_SANITIZER" == "enabled" ]; then - CONFIG_OPTS+=("CFLAGS=-fsanitize=address") - CONFIG_OPTS+=("CXXFLAGS=-fsanitize=address") - fi + CONFIG_OPTS_COMMON=$CONFIG_OPTS + CONFIG_OPTS+=("--with-docs=no") # Clone and build dependencies, if not yet installed to Travis env as DEBs # or MacOS packages; other OSes are not currently supported by Travis cloud @@ -245,6 +243,13 @@ if [ "$BUILD_TYPE" == "default" ] || [ "$BUILD_TYPE" == "default-Werror" ] || [ echo "`date`: INFO: Starting build of currently tested project with DRAFT APIs..." CCACHE_BASEDIR=${PWD} export CCACHE_BASEDIR + if [ "$BUILD_TYPE" = "default-with-docs" ]; then + CONFIG_OPTS=$CONFIG_OPTS_COMMON + CONFIG_OPTS+=("--with-docs=yes") + fi + if [ -n "$ADDRESS_SANITIZER" ] && [ "$ADDRESS_SANITIZER" == "enabled" ]; then + CONFIG_OPTS+=("--enable-address-sanitizer=yes") + fi # Only use --enable-Werror on projects that are expected to have it # (and it is not our duty to check prerequisite projects anyway) CONFIG_OPTS+=("${CONFIG_OPT_WERROR}") @@ -296,9 +301,11 @@ if [ "$BUILD_TYPE" == "default" ] || [ "$BUILD_TYPE" == "default-Werror" ] || [ echo "CCache stats after build:" ccache -s fi - -elif [ "$BUILD_TYPE" == "bindings" ]; then + ;; +bindings) pushd "./bindings/${BINDING}" && ./ci_build.sh -else + ;; +*) pushd "./builds/${BUILD_TYPE}" && REPO_DIR="$(dirs -l +1)" ./ci_build.sh -fi + ;; +esac diff --git a/configure.ac b/configure.ac index 748a0acf..4cda3c57 100755 --- a/configure.ac +++ b/configure.ac @@ -56,6 +56,7 @@ PKG_PROG_PKG_CONFIG AX_PROJECT_LOCAL_HOOK # Code coverage +AC_MSG_CHECKING([whether to enable GCov]) AC_ARG_WITH(gcov, [AS_HELP_STRING([--with-gcov=yes/no], [With GCC Code Coverage reporting])], [MLM_GCOV="$withval"]) @@ -67,8 +68,27 @@ if test "x${MLM_GCOV}" == "xyes"; then CFLAGS="${CFLAGS} ${MLM_ORIG_CFLAGS}" fi AM_CONDITIONAL(WITH_GCOV, true) + AC_MSG_RESULT([yes]) else AM_CONDITIONAL(WITH_GCOV, false) + AC_MSG_RESULT([no]) +fi + +# Memory mis-use detection +AC_MSG_CHECKING([whether to enable ASan]) +AC_ARG_ENABLE(address-sanitizer, [AS_HELP_STRING([--enable-address-sanitizer=yes/no], + [Build with GCC Address Sanitizer instrumentation])], + [MLM_ASAN="$enableval"]) + +if test "x${MLM_ASAN}" == "xyes"; then + CFLAGS="${CFLAGS} -fsanitize=address" + CXXFLAGS="${CXXFLAGS} -fsanitize=address" + + AM_CONDITIONAL(ENABLE_ASAN, true) + AC_MSG_RESULT([yes]) +else + AM_CONDITIONAL(ENABLE_ASAN, false) + AC_MSG_RESULT([no]) fi # Set pkgconfigdir diff --git a/packaging/obs/_service b/packaging/obs/_service index 9e1114ba..f3529de1 100644 --- a/packaging/obs/_service +++ b/packaging/obs/_service @@ -3,8 +3,12 @@ https://github.com/zeromq/malamute git @PARENT_TAG@+git%cd + v(.*) + 1 + .git enable + malamute From a9ce0056a85d958f3ee4e55c9ddd12dc5f46d911 Mon Sep 17 00:00:00 2001 From: Yann Diorcet Date: Thu, 15 Jun 2017 21:50:21 +0200 Subject: [PATCH 2/3] Fix test with cmake --- CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index f880deab..fb86f1e8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -316,6 +316,7 @@ foreach(TEST_CLASS ${TEST_CLASSES}) add_test( NAME ${TEST_CLASS} COMMAND mlm_selftest --continue --verbose --test ${TEST_CLASS} + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} ) set_tests_properties( ${TEST_CLASS} From 8f1531a08f7756c9d922a3803013b9583bd64551 Mon Sep 17 00:00:00 2001 From: Yann Diorcet Date: Sat, 6 Aug 2016 18:32:24 +0200 Subject: [PATCH 3/3] Improve API, using same signature for stream, service and mailbox (except tracker and timeout) --- api/mlm_client.api | 11 +- api/mlm_proto.api | 1 - .../jni/src/main/c/org_zeromq_mlm_MlmClient.c | 21 +- .../main/java/org/zeromq/mlm/MlmClient.java | 21 +- bindings/lua_ffi/malamute_ffi.lua | 12 +- bindings/nodejs/binding.cc | 17 -- bindings/nodejs/binding.h | 1 - bindings/python/malamute/__init__.py | 7 - bindings/python/malamute/_malamute_ctypes.py | 29 +-- bindings/python/test_malamute.py | 6 +- bindings/python_cffi/malamute_cffi.py | 6 - bindings/qml/src/QmlMlmClient.cpp | 8 - bindings/qml/src/QmlMlmClient.h | 5 - bindings/qt/src/qmlmclient.cpp | 10 - bindings/qt/src/qmlmclient.h | 5 - bindings/ruby/lib/malamute/ffi.rb | 17 +- bindings/ruby/lib/malamute/ffi/mlm_client.rb | 28 +-- bindings/ruby/lib/malamute/ffi/mlm_proto.rb | 3 - doc/mlm_client.doc | 85 ++++--- doc/mlm_client.txt | 83 ++++--- include/mlm_client.h | 13 +- include/mlm_proto.h | 1 - issues/issue_45/mlm_perf_recv.c | 3 +- issues/issue_45/mlm_perf_send.c | 16 +- issues/issue_81.c | 4 +- issues/issue_84.c | 4 +- issues/issue_87.c | 3 +- issues/issue_93.c | 4 +- src/mlm_client.c | 230 +++++++++--------- src/mlm_client_engine.inc | 94 +------ src/mlm_perftest.c | 7 +- src/mlm_proto.c | 34 +-- src/mlm_server.c | 64 +---- src/mlm_server_engine.inc | 24 -- src/mlm_tutorial.c | 29 ++- src/mshell.c | 3 +- 36 files changed, 318 insertions(+), 591 deletions(-) diff --git a/api/mlm_client.api b/api/mlm_client.api index 1e11ff9e..ef7f2fb2 100644 --- a/api/mlm_client.api +++ b/api/mlm_client.api @@ -70,14 +70,6 @@ - - Prepare to publish to a specified stream. After this, all messages are sent to - this stream exclusively. - Returns >= 0 if successful, -1 if interrupted. - - - - Consume messages with matching subjects. The pattern is a regular expression using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the @@ -104,6 +96,7 @@ Send STREAM SEND message to server, takes ownership of message and destroys message when done sending it. + Stream address Message subject Message body frames @@ -184,6 +177,7 @@ Send multipart string message to stream, end list with NULL Returns 0 if OK, -1 if failed due to lack of memory or other error. + @@ -215,6 +209,7 @@ of string contents received, or -1 in case of error. Free the returned subject and content strings when finished with them. To get the type of the command, use mlm_client_command (). + diff --git a/api/mlm_proto.api b/api/mlm_proto.api index ea5ce9f5..84add3cb 100644 --- a/api/mlm_proto.api +++ b/api/mlm_proto.api @@ -29,7 +29,6 @@ - diff --git a/bindings/jni/src/main/c/org_zeromq_mlm_MlmClient.c b/bindings/jni/src/main/c/org_zeromq_mlm_MlmClient.c index bb12757e..ac4a0823 100644 --- a/bindings/jni/src/main/c/org_zeromq_mlm_MlmClient.c +++ b/bindings/jni/src/main/c/org_zeromq_mlm_MlmClient.c @@ -68,15 +68,6 @@ Java_org_zeromq_mlm_MlmClient__1_1connect (JNIEnv *env, jclass c, jlong self, js return connect_; } -JNIEXPORT jint JNICALL -Java_org_zeromq_mlm_MlmClient__1_1setProducer (JNIEnv *env, jclass c, jlong self, jstring stream) -{ - char *stream_ = (char *) (*env)->GetStringUTFChars (env, stream, NULL); - jint set_producer_ = (jint) mlm_client_set_producer ((mlm_client_t *) (intptr_t) self, stream_); - (*env)->ReleaseStringUTFChars (env, stream, stream_); - return set_producer_; -} - JNIEXPORT jint JNICALL Java_org_zeromq_mlm_MlmClient__1_1setConsumer (JNIEnv *env, jclass c, jlong self, jstring stream, jstring pattern) { @@ -100,10 +91,12 @@ Java_org_zeromq_mlm_MlmClient__1_1setWorker (JNIEnv *env, jclass c, jlong self, } JNIEXPORT jint JNICALL -Java_org_zeromq_mlm_MlmClient__1_1send (JNIEnv *env, jclass c, jlong self, jstring subject, jlong content) +Java_org_zeromq_mlm_MlmClient__1_1send (JNIEnv *env, jclass c, jlong self, jstring address, jstring subject, jlong content) { + char *address_ = (char *) (*env)->GetStringUTFChars (env, address, NULL); char *subject_ = (char *) (*env)->GetStringUTFChars (env, subject, NULL); - jint send_ = (jint) mlm_client_send ((mlm_client_t *) (intptr_t) self, subject_, (zmsg_t **) (intptr_t) &content); + jint send_ = (jint) mlm_client_send ((mlm_client_t *) (intptr_t) self, address_, subject_, (zmsg_t **) (intptr_t) &content); + (*env)->ReleaseStringUTFChars (env, address, address_); (*env)->ReleaseStringUTFChars (env, subject, subject_); return send_; } @@ -204,11 +197,13 @@ Java_org_zeromq_mlm_MlmClient__1_1tracker (JNIEnv *env, jclass c, jlong self) } JNIEXPORT jint JNICALL -Java_org_zeromq_mlm_MlmClient__1_1sendx (JNIEnv *env, jclass c, jlong self, jstring subject, jstring content) +Java_org_zeromq_mlm_MlmClient__1_1sendx (JNIEnv *env, jclass c, jlong self, jstring address, jstring subject, jstring content) { + char *address_ = (char *) (*env)->GetStringUTFChars (env, address, NULL); char *subject_ = (char *) (*env)->GetStringUTFChars (env, subject, NULL); char *content_ = (char *) (*env)->GetStringUTFChars (env, content, NULL); - jint sendx_ = (jint) mlm_client_sendx ((mlm_client_t *) (intptr_t) self, subject_, content_); + jint sendx_ = (jint) mlm_client_sendx ((mlm_client_t *) (intptr_t) self, address_, subject_, content_); + (*env)->ReleaseStringUTFChars (env, address, address_); (*env)->ReleaseStringUTFChars (env, subject, subject_); (*env)->ReleaseStringUTFChars (env, content, content_); return sendx_; diff --git a/bindings/jni/src/main/java/org/zeromq/mlm/MlmClient.java b/bindings/jni/src/main/java/org/zeromq/mlm/MlmClient.java index d7459f88..3878899a 100644 --- a/bindings/jni/src/main/java/org/zeromq/mlm/MlmClient.java +++ b/bindings/jni/src/main/java/org/zeromq/mlm/MlmClient.java @@ -86,15 +86,6 @@ public int connect (String endpoint, int timeout, String address) { return __connect (self, endpoint, timeout, address); } /* - Prepare to publish to a specified stream. After this, all messages are sent to - this stream exclusively. - Returns >= 0 if successful, -1 if interrupted. - */ - native static int __setProducer (long self, String stream); - public int setProducer (String stream) { - return __setProducer (self, stream); - } - /* Consume messages with matching subjects. The pattern is a regular expression using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the start and end, . to match any character, \s and \S to match whitespace and @@ -121,9 +112,9 @@ public int setWorker (String address, String pattern) { Send STREAM SEND message to server, takes ownership of message and destroys message when done sending it. */ - native static int __send (long self, String subject, long content); - public int send (String subject, Zmsg content) { - return __send (self, subject, content.self); + native static int __send (long self, String address, String subject, long content); + public int send (String address, String subject, Zmsg content) { + return __send (self, address, subject, content.self); } /* Send MAILBOX SEND message to server, takes ownership of message @@ -211,9 +202,9 @@ public String tracker () { Send multipart string message to stream, end list with NULL Returns 0 if OK, -1 if failed due to lack of memory or other error. */ - native static int __sendx (long self, String subject, String content); - public int sendx (String subject, String content []) { - return __sendx (self, subject, content [0]); + native static int __sendx (long self, String address, String subject, String content); + public int sendx (String address, String subject, String content []) { + return __sendx (self, address, subject, content [0]); } /* Send multipart string to mailbox, end list with NULL diff --git a/bindings/lua_ffi/malamute_ffi.lua b/bindings/lua_ffi/malamute_ffi.lua index 02619ed8..fb1db1d5 100644 --- a/bindings/lua_ffi/malamute_ffi.lua +++ b/bindings/lua_ffi/malamute_ffi.lua @@ -206,12 +206,6 @@ int int mlm_client_connect (mlm_client_t *self, const char *endpoint, uint32_t timeout, const char *address); -// Prepare to publish to a specified stream. After this, all messages are sent to -// this stream exclusively. -// Returns >= 0 if successful, -1 if interrupted. -int - mlm_client_set_producer (mlm_client_t *self, const char *stream); - // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the // start and end, . to match any character, \s and \S to match whitespace and @@ -232,7 +226,7 @@ int // Send STREAM SEND message to server, takes ownership of message // and destroys message when done sending it. int - mlm_client_send (mlm_client_t *self, const char *subject, zmsg_t **content); + mlm_client_send (mlm_client_t *self, const char *address, const char *subject, zmsg_t **content); // Send MAILBOX SEND message to server, takes ownership of message // and destroys message when done sending it. @@ -286,7 +280,7 @@ const char * // Send multipart string message to stream, end list with NULL // Returns 0 if OK, -1 if failed due to lack of memory or other error. int - mlm_client_sendx (mlm_client_t *self, const char *subject, const char *content, ...); + mlm_client_sendx (mlm_client_t *self, const char *address, const char *subject, const char *content, ...); // Send multipart string to mailbox, end list with NULL // Returns 0 if OK, -1 if failed due to lack of memory or other error. @@ -306,7 +300,7 @@ int // subject and content strings when finished with them. To get the type of // the command, use mlm_client_command (). int - mlm_client_recvx (mlm_client_t *self, char **subject_p, char **string_p, ...); + mlm_client_recvx (mlm_client_t *self, char **address_p, char **subject_p, char **string_p, ...); // Enable verbose tracing (animation) of state machine activity. void diff --git a/bindings/nodejs/binding.cc b/bindings/nodejs/binding.cc index 9d653264..959dc67e 100644 --- a/bindings/nodejs/binding.cc +++ b/bindings/nodejs/binding.cc @@ -224,7 +224,6 @@ NAN_MODULE_INIT (MlmClient::Init) { Nan::SetPrototypeMethod (tpl, "connected", _connected); Nan::SetPrototypeMethod (tpl, "setPlainAuth", _set_plain_auth); Nan::SetPrototypeMethod (tpl, "connect", _connect); - Nan::SetPrototypeMethod (tpl, "setProducer", _set_producer); Nan::SetPrototypeMethod (tpl, "setConsumer", _set_consumer); Nan::SetPrototypeMethod (tpl, "setWorker", _set_worker); Nan::SetPrototypeMethod (tpl, "send", _send); @@ -369,22 +368,6 @@ NAN_METHOD (MlmClient::_connect) { info.GetReturnValue ().Set (Nan::New(result)); } -NAN_METHOD (MlmClient::_set_producer) { - MlmClient *mlm_client = Nan::ObjectWrap::Unwrap (info.Holder ()); - char *stream; - if (info [0]->IsUndefined ()) - return Nan::ThrowTypeError ("method requires a `stream`"); - else - if (!info [0]->IsString ()) - return Nan::ThrowTypeError ("`stream` must be a string"); - else { - Nan::Utf8String stream_utf8 (info [0].As()); - stream = *stream_utf8; - } - int result = mlm_client_set_producer (mlm_client->self, (const char *)stream); - info.GetReturnValue ().Set (Nan::New(result)); -} - NAN_METHOD (MlmClient::_set_consumer) { MlmClient *mlm_client = Nan::ObjectWrap::Unwrap (info.Holder ()); char *stream; diff --git a/bindings/nodejs/binding.h b/bindings/nodejs/binding.h index 1e773801..ca445d34 100644 --- a/bindings/nodejs/binding.h +++ b/bindings/nodejs/binding.h @@ -72,7 +72,6 @@ class MlmClient: public Nan::ObjectWrap { static NAN_METHOD (_connected); static NAN_METHOD (_set_plain_auth); static NAN_METHOD (_connect); - static NAN_METHOD (_set_producer); static NAN_METHOD (_set_consumer); static NAN_METHOD (_set_worker); static NAN_METHOD (_send); diff --git a/bindings/python/malamute/__init__.py b/bindings/python/malamute/__init__.py index 23d090d9..aeaa0aa2 100644 --- a/bindings/python/malamute/__init__.py +++ b/bindings/python/malamute/__init__.py @@ -49,13 +49,6 @@ def connect(self, endpoint, timeout, address): "Could not connect to malamute server at {!r}", endpoint, ) - def set_producer(self, stream): - result = self.c.set_producer(stream) - self._check_error( - result, - "Could not set producer", - ) - def set_worker(self, address, pattern): result = self.c.set_worker(address, pattern) self._check_error( diff --git a/bindings/python/malamute/_malamute_ctypes.py b/bindings/python/malamute/_malamute_ctypes.py index e816eb73..9192ca3f 100644 --- a/bindings/python/malamute/_malamute_ctypes.py +++ b/bindings/python/malamute/_malamute_ctypes.py @@ -136,7 +136,6 @@ class MlmProto(object): CONNECTION_PING = 2 # CONNECTION_PONG = 3 # CONNECTION_CLOSE = 4 # - STREAM_WRITE = 5 # STREAM_READ = 6 # STREAM_SEND = 7 # STREAM_DELIVER = 8 # @@ -402,14 +401,12 @@ def test(verbose): lib.mlm_client_set_plain_auth.argtypes = [mlm_client_p, c_char_p, c_char_p] lib.mlm_client_connect.restype = c_int lib.mlm_client_connect.argtypes = [mlm_client_p, c_char_p, c_int, c_char_p] -lib.mlm_client_set_producer.restype = c_int -lib.mlm_client_set_producer.argtypes = [mlm_client_p, c_char_p] lib.mlm_client_set_consumer.restype = c_int lib.mlm_client_set_consumer.argtypes = [mlm_client_p, c_char_p, c_char_p] lib.mlm_client_set_worker.restype = c_int lib.mlm_client_set_worker.argtypes = [mlm_client_p, c_char_p, c_char_p] lib.mlm_client_send.restype = c_int -lib.mlm_client_send.argtypes = [mlm_client_p, c_char_p, POINTER(czmq.zmsg_p)] +lib.mlm_client_send.argtypes = [mlm_client_p, c_char_p, c_char_p, POINTER(czmq.zmsg_p)] lib.mlm_client_sendto.restype = c_int lib.mlm_client_sendto.argtypes = [mlm_client_p, c_char_p, c_char_p, c_char_p, c_int, POINTER(czmq.zmsg_p)] lib.mlm_client_sendfor.restype = c_int @@ -433,13 +430,13 @@ def test(verbose): lib.mlm_client_tracker.restype = c_char_p lib.mlm_client_tracker.argtypes = [mlm_client_p] lib.mlm_client_sendx.restype = c_int -lib.mlm_client_sendx.argtypes = [mlm_client_p, c_char_p, c_char_p] +lib.mlm_client_sendx.argtypes = [mlm_client_p, c_char_p, c_char_p, c_char_p] lib.mlm_client_sendtox.restype = c_int lib.mlm_client_sendtox.argtypes = [mlm_client_p, c_char_p, c_char_p, c_char_p] lib.mlm_client_sendforx.restype = c_int lib.mlm_client_sendforx.argtypes = [mlm_client_p, c_char_p, c_char_p, c_char_p] lib.mlm_client_recvx.restype = c_int -lib.mlm_client_recvx.argtypes = [mlm_client_p, POINTER(c_char_p), POINTER(c_char_p)] +lib.mlm_client_recvx.argtypes = [mlm_client_p, POINTER(c_char_p), POINTER(c_char_p), POINTER(c_char_p)] lib.mlm_client_set_verbose.restype = None lib.mlm_client_set_verbose.argtypes = [mlm_client_p, c_bool] lib.mlm_client_test.restype = None @@ -538,14 +535,6 @@ def connect(self, endpoint, timeout, address): """ return lib.mlm_client_connect(self._as_parameter_, endpoint, timeout, address) - def set_producer(self, stream): - """ - Prepare to publish to a specified stream. After this, all messages are sent to -this stream exclusively. -Returns >= 0 if successful, -1 if interrupted. - """ - return lib.mlm_client_set_producer(self._as_parameter_, stream) - def set_consumer(self, stream, pattern): """ Consume messages with matching subjects. The pattern is a regular expression @@ -567,12 +556,12 @@ def set_worker(self, address, pattern): """ return lib.mlm_client_set_worker(self._as_parameter_, address, pattern) - def send(self, subject, content): + def send(self, address, subject, content): """ Send STREAM SEND message to server, takes ownership of message and destroys message when done sending it. """ - return lib.mlm_client_send(self._as_parameter_, subject, byref(czmq.zmsg_p.from_param(content))) + return lib.mlm_client_send(self._as_parameter_, address, subject, byref(czmq.zmsg_p.from_param(content))) def sendto(self, address, subject, tracker, timeout, content): """ @@ -645,12 +634,12 @@ def tracker(self): """ return lib.mlm_client_tracker(self._as_parameter_) - def sendx(self, subject, content, *args): + def sendx(self, address, subject, content, *args): """ Send multipart string message to stream, end list with NULL Returns 0 if OK, -1 if failed due to lack of memory or other error. """ - return lib.mlm_client_sendx(self._as_parameter_, subject, content, *args) + return lib.mlm_client_sendx(self._as_parameter_, address, subject, content, *args) def sendtox(self, address, subject, content, *args): """ @@ -666,7 +655,7 @@ def sendforx(self, address, subject, content, *args): """ return lib.mlm_client_sendforx(self._as_parameter_, address, subject, content, *args) - def recvx(self, subject_p, string_p, *args): + def recvx(self, address_p, subject_p, string_p, *args): """ Receive a subject and string content from the server. The content may be 1 or more string frames. This method is orthogonal to the sendx methods. @@ -676,7 +665,7 @@ def recvx(self, subject_p, string_p, *args): subject and content strings when finished with them. To get the type of the command, use mlm_client_command (). """ - return lib.mlm_client_recvx(self._as_parameter_, byref(c_char_p.from_param(subject_p)), byref(c_char_p.from_param(string_p)), *args) + return lib.mlm_client_recvx(self._as_parameter_, byref(c_char_p.from_param(address_p)), byref(c_char_p.from_param(subject_p)), byref(c_char_p.from_param(string_p)), *args) def set_verbose(self, verbose): """ diff --git a/bindings/python/test_malamute.py b/bindings/python/test_malamute.py index 95262559..ff2e899d 100644 --- a/bindings/python/test_malamute.py +++ b/bindings/python/test_malamute.py @@ -8,8 +8,6 @@ def test(addr): writer = MalamuteClient() print("writer.connect") writer.connect(addr, 100, b'writer') - print("writer.set_producer") - writer.set_producer(b'writer') reader = MalamuteClient() print("reader.connect") @@ -19,8 +17,8 @@ def test(addr): reader.set_consumer(b'writer', b'bar') print("writer.send") - writer.send(b'foo', [b'whoaaa', b'whaaaaaa']) - writer.send(b'bar', [b'whoaaa', b'whaaaaaa']) + writer.send(b'writer', b'foo', [b'whoaaa', b'whaaaaaa']) + writer.send(b'writer', b'bar', [b'whoaaa', b'whaaaaaa']) print(reader.recv()) print(reader.recv()) diff --git a/bindings/python_cffi/malamute_cffi.py b/bindings/python_cffi/malamute_cffi.py index 97a1fe1a..6a3394d3 100644 --- a/bindings/python_cffi/malamute_cffi.py +++ b/bindings/python_cffi/malamute_cffi.py @@ -217,12 +217,6 @@ int mlm_client_connect (mlm_client_t *self, const char *endpoint, uint32_t timeout, const char *address); -// Prepare to publish to a specified stream. After this, all messages are sent to -// this stream exclusively. -// Returns >= 0 if successful, -1 if interrupted. -int - mlm_client_set_producer (mlm_client_t *self, const char *stream); - // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the // start and end, . to match any character, \s and \S to match whitespace and diff --git a/bindings/qml/src/QmlMlmClient.cpp b/bindings/qml/src/QmlMlmClient.cpp index 5d461fc2..f75d80be 100644 --- a/bindings/qml/src/QmlMlmClient.cpp +++ b/bindings/qml/src/QmlMlmClient.cpp @@ -50,14 +50,6 @@ int QmlMlmClient::connect (const QString &endpoint, uint32_t timeout, const QStr return mlm_client_connect (self, endpoint.toUtf8().data(), timeout, address.toUtf8().data()); }; -/// -// Prepare to publish to a specified stream. After this, all messages are sent to -// this stream exclusively. -// Returns >= 0 if successful, -1 if interrupted. -int QmlMlmClient::setProducer (const QString &stream) { - return mlm_client_set_producer (self, stream.toUtf8().data()); -}; - /// // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the diff --git a/bindings/qml/src/QmlMlmClient.h b/bindings/qml/src/QmlMlmClient.h index be9b3541..4a1173b4 100644 --- a/bindings/qml/src/QmlMlmClient.h +++ b/bindings/qml/src/QmlMlmClient.h @@ -55,11 +55,6 @@ public slots: // Returns >= 0 if successful, -1 if interrupted. int connect (const QString &endpoint, uint32_t timeout, const QString &address); - // Prepare to publish to a specified stream. After this, all messages are sent to - // this stream exclusively. - // Returns >= 0 if successful, -1 if interrupted. - int setProducer (const QString &stream); - // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the // start and end, . to match any character, \s and \S to match whitespace and diff --git a/bindings/qt/src/qmlmclient.cpp b/bindings/qt/src/qmlmclient.cpp index f92ec947..e695b509 100644 --- a/bindings/qt/src/qmlmclient.cpp +++ b/bindings/qt/src/qmlmclient.cpp @@ -82,16 +82,6 @@ int QMlmClient::connect (const QString &endpoint, quint32 timeout, const QString return rv; } -/// -// Prepare to publish to a specified stream. After this, all messages are sent to -// this stream exclusively. -// Returns >= 0 if successful, -1 if interrupted. -int QMlmClient::setProducer (const QString &stream) -{ - int rv = mlm_client_set_producer (self, stream.toUtf8().data()); - return rv; -} - /// // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the diff --git a/bindings/qt/src/qmlmclient.h b/bindings/qt/src/qmlmclient.h index 386d3cf4..757eaae7 100644 --- a/bindings/qt/src/qmlmclient.h +++ b/bindings/qt/src/qmlmclient.h @@ -51,11 +51,6 @@ class QT_MLM_EXPORT QMlmClient : public QObject // Returns >= 0 if successful, -1 if interrupted. int connect (const QString &endpoint, quint32 timeout, const QString &address); - // Prepare to publish to a specified stream. After this, all messages are sent to - // this stream exclusively. - // Returns >= 0 if successful, -1 if interrupted. - int setProducer (const QString &stream); - // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the // start and end, . to match any character, \s and \S to match whitespace and diff --git a/bindings/ruby/lib/malamute/ffi.rb b/bindings/ruby/lib/malamute/ffi.rb index 1f136452..2f6bf1f5 100644 --- a/bindings/ruby/lib/malamute/ffi.rb +++ b/bindings/ruby/lib/malamute/ffi.rb @@ -494,17 +494,6 @@ def self.mlm_client_connect(*) raise NotImplementedError, "compile Malamute with --enable-drafts" end end - begin # DRAFT method - attach_function :mlm_client_set_producer, [:pointer, :string], :int, **opts - rescue ::FFI::NotFoundError - if $VERBOSE || $DEBUG - warn "The DRAFT function mlm_client_set_producer()" + - " is not provided by the installed Malamute library." - end - def self.mlm_client_set_producer(*) - raise NotImplementedError, "compile Malamute with --enable-drafts" - end - end begin # DRAFT method attach_function :mlm_client_set_consumer, [:pointer, :string, :string], :int, **opts rescue ::FFI::NotFoundError @@ -528,7 +517,7 @@ def self.mlm_client_set_worker(*) end end begin # DRAFT method - attach_function :mlm_client_send, [:pointer, :string, :pointer], :int, **opts + attach_function :mlm_client_send, [:pointer, :string, :string, :pointer], :int, **opts rescue ::FFI::NotFoundError if $VERBOSE || $DEBUG warn "The DRAFT function mlm_client_send()" + @@ -660,7 +649,7 @@ def self.mlm_client_tracker(*) end end begin # DRAFT method - attach_function :mlm_client_sendx, [:pointer, :string, :string, :varargs], :int, **opts + attach_function :mlm_client_sendx, [:pointer, :string, :string, :string, :varargs], :int, **opts rescue ::FFI::NotFoundError if $VERBOSE || $DEBUG warn "The DRAFT function mlm_client_sendx()" + @@ -693,7 +682,7 @@ def self.mlm_client_sendforx(*) end end begin # DRAFT method - attach_function :mlm_client_recvx, [:pointer, :pointer, :pointer, :varargs], :int, **opts + attach_function :mlm_client_recvx, [:pointer, :pointer, :pointer, :pointer, :varargs], :int, **opts rescue ::FFI::NotFoundError if $VERBOSE || $DEBUG warn "The DRAFT function mlm_client_recvx()" + diff --git a/bindings/ruby/lib/malamute/ffi/mlm_client.rb b/bindings/ruby/lib/malamute/ffi/mlm_client.rb index 4a355ca0..2d3f8286 100644 --- a/bindings/ruby/lib/malamute/ffi/mlm_client.rb +++ b/bindings/ruby/lib/malamute/ffi/mlm_client.rb @@ -161,19 +161,6 @@ def connect(endpoint, timeout, address) result end - # Prepare to publish to a specified stream. After this, all messages are sent to - # this stream exclusively. - # Returns >= 0 if successful, -1 if interrupted. - # - # @param stream [String, #to_s, nil] - # @return [Integer] - def set_producer(stream) - raise DestroyedError unless @ptr - self_p = @ptr - result = ::Malamute::FFI.mlm_client_set_producer(self_p, stream) - result - end - # Consume messages with matching subjects. The pattern is a regular expression # using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the # start and end, . to match any character, \s and \S to match whitespace and @@ -210,13 +197,14 @@ def set_worker(address, pattern) # Send STREAM SEND message to server, takes ownership of message # and destroys message when done sending it. # + # @param address [String, #to_s, nil] # @param subject [String, #to_s, nil] # @param content [::FFI::Pointer, #to_ptr] # @return [Integer] - def send(subject, content) + def send(address, subject, content) raise DestroyedError unless @ptr self_p = @ptr - result = ::Malamute::FFI.mlm_client_send(self_p, subject, content) + result = ::Malamute::FFI.mlm_client_send(self_p, address, subject, content) result end @@ -350,14 +338,15 @@ def tracker() # Send multipart string message to stream, end list with NULL # Returns 0 if OK, -1 if failed due to lack of memory or other error. # + # @param address [String, #to_s, nil] # @param subject [String, #to_s, nil] # @param content [String, #to_s, nil] # @param args [Array] see https://github.com/ffi/ffi/wiki/examples#using-varargs # @return [Integer] - def sendx(subject, content, *args) + def sendx(address, subject, content, *args) raise DestroyedError unless @ptr self_p = @ptr - result = ::Malamute::FFI.mlm_client_sendx(self_p, subject, content, *args) + result = ::Malamute::FFI.mlm_client_sendx(self_p, address, subject, content, *args) result end @@ -399,14 +388,15 @@ def sendforx(address, subject, content, *args) # subject and content strings when finished with them. To get the type of # the command, use mlm_client_command (). # + # @param address_p [::FFI::Pointer, #to_ptr] # @param subject_p [::FFI::Pointer, #to_ptr] # @param string_p [::FFI::Pointer, #to_ptr] # @param args [Array] see https://github.com/ffi/ffi/wiki/examples#using-varargs # @return [Integer] - def recvx(subject_p, string_p, *args) + def recvx(address_p, subject_p, string_p, *args) raise DestroyedError unless @ptr self_p = @ptr - result = ::Malamute::FFI.mlm_client_recvx(self_p, subject_p, string_p, *args) + result = ::Malamute::FFI.mlm_client_recvx(self_p, address_p, subject_p, string_p, *args) result end diff --git a/bindings/ruby/lib/malamute/ffi/mlm_proto.rb b/bindings/ruby/lib/malamute/ffi/mlm_proto.rb index 0ef24f55..725fe03e 100644 --- a/bindings/ruby/lib/malamute/ffi/mlm_proto.rb +++ b/bindings/ruby/lib/malamute/ffi/mlm_proto.rb @@ -36,9 +36,6 @@ class MlmProto # CONNECTION_CLOSE = 4 - # - STREAM_WRITE = 5 - # STREAM_READ = 6 diff --git a/doc/mlm_client.doc b/doc/mlm_client.doc index f2caf9a3..0fce4d0e 100644 --- a/doc/mlm_client.doc +++ b/doc/mlm_client.doc @@ -47,12 +47,6 @@ This is the class interface: MLM_EXPORT int mlm_client_connect (mlm_client_t *self, const char *endpoint, uint32_t timeout, const char *address); - // Prepare to publish to a specified stream. After this, all messages are sent to - // this stream exclusively. - // Returns >= 0 if successful, -1 if interrupted. - MLM_EXPORT int - mlm_client_set_producer (mlm_client_t *self, const char *stream); - // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the // start and end, . to match any character, \s and \S to match whitespace and @@ -200,39 +194,44 @@ This is the class self test code: rc = mlm_client_connect (reader, "tcp://127.0.0.1:9999", 1000, ""); assert (rc == 0); - mlm_client_set_producer (writer, "weather"); mlm_client_set_consumer (reader, "weather", "temp.*"); - mlm_client_sendx (writer, "temp.moscow", "1", NULL); - mlm_client_sendx (writer, "rain.moscow", "2", NULL); - mlm_client_sendx (writer, "temp.madrid", "3", NULL); - mlm_client_sendx (writer, "rain.madrid", "4", NULL); - mlm_client_sendx (writer, "temp.london", "5", NULL); - mlm_client_sendx (writer, "rain.london", "6", NULL); + mlm_client_sendx (writer, "weather", "temp.moscow", "1", NULL); + mlm_client_sendx (writer, "weather", "rain.moscow", "2", NULL); + mlm_client_sendx (writer, "weather", "temp.madrid", "3", NULL); + mlm_client_sendx (writer, "weather", "rain.madrid", "4", NULL); + mlm_client_sendx (writer, "weather", "temp.london", "5", NULL); + mlm_client_sendx (writer, "weather", "rain.london", "6", NULL); - char *subject, *content; - mlm_client_recvx (reader, &subject, &content, NULL); + char *address, *subject, *content; + mlm_client_recvx (reader, &address, &subject, &content, NULL); + assert (streq (address, "weather")); assert (streq (subject, "temp.moscow")); assert (streq (content, "1")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - mlm_client_recvx (reader, &subject, &content, NULL); + mlm_client_recvx (reader, &address, &subject, &content, NULL); + assert (streq (address, "weather")); assert (streq (subject, "temp.madrid")); assert (streq (content, "3")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_subject (reader), "temp.madrid")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - mlm_client_recvx (reader, &subject, &content, NULL); + mlm_client_recvx (reader, &address, &subject, &content, NULL); + assert (streq (address, "weather")); assert (streq (subject, "temp.london")); assert (streq (content, "5")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); @@ -249,13 +248,15 @@ This is the class self test code: mlm_client_sendtox (writer, "mailbox", "subject 1", "Message 1", "attachment", NULL); char *attach; - mlm_client_recvx (reader, &subject, &content, &attach, NULL); + mlm_client_recvx (reader, &address, &subject, &content, &attach, NULL); + assert (streq (mailbox, "address")); assert (streq (subject, "subject 1")); assert (streq (content, "Message 1")); assert (streq (attach, "attachment")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); assert (streq (mlm_client_subject (reader), "subject 1")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); zstr_free (&attach); @@ -272,17 +273,21 @@ This is the class self test code: rc = mlm_client_connect (reader, "tcp://127.0.0.1:9999", 500, "mailbox"); assert (rc == 0); - mlm_client_recvx (reader, &subject, &content, &attach, NULL); + mlm_client_recvx (reader, &address, &subject, &content, &attach, NULL); + assert (streq (address, "mailbox")); assert (streq (subject, "subject 2")); assert (streq (content, "Message 2")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - mlm_client_recvx (reader, &subject, &content, &attach, NULL); + mlm_client_recvx (reader, &address, &subject, &content, &attach, NULL); + assert (streq (address, "mailbox")); assert (streq (subject, "subject 3")); assert (streq (content, "Message 3")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); @@ -293,30 +298,36 @@ This is the class self test code: mlm_client_sendforx (writer, "printer", "bw.A4", "Important contract", NULL); mlm_client_sendforx (writer, "printer", "bw.A5", "Special conditions", NULL); - mlm_client_recvx (reader, &subject, &content, NULL); + mlm_client_recvx (reader, &address, &subject, &content, NULL); + assert (streq (address, "printer")); assert (streq (subject, "bw.A4")); assert (streq (content, "Important contract")); assert (streq (mlm_client_command (reader), "SERVICE DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - mlm_client_recvx (reader, &subject, &content, NULL); + mlm_client_recvx (reader, &address, &subject, &content, NULL); + assert (streq (address, "printer")); assert (streq (subject, "bw.A5")); assert (streq (content, "Special conditions")); assert (streq (mlm_client_command (reader), "SERVICE DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); // Test that writer shutdown does not cause message loss mlm_client_set_consumer (reader, "weather", "temp.*"); - mlm_client_sendx (writer, "temp.brussels", "7", NULL); + mlm_client_sendx (writer, "weather", "temp.brussels", "7", NULL); mlm_client_destroy (&writer); - mlm_client_recvx (reader, &subject, &content, NULL); + mlm_client_recvx (reader, &address, &subject, &content, NULL); + assert (streq (address, "weather")); assert (streq (subject, "temp.brussels")); assert (streq (content, "7")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); mlm_client_destroy (&reader); @@ -350,42 +361,44 @@ This is the class self test code: rc = mlm_client_connect (reader2, "tcp://127.0.0.1:9999", 1000, ""); assert (rc == 0); - mlm_client_set_producer (writer1, "weather"); - mlm_client_set_producer (writer2, "traffic"); mlm_client_set_consumer (reader1, "weather", "newyork"); mlm_client_set_consumer (reader1, "traffic", "newyork"); mlm_client_set_consumer (reader2, "weather", "newyork"); mlm_client_set_consumer (reader2, "traffic", "newyork"); - mlm_client_sendx (writer1, "newyork", "8", NULL); + mlm_client_sendx (writer1, "weather", "newyork", "8", NULL); - mlm_client_recvx (reader1, &subject, &content, NULL); - assert (streq (mlm_client_address (reader1), "weather")); + mlm_client_recvx (reader1, &address, &subject, &content, NULL); + assert (streq (address, "weather")); assert (streq (subject, "newyork")); assert (streq (content, "8")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - mlm_client_recvx (reader2, &subject, &content, NULL); - assert (streq (mlm_client_address (reader2), "weather")); + mlm_client_recvx (reader2, &address, &subject, &content, NULL); + assert (streq (address, "weather")); assert (streq (subject, "newyork")); assert (streq (content, "8")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - mlm_client_sendx (writer2, "newyork", "85", NULL); + mlm_client_sendx (writer2, "traffic", "newyork", "85", NULL); - mlm_client_recvx (reader1, &subject, &content, NULL); - assert (streq (mlm_client_address (reader1), "traffic")); + mlm_client_recvx (reader1, &address, &subject, &content, NULL); + assert (streq (address, "traffic")); assert (streq (subject, "newyork")); assert (streq (content, "85")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - mlm_client_recvx (reader2, &subject, &content, NULL); - assert (streq (mlm_client_address (reader2), "traffic")); + mlm_client_recvx (reader2, &address, &subject, &content, NULL); + assert (streq (address, "traffic")); assert (streq (subject, "newyork")); assert (streq (content, "85")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); diff --git a/doc/mlm_client.txt b/doc/mlm_client.txt index 35dc5c2d..8995155a 100644 --- a/doc/mlm_client.txt +++ b/doc/mlm_client.txt @@ -49,12 +49,6 @@ MLM_EXPORT int MLM_EXPORT int mlm_client_connect (mlm_client_t *self, const char *endpoint, uint32_t timeout, const char *address); -// Prepare to publish to a specified stream. After this, all messages are sent to -// this stream exclusively. -// Returns >= 0 if successful, -1 if interrupted. -MLM_EXPORT int - mlm_client_set_producer (mlm_client_t *self, const char *stream); - // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the // start and end, . to match any character, \s and \S to match whitespace and @@ -212,39 +206,44 @@ assert (rc == 0); rc = mlm_client_connect (reader, "tcp://127.0.0.1:9999", 1000, ""); assert (rc == 0); -mlm_client_set_producer (writer, "weather"); mlm_client_set_consumer (reader, "weather", "temp.*"); -mlm_client_sendx (writer, "temp.moscow", "1", NULL); -mlm_client_sendx (writer, "rain.moscow", "2", NULL); -mlm_client_sendx (writer, "temp.madrid", "3", NULL); -mlm_client_sendx (writer, "rain.madrid", "4", NULL); -mlm_client_sendx (writer, "temp.london", "5", NULL); -mlm_client_sendx (writer, "rain.london", "6", NULL); +mlm_client_sendx (writer, "weather", "temp.moscow", "1", NULL); +mlm_client_sendx (writer, "weather", "rain.moscow", "2", NULL); +mlm_client_sendx (writer, "weather", "temp.madrid", "3", NULL); +mlm_client_sendx (writer, "weather", "rain.madrid", "4", NULL); +mlm_client_sendx (writer, "weather", "temp.london", "5", NULL); +mlm_client_sendx (writer, "weather", "rain.london", "6", NULL); -char *subject, *content; -mlm_client_recvx (reader, &subject, &content, NULL); +char *address, *subject, *content; +mlm_client_recvx (reader, &address, &subject, &content, NULL); +assert (streq (address, "weather")); assert (streq (subject, "temp.moscow")); assert (streq (content, "1")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); -mlm_client_recvx (reader, &subject, &content, NULL); +mlm_client_recvx (reader, &address, &subject, &content, NULL); +assert (streq (address, "weather")); assert (streq (subject, "temp.madrid")); assert (streq (content, "3")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_subject (reader), "temp.madrid")); assert (streq (mlm_client_sender (reader), "writer")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); -mlm_client_recvx (reader, &subject, &content, NULL); +mlm_client_recvx (reader, &address, &subject, &content, NULL); +assert (streq (address, "weather")); assert (streq (subject, "temp.london")); assert (streq (content, "5")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); @@ -261,13 +260,15 @@ assert (rc == 0); mlm_client_sendtox (writer, "mailbox", "subject 1", "Message 1", "attachment", NULL); char *attach; -mlm_client_recvx (reader, &subject, &content, &attach, NULL); +mlm_client_recvx (reader, &address, &subject, &content, &attach, NULL); +assert (streq (address, "mailbox")); assert (streq (subject, "subject 1")); assert (streq (content, "Message 1")); assert (streq (attach, "attachment")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); assert (streq (mlm_client_subject (reader), "subject 1")); assert (streq (mlm_client_sender (reader), "writer")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); zstr_free (&attach); @@ -284,17 +285,21 @@ assert (rc == 0); rc = mlm_client_connect (reader, "tcp://127.0.0.1:9999", 500, "mailbox"); assert (rc == 0); -mlm_client_recvx (reader, &subject, &content, &attach, NULL); +mlm_client_recvx (reader, &address, &subject, &content, &attach, NULL); +assert (streq (address, "mailbox")); assert (streq (subject, "subject 2")); assert (streq (content, "Message 2")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); -mlm_client_recvx (reader, &subject, &content, &attach, NULL); +mlm_client_recvx (reader, &address, &subject, &content, &attach, NULL); +assert (streq (address, "mailbox")); assert (streq (subject, "subject 3")); assert (streq (content, "Message 3")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); @@ -305,19 +310,23 @@ mlm_client_set_worker (reader, "printer", "color.*"); mlm_client_sendforx (writer, "printer", "bw.A4", "Important contract", NULL); mlm_client_sendforx (writer, "printer", "bw.A5", "Special conditions", NULL); -mlm_client_recvx (reader, &subject, &content, NULL); +mlm_client_recvx (reader, &address, &subject, &content, NULL); +assert (streq (address, "printer")); assert (streq (subject, "bw.A4")); assert (streq (content, "Important contract")); assert (streq (mlm_client_command (reader), "SERVICE DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); -mlm_client_recvx (reader, &subject, &content, NULL); +mlm_client_recvx (reader, &address, &subject, &content, NULL); +assert (streq (address, "printer")); assert (streq (subject, "bw.A5")); assert (streq (content, "Special conditions")); assert (streq (mlm_client_command (reader), "SERVICE DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); @@ -326,9 +335,11 @@ mlm_client_set_consumer (reader, "weather", "temp.*"); mlm_client_sendx (writer, "temp.brussels", "7", NULL); mlm_client_destroy (&writer); -mlm_client_recvx (reader, &subject, &content, NULL); +mlm_client_recvx (reader, &address, &subject, &content, NULL); +assert (streq (address, "weather")); assert (streq (subject, "temp.brussels")); assert (streq (content, "7")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); mlm_client_destroy (&reader); @@ -362,42 +373,44 @@ assert (rc == 0); rc = mlm_client_connect (reader2, "tcp://127.0.0.1:9999", 1000, ""); assert (rc == 0); -mlm_client_set_producer (writer1, "weather"); -mlm_client_set_producer (writer2, "traffic"); mlm_client_set_consumer (reader1, "weather", "newyork"); mlm_client_set_consumer (reader1, "traffic", "newyork"); mlm_client_set_consumer (reader2, "weather", "newyork"); mlm_client_set_consumer (reader2, "traffic", "newyork"); -mlm_client_sendx (writer1, "newyork", "8", NULL); +mlm_client_sendx (writer1, "weather", "newyork", "8", NULL); -mlm_client_recvx (reader1, &subject, &content, NULL); -assert (streq (mlm_client_address (reader1), "weather")); +mlm_client_recvx (reader1, &address, &subject, &content, NULL); +assert (streq (address, "weather")); assert (streq (subject, "newyork")); assert (streq (content, "8")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); -mlm_client_recvx (reader2, &subject, &content, NULL); -assert (streq (mlm_client_address (reader2), "weather")); +mlm_client_recvx (reader2, &address, &subject, &content, NULL); +assert (streq (address, "weather")); assert (streq (subject, "newyork")); assert (streq (content, "8")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); -mlm_client_sendx (writer2, "newyork", "85", NULL); +mlm_client_sendx (writer2, "traffic", "newyork", "85", NULL); -mlm_client_recvx (reader1, &subject, &content, NULL); -assert (streq (mlm_client_address (reader1), "traffic")); +mlm_client_recvx (reader1, &address, &subject, &content, NULL); +assert (streq (address, "traffic")); assert (streq (subject, "newyork")); assert (streq (content, "85")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); -mlm_client_recvx (reader2, &subject, &content, NULL); -assert (streq (mlm_client_address (reader2), "traffic")); +mlm_client_recvx (reader2, &address, &subject, &content, NULL); +assert (streq (address, "traffic")); assert (streq (subject, "newyork")); assert (streq (content, "85")); +zstr_free (&address); zstr_free (&subject); zstr_free (&content); diff --git a/include/mlm_client.h b/include/mlm_client.h index 23fea910..8b7e4a84 100644 --- a/include/mlm_client.h +++ b/include/mlm_client.h @@ -72,13 +72,6 @@ MLM_EXPORT int MLM_EXPORT int mlm_client_connect (mlm_client_t *self, const char *endpoint, uint32_t timeout, const char *address); -// *** Draft method, for development use, may change without warning *** -// Prepare to publish to a specified stream. After this, all messages are sent to -// this stream exclusively. -// Returns >= 0 if successful, -1 if interrupted. -MLM_EXPORT int - mlm_client_set_producer (mlm_client_t *self, const char *stream); - // *** Draft method, for development use, may change without warning *** // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the @@ -102,7 +95,7 @@ MLM_EXPORT int // Send STREAM SEND message to server, takes ownership of message // and destroys message when done sending it. MLM_EXPORT int - mlm_client_send (mlm_client_t *self, const char *subject, zmsg_t **content); + mlm_client_send (mlm_client_t *self, const char *address, const char *subject, zmsg_t **content); // *** Draft method, for development use, may change without warning *** // Send MAILBOX SEND message to server, takes ownership of message @@ -168,7 +161,7 @@ MLM_EXPORT const char * // Send multipart string message to stream, end list with NULL // Returns 0 if OK, -1 if failed due to lack of memory or other error. MLM_EXPORT int - mlm_client_sendx (mlm_client_t *self, const char *subject, const char *content, ...); + mlm_client_sendx (mlm_client_t *self, const char *address, const char *subject, const char *content, ...); // *** Draft method, for development use, may change without warning *** // Send multipart string to mailbox, end list with NULL @@ -191,7 +184,7 @@ MLM_EXPORT int // subject and content strings when finished with them. To get the type of // the command, use mlm_client_command (). MLM_EXPORT int - mlm_client_recvx (mlm_client_t *self, char **subject_p, char **string_p, ...); + mlm_client_recvx (mlm_client_t *self, char **address_p, char **subject_p, char **string_p, ...); // *** Draft method, for development use, may change without warning *** // Enable verbose tracing (animation) of state machine activity. diff --git a/include/mlm_proto.h b/include/mlm_proto.h index 7a6d9def..b1db201b 100644 --- a/include/mlm_proto.h +++ b/include/mlm_proto.h @@ -197,7 +197,6 @@ typedef struct _mlm_proto_t mlm_proto_t; #define MLM_PROTO_CONNECTION_PING 2 // #define MLM_PROTO_CONNECTION_PONG 3 // #define MLM_PROTO_CONNECTION_CLOSE 4 // -#define MLM_PROTO_STREAM_WRITE 5 // #define MLM_PROTO_STREAM_READ 6 // #define MLM_PROTO_STREAM_SEND 7 // #define MLM_PROTO_STREAM_DELIVER 8 // diff --git a/issues/issue_45/mlm_perf_recv.c b/issues/issue_45/mlm_perf_recv.c index eab305bb..d5b5f8f6 100644 --- a/issues/issue_45/mlm_perf_recv.c +++ b/issues/issue_45/mlm_perf_recv.c @@ -19,7 +19,6 @@ void recv_actor (zsock_t *pipe, void *args) rc = mlm_client_connect (reader, MLM_DEFAULT_ENDPOINT, 1000, "reader1"); assert (rc == 0); - mlm_client_set_producer (writer, "weather"); mlm_client_set_consumer (reader, "weather", "temp."); zsock_t *incoming = mlm_client_msgpipe (reader); @@ -44,7 +43,7 @@ void recv_actor (zsock_t *pipe, void *args) if (streq (mlm_client_subject (reader), "temp.signal")) { printf ("Signaling after %d messages\n", count); - mlm_client_sendx (writer, "signal.ack", "ACK", NULL); + mlm_client_sendx (writer, "weather", "signal.ack", "ACK", NULL); count = 0; } zmsg_destroy (&msg); diff --git a/issues/issue_45/mlm_perf_send.c b/issues/issue_45/mlm_perf_send.c index e66ef41b..6cd018c6 100644 --- a/issues/issue_45/mlm_perf_send.c +++ b/issues/issue_45/mlm_perf_send.c @@ -34,7 +34,6 @@ void scenarioA (int total_count) assert (writer); int rc = mlm_client_connect (writer, MLM_DEFAULT_ENDPOINT, 0, "writer"); assert (rc == 0); - mlm_client_set_producer (writer, "weather"); mlm_client_t *reader = mlm_client_new (); assert (reader); @@ -47,10 +46,10 @@ void scenarioA (int total_count) int count = total_count; int64_t start = zclock_time (); while (count) { - mlm_client_sendx (writer, "rain.moscow", msg1024, NULL); + mlm_client_sendx (writer, "weather", "rain.moscow", msg1024, NULL); count--; } - mlm_client_sendx (writer, "rain.signal", "END", NULL); + mlm_client_sendx (writer, "weather", "rain.signal", "END", NULL); while (true) { zmsg_t *msg = mlm_client_recv (reader); @@ -81,7 +80,6 @@ void scenarioB (int total_count) assert (writer); int rc = mlm_client_connect (writer, MLM_DEFAULT_ENDPOINT, 0, "writer"); assert (rc == 0); - mlm_client_set_producer (writer, "weather"); printf ("Scenario B : Enqueuing and dequeuing %d messages of 1024 bytes each...\n",total_count); int count = 0; @@ -89,7 +87,7 @@ void scenarioB (int total_count) while (count != total_count) { char subject [20]; sprintf (subject, "temp.%d", count); - mlm_client_sendx (writer, subject, msg1024, NULL); + mlm_client_sendx (writer, "weather", subject, msg1024, NULL); count++; if ((count % 10000) == 0) printf ("."); @@ -138,13 +136,12 @@ void scenarioC (int total_count) assert (writer); int rc = mlm_client_connect (writer, MLM_DEFAULT_ENDPOINT, 0, "writer"); assert (rc == 0); - mlm_client_set_producer (writer, "weather"); printf ("Scenario C : Enqueuing and dequeuing simultaneously %d messages of 32 bytes each...\n",total_count); int count = 0; int64_t start = zclock_time (); while (count!=total_count) { - if (mlm_client_sendx (writer, "temp.moscow", msg32, NULL) !=0) { + if (mlm_client_sendx (writer, "weather", "temp.moscow", msg32, NULL) !=0) { printf ("Error : enable to send msg (mlm_client_sendx), count=%d",count); break; }; @@ -195,7 +192,6 @@ void scenarioD (int total_count) assert (writer); int rc = mlm_client_connect (writer, MLM_DEFAULT_ENDPOINT, 0, "writer"); assert (rc == 0); - mlm_client_set_producer (writer, "weather"); mlm_client_t *reader = mlm_client_new (); assert (reader); @@ -207,7 +203,7 @@ void scenarioD (int total_count) int count = 0; int64_t start = zclock_time (); while (count != total_count) { - mlm_client_sendx (writer, "temp.moscow", msg32768, NULL); + mlm_client_sendx (writer, "weather", "temp.moscow", msg32768, NULL); count++; if ((count % 10000) == 0) printf ("."); @@ -215,7 +211,7 @@ void scenarioD (int total_count) if (count >= 10000) printf ("\n"); - mlm_client_sendx (writer, "temp.signal", "END", NULL); + mlm_client_sendx (writer, "weather", "temp.signal", "END", NULL); while (true) { zmsg_t *msg = mlm_client_recv (reader); assert (msg); diff --git a/issues/issue_81.c b/issues/issue_81.c index 8c91374b..4da769f5 100644 --- a/issues/issue_81.c +++ b/issues/issue_81.c @@ -52,8 +52,6 @@ s_producer (zsock_t *pipe, void *args) int rc = mlm_client_connect (producer, broker_endpoint, 1000, "producer"); assert (rc == 0); - mlm_client_set_producer (producer, stream_name); - // Tell parent we're ready to go zsock_signal (pipe, 0); @@ -65,7 +63,7 @@ s_producer (zsock_t *pipe, void *args) break; // Caller sent us $TERM zmsg_t *content = zmsg_new (); zmsg_addstrf (content, "message %d", ++count); - mlm_client_send (producer, "subject", &content); + mlm_client_send (producer, stream_name, "subject", &content); } mlm_client_destroy (&producer); } diff --git a/issues/issue_84.c b/issues/issue_84.c index f99a125e..6624dc04 100644 --- a/issues/issue_84.c +++ b/issues/issue_84.c @@ -52,8 +52,6 @@ s_producer (zsock_t *pipe, void *args) int rc = mlm_client_connect (producer, broker_endpoint, 1000, "producer"); assert (rc == 0); - mlm_client_set_producer (producer, stream_name); - // Tell parent we're ready to go zsock_signal (pipe, 0); @@ -65,7 +63,7 @@ s_producer (zsock_t *pipe, void *args) break; // Caller sent us $TERM zmsg_t *content = zmsg_new (); zmsg_addstrf (content, "message %d", ++count); - mlm_client_send (producer, "subject", &content); + mlm_client_send (producer, stream_name, "subject", &content); } mlm_client_destroy (&producer); } diff --git a/issues/issue_87.c b/issues/issue_87.c index cc96ba41..c8d50ce5 100644 --- a/issues/issue_87.c +++ b/issues/issue_87.c @@ -14,7 +14,6 @@ s_producer (zsock_t *pipe, void *args) mlm_client_t *agent = mlm_client_new (); assert (agent); mlm_client_connect (agent, ENDPOINT, 1000, name); - mlm_client_set_producer (agent, STREAM); zsock_signal (pipe, 0); while (!zsys_interrupted) { @@ -23,7 +22,7 @@ s_producer (zsock_t *pipe, void *args) zmsg_t *zmsg = zmsg_new (); zframe_t *f = zframe_new ("ahoy", 5); zmsg_append (zmsg, &f); - int r = mlm_client_send (agent, "SUBJECT", &zmsg); + int r = mlm_client_send (agent, STREAM, "SUBJECT", &zmsg); if (r) zsys_debug ("mlm_client_send result = %i",r); } diff --git a/issues/issue_93.c b/issues/issue_93.c index 00551922..80e79297 100644 --- a/issues/issue_93.c +++ b/issues/issue_93.c @@ -6,8 +6,6 @@ s_producer (zsock_t *pipe, void *args) mlm_client_t *client = mlm_client_new (); assert (client); mlm_client_connect (client, MLM_DEFAULT_ENDPOINT, 1000, NULL); - int rc = mlm_client_set_producer (client, "stream"); - assert (rc == 0); zsock_signal (pipe, 0); // Send 1M messages of 1K then wait for 5 seconds @@ -17,7 +15,7 @@ s_producer (zsock_t *pipe, void *args) zframe_t *frame = zframe_new (NULL, 1024); memset (zframe_data (frame), 0, 1024); zmsg_append (content, &frame); - if (mlm_client_send (client, "subject", &content)) { + if (mlm_client_send (client, "stream", "subject", &content)) { zsys_debug ("mlm_client_send failed"); assert (false); } diff --git a/src/mlm_client.c b/src/mlm_client.c index fcb0a062..fb52fa22 100644 --- a/src/mlm_client.c +++ b/src/mlm_client.c @@ -80,11 +80,6 @@ static void s_replay_execute (client_t *self, replay_t *replay) { if (replay) { - if (streq (replay->name, "STREAM WRITE")) { - engine_set_next_event (self, set_producer_event); - mlm_proto_set_stream (self->message, replay->stream); - } - else if (streq (replay->name, "STREAM READ")) { engine_set_next_event (self, set_consumer_event); mlm_proto_set_stream (self->message, replay->stream); @@ -213,19 +208,6 @@ server_has_gone_offline (client_t *self) } -// --------------------------------------------------------------------------- -// prepare_stream_write_command -// - -static void -prepare_stream_write_command (client_t *self) -{ - zlistx_add_end (self->replays, - s_replay_new ("STREAM WRITE", self->args->stream, NULL)); - mlm_proto_set_stream (self->message, self->args->stream); -} - - // --------------------------------------------------------------------------- // prepare_stream_read_command // @@ -404,7 +386,7 @@ void mlm_stream_api_test (bool verbose) { const char *endpoint = "ipc://mlm_stream_api_server"; - char *subject, *content; + char *address, *subject, *content; int rc; printf (" * mlm_stream_api_test: \n"); @@ -430,22 +412,19 @@ mlm_stream_api_test (bool verbose) rc = mlm_client_connect (writer, endpoint, 1000, "writer"); assert (rc == 0); assert (mlm_client_connected (writer) == true); - // set writer to broadcast to channel "weather" - rc = mlm_client_set_producer (writer, "weather"); - assert (rc == 0); // start broadcasting temp messages - these will be lost, since our reader is not yet established - rc = mlm_client_sendx (writer, "temp.moscow", "1", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.moscow", "1", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.moscow", "2", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.moscow", "2", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "temp.madrid", "3", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.madrid", "3", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.madrid", "4", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.madrid", "4", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "temp.london", "5", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.london", "5", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.london", "6", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.london", "6", NULL); assert (rc == 0); // create client broadcast sink in reader @@ -460,45 +439,51 @@ mlm_stream_api_test (bool verbose) assert (rc == 0); // start broadcasting temp messages - these will be received - rc = mlm_client_sendx (writer, "temp.moscow", "11", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.moscow", "11", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.moscow", "12", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.moscow", "12", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "temp.madrid", "13", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.madrid", "13", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.madrid", "14", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.madrid", "14", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "temp.london", "15", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.london", "15", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.london", "16", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.london", "16", NULL); assert (rc == 0); // receive interesting broadcast messages (only the "temp.*" ones) - rc = mlm_client_recvx (reader, &subject, &content, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "weather")); assert (streq (subject, "temp.moscow")); assert (streq (content, "11")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - rc = mlm_client_recvx (reader, &subject, &content, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "weather")); assert (streq (subject, "temp.madrid")); assert (streq (content, "13")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_subject (reader), "temp.madrid")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - rc = mlm_client_recvx (reader, &subject, &content, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "weather")); assert (streq (subject, "temp.london")); assert (streq (content, "15")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); @@ -506,17 +491,17 @@ mlm_stream_api_test (bool verbose) mlm_client_destroy (&reader); // start broadcasting temp messages - these will not be received either - rc = mlm_client_sendx (writer, "temp.moscow", "21", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.moscow", "21", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.moscow", "22", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.moscow", "22", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "temp.madrid", "23", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.madrid", "23", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.madrid", "24", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.madrid", "24", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "temp.london", "25", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.london", "25", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.london", "26", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.london", "26", NULL); assert (rc == 0); // Done, shut down @@ -530,7 +515,7 @@ mlm_stream_api_test (bool verbose) void mlm_service_api_test (bool verbose) { - char *subject, *content; + char *address, *subject, *content; int rc; printf (" * mlm_service_api: \n"); @@ -561,8 +546,6 @@ mlm_service_api_test (bool verbose) assert (rc == 0); assert (mlm_client_connected (requester) == true); -// mlm_client_set_producer (requester, "weather"); - mlm_client_t *worker = mlm_client_new (); assert (worker); mlm_client_set_verbose (worker, verbose); @@ -580,21 +563,25 @@ mlm_service_api_test (bool verbose) rc = mlm_client_sendforx (requester, "printer_service", "bw.A5", "Special conditions", NULL); assert (rc == 0); - rc = mlm_client_recvx (worker, &subject, &content, NULL); + rc = mlm_client_recvx (worker, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "printer_service")); assert (streq (subject, "bw.A4")); assert (streq (content, "Important contract")); assert (streq (mlm_client_command (worker), "SERVICE DELIVER")); assert (streq (mlm_client_sender (worker), "requester_address")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - rc = mlm_client_recvx (worker, &subject, &content, NULL); + rc = mlm_client_recvx (worker, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "printer_service")); assert (streq (subject, "bw.A5")); assert (streq (content, "Special conditions")); assert (streq (mlm_client_command (worker), "SERVICE DELIVER")); assert (streq (mlm_client_sender (worker), "requester_address")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); @@ -603,12 +590,14 @@ mlm_service_api_test (bool verbose) assert (rc == 0); mlm_client_destroy (&requester); - rc = mlm_client_recvx (worker, &subject, &content, NULL); + rc = mlm_client_recvx (worker, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "printer_service")); assert (streq (subject, "bw.A6")); assert (streq (content, "Destroyed requester")); assert (streq (mlm_client_command (worker), "SERVICE DELIVER")); assert (streq (mlm_client_sender (worker), "requester_address")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); mlm_client_destroy (&worker); @@ -622,7 +611,7 @@ mlm_service_api_test (bool verbose) void mlm_services_api_test (bool verbose) { - char *subject, *content; + char *address, *subject, *content; int rc; printf (" * mlm_services_api: \n"); @@ -673,22 +662,18 @@ mlm_services_api_test (bool verbose) rc = mlm_client_set_worker (worker2, "print_service_address", "bw.*"); assert (rc == 0); - // define that requesters will generate printJob events - rc = mlm_client_set_producer (requester1, "print_service_stream"); - assert (rc == 0); - rc = mlm_client_set_producer (requester2, "print_service_stream"); - assert (rc == 0); // define that workers will listen to printJob events related to requests rc = mlm_client_set_consumer (worker1, "print_service_stream", "request.*"); assert (rc == 0); rc = mlm_client_set_consumer (worker2, "print_service_stream", "request.*"); assert (rc == 0); - rc = mlm_client_sendx (requester1, "request", "start", NULL); + rc = mlm_client_sendx (requester1, "print_service_stream", "request", "start", NULL); assert (rc == 0); - rc = mlm_client_recvx (worker1, &subject, &content, NULL); + rc = mlm_client_recvx (worker1, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "print_service_stream")); assert (streq (subject, "request")); assert (streq (content, "start")); assert (mlm_client_status (worker1) == 0); @@ -698,16 +683,20 @@ mlm_services_api_test (bool verbose) assert (streq (mlm_client_sender (worker1), "requester_address_1")); assert (streq (mlm_client_subject (worker1), "request")); assert (mlm_client_tracker (worker1) == NULL); + zstr_free (&address); + assert (address == NULL); zstr_free (&subject); assert (subject == NULL); zstr_free (&content); assert (content == NULL); - rc = mlm_client_recvx (worker2, &subject, &content, NULL); + rc = mlm_client_recvx (worker2, &address, &subject, &content, NULL); assert (rc != -1); - assert (streq (mlm_client_address (worker2), "print_service_stream")); + assert (streq (address, "print_service_stream")); assert (streq (subject, "request")); assert (streq (content, "start")); + zstr_free (&address); + assert (address == NULL); zstr_free (&subject); assert (subject == NULL); zstr_free (&content); @@ -720,79 +709,89 @@ mlm_services_api_test (bool verbose) rc = mlm_client_sendforx (requester2, "print_service_address", "bw.A5", "Special conditions", NULL); assert (rc == 0); - rc = mlm_client_recvx (worker1, &subject, &content, NULL); + rc = mlm_client_recvx (worker1, &address, &subject, &content, NULL); assert (rc != -1); bool isFirst = false; if (streq (subject, "bw.A4")) { + assert (streq (address, "print_service_address")); assert (streq (mlm_client_sender (worker1), "requester_address_1")); assert (streq (content, "Important contract")); isFirst = true; } else { + assert (streq (address, "print_service_address")); assert (streq (subject, "bw.A5")); assert (streq (mlm_client_sender (worker1), "requester_address_2")); assert (streq (content, "Special conditions")); } assert (streq (mlm_client_command (worker1), "SERVICE DELIVER")); - assert (streq (mlm_client_address (worker1), "print_service_address")); assert (streq (mlm_client_subject (worker1), subject)); assert (streq (mlm_client_tracker (worker1), "")); assert (mlm_client_reason (worker1) == NULL); + zstr_free (&address); + assert (address == NULL); zstr_free (&subject); assert (subject == NULL); zstr_free (&content); assert (content == NULL); - rc = mlm_client_recvx (worker2, &subject, &content, NULL); + rc = mlm_client_recvx (worker2, &address, &subject, &content, NULL); assert (rc != -1); if ( isFirst ) { + assert (streq (address, "print_service_address")); assert (streq (subject, "bw.A5")); assert (streq (content, "Special conditions")); assert (streq (mlm_client_sender (worker2), "requester_address_2")); } else { + assert (streq (address, "print_service_address")); assert (streq (subject, "bw.A4")); assert (streq (mlm_client_sender (worker2), "requester_address_1")); assert (streq (content, "Important contract")); } assert (streq (mlm_client_command (worker2), "SERVICE DELIVER")); - assert (streq (mlm_client_address (worker2), "print_service_address")); assert (streq (mlm_client_subject (worker2), subject)); assert (streq (mlm_client_tracker (worker2), "")); assert (mlm_client_reason (worker2) == NULL); + zstr_free (&address); + assert (address == NULL); zstr_free (&subject); assert (subject == NULL); zstr_free (&content); assert (content == NULL); // generate event to tell workers work is done - rc = mlm_client_sendx (requester2, "request", "stop", NULL); + rc = mlm_client_sendx (requester2, "print_service_stream", "request", "stop", NULL); assert (rc == 0); - rc = mlm_client_recvx (worker1, &subject, &content, NULL); + rc = mlm_client_recvx (worker1, &address, &subject, &content, NULL); assert (rc != -1); assert (mlm_client_status (worker1) == 0); assert (streq (mlm_client_command (worker1), "STREAM DELIVER")); - assert (streq (mlm_client_address (worker1), "print_service_stream")); assert (streq (mlm_client_sender (worker1), "requester_address_2")); assert (streq (mlm_client_subject (worker1), "request")); assert (streq (mlm_client_tracker (worker1), "")); assert (mlm_client_reason (worker1) == NULL); + assert (streq (address, "print_service_stream")); assert (streq (subject, "request")); assert (streq (content, "stop")); + zstr_free (&address); + assert (address == NULL); zstr_free (&subject); assert (subject == NULL); zstr_free (&content); assert (content == NULL); - rc = mlm_client_recvx (worker2, &subject, &content, NULL); + rc = mlm_client_recvx (worker2, &address, &subject, &content, NULL); assert (rc != -1); assert (streq (mlm_client_command (worker2), "STREAM DELIVER")); - assert (streq (mlm_client_address (worker2), "print_service_stream")); assert (streq (mlm_client_sender (worker2), "requester_address_2")); assert (streq (mlm_client_subject (worker2), "request")); assert (streq (mlm_client_tracker (worker2), "")); assert (mlm_client_reason (worker2) == NULL); + assert (streq (address, "print_service_stream")); assert (streq (subject, "request")); assert (streq (content, "stop")); + zstr_free (&address); + assert (address == NULL); zstr_free (&subject); assert (subject == NULL); zstr_free (&content); @@ -829,10 +828,7 @@ mlm_client_test (bool verbose) assert (client); mlm_client_set_verbose (client, verbose); assert (mlm_client_connected (client) == false); - int rc = mlm_client_set_producer (client, "weather"); - assert (mlm_client_connected (client) == false); - assert ( rc == -1 ); - rc = mlm_client_set_consumer (client, "weather", ".*"); + int rc = mlm_client_set_consumer (client, "weather", ".*"); assert (mlm_client_connected (client) == false); assert ( rc == -1 ); rc = mlm_client_set_worker (client, "weather", ".*"); @@ -874,7 +870,7 @@ mlm_client_test (bool verbose) assert ( rc == 0 ); rc = mlm_client_connect (client, endpoint, 1000, "client_robust"); assert ( rc == 0 ); - + // Test, that issues with regexp are reported correctly rc = mlm_client_set_consumer (client, "MY_STREAM_WITH_BAD_PATTERN", "["); assert ( rc == -1 ); @@ -882,8 +878,6 @@ mlm_client_test (bool verbose) // stop the server zactor_destroy (&server); - rc = mlm_client_set_producer (client, "new_stream"); - assert ( rc == -1 ); rc = mlm_client_set_consumer (client, "new_stream", ".*"); assert ( rc == -1 ); rc = mlm_client_set_worker (client, "new_stream", ".*"); @@ -922,10 +916,6 @@ mlm_client_test (bool verbose) } rc = zstr_sendx (server, "LOAD", "src/mlm_client.cfg", NULL); assert (rc == 0); - rc = mlm_client_set_producer (client, "new_stream"); - assert ( rc == -1 ); // the method set producer is called too fast, - // so, the client didn't manage to establish a new connection with - // the newly appeared server zclock_sleep (5000); // wait a bit // after a while we are connected again assert (mlm_client_connected (client) == true ); @@ -1016,50 +1006,54 @@ mlm_client_test (bool verbose) rc = mlm_client_connect (reader, endpoint, 1000, ""); assert (rc == 0); - rc = mlm_client_set_producer (writer, "weather"); - assert (rc == 0); rc = mlm_client_set_consumer (reader, "weather", "temp.*"); assert (rc == 0); - rc = mlm_client_sendx (writer, "temp.moscow", "1", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.moscow", "1", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.moscow", "2", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.moscow", "2", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "temp.madrid", "3", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.madrid", "3", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.madrid", "4", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.madrid", "4", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "temp.london", "5", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.london", "5", NULL); assert (rc == 0); - rc = mlm_client_sendx (writer, "rain.london", "6", NULL); + rc = mlm_client_sendx (writer, "weather", "rain.london", "6", NULL); assert (rc == 0); - char *subject, *content; - rc = mlm_client_recvx (reader, &subject, &content, NULL); + char *address, *subject, *content; + rc = mlm_client_recvx (reader, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "weather")); assert (streq (subject, "temp.moscow")); assert (streq (content, "1")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - rc = mlm_client_recvx (reader, &subject, &content, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "weather")); assert (streq (subject, "temp.madrid")); assert (streq (content, "3")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_subject (reader), "temp.madrid")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - rc = mlm_client_recvx (reader, &subject, &content, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "weather")); assert (streq (subject, "temp.london")); assert (streq (content, "5")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); @@ -1078,14 +1072,16 @@ mlm_client_test (bool verbose) assert (rc != -1); char *attach; - rc = mlm_client_recvx (reader, &subject, &content, &attach, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, &attach, NULL); assert (rc != -1); + assert (streq (address, "mailbox")); assert (streq (subject, "subject 1")); assert (streq (content, "Message 1")); assert (streq (attach, "attachment")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); assert (streq (mlm_client_subject (reader), "subject 1")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); zstr_free (&attach); @@ -1105,19 +1101,23 @@ mlm_client_test (bool verbose) rc = mlm_client_connect (reader, endpoint, 500, "mailbox"); assert (rc == 0); - rc = mlm_client_recvx (reader, &subject, &content, &attach, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, &attach, NULL); assert (rc != -1); + assert (streq (address, "mailbox")); assert (streq (subject, "subject 2")); assert (streq (content, "Message 2")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - rc = mlm_client_recvx (reader, &subject, &content, &attach, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, &attach, NULL); assert (rc != -1); + assert (streq (address, "mailbox")); assert (streq (subject, "subject 3")); assert (streq (content, "Message 3")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); @@ -1132,35 +1132,41 @@ mlm_client_test (bool verbose) rc = mlm_client_sendforx (writer, "printer", "bw.A5", "Special conditions", NULL); assert (rc != -1); - rc = mlm_client_recvx (reader, &subject, &content, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "printer")); assert (streq (subject, "bw.A4")); assert (streq (content, "Important contract")); assert (streq (mlm_client_command (reader), "SERVICE DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - rc = mlm_client_recvx (reader, &subject, &content, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "printer")); assert (streq (subject, "bw.A5")); assert (streq (content, "Special conditions")); assert (streq (mlm_client_command (reader), "SERVICE DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); // Test that writer shutdown does not cause message loss rc = mlm_client_set_consumer (reader, "weather", "temp.*"); assert (rc != -1); - rc = mlm_client_sendx (writer, "temp.brussels", "7", NULL); + rc = mlm_client_sendx (writer, "weather", "temp.brussels", "7", NULL); assert (rc != -1); mlm_client_destroy (&writer); - rc = mlm_client_recvx (reader, &subject, &content, NULL); + rc = mlm_client_recvx (reader, &address, &subject, &content, NULL); assert (rc != -1); + assert (streq (address, "weather")); assert (streq (subject, "temp.brussels")); assert (streq (content, "7")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); mlm_client_destroy (&reader); @@ -1198,10 +1204,6 @@ mlm_client_test (bool verbose) rc = mlm_client_connect (reader2, endpoint, 1000, ""); assert (rc == 0); - rc = mlm_client_set_producer (writer1, "weather"); - assert (rc != -1); - rc = mlm_client_set_producer (writer2, "traffic"); - assert (rc != -1); rc = mlm_client_set_consumer (reader1, "weather", "newyork"); assert (rc != -1); rc = mlm_client_set_consumer (reader1, "traffic", "newyork"); @@ -1211,41 +1213,45 @@ mlm_client_test (bool verbose) rc = mlm_client_set_consumer (reader2, "traffic", "newyork"); assert (rc != -1); - rc = mlm_client_sendx (writer1, "newyork", "8", NULL); + rc = mlm_client_sendx (writer1, "weather", "newyork", "8", NULL); assert (rc != -1); - rc = mlm_client_recvx (reader1, &subject, &content, NULL); + rc = mlm_client_recvx (reader1, &address, &subject, &content, NULL); assert (rc != -1); - assert (streq (mlm_client_address (reader1), "weather")); + assert (streq (address, "weather")); assert (streq (subject, "newyork")); assert (streq (content, "8")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - rc = mlm_client_recvx (reader2, &subject, &content, NULL); + rc = mlm_client_recvx (reader2, &address, &subject, &content, NULL); assert (rc != -1); - assert (streq (mlm_client_address (reader2), "weather")); + assert (streq (address, "weather")); assert (streq (subject, "newyork")); assert (streq (content, "8")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - rc = mlm_client_sendx (writer2, "newyork", "85", NULL); + rc = mlm_client_sendx (writer2, "traffic", "newyork", "85", NULL); assert (rc != -1); - rc = mlm_client_recvx (reader1, &subject, &content, NULL); + rc = mlm_client_recvx (reader1, &address, &subject, &content, NULL); assert (rc != -1); - assert (streq (mlm_client_address (reader1), "traffic")); + assert (streq (address, "traffic")); assert (streq (subject, "newyork")); assert (streq (content, "85")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - rc = mlm_client_recvx (reader2, &subject, &content, NULL); + rc = mlm_client_recvx (reader2, &address, &subject, &content, NULL); assert (rc != -1); - assert (streq (mlm_client_address (reader2), "traffic")); + assert (streq (address, "traffic")); assert (streq (subject, "newyork")); assert (streq (content, "85")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); diff --git a/src/mlm_client_engine.inc b/src/mlm_client_engine.inc index 1a86f450..ab9b0fb9 100644 --- a/src/mlm_client_engine.inc +++ b/src/mlm_client_engine.inc @@ -41,7 +41,6 @@ typedef enum { connect_event = 2, bad_endpoint_event = 3, destructor_event = 4, - set_producer_event = 5, set_consumer_event = 6, set_worker_event = 7, ok_event = 8, @@ -81,7 +80,6 @@ s_event_name [] = { "connect", "bad_endpoint", "destructor", - "set_producer", "set_consumer", "set_worker", "OK", @@ -177,8 +175,6 @@ static void client_is_connected (client_t *self); static void signal_server_not_present (client_t *self); -static void - prepare_stream_write_command (client_t *self); static void prepare_stream_read_command (client_t *self); static void @@ -538,15 +534,6 @@ s_client_execute (s_client_t *self, event_t event) } } else - if (self->event == set_producer_event) { - if (!self->exception) { - // signal failure - if (self->verbose) - zsys_debug ("%s: $ signal failure", self->log_prefix); - signal_failure (&self->client); - } - } - else if (self->event == set_consumer_event) { if (!self->exception) { // signal failure @@ -636,25 +623,6 @@ s_client_execute (s_client_t *self, event_t event) break; case connected_state: - if (self->event == set_producer_event) { - if (!self->exception) { - // prepare stream write command - if (self->verbose) - zsys_debug ("%s: $ prepare stream write command", self->log_prefix); - prepare_stream_write_command (&self->client); - } - if (!self->exception) { - // send STREAM_WRITE - if (self->verbose) - zsys_debug ("%s: $ send STREAM_WRITE", - self->log_prefix); - mlm_proto_set_id (self->message, MLM_PROTO_STREAM_WRITE); - mlm_proto_send (self->message, self->dealer); - } - if (!self->exception) - self->state = confirming_state; - } - else if (self->event == set_consumer_event) { if (!self->exception) { // prepare stream read command @@ -937,23 +905,6 @@ s_client_execute (s_client_t *self, event_t event) if (self->event == error_event) { } else - if (self->event == set_producer_event) { - if (!self->exception) { - // send STREAM_WRITE - if (self->verbose) - zsys_debug ("%s: $ send STREAM_WRITE", - self->log_prefix); - mlm_proto_set_id (self->message, MLM_PROTO_STREAM_WRITE); - mlm_proto_send (self->message, self->dealer); - } - if (!self->exception) { - // get next replay command - if (self->verbose) - zsys_debug ("%s: $ get next replay command", self->log_prefix); - get_next_replay_command (&self->client); - } - } - else if (self->event == set_consumer_event) { if (!self->exception) { // send STREAM_READ @@ -1029,15 +980,6 @@ s_client_execute (s_client_t *self, event_t event) break; case disconnected_state: - if (self->event == set_producer_event) { - if (!self->exception) { - // signal failure - if (self->verbose) - zsys_debug ("%s: $ signal failure", self->log_prefix); - signal_failure (&self->client); - } - } - else if (self->event == set_consumer_event) { if (!self->exception) { // signal failure @@ -1329,12 +1271,6 @@ s_client_handle_cmdpipe (zloop_t *loop, zsock_t *reader, void *argument) s_client_execute (self, destructor_event); } else - if (streq (method, "SET PRODUCER")) { - zstr_free (&self->args.stream); - zsock_recv (self->cmdpipe, "s", &self->args.stream); - s_client_execute (self, set_producer_event); - } - else if (streq (method, "SET CONSUMER")) { zstr_free (&self->args.stream); zstr_free (&self->args.pattern); @@ -1385,10 +1321,12 @@ s_client_handle_msgpipe (zloop_t *loop, zsock_t *reader, void *argument) zsock_signal (self->cmdpipe, 0); else if (streq (method, "STREAM SEND")) { + char *address; char *subject; zmsg_t *content; - zsock_brecv (self->msgpipe, "sp", &subject, &content); + zsock_brecv (self->msgpipe, "ssp", &address, &subject, &content); mlm_proto_set_id (self->message, MLM_PROTO_STREAM_SEND); + mlm_proto_set_address (self->message, address); mlm_proto_set_subject (self->message, subject); mlm_proto_set_content (self->message, &content); mlm_proto_send (self->message, self->dealer); @@ -1705,23 +1643,6 @@ mlm_client_destructor (mlm_client_t *self) } -// --------------------------------------------------------------------------- -// Prepare to publish to a specified stream. After this, all messages are sent to -// this stream exclusively. -// Returns >= 0 if successful, -1 if interrupted. - -int -mlm_client_set_producer (mlm_client_t *self, const char *stream) -{ - assert (self); - - zsock_send (self->actor, "ss", "SET PRODUCER", stream); - if (s_accept_reply (self, "SUCCESS", "FAILURE", NULL)) - return -1; // Interrupted or timed-out - return self->status; -} - - // --------------------------------------------------------------------------- // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the @@ -1766,12 +1687,12 @@ mlm_client_set_worker (mlm_client_t *self, const char *address, const char *patt // destroys when done sending it. int -mlm_client_send (mlm_client_t *self, const char *subject, zmsg_t **content_p) +mlm_client_send (mlm_client_t *self, const char *address, const char *subject, zmsg_t **content_p) { assert (self); // Send message name as first, separate frame zstr_sendm (self->msgpipe, "STREAM SEND"); - int rc = zsock_bsend (self->msgpipe, "sp", subject, *content_p); + int rc = zsock_bsend (self->msgpipe, "ssp", address, subject, *content_p); *content_p = NULL; // Take ownership of content return rc; } @@ -1945,6 +1866,7 @@ mlm_client_set_verbose (mlm_client_t *self, bool verbose) int mlm_client_sendx ( mlm_client_t *self, + const char *address, const char *subject, const char *content, ...) { @@ -1959,7 +1881,7 @@ mlm_client_sendx ( content = va_arg (args, char *); } va_end (args); - return mlm_client_send (self, subject, &msg); + return mlm_client_send (self, address, subject, &msg); } @@ -2027,6 +1949,7 @@ mlm_client_sendforx ( int mlm_client_recvx ( mlm_client_t *self, + char **address_p, char **subject_p, char **string_p, ...) { @@ -2035,6 +1958,7 @@ mlm_client_recvx ( if (!msg) return -1; + *address_p = strdup (mlm_client_address (self)); *subject_p = strdup (mlm_client_subject (self)); int count = 0; va_list args; diff --git a/src/mlm_perftest.c b/src/mlm_perftest.c index 41fdaeb2..68cfe2f5 100644 --- a/src/mlm_perftest.c +++ b/src/mlm_perftest.c @@ -39,7 +39,6 @@ int main (int argc, char *argv []) rc = mlm_client_connect (writer, "tcp://127.0.0.1:9999", 0, "writer"); assert (rc == 0); - mlm_client_set_producer (writer, "weather"); mlm_client_set_consumer (reader, "weather", "temp."); int64_t start = zclock_time (); @@ -49,11 +48,11 @@ int main (int argc, char *argv []) printf ("COUNT=%d\n", count); while (count) { - mlm_client_sendx (writer, "temp.moscow", "10", NULL); - mlm_client_sendx (writer, "rain.moscow", "0", NULL); + mlm_client_sendx (writer, "weather", "temp.moscow", "10", NULL); + mlm_client_sendx (writer, "weather", "rain.moscow", "0", NULL); count--; } - mlm_client_sendx (writer, "temp.signal", "END", NULL); + mlm_client_sendx (writer, "weather", "temp.signal", "END", NULL); while (true) { zmsg_t *msg = mlm_client_recv (reader); diff --git a/src/mlm_proto.c b/src/mlm_proto.c index 4b2beea0..30bd356c 100644 --- a/src/mlm_proto.c +++ b/src/mlm_proto.c @@ -342,16 +342,13 @@ mlm_proto_recv (mlm_proto_t *self, zsock_t *input) case MLM_PROTO_CONNECTION_CLOSE: break; - case MLM_PROTO_STREAM_WRITE: - GET_STRING (self->stream); - break; - case MLM_PROTO_STREAM_READ: GET_STRING (self->stream); GET_STRING (self->pattern); break; case MLM_PROTO_STREAM_SEND: + GET_STRING (self->address); GET_STRING (self->subject); // Get zero or more remaining frames zmsg_destroy (&self->content); @@ -486,14 +483,12 @@ mlm_proto_send (mlm_proto_t *self, zsock_t *output) frame_size += 2; // version frame_size += 1 + strlen (self->address); break; - case MLM_PROTO_STREAM_WRITE: - frame_size += 1 + strlen (self->stream); - break; case MLM_PROTO_STREAM_READ: frame_size += 1 + strlen (self->stream); frame_size += 1 + strlen (self->pattern); break; case MLM_PROTO_STREAM_SEND: + frame_size += 1 + strlen (self->address); frame_size += 1 + strlen (self->subject); break; case MLM_PROTO_STREAM_DELIVER: @@ -562,16 +557,13 @@ mlm_proto_send (mlm_proto_t *self, zsock_t *output) PUT_STRING (self->address); break; - case MLM_PROTO_STREAM_WRITE: - PUT_STRING (self->stream); - break; - case MLM_PROTO_STREAM_READ: PUT_STRING (self->stream); PUT_STRING (self->pattern); break; case MLM_PROTO_STREAM_SEND: + PUT_STRING (self->address); PUT_STRING (self->subject); nbr_frames += self->content? zmsg_size (self->content): 1; have_content = true; @@ -693,11 +685,6 @@ mlm_proto_print (mlm_proto_t *self) zsys_debug ("MLM_PROTO_CONNECTION_CLOSE:"); break; - case MLM_PROTO_STREAM_WRITE: - zsys_debug ("MLM_PROTO_STREAM_WRITE:"); - zsys_debug (" stream='%s'", self->stream); - break; - case MLM_PROTO_STREAM_READ: zsys_debug ("MLM_PROTO_STREAM_READ:"); zsys_debug (" stream='%s'", self->stream); @@ -706,6 +693,7 @@ mlm_proto_print (mlm_proto_t *self) case MLM_PROTO_STREAM_SEND: zsys_debug ("MLM_PROTO_STREAM_SEND:"); + zsys_debug (" address='%s'", self->address); zsys_debug (" subject='%s'", self->subject); zsys_debug (" content="); if (self->content) @@ -867,8 +855,6 @@ mlm_proto_command (mlm_proto_t *self) case MLM_PROTO_CONNECTION_CLOSE: return ("CONNECTION_CLOSE"); break; - case MLM_PROTO_STREAM_WRITE: - return ("STREAM_WRITE"); break; case MLM_PROTO_STREAM_READ: return ("STREAM_READ"); @@ -1226,18 +1212,6 @@ mlm_proto_test (bool verbose) mlm_proto_recv (self, input); assert (mlm_proto_routing_id (self)); } - mlm_proto_set_id (self, MLM_PROTO_STREAM_WRITE); - - mlm_proto_set_stream (self, "Life is short but Now lasts for ever"); - // Send twice - mlm_proto_send (self, output); - mlm_proto_send (self, output); - - for (instance = 0; instance < 2; instance++) { - mlm_proto_recv (self, input); - assert (mlm_proto_routing_id (self)); - assert (streq (mlm_proto_stream (self), "Life is short but Now lasts for ever")); - } mlm_proto_set_id (self, MLM_PROTO_STREAM_READ); mlm_proto_set_stream (self, "Life is short but Now lasts for ever"); diff --git a/src/mlm_server.c b/src/mlm_server.c index 31c57c5b..138246ac 100644 --- a/src/mlm_server.c +++ b/src/mlm_server.c @@ -415,24 +415,6 @@ register_new_client (client_t *self) } -// --------------------------------------------------------------------------- -// store_stream_writer -// - -static void -store_stream_writer (client_t *self) -{ - // A writer talks to a single stream - self->writer = s_stream_require (self, mlm_proto_stream (self->message)); - if (self->writer) - mlm_proto_set_status_code (self->message, MLM_PROTO_SUCCESS); - else { - engine_set_exception (self, exception_event); - zsys_warning ("writer trying to talk to multiple streams"); - } -} - - // --------------------------------------------------------------------------- // store_stream_reader // @@ -460,20 +442,16 @@ store_stream_reader (client_t *self) static void write_message_to_stream (client_t *self) { - if (self->writer) { - mlm_msg_t *msg = mlm_msg_new ( - self->address, - self->writer->name, - mlm_proto_subject (self->message), - NULL, - mlm_proto_timeout (self->message), - mlm_proto_get_content (self->message)); - zsock_bsend (self->writer->msgpipe, "pp", self, msg); - } - else { - engine_set_exception (self, exception_event); - zsys_warning ("client attempted to send without writer"); - } + mlm_msg_t *msg = mlm_msg_new ( + self->address, + mlm_proto_address (self->message), + mlm_proto_subject (self->message), + NULL, + mlm_proto_timeout (self->message), + mlm_proto_get_content (self->message)); + stream_t *stream = s_stream_require (self, mlm_proto_address (self->message)); + assert (stream); + zsock_bsend (stream->msgpipe, "pp", self, msg); } @@ -771,15 +749,6 @@ mlm_server_test (bool verbose) mlm_proto_t *proto = mlm_proto_new (); - // Server insists that connection starts properly - mlm_proto_set_id (proto, MLM_PROTO_STREAM_WRITE); - mlm_proto_send (proto, reader); - zclock_sleep (500); // to calm things down && make memcheck pass. Thanks @malanka - mlm_proto_recv (proto, reader); - zclock_sleep (500); // detto as above - assert (mlm_proto_id (proto) == MLM_PROTO_ERROR); - assert (mlm_proto_status_code (proto) == MLM_PROTO_COMMAND_INVALID); - // Now do a stream publish-subscribe test zsock_t *writer = zsock_new (ZMQ_DEALER); assert (writer); @@ -797,13 +766,6 @@ mlm_server_test (bool verbose) mlm_proto_recv (proto, writer); assert (mlm_proto_id (proto) == MLM_PROTO_OK); - // Prepare to write and read a "weather" stream - mlm_proto_set_id (proto, MLM_PROTO_STREAM_WRITE); - mlm_proto_set_stream (proto, "weather"); - mlm_proto_send (proto, writer); - mlm_proto_recv (proto, writer); - assert (mlm_proto_id (proto) == MLM_PROTO_OK); - mlm_proto_set_id (proto, MLM_PROTO_STREAM_READ); mlm_proto_set_pattern (proto, "temp.*"); mlm_proto_send (proto, reader); @@ -976,13 +938,13 @@ mlm_server_test (bool verbose) mlm_client_t *client_1 = mlm_client_new (); int rv = mlm_client_connect (client_1, endpoint, 1000, "Karol"); assert (rv != -1); - rv = mlm_client_set_producer (client_1, "STREAM_1"); + rv = mlm_client_sendx (client_1, "STREAM_1", "1", NULL); assert (rv != -1); mlm_client_t *client_2 = mlm_client_new (); rv = mlm_client_connect (client_2, endpoint, 1000, "Tomas"); assert (rv != -1); - rv = mlm_client_set_producer (client_2, "STREAM_2"); + rv = mlm_client_sendx (client_2, "STREAM_2", "2", NULL); assert (rv != -1); mlm_client_t *client_3 = mlm_client_new (); @@ -1039,7 +1001,7 @@ mlm_server_test (bool verbose) mlm_client_t *client_4 = mlm_client_new (); rv = mlm_client_connect (client_4, endpoint, 1000, "Michal"); assert (rv >= 0); - rv = mlm_client_set_producer (client_4, "New stream"); + rv = mlm_client_sendx (client_4, "New stream", "n", NULL); assert (rv >= 0); zlistx_add_end (expected_streams, (void *) "STREAM_1"); diff --git a/src/mlm_server_engine.inc b/src/mlm_server_engine.inc index b48a5448..d3d0e31c 100644 --- a/src/mlm_server_engine.inc +++ b/src/mlm_server_engine.inc @@ -39,7 +39,6 @@ typedef enum { NULL_event = 0, terminate_event = 1, connection_open_event = 2, - stream_write_event = 3, stream_read_event = 4, stream_send_event = 5, mailbox_send_event = 6, @@ -158,8 +157,6 @@ static void check_for_mailbox_messages (client_t *self); static void signal_command_invalid (client_t *self); -static void - store_stream_writer (client_t *self); static void store_stream_reader (client_t *self); static void @@ -384,9 +381,6 @@ s_protocol_event (mlm_proto_t *message) case MLM_PROTO_CONNECTION_CLOSE: return connection_close_event; break; - case MLM_PROTO_STREAM_WRITE: - return stream_write_event; - break; case MLM_PROTO_STREAM_READ: return stream_read_event; break; @@ -701,24 +695,6 @@ s_client_execute (s_client_t *self, event_t event) break; case connected_state: - if (self->event == stream_write_event) { - if (!self->exception) { - // store stream writer - if (self->server->verbose) - zsys_debug ("%s: $ store stream writer", self->log_prefix); - store_stream_writer (&self->client); - } - if (!self->exception) { - // send OK - if (self->server->verbose) - zsys_debug ("%s: $ send OK", - self->log_prefix); - mlm_proto_set_id (self->server->message, MLM_PROTO_OK); - mlm_proto_set_routing_id (self->server->message, self->routing_id); - mlm_proto_send (self->server->message, self->server->router); - } - } - else if (self->event == stream_read_event) { if (!self->exception) { // store stream reader diff --git a/src/mlm_tutorial.c b/src/mlm_tutorial.c index 6f95523f..11d7e481 100644 --- a/src/mlm_tutorial.c +++ b/src/mlm_tutorial.c @@ -70,28 +70,27 @@ int main (int argc, char *argv []) rc = mlm_client_connect (writer, "tcp://127.0.0.1:9999", 1000, "writer"); assert (rc == 0); - // The writer publishes to the "weather" stream - mlm_client_set_producer (writer, "weather"); - // The reader consumes temperature messages off the "weather" stream mlm_client_set_consumer (reader, "weather", "temp.*"); // The writer sends a series of messages with various subjects. The // sendx method sends string data to the stream (we send the subject, // then one or more strings): - mlm_client_sendx (writer, "temp.moscow", "1", NULL); - mlm_client_sendx (writer, "rain.moscow", "2", NULL); - mlm_client_sendx (writer, "temp.madrid", "3", NULL); - mlm_client_sendx (writer, "rain.madrid", "4", NULL); - mlm_client_sendx (writer, "temp.london", "5", NULL); - mlm_client_sendx (writer, "rain.london", "6", NULL); + mlm_client_sendx (writer, "weather", "temp.moscow", "1", NULL); + mlm_client_sendx (writer, "weather", "rain.moscow", "2", NULL); + mlm_client_sendx (writer, "weather", "temp.madrid", "3", NULL); + mlm_client_sendx (writer, "weather", "rain.madrid", "4", NULL); + mlm_client_sendx (writer, "weather", "temp.london", "5", NULL); + mlm_client_sendx (writer, "weather", "rain.london", "6", NULL); // The simplest way to receive a message is via the recvx method, // which stores multipart string data: - char *subject, *content; - mlm_client_recvx (reader, &subject, &content, NULL); + char *address, *subject, *content; + mlm_client_recvx (reader, &address, &subject, &content, NULL); + assert (streq (address, "weather")); assert (streq (subject, "temp.moscow")); assert (streq (content, "1")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); @@ -102,15 +101,19 @@ int main (int argc, char *argv []) assert (streq (mlm_client_address (reader), "weather")); // Let's get the other two messages: - mlm_client_recvx (reader, &subject, &content, NULL); + mlm_client_recvx (reader, &address, &subject, &content, NULL); + assert (streq (address, "weather")); assert (streq (subject, "temp.madrid")); assert (streq (content, "3")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); - mlm_client_recvx (reader, &subject, &content, NULL); + mlm_client_recvx (reader, &address, &subject, &content, NULL); + assert (streq (address, "weather")); assert (streq (subject, "temp.london")); assert (streq (content, "5")); + zstr_free (&address); zstr_free (&subject); zstr_free (&content); diff --git a/src/mshell.c b/src/mshell.c index f7c07999..e708f421 100644 --- a/src/mshell.c +++ b/src/mshell.c @@ -66,8 +66,7 @@ int main (int argc, char *argv []) return 0; } if (content) { - mlm_client_set_producer (client, stream); - mlm_client_sendx (client, subject, content, NULL); + mlm_client_sendx (client, stream, subject, content, NULL); } else { // Consume the event subjects specified by the pattern