From 6af52d71be561b7d6ea9fa17cd255df6b2c38a92 Mon Sep 17 00:00:00 2001 From: Ken Han Date: Fri, 19 Jan 2024 09:54:26 +0000 Subject: [PATCH] [feat] curvefs: change s3 info --- curvefs/proto/mds.proto | 10 +++++ curvefs/src/client/curve_fuse_op.cpp | 44 ++++++++++++++++++++ curvefs/src/client/curve_fuse_op.h | 1 + curvefs/src/client/filesystem/xattr.h | 5 +++ curvefs/src/client/rpcclient/base_client.cpp | 11 +++++ curvefs/src/client/rpcclient/base_client.h | 4 ++ curvefs/src/client/rpcclient/mds_client.cpp | 31 ++++++++++++++ curvefs/src/client/rpcclient/mds_client.h | 6 +++ curvefs/src/mds/fs_info_wrapper.h | 5 +++ curvefs/src/mds/fs_manager.cpp | 33 +++++++++++++++ curvefs/src/mds/fs_manager.h | 6 +++ curvefs/src/mds/mds_service.cpp | 29 +++++++++++++ 12 files changed, 185 insertions(+) diff --git a/curvefs/proto/mds.proto b/curvefs/proto/mds.proto index f89c13e733..9e4022dfde 100644 --- a/curvefs/proto/mds.proto +++ b/curvefs/proto/mds.proto @@ -114,6 +114,15 @@ message GetFsInfoResponse { optional FsInfo fsInfo = 2; } +message UpdateS3InfoRequest { + required common.S3Info s3Info = 1; +} + +message UpdateS3InfoResponse { + required FSStatusCode statusCode = 1; + optional FsInfo fsInfo = 2; +} + message CreateFsRequest { required string fsName = 1; required uint64 blockSize = 2; @@ -255,6 +264,7 @@ message TsoResponse { service MdsService { // fs interface rpc CreateFs(CreateFsRequest) returns (CreateFsResponse); + rpc UpdateS3Info(UpdateS3InfoRequest) returns (UpdateS3InfoResponse); rpc MountFs(MountFsRequest) returns (MountFsResponse); rpc UmountFs(UmountFsRequest) returns (UmountFsResponse); // TODO(chengyi01): move to GetFssInfo diff --git a/curvefs/src/client/curve_fuse_op.cpp b/curvefs/src/client/curve_fuse_op.cpp index cd2128d805..ad3c543a8f 100644 --- a/curvefs/src/client/curve_fuse_op.cpp +++ b/curvefs/src/client/curve_fuse_op.cpp @@ -68,6 +68,7 @@ using ::curvefs::client::filesystem::EntryOut; using ::curvefs::client::filesystem::FileOut; using ::curvefs::client::filesystem::IsListWarmupXAttr; using ::curvefs::client::filesystem::IsWarmupXAttr; +using ::curvefs::client::filesystem::IsS3ConfigXAttr; using ::curvefs::client::filesystem::StrAttr; using ::curvefs::client::filesystem::StrEntry; using ::curvefs::client::filesystem::StrMode; @@ -423,6 +424,47 @@ FuseClient* Client() { return g_ClientInstance; } + +void UpdateS3Config(fuse_req_t req, + fuse_ino_t ino, + const char* name, + const char* value, + size_t size) { + + if (g_ClientInstance->GetFsInfo()->fstype() != FSType::TYPE_S3) { + LOG(ERROR) << "updating s3 config only works for s3"; + return EOPNOTSUPP; + } + + const std::string fsName = g_ClientInstance->GetFsInfo()->fsname(); + const curvefs::common::S3Info oldS3Info = g_ClientInstance->GetFsInfo()->detail()->s3Info(); + const curvefs::common::S3Info newS3Info(oldS3Info); + + Json::CharReaderBuilder builder; + Json::CharReaderBuilder::strictMode(&builder.settings_); + std::unique_ptr reader(builder.newCharReader()); + Json::Value rootNode; + JSONCPP_STRING errormsg; + if (reader->parse(value, value + strlen(value), &rootNode, &errormsg)) { + newS3Info.ak = rootNode["ak"].asString(); + newS3Info.sk = rootNode["sk"].asString(); + newS3Info.bucket = rootNode["bucket"].asString(); + newS3Info.endpoint = rootNode["endpoint"].asString(); + + FsInfo fsInfo; + FSStatusCode statusCode = g_ClientInstance->mdsClient_->UpdateS3Info(fsName, newS3Info, &fsInfo); + if (statusCode == FSStatusCode::OK) { + g_ClientInstance->SetFsInfo(fsInfo); + } + } else { + LOG(ERROR) << "Error parsing the input value ' " + << value + << " ': " << errormsg; + } + + fuse_reply_err(req, statusCode); +} + void TriggerWarmup(fuse_req_t req, fuse_ino_t ino, const char* name, @@ -921,6 +963,8 @@ void FuseOpSetXattr(fuse_req_t req, if (IsWarmupXAttr(name)) { return TriggerWarmup(req, ino, name, value, size); + } else if (IsS3ConfigXAttr(name)) { + return UpdateS3Config(req, ino, name, value); } rc = client->FuseOpSetXattr(req, ino, name, value, size, flags); return fs->ReplyError(req, rc); diff --git a/curvefs/src/client/curve_fuse_op.h b/curvefs/src/client/curve_fuse_op.h index 231cf26526..c5dbc28977 100644 --- a/curvefs/src/client/curve_fuse_op.h +++ b/curvefs/src/client/curve_fuse_op.h @@ -30,6 +30,7 @@ #include #include #include +#include #include "curvefs/src/client/fuse_common.h" diff --git a/curvefs/src/client/filesystem/xattr.h b/curvefs/src/client/filesystem/xattr.h index 26bc3a6c2e..4059f93d34 100644 --- a/curvefs/src/client/filesystem/xattr.h +++ b/curvefs/src/client/filesystem/xattr.h @@ -45,6 +45,7 @@ const char XATTR_DIR_RFBYTES[] = "curve.dir.rfbytes"; const char XATTR_DIR_PREFIX[] = "curve.dir"; const char XATTR_WARMUP_OP[] = "curvefs.warmup.op"; const char XATTR_WARMUP_OP_LIST[] = "curvefs.warmup.op.list"; +const char XATTR_S3_CONFIG[] = "curvefs.s3.config"; inline bool IsSpecialXAttr(const std::string& key) { static std::map xattrs { @@ -69,6 +70,10 @@ inline bool IsListWarmupXAttr(const std::string& key) { return key == XATTR_WARMUP_OP_LIST; } +inline bool IsS3ConfigXAttr(const std::string& key) { + return key == XATTR_S3_CONFIG; +} + } // namespace filesystem } // namespace client } // namespace curvefs diff --git a/curvefs/src/client/rpcclient/base_client.cpp b/curvefs/src/client/rpcclient/base_client.cpp index a2f75ae177..b479cf1fe8 100644 --- a/curvefs/src/client/rpcclient/base_client.cpp +++ b/curvefs/src/client/rpcclient/base_client.cpp @@ -30,6 +30,17 @@ namespace rpcclient { using ::curvefs::mds::space::SpaceService_Stub; +void MDSBaseClient::UpdateS3Info(const std::string& fsName, + const curvefs::common::S3Info& s3Info, + UpdateS3InfoResponse* response, brpc::Controller* cntl, + brpc::Channel* channel) { + UpdateS3InfoRequest request; + request.set_fsname(fsName); + request.set_s3Info(s3Info); + curvefs::mds::MdsService_Stub stub(channel); + stub.UpdateS3Info(cntl, &request, response, nullptr); +} + void MDSBaseClient::MountFs(const std::string& fsName, const Mountpoint& mountPt, MountFsResponse* response, brpc::Controller* cntl, diff --git a/curvefs/src/client/rpcclient/base_client.h b/curvefs/src/client/rpcclient/base_client.h index 7f3bd8161f..27d2e15620 100644 --- a/curvefs/src/client/rpcclient/base_client.h +++ b/curvefs/src/client/rpcclient/base_client.h @@ -143,6 +143,10 @@ class MDSBaseClient { public: virtual ~MDSBaseClient() = default; + virtual void UpdateS3Info(const std::string& fsName, const curvefs::common::S3Info& s3Info, + UpdateS3InfoResponse* response, brpc::Controller* cntl, + brpc::Channel* channel); + virtual void MountFs(const std::string& fsName, const Mountpoint& mountPt, MountFsResponse* response, brpc::Controller* cntl, brpc::Channel* channel); diff --git a/curvefs/src/client/rpcclient/mds_client.cpp b/curvefs/src/client/rpcclient/mds_client.cpp index 2f53aa3d7d..d28b09c79f 100644 --- a/curvefs/src/client/rpcclient/mds_client.cpp +++ b/curvefs/src/client/rpcclient/mds_client.cpp @@ -60,6 +60,37 @@ MdsClientImpl::Init(const ::curve::client::MetaServerOption &mdsOpt, [&](int addrindex, uint64_t rpctimeoutMS, brpc::Channel *channel, \ brpc::Controller *cntl) -> int +FSStatusCode MdsClientImpl::UpdateS3Info(const std::string& fsName, + const curvefs::common::S3Info& s3Info, FsInfo* fsInfo) { + auto task = RPCTask { + (void)addrindex; + (void)rpctimeoutMS; + mdsClientMetric_.mountFs.qps.count << 1; + LatencyUpdater updater(&mdsClientMetric_.mountFs.latency); + UpdateS3InfoResponse response; + mdsbasecli_->UpdateS3Info(fsName, s3Info, &response, cntl, channel); + if (cntl->Failed()) { + mdsClientMetric_.mountFs.eps.count << 1; + LOG(WARNING) << "MountFs Failed, errorcode = " << cntl->ErrorCode() + << ", error content:" << cntl->ErrorText() + << ", log id = " << cntl->log_id(); + return -cntl->ErrorCode(); + } + + FSStatusCode ret = response.statuscode(); + if (ret != FSStatusCode::OK) { + LOG(WARNING) << "UpdateS3Info: fsname = " << fsName + << ", mountPt = " << mountPt.ShortDebugString() + << ", errcode = " << ret + << ", errmsg = " << FSStatusCode_Name(ret); + } else if (response.has_fsinfo()) { + fsInfo->CopyFrom(response.fsinfo()); + } + return ret; + }; + return ReturnError(rpcexcutor_.DoRPCTask(task, mdsOpt_.mdsMaxRetryMS)); +} + FSStatusCode MdsClientImpl::MountFs(const std::string& fsName, const Mountpoint& mountPt, FsInfo* fsInfo) { auto task = RPCTask { diff --git a/curvefs/src/client/rpcclient/mds_client.h b/curvefs/src/client/rpcclient/mds_client.h index e74ec84c03..1d19644f46 100644 --- a/curvefs/src/client/rpcclient/mds_client.h +++ b/curvefs/src/client/rpcclient/mds_client.h @@ -75,6 +75,9 @@ class MdsClient { virtual FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt, MDSBaseClient *baseclient) = 0; + virtual FSStatusCode UpdateS3Info(const std::string& fsName, + const curvefs::common::S3Info& s3Info, FsInfo* fsInfo) = 0; + virtual FSStatusCode MountFs(const std::string& fsName, const Mountpoint& mountPt, FsInfo* fsInfo) = 0; @@ -171,6 +174,9 @@ class MdsClientImpl : public MdsClient { FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt, MDSBaseClient *baseclient) override; + FSStatusCode UpdateS3Info(const std::string& fsName, const curvefs::common::S3Info& s3Info, + FsInfo* fsInfo) override; + FSStatusCode MountFs(const std::string& fsName, const Mountpoint& mountPt, FsInfo* fsInfo) override; diff --git a/curvefs/src/mds/fs_info_wrapper.h b/curvefs/src/mds/fs_info_wrapper.h index ea33ad5af7..7de3263460 100644 --- a/curvefs/src/mds/fs_info_wrapper.h +++ b/curvefs/src/mds/fs_info_wrapper.h @@ -66,6 +66,11 @@ class FsInfoWrapper { fsInfo_.set_status(status); } + void SetS3Info(S3Info s3info) { + const FsDetail fsdetail_ = GetFsDetail(); + fsdetail_._set_s3info(s3info) + } + void SetFsName(const std::string& name) { fsInfo_.set_fsname(name); } diff --git a/curvefs/src/mds/fs_manager.cpp b/curvefs/src/mds/fs_manager.cpp index 7c6a1aa51a..913ac29644 100644 --- a/curvefs/src/mds/fs_manager.cpp +++ b/curvefs/src/mds/fs_manager.cpp @@ -598,6 +598,39 @@ FSStatusCode FsManager::DeleteFs(const std::string& fsName) { return FSStatusCode::OK; } +FSStatusCode FsManager::UpdateS3Info(const std::string& fsName, + const curvefs::common::S3Info& s3Info, FsInfo* fsInfo) { + NameLockGuard lock(nameLock_, fsName); + + // query fs + FsInfoWrapper wrapper; + FSStatusCode ret = fsStorage_->Get(fsName, &wrapper); + if (ret != FSStatusCode::OK) { + LOG(WARNING) << "UpdateS3Info fail, get fs fail, fsName = " << fsName + << ", errCode = " << FSStatusCode_Name(ret); + return ret; + } + + // insert mountpoint + wrapper.SetS3Info(s3Info); + // for persistence consider + ret = fsStorage_->Update(wrapper); + if (ret != FSStatusCode::OK) { + LOG(WARNING) << "UpdateS3Info fail, update fs fail, fsName = " << fsName + << ", mountpoint = " << mountpoint.ShortDebugString() + << ", errCode = " << FSStatusCode_Name(ret); + return ret; + } + // update client alive time + UpdateClientAliveTime(mountpoint, fsName, false); + + // convert fs info + FsMetric::GetInstance().OnMount(wrapper.GetFsName(), mountpoint); + *fsInfo = std::move(wrapper).ProtoFsInfo(); + + return FSStatusCode::OK; +} + FSStatusCode FsManager::MountFs(const std::string& fsName, const Mountpoint& mountpoint, FsInfo* fsInfo) { NameLockGuard lock(nameLock_, fsName); diff --git a/curvefs/src/mds/fs_manager.h b/curvefs/src/mds/fs_manager.h index c1f3ed1e2b..dc6b21ef31 100644 --- a/curvefs/src/mds/fs_manager.h +++ b/curvefs/src/mds/fs_manager.h @@ -126,6 +126,12 @@ class FsManager { */ FSStatusCode DeleteFs(const std::string& fsName); + /** + * + */ + FSStatusCode UpdateS3Info(const std::string& fsName, + const Mountpoint& mountpoint, FsInfo* fsInfo); + /** * @brief Mount fs, mount point can not repeat. It will increate * mountNum. diff --git a/curvefs/src/mds/mds_service.cpp b/curvefs/src/mds/mds_service.cpp index 17c33156c2..82faa44bc7 100644 --- a/curvefs/src/mds/mds_service.cpp +++ b/curvefs/src/mds/mds_service.cpp @@ -141,6 +141,35 @@ void MdsServiceImpl::CreateFs(::google::protobuf::RpcController* controller, << ", capacity = " << request->capacity(); } +void MdsServiceImpl::UpdateS3Info(::google::protobuf::RpcController* controller, + const UpdateS3InfoRequest* request, UpdateS3InfoResponse* response, + ::google::protobuf::Closure* done) { + (void)controller; + brpc::ClosureGuard doneGuard(done); + const std::string &fsName = request->fsname(); + const curvefs::common::S3Info &s3Info = request->s3info(); + + LOG(INFO) << "MountFs request, fsName = " << fsName + << ", mountPoint = " << mount.ShortDebugString(); + + FSStatusCode status = + fsManager_->UpdateS3Info(fsName, s3Info, response->mutable_fsinfo()); + + if (status != FSStatusCode::OK) { + response->clear_fsinfo(); + response->set_statuscode(status); + LOG(ERROR) << "UpdateS3Info fail, fsName = " << fsName + << ", mountPoint = " << mount.ShortDebugString() + << ", errCode = " << FSStatusCode_Name(status); + return; + } + + response->set_statuscode(FSStatusCode::OK); + LOG(INFO) << "UpdateS3Info success, fsName = " << fsName + << ", mountPoint = " << mount.ShortDebugString() + << ", mps: " << response->mutable_fsinfo()->mountpoints_size(); +} + void MdsServiceImpl::MountFs(::google::protobuf::RpcController* controller, const MountFsRequest* request, MountFsResponse* response, ::google::protobuf::Closure* done) {