
Commit

fix: fixed errors when parsing DATE and TIMESTAMP columns in zero row-len queries (#8)
amunra authored Apr 18, 2024
1 parent c6b54f6 commit 6b2a741
Showing 4 changed files with 167 additions and 70 deletions.
144 changes: 82 additions & 62 deletions README.md
@@ -17,60 +17,69 @@ To uninstall the library, you can use the command:
python3 -m pip uninstall questdb_query
```

## Basic Usage, querying into Numpy
## Basic Usage, querying into Pandas

Once installed, you can use the library to query a QuestDB database. Here's an example that demonstrates how to query
CPU utilization data using the library against a database running on `localhost`.
CPU utilization data using the library against a database running on `localhost` on the default HTTP port (9000).

```python
from questdb_query import numpy_query
from questdb_query import pandas_query

np_arrs = numpy_query('''
select
timestamp, hostname, datacenter, usage_user, usage_nice
from
cpu
limit 10''')
df = pandas_query('select * from cpu limit 1000')
```

The `np_arrs` object is a python `dict` which holds a numpy array per column, keyed by column name:
This allows you, for example, to pre-aggregate results:

```python
>>> np_arrs
{'timestamp': array(['2016-01-01T00:00:00.000000000', '2016-01-01T00:00:10.000000000',
'2016-01-01T00:00:20.000000000', '2016-01-01T00:00:30.000000000',
'2016-01-01T00:00:40.000000000', '2016-01-01T00:00:50.000000000',
'2016-01-01T00:01:00.000000000', '2016-01-01T00:01:10.000000000',
'2016-01-01T00:01:20.000000000', '2016-01-01T00:01:30.000000000'],
dtype='datetime64[ns]'), 'hostname': array(['host_0', 'host_1', 'host_2', 'host_3', 'host_4', 'host_5',
'host_6', 'host_7', 'host_8', 'host_9'], dtype=object), 'datacenter': array(['ap-southeast-2b', 'eu-west-1b', 'us-west-1b', 'us-west-2c',
'us-west-2b', 'eu-west-1b', 'eu-west-1b', 'us-west-1a',
'ap-southeast-2a', 'us-east-1a'], dtype=object), 'usage_user': array([1.39169048, 0.33846369, 0. , 1.81511203, 0.84273104,
0. , 0. , 0.28085548, 0. , 1.37192634]), 'usage_nice': array([0.30603088, 1.21496673, 0. , 0.16688796, 0. ,
2.77319521, 0.40332488, 1.81585253, 1.92844804, 2.12841919])}
>>> df = df[['region', 'usage_user', 'usage_nice']].groupby('region').mean()
>>> df
usage_user usage_nice
region
ap-northeast-1 8.163766 6.492334
ap-southeast-1 6.511215 7.341863
ap-southeast-2 6.788770 6.257839
eu-central-1 7.392642 6.416479
eu-west-1 7.213417 7.185956
sa-east-1 7.143568 5.925026
us-east-1 7.620643 7.243553
us-west-1 6.286770 6.531977
us-west-2 6.228692 6.439672
```

If we wanted to calculate a (rather non-sensical) weighted average of `usage_user` and `usage_nice` we can
do this by accessing the `numpy` columns:
You can then switch over to numpy with a simple and fast conversion:

```python
>>> np_arrs['usage_user'].dot(np_arrs['usage_nice'].T)
4.5700692045031985
>>> from questdb_query import pandas_to_numpy
>>> np_arrs = pandas_to_numpy(df)
>>> np_arrs
{'usage_user': array([8.16376556, 6.51121543, 6.78876964, 7.3926419 , 7.21341716,
7.14356839, 7.62064304, 6.28677006, 6.22869169]), 'usage_nice': array([6.49233392, 7.34186348, 6.25783903, 6.41647863, 7.18595643,
5.92502642, 7.24355328, 6.53197733, 6.43967247]), 'region': array(['ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2',
'eu-central-1', 'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1',
'us-west-2'], dtype=object)}
```

## Querying a remote database

If your database is running on a remote host, specify an endpoint:

```python
from questdb_query import numpy_query, Endpoint
from questdb_query import pandas_query, Endpoint

endpoint = Endpoint(host='your.hostname.com', https=True, username='user', password='pass')
endpoint = Endpoint(host='your.hostname.com', port=22453, https=True, username='user', password='pass')

np_arrs = numpy_query('select * from cpu limit 10', endpoint)
```

Note how the example above enables HTTPS and specifies a username and password for authentication.

The port is optional and defaults to 9000 for HTTP and 443 for HTTPS.
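
As a quick sketch relying on those defaults (the hostname and credentials are placeholders):

```python
from questdb_query import pandas_query, Endpoint

# No explicit port: plain HTTP falls back to 9000, HTTPS to 443.
local = Endpoint(host='localhost')
remote = Endpoint(host='your.hostname.com', https=True, username='user', password='pass')

df = pandas_query('select * from cpu limit 10', remote)
```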

Alternatively, if the server is set up with token-based authentication, you can use the `token` parameter:

```python
endpoint = Endpoint(host='your.hostname.com', https=True, token='your_token')
```

## Chunks: Query Parallelism

@@ -99,47 +108,58 @@ _Read on for more details on benchmarking: This is covered later in this README
> To avoid consistency issues formulate the query so that it only queries data that is not changing.
> You can do this, for example, by specifying a `timestamp` range in the `WHERE` clause, as sketched below.
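
For example, a sketch (the table name, columns and date range are illustrative) that splits such a time-bounded query across 8 parallel chunks:

```python
from questdb_query import pandas_query

# Bounding the scan to a fixed, immutable time range keeps the parallel
# chunk queries mutually consistent even if new rows are being ingested.
df = pandas_query('''
    select * from cpu
    where timestamp >= '2016-01-01T00:00:00Z'
      and timestamp <  '2016-01-02T00:00:00Z'
''', chunks=8)
```
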
## Querying into Pandas
## Querying into Numpy

You can also query into Pandas:
You can also query directly into a dictionary of Numpy arrays.

```python
from questdb_query import pandas_query, Endpoint
Note that Numpy's datatypes are more limited than Pandas', specifically in the
handling of null values (illustrated below).

endpoint = Endpoint(host='your.hostname.com', https=True, username='user', password='pass')
This is a simple shorthand for querying into Pandas and then converting to Numpy:

df = pandas_query('select * from cpu limit 1000', endpoint)
```python
def numpy_query(query: str, endpoint: Endpoint = None,
                chunks: int = 1, timeout: int = None) -> dict[str, np.array]:
    df = pandas_query(query, endpoint, chunks, timeout)
    return pandas_to_numpy(df)
```
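
To make the null-handling caveat above concrete, here is a plain Pandas sketch (not part of this library's API): Numpy has no nullable integer dtype, so such a column cannot convert losslessly.

```python
import pandas as pd

# A nullable integer column, e.g. how a LONG column containing NULLs
# is represented on the Pandas side.
s = pd.Series([123456789012345, None], dtype='Int64')

# Numpy has no integer dtype that can hold a missing value, so the
# straightforward conversion falls back to an object array holding pd.NA.
print(s.to_numpy())  # [123456789012345 <NA>] as dtype=object
```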

This allows you, for example, to pre-aggregate results:
To use it, pass the query string to the `numpy_query` function, along with the
same optional parameters as the `pandas_query` function.

```python
>>> df = df[['region', 'usage_user', 'usage_nice']].groupby('region').mean()
>>> df
usage_user usage_nice
region
ap-northeast-1 8.163766 6.492334
ap-southeast-1 6.511215 7.341863
ap-southeast-2 6.788770 6.257839
eu-central-1 7.392642 6.416479
eu-west-1 7.213417 7.185956
sa-east-1 7.143568 5.925026
us-east-1 7.620643 7.243553
us-west-1 6.286770 6.531977
us-west-2 6.228692 6.439672
```
from questdb_query import numpy_query

You can then switch over to numpy with a simple and fast conversion:
np_arrs = numpy_query('''
select
timestamp, hostname, datacenter, usage_user, usage_nice
from
cpu
limit 10''')
```

The `np_arrs` object is a python `dict` which holds a numpy array per column, keyed by column name:
```python
>>> from questdb_query import pandas_to_numpy
>>> np_arrs = pandas_to_numpy(df)
>>> np_arrs
{'usage_user': array([8.16376556, 6.51121543, 6.78876964, 7.3926419 , 7.21341716,
7.14356839, 7.62064304, 6.28677006, 6.22869169]), 'usage_nice': array([6.49233392, 7.34186348, 6.25783903, 6.41647863, 7.18595643,
5.92502642, 7.24355328, 6.53197733, 6.43967247]), 'region': array(['ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2',
'eu-central-1', 'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1',
'us-west-2'], dtype=object)}
{'timestamp': array(['2016-01-01T00:00:00.000000000', '2016-01-01T00:00:10.000000000',
'2016-01-01T00:00:20.000000000', '2016-01-01T00:00:30.000000000',
'2016-01-01T00:00:40.000000000', '2016-01-01T00:00:50.000000000',
'2016-01-01T00:01:00.000000000', '2016-01-01T00:01:10.000000000',
'2016-01-01T00:01:20.000000000', '2016-01-01T00:01:30.000000000'],
dtype='datetime64[ns]'), 'hostname': array(['host_0', 'host_1', 'host_2', 'host_3', 'host_4', 'host_5',
'host_6', 'host_7', 'host_8', 'host_9'], dtype=object), 'datacenter': array(['ap-southeast-2b', 'eu-west-1b', 'us-west-1b', 'us-west-2c',
'us-west-2b', 'eu-west-1b', 'eu-west-1b', 'us-west-1a',
'ap-southeast-2a', 'us-east-1a'], dtype=object), 'usage_user': array([1.39169048, 0.33846369, 0. , 1.81511203, 0.84273104,
0. , 0. , 0.28085548, 0. , 1.37192634]), 'usage_nice': array([0.30603088, 1.21496673, 0. , 0.16688796, 0. ,
2.77319521, 0.40332488, 1.81585253, 1.92844804, 2.12841919])}
```

If we wanted to calculate a (rather nonsensical) weighted average of `usage_user` and `usage_nice`, we can
do this by accessing the `numpy` columns:

```python
>>> np_arrs['usage_user'].dot(np_arrs['usage_nice'].T)
4.5700692045031985
```

## Benchmarking
@@ -149,9 +169,9 @@ You can then switch over to numpy with a simple and fast conversion:
Each query result also contains a `Stats` object with the performance summary which you can print.

```python
>>> from questdb_query import numpy_query
>>> np_arrs = numpy_query('select * from cpu', chunks=8)
>>> print(np_arrs.query_stats)
>>> from questdb_query import pandas_query
>>> df = pandas_query('select * from cpu', chunks=8)
>>> print(df.query_stats)
Duration: 2.631s
Millions of lines: 5.000
Millions of lines/s: 1.901
@@ -162,9 +182,9 @@ MiB/s: 506.381
You can also extract individual fields:

```python
>>> np_arrs.query_stats
>>> df.query_stats
Stats(duration_s=2.630711865, line_count=5000000, byte_count=1396853875, throughput_mbs=506.3814407360216, throughput_mlps=1.900626239810569)
>>> np_arrs.query_stats.throughput_mlps
>>> df.query_stats.throughput_mlps
1.900626239810569
```

17 changes: 13 additions & 4 deletions questdb_query/asynchronous.py
@@ -118,10 +118,19 @@ def _read_csv():
try:
if col_type in ('TIMESTAMP', 'DATE'):
series = df[col_name]
# Drop the UTC timezone during conversion.
# This allows `.to_numpy()` on the series to
# yield a `datetime64` dtype column.
series = pd.to_datetime(series).dt.tz_convert(None)
# if the series is empty (or full of nulls) its csv-read
# default dtype (float64) is not one which we can
# convert `.to_datetime`,
if series.empty or series.isnull().all():
# so to work around this we first convert the series
# to Int64 (nullable).
series = series.astype('Int64')
series = pd.to_datetime(series, unit='ns')
else:
# Drop the UTC timezone during conversion.
# This allows `.to_numpy()` on the series to
# yield a `datetime64` dtype column.
series = pd.to_datetime(series).dt.tz_convert(None)
df[col_name] = series
except Exception as e:
print(df[col_name])
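
For context, a minimal standalone sketch of the Pandas behaviour this change works around (my reading of the fix, not taken from the repository): an empty or all-null DATE/TIMESTAMP column arrives from the CSV reader as `float64`, and routing it through nullable `Int64` lets it be parsed to `datetime64[ns]`.

```python
import pandas as pd

# An empty column as the CSV reader yields it: dtype defaults to float64.
empty = pd.Series([], dtype='float64')

# The old path parsed and then dropped the UTC timezone, which fails here
# because an empty/all-null column parses as tz-naive:
#     pd.to_datetime(empty).dt.tz_convert(None)  # raises TypeError

# The new path: convert to nullable Int64 first, then parse as nanosecond
# epoch values, which yields the desired datetime64[ns] dtype.
fixed = pd.to_datetime(empty.astype('Int64'), unit='ns')
print(fixed.dtype)  # datetime64[ns]
```
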
4 changes: 3 additions & 1 deletion questdb_query/tool.py
@@ -19,6 +19,7 @@ def _parse_args():
parser.add_argument('--https', action='store_true')
parser.add_argument('--username', type=str)
parser.add_argument('--password', type=str)
parser.add_argument('--token', type=str)
parser.add_argument('--chunks', type=int, default=1)
parser.add_argument('query', type=str)
return parser.parse_args()
@@ -30,7 +31,8 @@ def main(args):
port=args.port,
https=args.https,
username=args.username,
password=args.password)
password=args.password,
token=args.token)
df = pandas_query(args.query, endpoint, args.chunks)
print(df)
print()
72 changes: 69 additions & 3 deletions tests/tests.py
@@ -125,9 +125,10 @@ def load_all_types_table(qdb):
long_number,
crypto_hash
) VALUES
(1, true, '192.168.1.1', 25, 72, 'A', 1000.5, 'USD', 'Test record 1', '2023-01-01T00:00:00.000Z', '2023-01-01T00:00:00.000000Z', 200.00, '123e4567-e89b-12d3-a456-426614174000', 123456789012345, '0x7fffffffffffffffffffffffffffffff'),
(2, false, NULL, 30, 68, 'B', 1500.3, 'EUR', 'Test record 2', NULL, '2023-01-02T00:00:00.000000Z', 300.00, '123e4567-e89b-12d3-a456-426614174001', 987654321098765, NULL),
(3, NULL, '10.0.0.1', 35, NULL, 'C', NULL, 'JPY', 'Test record 3', '2023-01-03T00:00:00.000Z', '2023-01-03T00:00:00.000000Z', NULL, '123e4567-e89b-12d3-a456-426614174002', NULL, '0x1fffffffffffffffffffffffffffffff');
-- id active ip_address age temp gra acc_bal curr description record_date event_timestamp revenue user_uuid long_number crypto_hash
(1, true, '192.168.1.1', 25, 72, 'A', 1000.5, 'USD', 'Test record 1', '2023-01-01T00:00:00.000Z', '2023-01-01T00:00:00.000000Z', 200.00, '123e4567-e89b-12d3-a456-426614174000', 123456789012345, '0x7fffffffffffffffffffffffffffffff'),
(2, false, NULL, 30, 68, 'B', 1500.25, 'EUR', NULL, NULL, '2023-01-02T00:00:00.000000Z', 300.00, '123e4567-e89b-12d3-a456-426614174001', 987654321098765, NULL),
(3, NULL, '10.0.0.1', 35, -40, 'C', NULL, 'JPY', 'Test record 3', '2023-01-03T00:00:00.000Z', '2023-01-03T00:00:00.000000Z', NULL, '123e4567-e89b-12d3-a456-426614174002', NULL, '0x1fffffffffffffffffffffffffffffff');
''')

def load_trips_table(qdb):
@@ -452,6 +453,71 @@ def test_almost_all_types(self):
for key in exp_schema:
self.assertEqual((key, exp_schema[key]), (key, schema[key]))

exp_df = pd.DataFrame({
'id': pd.Series([1, 2, 3], dtype='Int32'),
'active': pd.Series([True, False, None], dtype='bool'),
'ip_address': pd.Series(['192.168.1.1', None, '10.0.0.1'], dtype='string'),
'age': pd.Series([25, 30, 35], dtype='int8'),
'temperature': pd.Series([72, 68, -40], dtype='int16'),
'grade': pd.Series(['A', 'B', 'C'], dtype='string'),
'account_balance': pd.Series([1000.5, 1500.25, None], dtype='float32'),
'currency_symbol': pd.Series(['USD', 'EUR', 'JPY'], dtype='string'),
'description': pd.Series(['Test record 1', None, 'Test record 3'], dtype='string'),
'record_date': pd.Series(['2023-01-01T00:00:00.000', None, '2023-01-03T00:00:00.000'], dtype='datetime64[ns]'),
'event_timestamp': pd.Series(['2023-01-01T00:00:00.000000', '2023-01-02T00:00:00.000000', '2023-01-03T00:00:00.000000'], dtype='datetime64[ns]'),
'revenue': pd.Series([200.00, 300.00, None], dtype='float64'),
'user_uuid': pd.Series(['123e4567-e89b-12d3-a456-426614174000', '123e4567-e89b-12d3-a456-426614174001', '123e4567-e89b-12d3-a456-426614174002'], dtype='string'),
'long_number': pd.Series([123456789012345, 987654321098765, None], dtype='Int64'),
'crypto_hash': pd.Series(['0x7fffffffffffffffffffffffffffffff', None, '0x1fffffffffffffffffffffffffffffff'], dtype='string'),
})
assert_frame_equal(act, exp_df, check_column_type=True)

def test_almost_all_types_0_rows(self):
act = self.s_pandas_query('SELECT * FROM almost_all_types WHERE id = 0')
schema = {
name: str(val)
for name, val
in act.dtypes.to_dict().items()}
exp_schema = {
'id': 'Int32',
'active': 'bool',
'ip_address': 'string',
'age': 'int8',
'temperature': 'int16',
'grade': 'string',
'account_balance': 'float32',
'currency_symbol': 'string',
'description': 'string',
'record_date': 'datetime64[ns]',
'event_timestamp': 'datetime64[ns]',
'revenue': 'float64',
'user_uuid': 'string',
'long_number': 'Int64',
'crypto_hash': 'string',
}
self.assertEqual(exp_schema.keys(), schema.keys())
for key in exp_schema:
self.assertEqual((key, exp_schema[key]), (key, schema[key]))

exp_df = pd.DataFrame({
'id': pd.Series([], dtype='Int32'),
'active': pd.Series([], dtype='bool'),
'ip_address': pd.Series([], dtype='string'),
'age': pd.Series([], dtype='int8'),
'temperature': pd.Series([], dtype='int16'),
'grade': pd.Series([], dtype='string'),
'account_balance': pd.Series([], dtype='float32'),
'currency_symbol': pd.Series([], dtype='string'),
'description': pd.Series([], dtype='string'),
'record_date': pd.Series([], dtype='datetime64[ns]'),
'event_timestamp': pd.Series([], dtype='datetime64[ns]'),
'revenue': pd.Series([], dtype='float64'),
'user_uuid': pd.Series([], dtype='string'),
'long_number': pd.Series([], dtype='Int64'),
'crypto_hash': pd.Series([], dtype='string'),
})
assert_frame_equal(act, exp_df, check_column_type=True)

async def test_async_pandas(self):
act = await self.a_pandas_query('SELECT count() FROM trips')
exp = pd.DataFrame({'count': pd.Series([10000], dtype='Int64')})
