Skip to content

Commit

Permalink
Remove Pull Subscriber.
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary committed Apr 1, 2024
1 parent 1516153 commit 321f46b
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 99 deletions.
5 changes: 0 additions & 5 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ Subscriber
.. autoclass:: zenoh.Subscriber
:members:

PullSubscriber
--------------
.. autoclass:: zenoh.PullSubscriber
:members:

Reliability
-----------
.. autoclass:: zenoh.Reliability
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ fn zenoh(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<session::_Session>()?;
m.add_class::<session::_Publisher>()?;
m.add_class::<session::_Subscriber>()?;
m.add_class::<session::_PullSubscriber>()?;
m.add_class::<session::_Scout>()?;
m.add_class::<queryable::_Query>()?;
m.add_class::<queryable::_Queryable>()?;
Expand Down
35 changes: 1 addition & 34 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use zenoh::{
prelude::{sync::SyncResolve, SessionDeclarations},
publication::Publisher,
scouting::Scout,
subscriber::{PullSubscriber, Subscriber},
subscriber::Subscriber,
Session,
};

Expand Down Expand Up @@ -226,30 +226,6 @@ impl _Session {
Ok(_Subscriber(subscriber))
}

#[pyo3(signature = (key_expr, callback, **kwargs))]
pub fn declare_pull_subscriber(
&self,
key_expr: &_KeyExpr,
callback: &PyAny,
kwargs: Option<&PyDict>,
) -> PyResult<_PullSubscriber> {
let callback: PyClosure<(_Sample,)> = <_ as TryInto<_>>::try_into(callback)?;
let mut builder = self
.0
.declare_subscriber(&key_expr.0)
.pull_mode()
.with(callback);
if let Some(kwargs) = kwargs {
match kwargs.extract_item::<_Reliability>("reliability") {
Ok(reliabilty) => builder = builder.reliability(reliabilty.0),
Err(crate::ExtractError::Other(e)) => return Err(e),
_ => {}
}
}
let subscriber = builder.res().map_err(|e| e.to_pyerr())?;
Ok(_PullSubscriber(subscriber))
}

pub fn zid(&self) -> _ZenohId {
_ZenohId(self.0.zid())
}
Expand Down Expand Up @@ -290,15 +266,6 @@ impl _Publisher {
#[pyclass(subclass)]
pub struct _Subscriber(Subscriber<'static, ()>);

#[pyclass(subclass)]
pub struct _PullSubscriber(PullSubscriber<'static, ()>);
#[pymethods]
impl _PullSubscriber {
fn pull(&self) -> PyResult<()> {
self.0.pull().res_sync().map_err(|e| e.to_pyerr())
}
}

#[pyclass(subclass)]
pub struct _Scout(Scout<()>);

Expand Down
2 changes: 1 addition & 1 deletion zenoh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .zenoh import init_logger, scout as _scout
from .keyexpr import IntoKeyExpr, IntoSelector, KeyExpr, Selector
from .config import Config
from .session import Session, Publisher, Subscriber, PullSubscriber, Info
from .session import Session, Publisher, Subscriber, Info
from .enums import CongestionControl, Encoding, Priority, QueryConsolidation, QueryTarget, Reliability, SampleKind
from .value import Hello, Value, IntoValue, IValue, Sample, IntoSample, ZenohId, Timestamp, Reply
from .closures import Closure, IClosure, IntoClosure, Handler, IHandler, IntoHandler, ListCollector, Queue
Expand Down
59 changes: 1 addition & 58 deletions zenoh/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#
from typing import Union, Any, List

from .zenoh import _Session, _Config, _Publisher, _Subscriber, _PullSubscriber
from .zenoh import _Session, _Config, _Publisher, _Subscriber

from .keyexpr import KeyExpr, IntoKeyExpr, Selector, IntoSelector
from .config import Config
Expand Down Expand Up @@ -66,34 +66,6 @@ def undeclare(self):
self._subscriber_ = None


class PullSubscriber:
"""
A handle to a pull subscription.
Its main purpose is to keep the subscription active as long as it exists.
When constructed through ``Session.declare_pull_subscriber(session, keyexpr, handler)``, it exposes ``handler``'s receiver
through ``self.receiver``.
Calling ``self.pull()`` will prompt the Zenoh network to send a new sample when available.
"""

def __init__(self, s: _PullSubscriber, receiver=None):
self._subscriber_ = s
self.receiver = receiver

def pull(self):
"""
Prompts the Zenoh network to send a new sample if available.
Note that this sample will not be returned by this function, but provided to the handler's callback.
"""
self._subscriber_.pull()

def undeclare(self):
"Undeclares the subscription"
self._subscriber_ = None


class Session(_Session):
"""
A Zenoh Session, the core interraction point with a Zenoh network.
Expand Down Expand Up @@ -342,35 +314,6 @@ def declare_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample,
s = super().declare_subscriber(KeyExpr(keyexpr), handler.closure, **kwargs)
return Subscriber(s, handler.receiver)

def declare_pull_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any], reliability: Reliability = None) -> PullSubscriber:
"""
Declares a pull-mode subscriber, which will receive a single published sample with a key expression intersecting ``keyexpr`` any time its ``pull`` method is called.
These samples are passed to the `handler`'s closure as instances of the `Sample` class.
The `handler` can typically be a queue or a callback.
The `handler`'s receiver is returned as the `receiver` field of the returned `PullSubscriber`.
:param keyexpr: The key expression to subscribe to
:param handler:
:param reliability: the reliability to use when routing the subscribed samples
:rtype: PullSubscriber
:Examples:
>>> import zenoh
>>> s = zenoh.open({})
>>> sub = s.declare_pull_subscriber('key/expression', lambda sample:
... print(f"Received '{sample.key_expr}': '{sample.payload.decode('utf-8')}'"))
...
>>> sub.pull()
"""
handler = Handler(handler, lambda x: Sample._upgrade_(x))
kwargs = dict()
if reliability is not None:
kwargs['reliability'] = reliability
s = super().declare_pull_subscriber(KeyExpr(keyexpr), handler.closure, **kwargs)
return PullSubscriber(s, handler.receiver)

def close(self):
"""Attempts to close the Session.
Expand Down

0 comments on commit 321f46b

Please sign in to comment.