Skip to content

Commit

Permalink
AIO File Offsets (#6641)
Browse files Browse the repository at this point in the history
Add an optional file offset to the read/write functions of the AIO &
GDS ops.

---------

Co-authored-by: jomayeri <deepspeed@H100-VM2.shlnn55tgwve1eacvp21ie45dg.jx.internal.cloudapp.net>
Co-authored-by: Masahiro Tanaka <[email protected]>
Co-authored-by: Olatunji Ruwase <[email protected]>
Co-authored-by: Logan Adams <[email protected]>
  • Loading branch information
5 people authored Nov 12, 2024
1 parent 7af3a4b commit b692cde
Show file tree
Hide file tree
Showing 21 changed files with 342 additions and 101 deletions.
2 changes: 2 additions & 0 deletions accelerator/cpu_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def device_count(self):
# In flat mode, HBM is in separate NUMA node with no cores on this node.
# Ignore these NUMA nodes with no cores.
numa_core_lists = get_numa_cores()
if not numa_core_lists:
return 1
numa_count = 0
prev_core_list = []
for core_list in numa_core_lists:
Expand Down
18 changes: 12 additions & 6 deletions csrc/aio/common/deepspeed_aio_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ const int c_io_queue_depth = 8;

// Construct a transfer context for one file<->memory I/O operation.
// The file offset and the buffer offset are tracked independently so a
// transfer can start partway into the file and partway into the buffer.
// Note: the scraped diff left the pre-change initializer list merged in;
// only the post-change list (separate file/buffer base offsets) is valid.
io_xfer_ctxt::io_xfer_ctxt(const int fd,
                           const int64_t file_offset,
                           const int64_t buffer_offset,
                           const int64_t num_bytes,
                           const void* buffer)
    : _fd(fd),
      _file_base_offset(file_offset),
      _buffer_base_offset(buffer_offset),
      _mem_buffer(buffer),
      _num_bytes(num_bytes)
{
}

Expand All @@ -41,9 +46,10 @@ void io_prep_context::prep_iocbs(const int n_iocbs,
assert(static_cast<size_t>(n_iocbs) <= _iocbs->size());
for (auto i = 0; i < n_iocbs; ++i) {
const auto shift = i * _block_size;
const auto xfer_buffer = (char*)start_buffer + _xfer_ctxt->_base_offset + shift;
const auto xfer_offset = _xfer_ctxt->_base_offset + start_offset + shift;
const auto xfer_buffer = (char*)start_buffer + _xfer_ctxt->_buffer_base_offset + shift;
const auto xfer_offset = _xfer_ctxt->_file_base_offset + start_offset + shift;
auto byte_count = _block_size;

if ((shift + _block_size) > num_bytes) { byte_count = num_bytes - shift; }

if (_read_op) {
Expand Down Expand Up @@ -79,10 +85,10 @@ int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector<struct iocb*>*

auto actual_n_iocbs = min(static_cast<int64_t>(n_iocbs), _remaining_io_blocks);
for (auto i = 0; i < actual_n_iocbs; ++i, ++_next_iocb_index) {
const auto xfer_offset = _xfer_ctxt->_base_offset + (_next_iocb_index * _block_size);
const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + xfer_offset;
const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + _xfer_ctxt->_buffer_base_offset +
(_next_iocb_index * _block_size);
const auto xfer_offset = _xfer_ctxt->_file_base_offset + (_next_iocb_index * _block_size);
const auto num_bytes = min(static_cast<int64_t>(_block_size), _remaining_bytes);

if (_read_op) {
io_prep_pread(iocbs->at(i), _xfer_ctxt->_fd, xfer_buffer, num_bytes, xfer_offset);
} else {
Expand Down
4 changes: 3 additions & 1 deletion csrc/aio/common/deepspeed_aio_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices.

// Descriptor for a single file<->memory transfer: the open file, where the
// transfer starts in the file and in the buffer, and how many bytes to move.
// The stale pre-change `_base_offset` member is removed: it was superseded by
// the two explicit base offsets below, and as a const member with no
// initializer in the constructor it could never be set.
struct io_xfer_ctxt {
    const int _fd;                      // open file descriptor for the transfer
    const int64_t _file_base_offset;    // starting byte offset within the file
    const int64_t _buffer_base_offset;  // starting byte offset within _mem_buffer
    const void* _mem_buffer;            // base address of the user memory buffer
    const int64_t _num_bytes;           // total bytes to transfer

    io_xfer_ctxt(const int fd,
                 const int64_t file_offset,
                 const int64_t buffer_offset,
                 const int64_t num_bytes,
                 const void* buffer);
};
Expand Down
6 changes: 4 additions & 2 deletions csrc/aio/py_lib/deepspeed_aio_op_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ io_op_desc_t::io_op_desc_t(const bool read_op,
const char* filename,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate)
const bool validate,
const int64_t file_offset)
: _read_op(read_op),
_buffer(buffer),
_fd(fd),
_filename(filename),
_file_num_bytes(file_num_bytes),
_file_offset(file_offset),
_intra_op_parallelism(intra_op_parallelism),
_num_bytes_per_thread(file_num_bytes / intra_op_parallelism),
_num_bytes_per_thread(static_cast<int64_t>(buffer.nbytes()) / intra_op_parallelism),
_validate(validate)
{
}
Expand Down
4 changes: 3 additions & 1 deletion csrc/aio/py_lib/deepspeed_aio_op_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ struct io_op_desc_t {
const int64_t _num_bytes_per_thread;
torch::Tensor _contiguous_buffer;
const bool _validate;
const int64_t _file_offset;

io_op_desc_t(const bool read_op,
const torch::Tensor& buffer,
const int fd,
const char* filename,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate);
const bool validate,
const int64_t file_offset);

virtual void run(const int tid,
std::unique_ptr<aio_context>& aio_ctxt,
Expand Down
19 changes: 14 additions & 5 deletions csrc/aio/py_lib/deepspeed_cpu_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@ cpu_op_desc_t::cpu_op_desc_t(
const char* filename,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate)
: io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, intra_op_parallelism, validate),
const bool validate,
const int64_t file_offset)
: io_op_desc_t(read_op,
buffer,
fd,
filename,
file_num_bytes,
intra_op_parallelism,
validate,
file_offset),
_cpu_buffer(buffer),
_pinned_tensor_mgr(pinned_tensor_mgr),
_is_managed_bounce_buffer(false)
Expand Down Expand Up @@ -66,10 +74,11 @@ void cpu_op_desc_t::run(const int tid,
deepspeed_aio_config_t* aio_config)
{
assert(tid < _intra_op_parallelism);
const auto base_offset = _num_bytes_per_thread * tid;
const auto buffer_base_offset = _num_bytes_per_thread * tid;
const auto file_base_offset = _file_offset + (_num_bytes_per_thread * tid);

std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
new io_xfer_ctxt(_fd, base_offset, _num_bytes_per_thread, data_ptr()));
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(
_fd, file_base_offset, buffer_base_offset, _num_bytes_per_thread, data_ptr()));

if (aio_config->_overlap_events) {
do_aio_operation_overlap(_read_op, aio_ctxt, xfer_ctxt, aio_config, nullptr);
Expand Down
3 changes: 2 additions & 1 deletion csrc/aio/py_lib/deepspeed_cpu_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ struct cpu_op_desc_t : io_op_desc_t {
const char* filename,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate);
const bool validate,
const int64_t file_offset);

void run(const int tid,
std::unique_ptr<aio_context>& aio_ctxt,
Expand Down
7 changes: 5 additions & 2 deletions csrc/aio/py_lib/deepspeed_py_aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ int deepspeed_py_aio_write(const torch::Tensor& buffer,

auto write_buffer = (char*)buffer.data_ptr();
const auto num_write_bytes = static_cast<int64_t>(buffer.nbytes());
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(fd, 0, num_write_bytes, write_buffer));

std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
new io_xfer_ctxt(fd, 0, 0, num_write_bytes, write_buffer));
std::unique_ptr<aio_context> aio_ctxt(new aio_context(config._block_size, config._queue_depth));

if (config._overlap_events) {
Expand Down Expand Up @@ -97,7 +99,8 @@ int deepspeed_py_aio_read(torch::Tensor& buffer,
auto read_buffer = (char*)buffer.data_ptr();
assert(static_cast<int64_t>(buffer.nbytes()) == num_file_bytes);

std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(fd, 0, num_file_bytes, read_buffer));
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
new io_xfer_ctxt(fd, 0, 0, num_file_bytes, read_buffer));
std::unique_ptr<aio_context> aio_ctxt(new aio_context(config._block_size, config._queue_depth));

if (config._overlap_events) {
Expand Down
65 changes: 41 additions & 24 deletions csrc/aio/py_lib/deepspeed_py_io_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ const bool deepspeed_io_handle_t::get_overlap_events() const { return _overlap_e

const int deepspeed_io_handle_t::get_intra_op_parallelism() const { return _intra_op_parallelism; }

int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, const bool validate)
int deepspeed_io_handle_t::read(torch::Tensor& buffer,
const char* filename,
const bool validate,
const int64_t file_offset)
{
const auto start_time = std::chrono::high_resolution_clock::now();

Expand All @@ -76,7 +79,8 @@ int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, con
if (fd == -1) { return -1; }

auto read_buffer = (char*)buffer.data_ptr();
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(fd, 0, num_file_bytes, read_buffer));
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
new io_xfer_ctxt(fd, file_offset, 0, num_file_bytes, read_buffer));

if (_aio_config._overlap_events) {
do_aio_operation_overlap(true, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr);
Expand All @@ -98,7 +102,8 @@ int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, con

int deepspeed_io_handle_t::write(const torch::Tensor& buffer,
const char* filename,
const bool validate)
const bool validate,
const int64_t file_offset)
{
assert(_aio_ctxt);

Expand All @@ -109,7 +114,8 @@ int deepspeed_io_handle_t::write(const torch::Tensor& buffer,

auto write_buffer = (char*)buffer.data_ptr();
const auto num_write_bytes = static_cast<int64_t>(buffer.nbytes());
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(fd, 0, num_write_bytes, write_buffer));
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
new io_xfer_ctxt(fd, file_offset, 0, num_write_bytes, write_buffer));

if (_aio_config._overlap_events) {
do_aio_operation_overlap(false, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr);
Expand Down Expand Up @@ -206,7 +212,8 @@ std::shared_ptr<struct io_op_desc_t> deepspeed_io_handle_t::_create_io_op_desc(
const int fd,
const char* filename,
const int64_t file_num_bytes,
const bool validate)
const bool validate,
const int64_t file_offset)
{
return std::make_shared<cpu_op_desc_t>(read_op,
buffer,
Expand All @@ -215,34 +222,34 @@ std::shared_ptr<struct io_op_desc_t> deepspeed_io_handle_t::_create_io_op_desc(
filename,
file_num_bytes,
_intra_op_parallelism,
validate);
validate,
file_offset);
}

int deepspeed_io_handle_t::pread(const torch::Tensor& buffer,
const char* filename,
const bool validate,
const bool async)
const bool async,
const int64_t file_offset)
{
int64_t num_file_bytes;
if (-1 == get_file_size(filename, num_file_bytes)) {
const auto error_code = errno;
report_file_error(filename, " fstat for read", error_code);
return -1;
}

// buffer can exceed file size to enable 4k alignment
const auto buffer_bytes = static_cast<int64_t>(buffer.nbytes());
if (buffer_bytes != num_file_bytes) {
std::cout << filename << ": buffer nbytes != file bytes " << buffer_bytes
<< " != " << num_file_bytes << std::endl;
}
assert(buffer_bytes == num_file_bytes);
assert((num_file_bytes % _intra_op_parallelism) == 0);

if (!_is_valid_parallel_aio_op(true, num_file_bytes)) { return -1; }
if (!_is_valid_parallel_aio_op(true, buffer_bytes)) { return -1; }

const auto fd = open_file(filename, true);
if (fd == -1) { return -1; }

auto scheduled_op = _create_io_op_desc(true, buffer, fd, filename, num_file_bytes, validate);
auto scheduled_op =
_create_io_op_desc(true, buffer, fd, filename, num_file_bytes, validate, file_offset);

_schedule_aio_work(scheduled_op);

Expand All @@ -254,7 +261,8 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer,
int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer,
const char* filename,
const bool validate,
const bool async)
const bool async,
const int64_t file_offset)
{
const auto num_write_bytes = static_cast<int64_t>(buffer.nbytes());
assert((num_write_bytes % _intra_op_parallelism) == 0);
Expand All @@ -264,7 +272,8 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer,
const auto fd = open_file(filename, false);
if (fd == -1) { return -1; }

auto scheduled_op = _create_io_op_desc(false, buffer, fd, filename, num_write_bytes, validate);
auto scheduled_op =
_create_io_op_desc(false, buffer, fd, filename, num_write_bytes, validate, file_offset);

_schedule_aio_work(scheduled_op);

Expand All @@ -273,24 +282,32 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer,
return wait();
}

// Blocking read of `filename` into `buffer`, starting `file_offset` bytes
// into the file. Delegates to pread() with validate=false, async=false and
// returns its result. (The merged old 2-arg signature/body from the diff
// scrape is dropped; only the post-change 3-arg form is kept.)
int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer,
                                      const char* filename,
                                      const int64_t file_offset)
{
    return pread(buffer, filename, false, false, file_offset);
}

