Skip to content

Commit

Permalink
improving asyncio handling for neo4j bolt driver, reverting uvicorn u…
Browse files Browse the repository at this point in the history
…pgrade
  • Loading branch information
EvanDietzMorris committed Oct 8, 2024
1 parent 7537ba2 commit 489e68e
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 21 deletions.
2 changes: 1 addition & 1 deletion PLATER/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ fastapi==0.85.0
pyaml==20.4.0
pytest==8.3.3
pytest-asyncio==0.24.0
uvicorn[standard]==0.31.0
uvicorn==0.24.0
reasoner-transpiler==2.3.4
reasoner-pydantic==5.0.6
httpx==0.27.2
Expand Down
6 changes: 4 additions & 2 deletions PLATER/services/util/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@
from PLATER.services.config import config


def get_graph_interface():
async def get_graph_interface():
"""Get graph interface."""
return GraphInterface(
graph_interface = GraphInterface(
host=config.get('NEO4J_HOST', 'localhost'),
port=config.get('NEO4J_BOLT_PORT', '7687'),
auth=(
config.get('NEO4J_USERNAME'),
config.get('NEO4J_PASSWORD')
)
)
await graph_interface.connect_to_neo4j()
return graph_interface


def construct_open_api_schema(app, trapi_version, prefix="", plater_title='Plater API'):
Expand Down
22 changes: 16 additions & 6 deletions PLATER/services/util/graph_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import httpx
import time
import neo4j
import asyncio

import neo4j.exceptions
from neo4j import unit_of_work
Expand Down Expand Up @@ -31,17 +30,19 @@ def __init__(self,
self.database_name = database_name
self.database_auth = auth
self.graph_db_uri = f'bolt://{host}:{port}'
self.neo4j_driver = asyncio.run(self.get_async_neo4j_driver())
self.neo4j_driver = None
self.sync_neo4j_driver = None
self._supports_apoc = None

async def connect_to_neo4j(self):
logger.debug('PINGING NEO4J')
self.ping()
logger.debug('CHECKING IF NEO4J SUPPORTS APOC')
self._supports_apoc = None
self.check_apoc_support()
logger.debug(f'SUPPORTS APOC : {self._supports_apoc}')

async def get_async_neo4j_driver(self):
return neo4j.AsyncGraphDatabase.driver(self.graph_db_uri, auth=self.database_auth)
self.neo4j_driver = neo4j.AsyncGraphDatabase.driver(self.graph_db_uri,
auth=self.database_auth,
**{'telemetry_disabled': True})

@staticmethod
@unit_of_work(timeout=NEO4J_QUERY_TIMEOUT)
Expand Down Expand Up @@ -233,6 +234,8 @@ def __init__(self, host: str, port: str, auth: tuple, scheme: str = 'http'):
'Content-Type': 'application/json',
'Authorization': 'Basic %s' % base64.b64encode(f"{auth[0]}:{auth[1]}".encode('utf-8')).decode('utf-8')
}

async def connect_to_neo4j(self):
# ping and raise error if neo4j doesn't respond.
logger.debug('PINGING NEO4J')
self.ping()
Expand Down Expand Up @@ -396,6 +399,9 @@ def __init__(self, host, port, auth, protocol='bolt'):
self.toolkit = get_biolink_model_toolkit()
self.bl_version = config.get('BL_VERSION', '4.2.1')

async def connect_to_neo4j(self):
await self.driver.connect_to_neo4j()

def find_biolink_leaves(self, biolink_concepts: list):
"""
Given a list of biolink concepts, returns the leaves removing any parent concepts.
Expand Down Expand Up @@ -688,3 +694,7 @@ def __init__(self, host, port, auth, protocol='bolt'):
def __getattr__(self, item):
# proxy function calls to the inner object.
return getattr(self.instance, item)

@staticmethod
async def connect_to_neo4j():
await GraphInterface.instance.connect_to_neo4j()
2 changes: 1 addition & 1 deletion PLATER/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@

setup(
name='PLATER',
version='v2.0.6',
version='v2.1.2',
packages=find_packages(),
)
26 changes: 15 additions & 11 deletions PLATER/tests/test_graph_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@
import os

# TODO - improve these tests
# They are a mess, partially because Neo4jHTTPDriver initialization calls ping() and check_apoc_support() in init
# which means both of those calls need to be mocked every time. That should either be consolidated with a fixture kind
# of solution, or those things could be moved outside of the init. More importantly, we should have tests for the bolt
# protocol and driver. It's questionable how helpful these mocked tests are for testing the driver(s) anyway though.
# They are a mess, partially because Neo4jHTTPDriver connect_to_neo4j calls ping() and check_apoc_support() in init
# which means both of those calls need to be mocked every time either is called. More importantly, we should have tests
# for the bolt protocol driver. It's questionable how helpful these mocked tests are for testing the driver(s) anyway.


def test_neo4j_http_driver_ping_success(httpx_mock: HTTPXMock):
@pytest.mark.asyncio
async def test_neo4j_http_driver_ping_success(httpx_mock: HTTPXMock):
httpx_mock.add_response(url="http://localhost:7474/", method="GET", status_code=200)
httpx_mock.add_response(url="http://localhost:7474/db/neo4j/tx/commit", method="POST", status_code=200)
driver = Neo4jHTTPDriver(host='localhost', port='7474', auth=('neo4j', 'somepass'))
await driver.connect_to_neo4j()


def test_neo4j_http_driver_ping_fail(httpx_mock: HTTPXMock):
@pytest.mark.asyncio
async def test_neo4j_http_driver_ping_fail(httpx_mock: HTTPXMock):
httpx_mock.add_response(url="http://localhost:7474/", method="GET", status_code=500)
try:
driver = Neo4jHTTPDriver(host='localhost', port='7474', auth=('neo4j', 'somepass'))
await driver.connect_to_neo4j()
assert False
except RuntimeError:
assert True
Expand All @@ -32,6 +35,7 @@ async def test_neo4j_http_driver_run_cypher(httpx_mock: HTTPXMock):
httpx_mock.add_response(url="http://localhost:7474/", method="GET", status_code=200)
httpx_mock.add_response(url="http://localhost:7474/db/neo4j/tx/commit", method="POST", status_code=200)
driver = Neo4jHTTPDriver(host='localhost', port='7474', auth=('neo4j', 'somepass'))
await driver.connect_to_neo4j()

test_response = {"some": "response"}
query = "some test cypher"
Expand All @@ -51,6 +55,7 @@ async def test_neo4j_http_driver_run_cypher(httpx_mock: HTTPXMock):
async def test_neo4j_http_driver_run_cypher_fail(httpx_mock: HTTPXMock):
httpx_mock.add_response(url="http://localhost:7474/", method="GET", status_code=200)
driver = Neo4jHTTPDriver(host='localhost', port='7474', auth=('neo4j', 'somepass'))
await driver.connect_to_neo4j()
test_response = {"errors": "some_error"}
query = "some test cypher"

Expand Down Expand Up @@ -87,6 +92,7 @@ async def test_neo4j_http_driver_apoc(httpx_mock: HTTPXMock):
}).encode('utf-8'), json={}
)
driver = Neo4jHTTPDriver(host='localhost', port='7474', auth=('neo4j', 'somepass'))
await driver.connect_to_neo4j()
assert driver.check_apoc_support() is True

httpx_mock.add_response(url="http://localhost:7474/", method="GET", status_code=200)
Expand All @@ -99,6 +105,7 @@ async def test_neo4j_http_driver_apoc(httpx_mock: HTTPXMock):
]
}).encode('utf-8'), json={"errors": "apoc not supported"})
driver = Neo4jHTTPDriver(host='localhost', port='7474', auth=('neo4j', 'somepass'))
await driver.connect_to_neo4j()
assert driver.check_apoc_support() is False

@pytest.mark.asyncio
Expand Down Expand Up @@ -127,8 +134,6 @@ async def test_driver_convert_to_dict():

@pytest.mark.asyncio
async def test_graph_interface_biolink_leaves(httpx_mock: HTTPXMock):
httpx_mock.add_response(url="http://localhost:7474/", method="GET", status_code=200)
httpx_mock.add_response(url="http://localhost:7474/db/neo4j/tx/commit", method="POST", status_code=200)
gi = GraphInterface('localhost', '7474', auth=('neo4j', ''), protocol='http')
list_1 = [
"biolink:SmallMolecule",
Expand All @@ -148,8 +153,6 @@ async def test_graph_interface_biolink_leaves(httpx_mock: HTTPXMock):

@pytest.mark.asyncio
async def test_graph_interface_predicate_inverse(httpx_mock: HTTPXMock):
httpx_mock.add_response(url="http://localhost:7474/", method="GET", status_code=200)
httpx_mock.add_response(url="http://localhost:7474/db/neo4j/tx/commit", method="POST", status_code=200)
gi = GraphInterface('localhost', '7474', auth=('neo4j', ''), protocol='http')
non_exist_predicate = "biolink:some_predicate"
assert gi.invert_predicate(non_exist_predicate) is None
Expand All @@ -171,7 +174,8 @@ async def test_graph_interface_get_schema(httpx_mock: HTTPXMock):
httpx_mock.add_response(url="http://localhost:7474/db/neo4j/tx/commit", method="POST", status_code=200)

gi = GraphInterface('localhost', '7474', auth=('neo4j', ''), protocol='http')
with open( os.path.join(os.path.dirname(__file__), 'data', 'schema_cypher_response.json'))as f:
await gi.connect_to_neo4j()
with open(os.path.join(os.path.dirname(__file__), 'data', 'schema_cypher_response.json'))as f:
get_schema_response_json = json.load(f)
httpx_mock.add_response(url="http://localhost:7474/db/neo4j/tx/commit", method="POST", status_code=200,
match_content=json.dumps({
Expand Down

0 comments on commit 489e68e

Please sign in to comment.