Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Ring Buffer and remove pull subscriber #171

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ Subscriber
.. autoclass:: zenoh.Subscriber
:members:

PullSubscriber
--------------
.. autoclass:: zenoh.PullSubscriber
:members:

Reliability
-----------
.. autoclass:: zenoh.Reliability
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ fn zenoh(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<session::_Session>()?;
m.add_class::<session::_Publisher>()?;
m.add_class::<session::_Subscriber>()?;
m.add_class::<session::_PullSubscriber>()?;
m.add_class::<session::_Scout>()?;
m.add_class::<queryable::_Query>()?;
m.add_class::<queryable::_Queryable>()?;
Expand Down
35 changes: 1 addition & 34 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use zenoh::{
prelude::{sync::SyncResolve, SessionDeclarations},
publication::Publisher,
scouting::Scout,
subscriber::{PullSubscriber, Subscriber},
subscriber::Subscriber,
Session,
};

Expand Down Expand Up @@ -226,30 +226,6 @@ impl _Session {
Ok(_Subscriber(subscriber))
}

#[pyo3(signature = (key_expr, callback, **kwargs))]
pub fn declare_pull_subscriber(
&self,
key_expr: &_KeyExpr,
callback: &PyAny,
kwargs: Option<&PyDict>,
) -> PyResult<_PullSubscriber> {
let callback: PyClosure<(_Sample,)> = <_ as TryInto<_>>::try_into(callback)?;
let mut builder = self
.0
.declare_subscriber(&key_expr.0)
.pull_mode()
.with(callback);
if let Some(kwargs) = kwargs {
match kwargs.extract_item::<_Reliability>("reliability") {
Ok(reliabilty) => builder = builder.reliability(reliabilty.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
let subscriber = builder.res().map_err(|e| e.to_pyerr())?;
Ok(_PullSubscriber(subscriber))
}

pub fn zid(&self) -> _ZenohId {
_ZenohId(self.0.zid())
}
Expand Down Expand Up @@ -290,15 +266,6 @@ impl _Publisher {
#[pyclass(subclass)]
pub struct _Subscriber(Subscriber<'static, ()>);

#[pyclass(subclass)]
pub struct _PullSubscriber(PullSubscriber<'static, ()>);
#[pymethods]
impl _PullSubscriber {
fn pull(&self) -> PyResult<()> {
self.0.pull().res_sync().map_err(|e| e.to_pyerr())
}
}

#[pyclass(subclass)]
pub struct _Scout(Scout<()>);

Expand Down
2 changes: 1 addition & 1 deletion zenoh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .zenoh import init_logger, scout as _scout
from .keyexpr import IntoKeyExpr, IntoSelector, KeyExpr, Selector
from .config import Config
from .session import Session, Publisher, Subscriber, PullSubscriber, Info
from .session import Session, Publisher, Subscriber, Info
from .enums import CongestionControl, Encoding, Priority, QueryConsolidation, QueryTarget, Reliability, SampleKind
from .value import Hello, Value, IntoValue, IValue, Sample, IntoSample, ZenohId, Timestamp, Reply
from .closures import Closure, IClosure, IntoClosure, Handler, IHandler, IntoHandler, ListCollector, Queue
Expand Down
59 changes: 1 addition & 58 deletions zenoh/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#
from typing import Union, Any, List

from .zenoh import _Session, _Config, _Publisher, _Subscriber, _PullSubscriber
from .zenoh import _Session, _Config, _Publisher, _Subscriber

from .keyexpr import KeyExpr, IntoKeyExpr, Selector, IntoSelector
from .config import Config
Expand Down Expand Up @@ -66,34 +66,6 @@ def undeclare(self):
self._subscriber_ = None


class PullSubscriber:
"""
A handle to a pull subscription.

Its main purpose is to keep the subscription active as long as it exists.

When constructed through ``Session.declare_pull_subscriber(session, keyexpr, handler)``, it exposes ``handler``'s receiver
through ``self.receiver``.

Calling ``self.pull()`` will prompt the Zenoh network to send a new sample when available.
"""

def __init__(self, s: _PullSubscriber, receiver=None):
self._subscriber_ = s
self.receiver = receiver

def pull(self):
"""
Prompts the Zenoh network to send a new sample if available.
Note that this sample will not be returned by this function, but provided to the handler's callback.
"""
self._subscriber_.pull()

def undeclare(self):
"Undeclares the subscription"
self._subscriber_ = None


class Session(_Session):
"""
A Zenoh Session, the core interraction point with a Zenoh network.
Expand Down Expand Up @@ -342,35 +314,6 @@ def declare_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample,
s = super().declare_subscriber(KeyExpr(keyexpr), handler.closure, **kwargs)
return Subscriber(s, handler.receiver)

def declare_pull_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any], reliability: Reliability = None) -> PullSubscriber:
"""
Declares a pull-mode subscriber, which will receive a single published sample with a key expression intersecting ``keyexpr`` any time its ``pull`` method is called.

These samples are passed to the `handler`'s closure as instances of the `Sample` class.
The `handler` can typically be a queue or a callback.
The `handler`'s receiver is returned as the `receiver` field of the returned `PullSubscriber`.

:param keyexpr: The key expression to subscribe to
:param handler:
:param reliability: the reliability to use when routing the subscribed samples
:rtype: PullSubscriber

:Examples:

>>> import zenoh
>>> s = zenoh.open({})
>>> sub = s.declare_pull_subscriber('key/expression', lambda sample:
... print(f"Received '{sample.key_expr}': '{sample.payload.decode('utf-8')}'"))
...
>>> sub.pull()
"""
handler = Handler(handler, lambda x: Sample._upgrade_(x))
kwargs = dict()
if reliability is not None:
kwargs['reliability'] = reliability
s = super().declare_pull_subscriber(KeyExpr(keyexpr), handler.closure, **kwargs)
return PullSubscriber(s, handler.receiver)

def close(self):
"""Attempts to close the Session.

Expand Down