Update index API + a notebook that provides a general API overview #1454

Open · wants to merge 4 commits into base: main
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20241129063524095980.json
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "update API and add a demonstration notebook"
}
209 changes: 209 additions & 0 deletions docs/examples_notebooks/api-overview.ipynb
@@ -0,0 +1,209 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Copyright (c) 2024 Microsoft Corporation.\n",
"# Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## API Overview\n",
"\n",
"This notebook demonstrates how to interact with graphrag as a library through its API rather than the CLI. Note that the CLI itself routes all operations through this same API."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import graphrag.api as api\n",
"from graphrag.index.typing import PipelineRunResult"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisite\n",
"As a prerequisite to all API operations, a `GraphRagConfig` object is required. It is the primary means to control the behavior of graphrag and can be instantiated from a `settings.yaml` configuration file.\n",
"\n",
"Please refer to the [CLI docs](https://microsoft.github.io/graphrag/cli/#init) for more detailed information on how to generate the `settings.yaml` file.\n",
"\n",
"#### Load `settings.yaml` configuration"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import yaml\n",
"\n",
"settings = yaml.safe_load(open(\"<project_directory>/settings.yaml\")) # noqa: PTH123, SIM115"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"At this point, you can modify the imported settings to align with your application's requirements. For example, if building a UI application, the application might need to change the input and/or storage destinations dynamically in order to enable users to build and query different indexes."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate a `GraphRagConfig` object"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from graphrag.config.create_graphrag_config import create_graphrag_config\n",
"\n",
"graphrag_config = create_graphrag_config(\n",
" values=settings, root_dir=\"<project_directory>\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Indexing API\n",
"\n",
"*Indexing* is the process of ingesting raw text data and constructing a knowledge graph. GraphRAG currently supports plaintext (`.txt`) and `.csv` file formats."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Build an index"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"index_result: list[PipelineRunResult] = await api.build_index(config=graphrag_config)\n",
"\n",
"# index_result is a list of results, one per workflow in the indexing pipeline that was run\n",
"for workflow_result in index_result:\n",
" status = f\"error\\n{workflow_result.errors}\" if workflow_result.errors else \"success\"\n",
" print(f\"Workflow Name: {workflow_result.workflow}\\tStatus: {status}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Query an index\n",
"\n",
"To query an index, several index files must first be read into memory and passed to the query API. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"final_nodes = pd.read_parquet(\"<project_directory>/output/create_final_nodes.parquet\")\n",
"final_entities = pd.read_parquet(\n",
" \"<project_directory>/output/create_final_entities.parquet\"\n",
")\n",
"final_communities = pd.read_parquet(\n",
" \"<project_directory>/output/create_final_communities.parquet\"\n",
")\n",
"final_community_reports = pd.read_parquet(\n",
" \"<project_directory>/output/create_final_community_reports.parquet\"\n",
")\n",
"\n",
"response, context = await api.global_search(\n",
" config=graphrag_config,\n",
" nodes=final_nodes,\n",
" entities=final_entities,\n",
" communities=final_communities,\n",
" community_reports=final_community_reports,\n",
" community_level=2,\n",
" dynamic_community_selection=False,\n",
" response_type=\"Multiple Paragraphs\",\n",
" query=\"Who is Scrooge and what are his main relationships?\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The response object is graphrag's final answer to the query, while the context object holds metadata about the query process used to produce it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(response)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Digging into the context object yields granular information, such as which data sources (down to the level of individual text chunks) were retrieved and used in the context sent to the LLM."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pprint import pprint\n",
"\n",
"pprint(context) # noqa: T203"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "graphrag-venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.15"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
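The notebook notes that the loaded settings can be modified before building the `GraphRagConfig`, for example to point different users at different index directories. A minimal sketch of that idea, using an illustrative settings shape (the `input`/`storage`/`base_dir` keys here are assumptions; check your generated `settings.yaml` for the real schema):

```python
# Hypothetical sketch: override loaded settings before create_graphrag_config.
# The section/key names are illustrative assumptions, not graphrag's schema.

def override_settings(settings: dict, overrides: dict) -> dict:
    """Return a copy of the settings dict with per-section overrides applied."""
    merged = {**settings}
    for section, values in overrides.items():
        base = dict(merged.get(section, {}))
        base.update(values)
        merged[section] = base
    return merged

settings = {
    "input": {"base_dir": "input"},
    "storage": {"base_dir": "output"},
}

# Point this run at a per-user index directory, e.g. for a multi-tenant UI.
per_user = override_settings(settings, {"storage": {"base_dir": "output/user-42"}})
print(per_user["storage"]["base_dir"])  # output/user-42
```

The override is applied to a copy, so the original settings dict can be reused to build configs for other users or indexes.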
13 changes: 11 additions & 2 deletions graphrag/api/index.py
@@ -10,7 +10,10 @@

from pathlib import Path

from datashaper import WorkflowCallbacks

from graphrag.cache.noop_pipeline_cache import NoopPipelineCache
from graphrag.callbacks.factory import create_pipeline_reporter
from graphrag.config.enums import CacheType
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.index.create_pipeline_config import create_pipeline_config
@@ -25,6 +28,7 @@ async def build_index(
run_id: str = "",
is_resume_run: bool = False,
memory_profile: bool = False,
callbacks: list[WorkflowCallbacks] | None = None,
progress_reporter: ProgressReporter | None = None,
) -> list[PipelineRunResult]:
"""Run the pipeline with the given configuration.
@@ -37,10 +41,10 @@ async def build_index(
The run id. Creates a output directory with this name.
is_resume_run : bool default=False
Whether to resume a previous index run.
is_update_run : bool default=False
Whether to update a previous index run.
memory_profile : bool
Whether to enable memory profiling.
callbacks : list[WorkflowCallbacks] | None default=None
A list of callbacks to register.
progress_reporter : ProgressReporter | None default=None
The progress reporter.

@@ -61,12 +65,17 @@
pipeline_cache = (
NoopPipelineCache() if config.cache.type == CacheType.none else None
)
# TODO: remove the type ignore once the new config engine has been refactored
callbacks = (
[create_pipeline_reporter(config.reporting, None)] if config.reporting else None # type: ignore
) # type: ignore
outputs: list[PipelineRunResult] = []
async for output in run_pipeline_with_config(
pipeline_config,
run_id=run_id,
memory_profile=memory_profile,
cache=pipeline_cache,
callbacks=callbacks,
progress_reporter=progress_reporter,
is_resume_run=is_resume_run,
is_update_run=is_update_run,
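One detail worth flagging in the cache setup shown in this file: Python chains comparison operators, so an expression like `x == y is None` parses as `(x == y) and (y is None)`, and the right-hand clause is always False when `y` is an enum member. A standalone illustration (the `CacheType` enum below is a stand-in, not graphrag's actual class):

```python
from enum import Enum

class CacheType(Enum):
    none = "none"
    file = "file"

cache_type = CacheType.none

# Chained form: parses as
#   (cache_type == CacheType.none) and (CacheType.none is None)
# and the right-hand clause is always False, since an enum member is never None.
chained = cache_type == CacheType.none is None
print(chained)  # False, even though cache_type *is* CacheType.none

# The direct comparison expresses the intended check.
direct = cache_type == CacheType.none
print(direct)  # True
```

This is why a guard written with an accidental trailing `is None` silently never fires, regardless of the configured cache type.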
22 changes: 8 additions & 14 deletions graphrag/index/run/run.py
@@ -17,7 +17,6 @@
from graphrag.cache.factory import create_cache
from graphrag.cache.pipeline_cache import PipelineCache
from graphrag.callbacks.console_workflow_callbacks import ConsoleWorkflowCallbacks
from graphrag.callbacks.factory import create_pipeline_reporter
from graphrag.index.config.cache import PipelineMemoryCacheConfig
from graphrag.index.config.pipeline import (
PipelineConfig,
@@ -67,7 +66,7 @@ async def run_pipeline_with_config(
storage: PipelineStorage | None = None,
update_index_storage: PipelineStorage | None = None,
cache: PipelineCache | None = None,
callbacks: WorkflowCallbacks | None = None,
callbacks: list[WorkflowCallbacks] | None = None,
progress_reporter: ProgressReporter | None = None,
input_post_process_steps: list[PipelineWorkflowStep] | None = None,
additional_verbs: VerbDefinitions | None = None,
@@ -107,18 +106,14 @@
storage = storage or create_storage(config.storage) # type: ignore

if is_update_run:
# TODO: remove the default choice (PipelineFileStorageConfig) once the new config system enforces a correct update-index-storage config when used.
update_index_storage = update_index_storage or create_storage(
config.update_index_storage
or PipelineFileStorageConfig(base_dir=str(Path(root_dir) / "output"))
)

# TODO: remove the default choice (PipelineMemoryCacheConfig) when the new config system guarantees the existence of a cache config
cache = cache or create_cache(config.cache or PipelineMemoryCacheConfig(), root_dir)
callbacks = (
create_pipeline_reporter(config.reporting, root_dir)
if config.reporting
else None
)
# TODO: remove the type ignore when the new config system guarantees the existence of an input config
dataset = (
dataset
@@ -195,7 +190,7 @@ async def run_pipeline(
dataset: pd.DataFrame,
storage: PipelineStorage | None = None,
cache: PipelineCache | None = None,
callbacks: WorkflowCallbacks | None = None,
callbacks: list[WorkflowCallbacks] | None = None,
progress_reporter: ProgressReporter | None = None,
input_post_process_steps: list[PipelineWorkflowStep] | None = None,
additional_verbs: VerbDefinitions | None = None,
@@ -226,13 +221,12 @@
start_time = time.time()

progress_reporter = progress_reporter or NullProgressReporter()
callbacks = callbacks or ConsoleWorkflowCallbacks()
callbacks = _create_callback_chain(callbacks, progress_reporter)

callbacks = callbacks or [ConsoleWorkflowCallbacks()]
callback_chain = _create_callback_chain(callbacks, progress_reporter)
context = create_run_context(storage=storage, cache=cache, stats=None)
exporter = ParquetExporter(
context.storage,
lambda e, s, d: cast(WorkflowCallbacks, callbacks).on_error(
lambda e, s, d: cast(WorkflowCallbacks, callback_chain).on_error(
"Error exporting table", e, s, d
),
)
@@ -246,7 +240,7 @@
workflows_to_run = loaded_workflows.workflows
workflow_dependencies = loaded_workflows.dependencies
dataset = await _run_post_process_steps(
input_post_process_steps, dataset, context, callbacks
input_post_process_steps, dataset, context, callback_chain
)

# ensure the incoming data is valid
@@ -266,7 +260,7 @@
result = await _process_workflow(
workflow_to_run.workflow,
context,
callbacks,
callback_chain,
exporter,
workflow_dependencies,
dataset,
8 changes: 4 additions & 4 deletions graphrag/index/run/workflow.py
@@ -68,12 +68,12 @@ async def _export_workflow_output(


def _create_callback_chain(
callbacks: WorkflowCallbacks | None, progress: ProgressReporter | None
callbacks: list[WorkflowCallbacks] | None, progress: ProgressReporter | None
) -> WorkflowCallbacks:
"""Create a callbacks manager."""
"""Create a callback manager that encompasses multiple callbacks."""
manager = WorkflowCallbacksManager()
if callbacks is not None:
manager.register(callbacks)
for callback in callbacks or []:
manager.register(callback)
if progress is not None:
manager.register(ProgressWorkflowCallbacks(progress))
return manager
2 changes: 1 addition & 1 deletion graphrag/storage/memory_pipeline_storage.py
@@ -18,7 +18,7 @@ class MemoryPipelineStorage(FilePipelineStorage):

def __init__(self):
"""Init method definition."""
super().__init__(root_dir=".output")
super().__init__()
self._storage = {}

async def get(