Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Jul 31, 2024
1 parent 402b489 commit f88b653
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 134 deletions.
2 changes: 1 addition & 1 deletion morpheus/_lib/doca/include/morpheus/doca/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,4 @@ struct packets_info
uint32_t* timestamp_out;
};

} // namespace morpheus::doca
} // namespace morpheus::doca
1 change: 0 additions & 1 deletion morpheus/_lib/doca/include/morpheus/doca/packets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,3 @@ __device__ __forceinline__ uint32_t ip_to_int32(uint32_t address)
return (address & 0x000000ff) << 24 | (address & 0x0000ff00) << 8 | (address & 0x00ff0000) >> 8 |
(address & 0xff000000) >> 24;
}

12 changes: 5 additions & 7 deletions morpheus/_lib/doca/src/doca_convert_kernel.cu
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ __global__ void _packet_gather_payload_kernel(int32_t packet_count,
}
}

/**
 * @brief Writes one 32-bit source address per packet into `dst_buff`.
 *
 * Launch layout: 1D grid of 1D blocks. Each thread handles at most one packet, selected by its flat
 * global thread index; threads whose index is >= `packet_count` do nothing, so the grid may be
 * rounded up to a whole number of blocks.
 *
 * @param packet_count   Number of packets to process (entries read from `packets_buffer` and
 *                       written to `dst_buff`).
 * @param packets_buffer Per-packet addresses stored as integers; entry i is reinterpreted as a
 *                       pointer to a `struct eth_ip` header.
 * @param dst_buff       Output array; element i receives `ip_to_int32()` applied to packet i's
 *                       `l3_hdr.src_addr`.
 */
__global__ void _packet_gather_src_ip_kernel(int32_t packet_count, uintptr_t* packets_buffer, uint32_t* dst_buff)
{
    int pkt_idx = blockIdx.x * blockDim.x + threadIdx.x;

    // Tail guard: the last block may contain threads past the end of the packet list.
    if (pkt_idx < packet_count)
    {
        uint8_t* pkt_hdr_addr = (uint8_t*)(packets_buffer[pkt_idx]);
        dst_buff[pkt_idx]     = ip_to_int32(((struct eth_ip*)pkt_hdr_addr)->l3_hdr.src_addr);
    }
}

Expand Down Expand Up @@ -120,8 +118,7 @@ void gather_src_ip(int32_t packet_count,
rmm::mr::device_memory_resource* mr)
{
int numBlocks = (packet_count + THREADS_PER_BLOCK - 1) / THREADS_PER_BLOCK;
_packet_gather_src_ip_kernel<<<numBlocks, THREADS_PER_BLOCK, 0, stream>>>(
packet_count, packets_buffer, dst_buff);
_packet_gather_src_ip_kernel<<<numBlocks, THREADS_PER_BLOCK, 0, stream>>>(packet_count, packets_buffer, dst_buff);
}

void gather_payload(int32_t packet_count,
Expand All @@ -134,7 +131,8 @@ void gather_payload(int32_t packet_count,
{
auto dst_offsets = sizes_to_offsets(packet_count, payload_sizes, stream);
dim3 threadsPerBlock(32, 32);
dim3 numBlocks((packet_count + threadsPerBlock.x - 1) / threadsPerBlock.x, (MAX_PKT_SIZE+threadsPerBlock.y-1) / threadsPerBlock.y);
dim3 numBlocks((packet_count + threadsPerBlock.x - 1) / threadsPerBlock.x,
(MAX_PKT_SIZE + threadsPerBlock.y - 1) / threadsPerBlock.y);
_packet_gather_payload_kernel<<<numBlocks, threadsPerBlock, 0, stream>>>(
packet_count, packets_buffer, header_sizes, payload_sizes, dst_buff, static_cast<int32_t*>(dst_offsets.data()));
}
Expand Down
8 changes: 3 additions & 5 deletions morpheus/_lib/doca/src/doca_convert_stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ std::unique_ptr<cudf::column> make_ip_col(morpheus::doca::PacketDataBuffer& pack

return cudf::strings::integers_to_ipv4(src_ip_int_col->view());
}
} //namespace
} // namespace

namespace morpheus {

Expand Down Expand Up @@ -213,10 +213,8 @@ void DocaConvertStage::on_raw_packet_message(sink_type_t raw_msg)
m_stream_cpp);

