Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#285: Implement shared object storage for MPI backend #286

Merged
merged 46 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b398b0c
FEAT-#308: Add monitor to each host in the cluster
Retribution98 Jun 16, 2023
baa8830
Fix for MPI before than 3.0
Jun 20, 2023
251e6c8
Add docs strings
Jun 21, 2023
603f75a
Fix spawn processes
Jun 21, 2023
312bb9b
Apply suggestions from code review
Retribution98 Jul 4, 2023
d1c0a91
FEAT-#285: Implement shared storage
Retribution98 Apr 25, 2023
9ae5cd3
Add shared store
Jun 19, 2023
6c5b934
Fix review
Retribution98 Jun 26, 2023
0c4d649
Fix rebase
Retribution98 Jul 5, 2023
18ad2c6
Add cleanup shared memory
Retribution98 Jul 6, 2023
1029400
Fix cluster problems
Jul 14, 2023
7966a14
Fix HM benchmark
Jul 18, 2023
06510c2
Fix python 3.8
Jul 18, 2023
327cef0
Fix rebase
Retribution98 Jul 20, 2023
5aa9743
Refactoring
Retribution98 Jul 21, 2023
999db6b
Apply suggestions from code review
YarShev Aug 22, 2023
6f19e48
Update unidist/core/backends/mpi/core/communication.py
YarShev Aug 22, 2023
c456533
Fix imports
YarShev Aug 22, 2023
96f9a2d
Correct names
YarShev Aug 22, 2023
c30998a
Code review fixes
Retribution98 Aug 25, 2023
eb4f120
Apply suggestions from code review
YarShev Aug 25, 2023
663b480
Apply suggestions from code review
YarShev Aug 25, 2023
c929d45
Changes after code review
Retribution98 Aug 30, 2023
ab603d5
Apply suggestions from code review
YarShev Sep 1, 2023
8c9425c
Apply suggestions from code review
YarShev Sep 4, 2023
f2d55e0
Several fixes
Retribution98 Sep 4, 2023
fbc9620
Update the configs
YarShev Sep 5, 2023
4892ee1
Add docs for shared object store
YarShev Sep 5, 2023
155f942
Update docs/flow/unidist/core/backends/mpi/core/shared_object_store.rst
YarShev Sep 5, 2023
5dac7d6
Add parallel_memcopy
Retribution98 Sep 6, 2023
c31d4f0
Introduce some optimizations
YarShev Sep 14, 2023
7f9bee5
One more changes
Retribution98 Sep 20, 2023
4d76657
Fix UNIDIST_MPI_SHARED_OBJECT_STORE=False
Retribution98 Sep 21, 2023
4cd63fe
Apply suggestions from code review
Retribution98 Sep 22, 2023
bab0935
some fixes
Retribution98 Sep 22, 2023
a0f6020
some fixed
Retribution98 Sep 22, 2023
0885c21
Fix tests
YarShev Sep 22, 2023
899d072
fix tests
Retribution98 Sep 22, 2023
1f6f567
Fix some tests, run both modes
YarShev Sep 23, 2023
201d77e
Run CI on openmpi
YarShev Sep 23, 2023
f6ff6b0
Revert a change
YarShev Sep 23, 2023
6735f8a
Update unidist/test/utils.py
YarShev Sep 23, 2023
cdfb8a0
Fix tests
YarShev Sep 23, 2023
60bf78f
Fix for small number of CPUs
YarShev Sep 24, 2023
66edabd
Separate run
YarShev Sep 24, 2023
77f40f1
Small refactor
YarShev Sep 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/developer/architecture.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ details just pick module you are interested in.
| │ └─── :doc:`remote_function </flow/unidist/core/backends/dask/remote_function>`
| ├───mpi
| | ├───core
| │ │ ├─── :doc:`async_operations </flow/unidist/core/backends/mpi/core/async_operations>`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs for new entities (shared store, shared manager, etc.) should be added too.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in a separate pr

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

