Method of accepting/disposing messages on a Receiver which doesn't stop the receiving of messages on that Receiver #282
Comments
I think there's some confusion. When you
Depending on the message settle mode, there are some internal states that need to be modified. Having a separate entity that does the disposal simply means the lock is hidden from you, but it's still there. Plus, I'm not aware of any other AMQP 1.0 client that does this. Please refer me to one if you know of one.
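To make the "hidden lock" point concrete, here is a minimal std-only sketch. Every name in it (`LinkState`, `Receiver`, `Disposer`, `disposer()`) is hypothetical and is not part of any real AMQP crate; real settlement state would be more involved than two vectors. It only illustrates that a detached disposal handle still has to synchronise with the receiver on shared state.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the settlement state a receiver link tracks.
#[derive(Default)]
struct LinkState {
    unsettled: Vec<u64>, // delivery ids still waiting for a disposition
    accepted: Vec<u64>,  // delivery ids that have been accepted
}

/// Hypothetical receiver; `recv` needs `&mut self`, as in the discussion.
struct Receiver {
    state: Arc<Mutex<LinkState>>,
    next_id: u64,
}

/// Hypothetical detached handle. The lock has not gone away, it has
/// only moved inside this type.
#[derive(Clone)]
struct Disposer {
    state: Arc<Mutex<LinkState>>,
}

impl Receiver {
    fn new() -> Self {
        Receiver { state: Arc::new(Mutex::new(LinkState::default())), next_id: 0 }
    }

    /// Pretend to receive a delivery and record it as unsettled.
    fn recv(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.state.lock().unwrap().unsettled.push(id);
        id
    }

    fn disposer(&self) -> Disposer {
        Disposer { state: Arc::clone(&self.state) }
    }
}

impl Disposer {
    /// Accept a delivery; returns false if the id was not unsettled.
    fn accept(&self, id: u64) -> bool {
        let mut state = self.state.lock().unwrap(); // the "hidden" lock
        match state.unsettled.iter().position(|&d| d == id) {
            Some(i) => {
                state.unsettled.swap_remove(i);
                state.accepted.push(id);
                true
            }
            None => false,
        }
    }
}

/// Receive two deliveries while another thread accepts the first one.
/// Returns (unsettled count, accepted count).
fn run_demo() -> (usize, usize) {
    let mut receiver = Receiver::new();
    let disposer = receiver.disposer();

    let first = receiver.recv();
    let worker = thread::spawn(move || disposer.accept(first));
    let _second = receiver.recv(); // receiving continues meanwhile
    assert!(worker.join().unwrap());

    let state = receiver.state.lock().unwrap();
    (state.unsettled.len(), state.accepted.len())
}

fn main() {
    let (unsettled, accepted) = run_demo();
    println!("unsettled: {}, accepted: {}", unsettled, accepted);
}
```

Both `recv` and `accept` take the same mutex, so the handle makes the call site more convenient without removing the contention.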
While I may not fully understand the value behind this request, you are welcome to create your own fork and implement it as you see fit.
Ah, that does improve things then - if no IO is actually happening during the .await while disposing, then I suppose it's not as big a deal as it would otherwise be to .await while receiving. I've found that I'm having the same problem on the publisher end. Say we have a tokio mpsc
I think in this case the .await of the Is this understanding correct? If so, is there some way to achieve concurrency for a |
In this case, you have the
Apologies for the delayed response. Thank you, yes the
Following the cancel safety example, it seems the generally accepted way of accepting messages from another thread is to use a channel to send the original message back from the other thread to the receiver function and `tokio::select!` over that channel and the Receiver's `recv()` method. The problem is that once we've got a message back on this channel and we call `receiver.accept(msg)` on it, we then must await the result of that call - and while awaiting that, we are no longer receiving messages.

The usual way we'd do things concurrently would be to use `tokio::spawn` or `tokio::task::spawn_local` to spawn another task which can .await the call to `receiver.accept` or `receiver.dispose`. If we do that, we'd need to ensure the receiver lives long enough whilst doing this, so we'd need to wrap it in an `Arc` (if using `tokio::spawn`) or an `Rc` (if using `tokio::task::spawn_local`). However, even that won't be enough, because the receiver's `.recv()` method takes a mutable reference to the Receiver. I think even using `tokio::task::spawn_local` with a `RefCell` for interior mutability wouldn't work, because the cell would need to be "locked" across an .await and so would just panic at run time.

Would it be possible then to have some new mechanism for disposing of messages without holding a reference to the original Receiver? Perhaps a new `.create_message_disposer()` method on `Receiver`, which returns a new `MessageDisposer` struct which has `dispose`/`accept`/etc. methods?
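The interior-mutability concern can be illustrated without any async runtime. The sketch below assumes the cell in question is std's `RefCell`, and uses a hypothetical `Receiver` struct with two counters in place of the real type. Holding a `borrow_mut()` guard stands in for a task suspended inside `recv(&mut self)`; a second task that tried `borrow_mut()` at that moment would panic, which is shown here via the non-panicking `try_borrow_mut()`.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for the real receiver type.
struct Receiver {
    received: u32,
    accepted: u32,
}

fn new_shared() -> Rc<RefCell<Receiver>> {
    Rc::new(RefCell::new(Receiver { received: 0, accepted: 0 }))
}

/// Try to "accept" through a second handle; false if the cell is already
/// mutably borrowed (where `borrow_mut()` would have panicked).
fn try_accept(handle: &Rc<RefCell<Receiver>>) -> bool {
    match handle.try_borrow_mut() {
        Ok(mut receiver) => {
            receiver.accepted += 1;
            true
        }
        Err(_) => false,
    }
}

/// The mutable borrow is still held, as it would be across an .await
/// inside `recv`, when the other handle tries to accept.
fn accept_while_receiving() -> bool {
    let shared = new_shared();
    let other = Rc::clone(&shared);

    let mut receiving = shared.borrow_mut(); // "recv" in progress
    receiving.received += 1;
    let ok = try_accept(&other); // conflicting borrow
    drop(receiving); // "recv" completes only now
    ok
}

/// The borrow is released before the other handle accepts.
fn accept_after_receiving() -> bool {
    let shared = new_shared();
    let other = Rc::clone(&shared);

    {
        let mut receiving = shared.borrow_mut();
        receiving.received += 1;
    } // guard dropped here
    try_accept(&other)
}

fn main() {
    println!("accept while receiving: {}", accept_while_receiving());
    println!("accept after receiving: {}", accept_after_receiving());
}
```

This is why wrapping the receiver in `Rc<RefCell<_>>` alone does not give concurrent receive and dispose: the second borrow only succeeds once the first has ended.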