From b1586cdad910dbfc8c9da35f10afebe98c00dbcd Mon Sep 17 00:00:00 2001
From: daminichopra
Date: Tue, 14 Dec 2021 10:21:38 +0000
Subject: [PATCH 1/4] Replace borg connection manager with thread safe db pool

---
 src/translators/base_translator.py |  2 +-
 src/translators/crate.py           |  6 +--
 src/translators/sql_translator.py  | 13 +++---
 src/utils/connection_manager.py    | 74 +++++++++++++++++++++++-------
 4 files changed, 69 insertions(+), 26 deletions(-)

diff --git a/src/translators/base_translator.py b/src/translators/base_translator.py
index 4d367c2a..a011f40b 100644
--- a/src/translators/base_translator.py
+++ b/src/translators/base_translator.py
@@ -17,7 +17,7 @@ def __init__(self, host, port, db_name):
         self.port = port
         self.db_name = db_name
 
-    def __enter__(self):
+    def __enter__(self, connection, query):
         self.setup()
         return self
 
diff --git a/src/translators/crate.py b/src/translators/crate.py
index ff5379bf..4b6fe37b 100644
--- a/src/translators/crate.py
+++ b/src/translators/crate.py
@@ -43,15 +43,15 @@ class CrateTranslator(sql_translator.SQLTranslator):
 
     NGSI_TO_SQL = NGSI_TO_SQL
 
-    def __init__(self, host, port=4200, db_name="ngsi-tsdb"):
-        super(CrateTranslator, self).__init__(host, port, db_name)
+    def __init__(self, connection, query , host, port=4200, db_name="ngsi-tsdb"):
+        super(CrateTranslator, self).__init__(host, connection, query , port, db_name)
         self.logger = logging.getLogger(__name__)
         self.dbCacheName = 'crate'
         self.ccm = None
         self.connection = None
         self.cursor = None
 
-    def setup(self):
+    def setup(self, connection, query):
         url = "{}:{}".format(self.host, self.port)
         self.ccm = ConnectionManager()
         self.connection = self.ccm.get_connection('crate')
diff --git a/src/translators/sql_translator.py b/src/translators/sql_translator.py
index 9c29e37f..8173558d 100644
--- a/src/translators/sql_translator.py
+++ b/src/translators/sql_translator.py
@@ -15,7 +15,8 @@
 from cache.factory import get_cache, is_cache_available
 from translators.insert_splitter import to_insert_batches
-from utils.connection_manager import Borg
+from utils.connection_manager import ConnectionManager
+
 
 # NGSI TYPES
 # Based on Orion output because official docs don't say much about these :(
 NGSI_DATETIME = 'DateTime'
@@ -117,9 +118,9 @@ class SQLTranslator(base_translator.BaseTranslator):
 
     start_time = None
 
-    def __init__(self, host, port, db_name):
+    def __init__(self, host, port, db_name, connection, query):
         super(SQLTranslator, self).__init__(host, port, db_name)
-        qcm = QueryCacheManager()
+        qcm = QueryCacheManager(connection, query)
         self.cache = qcm.get_query_cache()
         self.default_ttl = None
         if self.cache:
@@ -1781,11 +1782,11 @@ def _remove_from_cache(self, tenant_name, key):
                                 exc_info=True)
 
 
-class QueryCacheManager(Borg):
+class QueryCacheManager(ConnectionManager):
     cache = None
 
-    def __init__(self):
-        super(QueryCacheManager, self).__init__()
+    def __init__(self, connection, query):
+        super(QueryCacheManager, self ).__init__( connection, query)
         if is_cache_available() and self.cache is None:
             try:
                 self.cache = get_cache()
diff --git a/src/utils/connection_manager.py b/src/utils/connection_manager.py
index 7c79ff87..5edcbbe6 100644
--- a/src/utils/connection_manager.py
+++ b/src/utils/connection_manager.py
@@ -1,25 +1,67 @@
-class Borg:
-    _shared_state = {}
+from crate import client
+from crate.client.sqlalchemy.dialect import CrateDialect
+import sqlalchemy.pool as pool
+import argparse
+from sqlalchemy import create_engine
 
-    def __init__(self):
-        self.__dict__ = self._shared_state
+class ConnectionManager():
+    """
+    Invoke queries to database in parallel.
+    """
+    def __init__(self, connection, query):
+        self.connection = connection
+        self.query = query
 
-class ConnectionManager(Borg):
+    def querydb_dbapi(connection, query):
+        """
+        Submit query to database.
+        """
+        cursor = connection.cursor()
+        cursor.execute(query)
+        result = cursor.fetchone()
+        cursor.close()
+        return result
 
-    connection = {}
-
-    def __init__(self):
-        Borg.__init__(self)
+    def querydb_sqlalchemy(connection, query):
+        """
+        Submit query to database.
+        """
+        result = connection.execute(query).fetchone()
+        return result
 
     def set_connection(self, db, connection):
         self.connection[db] = connection
 
-    def get_connection(self, db):
-        try:
-            return self.connection[db]
-        except KeyError as e:
-            return None
+    def get_connection(connection):
+        parser = argparse.ArgumentParser()
+        parser.add_argument('--driver')
+        parser.add_argument('--pool', action='store_true')
+        args = parser.parse_args()
+
+        if args.driver == "dbapi":
+            # Use DBAPI driver.
+            if args.pool:
+                query = querydb_dbapi
+                # Use a connection pool matching the number of workers.
+                engine = create_engine("crate://localhost:4200", connect_args={"pool_size": 10})
+            else:
+                # Don't use a pool.
+                engine = create_engine("crate://localhost:4200")
+        elif args.driver == "sqlalchemy":
+            if args.pool:
+                query = querydb_sqlalchemy
+                # Use a connection pool matching the number of workers.
+                engine = create_engine("crate://localhost:4200", connect_args={"pool_size": 10})
+            else:
+                # Don't use a pool.
+                engine = create_engine("crate://localhost:4200")
+            connection = engine.connect()
+        else:
+            raise ValueError("Unknown value for --driver: Use 'dbapi' or 'sqlalchemy'.")
+
+        # Invoke some database queries.
+        query = 'SELECT 1;'
 
-    def reset_connection(self, db):
-        self.connection[db] = None
+if __name__ == '__main__':
+    main()

From ff4c0c2b60cc3d8850d32311ece9206a58a6d6d7 Mon Sep 17 00:00:00 2001
From: daminichopra
Date: Sun, 19 Dec 2021 16:47:09 +0000
Subject: [PATCH 2/4] reverted and updated changes

---
 src/translators/base_translator.py |  2 +-
 src/translators/crate.py           | 13 +++---
 src/translators/sql_translator.py  | 12 ++---
 src/translators/timescale.py       |  6 ++-
 src/utils/connection_manager.py    | 74 +++++++-----------------
 5 files changed, 34 insertions(+), 73 deletions(-)

diff --git a/src/translators/base_translator.py b/src/translators/base_translator.py
index a011f40b..4d367c2a 100644
--- a/src/translators/base_translator.py
+++ b/src/translators/base_translator.py
@@ -17,7 +17,7 @@ def __init__(self, host, port, db_name):
         self.port = port
         self.db_name = db_name
 
-    def __enter__(self, connection, query):
+    def __enter__(self):
         self.setup()
         return self
 
diff --git a/src/translators/crate.py b/src/translators/crate.py
index 4b6fe37b..cc3f46c0 100644
--- a/src/translators/crate.py
+++ b/src/translators/crate.py
@@ -12,6 +12,7 @@
     NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, \
     NGSI_LD_GEOMETRY, TIME_INDEX, METADATA_TABLE_NAME, FIWARE_SERVICEPATH
 import logging
+from sqlalchemy import create_engine
 from .crate_geo_query import from_ngsi_query
 from utils.cfgreader import EnvReader, StrVar, IntVar, FloatVar
 from utils.connection_manager import ConnectionManager
@@ -43,18 +44,18 @@ class CrateTranslator(sql_translator.SQLTranslator):
 
     NGSI_TO_SQL = NGSI_TO_SQL
 
-    def __init__(self, connection, query , host, port=4200, db_name="ngsi-tsdb"):
-        super(CrateTranslator, self).__init__(host, connection, query , port, db_name)
+    def __init__(self, host, port=4200, db_name="ngsi-tsdb"):
+        super(CrateTranslator, self).__init__(host, port, db_name)
         self.logger = logging.getLogger(__name__)
         self.dbCacheName = 'crate'
         self.ccm = None
         self.connection = None
         self.cursor = None
 
-    def setup(self, connection, query):
-        url = "{}:{}".format(self.host, self.port)
-        self.ccm = ConnectionManager()
-        self.connection = self.ccm.get_connection('crate')
+    def setup(self):
+        url = "crate://{}:{}".format(self.host, self.port)
+        self.engine = sa.create_engine(url, connect_args={"pool_size": 10})
+        self.connection = self.engine.connect()
         # Added backoff_factor for retry interval between attempt of
         # consecutive retries
         backoff_factor = EnvReader(log=logging.getLogger(__name__).debug) \
diff --git a/src/translators/sql_translator.py b/src/translators/sql_translator.py
index 8173558d..9b607c78 100644
--- a/src/translators/sql_translator.py
+++ b/src/translators/sql_translator.py
@@ -15,7 +15,7 @@
 from cache.factory import get_cache, is_cache_available
 from translators.insert_splitter import to_insert_batches
-from utils.connection_manager import ConnectionManager
+from utils.connection_manager import Borg
 
 # NGSI TYPES
 # Based on Orion output because official docs don't say much about these :(
 NGSI_DATETIME = 'DateTime'
@@ -118,9 +118,9 @@ class SQLTranslator(base_translator.BaseTranslator):
 
     start_time = None
 
-    def __init__(self, host, port, db_name, connection, query):
+    def __init__(self, host, port, db_name):
         super(SQLTranslator, self).__init__(host, port, db_name)
-        qcm = QueryCacheManager(connection, query)
+        qcm = QueryCacheManager()
         self.cache = qcm.get_query_cache()
         self.default_ttl = None
         if self.cache:
@@ -1782,11 +1782,11 @@ def _remove_from_cache(self, tenant_name, key):
                                 exc_info=True)
 
 
-class QueryCacheManager(ConnectionManager):
+class QueryCacheManager(Borg):
     cache = None
 
-    def __init__(self, connection, query):
-        super(QueryCacheManager, self ).__init__( connection, query)
+    def __init__(self):
+        super(QueryCacheManager, self ).__init__()
         if is_cache_available() and self.cache is None:
             try:
                 self.cache = get_cache()
diff --git a/src/translators/timescale.py b/src/translators/timescale.py
index a574d919..ddc41156 100644
--- a/src/translators/timescale.py
+++ b/src/translators/timescale.py
@@ -15,6 +15,7 @@
 from geocoding.slf.querytypes import SlfQuery
 import geocoding.slf.wktcodec
 from utils.cfgreader import *
+from sqlalchemy import create_engine
 from utils.connection_manager import ConnectionManager
 
 # POSTGRES TYPES
@@ -88,8 +89,9 @@ def __init__(self, conn_data=PostgresConnectionData()):
         self.dbCacheName = 'timescale'
 
     def setup(self):
-        self.ccm = ConnectionManager()
-        self.connection = self.ccm.get_connection('timescale')
+        url = "timescale://{}:{}".format(self.host, self.port)
+        self.engine = sa.create_engine(url, connect_args={"pool_size": 10})
+        self.connection = self.engine.connect()
         if self.connection is None:
             try:
                 pg8000.paramstyle = "qmark"
diff --git a/src/utils/connection_manager.py b/src/utils/connection_manager.py
index 5edcbbe6..7c79ff87 100644
--- a/src/utils/connection_manager.py
+++ b/src/utils/connection_manager.py
@@ -1,67 +1,25 @@
-from crate import client
-from crate.client.sqlalchemy.dialect import CrateDialect
-import sqlalchemy.pool as pool
-import argparse
-from sqlalchemy import create_engine
+class Borg:
+    _shared_state = {}
 
-class ConnectionManager():
-    """
-    Invoke queries to database in parallel.
-    """
+    def __init__(self):
+        self.__dict__ = self._shared_state
 
-    def __init__(self, connection, query):
-        self.connection = connection
-        self.query = query
 
-    def querydb_dbapi(connection, query):
-        """
-        Submit query to database.
-        """
-        cursor = connection.cursor()
-        cursor.execute(query)
-        result = cursor.fetchone()
-        cursor.close()
-        return result
+class ConnectionManager(Borg):
 
-    def querydb_sqlalchemy(connection, query):
-        """
-        Submit query to database.
-        """
-        result = connection.execute(query).fetchone()
-        return result
+    connection = {}
+
+    def __init__(self):
+        Borg.__init__(self)
 
     def set_connection(self, db, connection):
         self.connection[db] = connection
 
-    def get_connection(connection):
-        parser = argparse.ArgumentParser()
-        parser.add_argument('--driver')
-        parser.add_argument('--pool', action='store_true')
-        args = parser.parse_args()
-
-        if args.driver == "dbapi":
-            # Use DBAPI driver.
-            if args.pool:
-                query = querydb_dbapi
-                # Use a connection pool matching the number of workers.
-                engine = create_engine("crate://localhost:4200", connect_args={"pool_size": 10})
-            else:
-                # Don't use a pool.
-                engine = create_engine("crate://localhost:4200")
-        elif args.driver == "sqlalchemy":
-            if args.pool:
-                query = querydb_sqlalchemy
-                # Use a connection pool matching the number of workers.
-                engine = create_engine("crate://localhost:4200", connect_args={"pool_size": 10})
-            else:
-                # Don't use a pool.
-                engine = create_engine("crate://localhost:4200")
-            connection = engine.connect()
-        else:
-            raise ValueError("Unknown value for --driver: Use 'dbapi' or 'sqlalchemy'.")
-
-        # Invoke some database queries.
-        query = 'SELECT 1;'
+    def get_connection(self, db):
+        try:
+            return self.connection[db]
+        except KeyError as e:
+            return None
 
-if __name__ == '__main__':
-    main()
+    def reset_connection(self, db):
+        self.connection[db] = None

From 52a273487a27bdab21a352738f3c1414332c4a5c Mon Sep 17 00:00:00 2001
From: daminichopra
Date: Tue, 21 Dec 2021 16:33:16 +0000
Subject: [PATCH 3/4] updated code

---
 src/translators/crate.py          | 8 ++++----
 src/translators/sql_translator.py | 5 ++---
 src/translators/timescale.py      | 1 +
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/translators/crate.py b/src/translators/crate.py
index cc3f46c0..6dd35bf8 100644
--- a/src/translators/crate.py
+++ b/src/translators/crate.py
@@ -12,6 +12,7 @@
     NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, \
     NGSI_LD_GEOMETRY, TIME_INDEX, METADATA_TABLE_NAME, FIWARE_SERVICEPATH
 import logging
+import sqlalchemy as sa
 from sqlalchemy import create_engine
 from .crate_geo_query import from_ngsi_query
 from utils.cfgreader import EnvReader, StrVar, IntVar, FloatVar
 from utils.connection_manager import ConnectionManager
@@ -48,7 +49,6 @@ def __init__(self, host, port=4200, db_name="ngsi-tsdb"):
         super(CrateTranslator, self).__init__(host, port, db_name)
         self.logger = logging.getLogger(__name__)
         self.dbCacheName = 'crate'
-        self.ccm = None
         self.connection = None
         self.cursor = None
 
@@ -64,11 +64,11 @@ def setup(self):
         try:
             self.connection = client.connect(
                 [url], error_trace=True, backoff_factor=backoff_factor)
-            self.ccm.set_connection('crate', self.connection)
+            self.engine.set_connection('crate', self.connection)
         except Exception as e:
             self.logger.warning(str(e), exc_info=True)
             raise e
-
+
         self.cursor = self.connection.cursor()
         # TODO this reduce queries to crate,
         # but only within a single API call to QUANTUMLEAP
@@ -92,7 +92,7 @@ def sql_error_handler(self, exception):
         if analyzer.is_aggregation_error():
             return "AggrMethod cannot be applied"
         if analyzer.is_transient_error():
-            self.ccm.reset_connection('crate')
+            self.engine.reset_connection('crate')
             self.setup()
 
     def get_db_version(self):
diff --git a/src/translators/sql_translator.py b/src/translators/sql_translator.py
index 9b607c78..a405043e 100644
--- a/src/translators/sql_translator.py
+++ b/src/translators/sql_translator.py
@@ -1741,9 +1741,8 @@ def _execute_query_via_cache(self, tenant_name, key, stmt, parameters=None,
             self.logger.warning("Caching not available, metadata data may "
                                 "not be consistent: " + str(e),
                                 exc_info=True)
-
-        self.cursor.execute(stmt, parameters)
-        res = self.cursor.fetchall()
+        res = self.connection.execute(stmt, parameters).fetchall()
+        #res = self.cursor.fetchall()
         if res and self.cache:
             try:
                 self._cache(tenant_name, key, res, ex)
diff --git a/src/translators/timescale.py b/src/translators/timescale.py
index ddc41156..89476ec1 100644
--- a/src/translators/timescale.py
+++ b/src/translators/timescale.py
@@ -14,6 +14,7 @@
 import geocoding.slf.jsoncodec
 from geocoding.slf.querytypes import SlfQuery
 import geocoding.slf.wktcodec
+import sqlalchemy as sa
 from utils.cfgreader import *
 from sqlalchemy import create_engine
 from utils.connection_manager import ConnectionManager

From 9b442f3059ac1d4cd92d3b877be3069b3c544e64 Mon Sep 17 00:00:00 2001
From: daminichopra
Date: Mon, 27 Dec 2021 15:59:19 +0000
Subject: [PATCH 4/4] rebase

---
 src/translators/crate.py          | 5 +----
 src/translators/sql_translator.py | 4 ++--
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/src/translators/crate.py b/src/translators/crate.py
index 6dd35bf8..06675eb3 100644
--- a/src/translators/crate.py
+++ b/src/translators/crate.py
@@ -55,7 +55,6 @@ def __init__(self, host, port=4200, db_name="ngsi-tsdb"):
     def setup(self):
         url = "crate://{}:{}".format(self.host, self.port)
         self.engine = sa.create_engine(url, connect_args={"pool_size": 10})
-        self.connection = self.engine.connect()
         # Added backoff_factor for retry interval between attempt of
         # consecutive retries
         backoff_factor = EnvReader(log=logging.getLogger(__name__).debug) \
@@ -64,12 +63,10 @@ def setup(self):
         try:
             self.connection = client.connect(
                 [url], error_trace=True, backoff_factor=backoff_factor)
-            self.engine.set_connection('crate', self.connection)
         except Exception as e:
             self.logger.warning(str(e), exc_info=True)
             raise e
-
-        self.cursor = self.connection.cursor()
+
         # TODO this reduce queries to crate,
         # but only within a single API call to QUANTUMLEAP
         # we need to think if we want to cache this information
diff --git a/src/translators/sql_translator.py b/src/translators/sql_translator.py
index a405043e..4a87885d 100644
--- a/src/translators/sql_translator.py
+++ b/src/translators/sql_translator.py
@@ -1741,8 +1741,8 @@ def _execute_query_via_cache(self, tenant_name, key, stmt, parameters=None,
             self.logger.warning("Caching not available, metadata data may "
                                 "not be consistent: " + str(e),
                                 exc_info=True)
-        res = self.connection.execute(stmt, parameters).fetchall()
-        #res = self.cursor.fetchall()
+        self.cursor.execute(stmt, parameters)
+        res = self.cursor.fetchall()
         if res and self.cache:
             try:
                 self._cache(tenant_name, key, res, ex)
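
The series above ends with each translator building its own SQLAlchemy engine instead of sharing connections through the Borg-style ConnectionManager. Below is a minimal sketch of what engine-level pooling typically looks like with SQLAlchemy. It is not code from these patches: the URL, the pool sizes, and the availability of the crate dialect (pip install crate[sqlalchemy]) are assumptions. Note that stock SQLAlchemy takes QueuePool settings such as pool_size as create_engine() keyword arguments, while connect_args is forwarded to the DBAPI connect() call.

    # Sketch only, not part of the patches above.
    from sqlalchemy import create_engine, text

    # QueuePool settings go straight to create_engine(); connect_args would be
    # passed to the underlying DBAPI driver instead.
    engine = create_engine(
        "crate://localhost:4200",
        pool_size=10,        # connections kept open in the pool
        max_overflow=5,      # extra connections allowed under burst load
        pool_pre_ping=True,  # replace stale connections transparently
    )

    def query_one(stmt):
        # Each call checks a connection out of the pool and returns it when the
        # context exits, so worker threads can call this concurrently.
        with engine.connect() as conn:
            return conn.execute(text(stmt)).fetchone()

    if __name__ == "__main__":
        print(query_one("SELECT 1"))

Because the pool lives inside the engine, concurrent workers no longer need the shared module-level dictionary that the Borg pattern relied on.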