Empty workflow returns (#1291)
* Skip emitting empty dataframes

* Semver

* Better empty df check
natoverse authored Oct 17, 2024
1 parent fc502ee commit 1f70d42
Showing 14 changed files with 147 additions and 19 deletions.
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20241017015645367096.json
@@ -0,0 +1,4 @@
{
    "type": "patch",
    "description": "Allow empty workflow returns to avoid disk writing."
}
2 changes: 1 addition & 1 deletion graphrag/index/context.py
@@ -8,7 +8,7 @@
 from dataclasses import field
 
 from .cache import PipelineCache
-from .storage.typing import PipelineStorage
+from .storage.pipeline_storage import PipelineStorage
 
 
 @dc_dataclass
2 changes: 1 addition & 1 deletion graphrag/index/run/profiling.py
@@ -11,7 +11,7 @@
 from datashaper import MemoryProfile, Workflow, WorkflowRunResult
 
 from graphrag.index.context import PipelineRunStats
-from graphrag.index.storage.typing import PipelineStorage
+from graphrag.index.storage.pipeline_storage import PipelineStorage
 
 log = logging.getLogger(__name__)
 
2 changes: 1 addition & 1 deletion graphrag/index/run/utils.py
@@ -33,7 +33,7 @@
 from graphrag.index.context import PipelineRunContext, PipelineRunStats
 from graphrag.index.input import load_input
 from graphrag.index.storage.memory_pipeline_storage import MemoryPipelineStorage
-from graphrag.index.storage.typing import PipelineStorage
+from graphrag.index.storage.pipeline_storage import PipelineStorage
 from graphrag.logging import ProgressReporter
 
 log = logging.getLogger(__name__)
10 changes: 7 additions & 3 deletions graphrag/index/run/workflow.py
@@ -19,7 +19,7 @@
 from graphrag.index.context import PipelineRunContext
 from graphrag.index.emit.table_emitter import TableEmitter
 from graphrag.index.run.profiling import _write_workflow_stats
-from graphrag.index.storage.typing import PipelineStorage
+from graphrag.index.storage.pipeline_storage import PipelineStorage
 from graphrag.index.typing import PipelineRunResult
 from graphrag.logging import ProgressReporter
 from graphrag.utils.storage import _load_table_from_storage
@@ -48,8 +48,12 @@ async def _emit_workflow_output(
 ) -> pd.DataFrame:
     """Emit the workflow output."""
     output = cast(pd.DataFrame, workflow.output())
-    for emitter in emitters:
-        await emitter.emit(workflow.name, output)
+    # only write the final output if it has content
+    # this is expressly designed to allow us to create
+    # workflows with side effects that don't produce a formal output to save
+    if not output.empty:
+        for emitter in emitters:
+            await emitter.emit(workflow.name, output)
     return output
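
Note on the new guard: pandas.DataFrame.empty is True when either axis has
length zero, so a frame that still has columns but zero rows is skipped too,
while a frame containing only NaN values is not considered empty and is still
emitted. A standalone sketch of the behavior the check relies on (illustration
only, not code from this commit):

import pandas as pd

assert pd.DataFrame().empty                    # no rows, no columns
assert pd.DataFrame(columns=["id"]).empty      # columns but zero rows
assert not pd.DataFrame({"id": [None]}).empty  # NaN-only rows still count
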
2 changes: 1 addition & 1 deletion graphrag/index/storage/__init__.py
@@ -7,7 +7,7 @@
 from .file_pipeline_storage import FilePipelineStorage
 from .load_storage import load_storage
 from .memory_pipeline_storage import MemoryPipelineStorage
-from .typing import PipelineStorage
+from .pipeline_storage import PipelineStorage
 
 __all__ = [
     "BlobPipelineStorage",
2 changes: 1 addition & 1 deletion graphrag/index/storage/blob_pipeline_storage.py
@@ -15,7 +15,7 @@
 
 from graphrag.logging import ProgressReporter
 
-from .typing import PipelineStorage
+from .pipeline_storage import PipelineStorage
 
 log = logging.getLogger(__name__)
 
2 changes: 1 addition & 1 deletion graphrag/index/storage/file_pipeline_storage.py
@@ -18,7 +18,7 @@
 
 from graphrag.logging import ProgressReporter
 
-from .typing import PipelineStorage
+from .pipeline_storage import PipelineStorage
 
 log = logging.getLogger(__name__)
 
6 changes: 2 additions & 4 deletions graphrag/index/storage/memory_pipeline_storage.py
@@ -6,7 +6,7 @@
 from typing import Any
 
 from .file_pipeline_storage import FilePipelineStorage
-from .typing import PipelineStorage
+from .pipeline_storage import PipelineStorage
 
 
 class MemoryPipelineStorage(FilePipelineStorage):
@@ -34,9 +34,7 @@ async def get(
         """
         return self._storage.get(key) or await super().get(key, as_bytes, encoding)
 
-    async def set(
-        self, key: str, value: str | bytes | None, encoding: str | None = None
-    ) -> None:
+    async def set(self, key: str, value: Any, encoding: str | None = None) -> None:
         """Set the value for the given key.
 
         Args:
graphrag/index/storage/{typing.py → pipeline_storage.py}
@@ -41,9 +41,7 @@ async def get(
         """
 
     @abstractmethod
-    async def set(
-        self, key: str, value: str | bytes | None, encoding: str | None = None
-    ) -> None:
+    async def set(self, key: str, value: Any, encoding: str | None = None) -> None:
         """Set the value for the given key.
 
         Args:
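
Widening set() from str | bytes | None to Any is what lets a verb hand a
DataFrame directly to storage as a side effect, independent of its return
value. A minimal sketch of the pattern (InMemoryStore is a hypothetical
stand-in for a PipelineStorage implementation, not code from this commit):

from typing import Any


class InMemoryStore:
    """Hypothetical key-value store mirroring the widened set() signature."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    async def set(self, key: str, value: Any, encoding: str | None = None) -> None:
        # values are no longer limited to str | bytes | None,
        # so a pandas DataFrame can be stored as-is
        self._data[key] = value

A verb can then call, e.g., await store.set("mock_write", df), exactly as the
new tests below do.
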
2 changes: 1 addition & 1 deletion graphrag/index/update/dataframes.py
@@ -9,7 +9,7 @@
 import numpy as np
 import pandas as pd
 
-from graphrag.index.storage.typing import PipelineStorage
+from graphrag.index.storage.pipeline_storage import PipelineStorage
 from graphrag.utils.storage import _load_table_from_storage
 
 mergeable_outputs = [
2 changes: 1 addition & 1 deletion graphrag/utils/storage.py
@@ -14,7 +14,7 @@
     PipelineStorageConfigTypes,
 )
 from graphrag.index.storage import load_storage
-from graphrag.index.storage.typing import PipelineStorage
+from graphrag.index.storage.pipeline_storage import PipelineStorage
 
 log = logging.getLogger(__name__)
 
124 changes: 124 additions & 0 deletions tests/unit/indexing/workflows/test_emit.py
@@ -0,0 +1,124 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

from typing import Any, cast

import pandas as pd
from datashaper import (
    Table,
    VerbInput,
    VerbResult,
    create_verb_result,
)

from graphrag.index.config import PipelineWorkflowReference
from graphrag.index.run import run_pipeline
from graphrag.index.storage import MemoryPipelineStorage, PipelineStorage


async def mock_verb(
    input: VerbInput, storage: PipelineStorage, **_kwargs
) -> VerbResult:
    source = cast(pd.DataFrame, input.get_input())

    output = source[["id"]]

    await storage.set("mock_write", source[["id"]])

    return create_verb_result(
        cast(
            Table,
            output,
        )
    )


async def mock_no_return_verb(
    input: VerbInput, storage: PipelineStorage, **_kwargs
) -> VerbResult:
    source = cast(pd.DataFrame, input.get_input())

    # write some outputs to storage independent of the return
    await storage.set("empty_write", source[["name"]])

    return create_verb_result(
        cast(
            Table,
            pd.DataFrame(),
        )
    )


async def test_normal_result_emits_parquet():
    mock_verbs: Any = {"mock_verb": mock_verb}
    mock_workflows: Any = {
        "mock_workflow": lambda _x: [
            {
                "verb": "mock_verb",
                "args": {
                    "column": "test",
                },
            }
        ]
    }
    workflows = [
        PipelineWorkflowReference(
            name="mock_workflow",
            config=None,
        )
    ]
    dataset = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
    storage = MemoryPipelineStorage()
    pipeline_result = [
        gen
        async for gen in run_pipeline(
            workflows,
            dataset,
            storage=storage,
            additional_workflows=mock_workflows,
            additional_verbs=mock_verbs,
        )
    ]

    assert len(pipeline_result) == 1
    assert (
        storage.keys() == ["stats.json", "mock_write", "mock_workflow.parquet"]
    ), "Mock workflow output should be written to storage by the emitter when there is a non-empty data frame"


async def test_empty_result_does_not_emit_parquet():
    mock_verbs: Any = {"mock_no_return_verb": mock_no_return_verb}
    mock_workflows: Any = {
        "mock_workflow": lambda _x: [
            {
                "verb": "mock_no_return_verb",
                "args": {
                    "column": "test",
                },
            }
        ]
    }
    workflows = [
        PipelineWorkflowReference(
            name="mock_workflow",
            config=None,
        )
    ]
    dataset = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
    storage = MemoryPipelineStorage()
    pipeline_result = [
        gen
        async for gen in run_pipeline(
            workflows,
            dataset,
            storage=storage,
            additional_workflows=mock_workflows,
            additional_verbs=mock_verbs,
        )
    ]

    assert len(pipeline_result) == 1
    assert storage.keys() == [
        "stats.json",
        "empty_write",
    ], "Mock workflow output should not be written to storage by the emitter"
2 changes: 1 addition & 1 deletion tests/verbs/util.py
@@ -13,7 +13,7 @@
     create_pipeline_config,
 )
 from graphrag.index.run.utils import _create_run_context
-from graphrag.index.storage.typing import PipelineStorage
+from graphrag.index.storage.pipeline_storage import PipelineStorage
 
 pd.set_option("display.max_columns", None)
 
