Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefer len(os.sched_getaffinity(0)) over os.cpu_count() #1866

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/source/developer_guide/guides/2_real_world_phishing.md
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ To start, we will need to instantiate and set a few attributes of the `Config` c
config = Config()
config.mode = PipelineModes.NLP

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))
config.feature_length = model_fea_length

with open(labels_file, encoding='UTF-8') as fh:
Expand Down Expand Up @@ -563,7 +563,7 @@ def run_pipeline(use_stage_function: bool,
config.mode = PipelineModes.NLP

# Set the thread count to match our cpu count
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))
config.feature_length = model_fea_length

with open(labels_file, encoding='UTF-8') as fh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ from morpheus.cli.utils import load_labels_file
CppConfig.set_should_use_cpp(False)

config = Config()
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))
config.ae = ConfigAutoEncoder()
config.ae.feature_columns = load_labels_file(get_package_relative_file("data/columns_ae_azure.txt"))
```
Expand Down
2 changes: 1 addition & 1 deletion examples/abp_pcap_detection/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@click.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use.",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/developer_guide/2_1_real_world_phishing/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def run_pipeline(use_stage_function: bool,
config.mode = PipelineModes.NLP

# Set the thread count to match our cpu count
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))
config.feature_length = model_fea_length

with open(labels_file, encoding='UTF-8') as fh:
Expand Down
2 changes: 1 addition & 1 deletion examples/developer_guide/2_2_rabbitmq/read_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def run_pipeline(use_source_function: bool):
configure_logging(log_level=logging.DEBUG)

config = Config()
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

# Create a linear pipeline object
pipeline = LinearPipeline(config)
Expand Down
2 changes: 1 addition & 1 deletion examples/developer_guide/2_2_rabbitmq/write_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def run_pipeline():
input_file = os.path.join(root_dir, 'examples/data/email.jsonlines')

config = Config()
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

# Create a linear pipeline object
pipeline = LinearPipeline(config)
Expand Down
2 changes: 1 addition & 1 deletion examples/developer_guide/4_rabbitmq_cpp_stage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ limitations under the License.
# Example RabbitMQ stages
This example builds upon the `examples/developer_guide/2_2_rabbitmq` example adding a C++ implementation for the `RabbitMQSourceStage` along with adding package install scripts.

This example adds two flags to the `read_simple.py` script. A `--use_cpp` flag which defaults to `True` and a `--num_threads` flag which defaults to the number of cores on the system as returned by `os.cpu_count()`.
This example adds two flags to the `read_simple.py` script. A `--use_cpp` flag which defaults to `True` and a `--num_threads` flag which defaults to the number of CPU cores available to the current process, as returned by `len(os.sched_getaffinity(0))`.

## Supported Environments
| Environment | Supported | Notes |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@click.option('--use_cpp', default=True)
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def run_pipeline():
input_file = os.path.join(root_dir, 'examples/data/email.jsonlines')

config = Config()
config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

# Create a linear pipeline object
pipeline = LinearPipeline(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def run_pipeline(train_users,

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

config.ae = ConfigAutoEncoder()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def generate_ae_config(source: str,
pipeline_batch_size: int = 0,
edge_buffer_size: int = 0,
use_cpp: bool = False,
num_threads: int = os.cpu_count()):
num_threads: int = len(os.sched_getaffinity(0))):
config = Config()

CppConfig.set_should_use_cpp(use_cpp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def run_pipeline(train_users,

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

config.ae = ConfigAutoEncoder()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def run_pipeline(train_users,

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

config.ae = ConfigAutoEncoder()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@
"\n",
"CppConfig.set_should_use_cpp(False)\n",
"\n",
"config.num_threads = os.cpu_count()\n",
"config.num_threads = len(os.sched_getaffinity(0))\n",
"\n",
"config.ae = ConfigAutoEncoder()\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@
"\n",
"CppConfig.set_should_use_cpp(False)\n",
"\n",
"config.num_threads = os.cpu_count()\n",
"config.num_threads = len(os.sched_getaffinity(0))\n",
"\n",
"config.ae = ConfigAutoEncoder()\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@
"\n",
"CppConfig.set_should_use_cpp(False)\n",
"\n",
"config.num_threads = os.cpu_count()\n",
"config.num_threads = len(os.sched_getaffinity(0))\n",
"\n",
"config.ae = ConfigAutoEncoder()\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@
"\n",
"CppConfig.set_should_use_cpp(False)\n",
"\n",
"config.num_threads = os.cpu_count()\n",
"config.num_threads = len(os.sched_getaffinity(0))\n",
"\n",
"config.ae = ConfigAutoEncoder()\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@click.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def run_pipeline(train_users,

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

config.ae = ConfigAutoEncoder()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def run_pipeline(train_users,

CppConfig.set_should_use_cpp(False)

config.num_threads = os.cpu_count()
config.num_threads = len(os.sched_getaffinity(0))

config.ae = ConfigAutoEncoder()

Expand Down
2 changes: 1 addition & 1 deletion examples/doca/run_udp_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
)
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
show_default=True,
help="Number of internal pipeline threads to use.",
Expand Down
2 changes: 1 addition & 1 deletion examples/gnn_fraud_detection_pipeline/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@click.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use.",
)
Expand Down
4 changes: 2 additions & 2 deletions examples/llm/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def run():
@run.command(help="Runs a simple finite pipeline with a single execution of a LangChain agent from a fixed input")
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down Expand Up @@ -67,7 +67,7 @@ def simple(**kwargs):
@run.command(help="Runs a pipeline LangChain agents which pulls inputs from a Kafka message bus")
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/completion/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def run():
@run.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/rag/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def run():
@run.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/vdb_upload/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def run():
)
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/log_parsing/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
@click.command()
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use.",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/ransomware_detection/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
@click.option('--use_cpp', default=False, help="Enable C++ execution for this pipeline, currently this is unsupported.")
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use.",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/sid_visualization/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def _generate_frames(self):
@click.option('--use_cpp', default=True)
@click.option(
"--num_threads",
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use.",
)
Expand Down
2 changes: 1 addition & 1 deletion python/morpheus/morpheus/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def install(**kwargs):

@cli.group(short_help="Run one of the available pipelines", no_args_is_help=True, cls=AliasedGroup)
@click.option('--num_threads',
default=os.cpu_count(),
default=len(os.sched_getaffinity(0)),
type=click.IntRange(min=1),
help="Number of internal pipeline threads to use")
@click.option('--pipeline_batch_size',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource):
Maximum number of requests to queue before rejecting requests. If `None` then `config.edge_buffer_size` will be
used.
num_server_threads : int, default None
Number of threads to use for the HTTP server. If `None` then `os.cpu_count()` will be used.
Number of threads to use for the HTTP server. If `None` then `len(os.sched_getaffinity(0))` will be used.
max_payload_size : int, default 10
The maximum size in megabytes of the payload that the server will accept in a single request.
request_timeout_secs : int, default 30
Expand Down Expand Up @@ -117,7 +117,7 @@ def __init__(self,
self._sleep_time = sleep_time
self._queue_timeout = queue_timeout
self._max_queue_size = max_queue_size or config.edge_buffer_size
self._num_server_threads = num_server_threads or os.cpu_count()
self._num_server_threads = num_server_threads or len(os.sched_getaffinity(0))
self._max_payload_size_bytes = max_payload_size * 1024 * 1024
self._request_timeout_secs = request_timeout_secs
self._lines = lines
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class HttpServerSinkStage(PassThruTypeMixin, SinglePortStage):
Maximum number of requests to queue before rejecting requests. If `None` then `config.edge_buffer_size` will be
used. Once the queue is full, the incoming edge buffer will begin to fill up.
num_server_threads : int, default None
Number of threads to use for the HTTP server. If `None` then `os.cpu_count()` will be used.
Number of threads to use for the HTTP server. If `None` then `len(os.sched_getaffinity(0))` will be used.
max_rows_per_response : int, optional
Maximum number of rows to include in a single response, by default 10000.
overflow_pct: float, optional
Expand Down Expand Up @@ -103,7 +103,7 @@ def __init__(self,
self._port = port
self._endpoint = endpoint
self._method = method
self._num_server_threads = num_server_threads or os.cpu_count()
self._num_server_threads = num_server_threads or len(os.sched_getaffinity(0))
self._max_rows_per_response = max_rows_per_response
self._overflow_pct = overflow_pct
self._request_timeout_secs = request_timeout_secs
Expand Down
2 changes: 1 addition & 1 deletion tests/common/test_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def make_parse_fn(status: HTTPStatus = HTTPStatus.OK,
@pytest.mark.parametrize("method", ["GET", "POST", "PUT"])
@pytest.mark.parametrize("use_callback", [True, False])
@pytest.mark.parametrize("use_context_mgr", [True, False])
@pytest.mark.parametrize("num_threads", [1, 2, min(8, os.cpu_count())])
@pytest.mark.parametrize("num_threads", [1, 2, min(8, len(os.sched_getaffinity(0)))])
@pytest.mark.parametrize("status,content_type,content",
[(HTTPStatus.OK, MimeTypes.TEXT.value, "OK"),
(HTTPStatus.OK, MimeTypes.JSON.value, '{"test": "OK"}'),
Expand Down
Loading