Skip to content

Commit

Permalink
Merge pull request #87 from databendlabs/feat/support-batch-insert-dict
Browse files Browse the repository at this point in the history
feat: support batch insert with dict list
  • Loading branch information
hantmac authored Dec 24, 2024
2 parents d29c4a3 + 566e1a9 commit f875384
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
13 changes: 8 additions & 5 deletions databend_py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _iter_receive_result(self, query, query_id=None, with_column_types=False):
yield r

def execute(
self, query, params=None, with_column_types=False, query_id=None, settings=None
self, query, params=None, with_column_types=False, query_id=None, settings=None
):
"""
Executes query.
Expand Down Expand Up @@ -138,17 +138,20 @@ def _process_insert_query(self, query, params):
if params is not None and len(params) > 0:
if isinstance(params[0], tuple):
tuple_ls = params
elif isinstance(params[0], dict):
# if params type is list[dictionary], then it's a insert query
tuple_ls = [tuple(p.values()) for p in params]
else:
tuple_ls = [
tuple(params[i : i + batch_size])
tuple(params[i: i + batch_size])
for i in range(0, len(params), batch_size)
]
insert_rows = len(tuple_ls)
self._uploader.upload_to_table_by_copy(table_name, tuple_ls)
return insert_rows

def _process_ordinary_query(
self, query, params=None, with_column_types=False, query_id=None
self, query, params=None, with_column_types=False, query_id=None
):
if params is not None:
query = self._substitute_params(query, params, self.connection.context)
Expand All @@ -159,7 +162,7 @@ def _process_ordinary_query(
)

def execute_iter(
self, query, params=None, with_column_types=False, query_id=None, settings=None
self, query, params=None, with_column_types=False, query_id=None, settings=None
):
if params is not None:
query = self._substitute_params(query, params, self.connection.context)
Expand All @@ -168,7 +171,7 @@ def execute_iter(
)

def _iter_process_ordinary_query(
self, query, with_column_types=False, query_id=None
self, query, with_column_types=False, query_id=None
):
return self._iter_receive_result(
query, query_id=query_id, with_column_types=with_column_types
Expand Down
10 changes: 10 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ def test_batch_insert_with_tuple(self):
_, ss = c.execute("select * from test")
self.assertEqual(ss, [(3, "aa"), (4, "bb")])

def test_batch_insert_with_dict_list(self):
c = Client.from_url(self.databend_url)
c.execute("DROP TABLE IF EXISTS test")
c.execute("CREATE TABLE if not exists test (x Int32,y VARCHAR)")
c.execute("DESC test")
_, r1 = c.execute("INSERT INTO test (x,y) VALUES", [{"x": 5, "y": "cc"}, {"x": 6, "y": "dd"}])
self.assertEqual(r1, 2)
_, ss = c.execute("select * from test")
self.assertEqual(ss, [(5, "cc"), (6, "dd")])

def test_iter_query(self):
client = Client.from_url(self.databend_url)
result = client.execute_iter("select 1", with_column_types=False)
Expand Down

0 comments on commit f875384

Please sign in to comment.