Skip to content

Commit

Permalink
AIO CPU Locked Tensor (#6592)
Browse files Browse the repository at this point in the history
Restoring the functionality of the cpu locked tensor in the AIO library.
Make async_io operator available for CPU accelerator, i.e., CPU only
environment.

---------

Co-authored-by: Olatunji Ruwase <[email protected]>
  • Loading branch information
jomayeri and tjruwase authored Oct 9, 2024
1 parent 7d751ee commit a1f98bd
Show file tree
Hide file tree
Showing 29 changed files with 362 additions and 196 deletions.
6 changes: 4 additions & 2 deletions accelerator/cpu_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,9 @@ def get_op_builder(self, class_name):
# is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed
# if successful this also means we're doing a local install and not JIT compile path
from op_builder import __deepspeed__ # noqa: F401 # type: ignore
from op_builder.cpu import CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder
from op_builder.cpu import AsyncIOBuilder, CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder
except ImportError:
from deepspeed.ops.op_builder.cpu import CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder
from deepspeed.ops.op_builder.cpu import AsyncIOBuilder, CCLCommBuilder, ShareMemCommBuilder, FusedAdamBuilder, CPUAdamBuilder, NotImplementedBuilder

if class_name == "CCLCommBuilder":
return CCLCommBuilder
Expand All @@ -313,6 +313,8 @@ def get_op_builder(self, class_name):
return FusedAdamBuilder
elif class_name == "CPUAdamBuilder":
return CPUAdamBuilder
elif class_name == "AsyncIOBuilder":
return AsyncIOBuilder
else:
# return a NotImplementedBuilder to avoid get NoneType[Name] in unit tests
return NotImplementedBuilder
Expand Down
43 changes: 21 additions & 22 deletions csrc/aio/common/deepspeed_aio_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ static void _get_aio_latencies(std::vector<std::chrono::duration<double>>& raw_l
std::accumulate(lat_usec.begin(), lat_usec.end(), 0) / lat_usec.size();
}

static void _do_io_submit_singles(const long long int n_iocbs,
const long long int iocb_index,
static void _do_io_submit_singles(const int64_t n_iocbs,
const int64_t iocb_index,
std::unique_ptr<aio_context>& aio_ctxt,
std::vector<std::chrono::duration<double>>& submit_times)
{
Expand All @@ -89,8 +89,8 @@ static void _do_io_submit_singles(const long long int n_iocbs,
}
}

static void _do_io_submit_block(const long long int n_iocbs,
const long long int iocb_index,
static void _do_io_submit_block(const int64_t n_iocbs,
const int64_t iocb_index,
std::unique_ptr<aio_context>& aio_ctxt,
std::vector<std::chrono::duration<double>>& submit_times)
{
Expand All @@ -109,18 +109,18 @@ static void _do_io_submit_block(const long long int n_iocbs,
assert(submit_ret > 0);
}

static int _do_io_complete(const long long int min_completes,
const long long int max_completes,
static int _do_io_complete(const int64_t min_completes,
const int64_t max_completes,
std::unique_ptr<aio_context>& aio_ctxt,
std::vector<std::chrono::duration<double>>& reap_times)
{
const auto start_time = std::chrono::high_resolution_clock::now();
long long int n_completes = io_pgetevents(aio_ctxt->_io_ctxt,
min_completes,
max_completes,
aio_ctxt->_io_events.data(),
nullptr,
nullptr);
int64_t n_completes = io_pgetevents(aio_ctxt->_io_ctxt,
min_completes,
max_completes,
aio_ctxt->_io_events.data(),
nullptr,
nullptr);
reap_times.push_back(std::chrono::high_resolution_clock::now() - start_time);
assert(n_completes >= min_completes);
return n_completes;
Expand All @@ -134,7 +134,7 @@ void do_aio_operation_sequential(const bool read_op,
{
struct io_prep_context prep_ctxt(read_op, xfer_ctxt, aio_ctxt->_block_size, &aio_ctxt->_iocbs);

const auto num_io_blocks = static_cast<long long int>(
const auto num_io_blocks = static_cast<int64_t>(
ceil(static_cast<double>(xfer_ctxt->_num_bytes) / aio_ctxt->_block_size));
#if DEBUG_DS_AIO_PERF
const auto io_op_name = std::string(read_op ? "read" : "write");
Expand All @@ -145,15 +145,14 @@ void do_aio_operation_sequential(const bool read_op,
std::vector<std::chrono::duration<double>> submit_times;
std::vector<std::chrono::duration<double>> reap_times;
const auto max_queue_bytes =
static_cast<long long int>(aio_ctxt->_queue_depth * aio_ctxt->_block_size);
static_cast<int64_t>(aio_ctxt->_queue_depth * aio_ctxt->_block_size);

auto start = std::chrono::high_resolution_clock::now();
for (long long iocb_index = 0; iocb_index < num_io_blocks;
iocb_index += aio_ctxt->_queue_depth) {
for (int64_t iocb_index = 0; iocb_index < num_io_blocks; iocb_index += aio_ctxt->_queue_depth) {
const auto start_offset = iocb_index * aio_ctxt->_block_size;
const auto start_buffer = (char*)xfer_ctxt->_mem_buffer + start_offset;
const auto n_iocbs =
min(static_cast<long long>(aio_ctxt->_queue_depth), (num_io_blocks - iocb_index));
min(static_cast<int64_t>(aio_ctxt->_queue_depth), (num_io_blocks - iocb_index));
const auto num_bytes = min(max_queue_bytes, (xfer_ctxt->_num_bytes - start_offset));
prep_ctxt.prep_iocbs(n_iocbs, num_bytes, start_buffer, start_offset);

Expand Down Expand Up @@ -285,13 +284,13 @@ int open_file(const char* filename, const bool read_op)

int regular_read(const char* filename, std::vector<char>& buffer)
{
long long int num_bytes;
int64_t num_bytes;
const auto f_size = get_file_size(filename, num_bytes);
assert(f_size != -1);
buffer.resize(num_bytes);
const auto fd = open(filename, O_RDONLY, 0600);
assert(fd != -1);
long long int read_bytes = 0;
int64_t read_bytes = 0;
auto r = 0;
do {
const auto buffer_ptr = buffer.data() + read_bytes;
Expand All @@ -309,23 +308,23 @@ int regular_read(const char* filename, std::vector<char>& buffer)
return 0;
}

static bool _validate_buffer(const char* filename, void* aio_buffer, const long long int num_bytes)
static bool _validate_buffer(const char* filename, void* aio_buffer, const int64_t num_bytes)
{
std::vector<char> regular_buffer;
const auto reg_ret = regular_read(filename, regular_buffer);
assert(0 == reg_ret);
std::cout << "regular read of " << filename << " returned " << regular_buffer.size() << " bytes"
<< std::endl;

if (static_cast<long long int>(regular_buffer.size()) != num_bytes) { return false; }
if (static_cast<int64_t>(regular_buffer.size()) != num_bytes) { return false; }

return (0 == memcmp(aio_buffer, regular_buffer.data(), regular_buffer.size()));
}

bool validate_aio_operation(const bool read_op,
const char* filename,
void* aio_buffer,
const long long int num_bytes)
const int64_t num_bytes)
{
const auto msg_suffix = std::string("deepspeed_aio_") +
std::string(read_op ? "read()" : "write()") +
Expand Down
2 changes: 1 addition & 1 deletion csrc/aio/common/deepspeed_aio_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ int regular_read(const char* filename, std::vector<char>& buffer);
bool validate_aio_operation(const bool read_op,
const char* filename,
void* aio_buffer,
const long long int num_bytes);
const int64_t num_bytes);
18 changes: 9 additions & 9 deletions csrc/aio/common/deepspeed_aio_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ const int c_block_size = 128 * 1024;
const int c_io_queue_depth = 8;

io_xfer_ctxt::io_xfer_ctxt(const int fd,
const long long int file_offset,
const long long int num_bytes,
const int64_t file_offset,
const int64_t num_bytes,
const void* buffer)
: _fd(fd), _base_offset(file_offset), _mem_buffer(buffer), _num_bytes(num_bytes)
{
Expand All @@ -36,7 +36,7 @@ io_prep_context::io_prep_context(const bool read_op,
void io_prep_context::prep_iocbs(const int n_iocbs,
const size_t num_bytes,
const void* start_buffer,
const long long int start_offset)
const int64_t start_offset)
{
assert(static_cast<size_t>(n_iocbs) <= _iocbs->size());
for (auto i = 0; i < n_iocbs; ++i) {
Expand Down Expand Up @@ -64,24 +64,24 @@ io_prep_generator::io_prep_generator(const bool read_op,
_next_iocb_index(0)
{
_num_io_blocks =
static_cast<long long int>(ceil(static_cast<double>(xfer_ctxt->_num_bytes) / block_size));
static_cast<int64_t>(ceil(static_cast<double>(xfer_ctxt->_num_bytes) / block_size));
_remaining_io_blocks = _num_io_blocks;
}

int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector<struct iocb*>* iocbs)
{
if ((_remaining_bytes) == 0 || (_remaining_io_blocks == 0)) {
assert(static_cast<long long int>(_remaining_bytes) == _remaining_io_blocks);
assert(static_cast<int64_t>(_remaining_bytes) == _remaining_io_blocks);
return 0;
}

assert(static_cast<size_t>(n_iocbs) <= iocbs->size());

auto actual_n_iocbs = min(static_cast<long long int>(n_iocbs), _remaining_io_blocks);
auto actual_n_iocbs = min(static_cast<int64_t>(n_iocbs), _remaining_io_blocks);
for (auto i = 0; i < actual_n_iocbs; ++i, ++_next_iocb_index) {
const auto xfer_offset = _xfer_ctxt->_base_offset + (_next_iocb_index * _block_size);
const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + xfer_offset;
const auto num_bytes = min(static_cast<long long int>(_block_size), _remaining_bytes);
const auto num_bytes = min(static_cast<int64_t>(_block_size), _remaining_bytes);

if (_read_op) {
io_prep_pread(iocbs->at(i), _xfer_ctxt->_fd, xfer_buffer, num_bytes, xfer_offset);
Expand All @@ -95,15 +95,15 @@ int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector<struct iocb*>*
return actual_n_iocbs;
}

int get_file_size(const char* filename, long long int& size)
int get_file_size(const char* filename, int64_t& size)
{
struct stat st;
if (stat(filename, &st) == -1) { return -1; }
size = st.st_size;
return 0;
}

void* ds_page_aligned_alloc(const size_t size, const bool lock)
void* ds_page_aligned_alloc(const int64_t size, const bool lock)
{
void* ptr;
int retval;
Expand Down
22 changes: 11 additions & 11 deletions csrc/aio/common/deepspeed_aio_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices.

struct io_xfer_ctxt {
const int _fd;
const long long int _base_offset;
const int64_t _base_offset;
const void* _mem_buffer;
const long long int _num_bytes;
const int64_t _num_bytes;

io_xfer_ctxt(const int fd,
const long long int file_offset,
const long long int num_bytes,
const int64_t file_offset,
const int64_t num_bytes,
const void* buffer);
};

Expand All @@ -54,18 +54,18 @@ struct io_prep_context {
void prep_iocbs(const int n_iocbs,
const size_t num_bytes,
const void* start_buffer,
const long long int start_offset);
const int64_t start_offset);
};

struct io_prep_generator {
const bool _read_op;
const std::unique_ptr<io_xfer_ctxt>& _xfer_ctxt;
const size_t _block_size;

long long int _remaining_bytes;
long long int _num_io_blocks;
long long int _remaining_io_blocks;
long long int _next_iocb_index;
int64_t _remaining_bytes;
int64_t _num_io_blocks;
int64_t _remaining_io_blocks;
int64_t _next_iocb_index;

io_prep_generator(const bool read_op,
const std::unique_ptr<io_xfer_ctxt>& xfer_ctxt,
Expand All @@ -74,6 +74,6 @@ struct io_prep_generator {
int prep_iocbs(const int n_iocbs, std::vector<struct iocb*>* iocbs);
};

void* ds_page_aligned_alloc(const size_t size, const bool lock = false);
void* ds_page_aligned_alloc(const int64_t size, const bool lock = false);

int get_file_size(const char* filename, long long int& size);
int get_file_size(const char* filename, int64_t& size);
8 changes: 4 additions & 4 deletions csrc/aio/py_lib/deepspeed_aio_op_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ io_op_desc_t::io_op_desc_t(const bool read_op,
const torch::Tensor& buffer,
const int fd,
const char* filename,
const long long int file_num_bytes,
const int num_threads,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate)
: _read_op(read_op),
_buffer(buffer),
_fd(fd),
_filename(filename),
_file_num_bytes(file_num_bytes),
_num_threads(num_threads),
_num_bytes_per_thread(file_num_bytes / num_threads),
_intra_op_parallelism(intra_op_parallelism),
_num_bytes_per_thread(file_num_bytes / intra_op_parallelism),
_validate(validate)
{
}
Expand Down
10 changes: 5 additions & 5 deletions csrc/aio/py_lib/deepspeed_aio_op_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ struct io_op_desc_t {
torch::Tensor _buffer;
int _fd;
const std::string _filename;
const long long int _file_num_bytes;
const int _num_threads;
const long long int _num_bytes_per_thread;
const int64_t _file_num_bytes;
const int _intra_op_parallelism;
const int64_t _num_bytes_per_thread;
torch::Tensor _contiguous_buffer;
const bool _validate;

io_op_desc_t(const bool read_op,
const torch::Tensor& buffer,
const int fd,
const char* filename,
const long long int file_num_bytes,
const int num_threads,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate);

virtual void run(const int tid,
Expand Down
Loading

0 comments on commit a1f98bd

Please sign in to comment.