Skip to content

Commit

Permalink
dont do table delete on object delete
Browse files Browse the repository at this point in the history
  • Loading branch information
gtarpenning committed Dec 11, 2024
1 parent 3e634d5 commit f3e513b
Showing 1 changed file with 28 additions and 57 deletions.
85 changes: 28 additions & 57 deletions weave/trace_server/clickhouse_trace_server_batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,55 +713,6 @@ def _make_soft_delete_insertables(
)
return delete_insertables

def _filter_deletable_objects(
self, objs: list[SelectableCHObjSchema]
) -> list[SelectableCHObjSchema]:
"""
Filter out objects that we can't delete yet.
Criteria:
- If any child refs are tables, we can't delete this object
"""
filtered_objs = []
for obj in objs:
refs = [ri.parse_internal_uri(r) for r in obj.refs]
if any(isinstance(r, ri.InternalTableRef) for r in refs):
# TODO: Raise?
continue
filtered_objs.append(obj)
return filtered_objs

def _make_obj_delete_insertables(
self, project_id: str, object_id: str
) -> list[ObjDeleteCHInsertable]:
latest_obj_version = self._obj_read(project_id, object_id, "latest")
other_obj_versions = self._select_objs_query(
project_id,
conditions=["is_latest = 0"],
object_id_conditions=["object_id = {object_id: String}"],
parameters={"object_id": object_id},
# only get metadata, we are going to hard delete
metadata_only=True,
)
all_versions = [latest_obj_version] + other_obj_versions
filtered_obj_versions = self._filter_deletable_objects(all_versions)
insertables = self._make_soft_delete_insertables(filtered_obj_versions)
return insertables

def _make_obj_versions_delete_insertables(
self, project_id: str, object_id: str, digests: list[str]
) -> list[ObjDeleteCHInsertable]:
object_versions = self._select_objs_query(
project_id,
object_id_conditions=["object_id = {object_id: String}"],
conditions=["digest IN {digests: Array(String)}"],
parameters={"object_id": object_id, "digests": digests},
metadata_only=False,
)
filtered_object_versions = self._filter_deletable_objects(object_versions)
insertables = self._make_soft_delete_insertables(filtered_object_versions)
return insertables

def obj_delete(self, req: tsi.ObjDeleteReq) -> tsi.ObjDeleteRes:
"""
Delete object versions by digest, belonging to given object_id.
Expand All @@ -779,12 +730,36 @@ def obj_delete(self, req: tsi.ObjDeleteReq) -> tsi.ObjDeleteRes:
Otherwise, all specified versions are soft deleted.
"""
if req.digests:
insertables = self._make_obj_versions_delete_insertables(
req.project_id, req.object_id, req.digests
object_versions = self._select_objs_query(
req.project_id,
object_id_conditions=["object_id = {object_id: String}"],
conditions=["digest IN {digests: Array(String)}"],
parameters={"object_id": req.object_id, "digests": req.digests},
metadata_only=False,
)
insertables = self._make_soft_delete_insertables(object_versions)
# TODO: push down to CH to lift this limit
MAX_OBJECTS_TO_DELETE = 100
if len(insertables) > MAX_OBJECTS_TO_DELETE:
raise ValueError(
f"Object delete request contains {len(insertables)} objects. Please delete fewer than {MAX_OBJECTS_TO_DELETE} objects at a time."
)
else:
insertables = self._make_obj_delete_insertables(
req.project_id, req.object_id
latest_obj_version = self._obj_read(req.project_id, req.object_id, "latest")
other_obj_versions = self._select_objs_query(
req.project_id,
conditions=["is_latest = 0"],
object_id_conditions=["object_id = {object_id: String}"],
parameters={"object_id": req.object_id},
# only get metadata, we are going to hard delete
metadata_only=True,
)
all_versions = [latest_obj_version] + other_obj_versions
insertables = self._make_soft_delete_insertables(all_versions)

if len(insertables) == 0:
raise NotFoundError(
f"Object {req.object_id} ({req.digests}) not found when deleting."
)

data = [list(obj.model_dump().values()) for obj in insertables]
Expand Down Expand Up @@ -1894,10 +1869,6 @@ def _select_objs_query(
for row in query_result:
(object_id, digest, val_dump) = row
object_values[(object_id, digest)] = val_dump

# update the val_dump for each object
for obj in result:
obj.val_dump = object_values.get((obj.object_id, obj.digest), "{}")
return result

def _run_migrations(self) -> None:
Expand Down

0 comments on commit f3e513b

Please sign in to comment.