diff --git a/examples/doca/README.md b/examples/doca/README.md index 593e34cd88..9dc88c009d 100644 --- a/examples/doca/README.md +++ b/examples/doca/README.md @@ -78,7 +78,7 @@ We can see the GPU's PCIe address is `cf:00.0`, and we can infer from the above In case of UDP traffic, the sample will launch a simple pipeline with the DOCA Source Stage followed by a Monitor Stage to report number of received packets. ``` -python3 ./examples/doca/run_udp_raw.py --nic_addr 17:00.1 --gpu_addr ca:00.0 --traffic_type udp +python ./examples/doca/run_udp_raw.py --nic_addr 17:00.1 --gpu_addr ca:00.0 ``` UDP traffic can be easily sent with nping to the interface where Morpheus is listening: ``` @@ -118,7 +118,7 @@ DOCA GPUNetIO rate: 100000 pkts [00:12, 10963.39 pkts/s] As the DOCA Source stage output packets in the new RawMessage format that not all the Morpheus stages may support, there is an additional stage named DOCA Convert Stage which transform the data RawMessage to the Messagemeta format. ``` -python3 ./examples/doca/run_udp_convert.py --nic_addr 17:00.1 --gpu_addr ca:00.0 --traffic_type udp +python ./examples/doca/run_udp_convert.py --nic_addr 17:00.1 --gpu_addr ca:00.0 ``` ## Doca Sensitive Information Detection example for TCP traffic @@ -126,7 +126,7 @@ python3 ./examples/doca/run_udp_convert.py --nic_addr 17:00.1 --gpu_addr ca:00.0 The DOCA example is similar to the Sensitive Information Detection (SID) example in that it uses the `sid-minibert` model in conjunction with the `TritonInferenceStage` to detect sensitive information. The difference is that the sensitive information we will be detecting is obtained from a live TCP packet stream provided by a `DocaSourceStage`. To run the example from the Morpheus root directory and capture all TCP network traffic from the given NIC, use the following command and replace the `nic_addr` and `gpu_addr` arguments with your NIC and GPU PCIe addresses. ``` -# python examples/doca/run_tcp.py --nic_addr cc:00.1 --gpu_addr cf:00.0 --traffic_type tcp +# python examples/doca/run_tcp.py --nic_addr cc:00.1 --gpu_addr cf:00.0 ``` ``` ====Registering Pipeline==== @@ -146,7 +146,7 @@ DOCA GPUNetIO rate: 0 pkts [00:03, ? pkts/s]====Registering Pipeline Complete!== ====Starting Pipeline====[00:02, ? pkts/s] ====Pipeline Started====0:02, ? pkts/s] ====Building Segment: linear_segment_0==== -Added source: +Added source: └─> morpheus.MessageMeta Added stage: └─ morpheus.MessageMeta -> morpheus.MessageMeta diff --git a/examples/doca/run_udp_convert.py b/examples/doca/run_udp_convert.py index 9ead309e1c..6b78a67e72 100644 --- a/examples/doca/run_udp_convert.py +++ b/examples/doca/run_udp_convert.py @@ -12,17 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +import os import click +from morpheus.cli.utils import get_log_levels +from morpheus.cli.utils import parse_log_level from morpheus.config import Config from morpheus.config import CppConfig from morpheus.config import PipelineModes +from morpheus.messages import RawPacketMessage from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.stages.doca.doca_convert_stage import DocaConvertStage from morpheus.stages.doca.doca_source_stage import DocaSourceStage from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.utils.logger import configure_logging @@ -37,9 +41,54 @@ help="GPU PCI Address", required=True, ) -def run_pipeline(nic_addr, gpu_addr): +@click.option( + "--num_threads", + default=os.cpu_count(), + type=click.IntRange(min=1), + show_default=True, + help="Number of internal pipeline threads to use.", +) +@click.option( + "--edge_buffer_size", + default=1024 * 16, + type=click.IntRange(min=1), + show_default=True, + help="Size of edge buffers.", +) +@click.option( + "--max_batch_delay_sec", + default=3.0, + type=float, + show_default=True, + help="Maximum amount of time in seconds to buffer incoming packets.", +) +@click.option( + "--buffer_channel_size", + default=None, + type=click.IntRange(min=2), + show_default=True, + help=("Size of the internal buffer channel used by the DocaConvertStage, if None, the value of `--edge_buffer_size`" + " will be used."), +) +@click.option("--log_level", + default="INFO", + type=click.Choice(get_log_levels(), case_sensitive=False), + callback=parse_log_level, + show_default=True, + help="Specify the logging level to use.") +@click.option("--output_file", + default=None, + help="File to output to, if not supplied, the to-file sink will be omitted.") +def run_pipeline(nic_addr: str, + gpu_addr: str, + num_threads: int, + edge_buffer_size: int, + max_batch_delay_sec: float, + buffer_channel_size: int, + log_level: int, + output_file: str | None): # Enable the default logger - configure_logging(log_level=logging.DEBUG) + configure_logging(log_level=log_level) CppConfig.set_should_use_cpp(True) @@ -47,15 +96,31 @@ def run_pipeline(nic_addr, gpu_addr): config.mode = PipelineModes.NLP # Below properties are specified by the command line - config.num_threads = 10 - config.edge_buffer_size = 1024 + config.num_threads = num_threads + config.edge_buffer_size = edge_buffer_size pipeline = LinearPipeline(config) # add doca source stage pipeline.set_source(DocaSourceStage(config, nic_addr, gpu_addr, 'udp')) - pipeline.add_stage(DocaConvertStage(config)) - pipeline.add_stage(MonitorStage(config, description="DOCA GPUNetIO rate", unit='pkts')) + + def count_raw_packets(message: RawPacketMessage): + return message.num + + pipeline.add_stage( + MonitorStage(config, + description="DOCA GPUNetIO Raw rate", + unit='pkts', + determine_count_fn=count_raw_packets, + delayed_start=True)) + + pipeline.add_stage( + DocaConvertStage(config, max_batch_delay_sec=max_batch_delay_sec, buffer_channel_size=buffer_channel_size)) + + pipeline.add_stage(MonitorStage(config, description="Convert rate", unit='pkts', delayed_start=True)) + + if output_file is not None: + pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True)) # Build the pipeline here to see types in the vizualization pipeline.build() diff --git a/examples/doca/run_udp_raw.py b/examples/doca/run_udp_raw.py index 7d99fe939d..576ecff957 100644 --- a/examples/doca/run_udp_raw.py +++ b/examples/doca/run_udp_raw.py @@ -58,7 +58,11 @@ def count_raw_packets(message: RawPacketMessage): # add doca source stage pipeline.set_source(DocaSourceStage(config, nic_addr, gpu_addr, 'udp')) pipeline.add_stage( - MonitorStage(config, description="DOCA GPUNetIO rate", unit='pkts', determine_count_fn=count_raw_packets)) + MonitorStage(config, + description="DOCA GPUNetIO rate", + unit='pkts', + determine_count_fn=count_raw_packets, + delayed_start=True)) # Build the pipeline here to see types in the vizualization pipeline.build() diff --git a/examples/doca/vdb_realtime/sender/send.py b/examples/doca/vdb_realtime/sender/send.py index 202c1fd35a..45c55f01ec 100644 --- a/examples/doca/vdb_realtime/sender/send.py +++ b/examples/doca/vdb_realtime/sender/send.py @@ -17,25 +17,86 @@ import glob import os +import click from scapy.all import IP # pylint: disable=no-name-in-module +from scapy.all import TCP from scapy.all import UDP # pylint: disable=no-name-in-module from scapy.all import RandShort from scapy.all import Raw from scapy.all import send +DEFAULT_DPORT = 5001 +MORPHEUS_ROOT = os.environ['MORPHEUS_ROOT'] -def main(): - os.chdir("dataset") - for file in glob.glob("*.txt"): - with open(file, 'r', encoding='utf-8') as fp: + +def get_data(input_glob: str) -> list[str]: + data = [] + for file in glob.glob(input_glob): + with open(file, 'r', encoding='utf-8') as fh: while True: - content = fp.read(1024) + content = fh.read(1024) if not content: break - pkt = IP(src="192.168.2.28", dst="192.168.2.27") / UDP(sport=RandShort(), - dport=5001) / Raw(load=content.encode('utf-8')) - print(pkt) - send(pkt, iface="enp202s0f0np0") + + data.append(content) + + return data + + +def send_data(data: list[str], + dst_ip: str, + dport: int = DEFAULT_DPORT, + iface: str | None = None, + src_ip: str | None = None, + sport: int | None = None, + net_type: str = 'UDP'): + if net_type == 'UDP': + net_type_cls = UDP + else: + net_type_cls = TCP + + if sport is None: + sport = RandShort() + + ip_kwargs = {"dst": dst_ip} + if src_ip is not None: + ip_kwargs["src"] = src_ip + + packets = [ + IP(**ip_kwargs) / net_type_cls(sport=sport, dport=dport) / Raw(load=content.encode('utf-8')) for content in data + ] + + send_kwargs = {} + if iface is not None: + send_kwargs["iface"] = iface + + send(packets, **send_kwargs) + + +@click.command() +@click.option("--iface", help="Ethernet device to use, useful for systems with multiple NICs", required=False) +@click.option("--src_ip", help="Source IP to send from, useful for systems with multiple IPs", required=False) +@click.option("--dst_ip", help="Destination IP to send to", required=True) +@click.option("--dport", help="Destination port", type=int, default=DEFAULT_DPORT) +@click.option("--sport", + help="Source port, if undefined a random port will be used", + type=int, + default=None, + required=False) +@click.option("--net_type", type=click.Choice(['TCP', 'UDP'], case_sensitive=False), default='UDP') +@click.option("--input_data_glob", + type=str, + default=os.path.join(MORPHEUS_ROOT, 'examples/doca/vdb_realtime/sender/dataset/*.txt'), + help="Input filepath glob pattenr matching the data to send.") +def main(iface: str | None, + src_ip: str | None, + dst_ip: str, + dport: int, + sport: int | None, + net_type: str, + input_data_glob: str): + data = get_data(input_data_glob) + send_data(data=data, dst_ip=dst_ip, dport=dport, iface=iface, src_ip=src_ip, sport=sport, net_type=net_type) if __name__ == "__main__": diff --git a/examples/doca/vdb_realtime/vdb.py b/examples/doca/vdb_realtime/vdb.py index 964551798f..371c39cf34 100644 --- a/examples/doca/vdb_realtime/vdb.py +++ b/examples/doca/vdb_realtime/vdb.py @@ -82,7 +82,40 @@ def build_milvus_service(embedding_size): help="GPU PCI Address", required=True, ) -def run_pipeline(nic_addr, gpu_addr): +@click.option( + "--triton_server_url", + type=str, + default="localhost:8001", + show_default=True, + help="Triton server URL.", +) +@click.option( + "--embedding_model_name", + required=True, + default='all-MiniLM-L6-v2', + show_default=True, + help="The name of the model that is deployed on Triton server", +) +@click.option( + "--vector_db_uri", + type=str, + default="http://localhost:19530", + show_default=True, + help="URI for connecting to Vector Database server.", +) +@click.option( + "--vector_db_resource_name", + type=str, + default="vdb_doca", + show_default=True, + help="The identifier of the resource on which operations are to be performed in the vector database.", +) +def run_pipeline(nic_addr: str, + gpu_addr: str, + triton_server_url: str, + embedding_model_name: str, + vector_db_uri: str, + vector_db_resource_name: str): # Enable the default logger configure_logging(log_level=logging.DEBUG) @@ -110,18 +143,18 @@ def run_pipeline(nic_addr, gpu_addr): pipeline.add_stage( TritonInferenceStage(config, force_convert_inputs=True, - model_name="all-MiniLM-L6-v2", - server_url="localhost:8001", + model_name=embedding_model_name, + server_url=triton_server_url, use_shared_memory=True)) pipeline.add_stage(MonitorStage(config, description="Embedding rate", unit='pkts')) pipeline.add_stage( WriteToVectorDBStage(config, - resource_name="vdb_doca", + resource_name=vector_db_resource_name, batch_size=16896, recreate=True, service="milvus", - uri="http://localhost:19530", + uri=vector_db_uri, resource_schemas={"vdb_doca": build_milvus_service(384)})) pipeline.add_stage(MonitorStage(config, description="Upload rate", unit='docs')) diff --git a/morpheus/_lib/doca/CMakeLists.txt b/morpheus/_lib/doca/CMakeLists.txt index 1dc277ad9c..e711df896b 100644 --- a/morpheus/_lib/doca/CMakeLists.txt +++ b/morpheus/_lib/doca/CMakeLists.txt @@ -20,15 +20,17 @@ set(doca_ROOT "/opt/mellanox/doca") find_package(doca REQUIRED) add_library(morpheus_doca + # Keep these sorted! src/doca_context.cpp src/doca_convert_kernel.cu - src/doca_convert.cpp + src/doca_convert_stage.cpp src/doca_rx_pipe.cpp src/doca_rx_queue.cpp src/doca_semaphore.cpp src/doca_source_kernel.cu - src/doca_source.cpp + src/doca_source_stage.cpp + src/packet_data_buffer.cpp src/rte_context.cpp ) @@ -38,14 +40,15 @@ target_include_directories(morpheus_doca PUBLIC $ $ + $ ) target_link_libraries(morpheus_doca PRIVATE doca::doca + matx::matx PUBLIC ${PROJECT_NAME}::morpheus - ) # Ideally, we dont use glob here. But there is no good way to guarantee you dont miss anything like *.cpp @@ -79,7 +82,7 @@ set_target_properties(morpheus_doca CUDA_SEPARABLE_COMPILATION ON ) -if (MORPHEUS_PYTHON_INPLACE_BUILD) +if(MORPHEUS_PYTHON_INPLACE_BUILD) morpheus_utils_inplace_build_copy(morpheus_doca ${CMAKE_CURRENT_SOURCE_DIR}) endif() diff --git a/morpheus/_lib/doca/__init__.pyi b/morpheus/_lib/doca/__init__.pyi index 6150391bff..100af5654a 100644 --- a/morpheus/_lib/doca/__init__.pyi +++ b/morpheus/_lib/doca/__init__.pyi @@ -1,6 +1,7 @@ from __future__ import annotations import morpheus._lib.doca import typing +import datetime import morpheus._lib.messages import mrc.core.segment @@ -11,7 +12,7 @@ __all__ = [ class DocaConvertStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str) -> None: ... + def __init__(self, builder: mrc.core.segment.Builder, name: str, max_batch_delay: datetime.timedelta = datetime.timedelta(microseconds=500000), max_batch_size: int = 40960, buffer_channel_size: int = 1024) -> None: ... pass class DocaSourceStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, nic_pci_address: str, gpu_pci_address: str, traffic_type: str) -> None: ... diff --git a/morpheus/_lib/doca/include/morpheus/doca/common.hpp b/morpheus/_lib/doca/include/morpheus/doca/common.hpp index 5ad821bbb3..5605fe2dd2 100644 --- a/morpheus/_lib/doca/include/morpheus/doca/common.hpp +++ b/morpheus/_lib/doca/include/morpheus/doca/common.hpp @@ -18,10 +18,10 @@ #pragma once #include -#include #include #include -#include + +namespace morpheus::doca { uint32_t const PACKETS_PER_THREAD = 16; uint32_t const THREADS_PER_BLOCK = 512; @@ -65,4 +65,6 @@ struct packets_info int32_t* ether_type_out; int32_t* next_proto_id_out; uint32_t* timestamp_out; -}; \ No newline at end of file +}; + +} // namespace morpheus::doca diff --git a/morpheus/_lib/doca/include/morpheus/doca/doca_convert_stage.hpp b/morpheus/_lib/doca/include/morpheus/doca/doca_convert_stage.hpp new file mode 100644 index 0000000000..ecd8bc70ed --- /dev/null +++ b/morpheus/_lib/doca/include/morpheus/doca/doca_convert_stage.hpp @@ -0,0 +1,113 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "morpheus/doca/common.hpp" // for MAX_PKT_CONVERT +#include "morpheus/doca/packet_data_buffer.hpp" +#include "morpheus/export.h" +#include "morpheus/messages/meta.hpp" +#include "morpheus/messages/raw_packet.hpp" + +#include +#include // for cudaStream_t +#include +#include +#include // for Object +#include +#include // for cuda_stream_view +#include // for subscriber + +#include +#include // for size_t +#include +#include +#include + +namespace morpheus { + +constexpr std::chrono::milliseconds DefaultMaxBatchDelay(500); + +/** + * @brief Transform DOCA GPUNetIO raw packets into Dataframe for other Morpheus stages. + * + * Tested only on ConnectX 6-Dx with a single GPU on the same NUMA node running firmware 24.35.2000 + */ +class MORPHEUS_EXPORT DocaConvertStage + : public mrc::pymrc::PythonNode, std::shared_ptr> +{ + public: + using base_t = mrc::pymrc::PythonNode, std::shared_ptr>; + // Input = Receive = Sink = RawPacketMessage + using typename base_t::sink_type_t; + // Output = Send = Source = MessageMeta + using typename base_t::source_type_t; + using typename base_t::subscribe_fn_t; + + /** + * @brief Construct a new DocaConvertStage object + * + * @param max_batch_delay : Maximum amount of time to wait for additional incoming packets prior to + * constructing a cuDF DataFrame. + * @param max_batch_size : Maximum number of packets to attempt to combine into a single cuDF DataFrame. + */ + DocaConvertStage(std::chrono::milliseconds max_batch_delay = DefaultMaxBatchDelay, + std::size_t max_batch_size = doca::MAX_PKT_CONVERT, + std::size_t buffer_channel_size = 1024); + ~DocaConvertStage() override; + + private: + subscribe_fn_t build(); + + /** + * Called every time a message is passed to this stage + */ + void on_raw_packet_message(sink_type_t x); + void send_buffered_data(rxcpp::subscriber& output, doca::PacketDataBuffer&& paccket_buffer); + void buffer_reader(rxcpp::subscriber& output); + + cudaStream_t m_stream; + rmm::cuda_stream_view m_stream_cpp; + + std::chrono::milliseconds m_max_batch_delay; + const std::size_t m_max_batch_size; + std::shared_ptr> m_buffer_channel; +}; + +/****** DocaConvertStageInterfaceProxy***********************/ +/** + * @brief Interface proxy, used to insulate python bindings. + */ +struct MORPHEUS_EXPORT DocaConvertStageInterfaceProxy +{ + /** + * @brief Create and initialize a DocaConvertStage, and return the result as a shared pointer. + * + * @param max_batch_delay : Maximum amount of time to wait for additional incoming packets prior to + * constructing a cuDF DataFrame. + * @param max_batch_size : Maximum number of packets to attempt to combine into a single cuDF DataFrame. + * @return std::shared_ptr> + */ + static std::shared_ptr> init( + mrc::segment::Builder& builder, + std::string const& name, + std::chrono::milliseconds max_batch_delay = DefaultMaxBatchDelay, + std::size_t max_batch_size = doca::MAX_PKT_CONVERT, + std::size_t buffer_channel_size = 1024); +}; + +} // namespace morpheus diff --git a/morpheus/_lib/doca/include/morpheus/doca/doca_kernels.hpp b/morpheus/_lib/doca/include/morpheus/doca/doca_kernels.hpp index a959989267..34a8a66d0d 100644 --- a/morpheus/_lib/doca/include/morpheus/doca/doca_kernels.hpp +++ b/morpheus/_lib/doca/include/morpheus/doca/doca_kernels.hpp @@ -30,52 +30,23 @@ namespace morpheus::doca { -std::unique_ptr gather_payload( - int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - uint32_t* fixed_size_list, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); - -std::unique_ptr gather_header( - int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - uint32_t* fixed_size_list, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); - -std::unique_ptr gather_header( - int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); - -void gather_header_scalar(int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - uint8_t* header_col, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); - -void gather_payload_scalar(int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - uint8_t* payload_col, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); - -std::unique_ptr integers_to_mac( - cudf::column_view const& integers, - rmm::cuda_stream_view stream = cudf::detail::default_stream_value, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +uint32_t gather_sizes(int32_t packet_count, uint32_t* size_list, rmm::cuda_stream_view stream); + +void gather_payload(int32_t packet_count, + uintptr_t* packets_buffer, + uint32_t* header_sizes, + uint32_t* payload_sizes, + uint8_t* dst_buff, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +void gather_src_ip(int32_t packet_count, + uintptr_t* packets_buffer, + uint32_t* dst_buff, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +rmm::device_buffer sizes_to_offsets(int32_t packet_count, uint32_t* sizes_buff, rmm::cuda_stream_view stream); int packet_receive_kernel(doca_gpu_eth_rxq* rxq_0, doca_gpu_eth_rxq* rxq_1, diff --git a/morpheus/_lib/doca/include/morpheus/doca/doca_stages.hpp b/morpheus/_lib/doca/include/morpheus/doca/doca_source_stage.hpp similarity index 62% rename from morpheus/_lib/doca/include/morpheus/doca/doca_stages.hpp rename to morpheus/_lib/doca/include/morpheus/doca/doca_source_stage.hpp index f335c7fcb0..38e70f10db 100644 --- a/morpheus/_lib/doca/include/morpheus/doca/doca_stages.hpp +++ b/morpheus/_lib/doca/include/morpheus/doca/doca_source_stage.hpp @@ -19,13 +19,18 @@ #include "morpheus/doca/common.hpp" #include "morpheus/export.h" -#include "morpheus/messages/meta.hpp" #include "morpheus/messages/raw_packet.hpp" +#include #include +#include // for Object #include +#include #include +#include +#include +#include namespace morpheus { @@ -35,6 +40,7 @@ struct DocaContext; struct DocaRxQueue; struct DocaRxPipe; struct DocaSemaphore; + } // namespace doca /** @@ -61,7 +67,7 @@ class MORPHEUS_EXPORT DocaSourceStage : public mrc::pymrc::PythonSource> m_rxq; std::vector> m_semaphore; std::shared_ptr m_rxpipe; - enum doca_traffic_type m_traffic_type; + enum doca::doca_traffic_type m_traffic_type; }; /****** DocaSourceStageInterfaceProxy***********************/ @@ -80,51 +86,4 @@ struct MORPHEUS_EXPORT DocaSourceStageInterfaceProxy std::string const& traffic_type); }; -/** - * @brief Transform DOCA GPUNetIO raw packets into Dataframe for other Morpheus stages. - * - * Tested only on ConnectX 6-Dx with a single GPU on the same NUMA node running firmware 24.35.2000 - */ -class MORPHEUS_EXPORT DocaConvertStage - : public mrc::pymrc::PythonNode, std::shared_ptr> -{ - public: - using base_t = mrc::pymrc::PythonNode, std::shared_ptr>; - // Input = Receive = Sink = RawPacketMessage - using typename base_t::sink_type_t; - // Output = Send = Source = MessageMeta - using typename base_t::source_type_t; - using typename base_t::subscribe_fn_t; - - DocaConvertStage(); - ~DocaConvertStage() override; - - private: - /** - * Called every time a message is passed to this stage - */ - source_type_t on_data(sink_type_t x); - source_type_t on_raw_packet_message(sink_type_t x); - - cudaStream_t m_stream; - rmm::cuda_stream_view m_stream_cpp; - uint32_t* m_fixed_pld_size_list; - uint32_t* m_fixed_pld_size_list_cpu; - uint32_t* m_fixed_hdr_size_list; - uint32_t* m_fixed_hdr_size_list_cpu; -}; - -/****** DocaConvertStageInterfaceProxy***********************/ -/** - * @brief Interface proxy, used to insulate python bindings. - */ -struct MORPHEUS_EXPORT DocaConvertStageInterfaceProxy -{ - /** - * @brief Create and initialize a DocaConvertStage, and return the result. - */ - static std::shared_ptr> init(mrc::segment::Builder& builder, - std::string const& name); -}; - } // namespace morpheus diff --git a/morpheus/_lib/doca/include/morpheus/doca/packet_data_buffer.hpp b/morpheus/_lib/doca/include/morpheus/doca/packet_data_buffer.hpp new file mode 100644 index 0000000000..3bb36d7a24 --- /dev/null +++ b/morpheus/_lib/doca/include/morpheus/doca/packet_data_buffer.hpp @@ -0,0 +1,51 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include // for cuda_stream_view +#include // for device_buffer +#include // for device_memory_resource +#include // for get_current_device_resource + +#include // for size_t +#include + +namespace morpheus::doca { + +struct PacketDataBuffer +{ + PacketDataBuffer(); + PacketDataBuffer(std::size_t num_packets, + std::size_t header_size, + std::size_t payload_size, + std::size_t payload_sizes_size, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + + PacketDataBuffer(PacketDataBuffer&& other) = default; + PacketDataBuffer& operator=(PacketDataBuffer&& other) = default; + + std::size_t m_num_packets; + rmm::cuda_stream_view m_stream; + + // The header buffer needs to be a shared pointer to construct a DevMemInfo + std::shared_ptr m_header_buffer; + std::unique_ptr m_payload_buffer; + std::unique_ptr m_payload_sizes_buffer; +}; +} // namespace morpheus::doca diff --git a/morpheus/_lib/doca/include/morpheus/doca/packets.hpp b/morpheus/_lib/doca/include/morpheus/doca/packets.hpp index 8faa5066a4..094c9d9cc9 100644 --- a/morpheus/_lib/doca/include/morpheus/doca/packets.hpp +++ b/morpheus/_lib/doca/include/morpheus/doca/packets.hpp @@ -167,174 +167,8 @@ __device__ __forceinline__ int32_t get_payload_udp_size(ipv4_hdr& packet_l3, udp return payload_size; } -__device__ __forceinline__ char to_hex_16(uint8_t value) -{ - return "0123456789ABCDEF"[value]; -} - -__device__ __forceinline__ int64_t mac_bytes_to_int64(uint8_t* mac) -{ - return static_cast(mac[0]) << 40 | static_cast(mac[1]) << 32 | - static_cast(mac[2]) << 24 | static_cast(mac[3]) << 16 | - static_cast(mac[4]) << 8 | static_cast(mac[5]); -} - -__device__ __forceinline__ int64_t mac_int64_to_chars(int64_t mac, char* out) -{ - uint8_t mac_0 = (mac >> 40) & (0xFF); - out[0] = to_hex_16(mac_0 / 16); - out[1] = to_hex_16(mac_0 % 16); - out[2] = ':'; - - uint8_t mac_1 = (mac >> 32) & (0xFF); - out[3] = to_hex_16(mac_1 / 16); - out[4] = to_hex_16(mac_1 % 16); - out[5] = ':'; - - uint8_t mac_2 = (mac >> 24) & (0xFF); - out[6] = to_hex_16(mac_2 / 16); - out[7] = to_hex_16(mac_2 % 16); - out[8] = ':'; - - uint8_t mac_3 = (mac >> 16) & (0xFF); - out[9] = to_hex_16(mac_3 / 16); - out[10] = to_hex_16(mac_3 % 16); - out[11] = ':'; - - uint8_t mac_4 = (mac >> 8) & (0xFF); - out[12] = to_hex_16(mac_4 / 16); - out[13] = to_hex_16(mac_4 % 16); - out[14] = ':'; - - uint8_t mac_5 = (mac >> 0) & (0xFF); - out[15] = to_hex_16(mac_5 / 16); - out[16] = to_hex_16(mac_5 % 16); -} - __device__ __forceinline__ uint32_t ip_to_int32(uint32_t address) { return (address & 0x000000ff) << 24 | (address & 0x0000ff00) << 8 | (address & 0x00ff0000) >> 8 | (address & 0xff000000) >> 24; } - -__device__ __forceinline__ int num_to_string(uint32_t value, char* sp) -{ - char tmp[16]; // be careful with the length of the buffer - char* tp = tmp; - int i; - int radix = 10; - - while (value || tp == tmp) - { - i = value % radix; - value /= radix; - if (i < 10) - *tp++ = i + '0'; - else - *tp++ = i + 'a' - 10; - } - - int len = tp - tmp; - while (tp > tmp) - *sp++ = *--tp; - - return len; -} - -__device__ __forceinline__ int ip_to_string(uint32_t ip_int, uint8_t* ip_str) -{ - int i; - int idxp = 0; - int idxt = 0; - int radix = 10; - uint8_t tmp[3]; - - // uint32_t ip = ip_to_int32(ip_int); - // Assuming network order - uint8_t ip0 = (uint8_t)(ip_int & 0x000000ff); - uint8_t ip1 = (uint8_t)((ip_int & 0x0000ff00) >> 8); - uint8_t ip2 = (uint8_t)((ip_int & 0x00ff0000) >> 16); - uint8_t ip3 = (uint8_t)((ip_int & 0xff000000) >> 24); - - idxt = 0; - while (ip0) - { - i = ip0 % radix; - ip0 /= radix; - if (i < 10) - tmp[idxt++] = i + '0'; - else - tmp[idxt++] = i + 'a' - 10; - } - --idxt; - while (idxt >= 0) - ip_str[idxp++] = tmp[idxt--]; - ip_str[idxp++] = '.'; - - idxt = 0; - while (ip1) - { - i = ip1 % radix; - ip1 /= radix; - if (i < 10) - tmp[idxt++] = i + '0'; - else - tmp[idxt++] = i + 'a' - 10; - } - --idxt; - while (idxt >= 0) - ip_str[idxp++] = tmp[idxt--]; - ip_str[idxp++] = '.'; - - idxt = 0; - while (ip2) - { - i = ip2 % radix; - ip2 /= radix; - if (i < 10) - tmp[idxt++] = i + '0'; - else - tmp[idxt++] = i + 'a' - 10; - } - --idxt; - while (idxt >= 0) - ip_str[idxp++] = tmp[idxt--]; - ip_str[idxp++] = '.'; - - idxt = 0; - while (ip3) - { - i = ip3 % radix; - ip3 /= radix; - if (i < 10) - tmp[idxt++] = i + '0'; - else - tmp[idxt++] = i + 'a' - 10; - } - --idxt; - while (idxt >= 0) - { - // Add print here to check boundaries - if (idxp > IP_ADDR_STRING_LEN) - { - printf("idxp %d > IP_ADDR_STRING_LEN %d\n", idxp, IP_ADDR_STRING_LEN); - return 0; - } - if (idxt > 3) - { - printf("idxt %d > 3\n", idxt); - return 0; - } - - ip_str[idxp++] = tmp[idxt--]; - } - - // printf("ip_str %c%c%c%c %c%c%c%c %c%c%c%c %c%c%c Final pos %d\n", - // ip_str[0],ip_str[1],ip_str[2],ip_str[3], - // ip_str[4],ip_str[5],ip_str[6],ip_str[7], - // ip_str[8],ip_str[9],ip_str[10],ip_str[11], - // ip_str[12],ip_str[13],ip_str[14], - // pos); - - return idxp; -} diff --git a/morpheus/_lib/doca/module.cpp b/morpheus/_lib/doca/module.cpp index d0c6a5aef1..bd2462e550 100644 --- a/morpheus/_lib/doca/module.cpp +++ b/morpheus/_lib/doca/module.cpp @@ -15,11 +15,14 @@ * limitations under the License. */ -#include "morpheus/doca/doca_stages.hpp" +#include "morpheus/doca/common.hpp" // for MAX_PKT_CONVERT +#include "morpheus/doca/doca_convert_stage.hpp" +#include "morpheus/doca/doca_source_stage.hpp" #include // IWYU pragma: keep #include #include +#include // IWYU pragma: keep #include // for str_attr_accessor #include @@ -48,7 +51,12 @@ PYBIND11_MODULE(doca, m) mrc::segment::ObjectProperties, std::shared_ptr>>( m, "DocaConvertStage", py::multiple_inheritance()) - .def(py::init<>(&DocaConvertStageInterfaceProxy::init), py::arg("builder"), py::arg("name")); + .def(py::init<>(&DocaConvertStageInterfaceProxy::init), + py::arg("builder"), + py::arg("name"), + py::arg("max_batch_delay") = DefaultMaxBatchDelay, + py::arg("max_batch_size") = doca::MAX_PKT_CONVERT, + py::arg("buffer_channel_size") = 1024); } } // namespace morpheus diff --git a/morpheus/_lib/doca/src/doca_context.cpp b/morpheus/_lib/doca/src/doca_context.cpp index 27f34d88ea..32d310197f 100644 --- a/morpheus/_lib/doca/src/doca_context.cpp +++ b/morpheus/_lib/doca/src/doca_context.cpp @@ -115,7 +115,7 @@ doca_flow_port* init_doca_flow(uint16_t port_id, uint8_t rxq_num) RTE_TRY(rte_eth_dev_info_get(port_id, &dev_info)); RTE_TRY(rte_eth_dev_configure(port_id, rxq_num, rxq_num, ð_conf)); - mp = rte_pktmbuf_pool_create("TEST", 8192, 0, 0, MAX_PKT_SIZE, rte_eth_dev_socket_id(port_id)); + mp = rte_pktmbuf_pool_create("TEST", 8192, 0, 0, morpheus::doca::MAX_PKT_SIZE, rte_eth_dev_socket_id(port_id)); if (mp == nullptr) { diff --git a/morpheus/_lib/doca/src/doca_convert.cpp b/morpheus/_lib/doca/src/doca_convert.cpp deleted file mode 100644 index 99ade6c60e..0000000000 --- a/morpheus/_lib/doca/src/doca_convert.cpp +++ /dev/null @@ -1,202 +0,0 @@ -/** - * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "morpheus/doca/common.hpp" -#include "morpheus/doca/doca_kernels.hpp" -#include "morpheus/doca/doca_stages.hpp" -#include "morpheus/messages/meta.hpp" -#include "morpheus/messages/raw_packet.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define BE_IPV4_ADDR(a, b, c, d) (RTE_BE32((a << 24) + (b << 16) + (c << 8) + d)) /* Big endian conversion */ -#define ENABLE_TIMERS 0 - -std::optional ip_to_int(std::string const& ip_address) -{ - if (ip_address.empty()) - { - return 0; - } - - uint8_t a, b, c, d; - uint32_t ret; - - ret = sscanf(ip_address.c_str(), "%hhu.%hhu.%hhu.%hhu", &a, &b, &c, &d); - - printf("%u: %u %u %u %u\n", ret, a, b, c, d); - - if (ret == 4) - { - return BE_IPV4_ADDR(a, b, c, d); - } - - return std::nullopt; -} - -static uint64_t now_ns() -{ - struct timespec t; - if (clock_gettime(CLOCK_REALTIME, &t) != 0) - return 0; - return (uint64_t)t.tv_nsec + (uint64_t)t.tv_sec * 1000 * 1000 * 1000; -} - -#define DEBUG_GET_TIMESTAMP(ts) clock_gettime(CLOCK_REALTIME, (ts)) - -namespace morpheus { - -DocaConvertStage::DocaConvertStage() : - base_t(rxcpp::operators::map([this](sink_type_t x) { - return this->on_data(std::move(x)); - })) -{ - cudaStreamCreateWithFlags(&m_stream, cudaStreamNonBlocking); - m_stream_cpp = rmm::cuda_stream_view(reinterpret_cast(m_stream)); - m_fixed_pld_size_list_cpu = (uint32_t*)calloc(MAX_PKT_RECEIVE, sizeof(uint32_t)); - cudaMalloc((void**)&m_fixed_pld_size_list, MAX_PKT_RECEIVE * sizeof(uint32_t)); - for (int idx = 0; idx < MAX_PKT_RECEIVE; idx++) - m_fixed_pld_size_list_cpu[idx] = MAX_PKT_SIZE; - cudaMemcpy(m_fixed_pld_size_list, m_fixed_pld_size_list_cpu, MAX_PKT_RECEIVE * sizeof(uint32_t), cudaMemcpyDefault); - - m_fixed_hdr_size_list_cpu = (uint32_t*)calloc(MAX_PKT_RECEIVE, sizeof(uint32_t)); - cudaMalloc((void**)&m_fixed_hdr_size_list, MAX_PKT_RECEIVE * sizeof(uint32_t)); - for (int idx = 0; idx < MAX_PKT_RECEIVE; idx++) - m_fixed_hdr_size_list_cpu[idx] = IP_ADDR_STRING_LEN; - cudaMemcpy(m_fixed_hdr_size_list, m_fixed_hdr_size_list_cpu, MAX_PKT_RECEIVE * sizeof(uint32_t), cudaMemcpyDefault); -} - -DocaConvertStage::~DocaConvertStage() -{ - free(m_fixed_pld_size_list_cpu); - cudaFree(m_fixed_pld_size_list); - free(m_fixed_hdr_size_list_cpu); - cudaFree(m_fixed_hdr_size_list); - cudaStreamDestroy(m_stream); -} - -DocaConvertStage::source_type_t DocaConvertStage::on_data(sink_type_t x) -{ - if constexpr (std::is_same_v>) - { - return this->on_raw_packet_message(x); - } - // sink_type_t not supported - else - { - std::string error_msg{"DocaConvertStage receives unsupported input type: " + std::string(typeid(x).name())}; - LOG(ERROR) << error_msg; - throw std::runtime_error(error_msg); - } -} - -DocaConvertStage::source_type_t DocaConvertStage::on_raw_packet_message(sink_type_t raw_msg) -{ - auto packet_count = raw_msg->count(); - auto max_size = raw_msg->get_max_size(); - auto pkt_addr_list = raw_msg->get_pkt_addr_list(); - auto pkt_hdr_size_list = raw_msg->get_pkt_hdr_size_list(); - auto pkt_pld_size_list = raw_msg->get_pkt_pld_size_list(); - auto queue_idx = raw_msg->get_queue_idx(); - - // LOG(WARNING) << "New RawPacketMessage with " << packet_count << " packets from queue id " << queue_idx; - -#if ENABLE_TIMERS == 1 - const auto t0 = now_ns(); -#endif - // gather header data - auto header_src_ip_col = doca::gather_header( - packet_count, pkt_addr_list, pkt_hdr_size_list, pkt_pld_size_list, m_fixed_hdr_size_list, m_stream_cpp); - -#if ENABLE_TIMERS == 1 - const auto t1 = now_ns(); -#endif - // gather payload data - auto payload_col = doca::gather_payload( - packet_count, pkt_addr_list, pkt_hdr_size_list, pkt_pld_size_list, m_fixed_pld_size_list, m_stream_cpp); - -#if ENABLE_TIMERS == 1 - const auto t2 = now_ns(); -#endif - std::vector> gathered_columns; - gathered_columns.emplace_back(std::move(header_src_ip_col)); - gathered_columns.emplace_back(std::move(payload_col)); - - // After this point buffers can be reused -> copies actual packets' data - auto gathered_table = std::make_unique(std::move(gathered_columns)); - -#if ENABLE_TIMERS == 1 - const auto t3 = now_ns(); -#endif - auto gathered_metadata = cudf::io::table_metadata(); - gathered_metadata.schema_info.emplace_back("src_ip"); - gathered_metadata.schema_info.emplace_back("data"); - - auto gathered_table_w_metadata = - cudf::io::table_with_metadata{std::move(gathered_table), std::move(gathered_metadata)}; - -#if ENABLE_TIMERS == 1 - const auto t4 = now_ns(); -#endif - auto meta = MessageMeta::create_from_cpp(std::move(gathered_table_w_metadata), 0); - -#if ENABLE_TIMERS == 1 - const auto t5 = now_ns(); -#endif - cudaStreamSynchronize(m_stream_cpp); -#if ENABLE_TIMERS == 1 - const auto t6 = now_ns(); - - LOG(WARNING) << "Queue " << queue_idx << " packets " << packet_count << " header column " << t1 - t0 - << " payload column " << t2 - t1 << " gather columns " << t3 - t2 << " gather metadata " << t4 - t3 - << " create_from_cpp " << t5 - t4 << " stream sync " << t6 - t5 << std::endl; -#endif - - return std::move(meta); -} - -std::shared_ptr> DocaConvertStageInterfaceProxy::init( - mrc::segment::Builder& builder, std::string const& name) -{ - return builder.construct_object(name); -} - -} // namespace morpheus diff --git a/morpheus/_lib/doca/src/doca_convert_kernel.cu b/morpheus/_lib/doca/src/doca_convert_kernel.cu index e1bfce795b..324f3e0026 100644 --- a/morpheus/_lib/doca/src/doca_convert_kernel.cu +++ b/morpheus/_lib/doca/src/doca_convert_kernel.cu @@ -21,18 +21,13 @@ #include #include -#include -#include -#include -#include -#include -#include -#include #include #include #include #include #include +#include +#include #include #include #include @@ -41,257 +36,106 @@ #include #include #include + #include -__global__ void _packet_gather_payload_kernel( - int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - uint8_t* payload_chars_out -) +__global__ void _packet_gather_payload_kernel(int32_t packet_count, + uintptr_t* packets_buffer, + uint32_t* header_sizes, + uint32_t* payload_sizes, + uint8_t* payload_chars_out, + int32_t* dst_offsets) { - int pkt_idx = threadIdx.x; - int j = 0; - - while (pkt_idx < packet_count) { - uint8_t* pkt_hdr_addr = (uint8_t*)(packets_buffer[pkt_idx] + header_sizes[pkt_idx]); - // if (!pkt_hdr_addr) - // continue; - for (j = 0; j < payload_sizes[pkt_idx]; j++) - payload_chars_out[(MAX_PKT_SIZE * pkt_idx) + j] = pkt_hdr_addr[j]; - for (; j < MAX_PKT_SIZE; j++) - payload_chars_out[(MAX_PKT_SIZE * pkt_idx) + j] = '\0'; - pkt_idx += blockDim.x; - } - -#if 0 - - // Specialize BlockScan for a 1D block of 128 threads of type int - using BlockScan = cub::BlockScan; - // Allocate shared memory for BlockScan - __shared__ typename BlockScan::TempStorage temp_storage; - int32_t payload_offsets[PACKETS_PER_THREAD]; - /* Th0 will work on first 4 packets, etc.. */ - for (auto i = 0; i < PACKETS_PER_THREAD; i++) { - auto packet_idx = threadIdx.x * PACKETS_PER_THREAD + i; - if (packet_idx >= packet_count) - payload_offsets[i] = 0; - else - payload_offsets[i] = payload_sizes[packet_idx]; - } - __syncthreads(); - - /* Calculate the right payload offset for each thread */ - int32_t data_offsets_agg; - BlockScan(temp_storage).ExclusiveSum(payload_offsets, payload_offsets, data_offsets_agg); - __syncthreads(); - - for (auto i = 0; i < PACKETS_PER_THREAD; i++) { - auto packet_idx = threadIdx.x * PACKETS_PER_THREAD + i; - if (packet_idx >= packet_count) - continue; - - auto payload_size = payload_sizes[packet_idx]; - for (auto j = 0; j < payload_size; j++) { - auto value = *(((uint8_t*)packets_buffer[packet_idx]) + header_sizes[packet_idx] + j); - payload_chars_out[payload_offsets[i] + j] = value; - // printf("payload %d size %d : 0x%1x / 0x%1x addr %lx\n", - // payload_offsets[i] + j, payload_size, - // payload_chars_out[payload_offsets[i] + j], value, - // packets_buffer[packet_idx]); + int pkt_idx = blockIdx.x * blockDim.x + threadIdx.x; + int byte_offset = blockIdx.y * blockDim.y + threadIdx.y; + + if (pkt_idx < packet_count) + { + const uint32_t payload_size = payload_sizes[pkt_idx]; + + if (byte_offset < payload_size) + { + uint8_t* pkt_hdr_addr = (uint8_t*)(packets_buffer[pkt_idx] + header_sizes[pkt_idx]); + const int32_t dst_offset = dst_offsets[pkt_idx]; + payload_chars_out[dst_offset + byte_offset] = pkt_hdr_addr[byte_offset]; + } } - } -#endif } -__global__ void _packet_gather_header_kernel( - int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - uint8_t* header_src_ip_addr -) +__global__ void _packet_gather_src_ip_kernel(int32_t packet_count, uintptr_t* packets_buffer, uint32_t* dst_buff) { - int pkt_idx = threadIdx.x; + int pkt_idx = blockIdx.x * blockDim.x + threadIdx.x; - while (pkt_idx < packet_count) { - uint8_t* pkt_hdr_addr = (uint8_t*)(packets_buffer[pkt_idx]); - // if (!pkt_hdr_addr) - // continue; - int len = ip_to_string(((struct eth_ip *)pkt_hdr_addr)->l3_hdr.src_addr, header_src_ip_addr + (IP_ADDR_STRING_LEN * pkt_idx)); - while (len < IP_ADDR_STRING_LEN) - header_src_ip_addr[(IP_ADDR_STRING_LEN * pkt_idx) + len++] = '\0'; - pkt_idx += blockDim.x; - } + if (pkt_idx < packet_count) + { + uint8_t* pkt_hdr_addr = (uint8_t*)(packets_buffer[pkt_idx]); + dst_buff[pkt_idx] = ip_to_int32(((struct eth_ip*)pkt_hdr_addr)->l3_hdr.src_addr); + } } namespace morpheus { namespace doca { -std::unique_ptr gather_payload( - int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - uint32_t* fixed_size_list, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +uint32_t gather_sizes(int32_t packet_count, uint32_t* size_list, rmm::cuda_stream_view stream) { - auto [offsets_column, bytes] = cudf::detail::make_offsets_child_column( - fixed_size_list, - fixed_size_list + packet_count, - stream, - mr - ); - - auto chars_column = cudf::strings::detail::create_chars_child_column(bytes, stream, mr); - auto d_chars = chars_column->mutable_view().data(); + auto sizes_tensor = matx::make_tensor(size_list, {packet_count}); + auto bytes_tensor = matx::make_tensor({1}); - _packet_gather_payload_kernel<<<1, THREADS_PER_BLOCK, 0, stream>>>( - packet_count, - packets_buffer, - header_sizes, - payload_sizes, - d_chars - ); + (bytes_tensor = matx::sum(sizes_tensor)).run(stream.value()); - return cudf::make_strings_column(packet_count, - std::move(offsets_column), - std::move(chars_column), - 0, - {}); + cudaStreamSynchronize(stream); + return bytes_tensor(0); } -std::unique_ptr gather_header( - int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - uint32_t* fixed_size_list, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +rmm::device_buffer sizes_to_offsets(int32_t packet_count, uint32_t* sizes_buff, rmm::cuda_stream_view stream) { - auto [offsets_column, bytes] = cudf::detail::make_offsets_child_column( - fixed_size_list, - fixed_size_list + packet_count, - stream, - mr - ); + // The cudf offsets column wants int32 + const auto out_elem_count = packet_count + 1; + const auto out_byte_size = out_elem_count * sizeof(int32_t); + rmm::device_buffer out_buffer(out_byte_size, stream); - auto chars_column = cudf::strings::detail::create_chars_child_column(bytes, stream, mr); - auto d_chars = chars_column->mutable_view().data(); + auto sizes_tensor = matx::make_tensor(sizes_buff, {packet_count}); + auto cum_tensor = matx::make_tensor({packet_count}); - _packet_gather_header_kernel<<<1, THREADS_PER_BLOCK, 0, stream>>>( - packet_count, - packets_buffer, - header_sizes, - payload_sizes, - d_chars - ); + // first element needs to be a 0 + auto zero_tensor = matx::make_tensor({1}); + zero_tensor.SetVals({0}); - return cudf::make_strings_column(packet_count, - std::move(offsets_column), - std::move(chars_column), - 0, - {}); -} + auto offsets_tensor = matx::make_tensor(static_cast(out_buffer.data()), {out_elem_count}); -void gather_header_scalar( - int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - uint8_t* header_src_ip_addr, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - _packet_gather_header_kernel<<<1, THREADS_PER_BLOCK, 0, stream>>>( - packet_count, - packets_buffer, - header_sizes, - payload_sizes, - header_src_ip_addr - ); + (cum_tensor = matx::cumsum(matx::as_type(sizes_tensor))).run(stream.value()); + (offsets_tensor = matx::concat(0, zero_tensor, cum_tensor)).run(stream.value()); + + cudaStreamSynchronize(stream); + + return out_buffer; } -void gather_payload_scalar( - int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* header_sizes, - uint32_t* payload_sizes, - uint8_t* payload_col, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +void gather_src_ip(int32_t packet_count, + uintptr_t* packets_buffer, + uint32_t* dst_buff, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { - _packet_gather_payload_kernel<<<1, THREADS_PER_BLOCK, 0, stream>>>( - packet_count, - packets_buffer, - header_sizes, - payload_sizes, - payload_col - ); + int numBlocks = (packet_count + THREADS_PER_BLOCK - 1) / THREADS_PER_BLOCK; + _packet_gather_src_ip_kernel<<>>(packet_count, packets_buffer, dst_buff); } - -struct integers_to_mac_fn { - cudf::column_device_view const d_column; - int32_t const* d_offsets; - char* d_chars; - - __device__ void operator()(cudf::size_type idx) - { - int64_t mac_address = d_column.element(idx); - char* out_ptr = d_chars + d_offsets[idx]; - - mac_int64_to_chars(mac_address, out_ptr); - } -}; - -std::unique_ptr integers_to_mac( - cudf::column_view const& integers, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr -) +void gather_payload(int32_t packet_count, + uintptr_t* packets_buffer, + uint32_t* header_sizes, + uint32_t* payload_sizes, + uint8_t* dst_buff, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { - CUDF_EXPECTS(integers.type().id() == cudf::type_id::INT64, "Input column must be type_id::INT64 type"); - CUDF_EXPECTS(integers.null_count() == 0, "integers_to_mac does not support null values."); - - cudf::size_type strings_count = integers.size(); - - if (strings_count == 0) - { - return cudf::make_empty_column(cudf::type_id::STRING); - } - - auto const_17_itr = thrust::constant_iterator(17); - auto [offsets_column, bytes] = cudf::detail::make_offsets_child_column( - const_17_itr, - const_17_itr + strings_count, - stream, - mr - ); - - auto column = cudf::column_device_view::create(integers, stream); - auto d_column = *column; - auto d_offsets = offsets_column->view().data(); - auto chars_column = cudf::strings::detail::create_chars_child_column(bytes, stream, mr); - auto d_chars = chars_column->mutable_view().data(); - - thrust::for_each_n( - rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - strings_count, - integers_to_mac_fn{d_column, d_offsets, d_chars} - ); - - return cudf::make_strings_column(strings_count, - std::move(offsets_column), - std::move(chars_column), - 0, - {}); + auto dst_offsets = sizes_to_offsets(packet_count, payload_sizes, stream); + dim3 threadsPerBlock(32, 32); + dim3 numBlocks((packet_count + threadsPerBlock.x - 1) / threadsPerBlock.x, + (MAX_PKT_SIZE + threadsPerBlock.y - 1) / threadsPerBlock.y); + _packet_gather_payload_kernel<<>>( + packet_count, packets_buffer, header_sizes, payload_sizes, dst_buff, static_cast(dst_offsets.data())); } - -} //doca -} //morpheus +} // namespace doca +} // namespace morpheus diff --git a/morpheus/_lib/doca/src/doca_convert_stage.cpp b/morpheus/_lib/doca/src/doca_convert_stage.cpp new file mode 100644 index 0000000000..71ab309161 --- /dev/null +++ b/morpheus/_lib/doca/src/doca_convert_stage.cpp @@ -0,0 +1,326 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "morpheus/doca/doca_convert_stage.hpp" + +#include "morpheus/doca/doca_kernels.hpp" +#include "morpheus/messages/meta.hpp" +#include "morpheus/messages/raw_packet.hpp" +#include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo +#include "morpheus/objects/dtype.hpp" // for DType +#include "morpheus/types.hpp" // for TensorIndex +#include "morpheus/utilities/matx_util.hpp" // for MatxUtil + +#include +#include +#include +#include +#include +#include +#include +#include +#include // for data_type, type_id +#include +#include // for Status +#include // for MRC_CHECK_CUDA +#include +#include +#include +#include // for device_buffer +#include + +#include +#include +#include // for exception_ptr +#include +#include // for invalid_argument +#include +#include +#include +#include + +namespace { +morpheus::doca::PacketDataBuffer concat_packet_buffers(std::size_t ttl_packets, + std::size_t ttl_header_bytes, + std::size_t ttl_payload_bytes, + std::size_t ttl_payload_sizes_bytes, + std::vector&& packet_buffers) +{ + DCHECK(!packet_buffers.empty()); + + if (packet_buffers.size() == 1) + { + return std::move(packet_buffers[0]); + } + + morpheus::doca::PacketDataBuffer combined_buffer( + ttl_packets, ttl_header_bytes, ttl_payload_bytes, ttl_payload_sizes_bytes, packet_buffers[0].m_stream); + + std::size_t curr_header_offset = 0; + std::size_t curr_payload_offset = 0; + std::size_t curr_payload_sizes_offset = 0; + for (auto& packet_buffer : packet_buffers) + { + auto header_addr = static_cast(combined_buffer.m_header_buffer->data()) + curr_header_offset; + auto payload_addr = static_cast(combined_buffer.m_payload_buffer->data()) + curr_payload_offset; + auto payload_sizes_addr = + static_cast(combined_buffer.m_payload_sizes_buffer->data()) + curr_payload_sizes_offset; + + MRC_CHECK_CUDA(cudaMemcpyAsync(static_cast(header_addr), + packet_buffer.m_header_buffer->data(), + packet_buffer.m_header_buffer->size(), + cudaMemcpyDeviceToDevice, + combined_buffer.m_stream)); + + MRC_CHECK_CUDA(cudaMemcpyAsync(static_cast(payload_addr), + packet_buffer.m_payload_buffer->data(), + packet_buffer.m_payload_buffer->size(), + cudaMemcpyDeviceToDevice, + combined_buffer.m_stream)); + + MRC_CHECK_CUDA(cudaMemcpyAsync(static_cast(payload_sizes_addr), + packet_buffer.m_payload_sizes_buffer->data(), + packet_buffer.m_payload_sizes_buffer->size(), + cudaMemcpyDeviceToDevice, + combined_buffer.m_stream)); + + curr_header_offset += packet_buffer.m_header_buffer->size(); + curr_payload_offset += packet_buffer.m_payload_buffer->size(); + curr_payload_sizes_offset += packet_buffer.m_payload_sizes_buffer->size(); + } + + MRC_CHECK_CUDA(cudaStreamSynchronize(combined_buffer.m_stream)); + + return combined_buffer; +} + +std::unique_ptr make_string_col(morpheus::doca::PacketDataBuffer& packet_buffer) +{ + auto offsets_buffer = + morpheus::doca::sizes_to_offsets(packet_buffer.m_num_packets, + static_cast(packet_buffer.m_payload_sizes_buffer->data()), + packet_buffer.m_stream); + + const auto offset_count = packet_buffer.m_num_packets + 1; + const auto offset_buff_size = (offset_count) * sizeof(int32_t); + + auto offsets_col = std::make_unique(cudf::data_type(cudf::type_id::INT32), + offset_count, + std::move(offsets_buffer), + std::move(rmm::device_buffer(0, packet_buffer.m_stream)), + 0); + + return cudf::make_strings_column( + packet_buffer.m_num_packets, std::move(offsets_col), std::move(*packet_buffer.m_payload_buffer), 0, {}); +} + +std::unique_ptr make_ip_col(morpheus::doca::PacketDataBuffer& packet_buffer) +{ + const auto num_packets = static_cast(packet_buffer.m_num_packets); + + // cudf doesn't support uint32, need to cast to int64 remove this once + // https://github.com/rapidsai/cudf/issues/16324 is resolved + auto src_type = morpheus::DType::create(); + auto dst_type = morpheus::DType(morpheus::TypeId::INT64); + auto dev_mem_info = morpheus::DevMemInfo(packet_buffer.m_header_buffer, src_type, {num_packets}, {1}); + + auto ip_int64_buff = morpheus::MatxUtil::cast(dev_mem_info, dst_type.type_id()); + + auto src_ip_int_col = std::make_unique(cudf::data_type(dst_type.cudf_type_id()), + num_packets, + std::move(*ip_int64_buff), + std::move(rmm::device_buffer(0, packet_buffer.m_stream)), + 0); + + return cudf::strings::integers_to_ipv4(src_ip_int_col->view()); +} +} // namespace + +namespace morpheus { + +DocaConvertStage::DocaConvertStage(std::chrono::milliseconds max_batch_delay, + std::size_t max_batch_size, + std::size_t buffer_channel_size) : + base_t(base_t::op_factory_from_sub_fn(build())), + m_max_batch_delay{max_batch_delay}, + m_max_batch_size{max_batch_size}, + m_buffer_channel{std::make_shared>(buffer_channel_size)} +{ + if (m_max_batch_size < doca::MAX_PKT_RECEIVE) + { + throw std::invalid_argument("max_batch_size is less than the maximum number of packets in a RawPacketMessage"); + } + + cudaStreamCreateWithFlags(&m_stream, cudaStreamNonBlocking); + m_stream_cpp = rmm::cuda_stream_view(m_stream); +} + +DocaConvertStage::~DocaConvertStage() +{ + cudaStreamDestroy(m_stream); +} + +DocaConvertStage::subscribe_fn_t DocaConvertStage::build() +{ + return [this](rxcpp::observable input, rxcpp::subscriber output) { + auto buffer_reader_fiber = boost::fibers::fiber([this, &output]() { + this->buffer_reader(output); + }); + + return input.subscribe(rxcpp::make_observer( + [this](sink_type_t x) { + this->on_raw_packet_message(x); + }, + [&](std::exception_ptr error_ptr) { + output.on_error(error_ptr); + }, + [&]() { + m_buffer_channel->close_channel(); + buffer_reader_fiber.join(); + })); + }; +} + +void DocaConvertStage::on_raw_packet_message(sink_type_t raw_msg) +{ + auto packet_count = raw_msg->count(); + auto max_size = raw_msg->get_max_size(); + auto pkt_addr_list = raw_msg->get_pkt_addr_list(); + auto pkt_hdr_size_list = raw_msg->get_pkt_hdr_size_list(); + auto pkt_pld_size_list = raw_msg->get_pkt_pld_size_list(); + auto queue_idx = raw_msg->get_queue_idx(); + + const auto payload_buff_size = doca::gather_sizes(packet_count, pkt_pld_size_list, m_stream_cpp); + + const auto header_buff_size = packet_count * sizeof(uint32_t); + const auto sizes_buff_size = packet_count * sizeof(uint32_t); + + auto packet_buffer = + doca::PacketDataBuffer(packet_count, header_buff_size, payload_buff_size, sizes_buff_size, m_stream_cpp); + + // gather payload data, intentionally calling this first as it needs to perform an early sync operation + doca::gather_payload(packet_count, + pkt_addr_list, + pkt_hdr_size_list, + pkt_pld_size_list, + static_cast(packet_buffer.m_payload_buffer->data()), + m_stream_cpp); + + // gather header data + doca::gather_src_ip( + packet_count, pkt_addr_list, static_cast(packet_buffer.m_header_buffer->data()), m_stream_cpp); + + MRC_CHECK_CUDA(cudaMemcpyAsync(static_cast(packet_buffer.m_payload_sizes_buffer->data()), + pkt_pld_size_list, + sizes_buff_size, + cudaMemcpyDeviceToDevice, + m_stream_cpp)); + cudaStreamSynchronize(m_stream_cpp); + + m_buffer_channel->await_write(std::move(packet_buffer)); +} + +void DocaConvertStage::buffer_reader(rxcpp::subscriber& output) +{ + std::vector packets; + std::size_t ttl_packets = 0; + std::size_t ttl_header_bytes = 0; + std::size_t ttl_payload_bytes = 0; + std::size_t ttl_payload_sizes_bytes = 0; + auto poll_end = std::chrono::high_resolution_clock::now() + m_max_batch_delay; + + auto combine_and_send = [&]() { + auto combined_data = concat_packet_buffers( + ttl_packets, ttl_header_bytes, ttl_payload_bytes, ttl_payload_sizes_bytes, std::move(packets)); + send_buffered_data(output, std::move(combined_data)); + + // reset variables + packets.clear(); + ttl_packets = 0; + ttl_header_bytes = 0; + ttl_payload_bytes = 0; + ttl_payload_sizes_bytes = 0; + }; + + while (!m_buffer_channel->is_channel_closed()) + { + while (std::chrono::high_resolution_clock::now() < poll_end && !m_buffer_channel->is_channel_closed()) + { + doca::PacketDataBuffer packet_buffer; + auto status = m_buffer_channel->await_read_until(packet_buffer, poll_end); + + if (status == mrc::channel::Status::success) + { + // check if we will go over the m_max_batch_size + if (ttl_packets + packet_buffer.m_num_packets > m_max_batch_size) + { + combine_and_send(); + poll_end = std::chrono::high_resolution_clock::now() + m_max_batch_delay; + } + + ttl_packets += packet_buffer.m_num_packets; + ttl_header_bytes += packet_buffer.m_header_buffer->size(); + ttl_payload_bytes += packet_buffer.m_payload_buffer->size(); + ttl_payload_sizes_bytes += packet_buffer.m_payload_sizes_buffer->size(); + packets.emplace_back(std::move(packet_buffer)); + } + } + + // if we got here that means our buffer poll timed out without hitting the max batch size, send what we have + if (!packets.empty()) + { + combine_and_send(); + } + + poll_end = std::chrono::high_resolution_clock::now() + m_max_batch_delay; + } +} + +void DocaConvertStage::send_buffered_data(rxcpp::subscriber& output, + doca::PacketDataBuffer&& packet_buffer) +{ + auto src_ip_col = make_ip_col(packet_buffer); + auto payload_col = make_string_col(packet_buffer); + + std::vector> gathered_columns; + gathered_columns.emplace_back(std::move(src_ip_col)); + gathered_columns.emplace_back(std::move(payload_col)); + + auto gathered_table = std::make_unique(std::move(gathered_columns)); + + auto gathered_metadata = cudf::io::table_metadata(); + gathered_metadata.schema_info.emplace_back("src_ip"); + gathered_metadata.schema_info.emplace_back("data"); + + auto gathered_table_w_metadata = + cudf::io::table_with_metadata{std::move(gathered_table), std::move(gathered_metadata)}; + + auto meta = MessageMeta::create_from_cpp(std::move(gathered_table_w_metadata), 0); + output.on_next(std::move(meta)); +} + +std::shared_ptr> DocaConvertStageInterfaceProxy::init( + mrc::segment::Builder& builder, + std::string const& name, + std::chrono::milliseconds max_batch_delay, + std::size_t max_batch_size, + std::size_t buffer_channel_size) +{ + return builder.construct_object(name, max_batch_delay, max_batch_size, buffer_channel_size); +} + +} // namespace morpheus diff --git a/morpheus/_lib/doca/src/doca_source_kernel.cu b/morpheus/_lib/doca/src/doca_source_kernel.cu index 0a7f6e1cc3..4d31247317 100644 --- a/morpheus/_lib/doca/src/doca_source_kernel.cu +++ b/morpheus/_lib/doca/src/doca_source_kernel.cu @@ -41,176 +41,202 @@ #include #include #include + #include #define DEVICE_GET_TIME(globaltimer) asm volatile("mov.u64 %0, %globaltimer;" : "=l"(globaltimer)) -__global__ void _packet_receive_kernel( - doca_gpu_eth_rxq* rxq_0, doca_gpu_eth_rxq* rxq_1, - doca_gpu_semaphore_gpu* sem_0, doca_gpu_semaphore_gpu* sem_1, - uint16_t sem_idx_0, uint16_t sem_idx_1, - const bool is_tcp, uint32_t* exit_condition -) +using namespace morpheus::doca; + +__global__ void _packet_receive_kernel(doca_gpu_eth_rxq* rxq_0, + doca_gpu_eth_rxq* rxq_1, + doca_gpu_semaphore_gpu* sem_0, + doca_gpu_semaphore_gpu* sem_1, + uint16_t sem_idx_0, + uint16_t sem_idx_1, + const bool is_tcp, + uint32_t* exit_condition) { __shared__ uint32_t packet_count_received; __shared__ uint64_t packet_offset_received; - __shared__ struct packets_info *pkt_info; + __shared__ struct packets_info* pkt_info; #if RUN_PERSISTENT doca_gpu_semaphore_status sem_status; #endif - doca_gpu_buf *buf_ptr; + doca_gpu_buf* buf_ptr; uintptr_t buf_addr; doca_error_t doca_ret; - struct eth_ip_tcp_hdr *hdr_tcp; - struct eth_ip_udp_hdr *hdr_udp; - uint8_t *payload; + struct eth_ip_tcp_hdr* hdr_tcp; + struct eth_ip_udp_hdr* hdr_udp; + uint8_t* payload; doca_gpu_eth_rxq* rxq; doca_gpu_semaphore_gpu* sem; uint16_t sem_idx; uint32_t pkt_idx = threadIdx.x; // unsigned long long rx_start = 0, rx_stop = 0, pkt_proc = 0, reduce_stop =0, reduce_start = 0; - if (blockIdx.x == 0) { - rxq = rxq_0; - sem = sem_0; + if (blockIdx.x == 0) + { + rxq = rxq_0; + sem = sem_0; sem_idx = sem_idx_0; - } else { - rxq = rxq_1; - sem = sem_1; + } + else + { + rxq = rxq_1; + sem = sem_1; sem_idx = sem_idx_1; } - //Initial semaphore index 0, assume it's free! - doca_ret = doca_gpu_dev_semaphore_get_custom_info_addr(sem, sem_idx, (void **)&pkt_info); - if (doca_ret != DOCA_SUCCESS) { + // Initial semaphore index 0, assume it's free! + doca_ret = doca_gpu_dev_semaphore_get_custom_info_addr(sem, sem_idx, (void**)&pkt_info); + if (doca_ret != DOCA_SUCCESS) + { printf("Error %d doca_gpu_dev_semaphore_get_custom_info_addr\n", doca_ret); DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; return; } - if (threadIdx.x == 0) { + if (threadIdx.x == 0) + { DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = 0; - DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; + DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; } __syncthreads(); // do { - // if (threadIdx.x == 0) DEVICE_GET_TIME(rx_start); - doca_ret = doca_gpu_dev_eth_rxq_receive_block(rxq, PACKETS_PER_BLOCK, PACKET_RX_TIMEOUT_NS, &packet_count_received, &packet_offset_received); - if (doca_ret != DOCA_SUCCESS) [[unlikely]] { + // if (threadIdx.x == 0) DEVICE_GET_TIME(rx_start); + doca_ret = doca_gpu_dev_eth_rxq_receive_block( + rxq, PACKETS_PER_BLOCK, PACKET_RX_TIMEOUT_NS, &packet_count_received, &packet_offset_received); + if (doca_ret != DOCA_SUCCESS) [[unlikely]] + { + DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; + return; + } + __threadfence(); + if (DOCA_GPUNETIO_VOLATILE(packet_count_received) == 0) + return; + + while (pkt_idx < DOCA_GPUNETIO_VOLATILE(packet_count_received)) + { + doca_ret = + doca_gpu_dev_eth_rxq_get_buf(rxq, DOCA_GPUNETIO_VOLATILE(packet_offset_received) + pkt_idx, &buf_ptr); + if (doca_ret != DOCA_SUCCESS) [[unlikely]] + { DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; return; } - __threadfence(); - if (DOCA_GPUNETIO_VOLATILE(packet_count_received) == 0) + + doca_ret = doca_gpu_dev_buf_get_addr(buf_ptr, &buf_addr); + if (doca_ret != DOCA_SUCCESS) [[unlikely]] + { + DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; return; + } - while (pkt_idx < DOCA_GPUNETIO_VOLATILE(packet_count_received)) { - doca_ret = doca_gpu_dev_eth_rxq_get_buf(rxq, DOCA_GPUNETIO_VOLATILE(packet_offset_received) + pkt_idx, &buf_ptr); - if (doca_ret != DOCA_SUCCESS) [[unlikely]] { - DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; - return; - } - - doca_ret = doca_gpu_dev_buf_get_addr(buf_ptr, &buf_addr); - if (doca_ret != DOCA_SUCCESS) [[unlikely]] { - DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; - return; - } - - pkt_info->pkt_addr[pkt_idx] = buf_addr; - if (is_tcp) { - raw_to_tcp(buf_addr, &hdr_tcp, &payload); - pkt_info->pkt_hdr_size[pkt_idx] = TCP_HDR_SIZE; - pkt_info->pkt_pld_size[pkt_idx] = get_payload_tcp_size(hdr_tcp->l3_hdr, hdr_tcp->l4_hdr); - } else { - raw_to_udp(buf_addr, &hdr_udp, &payload); - pkt_info->pkt_hdr_size[pkt_idx] = UDP_HDR_SIZE; - pkt_info->pkt_pld_size[pkt_idx] = get_payload_udp_size(hdr_udp->l3_hdr, hdr_udp->l4_hdr); - } - - pkt_idx += blockDim.x; + pkt_info->pkt_addr[pkt_idx] = buf_addr; + if (is_tcp) + { + raw_to_tcp(buf_addr, &hdr_tcp, &payload); + pkt_info->pkt_hdr_size[pkt_idx] = TCP_HDR_SIZE; + pkt_info->pkt_pld_size[pkt_idx] = get_payload_tcp_size(hdr_tcp->l3_hdr, hdr_tcp->l4_hdr); } - __syncthreads(); - - if (threadIdx.x == 0) { - DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = packet_count_received; - DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; - doca_ret = doca_gpu_dev_semaphore_set_status(sem, sem_idx, DOCA_GPU_SEMAPHORE_STATUS_READY); - if (doca_ret != DOCA_SUCCESS) { - printf("Error %d doca_gpu_dev_semaphore_set_status\n", doca_ret); - DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; - } - - // printf("CUDA rx time %ld proc time %ld pkt conv %ld block reduce %ld\n", - // rx_stop - rx_start, - // pkt_proc - rx_stop, - // reduce_start - rx_stop, - // reduce_stop - reduce_start); + else + { + raw_to_udp(buf_addr, &hdr_udp, &payload); + pkt_info->pkt_hdr_size[pkt_idx] = UDP_HDR_SIZE; + pkt_info->pkt_pld_size[pkt_idx] = get_payload_udp_size(hdr_udp->l3_hdr, hdr_udp->l4_hdr); } - __syncthreads(); + + pkt_idx += blockDim.x; + } + __syncthreads(); + + if (threadIdx.x == 0) + { + DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = packet_count_received; + DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; + doca_ret = doca_gpu_dev_semaphore_set_status(sem, sem_idx, DOCA_GPU_SEMAPHORE_STATUS_READY); + if (doca_ret != DOCA_SUCCESS) + { + printf("Error %d doca_gpu_dev_semaphore_set_status\n", doca_ret); + DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; + } + + // printf("CUDA rx time %ld proc time %ld pkt conv %ld block reduce %ld\n", + // rx_stop - rx_start, + // pkt_proc - rx_stop, + // reduce_start - rx_stop, + // reduce_stop - reduce_start); + } + __syncthreads(); #if RUN_PERSISTENT - // sem_idx = (sem_idx+1)%MAX_SEM_X_QUEUE; - - // Get packets' info from next semaphore - // if (threadIdx.x == 0) { - // do { - // doca_ret = doca_gpu_dev_semaphore_get_status(sem, sem_idx, &sem_status); - // if (doca_ret != DOCA_SUCCESS) { - // printf("Error %d doca_gpu_dev_semaphore_get_status\n", doca_ret); - // DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; - // break; - // } - - // if (sem_status == DOCA_GPU_SEMAPHORE_STATUS_FREE) { - // doca_ret = doca_gpu_dev_semaphore_get_custom_info_addr(sem, sem_idx, (void **)&pkt_info); - // if (doca_ret != DOCA_SUCCESS) { - // printf("Error %d doca_gpu_dev_semaphore_get_custom_info_addr\n", doca_ret); - // DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; - // } - - // DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = 0; - // DOCA_GPUNETIO_VOLATILE(pkt_info->payload_size_total_out) = 0; - // DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; - - // break; - // } - // } while (DOCA_GPUNETIO_VOLATILE(*exit_condition) == 0); - // } - // __syncthreads(); + // sem_idx = (sem_idx+1)%MAX_SEM_X_QUEUE; + + // Get packets' info from next semaphore + // if (threadIdx.x == 0) { + // do { + // doca_ret = doca_gpu_dev_semaphore_get_status(sem, sem_idx, &sem_status); + // if (doca_ret != DOCA_SUCCESS) { + // printf("Error %d doca_gpu_dev_semaphore_get_status\n", doca_ret); + // DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; + // break; + // } + + // if (sem_status == DOCA_GPU_SEMAPHORE_STATUS_FREE) { + // doca_ret = doca_gpu_dev_semaphore_get_custom_info_addr(sem, sem_idx, (void **)&pkt_info); + // if (doca_ret != DOCA_SUCCESS) { + // printf("Error %d doca_gpu_dev_semaphore_get_custom_info_addr\n", doca_ret); + // DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; + // } + + // DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = 0; + // DOCA_GPUNETIO_VOLATILE(pkt_info->payload_size_total_out) = 0; + // DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; + + // break; + // } + // } while (DOCA_GPUNETIO_VOLATILE(*exit_condition) == 0); + // } + // __syncthreads(); // } while (DOCA_GPUNETIO_VOLATILE(*exit_condition) == 0) - if (threadIdx.x == 0) - doca_gpu_dev_sem_set_status(sem_in, *sem_idx, DOCA_GPU_SEMAPHORE_STATUS_FREE); - // __threadfence(); - // __syncthreads(); + if (threadIdx.x == 0) + doca_gpu_dev_sem_set_status(sem_in, *sem_idx, DOCA_GPU_SEMAPHORE_STATUS_FREE); + // __threadfence(); + // __syncthreads(); #endif } namespace morpheus { namespace doca { -int packet_receive_kernel(doca_gpu_eth_rxq* rxq_0, doca_gpu_eth_rxq* rxq_1, - doca_gpu_semaphore_gpu* sem_0, doca_gpu_semaphore_gpu* sem_1, - uint16_t sem_idx_0, uint16_t sem_idx_1, - bool is_tcp, - uint32_t* exit_condition, - cudaStream_t stream) +int packet_receive_kernel(doca_gpu_eth_rxq* rxq_0, + doca_gpu_eth_rxq* rxq_1, + doca_gpu_semaphore_gpu* sem_0, + doca_gpu_semaphore_gpu* sem_1, + uint16_t sem_idx_0, + uint16_t sem_idx_1, + bool is_tcp, + uint32_t* exit_condition, + cudaStream_t stream) { cudaError_t result = cudaSuccess; - _packet_receive_kernel<<>>(rxq_0, rxq_1, sem_0, sem_1, sem_idx_0, sem_idx_1, is_tcp, exit_condition); + _packet_receive_kernel<<>>( + rxq_0, rxq_1, sem_0, sem_1, sem_idx_0, sem_idx_1, is_tcp, exit_condition); - /* Check no previous CUDA errors */ - result = cudaGetLastError(); - if (cudaSuccess != result) { - fprintf(stderr, "[%s:%d] cuda failed with %s\n", __FILE__, __LINE__, cudaGetErrorString(result)); - return -1; - } + /* Check no previous CUDA errors */ + result = cudaGetLastError(); + if (cudaSuccess != result) + { + fprintf(stderr, "[%s:%d] cuda failed with %s\n", __FILE__, __LINE__, cudaGetErrorString(result)); + return -1; + } return 0; } -} //doca -} //morpheus +} // namespace doca +} // namespace morpheus diff --git a/morpheus/_lib/doca/src/doca_source.cpp b/morpheus/_lib/doca/src/doca_source_stage.cpp similarity index 86% rename from morpheus/_lib/doca/src/doca_source.cpp rename to morpheus/_lib/doca/src/doca_source_stage.cpp index 1a4f7d5d17..8df71bf31e 100644 --- a/morpheus/_lib/doca/src/doca_source.cpp +++ b/morpheus/_lib/doca/src/doca_source_stage.cpp @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "cuda.h" +#include "morpheus/doca/doca_source_stage.hpp" #include "morpheus/doca/common.hpp" #include "morpheus/doca/doca_context.hpp" @@ -24,11 +24,11 @@ #include "morpheus/doca/doca_rx_pipe.hpp" #include "morpheus/doca/doca_rx_queue.hpp" #include "morpheus/doca/doca_semaphore.hpp" -#include "morpheus/doca/doca_stages.hpp" #include "morpheus/messages/raw_packet.hpp" #include "morpheus/utilities/error.hpp" #include +#include #include #include #include @@ -40,6 +40,7 @@ #include #include +#include #include #include #include @@ -50,11 +51,13 @@ #include #include -#define debug_get_timestamp(ts) clock_gettime(CLOCK_REALTIME, (ts)) +#define DEBUG_GET_TIMESTAMP(ts) clock_gettime(CLOCK_REALTIME, (ts)) #define ENABLE_TIMERS 0 namespace morpheus { +using namespace morpheus::doca; + DocaSourceStage::DocaSourceStage(std::string const& nic_pci_address, std::string const& gpu_pci_address, std::string const& traffic_type) : @@ -94,23 +97,25 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() CUcontext cuContext; cudaSetDevice(0); // Need to rely on GPU 0 - cudaFree(0); + cudaFree(0); // NOLINT(modernize-use-nullptr) cuDeviceGet(&cuDevice, 0); cuCtxCreate(&cuContext, CU_CTX_SCHED_SPIN | CU_CTX_MAP_HOST, cuDevice); cuCtxPushCurrent(cuContext); struct packets_info* pkt_ptr; - int sem_idx[MAX_QUEUE] = {0}; - cudaStream_t rstream = nullptr; - int thread_idx = mrc::runnable::Context::get_runtime_context().rank(); + std::array sem_idx; + sem_idx.fill(0); + + cudaStream_t rstream = nullptr; + int thread_idx = mrc::runnable::Context::get_runtime_context().rank(); // Add per queue auto pkt_addr_unique = std::make_unique>( - m_context, MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE, DOCA_GPU_MEM_TYPE_GPU); + m_context, MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * MAX_QUEUE, DOCA_GPU_MEM_TYPE_GPU); auto pkt_hdr_size_unique = std::make_unique>( - m_context, MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE, DOCA_GPU_MEM_TYPE_GPU); + m_context, MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * MAX_QUEUE, DOCA_GPU_MEM_TYPE_GPU); auto pkt_pld_size_unique = std::make_unique>( - m_context, MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE, DOCA_GPU_MEM_TYPE_GPU); + m_context, MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * MAX_QUEUE, DOCA_GPU_MEM_TYPE_GPU); if (thread_idx > 1) { @@ -128,10 +133,13 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() { for (int idxs = 0; idxs < MAX_SEM_X_QUEUE; idxs++) { - pkt_ptr = static_cast(m_semaphore[queue_idx]->get_info_cpu(idxs)); - pkt_ptr->pkt_addr = pkt_addr_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs); - pkt_ptr->pkt_hdr_size = pkt_hdr_size_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs); - pkt_ptr->pkt_pld_size = pkt_pld_size_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs); + pkt_ptr = static_cast(m_semaphore[queue_idx]->get_info_cpu(idxs)); + pkt_ptr->pkt_addr = pkt_addr_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs) + + (MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * queue_idx); + pkt_ptr->pkt_hdr_size = pkt_hdr_size_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs) + + (MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * queue_idx); + pkt_ptr->pkt_pld_size = pkt_pld_size_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs) + + (MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * queue_idx); } } @@ -162,7 +170,7 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() m_semaphore[1]->gpu_ptr(), sem_idx[0], sem_idx[1], - (m_traffic_type == DOCA_TRAFFIC_TYPE_TCP) ? true : false, + (m_traffic_type == DOCA_TRAFFIC_TYPE_TCP), exit_condition->gpu_ptr(), rstream); cudaStreamSynchronize(rstream); diff --git a/morpheus/_lib/doca/src/packet_data_buffer.cpp b/morpheus/_lib/doca/src/packet_data_buffer.cpp new file mode 100644 index 0000000000..03ca961d25 --- /dev/null +++ b/morpheus/_lib/doca/src/packet_data_buffer.cpp @@ -0,0 +1,43 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "morpheus/doca/packet_data_buffer.hpp" + +namespace morpheus::doca { + +PacketDataBuffer::PacketDataBuffer() : + m_num_packets{0}, + m_stream{rmm::cuda_stream_per_thread}, + m_header_buffer{nullptr}, + m_payload_buffer{nullptr}, + m_payload_sizes_buffer{nullptr} +{} + +PacketDataBuffer::PacketDataBuffer(std::size_t num_packets, + std::size_t header_size, + std::size_t payload_size, + std::size_t payload_sizes_size, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) : + m_num_packets{num_packets}, + m_stream{stream}, + m_header_buffer{std::make_shared(header_size, stream, mr)}, + m_payload_buffer{std::make_unique(payload_size, stream, mr)}, + m_payload_sizes_buffer{std::make_unique(payload_sizes_size, stream, mr)} +{} + +} // namespace morpheus::doca diff --git a/morpheus/stages/doca/doca_convert_stage.py b/morpheus/stages/doca/doca_convert_stage.py index c4da3bd672..a5c52f0276 100644 --- a/morpheus/stages/doca/doca_convert_stage.py +++ b/morpheus/stages/doca/doca_convert_stage.py @@ -14,6 +14,7 @@ import logging import typing +from datetime import timedelta import mrc @@ -27,6 +28,8 @@ logger = logging.getLogger(__name__) +MAX_PKT_RECEIVE = 512 * 16 + @register_stage("from-doca-convert", modes=[PipelineModes.NLP]) class DocaConvertStage(PreallocatorMixin, SinglePortStage): @@ -37,12 +40,33 @@ class DocaConvertStage(PreallocatorMixin, SinglePortStage): ---------- c : `morpheus.config.Config` Pipeline configuration instance. + max_batch_delay_sec : `float` + Maximum amount of time to wait, in seconds, for additional incoming packets prior to constructing a cuDF + DataFrame. + max_batch_size : `int` + Maximum number of packets to attempt to combine into a single cuDF DataFrame. Must be greater than or equal to + `MAX_PKT_RECEIVE`. + buffer_channel_size : `int`, optional + The size of the internal buffer to store incoming packet data. If `None`, the config's `edge_buffer_size` will + be used. """ - def __init__(self, c: Config): + def __init__(self, + c: Config, + max_batch_delay_sec: float = 0.5, + max_batch_size: int = MAX_PKT_RECEIVE * 5, + buffer_channel_size: int = None): super().__init__(c) + self._max_batch_delay = timedelta(seconds=max_batch_delay_sec) + self._buffer_channel_size = buffer_channel_size or c.edge_buffer_size + + if max_batch_size < MAX_PKT_RECEIVE: + raise RuntimeError(f"max_batch_size ({max_batch_size}) must be greater than or equal to {MAX_PKT_RECEIVE}") + + self._max_batch_size = max_batch_size + # Attempt to import the C++ stage on creation try: # pylint: disable=c-extension-no-member @@ -54,8 +78,6 @@ def __init__(self, c: Config): "Ensure the DOCA components have been built and installed. Error message: ") + ex.msg) from ex - self._max_concurrent = 5 - @property def name(self) -> str: return "from-doca-convert" @@ -77,8 +99,12 @@ def accepted_types(self) -> tuple: def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: if self._build_cpp_node(): - node = self.doca_convert_class(builder, self.unique_name) - node.launch_options.pe_count = self._max_concurrent + node = self.doca_convert_class(builder, + self.unique_name, + max_batch_delay=self._max_batch_delay, + max_batch_size=self._max_batch_size, + buffer_channel_size=self._buffer_channel_size) + builder.make_edge(input_node, node) return node