Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose more SessionOptions from libbmq #7

Merged
merged 13 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/api_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ Session
.. autoclass:: Session
:members:

.. autoclass:: Timeouts
:members:
:member-order: bysource

.. autoclass:: SessionOptions
:members:
:member-order: bysource

.. autoclass:: QueueOptions
:members:
:member-order: bysource
Expand Down
1 change: 1 addition & 0 deletions news/8.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Exposed more `SessionOptions` from libbmq
pniedzielski marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 4 additions & 0 deletions src/blazingmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from ._monitors import BasicHealthMonitor
from ._session import QueueOptions
from ._session import Session
from ._session import SessionOptions
from ._timeouts import Timeouts
from ._typing import PropertyTypeDict
from ._typing import PropertyValueDict
from .exceptions import Error
Expand All @@ -42,6 +44,8 @@
"Message",
"MessageHandle",
"Session",
"SessionOptions",
"Timeouts",
"__version__",
"exceptions",
"session_events",
Expand Down
8 changes: 7 additions & 1 deletion src/blazingmq/_ext.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ from blazingmq import CompressionAlgorithmType
from blazingmq import Message
from blazingmq import MessageHandle
from blazingmq import PropertyType
from blazingmq import Timeouts
from blazingmq.session_events import SessionEvent

DEFAULT_MAX_UNCONFIRMED_MESSAGES: int = ...
Expand All @@ -44,7 +45,12 @@ class Session:
on_message: Optional[Callable[[Message, MessageHandle], None]] = None,
broker: bytes,
message_compression_algorithm: CompressionAlgorithmType,
timeout: Optional[float] = None,
num_processing_threads: Optional[int] = None,
blob_buffer_size: Optional[int] = None,
channel_high_watermark: Optional[int] = None,
event_queue_watermarks: Optional[tuple[int, int]] = None,
stats_dump_interval: Optional[int | float] = None,
timeouts: Timeouts = (Timeouts()),
pniedzielski marked this conversation as resolved.
Show resolved Hide resolved
monitor_host_health: bool = False,
fake_host_health_monitor: Optional[FakeHostHealthMonitor] = None,
) -> None: ...
Expand Down
43 changes: 39 additions & 4 deletions src/blazingmq/_ext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import logging
import weakref

from bsl cimport optional
from bsl cimport pair
from bsl cimport shared_ptr
from bsl.bsls cimport TimeInterval
from cpython.ceval cimport PyEval_InitThreads
Expand All @@ -43,6 +44,7 @@ from . import _callbacks
from . import _enums
from . import _messages
from . import _script_name
from . import _timeouts
from . import session_events
from .exceptions import BrokerTimeoutError
from .exceptions import Error
Expand Down Expand Up @@ -164,15 +166,40 @@ cdef class Session:
on_message=None,
broker not None: bytes = b'tcp://localhost:30114',
message_compression_algorithm not None=_enums.CompressionAlgorithmType.NONE,
timeout: Optional[int|float] = None,
num_processing_threads: Optional[int] = None,
blob_buffer_size: Optional[int] = None,
channel_high_watermark: Optional[int] = None,
event_queue_watermarks: Optional[tuple[int,int]] = None,
stats_dump_interval: Optional[int|float] = None,
timeouts: _timeouts.Timeouts = (_timeouts.Timeouts()),
pniedzielski marked this conversation as resolved.
Show resolved Hide resolved
monitor_host_health: bool = False,
fake_host_health_monitor: FakeHostHealthMonitor = None,
_mock: Optional[object] = None,
) -> None:
cdef shared_ptr[ManualHostHealthMonitor] fake_host_health_monitor_sp
cdef optional[int] c_num_processing_threads
cdef optional[int] c_blob_buffer_size
cdef optional[int] c_channel_high_watermark
cdef optional[pair[int,int]] c_event_queue_watermarks
cdef TimeInterval c_stats_dump_interval = create_time_interval(stats_dump_interval)
cdef TimeInterval c_connect_timeout = create_time_interval(timeouts.connect_timeout)
cdef TimeInterval c_disconnect_timeout = create_time_interval(timeouts.disconnect_timeout)
cdef TimeInterval c_open_queue_timeout = create_time_interval(timeouts.open_queue_timeout)
cdef TimeInterval c_configure_queue_timeout = create_time_interval(timeouts.configure_queue_timeout)
cdef TimeInterval c_close_queue_timeout = create_time_interval(timeouts.close_queue_timeout)

PyEval_InitThreads()

if num_processing_threads is not None:
c_num_processing_threads = optional[int](num_processing_threads)
if blob_buffer_size is not None:
c_blob_buffer_size = optional[int](blob_buffer_size)
if channel_high_watermark is not None:
c_channel_high_watermark = optional[int](channel_high_watermark)
if event_queue_watermarks is not None:
c_event_queue_watermarks = optional[pair[int,int]](
pair[int,int](event_queue_watermarks[0], event_queue_watermarks[1]))

self.monitor_host_health = monitor_host_health

if fake_host_health_monitor:
Expand All @@ -194,21 +221,29 @@ cdef class Session:
cdef char *c_broker_uri = broker
script_name = _script_name.get_script_name()
cdef char *c_script_name = script_name
cdef TimeInterval c_timeout = create_time_interval(timeout)
self._session = new NativeSession(
session_cb,
message_cb,
ack_cb,
c_broker_uri,
c_script_name,
COMPRESSION_ALGO_FROM_PY_MAPPING[message_compression_algorithm],
c_timeout,
c_num_processing_threads,
c_blob_buffer_size,
c_channel_high_watermark,
c_event_queue_watermarks,
c_stats_dump_interval,
c_connect_timeout,
c_disconnect_timeout,
c_open_queue_timeout,
c_configure_queue_timeout,
c_close_queue_timeout,
monitor_host_health,
fake_host_health_monitor_sp,
Error,
BrokerTimeoutError,
_mock)
self._session.start(c_timeout)
self._session.start(c_connect_timeout)
atexit.register(ensure_stop_session_impl, weakref.ref(self))

def stop(self) -> None:
Expand Down
Loading
Loading