Skip to content

Commit

Permalink
chore(weave): permanently delete everything in a project
Browse files Browse the repository at this point in the history
  • Loading branch information
gtarpenning committed Dec 7, 2024
1 parent b376699 commit db8b2a3
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 0 deletions.
189 changes: 189 additions & 0 deletions tests/trace/test_permanently_delete_project.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import pytest
import weave
from weave.trace_server import trace_server_interface as tsi


@pytest.fixture
def project_id():
return "test-project-123"


@pytest.fixture
def setup_test_data(project_id):
client = weave.init(project_name=project_id)

@weave.op()
def create_test_data(pid: str):
return "hello " + pid

create_test_data(project_id)
create_test_data(project_id)
create_test_data(project_id)
create_test_data(project_id)

obj_dataset = weave.Dataset(
name=f"{project_id}/test-obj", rows=[{"id": "test-obj"}]
)
weave.publish(obj_dataset)

call1 = create_test_data.calls()[0]
call1.feedback.add_reaction("👍")

# Create test cost
cost_req = tsi.CostCreateReq(
project_id=project_id,
wb_user_id="test-user",
costs={
"gpt-4": tsi.CostSchema(
prompt_token_cost=0.01,
completion_token_cost=0.02,
prompt_token_cost_unit="USD/1K tokens",
completion_token_cost_unit="USD/1K tokens",
)
},
)
client.server.cost_create(cost_req)


def test_permanently_delete_project_deletes_all_data(project_id, setup_test_data):
client = weave.init(project_name=project_id)
# Verify data exists before deletion
assert (
len(
list(
client.server.calls_query_stream(
tsi.CallsQueryReq(project_id=project_id)
)
)
)
== 4
)

assert (
len(client.server.objs_query(tsi.ObjQueryReq(project_id=project_id)).objs) == 2
)

assert (
len(
client.server.table_query(
tsi.TableQueryReq(project_id=project_id, digest="latest")
).rows
)
== 1
)

feedback_query = tsi.FeedbackQueryReq(
project_id=project_id, fields=["id", "feedback_type"]
)
assert len(client.server.feedback_query(feedback_query).result) == 1

cost_query = tsi.CostQueryReq(project_id=project_id, fields=["id", "llm_id"])
assert len(client.server.cost_query(cost_query).results) == 1

# Execute permanent deletion
client.server.permanently_delete_project(
tsi.PermanentlyDeleteProjectReq(project_id=project_id)
)

# Verify all data is deleted
assert (
len(
list(
client.server.calls_query_stream(
tsi.CallsQueryReq(project_id=project_id)
)
)
)
== 0
)

assert (
len(client.server.objs_query(tsi.ObjQueryReq(project_id=project_id)).objs) == 0
)

assert (
len(
client.server.table_query(
tsi.TableQueryReq(project_id=project_id, digest="latest")
).rows
)
== 0
)

assert len(client.server.feedback_query(feedback_query).result) == 0


def test_permanently_delete_project_with_nonexistent_project():
client = weave.init("exists")
# Should not raise an error when deleting non-existent project
nonexistent_project_id = "nonexistent"
client.server.permanently_delete_project(
tsi.PermanentlyDeleteProjectReq(project_id=nonexistent_project_id)
)


def test_permanently_delete_project_does_not_affect_other_projects(
project_id, setup_test_data
):
client = weave.init("other-project")
# Create another project with data
other_project_id = "other-project"
other_call_start = tsi.StartedCallSchemaForInsert(
project_id=other_project_id,
op_name="test_op",
inputs={},
attributes={},
started_at=None,
wb_user_id="test-user",
)
client.server.call_start(tsi.CallStartReq(start=other_call_start))

# Delete first project
client.server.permanently_delete_project(
tsi.PermanentlyDeleteProjectReq(project_id=project_id)
)

# Verify other project's data still exists
other_project_calls = list(
client.server.calls_query_stream(tsi.CallsQueryReq(project_id=other_project_id))
)
assert len(other_project_calls) > 0


def test_permanently_delete_project_idempotency(project_id, setup_test_data):
client = weave.init(project_name=project_id)
# Delete project twice
client.server.permanently_delete_project(
tsi.PermanentlyDeleteProjectReq(project_id=project_id)
)
client.server.permanently_delete_project(
tsi.PermanentlyDeleteProjectReq(project_id=project_id)
)

