Skip to content

Commit

Permalink
Refactor to replace print by logger.debug (#272)
Browse files Browse the repository at this point in the history
* Refactor to replace print by logger.debug

* bump version

* linting

* linting

* linting

* revert logger refactor

* linting

* Set up pytest version
  • Loading branch information
jsansaloni authored Feb 1, 2024
1 parent 5518cf7 commit 2baccb6
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ coverage: ## generates codecov report
coverage report -m

release: clean install-deploy-requirements sdist ## package and upload a release
twine upload -u mercadonatech dist/*
twine upload -u __token__ dist/*

sdist: clean ## package
python setup.py sdist
Expand Down
2 changes: 1 addition & 1 deletion rele/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "1.14.0b1"
__version__ = "1.14.0"

try:
import django
Expand Down
72 changes: 27 additions & 45 deletions rele/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import signal
import socket
import sys
import threading
import time
from concurrent import futures
from datetime import datetime
Expand All @@ -18,8 +17,12 @@
logger = logging.getLogger(__name__)


class NotConnectionError(BaseException):
pass


def check_internet_connection():
print("Checking connection")
logger.debug("Checking connection")
remote_server = "www.google.com"
port = 80
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand All @@ -33,10 +36,6 @@ def check_internet_connection():
sock.close()


class NotConnectionError(BaseException):
pass


class Worker:
"""A Worker manages the subscriptions which consume Google PubSub messages.
Expand Down Expand Up @@ -73,10 +72,10 @@ def setup(self):
If the subscription already exists, the subscription will not be
re-created. Therefore, it is idempotent.
"""
print(f"[{datetime.now()}][{threading.get_ident()}][start] start setup")
logger.debug(f"[start] start setup")
for subscription in self._subscriptions:
self._subscriber.update_or_create_subscription(subscription)
print(f"[{datetime.now()}][{threading.get_ident()}][setup] end setup")
logger.debug(f"[setup] end setup")

def start(self):
"""Begin consuming all subscriptions.
Expand All @@ -88,25 +87,25 @@ def start(self):
The futures are stored so that they can be cancelled later on
for a graceful shutdown of the worker.
"""
print(f"[{datetime.now()}][{threading.get_ident()}][start] start start")
logger.debug(f"[start] start start")
run_middleware_hook("pre_worker_start")
for subscription in self._subscriptions:
self._boostrap_consumption(subscription)
run_middleware_hook("post_worker_start")
print(f"[{datetime.now()}][{threading.get_ident()}][start] end start")
logger.debug(f"[start] end start")

def run_forever(self, sleep_interval=1):
"""Shortcut for calling setup, start, and _wait_forever.
:param sleep_interval: Number of seconds to sleep in the ``while True`` loop
"""
print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] setup")
logger.debug(f"[run_forever] setup")
self.setup()
print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] start")
logger.debug(f"[run_forever] start")
self.start()
print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] wait for ever")
logger.debug(f"[run_forever] wait for ever")
self._wait_forever(sleep_interval=sleep_interval)
print(f"[{datetime.now()}][{threading.get_ident()}][run_forever] finish")
logger.debug(f"[run_forever] finish")

def stop(self, signal=None, frame=None):
"""Manage the shutdown process of the worker.
Expand All @@ -133,33 +132,27 @@ def stop(self, signal=None, frame=None):
sys.exit(0)

def _boostrap_consumption(self, subscription):
print(
f"[{datetime.now()}][{threading.get_ident()}][_boostrap_consumption][0] "
f"subscription {subscription.name}"
)
logger.debug(f"[_boostrap_consumption][0] " f"subscription {subscription.name}")

if subscription in self._futures:
print(
f"[{datetime.now()}][{threading.get_ident()}]"
logger.debug(
f"[_boostrap_consumption][1] subscription {subscription.name} "
f"futures in [{self._futures[subscription]._state}]"
)
self._futures[subscription].cancel()
print(
f"[{datetime.now()}][{threading.get_ident()}]"
logger.debug(
f"[_boostrap_consumption][2] subscription {subscription.name} "
"future cancelled"
)
self._futures[subscription].result()
print(
f"[{datetime.now()}][{threading.get_ident()}]"
logger.debug(
f"[_boostrap_consumption][3] subscription {subscription.name} "
"future cancelled and result"
)

if not check_internet_connection():
print(
f"[{datetime.now()}][{threading.get_ident()}] Not internet "
logger.debug(
f"Not internet "
f"connection when boostrap a consumption for {subscription}"
)
raise NotConnectionError
Expand All @@ -175,39 +168,31 @@ def _boostrap_consumption(self, subscription):
callback=Callback(subscription),
scheduler=scheduler,
)
print(
f"[{datetime.now()}][{threading.get_ident()}][_boostrap_consumption][3] "
logger.debug(
f"[_boostrap_consumption][3] "
f"subscription {subscription.name} future in "
f"[{self._futures[subscription]._state}]"
)

def _wait_forever(self, sleep_interval):
logger.info("Consuming subscriptions...")
while True:
print(
f"[{datetime.now()}][{threading.get_ident()}]"
f"[_wait_forever][0] Futures: {self._futures.values()}"
)
logger.debug(f"[_wait_forever][0] Futures: {self._futures.values()}")

if datetime.now().timestamp() % 50 < 1 and not check_internet_connection():
print(
f"[{datetime.now()}][{threading.get_ident()}] "
"Not internet connection, raising an Exception"
)
logger.debug("Not internet connection, raising an Exception")
raise NotConnectionError

for subscription, future in self._futures.items():
if future.cancelled() or future.done():
print(
f"[{datetime.now()}][{threading.get_ident()}]"
logger.debug(
"[_wait_forever][1] Restarting consumption "
f"of {subscription.name}."
)
logger.info(f"Restarting consumption of {subscription.name}.")
self._boostrap_consumption(subscription)

print(
f"[{datetime.now()}][{threading.get_ident()}]"
logger.debug(
f"[_wait_forever][2] Sleep {sleep_interval} "
f"second(s) with futures: {self._futures.values()}"
)
Expand Down Expand Up @@ -238,12 +223,9 @@ def create_and_run(subs, config):
:param subs: List :class:`~rele.subscription.Subscription`
:param config: :class:`~rele.config.Config`
"""
print(
f"[{datetime.now()}][{threading.get_ident()}]"
f"Configuring worker with {len(subs)} subscription(s)..."
)
logger.debug(f"" f"Configuring worker with {len(subs)} subscription(s)...")
for sub in subs:
print(f"[{datetime.now()}][{threading.get_ident()}] {sub}")
print(f"Subscription: {sub}")
worker = Worker(
subs,
config.gc_project_id,
Expand Down
2 changes: 1 addition & 1 deletion requirements/test.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pytest>=4.6.0
pytest==7.3.1
pytest-cov>=2.7.0
pytest-django>=3.5
coverage>=4.4.0
Expand Down

0 comments on commit 2baccb6

Please sign in to comment.