Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support batch insert with dict list #87

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions databend_py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _iter_receive_result(self, query, query_id=None, with_column_types=False):
yield r

def execute(
self, query, params=None, with_column_types=False, query_id=None, settings=None
self, query, params=None, with_column_types=False, query_id=None, settings=None
):
"""
Executes query.
Expand Down Expand Up @@ -138,17 +138,20 @@ def _process_insert_query(self, query, params):
if params is not None and len(params) > 0:
if isinstance(params[0], tuple):
tuple_ls = params
elif isinstance(params[0], dict):
# if params type is list[dictionary], then it's a insert query
tuple_ls = [tuple(p.values()) for p in params]
else:
tuple_ls = [
tuple(params[i : i + batch_size])
tuple(params[i: i + batch_size])
for i in range(0, len(params), batch_size)
]
insert_rows = len(tuple_ls)
self._uploader.upload_to_table_by_copy(table_name, tuple_ls)
return insert_rows

def _process_ordinary_query(
self, query, params=None, with_column_types=False, query_id=None
self, query, params=None, with_column_types=False, query_id=None
):
if params is not None:
query = self._substitute_params(query, params, self.connection.context)
Expand All @@ -159,7 +162,7 @@ def _process_ordinary_query(
)

def execute_iter(
self, query, params=None, with_column_types=False, query_id=None, settings=None
self, query, params=None, with_column_types=False, query_id=None, settings=None
):
if params is not None:
query = self._substitute_params(query, params, self.connection.context)
Expand All @@ -168,7 +171,7 @@ def execute_iter(
)

def _iter_process_ordinary_query(
self, query, with_column_types=False, query_id=None
self, query, with_column_types=False, query_id=None
):
return self._iter_receive_result(
query, query_id=query_id, with_column_types=with_column_types
Expand Down
10 changes: 10 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ def test_batch_insert_with_tuple(self):
_, ss = c.execute("select * from test")
self.assertEqual(ss, [(3, "aa"), (4, "bb")])

def test_batch_insert_with_dict_list(self):
c = Client.from_url(self.databend_url)
c.execute("DROP TABLE IF EXISTS test")
c.execute("CREATE TABLE if not exists test (x Int32,y VARCHAR)")
c.execute("DESC test")
_, r1 = c.execute("INSERT INTO test (x,y) VALUES", [{"x": 5, "y": "cc"}, {"x": 6, "y": "dd"}])
self.assertEqual(r1, 2)
_, ss = c.execute("select * from test")
self.assertEqual(ss, [(5, "cc"), (6, "dd")])

def test_iter_query(self):
client = Client.from_url(self.databend_url)
result = client.execute_iter("select 1", with_column_types=False)
Expand Down
Loading