// Blocking write of `buffer` to `filename`, starting `file_offset` bytes
// into the file. Delegates to pwrite() with validate=false, async=false and
// returns its result. (The merged old 2-arg signature/body from the diff
// scrape is dropped; only the post-change 3-arg form is kept.)
int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer,
                                       const char* filename,
                                       const int64_t file_offset)
{
    return pwrite(buffer, filename, false, false, file_offset);
}

// Asynchronous read of `filename` into `buffer`, starting `file_offset`
// bytes into the file. Delegates to pread() with validate=false, async=true
// and returns its result; completion is observed via the handle's wait
// mechanism. (The merged old 2-arg signature/body from the diff scrape is
// dropped; only the post-change 3-arg form is kept.)
int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer,
                                       const char* filename,
                                       const int64_t file_offset)
{
    return pread(buffer, filename, false, true, file_offset);
}

// Asynchronous write of `buffer` to `filename`, starting `file_offset`
// bytes into the file. Delegates to pwrite() with validate=false, async=true
// and returns its result; completion is observed via the handle's wait
// mechanism. (The merged old 2-arg signature/body from the diff scrape is
// dropped; only the post-change 3-arg form is kept.)
int deepspeed_io_handle_t::async_pwrite(const torch::Tensor& buffer,
                                        const char* filename,
                                        const int64_t file_offset)
{
    return pwrite(buffer, filename, false, true, file_offset);
}

