From bc95eae36dd3a67ce983165701f3ca4a9c8e2220 Mon Sep 17 00:00:00 2001 From: Cyber-SiKu Date: Wed, 29 Nov 2023 10:13:43 +0800 Subject: [PATCH] [feat] warmup report disk is full or not Signed-off-by: Cyber-SiKu --- curvefs/src/client/curve_fuse_op.cpp | 7 +++- .../src/client/s3/disk_cache_manager_impl.cpp | 2 +- .../src/client/s3/disk_cache_manager_impl.h | 11 +++++ curvefs/src/client/s3/disk_cache_read.cpp | 2 +- curvefs/src/client/s3/disk_cache_read.h | 12 +++++- curvefs/src/client/warmup/warmup_manager.cpp | 21 +++++++++- curvefs/src/client/warmup/warmup_manager.h | 41 +++++++++++++++++-- .../cli/command/curvefs/warmup/query/query.go | 4 +- 8 files changed, 88 insertions(+), 12 deletions(-) diff --git a/curvefs/src/client/curve_fuse_op.cpp b/curvefs/src/client/curve_fuse_op.cpp index cd2128d805..d52b03b434 100644 --- a/curvefs/src/client/curve_fuse_op.cpp +++ b/curvefs/src/client/curve_fuse_op.cpp @@ -23,6 +23,8 @@ #include "curvefs/src/client/curve_fuse_op.h" +#include + #include #include #include @@ -285,8 +287,9 @@ void QueryWarmupTask(fuse_ino_t key, std::string *data) { if (!ret) { *data = "finished"; } else { - *data = std::to_string(progress.GetFinished()) + "/" + - std::to_string(progress.GetTotal()); + *data = + fmt::format("{}/{}/{}", progress.GetFinished(), progress.GetTotal(), + progress.GetWarmupStorageErr()); } VLOG(9) << "Warmup [" << key << "]" << *data; } diff --git a/curvefs/src/client/s3/disk_cache_manager_impl.cpp b/curvefs/src/client/s3/disk_cache_manager_impl.cpp index df080f3c5c..7be4b824ae 100644 --- a/curvefs/src/client/s3/disk_cache_manager_impl.cpp +++ b/curvefs/src/client/s3/disk_cache_manager_impl.cpp @@ -153,7 +153,7 @@ int DiskCacheManagerImpl::WriteReadDirect(const std::string fileName, if (!diskCacheManager_->IsDiskUsedInited() || diskCacheManager_->IsDiskCacheFull()) { VLOG(6) << "write disk file fail, disk full."; - return -1; + return 0; } int ret = diskCacheManager_->WriteReadDirect(fileName, buf, length); if (ret < 0) { diff --git a/curvefs/src/client/s3/disk_cache_manager_impl.h b/curvefs/src/client/s3/disk_cache_manager_impl.h index f6e02deeb1..08fcac97d0 100644 --- a/curvefs/src/client/s3/disk_cache_manager_impl.h +++ b/curvefs/src/client/s3/disk_cache_manager_impl.h @@ -111,6 +111,17 @@ class DiskCacheManagerImpl { virtual int UmountDiskCache(); bool IsDiskCacheFull(); + /** + * @brief + * + * @param fileName + * @param buf + * @param length + * @return int + * -2: disk full + * -1: write fail + * >=0: write success + */ virtual int WriteReadDirect(const std::string fileName, const char* buf, uint64_t length); void InitMetrics(std::string fsName, std::shared_ptr s3Metric); diff --git a/curvefs/src/client/s3/disk_cache_read.cpp b/curvefs/src/client/s3/disk_cache_read.cpp index 27bee46776..30b92c2fd5 100644 --- a/curvefs/src/client/s3/disk_cache_read.cpp +++ b/curvefs/src/client/s3/disk_cache_read.cpp @@ -145,7 +145,7 @@ int DiskCacheRead::WriteDiskFile(const std::string fileName, const char *buf, if (fd < 0) { LOG(ERROR) << "open disk file error. errno = " << errno << ", file = " << fileName; - return fd; + return -1; } ssize_t writeLen = posixWrapper_->write(fd, buf, length); if (writeLen < static_cast(length)) { diff --git a/curvefs/src/client/s3/disk_cache_read.h b/curvefs/src/client/s3/disk_cache_read.h index 0956a93bdb..5754a21b3a 100644 --- a/curvefs/src/client/s3/disk_cache_read.h +++ b/curvefs/src/client/s3/disk_cache_read.h @@ -46,8 +46,18 @@ class DiskCacheRead : public DiskCacheBase { virtual ~DiskCacheRead() {} virtual void Init(std::shared_ptr posixWrapper, const std::string cacheDir, uint32_t objectPrefix); - virtual int ReadDiskFile(const std::string name, char *buf, uint64_t offset, + virtual int ReadDiskFile(const std::string name, char* buf, uint64_t offset, uint64_t length); + /** + * @brief + * + * @param fileName + * @param buf + * @param length + * @return int + -1: write fail + >=0: write success + */ virtual int WriteDiskFile(const std::string fileName, const char *buf, uint64_t length); virtual int LinkWriteToRead(const std::string fileName, diff --git a/curvefs/src/client/warmup/warmup_manager.cpp b/curvefs/src/client/warmup/warmup_manager.cpp index bcc9fb9d73..f6b3e35e34 100644 --- a/curvefs/src/client/warmup/warmup_manager.cpp +++ b/curvefs/src/client/warmup/warmup_manager.cpp @@ -777,9 +777,14 @@ void WarmupManagerS3Impl::PutObjectToCache( case curvefs::client::common::WarmupStorageType::kWarmupStorageTypeDisk: ret = s3Adaptor_->GetDiskCacheManager()->WriteReadDirect( context->key, context->buf, context->len); - if (ret < 0) { + if (ret == -1) { + iter->second.SetWarmupStorageErrorType( + WarmupStorageErrorType::WriteFail); LOG_EVERY_SECOND(INFO) << "write read directly failed, key: " << context->key; + } else if (ret == -2) { + iter->second.SetWarmupStorageErrorType( + WarmupStorageErrorType::Full); } delete[] context->buf; break; @@ -788,7 +793,19 @@ void WarmupManagerS3Impl::PutObjectToCache( if (kvClientManager_ != nullptr) { kvClientManager_->Set(std::make_shared( context->key, context->buf, context->len, - [context](const std::shared_ptr&) { + [context, this, + key](const std::shared_ptr& task) { + { + ReadLockGuard lock(inode2ProgressMutex_); + auto iter = FindWarmupProgressByKeyLocked(key); + if (iter->second.GetStorageType() == + curvefs::client::common::WarmupStorageType:: + kWarmupStorageTypeKvClient && + !task->res) { + iter->second.SetWarmupStorageErrorType( + WarmupStorageErrorType::WriteFail); + } + } delete[] context->buf; })); } diff --git a/curvefs/src/client/warmup/warmup_manager.h b/curvefs/src/client/warmup/warmup_manager.h index f55752c801..2e8cd49d02 100644 --- a/curvefs/src/client/warmup/warmup_manager.h +++ b/curvefs/src/client/warmup/warmup_manager.h @@ -23,6 +23,8 @@ #ifndef CURVEFS_SRC_CLIENT_WARMUP_WARMUP_MANAGER_H_ #define CURVEFS_SRC_CLIENT_WARMUP_WARMUP_MANAGER_H_ +#include + #include #include #include @@ -64,6 +66,12 @@ using curve::common::BthreadRWLock; using curvefs::client::common::WarmupStorageType; +enum WarmupStorageErrorType { + Ok = 0, + WriteFail = 1, + Full = 2, +}; + class WarmupFile { public: explicit WarmupFile(fuse_ino_t key = 0, uint64_t fileLen = 0) @@ -121,13 +129,15 @@ class WarmupProgress { : total_(0), finished_(0), storageType_(type), - filePathInClient_(filePath) {} + filePathInClient_(filePath), + storageErr_(WarmupStorageErrorType::Ok) {} WarmupProgress(const WarmupProgress& wp) : total_(wp.total_), finished_(wp.finished_), storageType_(wp.storageType_), - filePathInClient_(wp.filePathInClient_) {} + filePathInClient_(wp.filePathInClient_), + storageErr_(wp.storageErr_) {} void AddTotal(uint64_t add) { std::lock_guard lock(totalMutex_); @@ -137,6 +147,7 @@ class WarmupProgress { WarmupProgress& operator=(const WarmupProgress& wp) { total_ = wp.total_; finished_ = wp.finished_; + storageErr_ = wp.storageErr_; return *this; } @@ -158,14 +169,34 @@ class WarmupProgress { std::string ToString() { std::lock_guard lockT(totalMutex_); std::lock_guard lockF(finishedMutex_); - return "total:" + std::to_string(total_) + - ",finished:" + std::to_string(finished_); + std::lock_guard lockS(storageErrMutex_); + return fmt::format("total:{},finished:{},err:{}", total_, finished_, + storageErr_); } std::string GetFilePathInClient() { return filePathInClient_; } WarmupStorageType GetStorageType() { return storageType_; } + void SetWarmupStorageErrorType(WarmupStorageErrorType err) { + std::lock_guard lockS(storageErrMutex_); + storageErr_ = std::max(err, storageErr_); + } + + std::string GetWarmupStorageErr() { + std::lock_guard lockS(storageErrMutex_); + switch (storageErr_) { + case WarmupStorageErrorType::Ok: + return "Ok"; + case WarmupStorageErrorType::WriteFail: + return "write fail"; + case WarmupStorageErrorType::Full: + return "full"; + default: + return "unkown"; + } + } + private: uint64_t total_; std::mutex totalMutex_; @@ -173,6 +204,8 @@ class WarmupProgress { std::mutex finishedMutex_; WarmupStorageType storageType_; std::string filePathInClient_; + std::mutex storageErrMutex_; + WarmupStorageErrorType storageErr_; }; using FuseOpReadFunctionType = diff --git a/tools-v2/pkg/cli/command/curvefs/warmup/query/query.go b/tools-v2/pkg/cli/command/curvefs/warmup/query/query.go index a1645e3e0e..f0adbc9974 100644 --- a/tools-v2/pkg/cli/command/curvefs/warmup/query/query.go +++ b/tools-v2/pkg/cli/command/curvefs/warmup/query/query.go @@ -118,7 +118,7 @@ func (qCmd *QueryCommand) RunCommand(cmd *cobra.Command, args []string) error { break } strs := strings.Split(resultStr, "/") - if len(strs) != 2 { + if len(strs) < 3 { break } finished, err := strconv.ParseUint(strs[0], 10, 64) @@ -129,6 +129,8 @@ func (qCmd *QueryCommand) RunCommand(cmd *cobra.Command, args []string) error { if err != nil { break } + status := strs[2] + bar.Describe(status) bar.ChangeMax64(int64(total)) bar.Set64(int64(finished)) }