# Verify data remains deleted
assert (
len(
list(
client.server.calls_query_stream(
tsi.CallsQueryReq(project_id=project_id)
)
)
)
== 0
)


@pytest.mark.parametrize(
"invalid_project_id",
[
"", # Empty string
None, # None
" ", # Whitespace
],
)
def test_permanently_delete_project_with_invalid_project_id(invalid_project_id):
client = weave.init(project_name="test-project-123")
with pytest.raises(Exception):
client.server.permanently_delete_project(
tsi.PermanentlyDeleteProjectReq(project_id=invalid_project_id)
)
31 changes: 31 additions & 0 deletions weave/trace_server/clickhouse_trace_server_batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,37 @@ def completions_create(
response=res.response, weave_call_id=start_call.id
)

def _make_purge_query(self, project_id: str, table_name: str) -> str:
return f"DELETE FROM {table_name} WHERE project_id = '{project_id}'"

def permanently_delete_project(
self, req: tsi.PermanentlyDeleteProjectReq
) -> tsi.PermanentlyDeleteProjectRes:
"""Purge all data in a project. Irreversible.
1. Delete all call data
2. Delete all object data
3. Delete all table/table_row data
4. Delete all file data
5. Delete all feedback data
6. Delete all cost data (?)
"""
tables_to_purge = [
"call_parts",
"object_versions",
"tables",
"table_rows",
"files",
"feedback",
"cost",
]

for table in tables_to_purge:
query = self._make_purge_query(req.project_id, table)
self.ch_client.query(query)

return tsi.PermanentlyDeleteProjectRes()

# Private Methods
@property
def ch_client(self) -> CHClient:
Expand Down
22 changes: 22 additions & 0 deletions weave/trace_server/sqlite_trace_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,28 @@ def completions_create(
print("COMPLETIONS CREATE is not implemented for local sqlite", req)
return tsi.CompletionsCreateRes()

def permanently_delete_project(
self, req: tsi.PermanentlyDeleteProjectReq
) -> tsi.PermanentlyDeleteProjectRes:
conn, cursor = get_conn_cursor(self.db_path)
with self.lock:
tables_to_purge = [
"call_parts",
"object_versions",
"tables",
"table_rows",
"files",
"feedback",
"cost",
]
conn.execute("BEGIN TRANSACTION")
for table in tables_to_purge:
cursor.execute(
f"DELETE FROM {table} WHERE project_id = ?", (req.project_id,)
)
conn.commit()
return tsi.PermanentlyDeleteProjectRes()

def _table_row_read(self, project_id: str, row_digest: str) -> tsi.TableRowSchema:
conn, cursor = get_conn_cursor(self.db_path)
# Now get the rows
Expand Down
13 changes: 13 additions & 0 deletions weave/trace_server/trace_server_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,14 @@ class ActionsExecuteBatchRes(BaseModel):
pass


class PermanentlyDeleteProjectReq(BaseModel):
project_id: str


class PermanentlyDeleteProjectRes(BaseModel):
pass


class TraceServerInterface(Protocol):
def ensure_project_exists(
self, entity: str, project: str
Expand Down Expand Up @@ -917,3 +925,8 @@ def actions_execute_batch(

# Execute LLM API
def completions_create(self, req: CompletionsCreateReq) -> CompletionsCreateRes: ...

# Delete All Project Data
def permanently_delete_project(
self, req: PermanentlyDeleteProjectReq
) -> PermanentlyDeleteProjectRes: ...
11 changes: 11 additions & 0 deletions weave/trace_server_bindings/remote_http_trace_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,17 @@ def completions_create(
tsi.CompletionsCreateRes,
)

### Delete All Project Data ###
def permanently_delete_project(
self, req: tsi.PermanentlyDeleteProjectReq
) -> tsi.PermanentlyDeleteProjectRes:
return self._generic_request(
"/project/permanently_delete",
req,
tsi.PermanentlyDeleteProjectReq,
tsi.PermanentlyDeleteProjectRes,
)


__docspec__ = [
RemoteHTTPTraceServer,
Expand Down

0 comments on commit db8b2a3

Please sign in to comment.