at::Tensor deepspeed_io_handle_t::new_cpu_locked_tensor(const int64_t num_elem,
Expand Down
27 changes: 18 additions & 9 deletions csrc/aio/py_lib/deepspeed_py_io_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,35 @@ struct deepspeed_io_handle_t {
const bool get_overlap_events() const;
const int get_intra_op_parallelism() const;

int read(torch::Tensor& buffer, const char* filename, const bool validate);
int read(torch::Tensor& buffer,
const char* filename,
const bool validate,
const int64_t file_offset);

int write(const torch::Tensor& buffer, const char* filename, const bool validate);
int write(const torch::Tensor& buffer,
const char* filename,
const bool validate,
const int64_t file_offset);

int pread(const torch::Tensor& buffer,
const char* filename,
const bool validate,
const bool async);
const bool async,
const int64_t file_offset);

int pwrite(const torch::Tensor& buffer,
const char* filename,
const bool validate,
const bool async);
const bool async,
const int64_t file_offset);

int sync_pread(torch::Tensor& buffer, const char* filename);
int sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset);

int sync_pwrite(const torch::Tensor& buffer, const char* filename);
int sync_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset);

int async_pread(torch::Tensor& buffer, const char* filename);
int async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset);

int async_pwrite(const torch::Tensor& buffer, const char* filename);
int async_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset);

// TODO: Make API's args to be shape and dtype.
torch::Tensor new_cpu_locked_tensor(const int64_t num_elem,
Expand All @@ -81,5 +89,6 @@ struct deepspeed_io_handle_t {
const int fd,
const char* filename,
const int64_t file_num_bytes,
const bool validate);
const bool validate,
const int64_t file_offset);
};
Loading

0 comments on commit b692cde

Please sign in to comment.