Streamline run_pipeline_with_config call pattern (#10)
Streamline the invocation pattern required for 'run_pipeline_with_config'
darthtrevino authored Apr 2, 2024
1 parent 1501cc6 commit a658e59
Showing 15 changed files with 17 additions and 18 deletions.
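
Every call site below changes in the same way: `run_pipeline_with_config` is no longer awaited to obtain an async iterator; it is now an async generator and is iterated directly. A minimal sketch of the new call pattern (the import path, config path, and placeholder dataset are assumptions based on the examples, not taken verbatim from any one file):

import asyncio

import pandas as pd

from graphrag.index import run_pipeline_with_config  # import path assumed from the examples


async def run(config_path: str, dataset: pd.DataFrame):
    tables = []
    # New pattern introduced by this commit: no `await` in front of
    # run_pipeline_with_config; `async for` drives the async generator directly.
    async for table in run_pipeline_with_config(config_or_path=config_path, dataset=dataset):
        tables.append(table)
    # The last emitted table corresponds to the final workflow that ran.
    return tables[-1]


if __name__ == "__main__":
    # Hypothetical config path and single-document dataset, for illustration only.
    asyncio.run(run("./pipeline.yml", pd.DataFrame({"text": ["example document"]})))
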
2 changes: 1 addition & 1 deletion python/graphrag/examples/custom_input/run.py
@@ -20,7 +20,7 @@ async def run():
 
     # Grab the last result from the pipeline, should be our entity extraction
     outputs = []
-    async for output in await run_pipeline_with_config(
+    async for output in run_pipeline_with_config(
         config_or_path=config, dataset=dataset
     ):
         outputs.append(output)

@@ -20,7 +20,7 @@ async def run_with_config():
     )
 
     outputs = []
-    async for output in await run_pipeline_with_config(
+    async for output in run_pipeline_with_config(
         config_or_path=config_path, dataset=dataset
     ):
         outputs.append(output)

@@ -27,7 +27,7 @@ async def run_with_config():
 
     # Grab the last result from the pipeline, should be our entity extraction
     tables = []
-    async for table in await run_pipeline_with_config(
+    async for table in run_pipeline_with_config(
         config_or_path=config_path,
         dataset=dataset,
         additional_workflows=custom_workflows,

@@ -37,7 +37,7 @@ async def run_with_config():
 
     # Grab the last result from the pipeline, should be our entity extraction
     tables = []
-    async for table in await run_pipeline_with_config(
+    async for table in run_pipeline_with_config(
         config_or_path=config_path, dataset=dataset
     ):
         tables.append(table)

@@ -36,7 +36,7 @@ async def run_with_config():
 
     # Grab the last result from the pipeline, should be our entity extraction
     tables = []
-    async for table in await run_pipeline_with_config(
+    async for table in run_pipeline_with_config(
         config_or_path=config_path, dataset=dataset
     ):
         tables.append(table)

2 changes: 1 addition & 1 deletion python/graphrag/examples/interdependent_workflows/run.py
@@ -24,7 +24,7 @@ async def run_with_config():
     )
 
     tables = []
-    async for table in await run_pipeline_with_config(
+    async for table in run_pipeline_with_config(
         config_or_path=config_path, dataset=dataset
     ):
         tables.append(table)

2 changes: 1 addition & 1 deletion python/graphrag/examples/multiple_workflows/run.py
@@ -33,7 +33,7 @@ async def run_with_config():
         os.path.dirname(os.path.abspath(__file__)), "./pipeline.yml"
     )
 
-    async for result in await run_pipeline_with_config(pipeline_path, dataset=dataset):
+    async for result in run_pipeline_with_config(pipeline_path, dataset=dataset):
         print(f"Workflow {result.workflow} result\n: ")
         print(result.result)
 
2 changes: 1 addition & 1 deletion python/graphrag/examples/single_verb/run.py
@@ -19,7 +19,7 @@ async def run_with_config():
     )
 
     tables = []
-    async for table in await run_pipeline_with_config(
+    async for table in run_pipeline_with_config(
         config_or_path=config_path, dataset=dataset
     ):
         tables.append(table)

2 changes: 1 addition & 1 deletion python/graphrag/examples/use_built_in_workflows/run.py
@@ -38,7 +38,7 @@ async def run_with_config():
 
     # Grab the last result from the pipeline, should be our entity extraction
     tables = []
-    async for table in await run_pipeline_with_config(
+    async for table in run_pipeline_with_config(
         config_or_path=config_path, dataset=dataset
     ):
         tables.append(table)

@@ -23,7 +23,7 @@ async def main():
     # run the pipeline with the config, and override the dataset with the one we just created
     # and grab the last result from the pipeline, should be the last workflow that was run (our nodes)
     tables = []
-    async for table in await run_pipeline_with_config(pipeline_path):
+    async for table in run_pipeline_with_config(pipeline_path):
         tables.append(table)
     pipeline_result = tables[-1]
 
@@ -37,7 +37,7 @@ async def main():
     # run the pipeline with the config, and override the dataset with the one we just created
     # and grab the last result from the pipeline, should be the last workflow that was run (our nodes)
     pipeline_result = []
-    async for result in await run_pipeline_with_config(
+    async for result in run_pipeline_with_config(
         pipeline_path,
         storage=custom_storage,
         callbacks=custom_reporter,

@@ -40,7 +40,7 @@ async def main():
         os.path.dirname(os.path.abspath(__file__)), "./pipelines/workflows_only.yml"
     )
     tables = []
-    async for table in await run_pipeline_with_config(pipeline_path, dataset=dataset):
+    async for table in run_pipeline_with_config(pipeline_path, dataset=dataset):
         tables.append(table)
     pipeline_result = tables[-1]
 
2 changes: 1 addition & 1 deletion python/graphrag/graphrag/index/cli.py
@@ -83,7 +83,7 @@ def handle_signal(signum, _):
 
     async def execute():
         nonlocal encountered_errors
-        async for output in await run_pipeline_with_config(
+        async for output in run_pipeline_with_config(
             pipeline_config,
             debug=verbose,
             resume=resume,  # type: ignore

5 changes: 3 additions & 2 deletions python/graphrag/graphrag/index/run.py
@@ -146,7 +146,7 @@ def _create_postprocess_steps(
         msg = "No dataset provided!"
         raise ValueError(msg)
 
-    return run_pipeline(
+    async for table in run_pipeline(
         workflows=workflows,
         dataset=dataset,
         storage=storage,
@@ -158,7 +158,8 @@ def _create_postprocess_steps(
         additional_workflows=additional_workflows,
         progress_reporter=progress_reporter,
         emit=emit,
-    )
+    ):
+        yield table
 
 
 async def run_pipeline(

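The graphrag/index/run.py hunk above is the substantive change behind this commit: instead of `return run_pipeline(...)` (a plain coroutine whose awaited result was an async iterator, hence the extra `await` at every call site), the function now re-yields each table, which makes `run_pipeline_with_config` itself an async generator. A generic sketch of that transformation, using hypothetical names (`inner`, `outer_old`, `outer_new`) rather than the real functions:

import asyncio
from typing import AsyncIterator


async def inner() -> AsyncIterator[int]:
    # Stand-in for run_pipeline: an async generator that emits results.
    for i in range(3):
        yield i


async def outer_old() -> AsyncIterator[int]:
    # Old shape: a coroutine that *returns* an async iterator,
    # forcing callers to write `async for x in await outer_old()`.
    return inner()


async def outer_new() -> AsyncIterator[int]:
    # New shape: the `yield` turns this function into an async generator,
    # so callers write `async for x in outer_new()` with no extra await.
    async for item in inner():
        yield item


async def main():
    old_results = [x async for x in await outer_old()]  # old call pattern
    new_results = [x async for x in outer_new()]        # new call pattern
    assert old_results == new_results == [0, 1, 2]


asyncio.run(main())
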
4 changes: 1 addition & 3 deletions python/graphrag/tests/integration/_pipeline/test_run.py
@@ -15,9 +15,7 @@ async def test_megapipeline(self):
             os.path.dirname(os.path.abspath(__file__)),
             "./megapipeline.yml",
         )
-        pipeline_result = [
-            gen async for gen in await run_pipeline_with_config(pipeline_path)
-        ]
+        pipeline_result = [gen async for gen in run_pipeline_with_config(pipeline_path)]
 
         errors = []
         for result in pipeline_result:
