From 2d857cd3c895395d69cfd773e24b5585ec2906fd Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 15:41:08 +0000 Subject: [PATCH 01/42] remove shutdown from LLMEngine --- vllm/v1/engine/async_llm.py | 7 ++++--- vllm/v1/engine/core_client.py | 4 +--- vllm/v1/engine/llm_engine.py | 10 ---------- vllm/v1/utils.py | 3 --- 4 files changed, 5 insertions(+), 19 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 3f097ca7f439c..40f2a12839f2d 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -1,6 +1,7 @@ import asyncio import os import signal +import weakref from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union from vllm.config import ModelConfig, VllmConfig @@ -41,6 +42,9 @@ def __init__( log_requests: bool = True, start_engine_loop: bool = True, ) -> None: + # Call self.shutdown at exit to clean up + # and ensure workers will be terminated. + self._finalizer = weakref.finalize(self, self.shutdown) # The child processes will send SIGQUIT when unrecoverable # errors happen. We kill the process tree here so that the @@ -103,9 +107,6 @@ def sigquit_handler(signum, frame): self.output_handler: Optional[asyncio.Task] = None - def __del__(self): - self.shutdown() - @classmethod def from_engine_args( cls, diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 3293205e110af..302bbc09c5ea4 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,5 +1,6 @@ from typing import List, Optional, Type +import weakref import msgspec import zmq import zmq.asyncio @@ -107,9 +108,6 @@ def abort_requests(self, request_ids: List[str]) -> None: def shutdown(self): self.engine_core.shutdown() - def __del__(self): - self.shutdown() - def profile(self, is_start: bool = True) -> None: self.engine_core.profile(is_start) diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index a19109559eabf..cc86e6491d868 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -28,7 +28,6 @@ class LLMEngine: - """Legacy LLMEngine for backwards compatibility.""" def __init__( self, @@ -42,8 +41,6 @@ def __init__( use_cached_outputs: bool = False, multiprocess_mode: bool = False, ) -> None: - - # TODO: Can we avoid this? self.model_config = vllm_config.model_config # Tokenizer (+ ensure liveness if running in another process). @@ -205,10 +202,3 @@ def get_tokenizer_group( f"found type: {type(tokenizer_group)}") return tokenizer_group - - def __del__(self): - self.shutdown() - - def shutdown(self): - if engine_core := getattr(self, "engine_core", None): - engine_core.shutdown() diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index 19e0dd17237c9..d7d9cf82b04c4 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -114,9 +114,6 @@ def __init__( raise RuntimeError(f"{process_name} initialization failed. " "See root cause above.") - def __del__(self): - self.shutdown() - def shutdown(self): # Shutdown the process if needed. 
if hasattr(self, "proc") and self.proc.is_alive(): From 9e70c5f9cf606ab24c6e28ae7df952c47b68c26b Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 15:43:52 +0000 Subject: [PATCH 02/42] format --- vllm/v1/engine/core_client.py | 1 - vllm/v1/utils.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 302bbc09c5ea4..e4a61a521f3fb 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,6 +1,5 @@ from typing import List, Optional, Type -import weakref import msgspec import zmq import zmq.asyncio diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index d7d9cf82b04c4..76de4d171bf5f 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -91,6 +91,7 @@ def __init__( target_fn: Callable, process_kwargs: Dict[Any, Any], ): + # Ensure cleanup of background process during GC. self._finalizer = weakref.finalize(self, self.shutdown) context = get_mp_context() From f34875c115f4b7f7bb0ac6c608eae826b9c96ccd Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 15:52:54 +0000 Subject: [PATCH 03/42] no need for shutdown in asyncllm --- vllm/v1/engine/async_llm.py | 11 +---------- vllm/v1/engine/llm_engine.py | 3 +++ 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 40f2a12839f2d..6f55bcc650096 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -44,7 +44,7 @@ def __init__( ) -> None: # Call self.shutdown at exit to clean up # and ensure workers will be terminated. - self._finalizer = weakref.finalize(self, self.shutdown) + # self._finalizer = weakref.finalize(self, self.shutdown) # The child processes will send SIGQUIT when unrecoverable # errors happen. We kill the process tree here so that the @@ -137,15 +137,6 @@ def from_engine_args( stat_loggers=stat_loggers, ) - def shutdown(self): - """Shutdown, cleaning up the background proc and IPC.""" - - if engine_core := getattr(self, "engine_core", None): - engine_core.shutdown() - - if handler := getattr(self, "output_handler", None): - handler.cancel() - @classmethod def _get_executor_cls(cls, vllm_config: VllmConfig) -> Type[Executor]: executor_class: Type[Executor] diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index cc86e6491d868..1f49de67d7493 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -28,6 +28,7 @@ class LLMEngine: + """Legacy LLMEngine for backwards compatibility.""" def __init__( self, @@ -41,6 +42,8 @@ def __init__( use_cached_outputs: bool = False, multiprocess_mode: bool = False, ) -> None: + + # TODO: Can we avoid this? self.model_config = vllm_config.model_config # Tokenizer (+ ensure liveness if running in another process). 
From 7a777d9ea0f83f706ac6ce3d6eff23f61d61d933 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 15:53:40 +0000 Subject: [PATCH 04/42] remove from asyncllm --- vllm/v1/engine/async_llm.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 6f55bcc650096..8916f94d3491c 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -1,7 +1,6 @@ import asyncio import os import signal -import weakref from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union from vllm.config import ModelConfig, VllmConfig @@ -42,9 +41,6 @@ def __init__( log_requests: bool = True, start_engine_loop: bool = True, ) -> None: - # Call self.shutdown at exit to clean up - # and ensure workers will be terminated. - # self._finalizer = weakref.finalize(self, self.shutdown) # The child processes will send SIGQUIT when unrecoverable # errors happen. We kill the process tree here so that the From dfc9deed21aeac4f1fe4dfb5714b3bf3bc6edf0a Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 16:01:19 +0000 Subject: [PATCH 05/42] stash --- vllm/v1/engine/core.py | 171 +++++++++++++++++++++++++++------- vllm/v1/engine/core_client.py | 2 +- 2 files changed, 140 insertions(+), 33 deletions(-) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 5840541d774ba..ff1b587686dce 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -1,29 +1,29 @@ -import pickle import queue import signal import threading import time +from abc import ABC, abstractmethod from multiprocessing.connection import Connection -from typing import List, Tuple, Type +from typing import List, Optional, Tuple, Type import psutil import zmq -import zmq.asyncio from msgspec import msgpack from vllm.config import CacheConfig, VllmConfig from vllm.logger import init_logger from vllm.transformers_utils.config import ( maybe_register_config_serialize_by_value) -from vllm.utils import get_exception_traceback, zmq_socket_ctx +from vllm.utils import get_exception_traceback, make_zmq_socket, zmq_socket_ctx from vllm.v1.core.scheduler import Scheduler -from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs, - EngineCoreProfile, EngineCoreRequest, - EngineCoreRequestType, EngineCoreRequestUnion) +from vllm.v1.engine import (EngineCoreAbort, EngineCoreOutput, + EngineCoreOutputs, EngineCoreProfile, + EngineCoreRequest, EngineCoreRequestType, + EngineCoreRequestUnion) from vllm.v1.engine.mm_input_mapper import MMInputMapperServer from vllm.v1.executor.abstract import Executor from vllm.v1.request import Request, RequestStatus -from vllm.v1.serial_utils import PickleEncoder +from vllm.v1.utils import BackgroundProcHandle from vllm.version import __version__ as VLLM_VERSION logger = init_logger(__name__) @@ -127,7 +127,8 @@ def step(self) -> List[EngineCoreOutput]: return engine_core_outputs def shutdown(self): - self.model_executor.shutdown() + pass + # self.model_executor.shutdown() def profile(self, is_start: bool = True): self.model_executor.profile(is_start) @@ -164,6 +165,24 @@ def __init__( # Send Readiness signal to EngineClient. 
ready_pipe.send({"status": "READY"}) + @staticmethod + def make_process( + vllm_config: VllmConfig, + executor_class: Type[Executor], + input_path: str, + output_path: str, + log_stats: bool, + ) -> BackgroundProcHandle: + return BackgroundProcHandle(input_path=input_path, + output_path=output_path, + process_name="EngineCore", + target_fn=EngineCoreProc.run_engine_core, + process_kwargs={ + "vllm_config": vllm_config, + "executor_class": executor_class, + "log_stats": log_stats, + }) + @staticmethod def run_engine_core(*args, **kwargs): """Launch EngineCore busy loop in background process.""" @@ -260,36 +279,18 @@ def _handle_client_request(self, request: EngineCoreRequestUnion) -> None: self.add_request(request) elif isinstance(request, EngineCoreProfile): self.model_executor.profile(request.is_start) + elif isinstance(request, EngineCoreAbort): + self.abort_requests(request.request_ids) else: - # TODO: make an EngineCoreAbort wrapper - assert isinstance(request, list) - self.abort_requests(request) + raise ValueError("Unknown request type: {request}") def process_input_socket(self, input_path: str): """Input socket IO thread.""" - # Msgpack serialization decoding. - decoder_add_req = PickleEncoder() - decoder_abort_req = PickleEncoder() - with zmq_socket_ctx(input_path, zmq.constants.PULL) as socket: while True: - # (RequestType, RequestData) - type_frame, data_frame = socket.recv_multipart(copy=False) - request_type = type_frame.buffer - request_data = data_frame.buffer - - # Deserialize the request data. - if request_type == EngineCoreRequestType.ADD.value: - request = decoder_add_req.decode(request_data) - elif request_type == EngineCoreRequestType.ABORT.value: - request = decoder_abort_req.decode(request_data) - elif request_type == EngineCoreRequestType.PROFILE.value: - request = pickle.loads(request_data) - else: - raise ValueError(f"Unknown RequestType: {request_type}") - # Push to input queue for core busy loop. + request = socket.recv_pyobj() self.input_queue.put_nowait(request) def process_output_socket(self, output_path: str): @@ -305,4 +306,110 @@ def process_output_socket(self, output_path: str): engine_core_outputs = self.output_queue.get() outputs = EngineCoreOutputs(outputs=engine_core_outputs) encoder.encode_into(outputs, buffer) - socket.send_multipart((buffer, ), copy=False) + msg = (EngineCoreRequestType.FROM_ENGINE_CORE.value, buffer) + socket.send_multipart(msg, copy=False) + + +class EngineCoreClient(ABC): + """Client used To interact with EngineCore.""" + + @abstractmethod + def get_output(self) -> List[EngineCoreOutput]: + ... + + @abstractmethod + def add_request(self, request: EngineCoreRequest) -> None: + ... + + @abstractmethod + def abort_requests(self, request_ids: List[str]) -> None: + ... + + @abstractmethod + def profile(self, is_start: bool = True) -> None: + ... + + @abstractmethod + def shutdown(self): + ... + + +class InprocEngineCoreClient(EngineCoreClient): + """ + InprocClient: client for in-process EngineCore. Intended + for use in LLMEngine for V0-style add_request() and step() + EngineCore setup in this process (no busy loop). 
+ * pushes EngineCoreRequest directly into the EngineCore + * pulls EngineCoreOutputs by stepping the EngineCore + """ + + def __init__(self, engine_core: EngineCore): + self.engine_core = engine_core + + def get_output(self) -> List[EngineCoreOutput]: + return self.engine_core.step() + + def add_request(self, request: EngineCoreRequest) -> None: + self.engine_core.add_request(request) + + def abort_requests(self, request_ids: List[str]) -> None: + self.engine_core.abort_requests(request_ids) + + def profile(self, is_start: bool = True) -> None: + self.engine_core.profile(is_start) + + def shutdown(self): + self.engine_core.shutdown() + + +class MpEngineCoreClient(EngineCoreClient): + """ + MPClient: client for multi-proc EngineCore. + EngineCore runs in a background process busy loop, getting + new EngineCoreRequests and returning EngineCoreOutputs + + * pushes EngineCoreRequests via input_socket + * pulls EngineCoreOutputs via output_socket + """ + + def __init__( + self, + input_path: str, + output_path: str, + proc_handle: Optional[BackgroundProcHandle] = None, + ) -> None: + + # Use msgpack for hotpath serialization. + self.decoder = msgpack.Decoder(EngineCoreOutputs) + + # Setup ZMQ IO. + self.ctx = zmq.Context(io_threads=2) # type: ignore[attr-defined] + self.input_socket = make_zmq_socket(self.ctx, input_path, + zmq.constants.PUSH) + self.output_socket = make_zmq_socket(self.ctx, output_path, + zmq.constants.PULL) + + # Optionally hold the proc handle for cleanup at shutdown(). + self.proc_handle = proc_handle + + def get_output(self) -> List[EngineCoreOutput]: + # TODO(rob): use copy=False + (msg_type, msg_bytes) = self.output_socket.recv_multipart() + assert msg_type == EngineCoreRequestType.FROM_ENGINE_CORE.value + return self.decoder.decode(msg_bytes).outputs + + def add_request(self, request: EngineCoreRequest) -> None: + self.input_socket.send_pyobj(request) + + def abort_requests(self, request_ids: List[str]) -> None: + self.input_socket.send_pyobj(EngineCoreAbort(request_ids)) + + def profile(self, is_start: bool = True) -> None: + self.input_socket.send_pyobj(EngineCoreProfile(is_start)) + + def shutdown(self) -> None: + if hasattr(self, "ctx"): + self.ctx.destroy(linger=0) + + if hasattr(self, "proc_handle") and self.proc_handle: + self.proc_handle.shutdown() diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index e4a61a521f3fb..40b69750c4053 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -105,7 +105,7 @@ def abort_requests(self, request_ids: List[str]) -> None: self.engine_core.abort_requests(request_ids) def shutdown(self): - self.engine_core.shutdown() + pass def profile(self, is_start: bool = True) -> None: self.engine_core.profile(is_start) From c72b45a51416f8d918757e16416bc2de58823dc2 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 16:07:53 +0000 Subject: [PATCH 06/42] update --- vllm/v1/engine/core.py | 178 ++++++---------------------------- vllm/v1/engine/core_client.py | 18 +--- 2 files changed, 36 insertions(+), 160 deletions(-) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index ff1b587686dce..dcd758a938fe4 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -1,29 +1,29 @@ +import pickle import queue import signal import threading import time -from abc import ABC, abstractmethod from multiprocessing.connection import Connection -from typing import List, Optional, Tuple, Type +from typing import List, Tuple, Type import psutil import zmq 
+import zmq.asyncio from msgspec import msgpack from vllm.config import CacheConfig, VllmConfig from vllm.logger import init_logger from vllm.transformers_utils.config import ( maybe_register_config_serialize_by_value) -from vllm.utils import get_exception_traceback, make_zmq_socket, zmq_socket_ctx +from vllm.utils import get_exception_traceback, zmq_socket_ctx from vllm.v1.core.scheduler import Scheduler -from vllm.v1.engine import (EngineCoreAbort, EngineCoreOutput, - EngineCoreOutputs, EngineCoreProfile, - EngineCoreRequest, EngineCoreRequestType, - EngineCoreRequestUnion) +from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs, + EngineCoreProfile, EngineCoreRequest, + EngineCoreRequestType, EngineCoreRequestUnion) from vllm.v1.engine.mm_input_mapper import MMInputMapperServer from vllm.v1.executor.abstract import Executor from vllm.v1.request import Request, RequestStatus -from vllm.v1.utils import BackgroundProcHandle +from vllm.v1.serial_utils import PickleEncoder from vllm.version import __version__ as VLLM_VERSION logger = init_logger(__name__) @@ -126,10 +126,6 @@ def step(self) -> List[EngineCoreOutput]: scheduler_output, output) return engine_core_outputs - def shutdown(self): - pass - # self.model_executor.shutdown() - def profile(self, is_start: bool = True): self.model_executor.profile(is_start) @@ -165,24 +161,6 @@ def __init__( # Send Readiness signal to EngineClient. ready_pipe.send({"status": "READY"}) - @staticmethod - def make_process( - vllm_config: VllmConfig, - executor_class: Type[Executor], - input_path: str, - output_path: str, - log_stats: bool, - ) -> BackgroundProcHandle: - return BackgroundProcHandle(input_path=input_path, - output_path=output_path, - process_name="EngineCore", - target_fn=EngineCoreProc.run_engine_core, - process_kwargs={ - "vllm_config": vllm_config, - "executor_class": executor_class, - "log_stats": log_stats, - }) - @staticmethod def run_engine_core(*args, **kwargs): """Launch EngineCore busy loop in background process.""" @@ -206,7 +184,6 @@ def signal_handler(signum, frame): signal.signal(signal.SIGINT, signal_handler) parent_process = psutil.Process().parent() - engine_core = None try: engine_core = EngineCoreProc(*args, **kwargs) engine_core.run_busy_loop() @@ -219,11 +196,6 @@ def signal_handler(signum, frame): logger.error("EngineCore hit an exception: %s", traceback) parent_process.send_signal(signal.SIGQUIT) - finally: - if engine_core is not None: - engine_core.shutdown() - engine_core = None - def run_busy_loop(self): """Core busy loop of the EngineCore.""" @@ -279,18 +251,36 @@ def _handle_client_request(self, request: EngineCoreRequestUnion) -> None: self.add_request(request) elif isinstance(request, EngineCoreProfile): self.model_executor.profile(request.is_start) - elif isinstance(request, EngineCoreAbort): - self.abort_requests(request.request_ids) else: - raise ValueError("Unknown request type: {request}") + # TODO: make an EngineCoreAbort wrapper + assert isinstance(request, list) + self.abort_requests(request) def process_input_socket(self, input_path: str): """Input socket IO thread.""" + # Msgpack serialization decoding. + decoder_add_req = PickleEncoder() + decoder_abort_req = PickleEncoder() + with zmq_socket_ctx(input_path, zmq.constants.PULL) as socket: while True: + # (RequestType, RequestData) + type_frame, data_frame = socket.recv_multipart(copy=False) + request_type = type_frame.buffer + request_data = data_frame.buffer + + # Deserialize the request data. 
+ if request_type == EngineCoreRequestType.ADD.value: + request = decoder_add_req.decode(request_data) + elif request_type == EngineCoreRequestType.ABORT.value: + request = decoder_abort_req.decode(request_data) + elif request_type == EngineCoreRequestType.PROFILE.value: + request = pickle.loads(request_data) + else: + raise ValueError(f"Unknown RequestType: {request_type}") + # Push to input queue for core busy loop. - request = socket.recv_pyobj() self.input_queue.put_nowait(request) def process_output_socket(self, output_path: str): @@ -306,110 +296,4 @@ def process_output_socket(self, output_path: str): engine_core_outputs = self.output_queue.get() outputs = EngineCoreOutputs(outputs=engine_core_outputs) encoder.encode_into(outputs, buffer) - msg = (EngineCoreRequestType.FROM_ENGINE_CORE.value, buffer) - socket.send_multipart(msg, copy=False) - - -class EngineCoreClient(ABC): - """Client used To interact with EngineCore.""" - - @abstractmethod - def get_output(self) -> List[EngineCoreOutput]: - ... - - @abstractmethod - def add_request(self, request: EngineCoreRequest) -> None: - ... - - @abstractmethod - def abort_requests(self, request_ids: List[str]) -> None: - ... - - @abstractmethod - def profile(self, is_start: bool = True) -> None: - ... - - @abstractmethod - def shutdown(self): - ... - - -class InprocEngineCoreClient(EngineCoreClient): - """ - InprocClient: client for in-process EngineCore. Intended - for use in LLMEngine for V0-style add_request() and step() - EngineCore setup in this process (no busy loop). - * pushes EngineCoreRequest directly into the EngineCore - * pulls EngineCoreOutputs by stepping the EngineCore - """ - - def __init__(self, engine_core: EngineCore): - self.engine_core = engine_core - - def get_output(self) -> List[EngineCoreOutput]: - return self.engine_core.step() - - def add_request(self, request: EngineCoreRequest) -> None: - self.engine_core.add_request(request) - - def abort_requests(self, request_ids: List[str]) -> None: - self.engine_core.abort_requests(request_ids) - - def profile(self, is_start: bool = True) -> None: - self.engine_core.profile(is_start) - - def shutdown(self): - self.engine_core.shutdown() - - -class MpEngineCoreClient(EngineCoreClient): - """ - MPClient: client for multi-proc EngineCore. - EngineCore runs in a background process busy loop, getting - new EngineCoreRequests and returning EngineCoreOutputs - - * pushes EngineCoreRequests via input_socket - * pulls EngineCoreOutputs via output_socket - """ - - def __init__( - self, - input_path: str, - output_path: str, - proc_handle: Optional[BackgroundProcHandle] = None, - ) -> None: - - # Use msgpack for hotpath serialization. - self.decoder = msgpack.Decoder(EngineCoreOutputs) - - # Setup ZMQ IO. - self.ctx = zmq.Context(io_threads=2) # type: ignore[attr-defined] - self.input_socket = make_zmq_socket(self.ctx, input_path, - zmq.constants.PUSH) - self.output_socket = make_zmq_socket(self.ctx, output_path, - zmq.constants.PULL) - - # Optionally hold the proc handle for cleanup at shutdown(). 
- self.proc_handle = proc_handle - - def get_output(self) -> List[EngineCoreOutput]: - # TODO(rob): use copy=False - (msg_type, msg_bytes) = self.output_socket.recv_multipart() - assert msg_type == EngineCoreRequestType.FROM_ENGINE_CORE.value - return self.decoder.decode(msg_bytes).outputs - - def add_request(self, request: EngineCoreRequest) -> None: - self.input_socket.send_pyobj(request) - - def abort_requests(self, request_ids: List[str]) -> None: - self.input_socket.send_pyobj(EngineCoreAbort(request_ids)) - - def profile(self, is_start: bool = True) -> None: - self.input_socket.send_pyobj(EngineCoreProfile(is_start)) - - def shutdown(self) -> None: - if hasattr(self, "ctx"): - self.ctx.destroy(linger=0) - - if hasattr(self, "proc_handle") and self.proc_handle: - self.proc_handle.shutdown() + socket.send_multipart((buffer, ), copy=False) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 40b69750c4053..3f2b2457d3fe7 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,4 +1,5 @@ -from typing import List, Optional, Type +import weakref +from typing import List, Type import msgspec import zmq @@ -52,9 +53,6 @@ def make_client( return InprocClient(vllm_config, executor_class, log_stats) - def shutdown(self): - pass - def get_output(self) -> List[EngineCoreOutput]: raise NotImplementedError @@ -104,9 +102,6 @@ def add_request(self, request: EngineCoreRequest) -> None: def abort_requests(self, request_ids: List[str]) -> None: self.engine_core.abort_requests(request_ids) - def shutdown(self): - pass - def profile(self, is_start: bool = True) -> None: self.engine_core.profile(is_start) @@ -131,6 +126,9 @@ def __init__( executor_class: Type[Executor], log_stats: bool = False, ): + # Ensure cleanup of ZMQ during GC. + self._finalizer = weakref.finalize(self, self.shutdown) + # Serialization setup. self.encoder = PickleEncoder() self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs) @@ -150,7 +148,6 @@ def __init__( zmq.constants.PUSH) # Start EngineCore in background process. - self.proc_handle: Optional[BackgroundProcHandle] self.proc_handle = BackgroundProcHandle( input_path=input_path, output_path=output_path, @@ -163,13 +160,8 @@ def __init__( }) def shutdown(self): - # Shut down the zmq context. 
self.ctx.destroy(linger=0) - if hasattr(self, "proc_handle") and self.proc_handle: - self.proc_handle.shutdown() - self.proc_handle = None - class SyncMPClient(MPClient): """Synchronous client for multi-proc EngineCore.""" From 4e2dc000707e0c7f488020efa6fd9e3edb126714 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 21:37:51 +0000 Subject: [PATCH 07/42] fix --- vllm/entrypoints/llm.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py index fadf297e9f6aa..7c0de3b3e5481 100644 --- a/vllm/entrypoints/llm.py +++ b/vllm/entrypoints/llm.py @@ -232,11 +232,6 @@ def __init__( self.request_counter = Counter() - def __del__(self): - if hasattr(self, 'llm_engine') and self.llm_engine and hasattr( - self.llm_engine, "shutdown"): - self.llm_engine.shutdown() - @staticmethod def get_engine_class() -> Type[LLMEngine]: if envs.VLLM_USE_V1: From 0b4b6af9613bb98a1e63d831b694bcde649b535c Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 23:04:56 +0000 Subject: [PATCH 08/42] added back explicit del --- vllm/v1/executor/multiproc_executor.py | 3 +++ vllm/v1/utils.py | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index ed64e7741390d..cc2304728f5eb 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -300,6 +300,9 @@ def shutdown(self): self.worker_response_mq = None destroy_model_parallel() destroy_distributed_environment() + + def __del__(self): + self.shutdown() @staticmethod def worker_main(*args, **kwargs): diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index 76de4d171bf5f..a189c2eb01b44 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -130,3 +130,7 @@ def shutdown(self): socket_file = ipc_socket.replace("ipc://", "") if os and os.path.exists(socket_file): os.remove(socket_file) + + def __del__(self): + print("CALLED DEL") + self.shutdown() From 4c445af76310ec7e6e9c40404a5b245e887dd404 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 23:33:36 +0000 Subject: [PATCH 09/42] stash --- vllm/v1/engine/core_client.py | 8 ++-- vllm/v1/executor/multiproc_executor.py | 3 -- vllm/v1/utils.py | 52 +++++++++++++------------- 3 files changed, 29 insertions(+), 34 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 3f2b2457d3fe7..a0607215bf198 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -126,8 +126,8 @@ def __init__( executor_class: Type[Executor], log_stats: bool = False, ): - # Ensure cleanup of ZMQ during GC. - self._finalizer = weakref.finalize(self, self.shutdown) + # # Ensure cleanup of ZMQ during GC. + # self._finalizer = weakref.finalize(self, self.shutdown) # Serialization setup. 
self.encoder = PickleEncoder() @@ -159,8 +159,8 @@ def __init__( "log_stats": log_stats, }) - def shutdown(self): - self.ctx.destroy(linger=0) + # def shutdown(self): + # self.ctx.destroy(linger=0) class SyncMPClient(MPClient): diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index cc2304728f5eb..ed64e7741390d 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -300,9 +300,6 @@ def shutdown(self): self.worker_response_mq = None destroy_model_parallel() destroy_distributed_environment() - - def __del__(self): - self.shutdown() @staticmethod def worker_main(*args, **kwargs): diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index a189c2eb01b44..efe5e3880fb5e 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -91,9 +91,6 @@ def __init__( target_fn: Callable, process_kwargs: Dict[Any, Any], ): - # Ensure cleanup of background process during GC. - self._finalizer = weakref.finalize(self, self.shutdown) - context = get_mp_context() reader, writer = context.Pipe(duplex=False) @@ -103,34 +100,35 @@ def __init__( process_kwargs["ready_pipe"] = writer process_kwargs["input_path"] = input_path process_kwargs["output_path"] = output_path - self.input_path = input_path - self.output_path = output_path + # self.input_path = input_path + # self.output_path = output_path - # Run Detokenizer busy loop in background process. - self.proc = context.Process(target=target_fn, kwargs=process_kwargs) - self.proc.start() + # Run busy loop in background process. + proc = context.Process(target=target_fn, kwargs=process_kwargs) + self._finalizer = weakref.finalize( + self, shutdown, proc, input_path, output_path) + proc.start() # Wait for startup. if reader.recv()["status"] != "READY": raise RuntimeError(f"{process_name} initialization failed. " "See root cause above.") - def shutdown(self): - # Shutdown the process if needed. - if hasattr(self, "proc") and self.proc.is_alive(): - self.proc.terminate() - self.proc.join(5) - - if self.proc.is_alive(): - kill_process_tree(self.proc.pid) - - # Remove zmq ipc socket files - ipc_sockets = [self.output_path, self.input_path] - for ipc_socket in ipc_sockets: - socket_file = ipc_socket.replace("ipc://", "") - if os and os.path.exists(socket_file): - os.remove(socket_file) - - def __del__(self): - print("CALLED DEL") - self.shutdown() + +# Note(rob): shutdown function cannot be a bound method, +# else the gc cannot collect the object. +def shutdown(proc, input_path, output_path): + # Shutdown the process. + if proc.is_alive(): + proc.terminate() + proc.join(5) + + if proc.is_alive(): + kill_process_tree(proc.pid) + + # Remove zmq ipc socket files + ipc_sockets = [output_path, input_path] + for ipc_socket in ipc_sockets: + socket_file = ipc_socket.replace("ipc://", "") + if os and os.path.exists(socket_file): + os.remove(socket_file) From 567b424b3ed9f2bacfc91e05cdb3b6afd9b9d59d Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Wed, 1 Jan 2025 14:11:46 +0000 Subject: [PATCH 10/42] working --- vllm/v1/engine/core_client.py | 22 ++++++++++------------ vllm/v1/utils.py | 9 ++++----- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index a0607215bf198..a6f71770eeb41 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -126,25 +126,26 @@ def __init__( executor_class: Type[Executor], log_stats: bool = False, ): - # # Ensure cleanup of ZMQ during GC. 
- # self._finalizer = weakref.finalize(self, self.shutdown) - # Serialization setup. self.encoder = PickleEncoder() self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs) # ZMQ setup. - if asyncio_mode: - self.ctx = zmq.asyncio.Context() - else: - self.ctx = zmq.Context() # type: ignore[attr-defined] + ctx = ( + zmq.asyncio.Context() # type: ignore[attr-defined] + if asyncio_mode else zmq.Context()) # type: ignore[attr-defined] + + # Note(rob): shutdown function cannot be a bound method, + # else the gc cannot collect the object. + self._finalizer = weakref.finalize(self, lambda x: x.destroy(linger=0), + ctx) # Paths and sockets for IPC. output_path = get_open_zmq_ipc_path() input_path = get_open_zmq_ipc_path() - self.output_socket = make_zmq_socket(self.ctx, output_path, + self.output_socket = make_zmq_socket(ctx, output_path, zmq.constants.PULL) - self.input_socket = make_zmq_socket(self.ctx, input_path, + self.input_socket = make_zmq_socket(ctx, input_path, zmq.constants.PUSH) # Start EngineCore in background process. @@ -159,9 +160,6 @@ def __init__( "log_stats": log_stats, }) - # def shutdown(self): - # self.ctx.destroy(linger=0) - class SyncMPClient(MPClient): """Synchronous client for multi-proc EngineCore.""" diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index efe5e3880fb5e..4cf1c91985081 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -1,3 +1,4 @@ +import multiprocessing import os import weakref from collections.abc import Sequence @@ -100,13 +101,11 @@ def __init__( process_kwargs["ready_pipe"] = writer process_kwargs["input_path"] = input_path process_kwargs["output_path"] = output_path - # self.input_path = input_path - # self.output_path = output_path # Run busy loop in background process. proc = context.Process(target=target_fn, kwargs=process_kwargs) - self._finalizer = weakref.finalize( - self, shutdown, proc, input_path, output_path) + self._finalizer = weakref.finalize(self, shutdown, proc, input_path, + output_path) proc.start() # Wait for startup. @@ -117,7 +116,7 @@ def __init__( # Note(rob): shutdown function cannot be a bound method, # else the gc cannot collect the object. -def shutdown(proc, input_path, output_path): +def shutdown(proc: multiprocessing.Process, input_path: str, output_path: str): # Shutdown the process. 
if proc.is_alive(): proc.terminate() From 7d04b981d6ddcf7d4c6b0101acc213ba1ab561e2 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 12:52:18 +0000 Subject: [PATCH 11/42] fix failing test --- tests/v1/engine/test_async_llm.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/v1/engine/test_async_llm.py b/tests/v1/engine/test_async_llm.py index fffb5b8100ec7..70c6a32dc48ed 100644 --- a/tests/v1/engine/test_async_llm.py +++ b/tests/v1/engine/test_async_llm.py @@ -65,5 +65,3 @@ async def test_load(monkeypatch): assert failed_request_id is None, ( f"{failed_request_id} generated {tokens} but " f"expected {NUM_EXPECTED_TOKENS}") - - engine.shutdown() From 62e1022dd90eb89caf21787592bc340abbfc5816 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 13:01:06 +0000 Subject: [PATCH 12/42] remove explicit shutdown calls --- tests/v1/engine/test_engine_core_client.py | 6 ------ vllm/entrypoints/openai/api_server.py | 12 +++--------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/tests/v1/engine/test_engine_core_client.py b/tests/v1/engine/test_engine_core_client.py index 729975e4ea8c4..20d4e6f63b339 100644 --- a/tests/v1/engine/test_engine_core_client.py +++ b/tests/v1/engine/test_engine_core_client.py @@ -142,9 +142,6 @@ def test_engine_core_client(monkeypatch, multiprocessing_mode: bool): client.abort_requests([request.request_id]) - # Shutdown the client. - client.shutdown() - @pytest.mark.asyncio async def test_engine_core_client_asyncio(monkeypatch): @@ -200,6 +197,3 @@ async def test_engine_core_client_asyncio(monkeypatch): else: assert len(outputs[req_id]) == MAX_TOKENS, ( f"{len(outputs[req_id])=}, {MAX_TOKENS=}") - - # Shutdown the client. - client.shutdown() diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index bac72d87376da..eb846265e08f0 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -137,15 +137,9 @@ async def build_async_engine_client_from_engine_args( if (MQLLMEngineClient.is_unsupported_config(engine_args) or envs.VLLM_USE_V1 or disable_frontend_multiprocessing): - engine_client: Optional[EngineClient] = None - try: - engine_client = AsyncLLMEngine.from_engine_args( - engine_args=engine_args, - usage_context=UsageContext.OPENAI_API_SERVER) - yield engine_client - finally: - if engine_client and hasattr(engine_client, "shutdown"): - engine_client.shutdown() + yield AsyncLLMEngine.from_engine_args( + engine_args=engine_args, + usage_context=UsageContext.OPENAI_API_SERVER) # MQLLMEngine. 
else: From 0b0ca0809f3ae96e6d511e9343bbf881b93a0f69 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 14:42:44 +0000 Subject: [PATCH 13/42] updated --- tests/v1/engine/test_async_llm.py | 2 ++ vllm/v1/engine/async_llm.py | 12 ++++++++++++ vllm/v1/engine/core.py | 3 +++ vllm/v1/engine/core_client.py | 14 ++++++++++++++ vllm/v1/utils.py | 3 +++ 5 files changed, 34 insertions(+) diff --git a/tests/v1/engine/test_async_llm.py b/tests/v1/engine/test_async_llm.py index 70c6a32dc48ed..fffb5b8100ec7 100644 --- a/tests/v1/engine/test_async_llm.py +++ b/tests/v1/engine/test_async_llm.py @@ -65,3 +65,5 @@ async def test_load(monkeypatch): assert failed_request_id is None, ( f"{failed_request_id} generated {tokens} but " f"expected {NUM_EXPECTED_TOKENS}") + + engine.shutdown() diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 8916f94d3491c..fa29e62b9e91e 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -133,6 +133,15 @@ def from_engine_args( stat_loggers=stat_loggers, ) + def shutdown(self): + """Shutdown background resources.""" + + if engine_core := getattr(self, "engine_core", None): + engine_core.shutdown() + + if handler := getattr(self, "output_handler", None): + handler.cancel() + @classmethod def _get_executor_cls(cls, vllm_config: VllmConfig) -> Type[Executor]: executor_class: Type[Executor] @@ -287,6 +296,9 @@ async def _run_output_handler(self): logger.exception("EngineCore output handler hit an error: %s", e) kill_process_tree(os.getpid()) + finally: + logger.debug("AsyncLLM output handler shutting down.") + async def abort(self, request_id: str) -> None: """Abort RequestId in self, detokenizer, and engine core.""" diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index dcd758a938fe4..6439989ead903 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -129,6 +129,9 @@ def step(self) -> List[EngineCoreOutput]: def profile(self, is_start: bool = True): self.model_executor.profile(is_start) + def shutdown(self): + self.model_executor.shutdown() + class EngineCoreProc(EngineCore): """ZMQ-wrapper for running EngineCore in background process.""" diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index a6f71770eeb41..a5de3f1e64f8d 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -53,6 +53,9 @@ def make_client( return InprocClient(vllm_config, executor_class, log_stats) + def shutdown(self): + raise NotImplementedError + def get_output(self) -> List[EngineCoreOutput]: raise NotImplementedError @@ -93,6 +96,9 @@ class InprocClient(EngineCoreClient): def __init__(self, *args, **kwargs): self.engine_core = EngineCore(*args, **kwargs) + def shutdown(self): + self.engine_core.shutdown() + def get_output(self) -> List[EngineCoreOutput]: return self.engine_core.step() @@ -160,6 +166,14 @@ def __init__( "log_stats": log_stats, }) + def shutdown(self): + """Clean up background resources.""" + + self._finalizer() + + if hasattr(self, "proc_handle"): + self.proc_handle.shutdown() + class SyncMPClient(MPClient): """Synchronous client for multi-proc EngineCore.""" diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index 4cf1c91985081..ca60bc990c1f9 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -113,6 +113,9 @@ def __init__( raise RuntimeError(f"{process_name} initialization failed. 
" "See root cause above.") + def shutdown(self): + self._finalizer() + # Note(rob): shutdown function cannot be a bound method, # else the gc cannot collect the object. From 729938a2dd3036d5f3b0daaea4271b74b1cfb64c Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 14:44:06 +0000 Subject: [PATCH 14/42] pdated --- vllm/v1/engine/async_llm.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index fa29e62b9e91e..255eaee5aa77f 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -296,9 +296,6 @@ async def _run_output_handler(self): logger.exception("EngineCore output handler hit an error: %s", e) kill_process_tree(os.getpid()) - finally: - logger.debug("AsyncLLM output handler shutting down.") - async def abort(self, request_id: str) -> None: """Abort RequestId in self, detokenizer, and engine core.""" From 0259241efe8fe75a1ba1fe283bed0524bbb1ab83 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 14:45:25 +0000 Subject: [PATCH 15/42] update --- vllm/entrypoints/openai/api_server.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index eb846265e08f0..bac72d87376da 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -137,9 +137,15 @@ async def build_async_engine_client_from_engine_args( if (MQLLMEngineClient.is_unsupported_config(engine_args) or envs.VLLM_USE_V1 or disable_frontend_multiprocessing): - yield AsyncLLMEngine.from_engine_args( - engine_args=engine_args, - usage_context=UsageContext.OPENAI_API_SERVER) + engine_client: Optional[EngineClient] = None + try: + engine_client = AsyncLLMEngine.from_engine_args( + engine_args=engine_args, + usage_context=UsageContext.OPENAI_API_SERVER) + yield engine_client + finally: + if engine_client and hasattr(engine_client, "shutdown"): + engine_client.shutdown() # MQLLMEngine. else: From 58e4b366e8aa2d1ab80876e32f65ef28770d1107 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 14:51:14 +0000 Subject: [PATCH 16/42] working --- vllm/v1/engine/core_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index a5de3f1e64f8d..95bec9dd25cbc 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,4 +1,5 @@ import weakref +from abc import ABC, abstractmethod from typing import List, Type import msgspec @@ -19,7 +20,7 @@ logger = init_logger(__name__) -class EngineCoreClient: +class EngineCoreClient(ABC): """ EngineCoreClient: subclasses handle different methods for pushing and pulling from the EngineCore for asyncio / multiprocessing. @@ -53,8 +54,9 @@ def make_client( return InprocClient(vllm_config, executor_class, log_stats) + @abstractmethod def shutdown(self): - raise NotImplementedError + ... 
def get_output(self) -> List[EngineCoreOutput]: raise NotImplementedError From cacf6b081bb12dc4f6dfc1e47e21eb9d4896c2d6 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 14:54:47 +0000 Subject: [PATCH 17/42] updated --- vllm/v1/engine/core_client.py | 8 ++++---- vllm/v1/utils.py | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 95bec9dd25cbc..c7127d0b83bfc 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -139,21 +139,21 @@ def __init__( self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs) # ZMQ setup. - ctx = ( + self.ctx = ( zmq.asyncio.Context() # type: ignore[attr-defined] if asyncio_mode else zmq.Context()) # type: ignore[attr-defined] # Note(rob): shutdown function cannot be a bound method, # else the gc cannot collect the object. self._finalizer = weakref.finalize(self, lambda x: x.destroy(linger=0), - ctx) + self.ctx) # Paths and sockets for IPC. output_path = get_open_zmq_ipc_path() input_path = get_open_zmq_ipc_path() - self.output_socket = make_zmq_socket(ctx, output_path, + self.output_socket = make_zmq_socket(self.ctx, output_path, zmq.constants.PULL) - self.input_socket = make_zmq_socket(ctx, input_path, + self.input_socket = make_zmq_socket(self.ctx, input_path, zmq.constants.PUSH) # Start EngineCore in background process. diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index ca60bc990c1f9..b0a7affbebb7e 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -103,10 +103,10 @@ def __init__( process_kwargs["output_path"] = output_path # Run busy loop in background process. - proc = context.Process(target=target_fn, kwargs=process_kwargs) - self._finalizer = weakref.finalize(self, shutdown, proc, input_path, - output_path) - proc.start() + self.proc = context.Process(target=target_fn, kwargs=process_kwargs) + self._finalizer = weakref.finalize(self, shutdown, self.proc, + input_path, output_path) + self.proc.start() # Wait for startup. if reader.recv()["status"] != "READY": @@ -128,7 +128,7 @@ def shutdown(proc: multiprocessing.Process, input_path: str, output_path: str): if proc.is_alive(): kill_process_tree(proc.pid) - # Remove zmq ipc socket files + # Remove zmq ipc socket files. 
ipc_sockets = [output_path, input_path] for ipc_socket in ipc_sockets: socket_file = ipc_socket.replace("ipc://", "") From ccc747dad9ed37fed4b255d3cfa2ef2aab81a417 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:16:29 +0000 Subject: [PATCH 18/42] fixup --- vllm/v1/engine/core.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 6439989ead903..075e24898539e 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -187,6 +187,7 @@ def signal_handler(signum, frame): signal.signal(signal.SIGINT, signal_handler) parent_process = psutil.Process().parent() + engine_core = None try: engine_core = EngineCoreProc(*args, **kwargs) engine_core.run_busy_loop() @@ -199,6 +200,10 @@ def signal_handler(signum, frame): logger.error("EngineCore hit an exception: %s", traceback) parent_process.send_signal(signal.SIGQUIT) + finally: + if engine_core is not None: + engine_core.shutdown() + def run_busy_loop(self): """Core busy loop of the EngineCore.""" From ddc2a97632a5ce262b347fcd020c33a9033581bf Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:17:11 +0000 Subject: [PATCH 19/42] fixup --- vllm/v1/engine/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 075e24898539e..13a50a4f855e2 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -126,12 +126,12 @@ def step(self) -> List[EngineCoreOutput]: scheduler_output, output) return engine_core_outputs - def profile(self, is_start: bool = True): - self.model_executor.profile(is_start) - def shutdown(self): self.model_executor.shutdown() + def profile(self, is_start: bool = True): + self.model_executor.profile(is_start) + class EngineCoreProc(EngineCore): """ZMQ-wrapper for running EngineCore in background process.""" From af0d529a78acc5cd89d051172fd233e1dffb9987 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:17:41 +0000 Subject: [PATCH 20/42] reduce cruft --- vllm/v1/engine/async_llm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 255eaee5aa77f..ff7a0c28dd91a 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -134,7 +134,7 @@ def from_engine_args( ) def shutdown(self): - """Shutdown background resources.""" + """Shutdown, cleaning up the background proc and IPC.""" if engine_core := getattr(self, "engine_core", None): engine_core.shutdown() From 17e152b1188d9c51d584bb5bc2f500e5e9c67ec5 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:18:36 +0000 Subject: [PATCH 21/42] updated --- vllm/v1/engine/core_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index c7127d0b83bfc..8b0ff913e0c5b 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -98,9 +98,6 @@ class InprocClient(EngineCoreClient): def __init__(self, *args, **kwargs): self.engine_core = EngineCore(*args, **kwargs) - def shutdown(self): - self.engine_core.shutdown() - def get_output(self) -> List[EngineCoreOutput]: return self.engine_core.step() @@ -110,6 +107,9 @@ def add_request(self, request: EngineCoreRequest) -> None: def abort_requests(self, request_ids: List[str]) -> None: self.engine_core.abort_requests(request_ids) + def shutdown(self): + self.engine_core.shutdown() + def 
profile(self, is_start: bool = True) -> None: self.engine_core.profile(is_start) From 37859d7b462b0aaa878220fa466fcc24fafc3898 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:24:25 +0000 Subject: [PATCH 22/42] finish --- vllm/v1/engine/core_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 8b0ff913e0c5b..f6b167b3e7592 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -167,7 +167,7 @@ def __init__( "executor_class": executor_class, "log_stats": log_stats, }) - + def shutdown(self): """Clean up background resources.""" From c29f3290f7fff5eeda74bdcbf496b2fe5ba9fafa Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:27:54 +0000 Subject: [PATCH 23/42] updated --- vllm/v1/engine/core_client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index f6b167b3e7592..e009f3448bf69 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -167,15 +167,14 @@ def __init__( "executor_class": executor_class, "log_stats": log_stats, }) - + def shutdown(self): """Clean up background resources.""" - - self._finalizer() - if hasattr(self, "proc_handle"): self.proc_handle.shutdown() + self._finalizer() + class SyncMPClient(MPClient): """Synchronous client for multi-proc EngineCore.""" From 1c4b92af5877c0de0c0afa72b34948bdc8ea808d Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 16:14:29 +0000 Subject: [PATCH 24/42] updated --- vllm/v1/executor/multiproc_executor.py | 38 +++++++++++++++++++------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index ed64e7741390d..abaf807335ef2 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -3,12 +3,12 @@ import signal import sys import time -import weakref from dataclasses import dataclass from enum import Enum, auto from multiprocessing.process import BaseProcess from typing import Any, Dict, List, Optional, Tuple +import psutil import zmq from vllm.config import VllmConfig @@ -19,8 +19,9 @@ from vllm.executor.multiproc_worker_utils import ( _add_prefix, set_multiprocessing_worker_envs) from vllm.logger import init_logger -from vllm.utils import (get_distributed_init_method, get_mp_context, - get_open_port, get_open_zmq_ipc_path, zmq_socket_ctx) +from vllm.utils import (get_distributed_init_method, get_exception_traceback, + get_mp_context, get_open_port, get_open_zmq_ipc_path, + kill_process_tree, zmq_socket_ctx) from vllm.v1.executor.abstract import Executor from vllm.v1.outputs import ModelRunnerOutput from vllm.worker.worker_base import WorkerWrapperBase @@ -34,10 +35,25 @@ class MultiprocExecutor(Executor): def __init__(self, vllm_config: VllmConfig) -> None: - # Call self.shutdown at exit to clean up - # and ensure workers will be terminated. - self._finalizer = weakref.finalize(self, self.shutdown) + # The child processes will send SIGQUIT when unrecoverable + # errors happen. We kill the process tree here so that the + # stack trace is very evident. + # TODO: rather than killing the main process, we should + # figure out how to raise an AsyncEngineDeadError and + # handle at the API server level so we can return a better + # error code to the clients calling VLLM. 
+ + def sigquit_handler(signum, frame): + logger.fatal( + "MulitprocExecutor got SIGQUIT from worker processes, shutting " + "down. See stack trace above for root cause issue.") + # Propagate error up to parent process. + parent_process = psutil.Process().parent() + parent_process.send_signal(signal.SIGQUIT) + kill_process_tree(os.getpid()) + + signal.signal(signal.SIGQUIT, sigquit_handler) self.vllm_config = vllm_config self.parallel_config = vllm_config.parallel_config @@ -321,7 +337,8 @@ def signal_handler(signum, frame): signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) - worker = None + parent_process = psutil.Process().parent() + worker: Optional[WorkerProc] = None try: worker = WorkerProc(*args, **kwargs) @@ -335,9 +352,10 @@ def signal_handler(signum, frame): except SystemExit: logger.debug("Worker interrupted.") - except BaseException as e: - logger.exception(e) - raise + except Exception: + traceback = get_exception_traceback() + logger.error("Worker hit an exception: %s", traceback) + parent_process.send_signal(signal.SIGQUIT) finally: # Clean up once worker exits busy loop From eb9b00bbf3a7e274794300bc8723d217d442c31f Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 16:26:20 +0000 Subject: [PATCH 25/42] stash --- vllm/v1/executor/multiproc_executor.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index abaf807335ef2..9d9d03b0228ee 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -37,13 +37,7 @@ class MultiprocExecutor(Executor): def __init__(self, vllm_config: VllmConfig) -> None: # The child processes will send SIGQUIT when unrecoverable - # errors happen. We kill the process tree here so that the - # stack trace is very evident. - # TODO: rather than killing the main process, we should - # figure out how to raise an AsyncEngineDeadError and - # handle at the API server level so we can return a better - # error code to the clients calling VLLM. - + # errors happen. def sigquit_handler(signum, frame): logger.fatal( "MulitprocExecutor got SIGQUIT from worker processes, shutting " @@ -51,9 +45,10 @@ def sigquit_handler(signum, frame): # Propagate error up to parent process. 
parent_process = psutil.Process().parent() parent_process.send_signal(signal.SIGQUIT) - kill_process_tree(os.getpid()) + self.shutdown() signal.signal(signal.SIGQUIT, sigquit_handler) + self.vllm_config = vllm_config self.parallel_config = vllm_config.parallel_config @@ -356,6 +351,7 @@ def signal_handler(signum, frame): traceback = get_exception_traceback() logger.error("Worker hit an exception: %s", traceback) parent_process.send_signal(signal.SIGQUIT) + raise finally: # Clean up once worker exits busy loop @@ -390,12 +386,17 @@ class ResponseStatus(Enum): def worker_busy_loop(self): """Main busy loop for Multiprocessing Workers""" + + i = 0 while True: method, args, kwargs = self.rpc_broadcast_mq.dequeue() try: + if i == 10: + raise ValueError("SIMULATE CUDA EXCEPTION") + i += 1 output = getattr(self.worker, method)(*args, **kwargs) - except BaseException as e: + except Exception as e: self.worker_response_mq.enqueue( (WorkerProc.ResponseStatus.FAILURE, e)) continue From 1da99a81f56ebdf98bcf0f9bf4ad27a7a607ffef Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 17:45:56 +0000 Subject: [PATCH 26/42] updated --- vllm/v1/executor/multiproc_executor.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 9d9d03b0228ee..583a9c36cdfa0 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -332,7 +332,6 @@ def signal_handler(signum, frame): signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) - parent_process = psutil.Process().parent() worker: Optional[WorkerProc] = None try: worker = WorkerProc(*args, **kwargs) @@ -345,12 +344,17 @@ def signal_handler(signum, frame): worker.worker_busy_loop() except SystemExit: + # Avoid re-raising SystemExit more than once, such + # that we have a cleaner stack trace. + shutdown_requested = True logger.debug("Worker interrupted.") except Exception: - traceback = get_exception_traceback() - logger.error("Worker hit an exception: %s", traceback) - parent_process.send_signal(signal.SIGQUIT) + # While busy loop handles exceptions and alerts EngineCore, + # if there is an error in startup process (e.g. OOM) + # or there an with the IPC itself, alert parent + # so we can shut down the whole system. 
+ psutil.Process().parent().send_signal(signal.SIGQUIT) raise finally: @@ -387,19 +391,18 @@ class ResponseStatus(Enum): def worker_busy_loop(self): """Main busy loop for Multiprocessing Workers""" - i = 0 + method, args, kwargs = self.rpc_broadcast_mq.dequeue() while True: - method, args, kwargs = self.rpc_broadcast_mq.dequeue() - try: - if i == 10: - raise ValueError("SIMULATE CUDA EXCEPTION") - i += 1 + if self.rank == 0: + raise ValueError("SIMULATE CUDA ERROR") output = getattr(self.worker, method)(*args, **kwargs) except Exception as e: self.worker_response_mq.enqueue( (WorkerProc.ResponseStatus.FAILURE, e)) - continue + traceback = get_exception_traceback() + logger.error("WorkerProc hit an exception: %s", traceback) + raise SystemExit() self.worker_response_mq.enqueue( (WorkerProc.ResponseStatus.SUCCESS, output)) From 27431661694f1580d4f8b39f3c37ea6964749efe Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 17:59:31 +0000 Subject: [PATCH 27/42] updated --- vllm/v1/executor/multiproc_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 583a9c36cdfa0..d810bc6a3ff13 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -332,7 +332,7 @@ def signal_handler(signum, frame): signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) - worker: Optional[WorkerProc] = None + worker = None try: worker = WorkerProc(*args, **kwargs) From 8e257c1f007b1adf4ba38e799e6bf90d8e044bad Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 18:03:35 +0000 Subject: [PATCH 28/42] stash --- vllm/v1/executor/multiproc_executor.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index d810bc6a3ff13..989da708576e8 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -344,16 +344,16 @@ def signal_handler(signum, frame): worker.worker_busy_loop() except SystemExit: - # Avoid re-raising SystemExit more than once, such - # that we have a cleaner stack trace. + # worker_busy_loop sends exceptions to Executor and raises + # SystemExit. shutdown_requested = True logger.debug("Worker interrupted.") except Exception: - # While busy loop handles exceptions and alerts EngineCore, - # if there is an error in startup process (e.g. OOM) - # or there an with the IPC itself, alert parent - # so we can shut down the whole system. + # worker_busy_loop sends exceptions exceptons to Executor + # for shutdown, but if there is an error in startup or an + # error with IPC + # itself, we need to alert the parent so we can shut down. 
psutil.Process().parent().send_signal(signal.SIGQUIT) raise From b7c50dc9a413ffb09309547ff4d446723f22901a Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 18:14:53 +0000 Subject: [PATCH 29/42] revert spurious change --- vllm/v1/executor/multiproc_executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 989da708576e8..db7592753c3f5 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -391,8 +391,9 @@ class ResponseStatus(Enum): def worker_busy_loop(self): """Main busy loop for Multiprocessing Workers""" - method, args, kwargs = self.rpc_broadcast_mq.dequeue() while True: + method, args, kwargs = self.rpc_broadcast_mq.dequeue() + try: if self.rank == 0: raise ValueError("SIMULATE CUDA ERROR") From dcfd3b8a8fcaba3cd19343ef2414480a9b5a2cc5 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 18:26:19 +0000 Subject: [PATCH 30/42] updated --- vllm/v1/executor/multiproc_executor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index db7592753c3f5..82f5acbb953d5 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -390,7 +390,6 @@ class ResponseStatus(Enum): def worker_busy_loop(self): """Main busy loop for Multiprocessing Workers""" - while True: method, args, kwargs = self.rpc_broadcast_mq.dequeue() @@ -403,7 +402,7 @@ def worker_busy_loop(self): (WorkerProc.ResponseStatus.FAILURE, e)) traceback = get_exception_traceback() logger.error("WorkerProc hit an exception: %s", traceback) - raise SystemExit() + continue self.worker_response_mq.enqueue( (WorkerProc.ResponseStatus.SUCCESS, output)) From 6e0e0d43838ef6bab5d7b3daaa3c6745ee9420ad Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 18:38:05 +0000 Subject: [PATCH 31/42] stash --- vllm/v1/executor/multiproc_executor.py | 20 +++++++++----------- vllm/v1/worker/gpu_worker.py | 6 ++++++ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 82f5acbb953d5..caff86b012211 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -3,6 +3,7 @@ import signal import sys import time +import weakref from dataclasses import dataclass from enum import Enum, auto from multiprocessing.process import BaseProcess @@ -19,9 +20,8 @@ from vllm.executor.multiproc_worker_utils import ( _add_prefix, set_multiprocessing_worker_envs) from vllm.logger import init_logger -from vllm.utils import (get_distributed_init_method, get_exception_traceback, - get_mp_context, get_open_port, get_open_zmq_ipc_path, - kill_process_tree, zmq_socket_ctx) +from vllm.utils import (get_distributed_init_method, get_mp_context, + get_open_port, get_open_zmq_ipc_path, zmq_socket_ctx) from vllm.v1.executor.abstract import Executor from vllm.v1.outputs import ModelRunnerOutput from vllm.worker.worker_base import WorkerWrapperBase @@ -35,6 +35,9 @@ class MultiprocExecutor(Executor): def __init__(self, vllm_config: VllmConfig) -> None: + # Call self.shutdown at exit to clean up + # and ensure workers will be terminated. + self._finalizer = weakref.finalize(self, self.shutdown) # The child processes will send SIGQUIT when unrecoverable # errors happen. 
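The hunk above replaces __del__-based teardown with weakref.finalize, registered in __init__ next to a comment promising cleanup at exit. A minimal sketch of the idiom and its main subtlety follows; the Handle and _close names are illustrative and not part of this diff:

    import weakref

    class Handle:
        """Owns an external resource that must be released exactly once."""

        def __init__(self, resource):
            self.resource = resource
            # Runs when the object is collected or, at the latest, at
            # interpreter exit; unlike __del__ it fires at most once and
            # is safe to run during interpreter shutdown.
            self._finalizer = weakref.finalize(self, Handle._close, resource)

        @staticmethod
        def _close(resource):
            # A staticmethod taking the resource as an argument. Passing a
            # bound method instead (as in weakref.finalize(self, self.shutdown))
            # keeps `self` strongly referenced, so cleanup is deferred to
            # interpreter exit, which is the guarantee the patch asks for.
            print("closing", resource)

        def shutdown(self):
            self._finalizer()  # idempotent: a second call is a no-op

    h = Handle("worker-process")
    h.shutdown()
    h.shutdown()  # nothing happens the second time

Routing the explicit shutdown() through the finalizer keeps manual and garbage-collected teardown on one code path, which is why the earlier patches could delete the __del__ hooks without leaking worker processes.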
@@ -344,15 +347,12 @@ def signal_handler(signum, frame):
             worker.worker_busy_loop()
 
         except SystemExit:
-            # worker_busy_loop sends exceptions to Executor and raises
-            # SystemExit.
-            shutdown_requested = True
             logger.debug("Worker interrupted.")
 
         except Exception:
             # worker_busy_loop sends exceptions exceptons to Executor
             # for shutdown, but if there is an error in startup or an
-            # error with IPC
+            # error with IPC itself, we need to alert the parent.
             # itself, we need to alert the parent so we can shut down.
             psutil.Process().parent().send_signal(signal.SIGQUIT)
             raise
@@ -390,18 +390,16 @@ class ResponseStatus(Enum):
 
     def worker_busy_loop(self):
         """Main busy loop for Multiprocessing Workers"""
+
         while True:
             method, args, kwargs = self.rpc_broadcast_mq.dequeue()
 
             try:
-                if self.rank == 0:
-                    raise ValueError("SIMULATE CUDA ERROR")
                 output = getattr(self.worker, method)(*args, **kwargs)
             except Exception as e:
                 self.worker_response_mq.enqueue(
                     (WorkerProc.ResponseStatus.FAILURE, e))
-                traceback = get_exception_traceback()
-                logger.error("WorkerProc hit an exception: %s", traceback)
+                logger.exception("WorkerProc hit an exception: %s", exc_info=e)
                 continue
 
             self.worker_response_mq.enqueue(
diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py
index af438f7d5820c..a7723e4e85264 100644
--- a/vllm/v1/worker/gpu_worker.py
+++ b/vllm/v1/worker/gpu_worker.py
@@ -35,6 +35,8 @@ def __init__(
         distributed_init_method: str,
     ):
 
+        self.i = 0
+
         # TODO: use WorkerBase.__init__(self, vllm_config=vllm_config)
         self.vllm_config = vllm_config
         self.model_config = vllm_config.model_config
@@ -201,6 +203,10 @@ def execute_model(
         self,
         scheduler_output: "SchedulerOutput",
     ) -> ModelRunnerOutput:
+        if self.rank == 0 and self.i == 10:
+            raise ValueError("ERROR FROM HERE :)")
+        self.i += 1
+
         output = self.model_runner.execute_model(scheduler_output)
         return output if self.rank == 0 else None

From 55a6195881e8d67a8d6e94a65e2340e9f30e9941 Mon Sep 17 00:00:00 2001
From: "rshaw@neuralmagic.com"
Date: Fri, 3 Jan 2025 18:44:46 +0000
Subject: [PATCH 32/42] updated

---
 vllm/v1/executor/multiproc_executor.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py
index caff86b012211..228da1d9e23ed 100644
--- a/vllm/v1/executor/multiproc_executor.py
+++ b/vllm/v1/executor/multiproc_executor.py
@@ -353,7 +353,6 @@ def signal_handler(signum, frame):
             # worker_busy_loop sends exceptions exceptons to Executor
             # for shutdown, but if there is an error in startup or an
             # error with IPC itself, we need to alert the parent.
-            # itself, we need to alert the parent so we can shut down.
psutil.Process().parent().send_signal(signal.SIGQUIT) raise From aa6954fb686ecbbc6bfba25b51f836f7c9401e2f Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 19:06:14 +0000 Subject: [PATCH 33/42] updated --- vllm/v1/engine/async_llm.py | 16 ---------------- vllm/v1/engine/core_client.py | 19 ++++++++++++++++++- vllm/v1/engine/llm_engine.py | 1 - 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index ff7a0c28dd91a..564d8a8343bef 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -1,6 +1,5 @@ import asyncio import os -import signal from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union from vllm.config import ModelConfig, VllmConfig @@ -42,21 +41,6 @@ def __init__( start_engine_loop: bool = True, ) -> None: - # The child processes will send SIGQUIT when unrecoverable - # errors happen. We kill the process tree here so that the - # stack trace is very evident. - # TODO: rather than killing the main process, we should - # figure out how to raise an AsyncEngineDeadError and - # handle at the API server level so we can return a better - # error code to the clients calling VLLM. - def sigquit_handler(signum, frame): - logger.fatal( - "AsyncLLM got SIGQUIT from worker processes, shutting " - "down. See stack trace above for root cause issue.") - kill_process_tree(os.getpid()) - - signal.signal(signal.SIGQUIT, sigquit_handler) - assert start_engine_loop self.log_requests = log_requests diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index e009f3448bf69..f52ecb29fde40 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,3 +1,5 @@ +import os +import signal import weakref from abc import ABC, abstractmethod from typing import List, Type @@ -8,7 +10,8 @@ from vllm.config import VllmConfig from vllm.logger import init_logger -from vllm.utils import get_open_zmq_ipc_path, make_zmq_socket +from vllm.utils import (get_open_zmq_ipc_path, make_zmq_socket, + kill_process_tree) from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs, EngineCoreProfile, EngineCoreRequest, EngineCoreRequestType, EngineCoreRequestUnion) @@ -134,6 +137,20 @@ def __init__( executor_class: Type[Executor], log_stats: bool = False, ): + # The child processes will send SIGQUIT when unrecoverable + # errors happen. We kill the process tree here so that the + # stack trace is very evident. + # TODO(rob): rather than killing the main process, we should + # figure out how to raise an AsyncEngineDeadError and + # handle at the API server level so we can return a better + # error code to the clients calling VLLM. + def sigquit_handler(signum, frame): + logger.fatal( + "Got SIGQUIT from worker processes, shutting " + "down. See stack trace above for root cause issue.") + kill_process_tree(os.getpid()) + signal.signal(signal.SIGQUIT, sigquit_handler) + # Serialization setup. self.encoder = PickleEncoder() self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs) diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index 1f49de67d7493..016ed7438c5a2 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -42,7 +42,6 @@ def __init__( use_cached_outputs: bool = False, multiprocess_mode: bool = False, ) -> None: - # TODO: Can we avoid this? 
self.model_config = vllm_config.model_config From 1d15ae0d8b05841d792414014a438eabeb50e15f Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 19:08:50 +0000 Subject: [PATCH 34/42] remove cruft --- vllm/v1/engine/llm_engine.py | 1 + vllm/v1/executor/multiproc_executor.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index 016ed7438c5a2..1f49de67d7493 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -42,6 +42,7 @@ def __init__( use_cached_outputs: bool = False, multiprocess_mode: bool = False, ) -> None: + # TODO: Can we avoid this? self.model_config = vllm_config.model_config diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 228da1d9e23ed..26308c642cdb0 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -389,7 +389,6 @@ class ResponseStatus(Enum): def worker_busy_loop(self): """Main busy loop for Multiprocessing Workers""" - while True: method, args, kwargs = self.rpc_broadcast_mq.dequeue() From 0347baaf2bb7656d461817e1f00f3b1cbaecf2a4 Mon Sep 17 00:00:00 2001 From: Robert Shaw <114415538+robertgshaw2-neuralmagic@users.noreply.github.com> Date: Fri, 3 Jan 2025 14:09:50 -0500 Subject: [PATCH 35/42] Update vllm/v1/executor/multiproc_executor.py Co-authored-by: Tyler Michael Smith --- vllm/v1/executor/multiproc_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 26308c642cdb0..33a9097abab1a 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -350,7 +350,7 @@ def signal_handler(signum, frame): logger.debug("Worker interrupted.") except Exception: - # worker_busy_loop sends exceptions exceptons to Executor + # worker_busy_loop sends exceptons to Executor # for shutdown, but if there is an error in startup or an # error with IPC itself, we need to alert the parent. psutil.Process().parent().send_signal(signal.SIGQUIT) From 20b8fa26e097c32945d131f6530e06dbc435a495 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 19:12:00 +0000 Subject: [PATCH 36/42] stash --- vllm/v1/engine/core_client.py | 8 ++++---- vllm/v1/executor/multiproc_executor.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index f52ecb29fde40..a6096edc5182b 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -137,19 +137,19 @@ def __init__( executor_class: Type[Executor], log_stats: bool = False, ): - # The child processes will send SIGQUIT when unrecoverable + # The child processes will send SIGUSR1 when unrecoverable # errors happen. We kill the process tree here so that the # stack trace is very evident. # TODO(rob): rather than killing the main process, we should # figure out how to raise an AsyncEngineDeadError and # handle at the API server level so we can return a better # error code to the clients calling VLLM. - def sigquit_handler(signum, frame): + def sigusr1_handler(signum, frame): logger.fatal( - "Got SIGQUIT from worker processes, shutting " + "Got SIGUSR1 from worker processes, shutting " "down. See stack trace above for root cause issue.") kill_process_tree(os.getpid()) - signal.signal(signal.SIGQUIT, sigquit_handler) + signal.signal(signal.SIGUSR1, sigusr1_handler) # Serialization setup. 
self.encoder = PickleEncoder() diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 26308c642cdb0..e4c9a0ab3272f 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -41,16 +41,16 @@ def __init__(self, vllm_config: VllmConfig) -> None: # The child processes will send SIGQUIT when unrecoverable # errors happen. - def sigquit_handler(signum, frame): + def sigusr1_handler(signum, frame): logger.fatal( "MulitprocExecutor got SIGQUIT from worker processes, shutting " "down. See stack trace above for root cause issue.") # Propagate error up to parent process. parent_process = psutil.Process().parent() - parent_process.send_signal(signal.SIGQUIT) + parent_process.send_signal(signal.SIGUSR1) self.shutdown() - signal.signal(signal.SIGQUIT, sigquit_handler) + signal.signal(signal.SIGUSR1, sigusr1_handler) self.vllm_config = vllm_config self.parallel_config = vllm_config.parallel_config From 884879a253754087ce91753808539fdaadf3454f Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 19:12:41 +0000 Subject: [PATCH 37/42] switch to SIGUSR1 --- vllm/v1/engine/core.py | 2 +- vllm/v1/executor/multiproc_executor.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 13a50a4f855e2..975ce11fe8aff 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -198,7 +198,7 @@ def signal_handler(signum, frame): except Exception: traceback = get_exception_traceback() logger.error("EngineCore hit an exception: %s", traceback) - parent_process.send_signal(signal.SIGQUIT) + parent_process.send_signal(signal.SIGUSR1) finally: if engine_core is not None: diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index b19c15826c6a0..3cd62b7fe5619 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -39,11 +39,11 @@ def __init__(self, vllm_config: VllmConfig) -> None: # and ensure workers will be terminated. self._finalizer = weakref.finalize(self, self.shutdown) - # The child processes will send SIGQUIT when unrecoverable + # The child processes will send SIGUSR1 when unrecoverable # errors happen. def sigusr1_handler(signum, frame): logger.fatal( - "MulitprocExecutor got SIGQUIT from worker processes, shutting " + "MulitprocExecutor got SIGUSR1 from worker processes, shutting " "down. See stack trace above for root cause issue.") # Propagate error up to parent process. parent_process = psutil.Process().parent() @@ -350,10 +350,10 @@ def signal_handler(signum, frame): logger.debug("Worker interrupted.") except Exception: - # worker_busy_loop sends exceptons to Executor + # worker_busy_loop sends exceptions exceptons to Executor # for shutdown, but if there is an error in startup or an # error with IPC itself, we need to alert the parent. 
- psutil.Process().parent().send_signal(signal.SIGQUIT) + psutil.Process().parent().send_signal(signal.SIGUSR1) raise finally: From bb86a034fdeac9d8a20b75dccd2107a445bb158c Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 19:28:54 +0000 Subject: [PATCH 38/42] updated --- vllm/v1/engine/llm_engine.py | 1 + vllm/v1/worker/gpu_worker.py | 6 ------ 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index 1f49de67d7493..c0d29c7b92bb0 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -97,6 +97,7 @@ def from_engine_args( logger.debug("Enabling multiprocessing for LLMEngine.") enable_multiprocessing = True + print(f"{enable_multiprocessing=}") # Create the LLMEngine. return cls(vllm_config=vllm_config, executor_class=executor_class, diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py index a7723e4e85264..fcef37371a6b9 100644 --- a/vllm/v1/worker/gpu_worker.py +++ b/vllm/v1/worker/gpu_worker.py @@ -34,8 +34,6 @@ def __init__( rank: int, distributed_init_method: str, ): - - self.i = 0 # TODO: use WorkerBase.__init__(self, vllm_config=vllm_config) self.vllm_config = vllm_config @@ -203,10 +201,6 @@ def execute_model( self, scheduler_output: "SchedulerOutput", ) -> ModelRunnerOutput: - if self.rank == 0 and self.i == 10: - raise ValueError("ERROR FROM HERE :)") - self.i += 1 - output = self.model_runner.execute_model(scheduler_output) return output if self.rank == 0 else None From 405bcc1ddd87adaa7d3349be039e7796f0fd1fa1 Mon Sep 17 00:00:00 2001 From: Robert Shaw <114415538+robertgshaw2-neuralmagic@users.noreply.github.com> Date: Fri, 3 Jan 2025 14:29:21 -0500 Subject: [PATCH 39/42] Update vllm/v1/engine/core_client.py Co-authored-by: Tyler Michael Smith --- vllm/v1/engine/core_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index a6096edc5182b..22ed69e8eb2c2 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -146,7 +146,7 @@ def __init__( # error code to the clients calling VLLM. def sigusr1_handler(signum, frame): logger.fatal( - "Got SIGUSR1 from worker processes, shutting " + "Got fatal signal from worker processes, shutting " "down. See stack trace above for root cause issue.") kill_process_tree(os.getpid()) signal.signal(signal.SIGUSR1, sigusr1_handler) From 25e0feac05e7efb392d1b7eed292b4c2c2381161 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 19:29:50 +0000 Subject: [PATCH 40/42] update message --- vllm/v1/executor/multiproc_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 3cd62b7fe5619..114deae980d01 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -43,8 +43,8 @@ def __init__(self, vllm_config: VllmConfig) -> None: # errors happen. def sigusr1_handler(signum, frame): logger.fatal( - "MulitprocExecutor got SIGUSR1 from worker processes, shutting " - "down. See stack trace above for root cause issue.") + "MulitprocExecutor got fatal signal from worker processes, " + "shutting down. See stack trace above for root cause issue.") # Propagate error up to parent process. 
             parent_process = psutil.Process().parent()
             parent_process.send_signal(signal.SIGUSR1)

From efd62703022f3ff87e4531af7abb2d1419b3324e Mon Sep 17 00:00:00 2001
From: "rshaw@neuralmagic.com"
Date: Fri, 3 Jan 2025 19:30:22 +0000
Subject: [PATCH 41/42] updated

---
 vllm/v1/engine/llm_engine.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py
index c0d29c7b92bb0..1f49de67d7493 100644
--- a/vllm/v1/engine/llm_engine.py
+++ b/vllm/v1/engine/llm_engine.py
@@ -97,7 +97,6 @@ def from_engine_args(
             logger.debug("Enabling multiprocessing for LLMEngine.")
             enable_multiprocessing = True
 
-        print(f"{enable_multiprocessing=}")
         # Create the LLMEngine.
         return cls(vllm_config=vllm_config,
                    executor_class=executor_class,

From a5a306ed50cb026665c2aa52a1f40574ed26ba8a Mon Sep 17 00:00:00 2001
From: "rshaw@neuralmagic.com"
Date: Fri, 3 Jan 2025 19:31:12 +0000
Subject: [PATCH 42/42] fixed!

---
 vllm/v1/engine/core_client.py | 10 +++++-----
 vllm/v1/worker/gpu_worker.py  |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py
index 22ed69e8eb2c2..6a40c961fc1d7 100644
--- a/vllm/v1/engine/core_client.py
+++ b/vllm/v1/engine/core_client.py
@@ -10,8 +10,8 @@
 
 from vllm.config import VllmConfig
 from vllm.logger import init_logger
-from vllm.utils import (get_open_zmq_ipc_path, make_zmq_socket,
-                        kill_process_tree)
+from vllm.utils import (get_open_zmq_ipc_path, kill_process_tree,
+                        make_zmq_socket)
 from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs,
                             EngineCoreProfile, EngineCoreRequest,
                             EngineCoreRequestType, EngineCoreRequestUnion)
@@ -145,10 +145,10 @@ def __init__(
         # handle at the API server level so we can return a better
         # error code to the clients calling VLLM.
         def sigusr1_handler(signum, frame):
-            logger.fatal(
-                "Got fatal signal from worker processes, shutting "
-                "down. See stack trace above for root cause issue.")
+            logger.fatal("Got fatal signal from worker processes, shutting "
+                         "down. See stack trace above for root cause issue.")
             kill_process_tree(os.getpid())
+
         signal.signal(signal.SIGUSR1, sigusr1_handler)
 
         # Serialization setup.
diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py
index fcef37371a6b9..af438f7d5820c 100644
--- a/vllm/v1/worker/gpu_worker.py
+++ b/vllm/v1/worker/gpu_worker.py
@@ -34,7 +34,7 @@ def __init__(
         rank: int,
         distributed_init_method: str,
     ):
-        
+
         # TODO: use WorkerBase.__init__(self, vllm_config=vllm_config)
         self.vllm_config = vllm_config
         self.model_config = vllm_config.model_config
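With the series complete, failure handling settles into a three-level chain: per-request exceptions travel back to EngineCore over the worker response queue, while startup and IPC failures climb the process tree as SIGUSR1 until the front end kills everything. A condensed sketch of that final shape, with simplified function and queue names rather than the literal vLLM classes:

    import os
    import signal

    import psutil  # the series uses psutil to reach the parent process

    # Worker process: per-request errors go back on the queue and the
    # loop keeps serving; only failures outside the loop escalate.
    def worker_main(make_worker, rpc_in, rpc_out):
        try:
            worker = make_worker()
            while True:
                method, args, kwargs = rpc_in.dequeue()
                try:
                    output = getattr(worker, method)(*args, **kwargs)
                except Exception as e:
                    rpc_out.enqueue(("FAILURE", e))
                    continue
                rpc_out.enqueue(("SUCCESS", output))
        except SystemExit:
            pass  # graceful SIGTERM/SIGINT shutdown
        except Exception:
            # Startup or IPC failure: no queue to report on, so signal up.
            psutil.Process().parent().send_signal(signal.SIGUSR1)
            raise

    # EngineCore process: forward the signal upward, then clean up workers.
    def engine_core_handler(signum, frame):
        psutil.Process().parent().send_signal(signal.SIGUSR1)
        # executor.shutdown() runs here in the real code

    # Front-end process: the last stop, so take the whole tree down.
    def front_end_handler(signum, frame):
        os.kill(os.getpid(), signal.SIGKILL)  # kill_process_tree() in vLLM

    signal.signal(signal.SIGUSR1, front_end_handler)

The switch from SIGQUIT to SIGUSR1 in patches 36 and 37 is also defensible on POSIX grounds: SIGQUIT is reachable from the keyboard (Ctrl-\) and dumps core by default, while SIGUSR1 is one of the two signals set aside for application-defined use, so repurposing it cannot collide with terminal input.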