diff --git a/cli/dbapi/README.md b/cli/dbapi/README.md
new file mode 100644
index 0000000..f9ef8d9
--- /dev/null
+++ b/cli/dbapi/README.md
@@ -0,0 +1,26 @@
+# Quick Start
+
+Use the DBAPI interface to query Upsolver:
+
+```python
+import cli.dbapi.connection as upsolver
+conn = upsolver.connect(
+    token='',
+    api_url=''
+)
+cur = conn.cursor()
+cur.execute('SELECT * FROM catalog.database.transformed_data LIMIT 10')
+rows = cur.fetchall()
+```
+
+The DBAPI implementation in `cli.dbapi` also provides methods that retrieve
+fewer rows at a time, for example `Cursor.fetchone()` and `Cursor.fetchmany()`.
+By default `Cursor.fetchmany()` fetches one row; set
+`cli.dbapi.Cursor.arraysize` to make `Cursor.fetchmany()` fetch a specific number of rows.
+
+After `Cursor.execute` has been invoked, descriptive attributes become available:
+`Cursor.rowcount` contains the number of rows produced by the execute, and
+`Cursor.description` contains the description of the columns it produced.
+
+This implementation also provides `Cursor.executefile`, which is not part of DBAPI
+but accepts a file in the same way the `CLI` does.
\ No newline at end of file
diff --git a/cli/dbapi/__init__.py b/cli/dbapi/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/cli/dbapi/connection.py b/cli/dbapi/connection.py
new file mode 100644
index 0000000..82dcf9f
--- /dev/null
+++ b/cli/dbapi/connection.py
@@ -0,0 +1,82 @@
+"""
+Implementation of the connection protocol of the Python DBAPI 2.0 as described in
+https://www.python.org/dev/peps/pep-0249/ .
+
+"""
+
+from cli.dbapi.logging_utils import logger
+from cli.dbapi.exceptions import NotSupportedError, InterfaceError
+from cli.dbapi.cursor import Cursor, check_closed
+
+from cli.upsolver.query import RestQueryApi
+from cli.upsolver.requester import Requester
+from cli.upsolver.poller import DBAPIResponsePoller
+from cli.upsolver.auth_filler import TokenAuthFiller
+from cli.utils import convert_time_str
+from cli.errors import InvalidOptionErr
+
+
+def connect(token, api_url):
+    logger.debug("pep249 Creating connection")
+    return Connection(token, api_url)
+
+
+def get_duration_in_seconds(time):
+    if type(time) == float:
+        return time
+    if type(time) == int:
+        return float(time)
+    if type(time) == str:
+        try:
+            return convert_time_str(None, None, time)
+        except InvalidOptionErr:
+            raise InterfaceError
+    raise InterfaceError
+
+
+class Connection:
+    """A PEP 249 compliant Connection protocol."""
+
+    def __init__(self, token, api_url, timeout_sec='60s'):
+        self._api = RestQueryApi(
+            requester=Requester(
+                base_url=api_url,
+                auth_filler=TokenAuthFiller(token)
+            ),
+            poller_builder=lambda to_sec: DBAPIResponsePoller(max_time_sec=to_sec)
+        )
+
+        self._timeout = get_duration_in_seconds(timeout_sec)
+        self._closed = False
+
+    @check_closed
+    def cursor(self):
+        logger.debug(f"pep249 Cursor creating for object {self.__class__.__name__}")
+
+        if self._api is None:
+            raise InterfaceError
+
+        return Cursor(self)
+
+    @check_closed
+    def close(self) -> None:
+        logger.debug(f"pep249 close {self.__class__.__name__}")
+        self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+    def commit(self):
+        raise NotSupportedError
+
+    def rollback(self):
+        raise NotSupportedError
+
+    @check_closed
+    def query(self, command):
+        logger.debug("pep249 Execute query")
+        if self._api is None:
+            raise InterfaceError
+
+        return self._api.execute(command, self._timeout)
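For reference, a minimal end-to-end sketch of the connection module above. The token, URL, and catalog path are placeholders (the path reuses the illustrative one from the README); note that `commit()` and `rollback()` deliberately raise `NotSupportedError`, and that calls made after `close()` fail with `InterfaceError` via the `check_closed` decorator.

```python
# A minimal sketch, assuming the placeholder token/URL are replaced with real values.
from cli.dbapi.connection import connect

conn = connect(token='YOUR_API_TOKEN', api_url='https://your-upsolver-api-url')
try:
    cur = conn.cursor()
    cur.execute('SELECT * FROM catalog.database.transformed_data LIMIT 10')
    for row in cur.fetchall():
        print(row)
finally:
    conn.close()  # after this, conn.cursor() and conn.query() raise InterfaceError
```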
diff --git a/cli/dbapi/cursor.py b/cli/dbapi/cursor.py
new file mode 100644
index 0000000..af83197
--- /dev/null
+++ b/cli/dbapi/cursor.py
@@ -0,0 +1,236 @@
+"""
+Implementation of the cursor protocol of the Python DBAPI 2.0 as described in
+https://www.python.org/dev/peps/pep-0249/ .
+
+"""
+from functools import wraps
+from pathlib import Path
+from typing import Optional, Sequence, Type, Union
+
+from cli.dbapi.logging_utils import logger
+from cli.dbapi.exceptions import NotSupportedError, InterfaceError
+from cli.dbapi.types_definitions import (
+    QueryParameters,
+    ResultRow,
+    ResultSet,
+    SQLQuery,
+    ColumnDescription,
+    ProcName,
+    ProcArgs,
+)
+
+
+def check_closed(func):
+    @wraps(func)
+    def wrapped(self, *args, **kwargs):
+        if self.closed:
+            raise InterfaceError
+        return func(self, *args, **kwargs)
+    return wrapped
+
+
+class Cursor:
+    """A PEP 249 compliant Cursor protocol."""
+    def __init__(self, connection):
+        self._connection = connection
+        self._arraysize = 1
+        self._rowcount = -1
+        self._description = None
+        self._iterator = None
+        self._closed = False
+
+    @check_closed
+    def execute(self, operation: SQLQuery, parameters: Optional[QueryParameters] = None):
+        """
+        Execute an SQL query. Values may be bound by passing parameters
+        as outlined in PEP 249.
+
+        """
+        logger.debug(f"pep249 execute {self.__class__.__name__} query '{operation}'")
+        if parameters is not None:
+            raise NotSupportedError
+
+        query_response = self._connection.query(operation)
+        return self._prepare_query_results(query_response)
+
+    @check_closed
+    def executefile(self, file_path: str):
+        """
+        Execute an SQL query read from a file.
+        """
+        logger.debug(f"pep249 executefile {self.__class__.__name__} file '{file_path}'")
+
+        p = Path(file_path)
+        if not p.exists():
+            raise InterfaceError
+        operation = p.read_text()
+        return self.execute(operation)
+
+    def _prepare_query_results(self, query_response):
+        first_response = next(query_response)
+        self._rowcount = -1 if first_response.get('has_next_page', True) else len(first_response['data'])
+        self._description = [(c['name'], c['columnType'].get('clazz'), None, None, None, None, None)
+                             for c in first_response['columns']]
+        self._iterator = self._generate_rows(first_response, query_response)
+        return self._iterator
+
+    @staticmethod
+    def _generate_rows(first_page, next_pages):
+        if 'data' in first_page:
+            for row in first_page['data']:
+                yield row
+        elif first_page.get('message'):
+            yield first_page.get('message')
+
+        for next_page in next_pages:
+            if 'data' in next_page:
+                for row in next_page['data']:
+                    yield row
+            elif next_page.get('message'):
+                yield next_page.get('message')
+
+    def executemany(self, operation: SQLQuery, seq_of_parameters: Sequence[QueryParameters]):
+        raise NotSupportedError
+
+    def callproc(self, procname: ProcName, parameters: Optional[ProcArgs] = None) -> Optional[ProcArgs]:
+        raise NotSupportedError
+
+    @property
+    @check_closed
+    def description(self) -> Optional[Sequence[ColumnDescription]]:
+        """
+        A read-only attribute returning a sequence containing a description
+        (a seven-item sequence) for each column in the result set. The first
+        item of the sequence is the column name, the second is the column type,
+        which is always STRING in the current implementation; the other items
+        are not meaningful.
+
+        If no execute has been performed or there is no result set, return None.
+        """
+        logger.debug(f"pep249 description {self.__class__.__name__}")
+        return self._description
+
+    @property
+    @check_closed
+    def rowcount(self) -> int:
+        """
+        If no execute has been performed or the rowcount cannot be determined,
+        this should return -1.
+ """ + logger.debug(f"pep249 rowcount {self.__class__.__name__}") + return self._rowcount + + @property + @check_closed + def arraysize(self) -> int: + """ + An attribute specifying the number of rows to fetch at a time with + `fetchmany`. + + Defaults to 1, meaning fetch a single row at a time. + """ + logger.debug(f"pep249 arraysize {self.__class__.__name__}") + + return self._arraysize + + @arraysize.setter + @check_closed + def arraysize(self, value: int): + logger.debug(f"pep249 arraysize {self.__class__.__name__}") + + if value > 0: + self._arraysize = value + else: + raise InterfaceError + + @check_closed + def fetchone(self) -> Optional[ResultRow]: + """ + Fetch the next row from the query result set as a sequence of Python + types (or return None when no more rows are available). + + If the previous call to `execute` did not produce a result set, an + error can be raised. + + """ + logger.debug(f"pep249 fetchone {self.__class__.__name__}") + + if self._iterator is None: + raise InterfaceError + + try: + return next(self._iterator) + except StopIteration: + return None + + @check_closed + def fetchmany(self, size: Optional[int] = None) -> Optional[ResultSet]: + """ + Fetch the next `size` rows from the query result set as a list + of sequences of Python types. + + If the size parameter is not supplied, the arraysize property will + be used instead. + + If rows in the result set have been exhausted, an an empty list + will be returned. If the previous call to `execute` did not + produce a result set, an error can be raised. + + """ + logger.debug(f"pep249 fetchmany {self.__class__.__name__}") + + if self._iterator is None: + raise InterfaceError + + result = [] + for _ in range(size or self.arraysize): + row = self.fetchone() + if row is None: + break + result.append(row) + + return result + + @check_closed + def fetchall(self) -> ResultSet: + """ + Fetch the remaining rows from the query result set as a list of + sequences of Python types. + + If rows in the result set have been exhausted, an an empty list + will be returned. If the previous call to `execute` did not + produce a result set, an error can be raised. + + """ + logger.debug(f"pep249 fetchall {self.__class__.__name__}") + + if self._iterator is None: + raise InterfaceError + + result = [] + while True: + row = self.fetchone() + if row is None: + break + result.append(row) + + return result + + @check_closed + def nextset(self) -> Optional[bool]: + raise NotSupportedError + + def setinputsizes(self, sizes: Sequence[Optional[Union[int, Type]]]) -> None: + raise NotSupportedError + + def setoutputsize(self, size: int, column: Optional[int]) -> None: + raise NotSupportedError + + @check_closed + def close(self) -> None: + logger.debug(f"pep249 close {self.__class__.__name__}") + self._closed = True + + @property + def closed(self) -> bool: + return self._closed diff --git a/cli/dbapi/exceptions.py b/cli/dbapi/exceptions.py new file mode 100644 index 0000000..8ce7f0c --- /dev/null +++ b/cli/dbapi/exceptions.py @@ -0,0 +1,81 @@ +""" +Exceptions outlined in PEP 249. + +""" + + +class Error(BaseException): + """Base error outlined in PEP 249.""" + + +class InterfaceError(Error): + """ + Interface error outlined in PEP 249. + + Raised for errors with the database interface. + + """ + + +class DatabaseError(Error, RuntimeError): + """ + Database error outlined in PEP 249. + + Raised for errors with the database. + + """ + + +class DataError(DatabaseError): + """ + Data error outlined in PEP 249. 
diff --git a/cli/dbapi/exceptions.py b/cli/dbapi/exceptions.py
new file mode 100644
index 0000000..8ce7f0c
--- /dev/null
+++ b/cli/dbapi/exceptions.py
@@ -0,0 +1,81 @@
+"""
+Exceptions outlined in PEP 249.
+
+"""
+
+
+class Error(Exception):
+    """Base error outlined in PEP 249."""
+
+
+class InterfaceError(Error):
+    """
+    Interface error outlined in PEP 249.
+
+    Raised for errors with the database interface.
+
+    """
+
+
+class DatabaseError(Error, RuntimeError):
+    """
+    Database error outlined in PEP 249.
+
+    Raised for errors with the database.
+
+    """
+
+
+class DataError(DatabaseError):
+    """
+    Data error outlined in PEP 249.
+
+    Raised for errors that are due to problems with the processed data.
+
+    """
+
+
+class OperationalError(DatabaseError):
+    """
+    Operational error outlined in PEP 249.
+
+    Raised for errors in the database's operation.
+
+    """
+
+
+class IntegrityError(DatabaseError):
+    """
+    Integrity error outlined in PEP 249.
+
+    Raised when errors occur which affect the relational integrity of
+    the database (e.g. constraint violations).
+
+    """
+
+
+class InternalError(DatabaseError):
+    """
+    Internal error outlined in PEP 249.
+
+    Raised when the database encounters an internal error.
+
+    """
+
+
+class ProgrammingError(DatabaseError):
+    """
+    Programming error outlined in PEP 249.
+
+    Raised for SQL programming errors.
+
+    """
+
+
+class NotSupportedError(DatabaseError, NotImplementedError):
+    """
+    Not supported error outlined in PEP 249.
+
+    Raised when an unsupported operation is attempted.
+
+    """
diff --git a/cli/dbapi/logging_utils.py b/cli/dbapi/logging_utils.py
new file mode 100644
index 0000000..cc94404
--- /dev/null
+++ b/cli/dbapi/logging_utils.py
@@ -0,0 +1,10 @@
+import logging
+from sys import stderr
+
+LOG_NAME = "pep249-upsolver"
+DEFAULT_LOGLEVEL = logging.DEBUG
+logger = logging.getLogger(LOG_NAME)
+logger.setLevel(DEFAULT_LOGLEVEL)
+handler = logging.StreamHandler(stderr)
+handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
+logger.addHandler(handler)
diff --git a/cli/dbapi/types_definitions.py b/cli/dbapi/types_definitions.py
new file mode 100644
index 0000000..bd94dea
--- /dev/null
+++ b/cli/dbapi/types_definitions.py
@@ -0,0 +1,59 @@
+"""
+Type aliases useful for the abstract DB API implementations.
+
+"""
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
+
+__all__ = [
+    "SQLQuery",
+    "QueryParameters",
+    "ResultRow",
+    "ResultSet",
+    "ColumnDescription",
+    "ProcName",
+    "ProcArgs",
+]
+
+# An SQL query/command, e.g. a complete script or a parameterised
+# statement.
+SQLQuery = str
+
+# Parameters to be passed to the query.
+# https://www.python.org/dev/peps/pep-0249/#paramstyle
+# For the typical 'qmark' style, these will typically be passed as a
+# sequence of types (although this could be a dict with integer keys).
+# For other styles, parameters can be passed as a dict of string to
+# other types.
+QueryParameters = Union[Sequence[Any], Dict[Union[str, int], Any]]
+
+# A row of returned types. Typically a tuple of Python types which match
+# the database column types, sometimes a dict of column name to value.
+ResultRow = Union[Sequence[Any], Dict[str, Any]]
+# A sequence of result rows - the full set or part of a set.
+ResultSet = Sequence[ResultRow]
+
+# The description attributes.
+# https://www.python.org/dev/peps/pep-0249/#description
+# There isn't a particularly good reference for these; it is
+# common to supply only the first two.
+Name = str
+TypeCode = type
+DisplaySize = int
+InternalSize = int
+Precision = int
+Scale = int
+NullOK = bool
+# The full description.
+ColumnDescription = Tuple[
+    Name,
+    TypeCode,
+    Optional[DisplaySize],
+    Optional[InternalSize],
+    Optional[Precision],
+    Optional[Scale],
+    Optional[NullOK],
+]
+
+# Used with Cursor.callproc.
+ProcName = str
+ProcArgs = Sequence[Any]
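To make the seven-item `ColumnDescription` concrete, this is the tuple that `Cursor._prepare_query_results` builds for each column of a result page. The column payload shown is a hypothetical example of the shape the cursor reads.

```python
# Illustrative only: one result-set column as the cursor describes it.
# Only the first two items carry information; the rest are filled with None.
column = {'name': 'order_id', 'columnType': {'clazz': 'STRING'}}  # hypothetical API payload
description_entry = (
    column['name'],                     # Name
    column['columnType'].get('clazz'),  # TypeCode ('STRING' in the current implementation)
    None, None, None, None, None,       # DisplaySize, InternalSize, Precision, Scale, NullOK
)
# => ('order_id', 'STRING', None, None, None, None, None)
```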
diff --git a/cli/upsolver/poller.py b/cli/upsolver/poller.py
index 25bf3c6..19c56b2 100644
--- a/cli/upsolver/poller.py
+++ b/cli/upsolver/poller.py
@@ -40,6 +40,18 @@ def __init__(self,
         self.wait_interval_sec = wait_interval_sec
         self.max_time_sec = max_time_sec

+    @staticmethod
+    def _get_result_from_json(rjson):
+        if 'result' in rjson:
+            result = rjson['result']
+            grid = result['grid']  # columns, data, ...
+            column_names = [c['name'] for c in grid['columns']]
+            data_w_columns: ExecutionResult = [dict(zip(column_names, row)) for row in grid['data']]
+
+            return data_w_columns, result.get('next')
+        else:
+            return [rjson], None
+
     def _get_result_helper(self,
                            requester: Requester,
                            resp: UpsolverResponse,
@@ -94,15 +106,7 @@ def extract_json() -> dict:
             start_time=start_time,
         )

-        if 'result' in rjson:
-            result = rjson['result']
-            grid = result['grid']  # columns, data, ...
-            column_names = [c['name'] for c in grid['columns']]
-            data_w_columns: ExecutionResult = [dict(zip(column_names, row)) for row in grid['data']]
-
-            return data_w_columns, result.get('next')
-        else:
-            return [rjson], None
+        return self._get_result_from_json(rjson)

     def __call__(self, requester: Requester, resp: UpsolverResponse) -> \
             tuple:
@@ -115,3 +119,17 @@ def __call__(self, requester: Requester, resp: UpsolverResponse) -> \
         be delivered, and can be retrieved using the returned path.
         """
         return self._get_result_helper(requester, resp, start_time=time.time())
+
+
+class DBAPIResponsePoller(SimpleResponsePoller):
+    def __init__(self, *args, **kwargs):
+        SimpleResponsePoller.__init__(self, *args, **kwargs)
+
+    @staticmethod
+    def _get_result_from_json(rjson):
+        if 'result' in rjson:
+            result = rjson['result']
+            result['grid']['has_next_page'] = result.get('next') is not None
+            return result['grid'], result.get('next')
+        else:
+            return rjson, None
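To make the behavioural difference between the two pollers concrete, here is how each `_get_result_from_json` treats the same hypothetical response body; the payload shape follows the fields the code reads (`result`, `grid`, `columns`, `data`, `next`).

```python
# Illustrative comparison of the two pollers on the same (hypothetical) response body.
rjson = {
    'result': {
        'grid': {
            'columns': [{'name': 'id'}, {'name': 'value'}],
            'data': [[1, 'a'], [2, 'b']],
        },
        'next': 'token-for-next-page',  # absent on the last page
    }
}

# SimpleResponsePoller zips each row into a dict keyed by column name:
#   ([{'id': 1, 'value': 'a'}, {'id': 2, 'value': 'b'}], 'token-for-next-page')

# DBAPIResponsePoller returns the raw grid, annotated for the cursor:
#   ({'columns': [...], 'data': [[1, 'a'], [2, 'b']], 'has_next_page': True},
#    'token-for-next-page')
```

The DBAPI variant keeps the raw grid so that `Cursor._prepare_query_results` can read `columns`, `data`, and `has_next_page` directly.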