This primarily affects sources that run indefinitely (`HttpClientSourceStage`, `AppShieldSourceStage`, `RSSSourceStage`).
C++ sources avoid this issue because they have access to the subscriber object, and a well-behaved source stage checks `subscriber.is_subscribed()` at the beginning of each poll iteration.
Python sources don't have access to the subscriber. The pymrc C++ wrapper (`python/mrc/_pymrc/src/segment.cpp`) can perform this check on their behalf, but there are two problems:
1. It doesn't stop iterating when the check returns false (easily fixed).
2. It blocks on each iteration of `for (auto next_val : iter_wrapper)`, so until the source yields, the C++ code is blocked and cannot act on the check.
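The second problem can be illustrated with a small standard-library sketch. Nothing here is MRC or Morpheus code: `FakeSubscriber`, `slow_source`, `drive`, and `run_demo` are hypothetical names, and `drive` only mimics a wrapper loop that checks the subscription between items. It assumes the first problem is already fixed, so the loop does break when the check fails.

```python
import threading
import time


class FakeSubscriber:
    """Hypothetical stand-in for the C++ subscriber; not an MRC class."""

    def __init__(self):
        self._subscribed = threading.Event()
        self._subscribed.set()

    def is_subscribed(self):
        return self._subscribed.is_set()

    def unsubscribe(self):
        self._subscribed.clear()


def slow_source(poll_interval):
    """A source that blocks between items, like a long-running poll."""
    count = 0
    while True:
        time.sleep(poll_interval)  # the driver cannot run while we sleep here
        yield count
        count += 1


def drive(source, subscriber):
    """Mimics the wrapper loop: the check only runs after the source yields."""
    emitted = []
    for value in source:
        if not subscriber.is_subscribed():
            break  # stop iterating once unsubscribed
        emitted.append(value)
    return emitted


def run_demo(poll_interval=0.2, unsubscribe_after=0.05):
    """Unsubscribe early and measure how long the driver takes to notice."""
    subscriber = FakeSubscriber()
    timer = threading.Timer(unsubscribe_after, subscriber.unsubscribe)
    start = time.monotonic()
    timer.start()
    emitted = drive(slow_source(poll_interval), subscriber)
    elapsed = time.monotonic() - start
    timer.join()
    return emitted, elapsed
```

Calling `run_demo()` unsubscribes after roughly 0.05 s, yet the loop only exits once the source finishes its 0.2 s sleep and yields. With a source that never yields, the loop would never exit.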
Ideal solution:
- The MRC executor should accept an optional `on_stage_change` callback.
- pymrc should wrap this so that the callback can be implemented in Python.
- Morpheus' `pipeline.py` should supply a callback that calls `Pipeline.stop()` when the status changes to a terminal state.
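A rough sketch of how the proposed callback could be wired together, in plain Python. Everything below is hypothetical: `SketchExecutor`, `SketchPipeline`, `State`, and `set_state` are illustrative stand-ins, not the MRC executor, pymrc bindings, or Morpheus' `Pipeline`. Only the callback name `on_stage_change` and the idea of calling `stop()` on a terminal state come from the proposal above.

```python
import enum
import threading


class State(enum.Enum):
    """Hypothetical executor states; the real set of states may differ."""
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


TERMINAL_STATES = {State.COMPLETED, State.FAILED}


class SketchExecutor:
    """Hypothetical executor that accepts an optional callback."""

    def __init__(self, on_stage_change=None):
        self._on_stage_change = on_stage_change

    def set_state(self, state):
        # Notify the listener, if one was supplied, on every state change.
        if self._on_stage_change is not None:
            self._on_stage_change(state)


class SketchPipeline:
    """Hypothetical pipeline that stops itself on a terminal state."""

    def __init__(self):
        self.stop_requested = threading.Event()
        self.executor = SketchExecutor(on_stage_change=self._handle_change)

    def _handle_change(self, state):
        if state in TERMINAL_STATES:
            self.stop()

    def stop(self):
        # Python sources could poll this flag between items.
        self.stop_requested.set()
```

In this sketch a non-terminal change such as `State.RUNNING` leaves `stop_requested` unset, while `State.FAILED` or `State.COMPLETED` sets it. A Python source that polls the flag could then exit without needing access to the subscriber.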
Version
24.10
Which installation method(s) does this occur on?
Source