Skip to content

Commit

Permalink
Merge pull request #90 from databendlabs/fix/batch-insert
Browse files Browse the repository at this point in the history
fix: batch insert using attachment
  • Loading branch information
hantmac authored Dec 25, 2024
2 parents 15c18ce + 95ad606 commit 246e2c5
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 10 deletions.
3 changes: 1 addition & 2 deletions databend_py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ def _process_insert_query(self, query, params):
query = query.split("VALUES")[0] + "VALUES"
if len(query.split(" ")) < 3:
raise Exception("Not standard insert/replace statement")
table_name = query.split(" ")[2]
batch_size = query.count(",") + 1
if params is not None and len(params) > 0:
if isinstance(params[0], tuple):
Expand All @@ -147,7 +146,7 @@ def _process_insert_query(self, query, params):
for i in range(0, len(params), batch_size)
]
insert_rows = len(tuple_ls)
self._uploader.upload_to_table_by_copy(table_name, tuple_ls)
self._uploader.upload_to_table_by_attachment(query, tuple_ls)
return insert_rows

def _process_ordinary_query(
Expand Down
24 changes: 17 additions & 7 deletions databend_py/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

class DataUploader:
def __init__(
self,
client,
connection,
settings,
default_stage_dir="@~",
debug=False,
compress=False,
self,
client,
connection,
settings,
default_stage_dir="@~",
debug=False,
compress=False,
):
# TODO: make it depends on Connection instead of Client
self.client = client
Expand All @@ -34,6 +34,14 @@ def upload_to_table_by_copy(self, table_name, data):
self._upload_to_presigned_url(presigned_url, headers, data)
self._execute_copy(table_name, stage_path, "CSV")

def upload_to_table_by_attachment(self, sql_statement, data):
if len(data) == 0:
return
stage_path = self._gen_stage_path(self.default_stage_dir)
presigned_url, headers = self._execute_presign(stage_path)
self._upload_to_presigned_url(presigned_url, headers, data)
self._execute_with_attachment(sql_statement, stage_path, "CSV")

def replace_into_table(self, table_name, conflict_keys, data):
"""
:param table_name: table name
Expand Down Expand Up @@ -175,6 +183,8 @@ def _make_attachment(self, sql_statement, stage_path, file_type):

file_format_options = {}
file_format_options["type"] = file_type
file_format_options["RECORD_DELIMITER"] = '\r\n'
file_format_options["COMPRESSION"] = "AUTO"

data = {
"sql": sql_statement,
Expand Down
12 changes: 11 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ def test_batch_insert_with_dict_list(self):
_, ss = c.execute("select * from test")
self.assertEqual(ss, [(5, "cc"), (6, "dd")])

def test_batch_insert_with_dict_multi_fields(self):
c = Client.from_url(self.databend_url)
c.execute("DROP TABLE IF EXISTS test")
c.execute("CREATE TABLE if not exists test (id int, x Int32, y VARCHAR, z Int32)")
c.execute("DESC test")
_, r1 = c.execute("INSERT INTO test (x,y) VALUES", [{"x": 7, "y": "ee"}, {"x": 8, "y": "ff"}])
self.assertEqual(r1, 2)
_, ss = c.execute("select * from test")
self.assertEqual(ss, [('NULL', 7, 'ee', 'NULL'), ('NULL', 8, 'ff', 'NULL')])

def test_iter_query(self):
client = Client.from_url(self.databend_url)
result = client.execute_iter("select 1", with_column_types=False)
Expand All @@ -167,7 +177,7 @@ def test_replace(self):
client.replace("default", "test_replace", ["x"], [(1, "a"), (2, "b")])
client.replace("default", "test_replace", ["x"], [(1, "c"), (2, "d")])
_, upload_res = client.execute("select * from test_replace")
self.assertEqual(upload_res, [(1, "c\r"), (2, "d\r")])
self.assertEqual(upload_res, [(1, "c"), (2, "d")])

def test_insert_with_compress(self):
client = Client.from_url(self.databend_url + "?compress=True&debug=True")
Expand Down

0 comments on commit 246e2c5

Please sign in to comment.