Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace borg connection manager with thread safe db pool #593

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/translators/base_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self, host, port, db_name):
self.port = port
self.db_name = db_name

def __enter__(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to change this method. The idea is to replace the connection manager with SQLAlchemy's "create_engine", but without changing any interfaces.

def __enter__(self, connection, query):
self.setup()
return self

Expand Down
6 changes: 3 additions & 3 deletions src/translators/crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@
class CrateTranslator(sql_translator.SQLTranslator):
NGSI_TO_SQL = NGSI_TO_SQL

def __init__(self, host, port=4200, db_name="ngsi-tsdb"):
super(CrateTranslator, self).__init__(host, port, db_name)
def __init__(self, connection, query , host, port=4200, db_name="ngsi-tsdb"):
super(CrateTranslator, self).__init__(host, connection, query , port, db_name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment above

self.logger = logging.getLogger(__name__)
self.dbCacheName = 'crate'
self.ccm = None
self.connection = None
self.cursor = None

def setup(self):
def setup(self, connection, query):
url = "{}:{}".format(self.host, self.port)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, in here, you would use create_engine to get your connection, using the proper parameters.

self.ccm = ConnectionManager()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a possible example (but don't take it for granted that it works ;) ):

url = "crate://{}:{}".format(self.host, self.port)
        self.engine = sa.create_engine(url, connect_args={"pool_size": 10})
        self.connection = self.engine.connect()

Of course, it would also be possible to create the two connection pools (one for Crate and one for Timescale) at app start and pass them in. This would probably be the best option from a technical point of view, but you would probably need to change quite a lot of code...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated as suggested

Copy link
Contributor Author

@daminichopra daminichopra Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chicco785 @c0c0n3, I have updated the PR as suggested, but some tests are failing here that do not fail in my local env.

self.connection = self.ccm.get_connection('crate')
Expand Down
13 changes: 7 additions & 6 deletions src/translators/sql_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

from cache.factory import get_cache, is_cache_available
from translators.insert_splitter import to_insert_batches
from utils.connection_manager import Borg
from utils.connection_manager import ConnectionManager

# NGSI TYPES
# Based on Orion output because official docs don't say much about these :(
NGSI_DATETIME = 'DateTime'
Expand Down Expand Up @@ -117,9 +118,9 @@ class SQLTranslator(base_translator.BaseTranslator):

start_time = None

def __init__(self, host, port, db_name):
def __init__(self, host, port, db_name, connection, query):
super(SQLTranslator, self).__init__(host, port, db_name)
qcm = QueryCacheManager()
qcm = QueryCacheManager(connection, query)
self.cache = qcm.get_query_cache()
self.default_ttl = None
if self.cache:
Expand Down Expand Up @@ -1781,11 +1782,11 @@ def _remove_from_cache(self, tenant_name, key):
exc_info=True)


class QueryCacheManager(Borg):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can keep the Borg here.

class QueryCacheManager(ConnectionManager):
cache = None

def __init__(self):
super(QueryCacheManager, self).__init__()
def __init__(self, connection, query):
super(QueryCacheManager, self ).__init__( connection, query)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And leave this unchanged.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated as suggested

if is_cache_available() and self.cache is None:
try:
self.cache = get_cache()
Expand Down
74 changes: 58 additions & 16 deletions src/utils/connection_manager.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,67 @@
class Borg:
_shared_state = {}
from crate import client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should leave this class unchanged. First, as said, the idea is to do the replacement directly in setup. Second, you would need to proceed, so to speak, in parallel and support both Crate and Timescale. That is easy if you keep this class unchanged; if you touch it, you would also need to adjust the Timescale code, since it uses this code too.

from crate.client.sqlalchemy.dialect import CrateDialect
import sqlalchemy.pool as pool
import argparse
from sqlalchemy import create_engine

def __init__(self):
self.__dict__ = self._shared_state
class ConnectionManager():
"""
Invoke queries to database in parallel.
"""

def __init__(self, connection, query):
self.connection = connection
self.query = query

class ConnectionManager(Borg):
def querydb_dbapi(connection, query):
"""
Submit query to database.
"""
cursor = connection.cursor()
cursor.execute(query)
result = cursor.fetchone()
cursor.close()
return result

connection = {}

def __init__(self):
Borg.__init__(self)
def querydb_sqlalchemy(connection, query):
"""
Submit query to database.
"""
result = connection.execute(query).fetchone()
return result

def set_connection(self, db, connection):
self.connection[db] = connection

def get_connection(self, db):
try:
return self.connection[db]
except KeyError as e:
return None
def get_connection(connection):
parser = argparse.ArgumentParser()
parser.add_argument('--driver')
parser.add_argument('--pool', action='store_true')
args = parser.parse_args()

if args.driver == "dbapi":
# Use DBAPI driver.
if args.pool:
query = querydb_dbapi
# Use a connection pool matching the number of workers.
engine = create_engine("crate://localhost:4200", connect_args={"pool_size": 10})
else:
# Don't use a pool.
engine = create_engine("crate://localhost:4200")
elif args.driver == "sqlalchemy":
if args.pool:
query = querydb_sqlalchemy
# Use a connection pool matching the number of workers.
engine = create_engine("crate://localhost:4200", connect_args={"pool_size": 10})
else:
# Don't use a pool.
engine = create_engine("crate://localhost:4200")
connection = engine.connect()
else:
raise ValueError("Unknown value for --driver: Use 'dbapi' or 'sqlalchemy'.")

# Invoke some database queries.
query = 'SELECT 1;'

def reset_connection(self, db):
self.connection[db] = None
if __name__ == '__main__':
main()