Skip to content

Commit

Permalink
double write buffer (#367)
Browse files Browse the repository at this point in the history
### What problem were solved in this pull request?

Issue Number: close #334 

Problem:

### What is changed and how it works?
实现double write buffer以解决页面的原子写入问题。
### Other information

---------

Co-authored-by: wangyunlai <[email protected]>
  • Loading branch information
Wenbin1002 and hnwyllmm authored Apr 15, 2024
1 parent 6e9355b commit a85cca5
Show file tree
Hide file tree
Showing 5 changed files with 419 additions and 25 deletions.
4 changes: 2 additions & 2 deletions src/observer/common/rc.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ See the Mulan PSL v2 for more details. */
DEFINE_RC(FILE_WRITE) \
DEFINE_RC(VARIABLE_NOT_EXISTS) \
DEFINE_RC(VARIABLE_NOT_VALID) \
DEFINE_RC(LOGBUF_FULL)

DEFINE_RC(LOGBUF_FULL) \
DEFINE_RC(DBLWR_RECOVER_ERRO)
enum class RC
{
#define DEFINE_RC(name) name,
Expand Down
311 changes: 293 additions & 18 deletions src/observer/storage/buffer/disk_buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ RC BufferPoolIterator::reset()
}

////////////////////////////////////////////////////////////////////////////////
DiskBufferPool::DiskBufferPool(BufferPoolManager &bp_manager, BPFrameManager &frame_manager)
: bp_manager_(bp_manager), frame_manager_(frame_manager)
DiskBufferPool::DiskBufferPool(
BufferPoolManager &bp_manager, BPFrameManager &frame_manager, DoubleWriteBuffer &dblwr_manager)
: bp_manager_(bp_manager), frame_manager_(frame_manager), dblwr_manager_(dblwr_manager)
{}

DiskBufferPool::~DiskBufferPool()
Expand Down Expand Up @@ -311,12 +312,17 @@ RC DiskBufferPool::get_this_page(PageNum page_num, Frame **frame)
// allocated_frame->pin(); // pined in manager::get
allocated_frame->access();

if ((rc = load_page(page_num, allocated_frame)) != RC::SUCCESS) {
LOG_ERROR("Failed to load page %s:%d", file_name_.c_str(), page_num);
purge_frame(page_num, allocated_frame);
return rc;
// check if the page is in double write buffer
optional<Page> ret = dblwr_manager_.get_page(file_name_, page_num);
if (ret != std::nullopt) {
allocated_frame->page() = ret.value();
} else {
if ((rc = load_page(page_num, allocated_frame)) != RC::SUCCESS) {
LOG_ERROR("Failed to load page %s:%d", file_name_.c_str(), page_num);
purge_frame(page_num, allocated_frame);
return rc;
}
}

*frame = allocated_frame;
return RC::SUCCESS;
}
Expand Down Expand Up @@ -490,17 +496,13 @@ RC DiskBufferPool::flush_page_internal(Frame &frame)
// so it is easier to flush data to file.

frame.set_check_sum(crc32(frame.page().data, BP_PAGE_DATA_SIZE));
Page &page = frame.page();
int64_t offset = ((int64_t)page.page_num) * sizeof(Page);
if (lseek(file_desc_, offset, SEEK_SET) == offset - 1) {
LOG_ERROR("Failed to flush page %lld of %d due to failed to seek %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_SEEK;
}
Page &page = frame.page();

if (writen(file_desc_, &page, sizeof(Page)) != 0) {
LOG_ERROR("Failed to flush page %lld of %d due to %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_WRITE;
RC rc = dblwr_manager_.add_page(file_name_, page);
if (rc != RC::SUCCESS) {
return rc;
}

frame.clear_dirty();
LOG_DEBUG("Flush block. file desc=%d, pageNum=%d, pin count=%d", file_desc_, page.page_num, frame.pin_count());

Expand Down Expand Up @@ -536,6 +538,44 @@ RC DiskBufferPool::recover_page(PageNum page_num)
return RC::SUCCESS;
}

RC DiskBufferPool::write_page(Page &page)
{
scoped_lock lock_guard(wr_lock_);
int64_t offset = ((int64_t)page.page_num) * sizeof(Page);
if (lseek(file_desc_, offset, SEEK_SET) == -1) {
LOG_ERROR("Failed to write page %lld of %d due to failed to seek %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_SEEK;
}

if (writen(file_desc_, &page, sizeof(Page)) != 0) {
LOG_ERROR("Failed to write page %lld of %d due to %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_WRITE;
}

return RC::SUCCESS;
}

RC DiskBufferPool::open_file_for_dwb(const char *file_name)
{
int fd = open(file_name, O_RDWR);
if (fd < 0) {
LOG_ERROR("Failed to open file %s, because %s.", file_name, strerror(errno));
return RC::IOERR_ACCESS;
}
LOG_INFO("Successfully open buffer pool file %s.", file_name);

file_name_ = file_name;
file_desc_ = fd;

return RC::SUCCESS;
}

RC DiskBufferPool::close_file_for_dwb()
{
file_desc_ = -1;
return RC::SUCCESS;
}

RC DiskBufferPool::allocate_frame(PageNum page_num, Frame **buffer)
{
auto purger = [this](Frame *frame) {
Expand Down Expand Up @@ -584,7 +624,8 @@ RC DiskBufferPool::check_page_num(PageNum page_num)

RC DiskBufferPool::load_page(PageNum page_num, Frame *frame)
{
int64_t offset = ((int64_t)page_num) * BP_PAGE_SIZE;
std::scoped_lock lock_guard(wr_lock_);
int64_t offset = ((int64_t)page_num) * BP_PAGE_SIZE;
if (lseek(file_desc_, offset, SEEK_SET) == -1) {
LOG_ERROR("Failed to load page %s:%d, due to failed to lseek:%s.", file_name_.c_str(), page_num, strerror(errno));

Expand All @@ -611,6 +652,7 @@ BufferPoolManager::BufferPoolManager(int memory_size /* = 0 */)
}
const int pool_num = std::max(memory_size / BP_PAGE_SIZE / DEFAULT_ITEM_NUM_PER_POOL, 1);
frame_manager_.init(pool_num);
dblwr_buffer_ = new DoubleWriteBuffer(*this);
LOG_INFO("buffer pool manager init with memory size %d, page num: %d, pool num: %d",
memory_size, pool_num * DEFAULT_ITEM_NUM_PER_POOL, pool_num);
}
Expand All @@ -623,6 +665,8 @@ BufferPoolManager::~BufferPoolManager()
for (auto &iter : tmp_bps) {
delete iter.second;
}

delete dblwr_buffer_;
}

RC BufferPoolManager::create_file(const char *file_name)
Expand Down Expand Up @@ -680,7 +724,7 @@ RC BufferPoolManager::open_file(const char *_file_name, DiskBufferPool *&_bp)
return RC::BUFFERPOOL_OPEN;
}

DiskBufferPool *bp = new DiskBufferPool(*this, frame_manager_);
DiskBufferPool *bp = new DiskBufferPool(*this, frame_manager_, *dblwr_buffer_);
RC rc = bp->open_file(_file_name);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to open file name");
Expand Down Expand Up @@ -744,6 +788,16 @@ RC BufferPoolManager::flush_page(Frame &frame)
return bp->flush_page(frame);
}

RC BufferPoolManager::get_disk_buffer(const char *file_name, DiskBufferPool **buf)
{

if (buffer_pools_.count(file_name) != 0) {
*buf = buffer_pools_[file_name];
}

return RC::SUCCESS;
}

static BufferPoolManager *default_bpm = nullptr;
void BufferPoolManager::set_instance(BufferPoolManager *bpm)
{
Expand All @@ -754,3 +808,224 @@ void BufferPoolManager::set_instance(BufferPoolManager *bpm
default_bpm = bpm;
}
BufferPoolManager &BufferPoolManager::instance() { return *default_bpm; }

DoubleWriteBuffer::DoubleWriteBuffer(BufferPoolManager &bp_manager) : bp_manager_(bp_manager) { open_file(); }

DoubleWriteBuffer::~DoubleWriteBuffer()
{
for (auto page : dblwr_pages_) {
delete page;
}
close(file_desc_);
}

RC DoubleWriteBuffer::open_file()
{
int fd = open(DBLWR_FILE_NAME, O_CREAT | O_RDWR, 0644);
if (fd < 0) {
LOG_ERROR("Failed to open or creat %s, due to %s.", DBLWR_FILE_NAME, strerror(errno));
return RC::SCHEMA_DB_EXIST;
}

file_desc_ = fd;
return RC::SUCCESS;
}

RC DoubleWriteBuffer::flush_page()
{
sync();

buffers_.clear();
for (const auto &page : dblwr_pages_) {
const char *file_name = page->get_file_name();

RC rc = get_disk_buffer(file_name);
if (rc != RC::SUCCESS) {
LOG_ERROR("failed to get disk buffer");
return rc;
}
}

for (const auto &page : dblwr_pages_) {
RC rc = write_page(page);
if (rc != RC::SUCCESS) {
return rc;
}
delete page;
}

clear_buffer();

dblwr_pages_.clear();
pages_.clear();

return RC::SUCCESS;
}

RC DoubleWriteBuffer::add_page(const std::string &file_name, Page &page)
{
std::scoped_lock lock_guard(lock_);
string key = file_name + to_string(page.page_num);

if (pages_.count(key) != 0) {
pages_.at(key)->get_page() = page;
return RC::SUCCESS;
}

if (dblwr_pages_.size() >= DBLWR_BUFFER_MAX_SIZE) {
RC rc = flush_page();
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to flush pages in double write buffer");
return rc;
}
}

int64_t page_cnt = dblwr_pages_.size();
DoubleWritePage *dblwr_page = new DoubleWritePage((int)dblwr_pages_.size(), file_name, page);
dblwr_pages_.push_back(dblwr_page);

int64_t offset = page_cnt * DW_PAGE_SIZE + sizeof(int);
if (lseek(file_desc_, offset, SEEK_SET) == -1) {
LOG_ERROR("Failed to add page %lld of %d due to failed to seek %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_SEEK;
}

if (writen(file_desc_, dblwr_page, DW_PAGE_SIZE) != 0) {
LOG_ERROR("Failed to add page %lld of %d due to %s.", offset, file_desc_, strerror(errno));
return RC::IOERR_WRITE;
}

if (page_cnt + 1 > header_.page_cnt) {
header_.page_cnt = page_cnt + 1;
if (lseek(file_desc_, 0, SEEK_SET) == -1) {
LOG_ERROR("Failed to add page header due to failed to seek %s.", strerror(errno));
return RC::IOERR_SEEK;
}

if (writen(file_desc_, &header_, sizeof(header_)) != 0) {
LOG_ERROR("Failed to add page header due to %s.", strerror(errno));
return RC::IOERR_WRITE;
}
}

pages_[key] = dblwr_page;

return RC::SUCCESS;
}

RC DoubleWriteBuffer::write_page(DoubleWritePage *dblwr_page)
{
if (buffers_.count(dblwr_page->get_file_name()) == 0) {
LOG_ERROR("can't find disk buffer when write page");
return RC::IOERR_WRITE;
}

DiskBufferPool *disk_buffer = buffers_[dblwr_page->get_file_name()];

return disk_buffer->write_page(dblwr_page->get_page());
}

RC DoubleWriteBuffer::get_disk_buffer(const char *file_name)
{
if (buffers_.count(file_name) != 0) {
return RC::SUCCESS;
}

DiskBufferPool *disk_buffer = nullptr;
bp_manager_.get_disk_buffer(file_name, &disk_buffer);

/**
* 如果bpm中没有对应的DiskBufferPool,就创建一个新的DiskBufferPool。
* 调用bpm中open_file时,需要申请一个新的frame,而如果此时frame manager已满,需要purge page,会导致无限循环
*/
if (disk_buffer == nullptr) {
disk_buffer = new DiskBufferPool(bp_manager_, bp_manager_.get_frame_manager(), *this);
RC rc = disk_buffer->open_file_for_dwb(file_name);
if (rc != RC::SUCCESS) {
LOG_ERROR("failed to open file for dwb");
return rc;
}
buffer_to_delete.push_back(disk_buffer);
}
buffers_[file_name] = disk_buffer;

return RC::SUCCESS;
}

RC DoubleWriteBuffer::recover()
{
scoped_lock lock_guard(lock_);

if (lseek(file_desc_, 0, SEEK_SET) == -1) {
LOG_ERROR("Failed to load page header, due to failed to lseek:%s.", strerror(errno));
return RC::IOERR_SEEK;
}

int ret = readn(file_desc_, &header_, sizeof(header_));
if (ret != 0 && ret != -1) {
LOG_ERROR("Failed to load page header, file_desc:%d, due to failed to read data:%s, ret=%d",
file_desc_, strerror(errno), ret);
return RC::IOERR_READ;
}

auto dblwr_page = make_unique<DoubleWritePage>();
for (int page_num = 0; page_num < header_.page_cnt; page_num++) {
int64_t offset = ((int64_t)page_num) * DW_PAGE_SIZE + sizeof(int);

if (lseek(file_desc_, offset, SEEK_SET) == -1) {
LOG_ERROR("Failed to load page %d, due to failed to lseek:%s.", page_num, strerror(errno));
return RC::IOERR_SEEK;
}

Page &page = dblwr_page->get_page();
page.check_sum = (CheckSum)-1;

ret = readn(file_desc_, dblwr_page.get(), DW_PAGE_SIZE);
if (ret != 0) {
LOG_ERROR("Failed to load page, file_desc:%d, page num:%d, due to failed to read data:%s, ret=%d, page count=%d",
file_desc_, page_num, strerror(errno), ret, page_num);
return RC::IOERR_READ;
}

if (crc32(page.data, BP_PAGE_DATA_SIZE) == page.check_sum) {
RC rc = get_disk_buffer(dblwr_page->get_file_name());
if (rc != RC::SUCCESS) {
clear_buffer();
return rc;
}

rc = write_page(dblwr_page.get());
if (rc != RC::SUCCESS) {
clear_buffer();
return rc;
}
}
}

clear_buffer();

return RC::SUCCESS;
}

void DoubleWriteBuffer::clear_buffer()
{
for (const auto &buffer : buffer_to_delete) {
buffer->close_file_for_dwb();
delete buffer;
}

buffers_.clear();
buffer_to_delete.clear();
}

std::optional<Page> DoubleWriteBuffer::get_page(const std::string &file_name, PageNum &page_num)
{
std::scoped_lock lock_guard(lock_);

string key = file_name + to_string(page_num);
if (pages_.count(key) != 0) {
return make_optional<Page>(pages_.at(key)->get_page());
}

return std::nullopt;
}
Loading

0 comments on commit a85cca5

Please sign in to comment.