Skip to content

Commit

Permalink
Time type for pysdk and HTTP (#1927)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Add create table, insert, select support for date, time, datetime type
in infinity_sdk, infinity_embedded, and HTTP.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Test cases
- [x] Python SDK impacted, Need to update PyPI
  • Loading branch information
vsian authored Sep 29, 2024
1 parent 34eef36 commit 6386249
Show file tree
Hide file tree
Showing 15 changed files with 512 additions and 20 deletions.
56 changes: 56 additions & 0 deletions python/infinity_embedded/local_infinity/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from infinity_embedded.common import VEC, SparseVector, InfinityException, DEFAULT_MATCH_VECTOR_TOPN
from infinity_embedded.embedded_infinity_ext import *
from infinity_embedded.errors import ErrorCode
from datetime import date, time, datetime, timedelta

def logic_type_to_dtype(ttype: WrapDataType):
match ttype.logical_type:
Expand Down Expand Up @@ -54,6 +55,16 @@ def logic_type_to_dtype(ttype: WrapDataType):
return object
case LogicalType.kSparse:
return object
case LogicalType.kDate:
return object
case LogicalType.kTime:
return object
case LogicalType.kDateTime:
return object
case LogicalType.kInterval:
return object
case LogicalType.kTimestamp:
return object
case _:
raise NotImplementedError(f"Unsupported type {ttype}")

Expand Down Expand Up @@ -222,9 +233,54 @@ def column_vector_to_list(column_type, column_data_type, column_vectors) -> \
return parse_tensor_bytes(column_data_type, column_vector)
case LogicalType.kTensorArray:
return parse_tensorarray_bytes(column_data_type, column_vector)
case LogicalType.kDate:
return parse_date_bytes(column_vector)
case LogicalType.kTime:
return parse_time_bytes(column_vector)
case LogicalType.kDateTime:
return parse_datetime_bytes(column_vector)
case LogicalType.kTimestamp:
return parse_datetime_bytes(column_vector)
case LogicalType.kInterval:
return parse_interval_bytes(column_vector)

case _:
raise NotImplementedError(f"Unsupported type {column_type}")

def parse_date_bytes(column_vector):
parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector))
date_list = []
epoch = date(1970, 1, 1)
for value in parsed_list:
date_list.append(epoch + timedelta(days = value))
return date_list

def parse_time_bytes(column_vector):
parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector))
time_list = []
for value in parsed_list:
hours = (value // 3600) % 24
minutes = (value % 3600) // 60
seconds = value % 60
time_list.append(time(hour=hours, minute=minutes, second=seconds))
return time_list

def parse_datetime_bytes(column_vector):
parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector))
datetime_list = []
epoch = datetime(1970, 1, 1)
for i in range(0, len(parsed_list), 2):
if i + 1 < len(parsed_list):
datetime_list.append(epoch + timedelta(days = parsed_list[i], seconds = parsed_list[i + 1]));
return datetime_list

def parse_interval_bytes(column_vector):
parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector))
interval_list = []
for value in parsed_list:
time_list.append(timedelta(seconds=value))
return interval_list

def parse_sparse_bytes(column_data_type, column_vector):
dimension = column_data_type.sparse_type.dimension
element_type = column_data_type.sparse_type.element_type
Expand Down
18 changes: 17 additions & 1 deletion python/infinity_embedded/local_infinity/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from infinity_embedded.utils import binary_exp_to_paser_exp
from infinity_embedded.embedded_infinity_ext import WrapInExpr, WrapParsedExpr, WrapFunctionExpr, WrapColumnExpr, WrapConstantExpr, ParsedExprType, LiteralType
from infinity_embedded.embedded_infinity_ext import WrapEmbeddingType, WrapColumnDef, WrapDataType, LogicalType, EmbeddingDataType, WrapSparseType, ConstraintType

from datetime import date, time, datetime, timedelta

def traverse_conditions(cons, fn=None):
if isinstance(cons, exp.Binary):
Expand Down Expand Up @@ -294,6 +294,16 @@ def get_local_constant_expr_from_python_value(value) -> WrapConstantExpr:
case _:
raise InfinityException(ErrorCode.INVALID_EXPRESSION,
f"Invalid sparse vector value type: {type(next(iter(value.values())))}")
case datetime():
constant_expression.literal_type = LiteralType.kDateTime
constant_expression.str_value = value.strftime("%Y-%m-%d %H:%M:%S")
case date():
constant_expression.literal_type = LiteralType.kDate
constant_expression.str_value = value.strftime("%Y-%m-%d")
case time():
constant_expression.literal_type = LiteralType.kTime
constant_expression.str_value = value.strftime("%H:%M:%S")