// gather header data
doca::gather_src_ip(packet_count,
pkt_addr_list,
static_cast<uint32_t*>(packet_buffer.m_header_buffer->data()),
m_stream_cpp);
doca::gather_src_ip(
packet_count, pkt_addr_list, static_cast<uint32_t*>(packet_buffer.m_header_buffer->data()), m_stream_cpp);

MRC_CHECK_CUDA(cudaMemcpyAsync(static_cast<uint8_t*>(packet_buffer.m_payload_sizes_buffer->data()),
pkt_pld_size_list,
Expand Down
256 changes: 140 additions & 116 deletions morpheus/_lib/doca/src/doca_source_kernel.cu
Original file line number Diff line number Diff line change
Expand Up @@ -41,178 +41,202 @@
#include <thrust/gather.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/iterator/counting_iterator.h>

#include <memory>

// Copies the PTX %globaltimer special register into the 64-bit lvalue `globaltimer` (inline asm,
// "=l" output constraint). In this file it is only referenced from commented-out timing code.
#define DEVICE_GET_TIME(globaltimer) asm volatile("mov.u64 %0, %globaltimer;" : "=l"(globaltimer))

using namespace morpheus::doca;

/**
 * @brief Receives one burst of packets on a DOCA GPU receive queue and records per-packet metadata
 *        in the `packets_info` structure attached to a semaphore slot.
 *
 * Launch layout: 1D blocks. Block 0 serves (`rxq_0`, `sem_0`, `sem_idx_0`); every other block
 * serves (`rxq_1`, `sem_1`, `sem_idx_1`). Inside a block, thread t handles packets
 * t, t + blockDim.x, t + 2 * blockDim.x, ... of the received burst.
 *
 * Shared memory: three statically declared `__shared__` variables (received packet count, offset
 * of the first received packet, pointer to the slot's `packets_info`).
 *
 * For every received packet the kernel stores the buffer address, the header size (`TCP_HDR_SIZE`
 * or `UDP_HDR_SIZE`) and the payload size. Thread 0 then writes `packet_count_out` and marks the
 * semaphore slot `DOCA_GPU_SEMAPHORE_STATUS_READY`.
 *
 * On any DOCA error `*exit_condition` is set to 1. A thread that fails while processing a packet
 * stops processing but still takes part in the block barriers below.
 *
 * @param rxq_0          Receive queue used by block 0.
 * @param rxq_1          Receive queue used by all other blocks.
 * @param sem_0          Semaphore used by block 0.
 * @param sem_1          Semaphore used by all other blocks.
 * @param sem_idx_0      Semaphore slot index used by block 0.
 * @param sem_idx_1      Semaphore slot index used by all other blocks.
 * @param is_tcp         true: parse packets with `raw_to_tcp`; false: parse with `raw_to_udp`.
 * @param exit_condition Flag set to 1 when a DOCA call fails.
 */
__global__ void _packet_receive_kernel(doca_gpu_eth_rxq* rxq_0,
                                       doca_gpu_eth_rxq* rxq_1,
                                       doca_gpu_semaphore_gpu* sem_0,
                                       doca_gpu_semaphore_gpu* sem_1,
                                       uint16_t sem_idx_0,
                                       uint16_t sem_idx_1,
                                       const bool is_tcp,
                                       uint32_t* exit_condition)
{
    __shared__ uint32_t packet_count_received;
    __shared__ uint64_t packet_offset_received;
    __shared__ struct packets_info* pkt_info;
#if RUN_PERSISTENT
    doca_gpu_semaphore_status sem_status;
#endif
    doca_gpu_buf* buf_ptr;
    uintptr_t buf_addr;
    doca_error_t doca_ret;
    struct eth_ip_tcp_hdr* hdr_tcp;
    struct eth_ip_udp_hdr* hdr_udp;
    uint8_t* payload;
    doca_gpu_eth_rxq* rxq;
    doca_gpu_semaphore_gpu* sem;
    uint16_t sem_idx;
    uint32_t pkt_idx = threadIdx.x;
    // Set when this thread hits a DOCA error while processing a packet. The thread then leaves the
    // packet loop with `break` instead of `return`, so it still reaches the __syncthreads() calls.
    bool pkt_failed = false;
    // unsigned long long rx_start = 0, rx_stop = 0, pkt_proc = 0, reduce_stop =0, reduce_start = 0;

    // Pick the queue/semaphore pair served by this block.
    if (blockIdx.x == 0)
    {
        rxq     = rxq_0;
        sem     = sem_0;
        sem_idx = sem_idx_0;
    }
    else
    {
        rxq     = rxq_1;
        sem     = sem_1;
        sem_idx = sem_idx_1;
    }

    // Initial semaphore index 0, assume it's free!
    // NOTE(review): every thread of the block makes this call and stores into the same __shared__
    // pointer; presumably all threads obtain the same address and status - confirm.
    doca_ret = doca_gpu_dev_semaphore_get_custom_info_addr(sem, sem_idx, (void**)&pkt_info);
    if (doca_ret != DOCA_SUCCESS)
    {
        printf("Error %d doca_gpu_dev_semaphore_get_custom_info_addr\n", doca_ret);
        DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1;
        return;
    }

    if (threadIdx.x == 0)
    {
        DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = 0;
        DOCA_GPUNETIO_VOLATILE(packet_count_received)      = 0;
    }
    __syncthreads();

    // do {
    // if (threadIdx.x == 0) DEVICE_GET_TIME(rx_start);
    // The received count and the offset of the first packet come back through the two __shared__
    // variables, so every thread of the block reads the same values afterwards.
    doca_ret = doca_gpu_dev_eth_rxq_receive_block(
        rxq, PACKETS_PER_BLOCK, PACKET_RX_TIMEOUT_NS, &packet_count_received, &packet_offset_received);
    if (doca_ret != DOCA_SUCCESS) [[unlikely]]
    {
        DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1;
        return;
    }
    __threadfence();
    // Nothing received: the count lives in shared memory, so the whole block takes this exit.
    if (DOCA_GPUNETIO_VOLATILE(packet_count_received) == 0)
        return;

    while (pkt_idx < DOCA_GPUNETIO_VOLATILE(packet_count_received))
    {
        doca_ret =
            doca_gpu_dev_eth_rxq_get_buf(rxq, DOCA_GPUNETIO_VOLATILE(packet_offset_received) + pkt_idx, &buf_ptr);
        if (doca_ret != DOCA_SUCCESS) [[unlikely]]
        {
            DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1;
            pkt_failed                              = true;
            break;
        }

        doca_ret = doca_gpu_dev_buf_get_addr(buf_ptr, &buf_addr);
        if (doca_ret != DOCA_SUCCESS) [[unlikely]]
        {
            DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1;
            pkt_failed                              = true;
            break;
        }

        pkt_info->pkt_addr[pkt_idx] = buf_addr;
        if (is_tcp)
        {
            raw_to_tcp(buf_addr, &hdr_tcp, &payload);
            pkt_info->pkt_hdr_size[pkt_idx] = TCP_HDR_SIZE;
            pkt_info->pkt_pld_size[pkt_idx] = get_payload_tcp_size(hdr_tcp->l3_hdr, hdr_tcp->l4_hdr);
        }
        else
        {
            raw_to_udp(buf_addr, &hdr_udp, &payload);
            pkt_info->pkt_hdr_size[pkt_idx] = UDP_HDR_SIZE;
            pkt_info->pkt_pld_size[pkt_idx] = get_payload_udp_size(hdr_udp->l3_hdr, hdr_udp->l4_hdr);
        }

        pkt_idx += blockDim.x;
    }
    // Reached by every thread, including those that left the loop on an error.
    __syncthreads();

    // Thread 0 publishes the burst unless it failed itself; a thread 0 that failed never reached
    // this point before either.
    if (threadIdx.x == 0 && !pkt_failed)
    {
        DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = packet_count_received;
        DOCA_GPUNETIO_VOLATILE(packet_count_received)      = 0;
        doca_ret = doca_gpu_dev_semaphore_set_status(sem, sem_idx, DOCA_GPU_SEMAPHORE_STATUS_READY);
        if (doca_ret != DOCA_SUCCESS)
        {
            printf("Error %d doca_gpu_dev_semaphore_set_status\n", doca_ret);
            DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1;
        }

        // printf("CUDA rx time %ld proc time %ld pkt conv %ld block reduce %ld\n",
        //     rx_stop - rx_start,
        //     pkt_proc - rx_stop,
        //     reduce_start - rx_stop,
        //     reduce_stop - reduce_start);
    }
    __syncthreads();

#if RUN_PERSISTENT
    // sem_idx = (sem_idx+1)%MAX_SEM_X_QUEUE;

    // Get packets' info from next semaphore
    // if (threadIdx.x == 0) {
    //     do {
    //         doca_ret = doca_gpu_dev_semaphore_get_status(sem, sem_idx, &sem_status);
    //         if (doca_ret != DOCA_SUCCESS) {
    //             printf("Error %d doca_gpu_dev_semaphore_get_status\n", doca_ret);
    //             DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1;
    //             break;
    //         }

    //         if (sem_status == DOCA_GPU_SEMAPHORE_STATUS_FREE) {
    //             doca_ret = doca_gpu_dev_semaphore_get_custom_info_addr(sem, sem_idx, (void **)&pkt_info);
    //             if (doca_ret != DOCA_SUCCESS) {
    //                 printf("Error %d doca_gpu_dev_semaphore_get_custom_info_addr\n", doca_ret);
    //                 DOCA_GPUNETIO_VOLATILE(*exit_condition) = 1;
    //             }

    //             DOCA_GPUNETIO_VOLATILE(pkt_info->packet_count_out) = 0;
    //             DOCA_GPUNETIO_VOLATILE(pkt_info->payload_size_total_out) = 0;
    //             DOCA_GPUNETIO_VOLATILE(packet_count_received) = 0;

    //             break;
    //         }
    //     } while (DOCA_GPUNETIO_VOLATILE(*exit_condition) == 0);
    // }
    // __syncthreads();
    // } while (DOCA_GPUNETIO_VOLATILE(*exit_condition) == 0)

    // NOTE(review): `sem_in` is not declared anywhere in this kernel and `sem_idx` is a uint16_t,
    // not a pointer, so this statement cannot compile when RUN_PERSISTENT is non-zero. Left
    // unchanged because the intended semaphore/slot cannot be determined from this file.
    if (threadIdx.x == 0)
        doca_gpu_dev_sem_set_status(sem_in, *sem_idx, DOCA_GPU_SEMAPHORE_STATUS_FREE);
    // __threadfence();
    // __syncthreads();
#endif
}

namespace morpheus {
namespace doca {

/**
 * @brief Host-side launcher for `_packet_receive_kernel`.
 *
 * Enqueues the kernel on `stream` with `MAX_QUEUE` blocks of `THREADS_PER_BLOCK` threads and no
 * dynamic shared memory. The kernel maps block 0 to the `*_0` arguments and every other block to
 * the `*_1` arguments. This function does not synchronize the stream; it only checks the error
 * state reported by `cudaGetLastError()` right after the launch.
 *
 * @param rxq_0          Receive queue handed to the kernel for block 0.
 * @param rxq_1          Receive queue handed to the kernel for the other blocks.
 * @param sem_0          Semaphore handed to the kernel for block 0.
 * @param sem_1          Semaphore handed to the kernel for the other blocks.
 * @param sem_idx_0      Semaphore slot index for block 0.
 * @param sem_idx_1      Semaphore slot index for the other blocks.
 * @param is_tcp         Forwarded to the kernel to select TCP or UDP header parsing.
 * @param exit_condition Flag the kernel sets to 1 on a DOCA error.
 * @param stream         CUDA stream the kernel is launched on.
 * @return 0 when `cudaGetLastError()` returns `cudaSuccess`, -1 otherwise (the error string is
 *         printed to stderr).
 */
int packet_receive_kernel(doca_gpu_eth_rxq* rxq_0,
                          doca_gpu_eth_rxq* rxq_1,
                          doca_gpu_semaphore_gpu* sem_0,
                          doca_gpu_semaphore_gpu* sem_1,
                          uint16_t sem_idx_0,
                          uint16_t sem_idx_1,
                          bool is_tcp,
                          uint32_t* exit_condition,
                          cudaStream_t stream)
{
    cudaError_t result = cudaSuccess;

    _packet_receive_kernel<<<MAX_QUEUE, THREADS_PER_BLOCK, 0, stream>>>(
        rxq_0, rxq_1, sem_0, sem_1, sem_idx_0, sem_idx_1, is_tcp, exit_condition);

    /* Check no previous CUDA errors */
    // cudaGetLastError() also returns (and clears) an error left by an earlier call, so a failure
    // here is not necessarily caused by this launch.
    result = cudaGetLastError();
    if (cudaSuccess != result)
    {
        fprintf(stderr, "[%s:%d] cuda failed with %s\n", __FILE__, __LINE__, cudaGetErrorString(result));
        return -1;
    }

    return 0;
}

}  // namespace doca
}  // namespace morpheus
Loading

0 comments on commit f88b653

Please sign in to comment.