| │ │ ├─── :doc:`common </flow/unidist/core/backends/mpi/core/common>`
| │ │ ├─── :doc:`communication </flow/unidist/core/backends/mpi/core/communication>`
| │ │ ├─── :doc:`controller </flow/unidist/core/backends/mpi/core/controller>`
| │ │ ├─── :doc:`monitor </flow/unidist/core/backends/mpi/core/monitor>`
| │ │ ├─── :doc:`local_object_store </flow/unidist/core/backends/mpi/core/local_object_store>`
| │ │ ├─── :doc:`shared_object_store </flow/unidist/core/backends/mpi/core/shared_object_store>`
| │ │ ├─── :doc:`serialization </flow/unidist/core/backends/mpi/core/serialization>`
| │ │ └─── :doc:`worker </flow/unidist/core/backends/mpi/core/worker>`
| │ ├─── :doc:`actor </flow/unidist/core/backends/mpi/actor>`
Expand Down
72 changes: 39 additions & 33 deletions docs/flow/unidist/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,39 +25,45 @@ the config API implementation.
Unidist Configuration Settings List
'''''''''''''''''''''''''''''''''''

+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| **Config Name** | **Env. Variable Name** | **Description** |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| Backend | UNIDIST_BACKEND | Distribution backend to run queries by |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| CpuCount | UNIDIST_CPUS | How many CPU cores to use during initialization of the unidist backend |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| RayGpuCount | UNIDIST_RAY_GPUS | How many GPU devices to use during initialization of the Ray backend |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| IsRayCluster | UNIDIST_RAY_CLUSTER | Whether Ray is running on pre-initialized Ray cluster |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| RayRedisAddress | UNIDIST_RAY_REDIS_ADDRESS | Redis address to connect to when running in Ray cluster |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| RayRedisPassword | UNIDIST_RAY_REDIS_PASSWORD | What password to use for connecting to Redis |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| RayObjectStoreMemory | UNIDIST_RAY_OBJECT_STORE_MEMORY | How many bytes of memory to start the Ray object store with |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| DaskMemoryLimit | UNIDIST_DASK_MEMORY_LIMIT | How many bytes of memory that Dask worker should use |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| IsDaskCluster | UNIDIST_DASK_CLUSTER | Whether Dask is running on pre-initialized Dask cluster |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| DaskSchedulerAddress | UNIDIST_DASK_SCHEDULER_ADDRESS | Dask Scheduler address to connect to when running in Dask cluster |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| IsMpiSpawnWorkers | UNIDIST_IS_MPI_SPAWN_WORKERS | Whether to enable MPI spawn or not |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| MpiHosts | UNIDIST_MPI_HOSTS | MPI hosts to run unidist on |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| MpiPickleThreshold | UNIDIST_MPI_PICKLE_THRESHOLD | Minimum buffer size for serialization with pickle 5 protocol |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| MpiBackoff | UNIDIST_MPI_BACKOFF | Backoff time for preventing the "busy wait" in loops exchanging messages |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
| MpiLog | UNIDIST_MPI_LOG | Whether to enable logging for MPI backend or not |
+-----------------------+-----------------------------------+--------------------------------------------------------------------------+
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| **Config Name** | **Env. Variable Name** | **Description** |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| Backend | UNIDIST_BACKEND | Distribution backend to run queries by |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| CpuCount | UNIDIST_CPUS | How many CPU cores to use during initialization of the unidist backend |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| RayGpuCount | UNIDIST_RAY_GPUS | How many GPU devices to use during initialization of the Ray backend |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| IsRayCluster | UNIDIST_RAY_CLUSTER | Whether Ray is running on pre-initialized Ray cluster |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| RayRedisAddress | UNIDIST_RAY_REDIS_ADDRESS | Redis address to connect to when running in Ray cluster |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| RayRedisPassword | UNIDIST_RAY_REDIS_PASSWORD | What password to use for connecting to Redis |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| RayObjectStoreMemory | UNIDIST_RAY_OBJECT_STORE_MEMORY | How many bytes of memory to start the Ray object store with |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| DaskMemoryLimit | UNIDIST_DASK_MEMORY_LIMIT | How many bytes of memory that Dask worker should use |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| IsDaskCluster | UNIDIST_DASK_CLUSTER | Whether Dask is running on pre-initialized Dask cluster |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| DaskSchedulerAddress | UNIDIST_DASK_SCHEDULER_ADDRESS | Dask Scheduler address to connect to when running in Dask cluster |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| IsMpiSpawnWorkers | UNIDIST_IS_MPI_SPAWN_WORKERS | Whether to enable MPI spawn or not |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| MpiHosts | UNIDIST_MPI_HOSTS | MPI hosts to run unidist on |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| MpiPickleThreshold | UNIDIST_MPI_PICKLE_THRESHOLD | Minimum buffer size for serialization with pickle 5 protocol |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| MpiBackoff | UNIDIST_MPI_BACKOFF | Backoff time for preventing the "busy wait" in loops exchanging messages |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| MpiLog | UNIDIST_MPI_LOG | Whether to enable logging for MPI backend or not |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| MpiSharedObjectStore | UNIDIST_MPI_SHARED_OBJECT_STORE | Whether to enable shared object store or not |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| MpiSharedObjectStoreMemory | UNIDIST_MPI_SHARED_OBJECT_STORE_MEMORY | How many bytes of memory to start the shared object store with |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+
| MpiSharedObjectStoreThreshold | UNIDIST_MPI_SHARED_OBJECT_STORE_THRESHOLD | Minimum size of data to put into the shared object store |
+-------------------------------+-------------------------------------------+--------------------------------------------------------------------------+

Usage Guide
'''''''''''
Expand Down
15 changes: 15 additions & 0 deletions docs/flow/unidist/core/backends/mpi/core/async_operations.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
..
Copyright (C) 2021-2023 Modin authors

