diff --git a/morpheus/_lib/doca/include/morpheus/doca/common.hpp b/morpheus/_lib/doca/include/morpheus/doca/common.hpp index 236cdc9eb5..5605fe2dd2 100644 --- a/morpheus/_lib/doca/include/morpheus/doca/common.hpp +++ b/morpheus/_lib/doca/include/morpheus/doca/common.hpp @@ -67,4 +67,4 @@ struct packets_info uint32_t* timestamp_out; }; -} // namespace morpheus::doca \ No newline at end of file +} // namespace morpheus::doca diff --git a/morpheus/_lib/doca/include/morpheus/doca/packets.hpp b/morpheus/_lib/doca/include/morpheus/doca/packets.hpp index c0698d9c0f..094c9d9cc9 100644 --- a/morpheus/_lib/doca/include/morpheus/doca/packets.hpp +++ b/morpheus/_lib/doca/include/morpheus/doca/packets.hpp @@ -172,4 +172,3 @@ __device__ __forceinline__ uint32_t ip_to_int32(uint32_t address) return (address & 0x000000ff) << 24 | (address & 0x0000ff00) << 8 | (address & 0x00ff0000) >> 8 | (address & 0xff000000) >> 24; } - diff --git a/morpheus/_lib/doca/src/doca_convert_kernel.cu b/morpheus/_lib/doca/src/doca_convert_kernel.cu index 65f60fe94c..324f3e0026 100644 --- a/morpheus/_lib/doca/src/doca_convert_kernel.cu +++ b/morpheus/_lib/doca/src/doca_convert_kernel.cu @@ -62,16 +62,14 @@ __global__ void _packet_gather_payload_kernel(int32_t packet_count, } } -__global__ void _packet_gather_src_ip_kernel(int32_t packet_count, - uintptr_t* packets_buffer, - uint32_t* dst_buff) +__global__ void _packet_gather_src_ip_kernel(int32_t packet_count, uintptr_t* packets_buffer, uint32_t* dst_buff) { int pkt_idx = blockIdx.x * blockDim.x + threadIdx.x; if (pkt_idx < packet_count) { uint8_t* pkt_hdr_addr = (uint8_t*)(packets_buffer[pkt_idx]); - dst_buff[pkt_idx] = ip_to_int32(((struct eth_ip*)pkt_hdr_addr)->l3_hdr.src_addr); + dst_buff[pkt_idx] = ip_to_int32(((struct eth_ip*)pkt_hdr_addr)->l3_hdr.src_addr); } } @@ -120,8 +118,7 @@ void gather_src_ip(int32_t packet_count, rmm::mr::device_memory_resource* mr) { int numBlocks = (packet_count + THREADS_PER_BLOCK - 1) / THREADS_PER_BLOCK; - _packet_gather_src_ip_kernel<<>>( - packet_count, packets_buffer, dst_buff); + _packet_gather_src_ip_kernel<<>>(packet_count, packets_buffer, dst_buff); } void gather_payload(int32_t packet_count, @@ -134,7 +131,8 @@ void gather_payload(int32_t packet_count, { auto dst_offsets = sizes_to_offsets(packet_count, payload_sizes, stream); dim3 threadsPerBlock(32, 32); - dim3 numBlocks((packet_count + threadsPerBlock.x - 1) / threadsPerBlock.x, (MAX_PKT_SIZE+threadsPerBlock.y-1) / threadsPerBlock.y); + dim3 numBlocks((packet_count + threadsPerBlock.x - 1) / threadsPerBlock.x, + (MAX_PKT_SIZE + threadsPerBlock.y - 1) / threadsPerBlock.y); _packet_gather_payload_kernel<<>>( packet_count, packets_buffer, header_sizes, payload_sizes, dst_buff, static_cast(dst_offsets.data())); } diff --git a/morpheus/_lib/doca/src/doca_convert_stage.cpp b/morpheus/_lib/doca/src/doca_convert_stage.cpp index 00c2aead49..5286aeb089 100644 --- a/morpheus/_lib/doca/src/doca_convert_stage.cpp +++ b/morpheus/_lib/doca/src/doca_convert_stage.cpp @@ -148,7 +148,7 @@ std::unique_ptr make_ip_col(morpheus::doca::PacketDataBuffer& pack return cudf::strings::integers_to_ipv4(src_ip_int_col->view()); } -} //namespace +} // namespace namespace morpheus { @@ -213,10 +213,8 @@ void DocaConvertStage::on_raw_packet_message(sink_type_t raw_msg) m_stream_cpp); // gather header data - doca::gather_src_ip(packet_count, - pkt_addr_list, - static_cast(packet_buffer.m_header_buffer->data()), - m_stream_cpp); + doca::gather_src_ip( + packet_count, pkt_addr_list, 
static_cast(packet_buffer.m_header_buffer->data()), m_stream_cpp); MRC_CHECK_CUDA(cudaMemcpyAsync(static_cast(packet_buffer.m_payload_sizes_buffer->data()), pkt_pld_size_list, diff --git a/morpheus/_lib/doca/src/doca_source_kernel.cu b/morpheus/_lib/doca/src/doca_source_kernel.cu index 6cc28e56a5..4d31247317 100644 --- a/morpheus/_lib/doca/src/doca_source_kernel.cu +++ b/morpheus/_lib/doca/src/doca_source_kernel.cu @@ -41,178 +41,202 @@ #include #include #include + #include #define DEVICE_GET_TIME(globaltimer) asm volatile("mov.u64 %0, %globaltimer;" : "=l"(globaltimer)) using namespace morpheus::doca; -__global__ void _packet_receive_kernel( - doca_gpu_eth_rxq* rxq_0, doca_gpu_eth_rxq* rxq_1, - doca_gpu_semaphore_gpu* sem_0, doca_gpu_semaphore_gpu* sem_1, - uint16_t sem_idx_0, uint16_t sem_idx_1, - const bool is_tcp, uint32_t* exit_condition -) +__global__ void _packet_receive_kernel(doca_gpu_eth_rxq* rxq_0, + doca_gpu_eth_rxq* rxq_1, + doca_gpu_semaphore_gpu* sem_0, + doca_gpu_semaphore_gpu* sem_1, + uint16_t sem_idx_0, + uint16_t sem_idx_1, + const bool is_tcp, + uint32_t* exit_condition) { __shared__ uint32_t packet_count_received; __shared__ uint64_t packet_offset_received; - __shared__ struct packets_info *pkt_info; + __shared__ struct packets_info* pkt_info; #if RUN_PERSISTENT doca_gpu_semaphore_status sem_status; #endif - doca_gpu_buf *buf_ptr; + doca_gpu_buf* buf_ptr; uintptr_t buf_addr; doca_error_t doca_ret; - struct eth_ip_tcp_hdr *hdr_tcp; - struct eth_ip_udp_hdr *hdr_udp; - uint8_t *payload; + struct eth_ip_tcp_hdr* hdr_tcp; + struct eth_ip_udp_hdr* hdr_udp; + uint8_t* payload; doca_gpu_eth_rxq* rxq; doca_gpu_semaphore_gpu* sem; uint16_t sem_idx; uint32_t pkt_idx = threadIdx.x; // unsigned long long rx_start = 0, rx_stop = 0, pkt_proc = 0, reduce_stop =0, reduce_start = 0; - if (blockIdx.x == 0) { - rxq = rxq_0; - sem = sem_0; + if (blockIdx.x == 0) + { + rxq = rxq_0; + sem = sem_0; sem_idx = sem_idx_0; - } else { - rxq = rxq_1; - sem = sem_1; + } + else + { + rxq = rxq_1; + sem = sem_1; sem_idx = sem_idx_1; } - //Initial semaphore index 0, assume it's free! - doca_ret = doca_gpu_dev_semaphore_get_custom_info_addr(sem, sem_idx, (void **)&pkt_info); - if (doca_ret != DOCA_SUCCESS) { + // Initial semaphore index 0, assume it's free! 
+ doca_ret = doca_gpu_dev_semaphore_get_custom_info_addr(sem, sem_idx, (void**)&pkt_info); + if (doca_ret != DOCA_SUCCESS) + { printf("Error %d doca_gpu_dev_semaphore_get_custom_info_addr\n", doca_ret); DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; return; } - if (threadIdx.x == 0) { + if (threadIdx.x == 0) + { DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = 0; - DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; + DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; } __syncthreads(); // do { - // if (threadIdx.x == 0) DEVICE_GET_TIME(rx_start); - doca_ret = doca_gpu_dev_eth_rxq_receive_block(rxq, PACKETS_PER_BLOCK, PACKET_RX_TIMEOUT_NS, &packet_count_received, &packet_offset_received); - if (doca_ret != DOCA_SUCCESS) [[unlikely]] { + // if (threadIdx.x == 0) DEVICE_GET_TIME(rx_start); + doca_ret = doca_gpu_dev_eth_rxq_receive_block( + rxq, PACKETS_PER_BLOCK, PACKET_RX_TIMEOUT_NS, &packet_count_received, &packet_offset_received); + if (doca_ret != DOCA_SUCCESS) [[unlikely]] + { + DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; + return; + } + __threadfence(); + if (DOCA_GPUNETIO_VOLATILE(packet_count_received) == 0) + return; + + while (pkt_idx < DOCA_GPUNETIO_VOLATILE(packet_count_received)) + { + doca_ret = + doca_gpu_dev_eth_rxq_get_buf(rxq, DOCA_GPUNETIO_VOLATILE(packet_offset_received) + pkt_idx, &buf_ptr); + if (doca_ret != DOCA_SUCCESS) [[unlikely]] + { DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; return; } - __threadfence(); - if (DOCA_GPUNETIO_VOLATILE(packet_count_received) == 0) + + doca_ret = doca_gpu_dev_buf_get_addr(buf_ptr, &buf_addr); + if (doca_ret != DOCA_SUCCESS) [[unlikely]] + { + DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; return; + } - while (pkt_idx < DOCA_GPUNETIO_VOLATILE(packet_count_received)) { - doca_ret = doca_gpu_dev_eth_rxq_get_buf(rxq, DOCA_GPUNETIO_VOLATILE(packet_offset_received) + pkt_idx, &buf_ptr); - if (doca_ret != DOCA_SUCCESS) [[unlikely]] { - DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; - return; - } - - doca_ret = doca_gpu_dev_buf_get_addr(buf_ptr, &buf_addr); - if (doca_ret != DOCA_SUCCESS) [[unlikely]] { - DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; - return; - } - - pkt_info->pkt_addr[pkt_idx] = buf_addr; - if (is_tcp) { - raw_to_tcp(buf_addr, &hdr_tcp, &payload); - pkt_info->pkt_hdr_size[pkt_idx] = TCP_HDR_SIZE; - pkt_info->pkt_pld_size[pkt_idx] = get_payload_tcp_size(hdr_tcp->l3_hdr, hdr_tcp->l4_hdr); - } else { - raw_to_udp(buf_addr, &hdr_udp, &payload); - pkt_info->pkt_hdr_size[pkt_idx] = UDP_HDR_SIZE; - pkt_info->pkt_pld_size[pkt_idx] = get_payload_udp_size(hdr_udp->l3_hdr, hdr_udp->l4_hdr); - } - - pkt_idx += blockDim.x; + pkt_info->pkt_addr[pkt_idx] = buf_addr; + if (is_tcp) + { + raw_to_tcp(buf_addr, &hdr_tcp, &payload); + pkt_info->pkt_hdr_size[pkt_idx] = TCP_HDR_SIZE; + pkt_info->pkt_pld_size[pkt_idx] = get_payload_tcp_size(hdr_tcp->l3_hdr, hdr_tcp->l4_hdr); + } + else + { + raw_to_udp(buf_addr, &hdr_udp, &payload); + pkt_info->pkt_hdr_size[pkt_idx] = UDP_HDR_SIZE; + pkt_info->pkt_pld_size[pkt_idx] = get_payload_udp_size(hdr_udp->l3_hdr, hdr_udp->l4_hdr); } - __syncthreads(); - - if (threadIdx.x == 0) { - DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = packet_count_received; - DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; - doca_ret = doca_gpu_dev_semaphore_set_status(sem, sem_idx, DOCA_GPU_SEMAPHORE_STATUS_READY); - if (doca_ret != DOCA_SUCCESS) { - printf("Error %d doca_gpu_dev_semaphore_set_status\n", doca_ret); - DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; - } - - // printf("CUDA rx time %ld proc time %ld pkt conv %ld 
block reduce %ld\n", - // rx_stop - rx_start, - // pkt_proc - rx_stop, - // reduce_start - rx_stop, - // reduce_stop - reduce_start); + + pkt_idx += blockDim.x; + } + __syncthreads(); + + if (threadIdx.x == 0) + { + DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = packet_count_received; + DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; + doca_ret = doca_gpu_dev_semaphore_set_status(sem, sem_idx, DOCA_GPU_SEMAPHORE_STATUS_READY); + if (doca_ret != DOCA_SUCCESS) + { + printf("Error %d doca_gpu_dev_semaphore_set_status\n", doca_ret); + DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; } - __syncthreads(); + + // printf("CUDA rx time %ld proc time %ld pkt conv %ld block reduce %ld\n", + // rx_stop - rx_start, + // pkt_proc - rx_stop, + // reduce_start - rx_stop, + // reduce_stop - reduce_start); + } + __syncthreads(); #if RUN_PERSISTENT - // sem_idx = (sem_idx+1)%MAX_SEM_X_QUEUE; - - // Get packets' info from next semaphore - // if (threadIdx.x == 0) { - // do { - // doca_ret = doca_gpu_dev_semaphore_get_status(sem, sem_idx, &sem_status); - // if (doca_ret != DOCA_SUCCESS) { - // printf("Error %d doca_gpu_dev_semaphore_get_status\n", doca_ret); - // DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; - // break; - // } - - // if (sem_status == DOCA_GPU_SEMAPHORE_STATUS_FREE) { - // doca_ret = doca_gpu_dev_semaphore_get_custom_info_addr(sem, sem_idx, (void **)&pkt_info); - // if (doca_ret != DOCA_SUCCESS) { - // printf("Error %d doca_gpu_dev_semaphore_get_custom_info_addr\n", doca_ret); - // DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; - // } - - // DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = 0; - // DOCA_GPUNETIO_VOLATILE(pkt_info->payload_size_total_out) = 0; - // DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; - - // break; - // } - // } while (DOCA_GPUNETIO_VOLATILE(*exit_condition) == 0); - // } - // __syncthreads(); + // sem_idx = (sem_idx+1)%MAX_SEM_X_QUEUE; + + // Get packets' info from next semaphore + // if (threadIdx.x == 0) { + // do { + // doca_ret = doca_gpu_dev_semaphore_get_status(sem, sem_idx, &sem_status); + // if (doca_ret != DOCA_SUCCESS) { + // printf("Error %d doca_gpu_dev_semaphore_get_status\n", doca_ret); + // DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; + // break; + // } + + // if (sem_status == DOCA_GPU_SEMAPHORE_STATUS_FREE) { + // doca_ret = doca_gpu_dev_semaphore_get_custom_info_addr(sem, sem_idx, (void **)&pkt_info); + // if (doca_ret != DOCA_SUCCESS) { + // printf("Error %d doca_gpu_dev_semaphore_get_custom_info_addr\n", doca_ret); + // DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1; + // } + + // DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = 0; + // DOCA_GPUNETIO_VOLATILE(pkt_info->payload_size_total_out) = 0; + // DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0; + + // break; + // } + // } while (DOCA_GPUNETIO_VOLATILE(*exit_condition) == 0); + // } + // __syncthreads(); // } while (DOCA_GPUNETIO_VOLATILE(*exit_condition) == 0) - if (threadIdx.x == 0) - doca_gpu_dev_sem_set_status(sem_in, *sem_idx, DOCA_GPU_SEMAPHORE_STATUS_FREE); - // __threadfence(); - // __syncthreads(); + if (threadIdx.x == 0) + doca_gpu_dev_sem_set_status(sem_in, *sem_idx, DOCA_GPU_SEMAPHORE_STATUS_FREE); + // __threadfence(); + // __syncthreads(); #endif } namespace morpheus { namespace doca { -int packet_receive_kernel(doca_gpu_eth_rxq* rxq_0, doca_gpu_eth_rxq* rxq_1, - doca_gpu_semaphore_gpu* sem_0, doca_gpu_semaphore_gpu* sem_1, - uint16_t sem_idx_0, uint16_t sem_idx_1, - bool is_tcp, - uint32_t* exit_condition, - cudaStream_t stream) +int 
packet_receive_kernel(doca_gpu_eth_rxq* rxq_0, + doca_gpu_eth_rxq* rxq_1, + doca_gpu_semaphore_gpu* sem_0, + doca_gpu_semaphore_gpu* sem_1, + uint16_t sem_idx_0, + uint16_t sem_idx_1, + bool is_tcp, + uint32_t* exit_condition, + cudaStream_t stream) { cudaError_t result = cudaSuccess; - _packet_receive_kernel<<>>(rxq_0, rxq_1, sem_0, sem_1, sem_idx_0, sem_idx_1, is_tcp, exit_condition); + _packet_receive_kernel<<>>( + rxq_0, rxq_1, sem_0, sem_1, sem_idx_0, sem_idx_1, is_tcp, exit_condition); - /* Check no previous CUDA errors */ - result = cudaGetLastError(); - if (cudaSuccess != result) { - fprintf(stderr, "[%s:%d] cuda failed with %s\n", __FILE__, __LINE__, cudaGetErrorString(result)); - return -1; - } + /* Check no previous CUDA errors */ + result = cudaGetLastError(); + if (cudaSuccess != result) + { + fprintf(stderr, "[%s:%d] cuda failed with %s\n", __FILE__, __LINE__, cudaGetErrorString(result)); + return -1; + } return 0; } -} //doca -} //morpheus +} // namespace doca +} // namespace morpheus diff --git a/morpheus/_lib/doca/src/doca_source_stage.cpp b/morpheus/_lib/doca/src/doca_source_stage.cpp index cf9f031e07..f563470ef2 100644 --- a/morpheus/_lib/doca/src/doca_source_stage.cpp +++ b/morpheus/_lib/doca/src/doca_source_stage.cpp @@ -130,10 +130,13 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() { for (int idxs = 0; idxs < MAX_SEM_X_QUEUE; idxs++) { - pkt_ptr = static_cast(m_semaphore[queue_idx]->get_info_cpu(idxs)); - pkt_ptr->pkt_addr = pkt_addr_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs) + (MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * queue_idx); - pkt_ptr->pkt_hdr_size = pkt_hdr_size_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs) + (MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * queue_idx); - pkt_ptr->pkt_pld_size = pkt_pld_size_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs) + (MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * queue_idx); + pkt_ptr = static_cast(m_semaphore[queue_idx]->get_info_cpu(idxs)); + pkt_ptr->pkt_addr = pkt_addr_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs) + + (MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * queue_idx); + pkt_ptr->pkt_hdr_size = pkt_hdr_size_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs) + + (MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * queue_idx); + pkt_ptr->pkt_pld_size = pkt_pld_size_unique->gpu_ptr() + (MAX_PKT_RECEIVE * idxs) + + (MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * queue_idx); } }
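For context on the first hunk: the packets.hpp change only trims a trailing blank line after ip_to_int32, but that helper is what feeds cudf::strings::integers_to_ipv4 in the convert stage, so a quick host-side check of what the swap does may help. The snippet below is an illustrative standalone copy (the __device__ qualifier dropped so it compiles on the host); the sample address is made up.

#include <cstdint>

// Host-side copy of ip_to_int32 from packets.hpp: reverses the byte order of a
// 32-bit value, turning the network-order source address loaded from the packet
// into the integer layout that integers_to_ipv4 expects (first octet in the
// most significant byte).
constexpr uint32_t ip_to_int32_host(uint32_t address)
{
    return (address & 0x000000ff) << 24 | (address & 0x0000ff00) << 8 | (address & 0x00ff0000) >> 8 |
           (address & 0xff000000) >> 24;
}

// 192.168.0.1 read from the wire and loaded little-endian appears as 0x0100a8c0;
// after the swap it becomes 0xc0a80001, i.e. (192 << 24) | (168 << 16) | 1.
static_assert(ip_to_int32_host(0x0100a8c0u) == 0xc0a80001u, "byte swap mismatch");

int main()
{
    return 0;
}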
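In doca_convert_kernel.cu the launch of _packet_gather_payload_kernel is only reflowed, but the grid math it wraps is worth spelling out: packets ride the x dimension, payload bytes the y dimension, and each thread copies one byte to a destination given by a per-packet offset (produced by sizes_to_offsets, effectively a prefix sum of the payload sizes, in the real code). The sketch below reproduces only that indexing pattern; the kernel body, the 14-byte dummy header, the hard-coded sizes/offsets, and names such as demo_gather and the local MAX_PKT_SIZE are assumptions for illustration, not Morpheus code.

#include <cuda_runtime.h>

#include <cstdint>
#include <cstdio>

constexpr int MAX_PKT_SIZE = 2048;  // stand-in value; the real constant lives in the DOCA headers

// One thread per (packet, payload byte): x walks packets, y walks bytes.
__global__ void demo_gather(int32_t packet_count,
                            const uintptr_t* pkt_addr,
                            const uint32_t* hdr_size,
                            const uint32_t* pld_size,
                            uint8_t* dst,
                            const uint32_t* dst_offset)
{
    const int pkt  = blockIdx.x * blockDim.x + threadIdx.x;
    const int byte = blockIdx.y * blockDim.y + threadIdx.y;
    if (pkt >= packet_count || byte >= static_cast<int>(pld_size[pkt]))
        return;
    const uint8_t* payload      = reinterpret_cast<const uint8_t*>(pkt_addr[pkt]) + hdr_size[pkt];
    dst[dst_offset[pkt] + byte] = payload[byte];
}

int main()
{
    // Two fake "packets": a 14-byte dummy header followed by 3 and 5 payload bytes.
    uint8_t h_pkt0[17], h_pkt1[19];
    for (auto& b : h_pkt0) b = 0xAA;
    for (auto& b : h_pkt1) b = 0xBB;

    uint8_t *d_pkt0, *d_pkt1, *d_dst;
    cudaMalloc(&d_pkt0, sizeof(h_pkt0));
    cudaMalloc(&d_pkt1, sizeof(h_pkt1));
    cudaMalloc(&d_dst, 8);  // 3 + 5 payload bytes total
    cudaMemcpy(d_pkt0, h_pkt0, sizeof(h_pkt0), cudaMemcpyHostToDevice);
    cudaMemcpy(d_pkt1, h_pkt1, sizeof(h_pkt1), cudaMemcpyHostToDevice);

    const uintptr_t h_addr[2] = {reinterpret_cast<uintptr_t>(d_pkt0), reinterpret_cast<uintptr_t>(d_pkt1)};
    const uint32_t h_hdr[2]   = {14, 14};
    const uint32_t h_pld[2]   = {3, 5};
    const uint32_t h_off[2]   = {0, 3};  // prefix sum of h_pld, standing in for sizes_to_offsets

    uintptr_t* d_addr;
    uint32_t *d_hdr, *d_pld, *d_off;
    cudaMalloc(&d_addr, sizeof(h_addr));
    cudaMalloc(&d_hdr, sizeof(h_hdr));
    cudaMalloc(&d_pld, sizeof(h_pld));
    cudaMalloc(&d_off, sizeof(h_off));
    cudaMemcpy(d_addr, h_addr, sizeof(h_addr), cudaMemcpyHostToDevice);
    cudaMemcpy(d_hdr, h_hdr, sizeof(h_hdr), cudaMemcpyHostToDevice);
    cudaMemcpy(d_pld, h_pld, sizeof(h_pld), cudaMemcpyHostToDevice);
    cudaMemcpy(d_off, h_off, sizeof(h_off), cudaMemcpyHostToDevice);

    // Same rounding-up grid math as gather_payload in the diff.
    dim3 threadsPerBlock(32, 32);
    dim3 numBlocks((2 + threadsPerBlock.x - 1) / threadsPerBlock.x,
                   (MAX_PKT_SIZE + threadsPerBlock.y - 1) / threadsPerBlock.y);
    demo_gather<<<numBlocks, threadsPerBlock>>>(2, d_addr, d_hdr, d_pld, d_dst, d_off);

    // Same post-launch check pattern used by packet_receive_kernel.
    cudaError_t err = cudaGetLastError();
    if (err != cudaSuccess)
    {
        fprintf(stderr, "launch failed: %s\n", cudaGetErrorString(err));
        return -1;
    }

    uint8_t h_dst[8] = {};
    cudaMemcpy(h_dst, d_dst, sizeof(h_dst), cudaMemcpyDeviceToHost);
    printf("gathered payload: first=0x%02x last=0x%02x\n", (unsigned)h_dst[0], (unsigned)h_dst[7]);  // expect aa / bb
    return 0;
}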
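The doca_source_stage.cpp hunk only rewraps the pointer arithmetic that carves the shared GPU scratch buffers into per-semaphore windows, but the layout it encodes is [queue][semaphore slot][packet]: each (queue, slot) pair owns a contiguous MAX_PKT_RECEIVE-element window. Below is a tiny compile-time check of that arithmetic, with made-up constants standing in for the real MAX_PKT_RECEIVE and MAX_SEM_X_QUEUE values.

// Offset (in elements) of the window owned by semaphore slot `idxs` of queue `queue_idx`,
// mirroring the expressions in DocaSourceStage::build().
constexpr int MAX_PKT_RECEIVE = 4;  // stand-in value
constexpr int MAX_SEM_X_QUEUE = 3;  // stand-in value

constexpr int slot_offset(int queue_idx, int idxs)
{
    return (MAX_PKT_RECEIVE * idxs) + (MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE * queue_idx);
}

// Adjacent slots within a queue are MAX_PKT_RECEIVE elements apart...
static_assert(slot_offset(0, 1) - slot_offset(0, 0) == MAX_PKT_RECEIVE, "slot stride");
// ...and queues are MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE apart, so windows never overlap.
static_assert(slot_offset(1, 0) == MAX_PKT_RECEIVE * MAX_SEM_X_QUEUE, "queue stride");

int main()
{
    return 0;
}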