Skip to content

Commit

Permalink
blah
Browse files Browse the repository at this point in the history
  • Loading branch information
glennerichall committed Feb 28, 2024
1 parent 0219ab6 commit 52fc88e
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 65 deletions.
23 changes: 12 additions & 11 deletions octoprint_zupfe/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,21 +203,17 @@ def subscribe_polling(manager, *kwargs):
content = message.json()
interval = content.get('interval')

subscription = manager.add_recipient(transport, interval, *kwargs)
if subscription:
response = {'subscription': subscription}
reply(response)
success = manager.add_recipient(transport, interval, *kwargs)
if success:
reply(RPC_RESPONSE_SUCCESS)
else:
reject(f'Unable to subscribe to {manager.name}')
except Exception as e:
reject(str(e))

def unsubscribe_polling(manager):
def unsubscribe_polling(manager, *kwargs):
try:
content = message.json()
subscription = content['subscription']
subscription = subscription['subscription']
remove = manager.remove_recipient(subscription)
remove = manager.remove_recipient(transport, *kwargs)
if remove:
reply(RPC_RESPONSE_SUCCESS)
else:
Expand All @@ -234,7 +230,12 @@ async def on_request_start_camera():
reject(str(e))

async def on_request_stop_camera():
unsubscribe_polling(plugin.mjpeg_manager)
try:
content = message.json()
camera_id = content['cameraId']
unsubscribe_polling(plugin.mjpeg_manager, camera_id)
except Exception as e:
reject(str(e))

async def on_request_receive_progress():
subscribe_polling(plugin.progress_manager)
Expand All @@ -246,7 +247,7 @@ async def on_request_read_temperatures():
subscribe_polling(plugin.temperature_manager)

async def on_request_stop_temperatures():
unsubscribe_polling(plugin.progress_manager)
unsubscribe_polling(plugin.temperature_manager)

event_handlers = {
EVENT_PRINTER_LINKED: on_linked,
Expand Down
19 changes: 5 additions & 14 deletions octoprint_zupfe/loops/mjpeg_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@ class MjpegStreamManager:
def __init__(self, plugin):
self._plugin = plugin
self._managers = {}
self._subscriptions = {}

def add_recipient(self, transport, interval, camera_id = None):
webcam_to_stream = None
for webcam in self._plugin.stream_webcams:
if webcam.id == camera_id:
webcam_to_stream = webcam

subscription = None
if webcam_to_stream is not None:
if not camera_id in self._managers:
self._managers[camera_id] = MjpegManager(self._plugin, webcam_to_stream)
Expand All @@ -57,26 +55,19 @@ def add_recipient(self, transport, interval, camera_id = None):
if not manager.running:
self._managers[camera_id].start()

subscription = manager.add_recipient(transport, interval)
self._subscriptions[subscription] = camera_id
manager.add_recipient(transport, interval)

return subscription
return True


def remove_recipient(self, subscription):
if subscription not in self._subscriptions:
return False

camera_id = self._subscriptions[subscription]
self._subscriptions.pop(subscription)

def remove_recipient(self, transport, camera_id):
if not camera_id in self._managers:
return False
else:
manager = self._managers[camera_id]
self._plugin.logger.debug(f"Unregistering subscription {subscription} from camera {camera_id}")
self._plugin.logger.debug(f"Unregistering transport {transport.uuid} ({transport.type}) from camera {camera_id}")

manager.remove_recipient(subscription)
manager.remove_recipient(transport)

if manager.is_done:
self._plugin.logger.debug(f"Camera {camera_id} has no more recipients, stopping thread")
Expand Down
6 changes: 3 additions & 3 deletions octoprint_zupfe/loops/polling_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ def add_recipient(self, transport, interval=1):

return self._thread.add_transport(transport, interval)

def remove_recipient(self, subscription):
def remove_recipient(self, transport, *kwargs):
result = False
if self._thread is not None:
self._plugin.logger.debug(f"Unregistering subscription {subscription} from {self._name} loop")
result = self._thread.remove_subscription(subscription)
self._plugin.logger.debug(f"Unregistering transport {transport.uuid} ({transport.type}) from {self._name} loop")
result = self._thread.remove_transport(transport)
if self._thread.is_done:
self._plugin.logger.debug(f'Loop {self._name} is done, flushing it')
self._thread = None
Expand Down
43 changes: 6 additions & 37 deletions octoprint_zupfe/loops/polling_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,16 @@ def log_recipients(self):
for uuid in list(self._recipients.keys()):
recipient = self._recipients[uuid]
transport_type = recipient.get('transport').type
subscriptions = recipient.get('subscriptions')
message += f'({transport_type}:{uuid}: [{subscriptions}] ) '
message += f'({transport_type}:{uuid} ) '

logger.debug(f'Recipient(s) of {self._name} : {message}')

def add_transport(self, transport, interval=1):
uuid = transport.uuid
subscription = random.randint(1, max_safe_integer_js - 1)

self._subscriptions[subscription] = uuid
if uuid is not None and uuid not in self._recipients:
remove_on_close_handler = transport.on_close(lambda _transport: self.remove_transport(_transport))
self._recipients[uuid] = {
'subscriptions': set(),
'interval': interval,
'transport': transport,
'remove_callback': remove_on_close_handler,
Expand All @@ -60,10 +56,8 @@ def add_transport(self, transport, interval=1):
else:
logger.debug(f'Transport {transport.uuid} ({transport.type}) is already a recipient of {self._name}')

self._recipients[uuid]['subscriptions'].add(subscription)
self.log_recipients()

return subscription
return True

@property
def is_done(self):
Expand Down Expand Up @@ -91,38 +85,13 @@ def remove_transport(self, transport):
recipient['remove_callback']()
self._recipients.pop(uuid)
logger.debug(f'Transport {transport.uuid} ({transport.type}) removed from {self._name}')
else:
logger.debug(f'Transport {transport.uuid} ({transport.type}) not in {self._name}, not removed')

self.log_recipients()
self.stop_if_empty()
return True

def remove_subscription(self, subscription):
uuid = None
found = False

if subscription in self._subscriptions:
uuid = self._subscriptions[subscription]
self._subscriptions.pop(subscription)

if uuid in self._recipients:
recipient = self._recipients.get(uuid)
transport = recipient['transport']
subscriptions = recipient['subscriptions']

if subscription in subscriptions:
found = True

subscriptions.remove(subscription)
logger.debug(
f'Subscription {subscription} on transport {transport.uuid} ({transport.type}) removed from recipients of {self._name}')

if len(subscriptions) == 0:
logger.debug(
f'Transport {transport.uuid} ({transport.type}) has no more subscriptions so removing it from {self._name}')
self.remove_transport(transport)

else:
logger.debug(f'Could not remove subscription {subscription} from recipients of {self._name}')

return found

def validate_and_evict_transport(self, name, recipient, error=None):
if should_evict_transport(name, recipient, error):
Expand Down

0 comments on commit 52fc88e

Please sign in to comment.