Fix(athena): Drop partitions in batches of 25 to prevent hitting API limits

erindru committed Dec 8, 2024
1 parent a7e75dc commit e9ca4ff
Showing 3 changed files with 62 additions and 15 deletions.
18 changes: 9 additions & 9 deletions .circleci/continue_config.yml
@@ -339,16 +339,16 @@ workflows:
           matrix:
             parameters:
               engine:
-                - snowflake
-                - databricks
-                - redshift
-                - bigquery
-                - clickhouse-cloud
+                #- snowflake
+                #- databricks
+                #- redshift
+                #- bigquery
+                #- clickhouse-cloud
                 - athena
-          filters:
-            branches:
-              only:
-                - main
+          #filters:
+          #  branches:
+          #    only:
+          #      - main
       - trigger_private_tests:
           requires:
             - style_and_slow_tests
20 changes: 14 additions & 6 deletions sqlmesh/core/engine_adapter/athena.py
@@ -527,13 +527,21 @@ def _query_table_s3_location(self, table: exp.Table) -> str:
         raise SQLMeshError(f"Table {table} has no location set in the metastore!")
 
     def _drop_partitions_from_metastore(
-        self, table: exp.Table, partition_values: t.List[t.List[t.Any]]
+        self, table: exp.Table, partition_values: t.List[t.List[str]]
     ) -> None:
-        self._glue_client.batch_delete_partition(
-            DatabaseName=table.db,
-            TableName=table.name,
-            PartitionsToDelete=[{"Values": v} for v in partition_values],
-        )
+        # todo: switch to itertools.batched when our minimum supported Python is 3.12
+        # 25 = maximum number of partitions that batch_delete_partition can process at once
+        # ref: https://docs.aws.amazon.com/glue/latest/webapi/API_BatchDeletePartition.html#API_BatchDeletePartition_RequestParameters
+        def _chunks() -> t.Iterable[t.List[t.List[str]]]:
+            for i in range(0, len(partition_values), 25):
+                yield partition_values[i : i + 25]
+
+        for batch in _chunks():
+            self._glue_client.batch_delete_partition(
+                DatabaseName=table.db,
+                TableName=table.name,
+                PartitionsToDelete=[{"Values": v} for v in batch],
+            )
 
     def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
         table = exp.to_table(table_name)
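
Note: the in-code todo points at itertools.batched as a future replacement for the hand-rolled _chunks generator once Python 3.12 is the minimum supported version. A minimal, runnable sketch of the batching behaviour it would provide (an illustration, not part of this commit):

    import itertools

    # 63 single-value partitions, matching the shape passed to _drop_partitions_from_metastore
    partition_values = [[str(i)] for i in range(63)]

    # itertools.batched (Python 3.12+) yields tuples of at most 25 items each,
    # i.e. the same chunking the _chunks generator implements by hand
    for batch in itertools.batched(partition_values, 25):
        print(len(batch))  # prints 25, 25, 13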
39 changes: 39 additions & 0 deletions tests/core/engine_adapter/test_athena.py
@@ -365,3 +365,42 @@ def test_create_state_table(adapter: AthenaEngineAdapter):
     assert to_sql_calls(adapter) == [
         "CREATE TABLE IF NOT EXISTS `_snapshots` (`name` STRING) LOCATION 's3://base/_snapshots/' TBLPROPERTIES ('table_type'='iceberg')"
     ]
+
+
+def test_drop_partitions_from_metastore_uses_batches(
+    adapter: AthenaEngineAdapter, mocker: MockerFixture
+):
+    glue_client_mock = mocker.patch.object(AthenaEngineAdapter, "_glue_client", autospec=True)
+
+    glue_client_mock.batch_delete_partition.assert_not_called()
+
+    partition_values = []
+
+    for i in range(63):
+        partition_values.append([str(i)])
+
+    adapter._drop_partitions_from_metastore(
+        table=exp.table_("foo"), partition_values=partition_values
+    )
+
+    glue_client_mock.batch_delete_partition.assert_called()
+
+    # should have been called in batches of 25
+    calls = glue_client_mock.batch_delete_partition.call_args_list
+    assert len(calls) == 3
+
+    assert len(calls[0][1]["PartitionsToDelete"]) == 25
+    assert len(calls[1][1]["PartitionsToDelete"]) == 25
+    assert len(calls[2][1]["PartitionsToDelete"]) == 13
+
+    # first call 0-24
+    assert calls[0][1]["PartitionsToDelete"][0]["Values"][0] == "0"
+    assert calls[0][1]["PartitionsToDelete"][-1]["Values"][0] == "24"
+
+    # second call 25-49
+    assert calls[1][1]["PartitionsToDelete"][0]["Values"][0] == "25"
+    assert calls[1][1]["PartitionsToDelete"][-1]["Values"][0] == "49"
+
+    # third call 50-62
+    assert calls[2][1]["PartitionsToDelete"][0]["Values"][0] == "50"
+    assert calls[2][1]["PartitionsToDelete"][-1]["Values"][0] == "62"
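
Note on the indexing used in these assertions: each entry in a mock's call_args_list is an (args, kwargs) pair, so calls[i][1] is the keyword-argument dict of the i-th batch_delete_partition call. A small standalone illustration (not part of the commit):

    from unittest.mock import MagicMock

    m = MagicMock()
    m.batch_delete_partition(DatabaseName="db", PartitionsToDelete=[{"Values": ["0"]}])
    calls = m.batch_delete_partition.call_args_list
    assert calls[0][1]["DatabaseName"] == "db"  # [1] selects the kwargs dict of the call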
