Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic Number of subchannels in PUSH/PULL model #214

Open
bpadalino opened this issue Oct 18, 2019 · 11 comments
Open

Dynamic Number of subchannels in PUSH/PULL model #214

bpadalino opened this issue Oct 18, 2019 · 11 comments
Labels

Comments

@bpadalino
Copy link

I'm experimenting with FairMQ and I am trying to understand a few of the concepts that I seem to be missing.

I am interested in using the shmem transport, and specifically I want to do something like the built in FairMQMultiplier device, but I am looking to make it more dynamic without interrupting the service already provided by other channels.

From what I can tell, the channel configurations are parsed from JSON or input through the command line. I'm interested in performing the 1:N multiplication of my data based on the number of connected devices when I setup a PUSH/PULL model. Currently, that model seems to split the work among the connected clients, and not duplicate all messages. I haven't found a mode or setting that seems to be what I want just yet.

Let me know if this request doesn't make sense and I can try to clarify. Any guidance would be helpful.

@dennisklein
Copy link
Member

If I understand you correctly, you are looking for the PUB/SUB scalability protocol (duplicates messages). At the moment our shmem transport does not support PUB/SUB. Received messages would all point to the same underlying shmem buffer, so one would need to implement copy-on-write semantics (OR set the buffer read-only). This would most likely involve extending the existing message API. While certainly doable, it was just not done up to now due to lack of use cases (known to us) justifying the manpower.

At the moment, there are no plans to allocate any manpower from the core dev team on this in the near future, but I will present this issue in our next internal dev meeting on Monday (if you have additional information to strengthen your use case you do not want to post publicly, you are welcome to contact me via email too).

Alternatively, we would definitely be happy to guide and review any community contribution attempt on this topic.

@bpadalino
Copy link
Author

Indeed that is what I am looking for.

I'm willing to do the work with guidance of code style so it can be contributed back. I am guessing it would be part of the channel and/or socket configuration like duplicate=1 and to apply it when the socket type is set to push.

Then the same semantics that are done with the Copy() call in the Copy/Push example, but internal instead?

@dennisklein
Copy link
Member

dennisklein commented Oct 18, 2019

Socket types would be PUB/SUB, not PUSH/PULL. As a first step, I would recommend to build your use case with the PUB/SUB channels with e.g. the standard zmq transport. Once this works, we can point you to the relevant source code to enable the shmem transport.

@dennisklein
Copy link
Member

dennisklein commented Oct 18, 2019

If you need a PUB/SUB-like pattern that can create backpressure (PUB/SUB drops messages if queues are full), you might want to have a look at REQ/REP channel types. You can connect multiple REQ channels to a single REP channel. Then you can have the REQ channels ask for work and so forth.

@bpadalino
Copy link
Author

I can't handle dropped messages, unfortunately. Alternatively, can I dynamically add a new subchannel to a device once it's running, and be able to query the number of active channels?

I tried switching over to zmq just for my current PUSH/PULL mechanism and it seems I get a SIGABRT Bad Address for some reason. I am trying to install nanomsg now to see if that transport also has the issue.

Can you let me know about dynamically adding subchannels? Then I could do a REQ/REP to add/remove channels dynamically in my program?

@bpadalino
Copy link
Author

It seems to be some strange ZMQ bug with the Bad Address SIGABRT. So I've successfully installed nanomsg and msgpack, switched over to PUB/SUB instead of PUSH/PULL and notice my clients now get all the messages from the publisher, and the sinks can dynamically connect.

I suppose now I need to figure out where in shmem I need to modify to get PUB/SUB to work?

@dennisklein
Copy link
Member

I suppose now I need to figure out where in shmem I need to modify to get PUB/SUB to work?

Correct, some parts of the FairMQChannel class and the code in fairmq/shmem files will be the area to get familiar with. You will find that the shmem transport still uses zeromq for its meta data band and connection mgmt. Next, you will find some code dealing with the lifetime mgmt of the shmem buffers, e.g. some signal might be sent back to the original process who allocated the message so it knows when it is safe to deallocate. Now, when enabling PUB/SUB for shmem, you need to see whether the existing refcounting can still work or if it needs to be modified to account for the duplication of messages by the publisher. On the subscriber side you will need to figure out a solution to not violate FairMQ's single message ownership principle. That means, that the shmem message buffer either needs to become read-only or copy-on-write. This needs some research on the shmem subsystem and if and how it could be implemented. As you will see we use Boost::interprocess. Maybe, there are hints on how to do such a thing already in their docs.

@dennisklein
Copy link
Member

It seems to be some strange ZMQ bug with the Bad Address SIGABRT

Could you pls open a separate bug report for this with FairMQ (we have a bug report template) so we can see, if it is something we could handle better and also for documentation it would be useful, other might encounter the issue too. Thx!

@bpadalino
Copy link
Author

I will try to track down the ZMQ issue - see if it's been fixed in a later release, and file a bug report if it hasn't.

As for the boost::interprocess stuff, it seems they have some documentation on opening read-only or copy on write segments.

In fairmq/shm, I see the fSegment and fManagementSegment in the Manager.cxx file, but I don't see where I might distinguish a PUB versus SUB.

Also, can you shed a little more light on the ZMQ requirement for the management of the shmem buffers? What information is required to be sent over ZMQ versus just over the shmem channel? Sorry for my ignorance on the topic.

@dennisklein
Copy link
Member

dennisklein commented Oct 21, 2019

, but I don't see where I might distinguish a PUB versus SUB.

https://github.com/FairRootGroup/FairMQ/blob/dev/fairmq/shmem/FairMQSocketSHM.cxx#L78-L82

As for the boost::interprocess stuff, it seems they have some documentation on opening read-only or copy on write segments.

Yes, this looks like the right stuff you would need. My feeling is, a read-only solution will turn out to be less complicated to implement on top of the existing code and thus might be the best direction to start with. But I might be wrong, I also have no idea how sophisticated your software development skills are and what experience you have, so it might be even a piece of cake for you :)

Also, can you shed a little more light on the ZMQ requirement for the management of the shmem buffers? What information is required to be sent over ZMQ versus just over the shmem channel? Sorry for my ignorance on the topic.

Don't worry, it is a perfectly legit question. Technically, there is no ZMQ requirement for a shmem implementation. It is more of a historic development. First, we had the zeromq transport implementation in FairMQ. All the different transports offered by ZMQ will internally copy the message buffers in one way or another. Eventually, we worked on a requirement from our community to reduce memory pressure by skipping this internal copy in the case of intra-node communication. The easiest approach maintaining most of the features (connection semantics, message queuing, ZMQ's scalability protocols like PAIR, PUSH/PULL, REQ/REP (with some exceptions)) was to modify the existing zeromq transport in a way to put the user message content into a shmem buffer and transfer a handle to it via zmq. This approach turned out to outperform other attempts to move also the remaining zmq logic to shmem natively (e.g. one attempt was using boost::interprocess::message_queue). I am not saying a fast(er), pure shmem implementation can be achieved (we don't know), but there was no motivation to follow down this path (yet) since the existing shmem transport in FairMQ performs well enough (we see message rates of O(10^6) and bandwidth limitations are usually caused by the chosen memory allocator) for all the use cases we had to solve up to now.

@rbx
Copy link
Member

rbx commented Oct 21, 2019

Can you let me know about dynamically adding subchannels? Then I could do a REQ/REP to add/remove channels dynamically in my program?

No, channels or sub-channels can only be added/modified during the init state, doesn't have to be JSON or command line, you can add them programmatically via config/command plugin. But if your device is already in running state, you will need a trip back to init.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants