docs: Add documentation about parallel execution and some performance (#521)

* docs: Add documentation about parallel execution and some performance
* refactor: renaming and cleanup
Commit 9341d93 (parent a15cc8d)
Showing 2 changed files with 221 additions and 0 deletions.
@@ -0,0 +1,219 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "markdown", | ||
"id": "d8767b2a", | ||
"metadata": {}, | ||
"source": [ | ||
"# How to get more done in less time\n", | ||
"The following notebook contains tips for the following problems:\n", | ||
" - A single task that takes very long to complete\n", | ||
" - Running one task multiple times\n", | ||
" - Running several different tasks at the same time" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"id": "e04cb25b", | ||
"metadata": {}, | ||
"source": [ | ||
"## A single long running task\n", | ||
"With a single long running task, consider the following:\n", | ||
" - If there are other calculations to do, consider using `ThreadPool.submit`, together with `result`\n", | ||
" - See [here](#submit_example) for an example\n", | ||
" - If this is not the case consider to:\n", | ||
" - Choose a faster model. The `base` model is faster than `extended`, `extended` is faster than `supreme`\n", | ||
" - Choose tasks that perform fewer LLM operations. E.g.: `MultiChunkQa` usually takes longer than `SingleChunkQa`" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"id": "e7fbae35", | ||
"metadata": {}, | ||
"source": [ | ||
"## Running one task multiple times\n", | ||
"When a single task should process multiple inputs, one can use `task.run_concurrently` to easily process the inputs at the same time \n", | ||
"\n", | ||
"**Example:**" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "04dac517", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from intelligence_layer.core.task import Task\n", | ||
"from intelligence_layer.core.tracer import TaskSpan, NoOpTracer\n", | ||
"import time\n", | ||
"from typing import Any\n", | ||
"\n", | ||
"class DummyTask(Task):\n", | ||
" def do_run(self, input: Any, task_span: TaskSpan) -> Any:\n", | ||
" time.sleep(2)\n", | ||
" print(\"Task1 complete\")\n", | ||
" return input\n", | ||
"\n", | ||
"tracer = NoOpTracer()\n", | ||
"\n", | ||
"task_input = [\"A\", \"B\", \"C\", \"D\"]\n", | ||
"task = DummyTask()\n", | ||
"\n", | ||
"\n", | ||
"result = task.run_concurrently(\n", | ||
" task_input, tracer\n", | ||
") # this finishes in 2 seconds instead of 8\n", | ||
"result" | ||
] | ||
}, | ||
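{ | ||
"cell_type": "markdown", | ||
"id": "b2f1a9c4", | ||
"metadata": {}, | ||
"source": [ | ||
"To see the speed-up, one can time a plain sequential loop against `run_concurrently`. The following is a minimal sketch that reuses the `task`, `task_input` and `tracer` defined above; the exact timings will of course vary." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "c3d4e5f6", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# run the same inputs sequentially and concurrently and compare wall-clock time\n", | ||
"start = time.perf_counter()\n", | ||
"sequential_result = [task.run(item, tracer) for item in task_input]\n", | ||
"print(f\"sequential: {time.perf_counter() - start:.1f}s\")\n", | ||
"\n", | ||
"start = time.perf_counter()\n", | ||
"concurrent_result = task.run_concurrently(task_input, tracer)\n", | ||
"print(f\"concurrent: {time.perf_counter() - start:.1f}s\")" | ||
] | ||
}, | ||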
{ | ||
"cell_type": "markdown", | ||
"id": "f58f359a", | ||
"metadata": {}, | ||
"source": [ | ||
"## Running several tasks at the same time\n", | ||
"When having to run multiple distinct tasks at the same time, one can leverage the existing `concurrent.futures` python library.\n", | ||
"The following shows some examples on how this can be done" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "8959fcec-dc54-4137-9cb8-3a9c70d6a3d0", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# Second long running task\n", | ||
"class DummyTask2(Task):\n", | ||
" def do_run(self, input: Any, task_span: TaskSpan) -> Any:\n", | ||
" time.sleep(2)\n", | ||
" print(\"Task2 complete\")\n", | ||
" return input\n", | ||
"\n", | ||
"\n", | ||
"# initialize all tasks and inputs\n", | ||
"task_1 = DummyTask()\n", | ||
"task_2 = DummyTask2()\n", | ||
"\n", | ||
"task_input_1 = list(range(10))\n", | ||
"task_input_2 = list(range(10, 20))" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"id": "4e846c9c", | ||
"metadata": {}, | ||
"source": [ | ||
"<a id='submit_example'></a>\n", | ||
"The following shows how single tasks can be submitted to a ThreadPool. \n", | ||
"This is especially useful when there are other things to do while running tasks." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "6c88c3a2", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from concurrent.futures import ThreadPoolExecutor\n", | ||
"\n", | ||
"with ThreadPoolExecutor(max_workers=2) as executor:\n", | ||
" task_1_result = executor.submit(task_1.run_concurrently, task_input_1, tracer)\n", | ||
" task_2_result = executor.submit(task_2.run_concurrently, task_input_2, tracer)\n", | ||
" # ...other important code here\n", | ||
" print(\"Task 1 result:\", task_1_result.result())\n", | ||
" print(\"Task 2 result:\", task_2_result.result())" | ||
] | ||
}, | ||
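{ | ||
"cell_type": "markdown", | ||
"id": "d5e6f7a8", | ||
"metadata": {}, | ||
"source": [ | ||
"When several futures are in flight, the standard-library helper `concurrent.futures.as_completed` yields each future as soon as it finishes, so results can be handled in completion order rather than submission order. The following is a minimal sketch that reuses the tasks, inputs and tracer defined above." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "e6f7a8b9", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from concurrent.futures import as_completed\n", | ||
"\n", | ||
"with ThreadPoolExecutor(max_workers=2) as executor:\n", | ||
"    # map each future back to a label so its output can be attributed\n", | ||
"    futures = {\n", | ||
"        executor.submit(task_1.run_concurrently, task_input_1, tracer): \"Task 1\",\n", | ||
"        executor.submit(task_2.run_concurrently, task_input_2, tracer): \"Task 2\",\n", | ||
"    }\n", | ||
"    for future in as_completed(futures):\n", | ||
"        # handle whichever task finishes first\n", | ||
"        print(futures[future], \"result:\", future.result())" | ||
] | ||
}, | ||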
{ | ||
"cell_type": "markdown", | ||
"id": "345244a1", | ||
"metadata": {}, | ||
"source": [ | ||
"`ThreadPool` can easily be used via the function `.map`. This processes a list of jobs in order and outputs the results once all jobs are done. \n", | ||
"This is especially useful if there are many diverse jobs that take a varying amount of time. \n", | ||
"However, since `map` only takes a single parameter, the input has to be bundled into a list of tuples beforehand." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "6b71469e", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from itertools import repeat\n", | ||
"\n", | ||
"jobs = list(zip(repeat(task_1), task_input_1)) + list(zip(repeat(task_2), task_input_2))\n", | ||
"\n", | ||
"with ThreadPoolExecutor(max_workers=20) as executor:\n", | ||
" result = list(\n", | ||
" executor.map(lambda job: job[0].run(job[1], tracer), jobs)\n", | ||
" )\n", | ||
" print(\"Task 1 result:\", result[: len(task_input_1)])\n", | ||
" print(\"Task 2 result:\", result[len(task_input_1) :])" | ||
] | ||
}, | ||
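{ | ||
"cell_type": "markdown", | ||
"id": "f8a9b0c1", | ||
"metadata": {}, | ||
"source": [ | ||
"As an alternative to bundling tuples, `Executor.map` also accepts several iterables and passes one element of each to the mapped function. The following is a minimal sketch of the same workload using two parallel lists, one holding the task to call and one holding its input." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "a9b0c1d2", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# two parallel lists: the task to call and the input to pass to it\n", | ||
"tasks = [task_1] * len(task_input_1) + [task_2] * len(task_input_2)\n", | ||
"inputs = task_input_1 + task_input_2\n", | ||
"\n", | ||
"with ThreadPoolExecutor(max_workers=20) as executor:\n", | ||
"    result = list(\n", | ||
"        executor.map(lambda task, input: task.run(input, tracer), tasks, inputs)\n", | ||
"    )\n", | ||
"    print(\"Task 1 result:\", result[: len(task_input_1)])\n", | ||
"    print(\"Task 2 result:\", result[len(task_input_1) :])" | ||
] | ||
}, | ||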
{ | ||
"cell_type": "markdown", | ||
"id": "a786e543", | ||
"metadata": {}, | ||
"source": [ | ||
"`ThreadPool.map` can also be used with `Task.run_concurrently()` in which case the creation of the jobs becomes slightly easier." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "de3fe114", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"with ThreadPoolExecutor(max_workers=2) as executor:\n", | ||
" results = list(\n", | ||
" executor.map(\n", | ||
" lambda job: job[0].run_concurrently(job[1], tracer),\n", | ||
" [(task_1, task_input_1), (task_2, task_input_2)],\n", | ||
" )\n", | ||
" )\n", | ||
" print(\"Task 1 result:\", result[: len(task_input_1)])\n", | ||
" print(\"Task 2 result:\", result[len(task_input_1) :])" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"id": "4e775da7", | ||
"metadata": {}, | ||
"source": [ | ||
"<div class=\"alert alert-warning\">\n", | ||
"Note\n", | ||
"</div>\n", | ||
"\n", | ||
"If tasks are CPU bound, the abovementioned code will not help. In that case, replace the `ThreadPoolExecutor` with a `ProcessPoolExecutor`." | ||
] | ||
} | ||
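{ | ||
"cell_type": "markdown", | ||
"id": "b0c1d2e3", | ||
"metadata": {}, | ||
"source": [ | ||
"The following is a minimal sketch of that swap using a plain CPU-bound function. The `cpu_heavy` helper is purely illustrative and not part of the Intelligence Layer. Note that the function and its arguments must be picklable, and that, depending on the operating system and process start method, functions defined directly in a notebook may need to live in an importable module instead." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"id": "c1d2e3f4", | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from concurrent.futures import ProcessPoolExecutor\n", | ||
"\n", | ||
"\n", | ||
"def cpu_heavy(n: int) -> int:\n", | ||
"    # purely illustrative CPU-bound work: a sum of squares\n", | ||
"    return sum(i * i for i in range(n))\n", | ||
"\n", | ||
"\n", | ||
"# Same interface as ThreadPoolExecutor, but the work runs in separate\n", | ||
"# processes, so CPU-bound jobs can use multiple cores.\n", | ||
"with ProcessPoolExecutor(max_workers=2) as executor:\n", | ||
"    print(list(executor.map(cpu_heavy, [1_000_000, 2_000_000])))" | ||
] | ||
} | ||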
], | ||
"metadata": { | ||
"kernelspec": { | ||
"display_name": "Python 3 (ipykernel)", | ||
"language": "python", | ||
"name": "python3" | ||
}, | ||
"language_info": { | ||
"codemirror_mode": { | ||
"name": "ipython", | ||
"version": 3 | ||
}, | ||
"file_extension": ".py", | ||
"mimetype": "text/x-python", | ||
"name": "python", | ||
"nbconvert_exporter": "python", | ||
"pygments_lexer": "ipython3", | ||
"version": "3.10.12" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 5 | ||
} |