Skip to content

Commit

Permalink
fixed reconnection to server
Browse files Browse the repository at this point in the history
  • Loading branch information
glennerichall committed Apr 23, 2024
1 parent c74ca81 commit 9ec8988
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 84 deletions.
2 changes: 1 addition & 1 deletion octoprint_zupfe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self):
@property
def version(self):
# TODO get the version from setup.py
return "o.0.1.1"
return "o.0.1.2"

@property
def host(self):
Expand Down
40 changes: 40 additions & 0 deletions octoprint_zupfe/expo_backoff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import asyncio
import random
import time



class ExponentialBackoff:
def __init__(self, base_interval=1, max_interval=60, jitter_factor=0.5):
self.base_interval = base_interval
self.max_interval = max_interval
self.jitter_factor = jitter_factor
self.current_interval = base_interval
self.last_call_time = time.time()

def consume(self):
current_time = time.time()
# Check if wait was called after a 10-minute interval
if current_time - self.last_call_time >= 600:
self.current_interval = self.base_interval
else:
# Increase the backoff interval, up to the max_interval
self.current_interval = min(self.current_interval * 2, self.max_interval)

# Apply jitter
jitter = self.current_interval * self.jitter_factor * random.random()
backoff_with_jitter = self.current_interval + jitter - (self.jitter_factor / 2 * self.current_interval)
backoff_with_jitter = max(self.base_interval, backoff_with_jitter)

return backoff_with_jitter, current_time

def sleep(self):
backoff_with_jitter, current_time = self.consume()
time.sleep(backoff_with_jitter)
self.last_call_time = current_time

async def sleepAsync(self):
backoff_with_jitter, current_time = self.consume()
await asyncio.sleep(backoff_with_jitter)
self.last_call_time = current_time

26 changes: 26 additions & 0 deletions octoprint_zupfe/loops/frame_limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import time

class FrameLimiter:
def __init__(self, max_rate_per_second):
self._stream_info = {
'lastFrameTime': 0,
'frameCount': 0,
'maxRate': max_rate_per_second,
}

def set_max_rate(self, max_rate_per_second):
self._stream_info['maxRate'] = max_rate_per_second

def accept(self, points=1):
current_time = int(time.time() * 1000)
stream_info = self._stream_info

elapsed_time = current_time - stream_info['lastFrameTime']
desired_interval = 1000 / (stream_info['maxRate'] * points)

if elapsed_time >= desired_interval or stream_info['frameCount'] == 0:
stream_info['lastFrameTime'] = current_time
stream_info['frameCount'] += 1
return True

return False
7 changes: 5 additions & 2 deletions octoprint_zupfe/loops/mjpeg_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from octoprint_zupfe.loops.frame_limiter import FrameLimiter
from octoprint_zupfe.loops.polling_manager import PollingManager
from octoprint_zupfe.messaging.message_builder import MessageBuilder
from octoprint_zupfe.loops.polling_thread import PollingThread
Expand All @@ -13,10 +14,12 @@ def poll(self):
self.on_polling_started()
stream_id = self._webcam.id
builder = MessageBuilder()
limiter = FrameLimiter(6)

def receive_frame(frame):
message = builder.new_mjpeg_frame(frame, stream_id)
self.send_frame(message['buffer'])
if limiter.accept():
message = builder.new_mjpeg_frame(frame, stream_id)
self.send_frame(message['buffer'])

def is_done():
return self._done
Expand Down
4 changes: 3 additions & 1 deletion octoprint_zupfe/loops/polling_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ def start(self):
if self._thread is None:
self._plugin.logger.debug(f'Starting {self._name} thread')
self._thread = self.create_thread(self._plugin)
self._thread.start()

self._thread.start()


@property
def name(self):
Expand Down
17 changes: 8 additions & 9 deletions octoprint_zupfe/loops/polling_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

def should_evict_transport(name, recipient, error=None):
transport = recipient['transport']
if recipient['missed_frames'] > 100:
if recipient['missed_frames'] > 50:
logger.debug(
"Unable to send stream from loop %s to recipient %s more than 100 times consecutively, evicting transport" % (
name, transport.uuid))
Expand Down Expand Up @@ -71,7 +71,6 @@ def has_recipients(self):
def running(self):
return not self._done