SPDX-License-Identifier: Apache-2.0

:orphan:

Async Operations
""""""""""""""""

API
===

.. autoclass:: unidist.core.backends.mpi.core.async_operations.AsyncOperations
:members:
9 changes: 3 additions & 6 deletions docs/flow/unidist/core/backends/mpi/core/controller.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,11 @@ just returns the next rank number in a loop.

.. autofunction:: unidist.core.backends.mpi.core.controller.common.RoundRobin.schedule_rank

Local Object Storage
====================
GarbageCollector
================

MPI :py:class:`~unidist.core.backends.mpi.core.controller.object_store.ObjectStore` stores the data for master process in a local dict.
:py:class:`~unidist.core.backends.mpi.core.controller.garbage_collector.GarbageCollector` controls memory footprint and sends cleanup requests for all workers,
if certain amount of data IDs is out-of-scope.

.. autoclass:: unidist.core.backends.mpi.core.controller.object_store.ObjectStore
:members:
.. autoclass:: unidist.core.backends.mpi.core.controller.garbage_collector.GarbageCollector
:members:
:members:
19 changes: 19 additions & 0 deletions docs/flow/unidist/core/backends/mpi/core/local_object_store.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
..
Copyright (C) 2021-2023 Modin authors

SPDX-License-Identifier: Apache-2.0

:orphan:

Local Object Store
==================

MPI :py:class:`~unidist.core.backends.mpi.core.local_object_store.LocalObjectStore` stores data in-process memory in a local dict.
In depend on :class:`~unidist.config.backends.mpi.envvars.MpiSharedObjectStoreThreshold``,
data can be stored in :py:class:`~unidist.core.backends.mpi.core.shared_object_store.SharedObjectStore`.

API
===

.. autoclass:: unidist.core.backends.mpi.core.local_object_store.LocalObjectStore
:members:
19 changes: 19 additions & 0 deletions docs/flow/unidist/core/backends/mpi/core/shared_object_store.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
..
Copyright (C) 2021-2023 Modin authors

SPDX-License-Identifier: Apache-2.0

:orphan:

Shared Object Store
===================

MPI :py:class:`~unidist.core.backends.mpi.core.shared_object_store.SharedObjectStore` stores data in the shared object store.
In depend on :class:`~unidist.config.backends.mpi.envvars.MpiSharedObjectStoreThreshold``,
data can be stored in :py:class:`~unidist.core.backends.mpi.core.local_object_store.LocalObjectStore`.

API
===

.. autoclass:: unidist.core.backends.mpi.core.shared_object_store.SharedObjectStore
:members:
15 changes: 0 additions & 15 deletions docs/flow/unidist/core/backends/mpi/core/worker.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,6 @@ and leaves all internal storages in their current state.
.. autofunction:: unidist.core.backends.mpi.core.worker.task_store.TaskStore.request_worker_data
:noindex:

Local Object Storage
====================

MPI :py:class:`~unidist.core.backends.mpi.core.worker.object_store.ObjectStore` stores the data for each process in a local dict.
:py:class:`~unidist.core.backends.mpi.core.async_operations.AsyncOperations` stores ``MPI_Isend`` asynchronous handlers and holds
a reference to the sending data to prolong lifetime until the operation completed.

API
===

.. autoclass:: unidist.core.backends.mpi.core.worker.object_store.ObjectStore
:members:
.. autoclass:: unidist.core.backends.mpi.core.async_operations.AsyncOperations
:members:

Request Storage
===============

Expand Down
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pathlib
from setuptools import setup, find_packages
from setuptools import setup, find_packages, Extension
YarShev marked this conversation as resolved.
Show resolved Hide resolved
from Cython.Build import cythonize
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
import sys
import versioneer

Expand Down Expand Up @@ -35,4 +36,5 @@
"all": all_deps,
},
python_requires=">=3.7.1",
ext_modules = cythonize('unidist/ext_modules/memory/cmemory.pyx')
YarShev marked this conversation as resolved.
Show resolved Hide resolved
)
6 changes: 6 additions & 0 deletions unidist/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
MpiPickleThreshold,
MpiBackoff,
MpiLog,
MpiSharedObjectStore,
MpiSharedObjectStoreMemory,
MpiSharedObjectStoreThreshold,
)
from .parameter import ValueSource

Expand All @@ -39,4 +42,7 @@
"MpiPickleThreshold",
"MpiBackoff",
"MpiLog",
"MpiSharedObjectStore",
"MpiSharedObjectStoreMemory",
"MpiSharedObjectStoreThreshold",
]
14 changes: 13 additions & 1 deletion unidist/config/backends/mpi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,24 @@

"""Config entities specific for MPI backend which can be used for unidist behavior tuning."""

from .envvars import IsMpiSpawnWorkers, MpiHosts, MpiPickleThreshold, MpiBackoff, MpiLog
from .envvars import (
IsMpiSpawnWorkers,
MpiHosts,
MpiPickleThreshold,
MpiBackoff,
MpiLog,
MpiSharedObjectStore,
MpiSharedObjectStoreMemory,
MpiSharedObjectStoreThreshold,
)

__all__ = [
"IsMpiSpawnWorkers",
"MpiHosts",
"MpiPickleThreshold",
"MpiBackoff",
"MpiLog",
"MpiSharedObjectStore",
"MpiSharedObjectStoreMemory",
"MpiSharedObjectStoreThreshold",
]
20 changes: 20 additions & 0 deletions unidist/config/backends/mpi/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,23 @@ class MpiLog(EnvironmentVariable, type=bool):

default = False
varname = "UNIDIST_MPI_LOG"


class MpiSharedObjectStore(EnvironmentVariable, type=bool):
"""Whether to enable shared object store or not."""

default = True
YarShev marked this conversation as resolved.
Show resolved Hide resolved
Retribution98 marked this conversation as resolved.
Show resolved Hide resolved
varname = "UNIDIST_MPI_SHARED_OBJECT_STORE"


class MpiSharedObjectStoreMemory(EnvironmentVariable, type=int):
"""How many bytes of memory to start the shared object store with."""

varname = "UNIDIST_MPI_SHARED_OBJECT_STORE_MEMORY"


class MpiSharedObjectStoreThreshold(EnvironmentVariable, type=int):
"""Minimum size of data to put into the shared object store."""

default = 1024**2 # 1 MiB
varname = "UNIDIST_MPI_SHARED_OBJECT_STORE_THRESHOLD"
Loading