diff --git a/weave/trace_server/clickhouse_trace_server_batched.py b/weave/trace_server/clickhouse_trace_server_batched.py index 615c77ce636..38896d9b104 100644 --- a/weave/trace_server/clickhouse_trace_server_batched.py +++ b/weave/trace_server/clickhouse_trace_server_batched.py @@ -713,55 +713,6 @@ def _make_soft_delete_insertables( ) return delete_insertables - def _filter_deletable_objects( - self, objs: list[SelectableCHObjSchema] - ) -> list[SelectableCHObjSchema]: - """ - Filter out objects that we can't delete yet. - - Criteria: - - If any child refs are tables, we can't delete this object - """ - filtered_objs = [] - for obj in objs: - refs = [ri.parse_internal_uri(r) for r in obj.refs] - if any(isinstance(r, ri.InternalTableRef) for r in refs): - # TODO: Raise? - continue - filtered_objs.append(obj) - return filtered_objs - - def _make_obj_delete_insertables( - self, project_id: str, object_id: str - ) -> list[ObjDeleteCHInsertable]: - latest_obj_version = self._obj_read(project_id, object_id, "latest") - other_obj_versions = self._select_objs_query( - project_id, - conditions=["is_latest = 0"], - object_id_conditions=["object_id = {object_id: String}"], - parameters={"object_id": object_id}, - # only get metadata, we are going to hard delete - metadata_only=True, - ) - all_versions = [latest_obj_version] + other_obj_versions - filtered_obj_versions = self._filter_deletable_objects(all_versions) - insertables = self._make_soft_delete_insertables(filtered_obj_versions) - return insertables - - def _make_obj_versions_delete_insertables( - self, project_id: str, object_id: str, digests: list[str] - ) -> list[ObjDeleteCHInsertable]: - object_versions = self._select_objs_query( - project_id, - object_id_conditions=["object_id = {object_id: String}"], - conditions=["digest IN {digests: Array(String)}"], - parameters={"object_id": object_id, "digests": digests}, - metadata_only=False, - ) - filtered_object_versions = self._filter_deletable_objects(object_versions) - insertables = self._make_soft_delete_insertables(filtered_object_versions) - return insertables - def obj_delete(self, req: tsi.ObjDeleteReq) -> tsi.ObjDeleteRes: """ Delete object versions by digest, belonging to given object_id. @@ -779,12 +730,36 @@ def obj_delete(self, req: tsi.ObjDeleteReq) -> tsi.ObjDeleteRes: Otherwise, all specified versions are soft deleted. """ if req.digests: - insertables = self._make_obj_versions_delete_insertables( - req.project_id, req.object_id, req.digests + object_versions = self._select_objs_query( + req.project_id, + object_id_conditions=["object_id = {object_id: String}"], + conditions=["digest IN {digests: Array(String)}"], + parameters={"object_id": req.object_id, "digests": req.digests}, + metadata_only=False, ) + insertables = self._make_soft_delete_insertables(object_versions) + # TODO: push down to CH to lift this limit + MAX_OBJECTS_TO_DELETE = 100 + if len(insertables) > MAX_OBJECTS_TO_DELETE: + raise ValueError( + f"Object delete request contains {len(insertables)} objects. Please delete fewer than {MAX_OBJECTS_TO_DELETE} objects at a time." + ) else: - insertables = self._make_obj_delete_insertables( - req.project_id, req.object_id + latest_obj_version = self._obj_read(req.project_id, req.object_id, "latest") + other_obj_versions = self._select_objs_query( + req.project_id, + conditions=["is_latest = 0"], + object_id_conditions=["object_id = {object_id: String}"], + parameters={"object_id": req.object_id}, + # only get metadata, we are going to hard delete + metadata_only=True, + ) + all_versions = [latest_obj_version] + other_obj_versions + insertables = self._make_soft_delete_insertables(all_versions) + + if len(insertables) == 0: + raise NotFoundError( + f"Object {req.object_id} ({req.digests}) not found when deleting." ) data = [list(obj.model_dump().values()) for obj in insertables] @@ -1894,10 +1869,6 @@ def _select_objs_query( for row in query_result: (object_id, digest, val_dump) = row object_values[(object_id, digest)] = val_dump - - # update the val_dump for each object - for obj in result: - obj.val_dump = object_values.get((obj.object_id, obj.digest), "{}") return result def _run_migrations(self) -> None: