Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tensor cufile wip #264

Open
wants to merge 3 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions lib/nnc/ccv_nnc_tensor.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,27 @@ ccv_nnc_tensor_t* ccv_nnc_tensor_new_from_file(const ccv_nnc_tensor_param_t para
assert(CCV_TENSOR_GET_DEVICE(params.type) != CCV_COMPUTE_DEVICE_ANY);
if (size > 0)
{
fprintf(stderr, "cuDirectFileReadAsync !! size:%zu\n", size);

// This is not supported yet on CUDA.
tensor->data.u8 = (uint8_t*)cumalloc(CCV_TENSOR_GET_DEVICE_ID(params.type), size);
int fd = open(filename, O_RDONLY, 0);
void* bufptr = mmap(0, size, PROT_READ, MAP_PRIVATE, fd, offset);
close(fd);
madvise(bufptr, size, MADV_SEQUENTIAL | MADV_WILLNEED);
cumemcpy(tensor->data.u8, CCV_TENSOR_GPU_MEMORY, bufptr, CCV_TENSOR_CPU_MEMORY, size);
munmap(bufptr, size);
// tensor->data.u8 = (uint8_t*)cumalloc(CCV_TENSOR_GET_DEVICE_ID(params.type), size);
// int fd = open(filename, O_RDONLY, 0);
// void* bufptr = mmap(0, size, PROT_READ, MAP_PRIVATE, fd, offset);
// close(fd);
// madvise(bufptr, size, MADV_SEQUENTIAL | MADV_WILLNEED);
// cumemcpy(tensor->data.u8, CCV_TENSOR_GPU_MEMORY, bufptr, CCV_TENSOR_CPU_MEMORY, size);
// munmap(bufptr, size);

// async:
// cudaStream_t stream = cuSharedFileIOStream();

ccv_nnc_cuda_file_entry file_entry = ccv_nnc_get_file_entry(filename);
// Open the file using cuFile
tensor->data.u8 = (uint8_t*)cuDirectFileReadAsync(CCV_TENSOR_GET_DEVICE_ID(params.type), size, filename, offset, cuSharedFileIOStream(), file_entry.file_handle, file_entry.file_descr);

// sync:
// tensor->data.u8 = (uint8_t*)cuDirectFileRead(CCV_TENSOR_GET_DEVICE_ID(params.type), size, filename, offset);

} else
tensor->data.u8 = 0;
} else {
Expand Down
162 changes: 162 additions & 0 deletions lib/nnc/gpu/ccv_nnc_compat.cu
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,72 @@ extern "C" {
#include <nnc/_ccv_nnc_stream.h>
#include "3rdparty/khash/khash.h"
}
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

static void cutrigmp(void);
static ccv_nnc_cuda_file_entry file_map[MAX_FILES];

// Look up (or create) the cached cuFile registration for filename.
// Entries are cached in file_map so repeated tensor loads from the same file
// reuse one registered cuFile handle instead of re-opening / re-registering.
// On unrecoverable errors (open failure, registration failure, full table)
// this logs to stderr and exits, matching the surrounding wip error style.
ccv_nnc_cuda_file_entry ccv_nnc_get_file_entry(const char* filename) {
	// Fast path: an entry for this filename is already registered.
	for (int i = 0; i < MAX_FILES; i++) {
		if (file_map[i].is_used && strcmp(file_map[i].filename, filename) == 0) {
			fprintf(stderr, "hit cache register file handle for: %s\n", filename);
			return file_map[i];
		}
	}

	// Reject names that do not fit the fixed-size key. Silently truncating
	// with strncpy would make two distinct long paths compare equal in the
	// cache lookup above and return the wrong file's handle.
	if (strlen(filename) >= sizeof(file_map[0].filename)) {
		fprintf(stderr, "Filename too long to cache: %s\n", filename);
		exit(1);
	}

	// Slow path: claim the first free slot, open the file and register it.
	for (int i = 0; i < MAX_FILES; i++) {
		if (!file_map[i].is_used) {
			memset(&file_map[i], 0, sizeof(ccv_nnc_cuda_file_entry));
			strncpy(file_map[i].filename, filename, sizeof(file_map[i].filename) - 1);
			// O_DIRECT is required for GPUDirect Storage reads.
			file_map[i].file_descr.handle.fd = open(filename, O_RDONLY | O_DIRECT, 0);
			if (file_map[i].file_descr.handle.fd < 0) {
				fprintf(stderr, "Failed to open file: %s\n", filename);
				exit(1);
			}
			file_map[i].file_descr.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD;

			CUfileError_t status = cuFileHandleRegister(&file_map[i].file_handle, &file_map[i].file_descr);
			if (status.err != CU_FILE_SUCCESS) {
				fprintf(stderr, "Failed to register file handle for: %s\n", filename);
				close(file_map[i].file_descr.handle.fd);
				exit(1);
			}

			file_map[i].is_used = 1;
			return file_map[i];
		}
	}

	fprintf(stderr, "File map is full. Cannot open additional files.\n");
	exit(1);
}

// Tear down every cached cuFile registration: deregister the handle, close
// the underlying fd, and mark the slot free for reuse.
void ccv_nnc_cleanup_all_file_entries() {
	int i;
	for (i = 0; i < MAX_FILES; i++) {
		if (!file_map[i].is_used)
			continue;
		cuFileHandleDeregister(file_map[i].file_handle);
		close(file_map[i].file_descr.handle.fd);
		file_map[i].is_used = 0;
	}
}

// Evict a single cached entry by filename: deregister its cuFile handle,
// close its fd, and clear the slot (including the name, so stale keys can
// never match a future lookup). Logs to stderr when no entry matches.
void ccv_nnc_remove_cuda_file_entry(const char* filename) {
	int i;
	for (i = 0; i < MAX_FILES; i++) {
		if (!file_map[i].is_used)
			continue;
		if (strcmp(file_map[i].filename, filename) != 0)
			continue;
		cuFileHandleDeregister(file_map[i].file_handle);
		close(file_map[i].file_descr.handle.fd);
		file_map[i].is_used = 0;
		memset(file_map[i].filename, 0, sizeof(file_map[i].filename));
		return;
	}
	fprintf(stderr, "File entry for %s not found.\n", filename);
}

#ifdef HAVE_CUDNN
struct cudnn_free_list_s {
Expand Down Expand Up @@ -193,6 +257,104 @@ void* cumalloc(int device, size_t size)
return ptr;
}

// Lazily-created stream shared by all cuFile async reads in this file.
static cudaStream_t g_stream = NULL;

// Return the shared cuFile I/O stream, creating it on first use.
// Returns NULL on creation failure. On failure g_stream is reset to NULL:
// cudaStreamCreate leaves the out-parameter unspecified when it fails, so
// without the reset a later call could hand out an indeterminate handle
// as if it were a valid stream.
// NOTE(review): not thread-safe — assumes callers run on a single thread.
cudaStream_t cuSharedFileIOStream(void) {
	if (g_stream == NULL) {
		if (cudaStreamCreate(&g_stream) != cudaSuccess) {
			g_stream = NULL; // don't cache a possibly-garbage handle
			return NULL;
		}
	}
	return g_stream;
}

// Block the host until all work queued on the shared I/O stream completes.
// A no-op (reported as success) when the stream has not been created yet.
cudaError_t cuSharedStreamSync(void) {
	return g_stream == NULL ? cudaSuccess : cudaStreamSynchronize(g_stream);
}

// Parameter block for one in-flight cuFileReadAsync call. cuFileReadAsync
// takes pointers that must remain valid until the read executes on the
// stream, so these live on the heap — one allocation per call — and are
// released by a host callback enqueued on the same stream after the read.
typedef struct {
	size_t size_to_read;
	off_t file_offset;
	off_t buffer_offset;
	ssize_t bytes_read;
} ccv_nnc_cufile_async_params_t;

// Stream host callback: frees the per-call parameter block.
static void _ccv_nnc_cufile_async_params_free(void* userdata)
{
	free(userdata);
}

// Asynchronously read size bytes at offset from an already-registered cuFile
// handle straight into freshly-allocated memory on device. The read is only
// enqueued on stream; the caller must synchronize the stream (e.g. via
// cuSharedStreamSync) before touching the returned buffer. Returns NULL on
// allocation or submission failure. filename and file_descr are unused here
// (the handle is already registered) but kept for interface compatibility.
void* cuDirectFileReadAsync(int device, size_t size, const char* const filename, const off_t offset, cudaStream_t stream, CUfileHandle_t file_handle, CUfileDescr_t file_descr)
{
	void* ptr = cumalloc(device, size);
	if (!ptr)
		return NULL;

	// One heap-allocated parameter block per call. The previous version used
	// static locals shared by every call: submitting a second async read
	// overwrote the parameters of one still pending on the stream, corrupting
	// the earlier read (consistent with the intermittent test failures noted
	// in this PR).
	ccv_nnc_cufile_async_params_t* const params = (ccv_nnc_cufile_async_params_t*)malloc(sizeof(ccv_nnc_cufile_async_params_t));
	if (!params) {
		cufree(device, ptr);
		return NULL;
	}
	params->size_to_read = size;
	params->file_offset = offset;
	params->buffer_offset = 0;
	params->bytes_read = 0;

	// Initiate the asynchronous read into device memory.
	const CUfileError_t status = cuFileReadAsync(file_handle,
	                                             ptr,
	                                             &params->size_to_read,
	                                             &params->file_offset,
	                                             &params->buffer_offset,
	                                             &params->bytes_read,
	                                             stream);
	if (status.err != CU_FILE_SUCCESS) {
		free(params);
		cufree(device, ptr);
		return NULL;
	}

	// Free the parameter block only after the stream has executed the read.
	// If the callback cannot be enqueued, fall back to a blocking sync so the
	// parameters are no longer referenced before we free them.
	if (cudaLaunchHostFunc(stream, _ccv_nnc_cufile_async_params_free, params) != cudaSuccess) {
		cudaStreamSynchronize(stream);
		free(params);
	}

	return ptr;
}

// Synchronously read size bytes at offset from filename directly into GPU
// memory on device via GPUDirect Storage (cuFile). The file is opened and
// registered per call, so every call pays the registration cost; the cached
// path (ccv_nnc_get_file_entry + cuDirectFileReadAsync) avoids that.
// Exits on any failure, matching the surrounding wip error style.
void* cuDirectFileRead(int device, size_t size, const char* const filename, const off_t offset)
{
	CUfileError_t status;

	void* ptr = (void*)cumalloc(device, size);
	if (!ptr) {
		// Previously an allocation failure went unnoticed and NULL was
		// handed to cuFileRead.
		fprintf(stderr, "Failed to allocate %zu bytes on device %d\n", size, device);
		exit(1);
	}

	// Open the file using cuFile; O_DIRECT is required for GPUDirect Storage.
	CUfileHandle_t file_handle;
	CUfileDescr_t file_descr;
	memset(&file_descr, 0, sizeof(CUfileDescr_t));
	file_descr.handle.fd = open(filename, O_RDONLY | O_DIRECT, 0);
	if (file_descr.handle.fd < 0) {
		// Previously an invalid fd was passed straight to cuFileHandleRegister.
		fprintf(stderr, "Failed to open file: %s\n", filename);
		exit(1);
	}
	file_descr.type = CU_FILE_HANDLE_TYPE_OPAQUE_FD;

	status = cuFileHandleRegister(&file_handle, &file_descr);
	if (status.err != CU_FILE_SUCCESS) {
		close(file_descr.handle.fd);
		exit(1);
	}

	// Read the file directly into GPU memory (blocking).
	ssize_t bytes_read = cuFileRead(file_handle, ptr, size, offset, 0);

	if (bytes_read < 0 || (size_t)bytes_read != size) {
		cuFileHandleDeregister(file_handle);
		close(file_descr.handle.fd);
		exit(1);
	}

	// Clean up
	cuFileHandleDeregister(file_handle);
	close(file_descr.handle.fd);
	return ptr;
}

// Block the host until all work queued on stream has completed, but only
// when the stream still has pending work — as the name promises. The
// previous version synchronized unconditionally and left a debug fprintf
// with a misleading label ("cuSharedFileIOStream") in place.
void cuFileWaitOnStreamIfNotReady(cudaStream_t stream)
{
	// cudaStreamQuery returns cudaErrorNotReady while work is outstanding;
	// any other result means the stream is already drained (or errored, in
	// which case synchronizing would not help).
	if (cudaStreamQuery(stream) == cudaErrorNotReady)
		cudaStreamSynchronize(stream);
}

void cufree(int device, void* ptr)
{
CUDA_ENFORCE(cudaSetDevice(device));
Expand Down
20 changes: 20 additions & 0 deletions lib/nnc/gpu/ccv_nnc_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ extern "C" {
#endif
#include "nnc/ccv_nnc.h"
#include "nnc/_ccv_nnc_stream.h"
#include <cuda_runtime.h>
#include <cufile.h> // For GPUDirect Storage

// Simple counterparts of ccmalloc / ccfree.
void* cumalloc(int device, size_t size);
Expand All @@ -34,6 +36,24 @@ int curegmp(int device_id, cump_f func, void* const context); // register memory
void cuunregmp(const int id); // un-register memory pressure handler.
void cusetprofiler(int state);

// GPUDirect Storage (cuFile) helpers: read file contents straight into GPU
// memory. cuDirectFileRead blocks until the data is on-device;
// cuDirectFileReadAsync only enqueues the read on the given stream, so the
// caller must synchronize (e.g. cuSharedStreamSync) before using the buffer.
void* cuDirectFileRead(int device, size_t size, const char* const filename, const off_t offset);
void* cuDirectFileReadAsync(int device, size_t size, const char* const filename, const off_t offset, cudaStream_t stream, CUfileHandle_t file_handle, CUfileDescr_t file_descr);
// Lazily-created stream shared by the async cuFile reads; NULL on failure.
cudaStream_t cuSharedFileIOStream(void);
// Synchronize the shared stream; reports success when it does not exist yet.
cudaError_t cuSharedStreamSync(void);

// Capacity of the process-wide cache of registered cuFile handles.
#define MAX_FILES 100

// One cached cuFile registration: the filename used as the cache key, the
// registered handle, the descriptor (which owns the O_DIRECT fd), and a
// used/free flag for the slot.
typedef struct {
char filename[256];
CUfileHandle_t file_handle;
CUfileDescr_t file_descr;
int is_used;
} ccv_nnc_cuda_file_entry;

// Look up (or open + register) the cached entry for filename.
ccv_nnc_cuda_file_entry ccv_nnc_get_file_entry(const char* filename);
// Deregister and close every cached entry.
void ccv_nnc_cleanup_all_file_entries();
// Deregister, close, and clear the entry for filename, if present.
void ccv_nnc_remove_cuda_file_entry(const char* filename);

// Stream context
CCV_WARN_UNUSED(ccv_nnc_stream_context_t*) ccv_nnc_init_stream_context(ccv_nnc_stream_context_t* const stream_context);
void ccv_nnc_synchronize_stream_context(const ccv_nnc_stream_context_t* const stream_context);
Expand Down
26 changes: 26 additions & 0 deletions test/int/nnc/tensor.tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
#include "case.h"
#include "ccv_case.h"
#include "ccv_nnc_case.h"
#include "nnc/ccv_nnc_internal.h"
#include "nnc/ccv_nnc.h"
#include "nnc/ccv_nnc_easy.h"
#include "3rdparty/sqlite3/sqlite3.h"
#include "3rdparty/dsfmt/dSFMT.h"
#ifdef HAVE_CUDA
#include "nnc/gpu/ccv_nnc_compat.h"
#endif

TEST_SETUP()
{
Expand Down Expand Up @@ -64,16 +68,38 @@ TEST_CASE("tensor mapped from file")
ccv_nnc_tensor_t* one_gpu = ccv_nnc_tensor_new(0, GPU_TENSOR_NHWC(000, 32F, 1), 0);
ccv_nnc_cmd_exec(CMD_DATA_TRANSFER_FORWARD(), ccv_nnc_no_hint, 0, TENSOR_LIST(one), TENSOR_LIST(one_gpu), 0);
ccv_nnc_tensor_t* tensor_a = ccv_nnc_tensor_new_from_file(GPU_TENSOR_NHWC(000, 32F, 5), "tensor.bin", 0, 0);
cuSharedStreamSync();
// cudaStreamSynchronize(stream);

ccv_nnc_tensor_t* a_result = ccv_nnc_tensor_new(0, GPU_TENSOR_NHWC(000, 32F, 5), 0);
ccv_nnc_cmd_exec(CMD_ADD_FORWARD(0.5, 0.2), ccv_nnc_no_hint, 0, TENSOR_LIST(tensor_a, one_gpu), TENSOR_LIST(a_result), 0);
float a[] = {1 * 0.5 + 0.2, 2 * 0.5 + 0.2, 3 * 0.5 + 0.2, 4 * 0.5 + 0.2, 5 * 0.5 + 0.2};
ccv_nnc_tensor_t* tensor_b = ccv_nnc_tensor_new_from_file(GPU_TENSOR_NHWC(000, 32F, 4), "tensor.bin", (4096 * 4 * 4), 0);
cuSharedStreamSync();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the sync placed here, only about 4 out of 10 test runs pass...


// cudaStreamSynchronize(stream);

ccv_nnc_tensor_t* b_result = ccv_nnc_tensor_new(0, GPU_TENSOR_NHWC(000, 32F, 4), 0);
ccv_nnc_cmd_exec(CMD_ADD_FORWARD(1, 1), ccv_nnc_no_hint, 0, TENSOR_LIST(tensor_b, one_gpu), TENSOR_LIST(b_result), 0);
float b[] = {4096 * 4 + 1 + 1, 4096 * 4 + 2 + 1, 4096 * 4 + 3 + 1, 4096 * 4 + 4 + 1};
ccv_nnc_tensor_t* at = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 5), 0);
ccv_nnc_tensor_t* bt = ccv_nnc_tensor_new(0, CPU_TENSOR_NHWC(32F, 4), 0);
ccv_nnc_cmd_exec(CMD_DATA_TRANSFER_FORWARD(), ccv_nnc_no_hint, 0, TENSOR_LIST(a_result, b_result), TENSOR_LIST(at, bt), 0);
fprintf(stderr, "\n");
fprintf(stderr, "\nat:\n");
for (i = 0; i < 5; i++)
fprintf(stderr, " %f", at->data.f32[i]);
fprintf(stderr, "\na:\n");
for (i = 0; i < 5; i++)
fprintf(stderr, " %f", a[i]);
fprintf(stderr, "\nbt:\n");
for (i = 0; i < 4; i++)
fprintf(stderr, " %f", bt->data.f32[i]);
fprintf(stderr, "\nb:\n");
for (i = 0; i < 4; i++)
fprintf(stderr, " %f", b[i]);
fprintf(stderr, "\n");

REQUIRE_ARRAY_EQ_WITH_TOLERANCE(float, at->data.f32, a, 5, 1e-5, "the first 5 element should be equal");
REQUIRE_ARRAY_EQ_WITH_TOLERANCE(float, bt->data.f32, b, 4, 1e-5, "the first 4 element should be equal");
ccv_nnc_tensor_free(tensor_a);
Expand Down