diff --git a/README.md b/README.md index e72c942..5b6bcc8 100644 --- a/README.md +++ b/README.md @@ -17,44 +17,46 @@ To uninstall the library, you can use the command: python3 -m pip uninstall questdb_query ``` -## Basic Usage, querying into Numpy +## Basic Usage, querying into Pandas Once installed, you can use the library to query a QuestDB database. Here's an example that demonstrates how to query -CPU utilization data using the library against a database running on `localhost`. +CPU utilization data using the library against a database running on `localhost` on the default HTTP port (9000). ```python -from questdb_query import numpy_query +from questdb_query import pandas_query -np_arrs = numpy_query(''' - select - timestamp, hostname, datacenter, usage_user, usage_nice - from - cpu - limit 10''') +df = pandas_query('select * from cpu limit 1000') ``` -The `np_arrs` object is a python `dict` which holds a numpy array per column, keyed by column name: +This allows you, for example, to pre-aggregate results: + ```python ->>> np_arrs -{'timestamp': array(['2016-01-01T00:00:00.000000000', '2016-01-01T00:00:10.000000000', - '2016-01-01T00:00:20.000000000', '2016-01-01T00:00:30.000000000', - '2016-01-01T00:00:40.000000000', '2016-01-01T00:00:50.000000000', - '2016-01-01T00:01:00.000000000', '2016-01-01T00:01:10.000000000', - '2016-01-01T00:01:20.000000000', '2016-01-01T00:01:30.000000000'], - dtype='datetime64[ns]'), 'hostname': array(['host_0', 'host_1', 'host_2', 'host_3', 'host_4', 'host_5', - 'host_6', 'host_7', 'host_8', 'host_9'], dtype=object), 'datacenter': array(['ap-southeast-2b', 'eu-west-1b', 'us-west-1b', 'us-west-2c', - 'us-west-2b', 'eu-west-1b', 'eu-west-1b', 'us-west-1a', - 'ap-southeast-2a', 'us-east-1a'], dtype=object), 'usage_user': array([1.39169048, 0.33846369, 0. , 1.81511203, 0.84273104, - 0. , 0. , 0.28085548, 0. , 1.37192634]), 'usage_nice': array([0.30603088, 1.21496673, 0. , 0.16688796, 0. 
,
-       2.77319521, 0.40332488, 1.81585253, 1.92844804, 2.12841919])}
+>>> df = df[['region', 'usage_user', 'usage_nice']].groupby('region').mean()
+>>> df
+                usage_user  usage_nice
+region
+ap-northeast-1    8.163766    6.492334
+ap-southeast-1    6.511215    7.341863
+ap-southeast-2    6.788770    6.257839
+eu-central-1      7.392642    6.416479
+eu-west-1         7.213417    7.185956
+sa-east-1         7.143568    5.925026
+us-east-1         7.620643    7.243553
+us-west-1         6.286770    6.531977
+us-west-2         6.228692    6.439672
```

-If we wanted to calculate a (rather non-sensical) weighted average of `usage_user` and `usage_nice` we can
-do this by accessing the `numpy` columns:
+You can then switch over to numpy with a simple and fast conversion:

```python
->>> np_arrs['usage_user'].dot(np_arrs['usage_nice'].T)
-4.5700692045031985
+>>> from questdb_query import pandas_to_numpy
+>>> np_arrs = pandas_to_numpy(df)
+>>> np_arrs
+{'usage_user': array([8.16376556, 6.51121543, 6.78876964, 7.3926419 , 7.21341716,
+       7.14356839, 7.62064304, 6.28677006, 6.22869169]), 'usage_nice': array([6.49233392, 7.34186348, 6.25783903, 6.41647863, 7.18595643,
+       5.92502642, 7.24355328, 6.53197733, 6.43967247]), 'region': array(['ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2',
+       'eu-central-1', 'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1',
+       'us-west-2'], dtype=object)}
```

## Querying a remote database

@@ -62,15 +64,22 @@ do this by accessing the `numpy` columns:

If your database is running on a remote host, specify an endpoint:

```python
-from questdb_query import numpy_query, Endpoint
+from questdb_query import pandas_query, Endpoint

-endpoint = Endpoint(host='your.hostname.com', https=True, username='user', password='pass')
+endpoint = Endpoint(host='your.hostname.com', port=22453, https=True, username='user', password='pass')
-np_arrs = numpy_query('select * from cpu limit 10', endpoint)
+df = pandas_query('select * from cpu limit 10', endpoint)
```

Note how the example above enables HTTPS and specifies a username and password for authentication.

+The port is optional and defaults to 9000 for HTTP and 443 for HTTPS.
+
+Alternatively, if the server is set up with token-based authentication, you can use the `token` parameter:
+
+```python
+endpoint = Endpoint(host='your.hostname.com', https=True, token='your_token')
+```

## Chunks: Query Parallelism

@@ -99,47 +108,58 @@ _Read on for more details on benchmarking: This is covered later in this README

> To avoid consistency issues formulate the query so that it only queries data that is not changing.
> You can do this, for example, by specifying a `timestamp` range in the `WHERE` clause.
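+
+For example, you could pin the query to a time range that is no longer being
+written to. A sketch (assuming, as in the examples above, a `cpu` table whose
+designated timestamp only holds 2016 data; adjust the bounds to your data):
+
+```python
+from questdb_query import pandas_query
+
+df = pandas_query('''
+    select * from cpu
+    where timestamp >= '2016-01-01T00:00:00.000000Z'
+      and timestamp < '2016-02-01T00:00:00.000000Z'
+    ''', chunks=8)
+```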

-## Querying into Pandas
+## Querying into Numpy

-You can also query into Pandas:
+You can also query directly into a dictionary of Numpy arrays.

-```python
-from questdb_query import pandas_query, Endpoint
+Note that Numpy's data types are more limited than Pandas', specifically in the
+handling of null values.

-endpoint = Endpoint(host='your.hostname.com', https=True, username='user', password='pass')
+The `numpy_query` function is a simple shorthand for querying into Pandas and then converting to Numpy:

-df = pandas_query('select * from cpu limit 1000', endpoint)
+```python
+def numpy_query(query: str, endpoint: Endpoint = None,
+                chunks: int = 1, timeout: int = None) -> dict[str, np.array]:
+    df = pandas_query(query, endpoint, chunks, timeout)
+    return pandas_to_numpy(df)
```

-This allows you, for example, to pre-aggregate results:
+To use it, pass the query string to the `numpy_query` function, along with the
+same optional parameters as the `pandas_query` function.

```python
->>> df = df[['region', 'usage_user', 'usage_nice']].groupby('region').mean()
->>> df
-                usage_user  usage_nice
-region
-ap-northeast-1    8.163766    6.492334
-ap-southeast-1    6.511215    7.341863
-ap-southeast-2    6.788770    6.257839
-eu-central-1      7.392642    6.416479
-eu-west-1         7.213417    7.185956
-sa-east-1         7.143568    5.925026
-us-east-1         7.620643    7.243553
-us-west-1         6.286770    6.531977
-us-west-2         6.228692    6.439672
-```
+from questdb_query import numpy_query

-You can then switch over to numpy with a simple and fast conversion:
+np_arrs = numpy_query('''
+    select
+        timestamp, hostname, datacenter, usage_user, usage_nice
+    from
+        cpu
+    limit 10''')
+```

+The `np_arrs` object is a Python `dict` which holds a numpy array per column, keyed by column name:

```python
->>> from questdb_query import pandas_to_numpy
->>> np_arrs = pandas_to_numpy(df)
>>> np_arrs
-{'usage_user': array([8.16376556, 6.51121543, 6.78876964, 7.3926419 , 7.21341716,
-       7.14356839, 7.62064304, 6.28677006, 6.22869169]), 'usage_nice': array([6.49233392, 7.34186348, 6.25783903, 6.41647863, 7.18595643,
-       5.92502642, 7.24355328, 6.53197733, 6.43967247]), 'region': array(['ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2',
-       'eu-central-1', 'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1',
-       'us-west-2'], dtype=object)}
+{'timestamp': array(['2016-01-01T00:00:00.000000000', '2016-01-01T00:00:10.000000000',
+       '2016-01-01T00:00:20.000000000', '2016-01-01T00:00:30.000000000',
+       '2016-01-01T00:00:40.000000000', '2016-01-01T00:00:50.000000000',
+       '2016-01-01T00:01:00.000000000', '2016-01-01T00:01:10.000000000',
+       '2016-01-01T00:01:20.000000000', '2016-01-01T00:01:30.000000000'],
+      dtype='datetime64[ns]'), 'hostname': array(['host_0', 'host_1', 'host_2', 'host_3', 'host_4', 'host_5',
+       'host_6', 'host_7', 'host_8', 'host_9'], dtype=object), 'datacenter': array(['ap-southeast-2b', 'eu-west-1b', 'us-west-1b', 'us-west-2c',
+       'us-west-2b', 'eu-west-1b', 'eu-west-1b', 'us-west-1a',
+       'ap-southeast-2a', 'us-east-1a'], dtype=object), 'usage_user': array([1.39169048, 0.33846369, 0.        , 1.81511203, 0.84273104,
+       0.        , 0.        , 0.28085548, 0.        , 1.37192634]), 'usage_nice': array([0.30603088, 1.21496673, 0.        , 0.16688796, 0.        ,
+       2.77319521, 0.40332488, 1.81585253, 1.92844804, 2.12841919])}
+```
+
+If we wanted to calculate a (rather nonsensical) weighted average of `usage_user` and `usage_nice`, we could
+do this by accessing the `numpy` columns:
+
+```python
+>>> np_arrs['usage_user'].dot(np_arrs['usage_nice'].T)
+4.5700692045031985
```

## Benchmarking

@@ -149,9 +169,9 @@ You can then switch over to numpy with a simple and fast conversion:

Each query result also contains a `Stats` object with the performance summary which you can print.

```python
->>> from questdb_query import numpy_query
->>> np_arrs = numpy_query('select * from cpu', chunks=8)
->>> print(np_arrs.query_stats)
+>>> from questdb_query import pandas_query
+>>> df = pandas_query('select * from cpu', chunks=8)
+>>> print(df.query_stats)
Duration: 2.631s
Millions of lines: 5.000
Millions of lines/s: 1.901
MiB/s: 506.381
```

You can also extract individual fields:

```python
->>> np_arrs.query_stats
+>>> df.query_stats
Stats(duration_s=2.630711865, line_count=5000000, byte_count=1396853875, throughput_mbs=506.3814407360216, throughput_mlps=1.900626239810569)
->>> np_arrs.query_stats.throughput_mlps
+>>> df.query_stats.throughput_mlps
1.900626239810569
```
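+
+Since every result carries its stats, you can, for example, compare chunk
+counts empirically. A sketch using only the API shown above (the ideal count
+depends on your server, client and network):
+
+```python
+from questdb_query import pandas_query
+
+for chunks in (1, 2, 4, 8):
+    df = pandas_query('select * from cpu', chunks=chunks)
+    print(f'chunks={chunks}: {df.query_stats.throughput_mlps:.3f} million lines/s')
+```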
diff --git a/questdb_query/asynchronous.py b/questdb_query/asynchronous.py
index c8581e2..e9ba189 100644
--- a/questdb_query/asynchronous.py
+++ b/questdb_query/asynchronous.py
@@ -118,10 +118,19 @@ def _read_csv():
    try:
        if col_type in ('TIMESTAMP', 'DATE'):
            series = df[col_name]
-            # Drop the UTC timezone during conversion.
-            # This allows `.to_numpy()` on the series to
-            # yield a `datetime64` dtype column.
-            series = pd.to_datetime(series).dt.tz_convert(None)
+            # If the series is empty (or all nulls), its dtype after the
+            # CSV read defaults to float64, which is not one we can pass
+            # to `pd.to_datetime` directly,
+            if series.empty or series.isnull().all():
+                # so we first convert the series to the nullable
+                # Int64 dtype.
+                series = series.astype('Int64')
+                series = pd.to_datetime(series, unit='ns')
+            else:
+                # Drop the UTC timezone during conversion.
+                # This allows `.to_numpy()` on the series to
+                # yield a `datetime64` dtype column.
+                series = pd.to_datetime(series).dt.tz_convert(None)
            df[col_name] = series
    except Exception as e:
        print(df[col_name])
diff --git a/questdb_query/tool.py b/questdb_query/tool.py
index b099a2a..aa36980 100644
--- a/questdb_query/tool.py
+++ b/questdb_query/tool.py
@@ -19,6 +19,7 @@ def _parse_args():
    parser.add_argument('--https', action='store_true')
    parser.add_argument('--username', type=str)
    parser.add_argument('--password', type=str)
+    parser.add_argument('--token', type=str)
    parser.add_argument('--chunks', type=int, default=1)
    parser.add_argument('query', type=str)
    return parser.parse_args()
@@ -30,7 +31,8 @@ def main(args):
        port=args.port,
        https=args.https,
        username=args.username,
-        password=args.password)
+        password=args.password,
+        token=args.token)
    df = pandas_query(args.query, endpoint, args.chunks)
    print(df)
    print()
diff --git a/tests/tests.py b/tests/tests.py
index cd5b4b9..139390f 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -125,9 +125,10 @@ def load_all_types_table(qdb):
            long_number, crypto_hash
        ) VALUES
-            (1, true, '192.168.1.1', 25, 72, 'A', 1000.5, 'USD', 'Test record 1', '2023-01-01T00:00:00.000Z', '2023-01-01T00:00:00.000000Z', 200.00, '123e4567-e89b-12d3-a456-426614174000', 123456789012345, '0x7fffffffffffffffffffffffffffffff'),
-            (2, false, NULL, 30, 68, 'B', 1500.3, 'EUR', 'Test record 2', NULL, '2023-01-02T00:00:00.000000Z', 300.00, '123e4567-e89b-12d3-a456-426614174001', 987654321098765, NULL),
-            (3, NULL, '10.0.0.1', 35, NULL, 'C', NULL, 'JPY', 'Test record 3', '2023-01-03T00:00:00.000Z', '2023-01-03T00:00:00.000000Z', NULL, '123e4567-e89b-12d3-a456-426614174002', NULL, '0x1fffffffffffffffffffffffffffffff');
+            -- id active ip_address age temp gra acc_bal curr description record_date event_timestamp revenue user_uuid long_number crypto_hash
+            (1, true, '192.168.1.1', 25, 72, 'A', 1000.5, 'USD', 'Test record 1', 
'2023-01-01T00:00:00.000Z', '2023-01-01T00:00:00.000000Z', 200.00, '123e4567-e89b-12d3-a456-426614174000', 123456789012345, '0x7fffffffffffffffffffffffffffffff'), + (2, false, NULL, 30, 68, 'B', 1500.25, 'EUR', NULL, NULL, '2023-01-02T00:00:00.000000Z', 300.00, '123e4567-e89b-12d3-a456-426614174001', 987654321098765, NULL), + (3, NULL, '10.0.0.1', 35, -40, 'C', NULL, 'JPY', 'Test record 3', '2023-01-03T00:00:00.000Z', '2023-01-03T00:00:00.000000Z', NULL, '123e4567-e89b-12d3-a456-426614174002', NULL, '0x1fffffffffffffffffffffffffffffff'); ''') def load_trips_table(qdb): @@ -452,6 +453,71 @@ def test_almost_all_types(self): for key in exp_schema: self.assertEqual((key, exp_schema[key]), (key, schema[key])) + exp_df = pd.DataFrame({ + 'id': pd.Series([1, 2, 3], dtype='Int32'), + 'active': pd.Series([True, False, None], dtype='bool'), + 'ip_address': pd.Series(['192.168.1.1', None, '10.0.0.1'], dtype='string'), + 'age': pd.Series([25, 30, 35], dtype='int8'), + 'temperature': pd.Series([72, 68, -40], dtype='int16'), + 'grade': pd.Series(['A', 'B', 'C'], dtype='string'), + 'account_balance': pd.Series([1000.5, 1500.25, None], dtype='float32'), + 'currency_symbol': pd.Series(['USD', 'EUR', 'JPY'], dtype='string'), + 'description': pd.Series(['Test record 1', None, 'Test record 3'], dtype='string'), + 'record_date': pd.Series(['2023-01-01T00:00:00.000', None, '2023-01-03T00:00:00.000'], dtype='datetime64[ns]'), + 'event_timestamp': pd.Series(['2023-01-01T00:00:00.000000', '2023-01-02T00:00:00.000000', '2023-01-03T00:00:00.000000'], dtype='datetime64[ns]'), + 'revenue': pd.Series([200.00, 300.00, None], dtype='float64'), + 'user_uuid': pd.Series(['123e4567-e89b-12d3-a456-426614174000', '123e4567-e89b-12d3-a456-426614174001', '123e4567-e89b-12d3-a456-426614174002'], dtype='string'), + 'long_number': pd.Series([123456789012345, 987654321098765, None], dtype='Int64'), + 'crypto_hash': pd.Series(['0x7fffffffffffffffffffffffffffffff', None, '0x1fffffffffffffffffffffffffffffff'], dtype='string'), + }) + assert_frame_equal(act, exp_df, check_column_type=True) + + def test_almost_all_types_0_rows(self): + act = self.s_pandas_query('SELECT * FROM almost_all_types WHERE id = 0') + schema = { + name: str(val) + for name, val + in act.dtypes.to_dict().items()} + exp_schema = { + 'id': 'Int32', + 'active': 'bool', + 'ip_address': 'string', + 'age': 'int8', + 'temperature': 'int16', + 'grade': 'string', + 'account_balance': 'float32', + 'currency_symbol': 'string', + 'description': 'string', + 'record_date': 'datetime64[ns]', + 'event_timestamp': 'datetime64[ns]', + 'revenue': 'float64', + 'user_uuid': 'string', + 'long_number': 'Int64', + 'crypto_hash': 'string', + } + self.assertEqual(exp_schema.keys(), schema.keys()) + for key in exp_schema: + self.assertEqual((key, exp_schema[key]), (key, schema[key])) + + exp_df = pd.DataFrame({ + 'id': pd.Series([], dtype='Int32'), + 'active': pd.Series([], dtype='bool'), + 'ip_address': pd.Series([], dtype='string'), + 'age': pd.Series([], dtype='int8'), + 'temperature': pd.Series([], dtype='int16'), + 'grade': pd.Series([], dtype='string'), + 'account_balance': pd.Series([], dtype='float32'), + 'currency_symbol': pd.Series([], dtype='string'), + 'description': pd.Series([], dtype='string'), + 'record_date': pd.Series([], dtype='datetime64[ns]'), + 'event_timestamp': pd.Series([], dtype='datetime64[ns]'), + 'revenue': pd.Series([], dtype='float64'), + 'user_uuid': pd.Series([], dtype='string'), + 'long_number': pd.Series([], dtype='Int64'), + 'crypto_hash': pd.Series([], 
dtype='string'), + }) + assert_frame_equal(act, exp_df, check_column_type=True) + async def test_async_pandas(self): act = await self.a_pandas_query('SELECT count() FROM trips') exp = pd.DataFrame({'count': pd.Series([10000], dtype='Int64')})