case _:
raise InfinityException(ErrorCode.INVALID_EXPRESSION, f"Invalid constant type: {type(value)}")
return constant_expression
Expand Down Expand Up @@ -536,6 +546,12 @@ def get_ordinary_info(column_info, column_defs, column_name, index):
proto_column_type.logical_type = LogicalType.kVarchar
case "bool":
proto_column_type.logical_type = LogicalType.kBoolean
case "date":
proto_column_type.logical_type = LogicalType.kDate
case "time":
proto_column_type.logical_type = LogicalType.kTime
case "datetime":
proto_column_type.logical_type = LogicalType.kDateTime
case _:
raise InfinityException(ErrorCode.INVALID_DATA_TYPE, f"Unknown datatype: {datatype}")
proto_column_def.column_type = proto_column_type
Expand Down
19 changes: 18 additions & 1 deletion python/infinity_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import polars as pl
import pyarrow as pa
from infinity.table import ExplainType
from datetime import date, time, datetime


class infinity_http:
Expand Down Expand Up @@ -403,6 +404,12 @@ def insert(self,values=[]):
value[key][idx] = value[key][idx].tolist()
if isinstance(value[key], SparseVector):
value[key] = value[key].to_dict()
if isinstance(value[key], datetime):
value[key] = value[key].strftime("%Y-%m-%d %H:%M:%S")
if isinstance(value[key], date):
value[key] = value[key].strftime("%Y-%m-%d")
if isinstance(value[key], time):
value[key] = value[key].strftime("%H:%M:%S")

url = f"databases/{self.database_name}/tables/{self.table_name}/docs"
h = self.set_up_header(["accept", "content-type"])
Expand Down Expand Up @@ -634,16 +641,26 @@ def to_df(self):

for res in self.output_res:
for k in res:
print(res[k])
if k not in df_dict:
df_dict[k] = ()
tup = df_dict[k]
if res[k].isdigit() or is_float(res[k]):
new_tup = tup + (eval(res[k]), )
elif is_time(res[k]):
new_tup = tup + (datetime.strptime(res[k], "%H:%M:%S").time(), )
print("time!")
elif is_datetime(res[k]):
new_tup = tup + (datetime.strptime(res[k], "%Y-%m-%d %H:%M:%S"), )
print("datetime!")
elif is_date(res[k]):
new_tup = tup + (datetime.strptime(res[k], "%Y-%m-%d").date(), )
print("date!")
elif is_list(res[k]):
new_tup = tup + (ast.literal_eval(res[k]), )
elif is_sparse(res[k]):# sparse vector
sparse_vec = str2sparse(res[k])
new_tup = tup + (sparse_vec, )
new_tup = tup + (sparse_vec, )
else:
if res[k].lower() == 'true':
res[k] = True
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions python/infinity_sdk/infinity/remote_thrift/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from infinity.remote_thrift.infinity_thrift_rpc.ttypes import *
from collections import defaultdict
from typing import Any, Tuple, Dict, List, Optional
from datetime import date, time, datetime, timedelta

import polars as pl
from numpy import dtype
Expand Down Expand Up @@ -58,6 +59,16 @@ def logic_type_to_dtype(ttype: ttypes.DataType):
return object
case ttypes.LogicType.Sparse:
return object
case ttypes.LogicType.Date:
return object
case ttypes.LogicType.Time:
return object
case ttypes.LogicType.DateTime:
return object
case ttypes.LogicType.Interval:
return object
case ttypes.LogicType.Timestamp:
return object
case _:
raise NotImplementedError(f"Unsupported type {ttype}")

Expand Down Expand Up @@ -149,9 +160,52 @@ def column_vector_to_list(column_type: ttypes.ColumnType, column_data_type: ttyp
return parse_tensorarray_bytes(column_data_type, column_vector)
case ttypes.ColumnType.ColumnSparse:
return parse_sparse_bytes(column_data_type, column_vector)
case ttypes.ColumnType.ColumnDate:
return parse_date_bytes(column_vector)
case ttypes.ColumnType.ColumnTime:
return parse_time_bytes(column_vector)
case ttypes.ColumnType.ColumnDateTime:
return parse_datetime_bytes(column_vector)
case ttypes.ColumnType.ColumnTimestamp:
return parse_datetime_bytes(column_vector)
case ttypes.ColumnType.ColumnInterval:
return parse_interval_bytes(column_vector)
case _:
raise NotImplementedError(f"Unsupported type {column_type}")

def parse_date_bytes(column_vector):
parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector))
date_list = []
epoch = date(1970, 1, 1)
for value in parsed_list:
date_list.append(epoch + timedelta(days = value))
return date_list

def parse_time_bytes(column_vector):
parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector))
time_list = []
for value in parsed_list:
hours = (value // 3600) % 24
minutes = (value % 3600) // 60
seconds = value % 60
time_list.append(time(hour=hours, minute=minutes, second=seconds))
return time_list

def parse_datetime_bytes(column_vector):
parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector))
datetime_list = []
epoch = datetime(1970, 1, 1)
for i in range(0, len(parsed_list), 2):
if i + 1 < len(parsed_list):
datetime_list.append(epoch + timedelta(days = parsed_list[i], seconds = parsed_list[i + 1]));
return datetime_list

def parse_interval_bytes(column_vector):
parsed_list = list(struct.unpack('<{}i'.format(len(column_vector) // 4), column_vector))
interval_list = []
for value in parsed_list:
time_list.append(timedelta(seconds=value))
return interval_list

def parse_bytes(bytes_data):
results = []
Expand Down
Loading

0 comments on commit 6386249

Please sign in to comment.