def stop_if_empty(self):
if not self.has_recipients and self._stop_if_no_recipients:
logger.debug(
Expand All @@ -92,20 +91,20 @@ def remove_transport(self, transport):
self.stop_if_empty()
return True


def validate_and_evict_transport(self, name, recipient, error=None):
if should_evict_transport(name, recipient, error):
self.remove_transport(recipient['transport'])

def start(self):
self._done = False
thread = threading.Thread(target=self.poll)
self._thread = thread
if self._thread is None:
self._done = False
thread = threading.Thread(target=self.poll)
self._thread = thread

# daemon mode is mandatory so threads get killed when server shuts down const client = this.createClient(ws, req);
# daemon mode is mandatory so threads get killed when server shuts down const client = this.createClient(ws, req);

thread.daemon = True
thread.start()
thread.daemon = True
thread.start()

def stop(self):
self._thread = None
Expand Down
2 changes: 1 addition & 1 deletion octoprint_zupfe/loops/progress_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class ProgressThread(PollingThreadWithInterval):
def __init__(self, plugin):
super().__init__("progress", stop_if_no_recipients=True, interval=0.1)
super().__init__("progress", stop_if_no_recipients=True, interval=1)
self._plugin = plugin
self._p2p = self._plugin.message_factory
self._progress = self._plugin.progress
Expand Down
19 changes: 16 additions & 3 deletions octoprint_zupfe/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,21 @@ async def initialize_backend(plugin):
await plugin.backend.init()
plugin.frontend.emitInitialized()

# FIXME this is really not the best implementation, we need to check for api key validity on ws disconnection
# and wait for the validation in the websocket class before attempting a new connection.
validating_api_key = False

async def validate_api_key():
octo_id = plugin.settings.get('octoprint_id', None)
api_key = plugin.settings.get('api_key', None)

uuid_valid = False
nonlocal validating_api_key

if validating_api_key: return

validating_api_key = True

while not uuid_valid:
if octo_id is None:
plugin.logger.debug('No octoid, asking for a new one')
Expand All @@ -55,6 +65,7 @@ async def validate_api_key():
plugin.settings.save_if_updated('octoprint_id', octo_id)
plugin.settings.save_if_updated('api_key', api_key)
plugin.settings.save_if_updated('linked', False)
plugin.backend.ws.set_credentials(octo_id, api_key)
else:
plugin.backend.set_octo_id(octo_id, api_key)

Expand All @@ -73,11 +84,13 @@ async def validate_api_key():
# self.settings.settings.plugins.zupfe.api_key in zupfe.js does not seem to get settings everytime
plugin.frontend.emitApiKey(api_key)

validating_api_key = False

async def on_close():
plugin.frontend.emitBackendDisconnected()
await validate_api_key()
await asyncio.sleep(1)
connect_ws()
nonlocal validating_api_key
if not validating_api_key:
await validate_api_key()

def connect_ws():
try:
Expand Down
2 changes: 0 additions & 2 deletions octoprint_zupfe/static/js/zupfe.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ $(function () {
let settingsRoot = $("#settings_plugin_zupfe");
let navbarRoot = $("#navbar_plugin_zupfe");

console.log('Message from backend ', message);

switch (message.type) {
case EVENT_OCTOPRINT_APIKEY_RECEIVED:
self.api_key(message.api_key)
Expand Down
9 changes: 7 additions & 2 deletions octoprint_zupfe/transport/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import aiohttp

from octoprint_zupfe.expo_backoff import ExponentialBackoff
from octoprint_zupfe.messaging.message_builder import MessageBuilder

logger = logging.getLogger("octoprint.plugins.zupfe")
Expand Down Expand Up @@ -40,10 +41,14 @@ async def request(method,
url,
headers=None,
data=None,
max_retries=float('inf')):
max_retries=float('inf'),
backoff=None):
retries = 0
ok_status = False

if backoff is None:
backoff = ExponentialBackoff()

while retries < max_retries and not ok_status:
session = aiohttp.ClientSession()

Expand All @@ -54,7 +59,7 @@ async def request(method,
except aiohttp.ClientError as e:
logger.debug(f'Request {method} to {url} failed with error: {e}')
retries += 1
await asyncio.sleep(1)
await backoff.sleepAsync()

logger.error(f'Maximum number of retries ({max_retries}) reached. Request {method}: {url} failed.')
return None
Expand Down
Loading

0 comments on commit 9ec8988

Please sign in to comment.