Skip to content

Commit

Permalink
Properly inject db/peer manager into req handlers
Browse files (browse the repository at this point in the history)
  • Loading branch information
davidlougheed committed Jun 17, 2020
1 parent 8d95346 commit 5a27c7e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 27 deletions.
22 changes: 14 additions & 8 deletions bento_federation_service/app.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -48,15 +48,18 @@ async def post_start_hook(peer_manager: PeerManager):
print(f"[{SERVICE_NAME} {datetime.utcnow()}] Post-start hook finished", flush=True)


# noinspection PyAbstractClass
# noinspection PyAbstractClass,PyAttributeOutsideInit
class PostStartHookHandler(RequestHandler):
def initialize(self, peer_manager):
self.peer_manager = peer_manager

async def get(self):
"""
Handles post-start hook which pings the node registry with the current node's information.
:return:
"""
print(f"[{SERVICE_NAME} {datetime.utcnow()}] Post-start hook invoked via URL request", flush=True)
await post_start_hook(self.application.peer_manager)
await post_start_hook(self.peer_manager)
self.clear()
self.set_status(204)

Expand All @@ -66,15 +69,18 @@ def __init__(self, db, base_path: str):
self.db = db
self.peer_manager = PeerManager(self.db)

super(Application, self).__init__([
args_pm = dict(peer_manager=self.peer_manager)
args_full = dict(db=db, peer_manager=self.peer_manager)

super().__init__([
url(f"{base_path}/service-info", ServiceInfoHandler),
url(f"{base_path}/private/post-start-hook", PostStartHookHandler),
url(f"{base_path}/peers", PeerHandler),
url(f"{base_path}/private/peers/refresh", PeerRefreshHandler),
url(f"{base_path}/private/post-start-hook", PostStartHookHandler, args_pm),
url(f"{base_path}/peers", PeerHandler, args_full),
url(f"{base_path}/private/peers/refresh", PeerRefreshHandler, args_pm),
url(f"{base_path}/dataset-search", DatasetsSearchHandler),
url(f"{base_path}/private/dataset-search/([a-zA-Z0-9\\-_]+)", PrivateDatasetSearchHandler),
url(f"{base_path}/federated-dataset-search", FederatedDatasetsSearchHandler),
url(f"{base_path}/search-aggregate/([a-zA-Z0-9\\-_/]+)", SearchHandler),
url(f"{base_path}/federated-dataset-search", FederatedDatasetsSearchHandler, args_pm),
url(f"{base_path}/search-aggregate/([a-zA-Z0-9\\-_/]+)", SearchHandler, args_pm),
])

if INITIALIZE_IMMEDIATELY:
Expand Down
37 changes: 22 additions & 15 deletions bento_federation_service/peers/handlers.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -25,21 +25,25 @@

# noinspection PyAbstractClass,PyAttributeOutsideInit
class PeerHandler(RequestHandler):
def initialize(self, db, peer_manager):
self.db = db
self.peer_manager = peer_manager

async def options(self):
self.set_status(204)
await self.finish()

async def get(self):
peers = list(await self.application.peer_manager.get_peers())
self.write({"peers": peers, "last_updated": self.application.peer_manager.last_peers_update.timestamp()})
peers = list(await self.peer_manager.get_peers())
self.write({"peers": peers, "last_updated": self.peer_manager.last_peers_update.timestamp()})

async def post(self):
"""
Handle notifies from other nodes.
"""

c = self.application.db.cursor()
new_pci = self.application.peer_manager.peer_cache_invalidated
c = self.db.cursor()
new_pci = self.peer_manager.peer_cache_invalidated

try:
# Test that the peer's peers can be seen and are providing the correct service type.
Expand All @@ -59,23 +63,23 @@ async def post(self):
await self.finish(forbidden_error("OIDC realm mismatch"))
return

if peer_self in self.application.peer_manager.notifying:
if peer_self in self.peer_manager.notifying:
# Another request is already being processed from the same node. Assume the data is the same...
# TODO: Is this a valid assumption?
self.clear()
self.set_status(200) # TODO: Wrong response code?
return

self.application.peer_manager.notifying.add(peer_self)
self.peer_manager.notifying.add(peer_self)

client = AsyncHTTPClient()

for peer_url in peer_peers:
if peer_url in attempted_contact:
continue

if (peer_url in self.application.peer_manager.last_errored and
datetime.now().timestamp() - self.application.peer_manager.last_errored[peer_url] <
if (peer_url in self.peer_manager.last_errored and
datetime.now().timestamp() - self.peer_manager.last_errored[peer_url] <
LAST_ERRORED_CACHE_TIME):
# Avoid repetitively hitting dead nodes
continue
Expand All @@ -94,20 +98,20 @@ async def post(self):
# Peer two-way communication is possible
new_pci = new_pci or not check_peer_exists(c, peer_url)
insert_or_ignore_peer(c, peer_url)
self.application.db.commit()
self.db.commit()

except Exception as e: # Parse error or HTTP error
# TODO: Better / more compliant error message, don't return early
self.application.peer_manager.last_errored[peer_url] = datetime.now().timestamp()
self.peer_manager.last_errored[peer_url] = datetime.now().timestamp()
print(f"[{SERVICE_NAME} {datetime.now()}] Error when processing notify from peer {peer_url}.\n"
f" Error: {str(e)}", flush=True)

finally:
attempted_contact.add(peer_url)

self.application.peer_manager.notifying.remove(peer_self)
self.peer_manager.notifying.remove(peer_self)

self.application.peer_manager.peer_cache_invalidated = new_pci
self.peer_manager.peer_cache_invalidated = new_pci
self.clear()
self.set_status(204)

Expand All @@ -118,8 +122,11 @@ async def post(self):
await self.finish(bad_request_error("Invalid request body or other Python KeyError")) # TODO: Better msg


# noinspection PyAbstractClass
# noinspection PyAbstractClass,PyAttributeOutsideInit
class PeerRefreshHandler(RequestHandler):
def initialize(self, peer_manager):
self.peer_manager = peer_manager

async def post(self):
"""
Handles refreshing the peer list if a user decides to clear existing nodes.
Expand All @@ -129,8 +136,8 @@ async def post(self):
clear_db_and_insert_fixed_nodes()

# Invalidate in-memory peer cache and force a refresh
self.application.peer_manager.peer_cache_invalidated = True
self.application.peer_manager.get_peers()
self.peer_manager.peer_cache_invalidated = True
self.peer_manager.get_peers()

self.clear()
self.set_status(204)
7 changes: 5 additions & 2 deletions bento_federation_service/search/federated_dataset_search.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -19,8 +19,11 @@
__all__ = ["FederatedDatasetsSearchHandler"]


# noinspection PyAbstractClass
# noinspection PyAbstractClass,PyAttributeOutsideInit
class FederatedDatasetsSearchHandler(RequestHandler):
def initialize(self, peer_manager):
self.peer_manager = peer_manager

@staticmethod
async def search_worker(peer_queue: Queue, request_body: bytes, responses: list):
client = AsyncHTTPClient()
Expand Down Expand Up @@ -69,7 +72,7 @@ async def post(self):

# Federate out requests

peer_queue = get_new_peer_queue(await self.application.peer_manager.get_peers())
peer_queue = get_new_peer_queue(await self.peer_manager.get_peers())
responses = []
workers = tornado.gen.multi([self.search_worker(peer_queue, self.request.body, responses)
for _ in range(WORKERS)])
Expand Down
7 changes: 5 additions & 2 deletions bento_federation_service/search/search.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,8 +18,11 @@
__all__ = ["SearchHandler"]


# noinspection PyAbstractClass
# noinspection PyAbstractClass,PyAttributeOutsideInit
class SearchHandler(RequestHandler):
def initialize(self, peer_manager):
self.peer_manager = peer_manager

async def search_worker(self, peer_queue: Queue, search_path: str, responses: list):
client = AsyncHTTPClient()

Expand Down Expand Up @@ -53,7 +56,7 @@ async def post(self, search_path: str):
await self.finish(bad_request_error("Invalid request format (missing body)"))
return

peer_queue = get_new_peer_queue(await self.application.peer_manager.get_peers())
peer_queue = get_new_peer_queue(await self.peer_manager.get_peers())
responses = []
workers = tornado.gen.multi([self.search_worker(peer_queue, search_path, responses) for _ in range(WORKERS)])
await peer_queue.join()
Expand Down

0 comments on commit 5a27c7e

Please sign in to comment.