From 79385a5b256503cbc680fa7ae4b8ddba7c44ebf0 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 26 Oct 2023 06:39:54 +0000 Subject: [PATCH 1/4] for xfs v1 --- WORKSPACE | 14 +- build.sh | 69 +++---- curve.code-workspace | 79 ++++++++ src/chunkserver/chunkserver.cpp | 3 +- src/chunkserver/copyset_node.cpp | 24 ++- .../datastore/chunkserver_chunkfile.cpp | 129 +++++++++++++ .../datastore/chunkserver_chunkfile.h | 34 ++++ .../datastore/chunkserver_datastore.cpp | 47 +++++ .../datastore/chunkserver_datastore.h | 9 + .../datastore/datastore_file_helper.h | 4 +- src/chunkserver/op_request.cpp | 181 +++++++++++++++++- src/chunkserver/op_request.h | 42 ++++ src/chunkserver/raftlog/braft_segment.h | 7 + src/chunkserver/raftlog/curve_segment.cpp | 13 +- src/chunkserver/raftlog/curve_segment.h | 8 + .../raftlog/curve_segment_log_storage.h | 16 ++ src/chunkserver/raftlog/segment.h | 3 + src/fs/BUILD | 7 +- src/fs/ext4_filesystem_impl.h | 7 +- src/fs/fs_common.h | 1 + src/fs/local_filesystem.cpp | 3 + src/fs/local_filesystem.h | 6 + src/fs/wrap_posix.cpp | 16 +- src/fs/wrap_posix.h | 6 + src/fs/xfs_filesystem_impl.cpp | 74 +++++++ src/fs/xfs_filesystem_impl.h | 59 ++++++ src/tools/curve_format_main.cpp | 5 +- thirdparties/etcdclient/Makefile | 2 +- util/build.sh | 1 + zyb_build.sh | 10 + 30 files changed, 807 insertions(+), 72 deletions(-) create mode 100644 curve.code-workspace create mode 100644 src/fs/xfs_filesystem_impl.cpp create mode 100644 src/fs/xfs_filesystem_impl.h create mode 100644 zyb_build.sh diff --git a/WORKSPACE b/WORKSPACE index bfaf9637e9..255a3405e9 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -20,7 +20,7 @@ load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") git_repository( name = "com_github_baidu_braft", - remote = "https://github.com/baidu/braft", + remote = "https://ghproxy.com/https://github.com/baidu/braft", commit = "e255c0e4b18d1a8a5d484d4b647f41ff1385ef1e", ) @@ -36,7 +36,7 @@ http_archive( name = "com_google_protobuf", sha256 = "cef7f1b5a7c5fba672bec2a319246e8feba471f04dcebfe362d55930ee7c1c30", strip_prefix = "protobuf-3.5.0", - urls = ["https://github.com/google/protobuf/archive/v3.5.0.zip"], + urls = ["https://ghproxy.com/https://github.com/google/protobuf/archive/v3.5.0.zip"], ) bind( @@ -48,7 +48,7 @@ bind( new_git_repository( name = "com_google_googletest", build_file = "bazel/gmock.BUILD", - remote = "https://github.com/google/googletest", + remote = "https://ghproxy.com/https://github.com/google/googletest", tag = "release-1.8.0", ) @@ -61,7 +61,7 @@ bind( # brpc内BUILD文件在依赖glog时, 直接指定的依赖是"@com_github_google_glog//:glog" git_repository( name = "com_github_google_glog", - remote = "https://github.com/google/glog", + remote = "https://ghproxy.com/https://github.com/google/glog", commit = "4cc89c9e2b452db579397887c37f302fb28f6ca1", patch_args = ["-p1"], patches = ["//:thirdparties/glog/glog.patch"], @@ -91,7 +91,7 @@ new_http_archive( name = "com_github_google_leveldb", build_file = "bazel/leveldb.BUILD", strip_prefix = "leveldb-a53934a3ae1244679f812d998a4f16f2c7f309a6", - url = "https://github.com/google/leveldb/archive/a53934a3ae1244679f812d998a4f16f2c7f309a6.tar.gz", + url = "https://ghproxy.com/https://github.com/google/leveldb/archive/a53934a3ae1244679f812d998a4f16f2c7f309a6.tar.gz", ) bind( @@ -101,7 +101,7 @@ bind( git_repository( name = "com_github_apache_brpc", - remote = "https://github.com/apache/incubator-brpc", + remote = "https://ghproxy.com/https://github.com/apache/incubator-brpc", commit = "1b9e00641cbec1c8803da6a1f7f555398c954cb0", patches = ["//:thirdparties/brpc/brpc.patch"], patch_args = ["-p1"], @@ -131,7 +131,7 @@ bind( new_git_repository( name = "jsoncpp", build_file = "bazel/jsoncpp.BUILD", - remote = "https://github.com/open-source-parsers/jsoncpp.git", + remote = "https://ghproxy.com/https://github.com/open-source-parsers/jsoncpp.git", tag = "1.8.4", ) diff --git a/build.sh b/build.sh index 8d1bbb73d1..8d083c85ba 100644 --- a/build.sh +++ b/build.sh @@ -18,16 +18,16 @@ dir=`pwd` #step1 清除生成的目录和文件 -bazel clean +#bazel clean rm -rf curvefs_python/BUILD rm -rf curvefs_python/tmplib/ -git submodule update --init -if [ $? -ne 0 ] -then - echo "submodule init failed" - exit -fi +#git submodule update --init +#if [ $? -ne 0 ] +#then +# echo "submodule init failed" +# exit +#fi #step2 获取tag版本和git提交版本信息 #获取tag版本 @@ -90,17 +90,17 @@ fi echo "gcc version : "`gcc -dumpversion` echo "start compile" -cd ${dir}/thirdparties/etcdclient -make clean -make all -if [ $? -ne 0 ] -then - echo "make etcd client failed" - exit -fi -cd ${dir} - -cp ${dir}/thirdparties/etcdclient/libetcdclient.h ${dir}/include/etcdclient/etcdclient.h +#cd ${dir}/thirdparties/etcdclient +#make clean +#make all +#if [ $? -ne 0 ] +#then +# echo "make etcd client failed" +# exit +#fi +#cd ${dir} +# +#cp ${dir}/thirdparties/etcdclient/libetcdclient.h ${dir}/include/etcdclient/etcdclient.h if [ `gcc -dumpversion | awk -F'.' '{print $1}'` -le 6 ] then @@ -108,6 +108,7 @@ then else bazelflags='--copt -faligned-new' fi +echo "bazelflags=$bazelflags" if [ "$1" = "debug" ] then @@ -138,31 +139,23 @@ then exit fi else -bazel build ... --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ +#bazel build ... --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ +#bazel build //src/chunkserver:chunkserver --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ +#bazel build //src/client:all --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ +#bazel build //:curvebs-sdk --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ +#bazel build --spawn_strategy=local //src/chunkserver:chunkserver --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ +#cmd="bazel build //src/tools:curve_format --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ +cmd="bazel build //src/chunkserver:chunkserver --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ --define=libunwind=true --copt -DGFLAGS_NS=google --copt \ -Wno-error=format-security --copt -DUSE_BTHREAD_MUTEX --copt -DCURVEVERSION=${curve_version} \ ---linkopt -L/usr/local/lib ${bazelflags} +--linkopt -L/usr/local/lib ${bazelflags}" + +echo $cmd +$cmd if [ $? -ne 0 ] then echo "build phase1 failed" exit fi -bash ./curvefs_python/configure.sh -if [ $? -ne 0 ] -then - echo "configure failed" - exit -fi -bazel build curvefs_python:curvefs --copt -DHAVE_ZLIB=1 --copt -O2 -s \ ---define=with_glog=true --define=libunwind=true --copt -DGFLAGS_NS=google \ ---copt \ --Wno-error=format-security --copt -DUSE_BTHREAD_MUTEX --linkopt \ --L${dir}/curvefs_python/tmplib/ --copt -DCURVEVERSION=${curve_version} \ ---linkopt -L/usr/local/lib ${bazelflags} -if [ $? -ne 0 ] -then - echo "build phase2 failed" - exit -fi fi -echo "end compile" \ No newline at end of file +echo "end compile" diff --git a/curve.code-workspace b/curve.code-workspace new file mode 100644 index 0000000000..7a7fe3cee8 --- /dev/null +++ b/curve.code-workspace @@ -0,0 +1,79 @@ +{ + "folders": [ + { + "path": "." + } + ], + "settings": { + "files.associations": { + "*.rst": "rust", + "atomic": "cpp", + "deque": "cpp", + "string": "cpp", + "vector": "cpp", + "array": "cpp", + "*.tcc": "cpp", + "memory": "cpp", + "future": "cpp", + "istream": "cpp", + "ranges": "cpp", + "functional": "cpp", + "tuple": "cpp", + "utility": "cpp", + "list": "cpp", + "unordered_map": "cpp", + "unordered_set": "cpp", + "hash_map": "cpp", + "hash_set": "cpp", + "bitset": "cpp", + "cctype": "cpp", + "chrono": "cpp", + "cinttypes": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "condition_variable": "cpp", + "csignal": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "exception": "cpp", + "algorithm": "cpp", + "iterator": "cpp", + "map": "cpp", + "memory_resource": "cpp", + "numeric": "cpp", + "random": "cpp", + "ratio": "cpp", + "regex": "cpp", + "set": "cpp", + "system_error": "cpp", + "type_traits": "cpp", + "slist": "cpp", + "fstream": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "limits": "cpp", + "mutex": "cpp", + "new": "cpp", + "ostream": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "streambuf": "cpp", + "thread": "cpp", + "cfenv": "cpp", + "typeinfo": "cpp", + "*.inc": "cpp", + "cerrno": "cpp", + "string_view": "cpp", + "valarray": "cpp" + } + } +} \ No newline at end of file diff --git a/src/chunkserver/chunkserver.cpp b/src/chunkserver/chunkserver.cpp index 8b74de1872..8df5c2cae9 100644 --- a/src/chunkserver/chunkserver.cpp +++ b/src/chunkserver/chunkserver.cpp @@ -104,6 +104,7 @@ int ChunkServer::Run(int argc, char** argv) { curve::common::ExposeCurveVersion(); // ============================初始化各模块==========================// + LOG(INFO) << "zyb xfs mod ChunkServer"; LOG(INFO) << "Initializing ChunkServer modules"; LOG_IF(FATAL, !conf.GetUInt32Value("global.min_io_alignment", @@ -128,7 +129,7 @@ int ChunkServer::Run(int argc, char** argv) { // 初始化本地文件系统 std::shared_ptr fs( - LocalFsFactory::CreateFs(FileSystemType::EXT4, "")); + LocalFsFactory::CreateFs(FileSystemType::XFS, "")); LocalFileSystemOption lfsOption; LOG_IF(FATAL, !conf.GetBoolValue( "fs.enable_renameat2", &lfsOption.enableRenameat2)); diff --git a/src/chunkserver/copyset_node.cpp b/src/chunkserver/copyset_node.cpp index c49589489f..379759ed72 100755 --- a/src/chunkserver/copyset_node.cpp +++ b/src/chunkserver/copyset_node.cpp @@ -297,12 +297,24 @@ void CopysetNode::on_apply(::braft::Iterator &iter) { butil::IOBuf data; auto opReq = ChunkOpRequest::Decode(log, &request, &data); auto chunkId = request.chunkid(); - auto task = std::bind(&ChunkOpRequest::OnApplyFromLog, - opReq, - dataStore_, - std::move(request), - data); - concurrentapply_->Push(chunkId, request.optype(), task); + + if (request.optype() == CHUNK_OP_TYPE::CHUNK_OP_WRITE) { + auto task1 = std::bind(&ChunkOpRequest::OnApplyFromLogIndex, + opReq, + dataStore_, + logStorage_, + std::move(request), + iter.index(), + data); + concurrentapply_->Push(chunkId, request.optype(), task1); + } else { + auto task2 = std::bind(&ChunkOpRequest::OnApplyFromLog, + opReq, + dataStore_, + std::move(request), + data); + concurrentapply_->Push(chunkId, request.optype(), task2); + } } } } diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.cpp b/src/chunkserver/datastore/chunkserver_chunkfile.cpp index 7fc8ab300a..273173d58e 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.cpp +++ b/src/chunkserver/datastore/chunkserver_chunkfile.cpp @@ -409,6 +409,135 @@ CSErrorCode CSChunkFile::Write(SequenceNum sn, return CSErrorCode::Success; } +CSErrorCode CSChunkFile::WriteWithClone(SequenceNum sn, + const int wal_fd, + off_t wal_offset, + size_t wal_length, + off_t data_offset, + uint32_t* cost) { + WriteLockGuard writeGuard(rwLock_); + off_t offset = data_offset; + size_t length = wal_length; + if (!CheckOffsetAndLength(offset, length, isCloneChunk_ ? pageSize_ : FLAGS_minIoAlignment)) { + LOG(ERROR) << "WriteWithClone chunk failed, invalid offset or length." + << "ChunkID: " << chunkId_ + << ", offset: " << offset + << ", length: " << length + << ", page size: " << pageSize_ + << ", chunk size: " << size_ + << ", align: " + << (isCloneChunk_ ? pageSize_ : FLAGS_minIoAlignment); + return CSErrorCode::InvalidArgError; + } + // Curve will ensure that all previous requests arrive or time out + // before issuing new requests after user initiate a snapshot request. + // Therefore, this is only a log recovery request, and it must have been + // executed, and an error code can be returned here. + if (sn < metaPage_.sn || sn < metaPage_.correctedSn) { + LOG(WARNING) << "Backward write request." + << "ChunkID: " << chunkId_ + << ",request sn: " << sn + << ",chunk sn: " << metaPage_.sn + << ",correctedSn: " << metaPage_.correctedSn; + return CSErrorCode::BackwardRequestError; + } + // Determine whether to create a snapshot file + if (needCreateSnapshot(sn)) { + // There are historical snapshots that have not been deleted + if (snapshot_ != nullptr) { + LOG(ERROR) << "Exists old snapshot." + << "ChunkID: " << chunkId_ + << ",request sn: " << sn + << ",chunk sn: " << metaPage_.sn + << ",old snapshot sn: " + << snapshot_->GetSn(); + return CSErrorCode::SnapshotConflictError; + } + + // clone chunk does not allow to create snapshot + if (isCloneChunk_) { + LOG(ERROR) << "Clone chunk can't create snapshot." + << "ChunkID: " << chunkId_ + << ",request sn: " << sn + << ",chunk sn: " << metaPage_.sn; + return CSErrorCode::StatusConflictError; + } + + // create snapshot + ChunkOptions options; + options.id = chunkId_; + options.sn = metaPage_.sn; + options.baseDir = baseDir_; + options.chunkSize = size_; + options.pageSize = pageSize_; + options.metric = metric_; + snapshot_ = new(std::nothrow) CSSnapshot(lfs_, + chunkFilePool_, + options); + CHECK(snapshot_ != nullptr) << "Failed to new CSSnapshot!"; + CSErrorCode errorCode = snapshot_->Open(true); + if (errorCode != CSErrorCode::Success) { + delete snapshot_; + snapshot_ = nullptr; + LOG(ERROR) << "Create snapshot failed." + << "ChunkID: " << chunkId_ + << ",request sn: " << sn + << ",chunk sn: " << metaPage_.sn; + return errorCode; + } + } + // If the requested sequence number is greater than the current chunk + // sequence number, the metapage needs to be updated + if (sn > metaPage_.sn) { + ChunkFileMetaPage tempMeta = metaPage_; + tempMeta.sn = sn; + CSErrorCode errorCode = updateMetaPage(&tempMeta); + if (errorCode != CSErrorCode::Success) { + LOG(ERROR) << "Update metapage failed." + << "ChunkID: " << chunkId_ + << ",request sn: " << sn + << ",chunk sn: " << metaPage_.sn; + return errorCode; + } + metaPage_.sn = tempMeta.sn; + } + // If it is cow, copy the data to the snapshot file first + if (needCow(sn)) { + CSErrorCode errorCode = copy2Snapshot(offset, length); + if (errorCode != CSErrorCode::Success) { + LOG(ERROR) << "Copy data to snapshot failed." + << "ChunkID: " << chunkId_ + << ",request sn: " << sn + << ",chunk sn: " << metaPage_.sn; + return errorCode; + } + } + + //int rc = writeData(write_buf, offset, length); + int src_fd = wal_fd; + off_t src_offset = wal_offset; + size_t src_length = wal_length; + off_t dest_offset = data_offset; + int rc = writeDataWithClone(src_fd, src_offset, src_length, dest_offset); + if (rc < 0) { + LOG(ERROR) << "writeDataWithClone data to chunk file failed." + << "ChunkID: " << chunkId_ + << ",request sn: " << sn + << ",chunk sn: " << metaPage_.sn; + return CSErrorCode::InternalError; + } + // If it is a clone chunk, the bitmap will be updated + CSErrorCode errorCode = flush(); + if (errorCode != CSErrorCode::Success) { + LOG(ERROR) << "Write data to chunk file failed." + << "ChunkID: " << chunkId_ + << ",request sn: " << sn + << ",chunk sn: " << metaPage_.sn; + return errorCode; + } + return CSErrorCode::Success; +} + CSErrorCode CSChunkFile::Sync() { WriteLockGuard writeGuard(rwLock_); int rc = SyncData(); diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.h b/src/chunkserver/datastore/chunkserver_chunkfile.h index 4ffd79136c..72be6fc9d9 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.h +++ b/src/chunkserver/datastore/chunkserver_chunkfile.h @@ -168,6 +168,12 @@ class CSChunkFile { off_t offset, size_t length, uint32_t* cost); + CSErrorCode WriteWithClone(SequenceNum sn, + const int wal_fd, + off_t wal_offset, + size_t wal_length, + off_t data_offset, + uint32_t* cost); CSErrorCode Sync(); @@ -323,6 +329,34 @@ class CSChunkFile { return rc; } + inline int writeDataWithClone(int src_fd, off_t src_offset, size_t src_length, off_t dest_offset) { + int rc = lfs_->WriteWithClone(fd_, src_fd, src_offset, src_length, dest_offset + pageSize_); + if (rc < 0) { + LOG(ERROR) << "WriteWithClone failed: " + << " fd " << fd + << " src_fd " << src_fd + << " src_offset " << src_offset + << " src_length " << src_length + << " dest_offset " << dest_offset; + << " pageSize_ " << pageSize_; + return rc; + } + // If it is a clone chunk, you need to determine whether you need to + // change the bitmap and update the metapage + if (isCloneChunk_) { + uint32_t beginIndex = dest_offset / pageSize_; + uint32_t endIndex = (dest_offset + src_length - 1) / pageSize_; + for (uint32_t i = beginIndex; i <= endIndex; ++i) { + // record dirty page + if (!metaPage_.bitmap->Test(i)) { + dirtyPages_.insert(i); + } + } + } + return rc; + } + + inline int writeData(const butil::IOBuf& buf, off_t offset, size_t length) { int rc = lfs_->Write(fd_, buf, offset + pageSize_, length); if (rc < 0) { diff --git a/src/chunkserver/datastore/chunkserver_datastore.cpp b/src/chunkserver/datastore/chunkserver_datastore.cpp index 707165b5b1..5ce783665a 100644 --- a/src/chunkserver/datastore/chunkserver_datastore.cpp +++ b/src/chunkserver/datastore/chunkserver_datastore.cpp @@ -254,6 +254,53 @@ CSErrorCode CSDataStore::WriteChunk(ChunkID id, return CSErrorCode::Success; } +CSErrorCode CSDataStore::WriteChunkWithClone(ChunkID id, + SequenceNum sn, + int wal_fd, + off_t wal_offset, + size_t wal_length, + off_t data_offset, + uint32_t* cost, + const std::string & cloneSourceLocation) { + // The requested sequence number is not allowed to be 0, when snapsn=0, + // it will be used as the basis for judging that the snapshot does not exist + if (sn == kInvalidSeq) { + LOG(ERROR) << "Sequence num should not be zero." + << "ChunkID = " << id; + return CSErrorCode::InvalidArgError; + } + auto chunkFile = metaCache_.Get(id); + // If the chunk file does not exist, create the chunk file first + if (chunkFile == nullptr) { + ChunkOptions options; + options.id = id; + options.sn = sn; + options.baseDir = baseDir_; + options.chunkSize = chunkSize_; + options.location = cloneSourceLocation; + options.pageSize = pageSize_; + options.metric = metric_; + options.enableOdsyncWhenOpenChunkFile = enableOdsyncWhenOpenChunkFile_; + CSErrorCode errorCode = CreateChunkFile(options, &chunkFile); + if (errorCode != CSErrorCode::Success) { + return errorCode; + } + } + // write chunk file + CSErrorCode errorCode = chunkFile->WriteWithClone(sn, + wal_fd, + wal_offset, + wal_length, + data_offset, + cost); + if (errorCode != CSErrorCode::Success) { + LOG(WARNING) << "Write chunk file failed." + << "ChunkID = " << id; + return errorCode; + } + return CSErrorCode::Success; +} + CSErrorCode CSDataStore::SyncChunk(ChunkID id) { auto chunkFile = metaCache_.Get(id); if (chunkFile == nullptr) { diff --git a/src/chunkserver/datastore/chunkserver_datastore.h b/src/chunkserver/datastore/chunkserver_datastore.h index 13ba2ba437..56bb5ad7e2 100644 --- a/src/chunkserver/datastore/chunkserver_datastore.h +++ b/src/chunkserver/datastore/chunkserver_datastore.h @@ -224,6 +224,15 @@ class CSDataStore { uint32_t* cost, const std::string & cloneSourceLocation = ""); + virtual CSErrorCode WriteChunkWithClone(ChunkID id, + SequenceNum sn, + int wal_fd, + off_t wal_offset, + size_t wal_length, + off_t data_offset, + uint32_t* cost, + const std::string & cloneSourceLocation = ""); + virtual CSErrorCode SyncChunk(ChunkID id); diff --git a/src/chunkserver/datastore/datastore_file_helper.h b/src/chunkserver/datastore/datastore_file_helper.h index 4add706939..4063034aeb 100644 --- a/src/chunkserver/datastore/datastore_file_helper.h +++ b/src/chunkserver/datastore/datastore_file_helper.h @@ -30,18 +30,20 @@ #include "src/fs/local_filesystem.h" #include "src/fs/ext4_filesystem_impl.h" +#include "src/fs/xfs_filesystem_impl.h" namespace curve { namespace chunkserver { using curve::fs::LocalFileSystem; using curve::fs::Ext4FileSystemImpl; +using curve::fs::XfsFileSystemImpl; class DatastoreFileHelper { public: DatastoreFileHelper() { // Use Ext4FileSystemImpl by default - fs_ = Ext4FileSystemImpl::getInstance(); + fs_ = XfsFileSystemImpl::getInstance(); } explicit DatastoreFileHelper(std::shared_ptr fs) diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp index 913fc358d7..42bc44630c 100755 --- a/src/chunkserver/op_request.cpp +++ b/src/chunkserver/op_request.cpp @@ -439,6 +439,7 @@ void WriteChunkRequest::OnApply(uint64_t index, ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); uint32_t cost; + CSErrorCode ret = CSErrorCode::InternalError; std::string cloneSourceLocation; if (existCloneInfo(request_)) { @@ -447,13 +448,69 @@ void WriteChunkRequest::OnApply(uint64_t index, request_->clonefileoffset()); } - auto ret = datastore_->WriteChunk(request_->chunkid(), - request_->sn(), - cntl_->request_attachment(), - request_->offset(), - request_->size(), - &cost, - cloneSourceLocation); + if (request_->size() < 128 * 4096) { + ret = datastore_->WriteChunk(request_->chunkid(), + request_->sn(), + cntl_->request_attachment(), + request_->offset(), + request_->size(), + &cost, + cloneSourceLocation); + } else if (common::is_aligned(request_->offset(), 4096) && + common::is_aligned(request_->size(), 4096)) { + + off_t wal_offset =0; + size_t wal_length =0; + int64_t wal_term = 0; + off_t data_offset = request_->offset(); + + int wal_fd = node_->GetLogStorage()->get_segment_fd(index); + int meta_ret = node_->GetLogStorage()->get_segment_meta_info(index, &wal_offset, &wal_length, &wal_term); + + if (wal_length != request_->size() + 4096 || + wal_fd <= 0 || + meta_ret == false) { + + LOG(WARNING) << "zyb OnApply error: " + << " logic pool id: " << request_->logicpoolid() + << " copyset id: " << request_->copysetid() + << " sn: " << request_->sn() + << " chunkid: " << request_->chunkid() + << " data_size: " << request_->size() + << " data_offset: " << request_->offset() + << " index: " << index + << " wal_fd: " << wal_fd + << " wal_offset: " << wal_offset + << " wal_length: " << wal_length + << " wal_term: " << wal_term + << " meta_ret: " << meta_ret; + response_->set_status( + CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN); + return; + } + + ret = datastore_->WriteChunkWithClone(request_->chunkid(), + request_->sn(), + wal_fd, + wal_offset, + wal_length, + data_offset, + &cost, + cloneSourceLocation); + } else { + uint32_t aoffset = common::align_down(request_->offset(), 4096); + uint32_t alength = common::align_down(request_->size(), 4096); + LOG(ERROR) << "zyb split OnApply error: " + << " logic pool id: " << request_->logicpoolid() + << " copyset id: " << request_->copysetid() + << " sn: " << request_->sn() + << " chunkid: " << request_->chunkid() + << " data_size: " << request_->size() + << " data_offset: " << request_->offset() + << " index: " << index + << " aoffset: " << aoffset + << " alength: " << alength; + } if (CSErrorCode::Success == ret) { response_->set_status(CHUNK_OP_STATUS::CHUNK_OP_STATUS_SUCCESS); @@ -511,7 +568,6 @@ void WriteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, request.clonefileoffset()); } - auto ret = datastore->WriteChunk(request.chunkid(), request.sn(), data, @@ -547,6 +603,115 @@ void WriteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, } } +void WriteChunkRequest::OnApplyFromLogIndex(std::shared_ptr datastore, + CurveSegmentLogStorage* logStorage, + const ChunkRequest &request, + const uint64_t index, + const butil::IOBuf &data) { + // NOTE: 处理过程中优先使用参数传入的datastore/request + uint32_t cost; + CSErrorCode ret = CSErrorCode::InternalError; + std::string cloneSourceLocation; + if (existCloneInfo(&request)) { + auto func = ::curve::common::LocationOperator::GenerateCurveLocation; + cloneSourceLocation = func(request.clonefilesource(), + request.clonefileoffset()); + } + + + if (request.size() < 128 * 4096) { + //only using writechunk + ret = datastore->WriteChunk(request.chunkid(), + request.sn(), + data, + request.offset(), + request.size(), + &cost, + cloneSourceLocation); + + } else if (common::is_aligned(request.offset(), 4096) && + common::is_aligned(request.size(), 4096)) { + //only using clone + off_t wal_offset; + size_t wal_length; + int64_t wal_term; + off_t data_offset = request.offset(); + + + int wal_fd = logStorage->get_segment_fd(index); + int meta_ret = logStorage->get_segment_meta_info(index, &wal_offset, &wal_length, &wal_term); + + if (wal_length != request.size() + 4096 || + wal_fd <= 0 || + meta_ret == false) { + LOG(WARNING) << "zyb prep clone OnApplyFromLogIndex error: " + << " logic pool id: " << request.logicpoolid() + << " copyset id: " << request.copysetid() + << " chunkid: " << request.chunkid() + << " sn: " << request.sn() + << " data_size: " << request.size() + << " data_offset: " << request.offset() + << " index: " << index + << " wal_fd: " << wal_fd + << " wal_offset: " << wal_offset + << " wal_length: " << wal_length + << " wal_term: " << wal_term + << " meta_ret: " << meta_ret; + return; + } + ret = datastore->WriteChunkWithClone(request.chunkid(), + request.sn(), + wal_fd, + wal_offset, + wal_length, + data_offset, + &cost, + cloneSourceLocation); + } else { + //split the request, clone for 4096 align part, writechunk for the rest + uint32_t aoffset = common::align_down(request.offset(), 4096); + uint32_t alength = common::align_down(request.size(), 4096); + + LOG(WARNING) << "zyb spilt OnApplyFromLogIndex error: " + << " logic pool id: " << request.logicpoolid() + << " copyset id: " << request.copysetid() + << " chunkid: " << request.chunkid() + << " sn: " << request.sn() + << " data_size: " << request.size() + << " data_offset: " << request.offset() + << " index: " << index + << " aoffset: " << aoffset + << " alength: " << alength; + return; + } + if (CSErrorCode::Success == ret) { + return; + } else if (CSErrorCode::BackwardRequestError == ret) { + LOG(WARNING) << "write failed: " + << " logic pool id: " << request.logicpoolid() + << " copyset id: " << request.copysetid() + << " chunkid: " << request.chunkid() + << " data size: " << request.size() + << " data store return: " << ret; + } else if (CSErrorCode::InternalError == ret || + CSErrorCode::CrcCheckError == ret || + CSErrorCode::FileFormatError == ret) { + LOG(FATAL) << "write failed: " + << " logic pool id: " << request.logicpoolid() + << " copyset id: " << request.copysetid() + << " chunkid: " << request.chunkid() + << " data size: " << request.size() + << " data store return: " << ret; + } else { + LOG(ERROR) << "write failed: " + << " logic pool id: " << request.logicpoolid() + << " copyset id: " << request.copysetid() + << " chunkid: " << request.chunkid() + << " data size: " << request.size() + << " data store return: " << ret; + } +} + void ReadSnapshotRequest::OnApply(uint64_t index, ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); diff --git a/src/chunkserver/op_request.h b/src/chunkserver/op_request.h index fb9dfddb5c..666c6312b5 100755 --- a/src/chunkserver/op_request.h +++ b/src/chunkserver/op_request.h @@ -33,6 +33,7 @@ #include "include/chunkserver/chunkserver_common.h" #include "src/chunkserver/concurrent_apply/concurrent_apply.h" #include "src/chunkserver/datastore/define.h" +#include "src/chunkserver/copyset_node.h" using ::google::protobuf::RpcController; using ::curve::chunkserver::concurrent::ConcurrentApplyModule; @@ -95,6 +96,12 @@ class ChunkOpRequest : public std::enable_shared_from_this { const ChunkRequest &request, const butil::IOBuf &data) = 0; + virtual void OnApplyFromLogIndex(std::shared_ptr datastore, + CurveSegmentLogStorage* logStorage, + const ChunkRequest &request, + const uint64_t index, + const butil::IOBuf &data) = 0; + /** * 返回request的done成员 */ @@ -197,6 +204,11 @@ class DeleteChunkRequest : public ChunkOpRequest { void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, const butil::IOBuf &data) override; + void OnApplyFromLogIndex(std::shared_ptr datastore, + CurveSegmentLogStorage* logStorage, + const ChunkRequest &request, + const uint64_t index, + const butil::IOBuf &data) override {} }; class ReadChunkRequest : public ChunkOpRequest { @@ -220,6 +232,11 @@ class ReadChunkRequest : public ChunkOpRequest { void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, const butil::IOBuf &data) override; + void OnApplyFromLogIndex(std::shared_ptr datastore, + CurveSegmentLogStorage* logStorage, + const ChunkRequest &request, + const uint64_t index, + const butil::IOBuf &data) override {} const ChunkRequest* GetChunkRequest() { return request_; @@ -259,6 +276,11 @@ class WriteChunkRequest : public ChunkOpRequest { void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, const butil::IOBuf &data) override; + void OnApplyFromLogIndex(std::shared_ptr datastore, + CurveSegmentLogStorage* logStorage, + const ChunkRequest &request, + const uint64_t index, + const butil::IOBuf &data) override; }; class ReadSnapshotRequest : public ChunkOpRequest { @@ -281,6 +303,11 @@ class ReadSnapshotRequest : public ChunkOpRequest { void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, const butil::IOBuf &data) override; + void OnApplyFromLogIndex(std::shared_ptr datastore, + CurveSegmentLogStorage* logStorage, + const ChunkRequest &request, + const uint64_t index, + const butil::IOBuf &data) override {} }; class DeleteSnapshotRequest : public ChunkOpRequest { @@ -303,6 +330,11 @@ class DeleteSnapshotRequest : public ChunkOpRequest { void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, const butil::IOBuf &data) override; + void OnApplyFromLogIndex(std::shared_ptr datastore, + CurveSegmentLogStorage* logStorage, + const ChunkRequest &request, + const uint64_t index, + const butil::IOBuf &data) override {} }; class CreateCloneChunkRequest : public ChunkOpRequest { @@ -325,6 +357,11 @@ class CreateCloneChunkRequest : public ChunkOpRequest { void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, const butil::IOBuf &data) override; + void OnApplyFromLogIndex(std::shared_ptr datastore, + CurveSegmentLogStorage* logStorage, + const ChunkRequest &request, + const uint64_t index, + const butil::IOBuf &data) override {} }; class PasteChunkInternalRequest : public ChunkOpRequest { @@ -352,6 +389,11 @@ class PasteChunkInternalRequest : public ChunkOpRequest { void OnApplyFromLog(std::shared_ptr datastore, const ChunkRequest &request, const butil::IOBuf &data) override; + void OnApplyFromLogIndex(std::shared_ptr datastore, + CurveSegmentLogStorage* logStorage, + const ChunkRequest &request, + const uint64_t index, + const butil::IOBuf &data) override {} private: butil::IOBuf data_; diff --git a/src/chunkserver/raftlog/braft_segment.h b/src/chunkserver/raftlog/braft_segment.h index 02f6101389..cd5ae721fa 100644 --- a/src/chunkserver/raftlog/braft_segment.h +++ b/src/chunkserver/raftlog/braft_segment.h @@ -98,6 +98,13 @@ class BraftSegment : public Segment { return _segment->file_name(); } + int currut_fd() override { + return 0; + } + bool get_meta_info(const int64_t index, off_t* offset, size_t* length, int64_t* term) override { + return false; + }; + private: scoped_refptr _segment; }; diff --git a/src/chunkserver/raftlog/curve_segment.cpp b/src/chunkserver/raftlog/curve_segment.cpp index 30881dee5f..d35ed7dd04 100644 --- a/src/chunkserver/raftlog/curve_segment.cpp +++ b/src/chunkserver/raftlog/curve_segment.cpp @@ -50,7 +50,7 @@ namespace curve { namespace chunkserver { DEFINE_bool(raftSyncSegments, true, "call fsync when a segment is closed"); -DEFINE_bool(enableWalDirectWrite, true, "enable wal direct write or not"); +DEFINE_bool(enableWalDirectWrite, false, "enable wal direct write or not"); DEFINE_uint32(walAlignSize, 4096, "wal align size to write"); int CurveSegment::create() { @@ -585,6 +585,17 @@ int64_t CurveSegment::get_term(const int64_t index) const { return meta.term; } +bool CurveSegment::get_meta_info(const int64_t index, off_t* offset, size_t* length, int64_t* term) { + LogMeta meta; + if (_get_meta(index, &meta) != 0) { + return false; + } + *offset = meta.offset; + *length = meta.length; + *term = meta.term; + return true; +} + int CurveSegment::close(bool will_sync) { CHECK(_is_open); diff --git a/src/chunkserver/raftlog/curve_segment.h b/src/chunkserver/raftlog/curve_segment.h index 06eb39b64e..f7a3a1ddc7 100644 --- a/src/chunkserver/raftlog/curve_segment.h +++ b/src/chunkserver/raftlog/curve_segment.h @@ -146,6 +146,14 @@ class BAIDU_CACHELINE_ALIGNMENT CurveSegment: } std::string file_name() override; + + int currut_fd() override { + if (FLAGS_enableWalDirectWrite) + return _direct_fd; + else + return _fd; + } + bool get_meta_info(const int64_t index, off_t* offset, size_t* length, int64_t* term) override; private: struct LogMeta { off_t offset; diff --git a/src/chunkserver/raftlog/curve_segment_log_storage.h b/src/chunkserver/raftlog/curve_segment_log_storage.h index 50aea348e3..87f7abced6 100644 --- a/src/chunkserver/raftlog/curve_segment_log_storage.h +++ b/src/chunkserver/raftlog/curve_segment_log_storage.h @@ -163,6 +163,22 @@ class CurveSegmentLogStorage : public braft::LogStorage { LogStorageStatus GetStatus(); + int get_segment_fd(const int64_t index) { + scoped_refptr ptr; + if (get_segment(index, &ptr) != 0) { + return -1; + } + return ptr->currut_fd(); + } + + bool get_segment_meta_info(const int64_t index, off_t* offset, size_t* length, int64_t* term) { + scoped_refptr ptr; + if (get_segment(index, &ptr) != 0) { + return false; + } + return ptr->get_meta_info(index, offset, length, term); + } + private: scoped_refptr open_segment(size_t to_write); int save_meta(const int64_t log_index); diff --git a/src/chunkserver/raftlog/segment.h b/src/chunkserver/raftlog/segment.h index 807ebafa17..62cf415b46 100644 --- a/src/chunkserver/raftlog/segment.h +++ b/src/chunkserver/raftlog/segment.h @@ -71,6 +71,9 @@ class Segment : public butil::RefCountedThreadSafe { virtual int64_t last_index() const = 0; virtual std::string file_name() = 0; + + virtual int currut_fd() = 0; + virtual bool get_meta_info(const int64_t index, off_t* offset, size_t* length, int64_t* term) = 0; }; } // namespace chunkserver diff --git a/src/fs/BUILD b/src/fs/BUILD index f3e73f988d..9c7eb83244 100644 --- a/src/fs/BUILD +++ b/src/fs/BUILD @@ -17,12 +17,9 @@ cc_library( name = "lfs", srcs = glob([ - "*.cpp", - "ext4_filesystem_impl.h", - "ext4_util.h", - "wrap_posix.h" + "*.cpp" ]), - hdrs = ["local_filesystem.h","fs_common.h"], + hdrs = ["local_filesystem.h","fs_common.h","wrap_posix.h","ext4_filesystem_impl.h","xfs_filesystem_impl.h"], linkopts = ([ "-std=c++11", ]), diff --git a/src/fs/ext4_filesystem_impl.h b/src/fs/ext4_filesystem_impl.h index 54d103d17c..2eab0a6461 100644 --- a/src/fs/ext4_filesystem_impl.h +++ b/src/fs/ext4_filesystem_impl.h @@ -42,6 +42,7 @@ class Ext4FileSystemImpl : public LocalFileSystem { virtual ~Ext4FileSystemImpl(); static std::shared_ptr getInstance(); void SetPosixWrapper(std::shared_ptr wrapper); + explicit Ext4FileSystemImpl(std::shared_ptr); int Init(const LocalFileSystemOption& option) override; int Statfs(const string& path, struct FileSystemInfo* info) override; @@ -55,6 +56,11 @@ class Ext4FileSystemImpl : public LocalFileSystem { int Read(int fd, char* buf, uint64_t offset, int length) override; int Write(int fd, const char* buf, uint64_t offset, int length) override; int Write(int fd, butil::IOBuf buf, uint64_t offset, int length) override; + int WriteWithClone(int fd, + int src_fd, + uint64_t src_offset, + int src_length, + uint64_t dest_offset) { return -1; } int Sync(int fd) override; int Append(int fd, const char* buf, int length) override; int Fallocate(int fd, int op, uint64_t offset, @@ -63,7 +69,6 @@ class Ext4FileSystemImpl : public LocalFileSystem { int Fsync(int fd) override; private: - explicit Ext4FileSystemImpl(std::shared_ptr); int DoRename(const string& oldPath, const string& newPath, unsigned int flags) override; diff --git a/src/fs/fs_common.h b/src/fs/fs_common.h index 01911d90c9..12639d098a 100644 --- a/src/fs/fs_common.h +++ b/src/fs/fs_common.h @@ -29,6 +29,7 @@ namespace fs { enum class FileSystemType { // SFS, EXT4, + XFS, }; struct FileSystemInfo { diff --git a/src/fs/local_filesystem.cpp b/src/fs/local_filesystem.cpp index 52301b0c27..24367d6a44 100644 --- a/src/fs/local_filesystem.cpp +++ b/src/fs/local_filesystem.cpp @@ -24,6 +24,7 @@ #include "src/fs/local_filesystem.h" #include "src/fs/ext4_filesystem_impl.h" +#include "src/fs/xfs_filesystem_impl.h" #include "src/fs/wrap_posix.h" namespace curve { @@ -35,6 +36,8 @@ std::shared_ptr LocalFsFactory::CreateFs( std::shared_ptr localFs; if (type == FileSystemType::EXT4) { localFs = Ext4FileSystemImpl::getInstance(); + } else if (type == FileSystemType::XFS) { + localFs = XfsFileSystemImpl::getInstance(); } else { LOG(ERROR) << "Unknown filesystem type."; return nullptr; diff --git a/src/fs/local_filesystem.h b/src/fs/local_filesystem.h index 1fcfa771fd..6176bf1324 100644 --- a/src/fs/local_filesystem.h +++ b/src/fs/local_filesystem.h @@ -160,6 +160,12 @@ class LocalFileSystem { */ virtual int Write(int fd, const char* buf, uint64_t offset, int length) = 0; + virtual int WriteWithClone(int fd, + int src_fd, + uint64_t src_offset, + int src_length, + uint64_t dest_offset) = 0; + /** * 向文件指定区域写入数据 * @param fd:文件句柄id,通过Open接口获取 diff --git a/src/fs/wrap_posix.cpp b/src/fs/wrap_posix.cpp index a2c3039b7b..00b0c78669 100644 --- a/src/fs/wrap_posix.cpp +++ b/src/fs/wrap_posix.cpp @@ -96,6 +96,19 @@ ssize_t PosixWrapper::pwrite(int fd, return ::pwrite(fd, buf, count, offset); } +int PosixWrapper::ficlonerange(int fd, + int src_fd, + off_t src_offset, + size_t src_length, + off_t dest_offset) { + struct file_clone_range fcr; + fcr.src_fd = src_fd; + fcr.src_offset = src_offset; + fcr.src_length = src_length; + fcr.dest_offset = dest_offset; + return ::ioctl(fd, FICLONERANGE, &fcr); +} + int PosixWrapper::fdatasync(int fd) { return ::fdatasync(fd); } @@ -105,7 +118,8 @@ int PosixWrapper::fstat(int fd, struct stat *buf) { } int PosixWrapper::fallocate(int fd, int mode, off_t offset, off_t len) { - return ::posix_fallocate(fd, offset, len); + //return ::posix_fallocate(fd, offset, len); + return ::fallocate(fd, mode, offset, len); } int PosixWrapper::fsync(int fd) { diff --git a/src/fs/wrap_posix.h b/src/fs/wrap_posix.h index 66ddb3e719..bbb4d6da09 100644 --- a/src/fs/wrap_posix.h +++ b/src/fs/wrap_posix.h @@ -32,6 +32,7 @@ #include #include #include +#include namespace curve { namespace fs { @@ -59,6 +60,11 @@ class PosixWrapper { const void *buf, size_t count, off_t offset); + virtual int ficlonerange(int fd, + int src_fd, + off_t src_offset, + size_t src_length, + off_t dest_offset); virtual int fdatasync(int fd); virtual int fstat(int fd, struct stat *buf); virtual int fallocate(int fd, int mode, off_t offset, off_t len); diff --git a/src/fs/xfs_filesystem_impl.cpp b/src/fs/xfs_filesystem_impl.cpp new file mode 100644 index 0000000000..6cc502b092 --- /dev/null +++ b/src/fs/xfs_filesystem_impl.cpp @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2020 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * File Created: 18-10-31 + * Author: yangyaokai + */ + +#include +#include +#include +#include +#include +#include + +#include "src/common/string_util.h" +#include "src/fs/xfs_filesystem_impl.h" +#include "src/fs/wrap_posix.h" + +#define MIN_KERNEL_VERSION KERNEL_VERSION(3, 15, 0) + +namespace curve { +namespace fs { + +std::shared_ptr XfsFileSystemImpl::self_ = nullptr; +std::mutex XfsFileSystemImpl::mutex_; + +std::shared_ptr XfsFileSystemImpl::getInstance() { + std::lock_guard lock(mutex_); + if (self_ == nullptr) { + std::shared_ptr wrapper = + std::make_shared(); + self_ = std::shared_ptr( + new(std::nothrow) XfsFileSystemImpl(wrapper)); + CHECK(self_ != nullptr) << "Failed to new xfs local fs."; + } + return self_; +} + +int XfsFileSystemImpl::WriteWithClone(int fd, + int src_fd, + uint64_t src_offset, + int src_length, + uint64_t dest_offset) { + + int ret = posixWrapper_->ficlonerange(fd, + src_fd, + src_offset, + src_length, + dest_offset); + if (ret < 0) { + LOG(ERROR) << "ficlonerange failed: " << strerror(errno); + return -errno; + } + + return ret; +} + +} // namespace fs +} // namespace curve diff --git a/src/fs/xfs_filesystem_impl.h b/src/fs/xfs_filesystem_impl.h new file mode 100644 index 0000000000..49eb423f98 --- /dev/null +++ b/src/fs/xfs_filesystem_impl.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2020 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * File Created: 18-10-31 + * Author: yangyaokai + */ + +#ifndef SRC_FS_XFS_FILESYSTEM_IMPL_H_ +#define SRC_FS_XFS_FILESYSTEM_IMPL_H_ + + +#include "src/fs/ext4_filesystem_impl.h" + + +namespace curve { +namespace fs { +class XfsFileSystemImpl : public Ext4FileSystemImpl { + public: + XfsFileSystemImpl(std::shared_ptr posixWrapper) : Ext4FileSystemImpl(posixWrapper) { + posixWrapper_ = posixWrapper; + enableRenameat2_ = false; + enableReflink_ = false; + } + ~XfsFileSystemImpl() {} + static std::shared_ptr getInstance() ; + + int WriteWithClone(int fd, + int src_fd, + uint64_t src_offset, + int src_length, + uint64_t dest_offset); + + private: + static std::shared_ptr self_; + static std::mutex mutex_; + std::shared_ptr posixWrapper_; + bool enableRenameat2_; + bool enableReflink_; +}; + +} // namespace fs +} // namespace curve + +#endif // SRC_FS_XFS_FILESYSTEM_IMPL_H_ diff --git a/src/tools/curve_format_main.cpp b/src/tools/curve_format_main.cpp index 016dd53a0b..23b89d103b 100644 --- a/src/tools/curve_format_main.cpp +++ b/src/tools/curve_format_main.cpp @@ -134,7 +134,7 @@ int AllocateFiles(AllocateStruct* allocatestruct) { } int fd = ret; - ret = allocatestruct->fsptr->Fallocate(fd, 0, 0, + ret = allocatestruct->fsptr->Fallocate(fd, FALLOC_FL_ZERO_RANGE, 0, FLAGS_fileSize + FLAGS_metaPagSize); if (ret < 0) { allocatestruct->fsptr->Close(fd); @@ -143,7 +143,8 @@ int AllocateFiles(AllocateStruct* allocatestruct) { break; } - if (FLAGS_needWriteZero) { + //if (FLAGS_needWriteZero) { + if (false) { ret = allocatestruct->fsptr->Write(fd, data, 0, FLAGS_fileSize + FLAGS_metaPagSize); if (ret < 0) { diff --git a/thirdparties/etcdclient/Makefile b/thirdparties/etcdclient/Makefile index 1edc5d82c1..f0c15ea85d 100644 --- a/thirdparties/etcdclient/Makefile +++ b/thirdparties/etcdclient/Makefile @@ -27,7 +27,7 @@ intall-go: install-etcdclient: mkdir -p $(pwd)/tmp/gosrc/src/go.etcd.io - cd $(pwd)/tmp/gosrc/src/go.etcd.io && git clone --branch v3.4.0 https://github.com/etcd-io/etcd + cd $(pwd)/tmp/gosrc/src/go.etcd.io && git clone --branch v3.4.0 https://gitee.com/mirrors/etcd.git cd $(pwd)/tmp/gosrc/src/go.etcd.io/etcd && cp $(pwd)/expose-session-for-election.patch . && patch -p1 < expose-session-for-election.patch vendorpath := $(pwd)/tmp/gosrc/src/go.etcd.io/etcd/vendor diff --git a/util/build.sh b/util/build.sh index af9afc27b9..8e82682362 100644 --- a/util/build.sh +++ b/util/build.sh @@ -127,6 +127,7 @@ build_target() { for target in `get_target` do + echo "bazel build ${g_build_opts[@]} $target" bazel build ${g_build_opts[@]} $target local ret="$?" targets+=("$target") diff --git a/zyb_build.sh b/zyb_build.sh new file mode 100644 index 0000000000..498c517ac8 --- /dev/null +++ b/zyb_build.sh @@ -0,0 +1,10 @@ +bazel build //src/chunkserver:chunkserver --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ +#bazel build //src/tools:curve_format --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ +#bazel build //src/client:all --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ +#bazel build //:curvebs-sdk --copt -DHAVE_ZLIB=1 --copt -O2 -s --define=with_glog=true \ + --define=libunwind=true --copt -DGFLAGS_NS=google --copt \ + -Wno-error=format-security --copt -DUSE_BTHREAD_MUTEX --copt -DCURVEVERSION=1.2.6-zyb \ + --linkopt -L/usr/local/lib +exit +--copt -faligned-new + From e7979cad1af57b7e0b088499e67c009602b027ed Mon Sep 17 00:00:00 2001 From: root Date: Wed, 1 Nov 2023 09:14:43 +0000 Subject: [PATCH 2/4] xfs 1.2.6 --- curve.code-workspace | 5 +- .../datastore/chunkserver_chunkfile.cpp | 4 + .../datastore/chunkserver_chunkfile.h | 4 +- src/chunkserver/datastore/file_pool.cpp | 3 +- src/chunkserver/op_request.cpp | 97 ++++++++++++++----- src/chunkserver/raftlog/curve_segment.cpp | 90 +++++++++++------ src/chunkserver/raftlog/curve_segment.h | 5 +- .../raftlog/curve_segment_log_storage.cpp | 65 ++++++++----- .../raftlog/curve_segment_log_storage.h | 11 ++- src/chunkserver/raftlog/define.h | 2 + src/fs/wrap_posix.cpp | 6 ++ 11 files changed, 202 insertions(+), 90 deletions(-) diff --git a/curve.code-workspace b/curve.code-workspace index 7a7fe3cee8..6c59bf129d 100644 --- a/curve.code-workspace +++ b/curve.code-workspace @@ -73,7 +73,10 @@ "*.inc": "cpp", "cerrno": "cpp", "string_view": "cpp", - "valarray": "cpp" + "valarray": "cpp", + "forward_list": "cpp", + "any": "cpp", + "optional": "cpp" } } } \ No newline at end of file diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.cpp b/src/chunkserver/datastore/chunkserver_chunkfile.cpp index 273173d58e..99910af0a3 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.cpp +++ b/src/chunkserver/datastore/chunkserver_chunkfile.cpp @@ -227,6 +227,10 @@ CSErrorCode CSChunkFile::Open(bool createFile) { return CSErrorCode::InternalError; } fd_ = rc; + LOG(INFO) << "Open chunk file success." + << " filepath = " << chunkFilePath + << " enableOdsyncWhenOpenChunkFile_ = " << enableOdsyncWhenOpenChunkFile_ + << " fd = " << fd_; struct stat fileInfo; rc = lfs_->Fstat(fd_, &fileInfo); if (rc < 0) { diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.h b/src/chunkserver/datastore/chunkserver_chunkfile.h index 72be6fc9d9..cd10a4b08b 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.h +++ b/src/chunkserver/datastore/chunkserver_chunkfile.h @@ -333,11 +333,11 @@ class CSChunkFile { int rc = lfs_->WriteWithClone(fd_, src_fd, src_offset, src_length, dest_offset + pageSize_); if (rc < 0) { LOG(ERROR) << "WriteWithClone failed: " - << " fd " << fd + << " fd " << fd_ << " src_fd " << src_fd << " src_offset " << src_offset << " src_length " << src_length - << " dest_offset " << dest_offset; + << " dest_offset " << dest_offset << " pageSize_ " << pageSize_; return rc; } diff --git a/src/chunkserver/datastore/file_pool.cpp b/src/chunkserver/datastore/file_pool.cpp index 7ad6167730..19286591fd 100644 --- a/src/chunkserver/datastore/file_pool.cpp +++ b/src/chunkserver/datastore/file_pool.cpp @@ -292,7 +292,8 @@ int FilePool::GetFile(const std::string& targetpath, char* metapage) { } else if (ret < 0) { LOG(ERROR) << "file rename failed, " << srcpath.c_str(); } else { - LOG(INFO) << "get file " << targetpath + LOG(INFO) << "rename file " << srcpath + << " to " << targetpath << " success! now pool size = " << tmpChunkvec_.size(); break; diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp index 42bc44630c..b0ca66111e 100755 --- a/src/chunkserver/op_request.cpp +++ b/src/chunkserver/op_request.cpp @@ -38,6 +38,8 @@ namespace curve { namespace chunkserver { +const size_t kWalHeaderSize = 4096; + ChunkOpRequest::ChunkOpRequest() : datastore_(nullptr), node_(nullptr), @@ -130,6 +132,10 @@ int ChunkOpRequest::Encode(const ChunkRequest *request, if (data != nullptr) { log->append(*data); } + LOG(INFO) << "zyb attachment data_length: " << data->size() + << " encode log_length: " << log->size() + << " request data_length: " << request->size() + << " request data_offset: " << request->offset(); return 0; } @@ -449,6 +455,14 @@ void WriteChunkRequest::OnApply(uint64_t index, } if (request_->size() < 128 * 4096) { + LOG(INFO) << "zyb OnApply WriteChunk info: " + << " group id: " << ToGroupIdString(request_->logicpoolid(), request_->copysetid()) + << " sn: " << request_->sn() + << " chunkid: " << request_->chunkid() + << " data_length: " << request_->size() + << " data_offset: " << request_->offset() + << " index: " << index; + ret = datastore_->WriteChunk(request_->chunkid(), request_->sn(), cntl_->request_attachment(), @@ -459,24 +473,41 @@ void WriteChunkRequest::OnApply(uint64_t index, } else if (common::is_aligned(request_->offset(), 4096) && common::is_aligned(request_->size(), 4096)) { + int wal_fd = 0; off_t wal_offset =0; size_t wal_length =0; int64_t wal_term = 0; + size_t data_length = request_->size(); off_t data_offset = request_->offset(); - int wal_fd = node_->GetLogStorage()->get_segment_fd(index); - int meta_ret = node_->GetLogStorage()->get_segment_meta_info(index, &wal_offset, &wal_length, &wal_term); + bool meta_ret = node_->GetLogStorage()->get_segment_meta_info(index, &wal_fd, &wal_offset, &wal_length, &wal_term); - if (wal_length != request_->size() + 4096 || + LOG(INFO) << "zyb OnApply info: " + << " group id: " << ToGroupIdString(request_->logicpoolid(), request_->copysetid()) + << " sn: " << request_->sn() + << " chunkid: " << request_->chunkid() + << " request data_length: " << request_->size() + << " request data_offset: " << request_->offset() + << " data_size: " << cntl_->request_attachment().size() + << " index: " << index + << " wal_fd: " << wal_fd + << " wal_offset: " << wal_offset + << " wal_length: " << wal_length + << " wal_term: " << wal_term + << " meta_ret: " << meta_ret; + + if (common::is_aligned(wal_offset, 4096) == false || + common::is_aligned(wal_length, 4096) == false || + common::is_aligned(data_offset, 4096) == false || + common::is_aligned(data_length, 4096) == false || wal_fd <= 0 || meta_ret == false) { - LOG(WARNING) << "zyb OnApply error: " - << " logic pool id: " << request_->logicpoolid() - << " copyset id: " << request_->copysetid() + LOG(WARNING) << "zyb OnApply pre clone error: " + << " group id: " << ToGroupIdString(request_->logicpoolid(), request_->copysetid()) << " sn: " << request_->sn() << " chunkid: " << request_->chunkid() - << " data_size: " << request_->size() + << " data_length: " << request_->size() << " data_offset: " << request_->offset() << " index: " << index << " wal_fd: " << wal_fd @@ -492,8 +523,8 @@ void WriteChunkRequest::OnApply(uint64_t index, ret = datastore_->WriteChunkWithClone(request_->chunkid(), request_->sn(), wal_fd, - wal_offset, - wal_length, + wal_offset + kWalHeaderSize, + wal_length - kWalHeaderSize, data_offset, &cost, cloneSourceLocation); @@ -501,8 +532,7 @@ void WriteChunkRequest::OnApply(uint64_t index, uint32_t aoffset = common::align_down(request_->offset(), 4096); uint32_t alength = common::align_down(request_->size(), 4096); LOG(ERROR) << "zyb split OnApply error: " - << " logic pool id: " << request_->logicpoolid() - << " copyset id: " << request_->copysetid() + << " group id: " << ToGroupIdString(request_->logicpoolid(), request_->copysetid()) << " sn: " << request_->sn() << " chunkid: " << request_->chunkid() << " data_size: " << request_->size() @@ -632,25 +662,41 @@ void WriteChunkRequest::OnApplyFromLogIndex(std::shared_ptr datasto } else if (common::is_aligned(request.offset(), 4096) && common::is_aligned(request.size(), 4096)) { //only using clone - off_t wal_offset; - size_t wal_length; - int64_t wal_term; + int wal_fd = 0; + off_t wal_offset = 0; + size_t wal_length = 0; + int64_t wal_term = 0; + size_t data_length = request.size(); off_t data_offset = request.offset(); - - int wal_fd = logStorage->get_segment_fd(index); - int meta_ret = logStorage->get_segment_meta_info(index, &wal_offset, &wal_length, &wal_term); + bool meta_ret = logStorage->get_segment_meta_info(index, &wal_fd, &wal_offset, &wal_length, &wal_term); - if (wal_length != request.size() + 4096 || + LOG(INFO) << "zyb OnApplyFromLogIndex info: " + << " group id: " << ToGroupIdString(request.logicpoolid(), request.copysetid()) + << " chunkid: " << request.chunkid() + << " sn: " << request.sn() + << " request data_offset: " << request.offset() + << " request data_length: " << request.size() + << " data_size: " << data.size() + << " index: " << index + << " wal_fd: " << wal_fd + << " wal_offset: " << wal_offset + << " wal_length: " << wal_length + << " wal_term: " << wal_term + << " meta_ret: " << meta_ret; + + if (common::is_aligned(wal_offset, 4096) == false || + common::is_aligned(wal_length, 4096) == false || + common::is_aligned(data_offset, 4096) == false || + common::is_aligned(data_length, 4096) == false || wal_fd <= 0 || meta_ret == false) { LOG(WARNING) << "zyb prep clone OnApplyFromLogIndex error: " - << " logic pool id: " << request.logicpoolid() - << " copyset id: " << request.copysetid() + << " group id: " << ToGroupIdString(request.logicpoolid(), request.copysetid()) << " chunkid: " << request.chunkid() << " sn: " << request.sn() - << " data_size: " << request.size() << " data_offset: " << request.offset() + << " data_length: " << request.size() << " index: " << index << " wal_fd: " << wal_fd << " wal_offset: " << wal_offset @@ -662,8 +708,8 @@ void WriteChunkRequest::OnApplyFromLogIndex(std::shared_ptr datasto ret = datastore->WriteChunkWithClone(request.chunkid(), request.sn(), wal_fd, - wal_offset, - wal_length, + wal_offset + kWalHeaderSize, + wal_length - kWalHeaderSize, data_offset, &cost, cloneSourceLocation); @@ -672,9 +718,8 @@ void WriteChunkRequest::OnApplyFromLogIndex(std::shared_ptr datasto uint32_t aoffset = common::align_down(request.offset(), 4096); uint32_t alength = common::align_down(request.size(), 4096); - LOG(WARNING) << "zyb spilt OnApplyFromLogIndex error: " - << " logic pool id: " << request.logicpoolid() - << " copyset id: " << request.copysetid() + LOG(WARNING) << "zyb split OnApplyFromLogIndex error: " + << " group id: " << ToGroupIdString(request.logicpoolid(), request.copysetid()) << " chunkid: " << request.chunkid() << " sn: " << request.sn() << " data_size: " << request.size() diff --git a/src/chunkserver/raftlog/curve_segment.cpp b/src/chunkserver/raftlog/curve_segment.cpp index d35ed7dd04..e345b468bf 100644 --- a/src/chunkserver/raftlog/curve_segment.cpp +++ b/src/chunkserver/raftlog/curve_segment.cpp @@ -50,7 +50,7 @@ namespace curve { namespace chunkserver { DEFINE_bool(raftSyncSegments, true, "call fsync when a segment is closed"); -DEFINE_bool(enableWalDirectWrite, false, "enable wal direct write or not"); +DEFINE_bool(enableWalDirectWrite, true, "enable wal direct write or not"); DEFINE_uint32(walAlignSize, 4096, "wal align size to write"); int CurveSegment::create() { @@ -195,7 +195,7 @@ int CurveSegment::load(braft::ConfigurationManager* configuration_manager) { entry_off += skip_len; } - const int64_t last_index = _last_index.load(butil::memory_order_relaxed); + const int64_t last_index = _last_index.load(butil::memory_order_seq_cst); if (ret == 0 && !_is_open) { if (actual_last_index < last_index) { LOG(ERROR) << "data lost in a full segment, path: " << _path @@ -305,7 +305,7 @@ std::string CurveSegment::file_name() { int CurveSegment::_load_entry(off_t offset, EntryHeader* head, butil::IOBuf* data, size_t size_hint) const { butil::IOPortal buf; - size_t to_read = std::max(size_hint, kEntryHeaderSize); + size_t to_read = std::max(size_hint, kWalHeaderSize); const ssize_t n = braft::file_pread(&buf, _fd, offset, to_read); if (n != (ssize_t)to_read) { return n < 0 ? -1 : 1; @@ -343,22 +343,22 @@ int CurveSegment::_load_entry(off_t offset, EntryHeader* head, *head = tmp; } if (data != NULL) { - if (buf.length() < kEntryHeaderSize + data_real_len) { - const size_t to_read = kEntryHeaderSize + data_real_len + if (buf.length() < kWalHeaderSize - data_hdr + data_real_len) { + const size_t to_read = kWalHeaderSize + data_real_len - buf.length(); const ssize_t n = braft::file_pread(&buf, _fd, offset + buf.length(), to_read); if (n != (ssize_t)to_read) { return n < 0 ? -1 : 1; } - } else if (buf.length() > kEntryHeaderSize + data_real_len) { - buf.pop_back(buf.length() - kEntryHeaderSize - data_real_len); + } else if (buf.length() > kWalHeaderSize - data_hdr + data_real_len) { + buf.pop_back(buf.length() - kWalHeaderSize - data_real_len); } - CHECK_EQ(buf.length(), kEntryHeaderSize + data_real_len); - buf.pop_front(kEntryHeaderSize); + CHECK_EQ(buf.length(), kWalHeaderSize - data_hdr + data_real_len); + buf.pop_front(kWalHeaderSize - data_hdr); if (!verify_checksum(tmp.checksum_type, buf, tmp.data_checksum)) { LOG(ERROR) << "Found corrupted data at offset=" - << offset + kEntryHeaderSize + << offset + kWalHeaderSize << " header=" << tmp << " path: " << _path; return -1; @@ -372,7 +372,7 @@ int CurveSegment::append(const braft::LogEntry* entry) { if (BAIDU_UNLIKELY(!entry || !_is_open)) { return EINVAL; } else if (entry->id.index != - _last_index.load(butil::memory_order_consume) + 1) { + _last_index.load(butil::memory_order_seq_cst) + 1) { CHECK(false) << "entry->index=" << entry->id.index << " _last_index=" << _last_index << " _first_index=" << _first_index; @@ -400,17 +400,29 @@ int CurveSegment::append(const braft::LogEntry* entry) { << ", path: " << _path; return -1; } + butil::IOBuf raw_data; + raw_data.append(data); + + uint32_t metaSize = 0; + raw_data.cutn(&metaSize, sizeof(uint32_t)); + metaSize = butil::NetToHost32(metaSize); + raw_data.pop_front(metaSize); + size_t data_hdr = data.size() - raw_data.size(); + CHECK_EQ(data_hdr, metaSize + sizeof(uint32_t)); + uint32_t data_check_sum = get_checksum(_checksum_type, data); uint32_t real_length = data.length(); - size_t to_write = kEntryHeaderSize + data.length(); - uint32_t zero_bytes_num = 0; + uint32_t raw_length = raw_data.length(); + CHECK_EQ(raw_length, real_length - data_hdr); + //size_t to_write = data.length(); + //uint32_t zero_bytes_num = 0; // 4KB alignment - if (to_write % FLAGS_walAlignSize != 0) { - zero_bytes_num = (to_write / FLAGS_walAlignSize + 1) * - FLAGS_walAlignSize - to_write; - } - data.resize(data.length() + zero_bytes_num); - to_write = kEntryHeaderSize + data.length(); + // if (real_length % FLAGS_walAlignSize != 0) { + // zero_bytes_num = (real_length / FLAGS_walAlignSize + 1) * + // FLAGS_walAlignSize - real_length; + // } + //data.resize(data.length() + zero_bytes_num); + size_t to_write = kWalHeaderSize + raw_length; CHECK_LE(data.length(), 1ul << 56ul); char* write_buf = nullptr; if (FLAGS_enableWalDirectWrite) { @@ -419,7 +431,7 @@ int CurveSegment::append(const braft::LogEntry* entry) { LOG_IF(FATAL, ret < 0 || write_buf == nullptr) << "posix_memalign WAL write buffer failed " << strerror(ret); } else { - write_buf = new char[kEntryHeaderSize]; + write_buf = new char[kWalHeaderSize]; } const uint32_t meta_field = (entry->type << 24) | (_checksum_type << 16); @@ -431,8 +443,9 @@ int CurveSegment::append(const braft::LogEntry* entry) { .pack32(data_check_sum); packer.pack32(get_checksum( _checksum_type, write_buf, kEntryHeaderSize - 4)); + data.copy_to(write_buf + kWalHeaderSize - data_hdr, data_hdr); if (FLAGS_enableWalDirectWrite) { - data.copy_to(write_buf + kEntryHeaderSize, real_length); + data.copy_to(write_buf + kWalHeaderSize, raw_length, data_hdr); int ret = ::pwrite(_direct_fd, write_buf, to_write, _meta.bytes); free(write_buf); if (ret != to_write) { @@ -441,14 +454,16 @@ int CurveSegment::append(const braft::LogEntry* entry) { } } else { butil::IOBuf header; - header.append(write_buf, kEntryHeaderSize); + header.append(write_buf, kWalHeaderSize); delete write_buf; butil::IOBuf* pieces[2] = { &header, &data }; size_t start = 0; ssize_t written = 0; + off_t offset = 0; + while (written < (ssize_t)to_write) { - const ssize_t n = butil::IOBuf::cut_multiple_into_file_descriptor( - _fd, pieces + start, ARRAY_SIZE(pieces) - start); + const ssize_t n = butil::IOBuf::pcut_multiple_into_file_descriptor( + _fd, offset, pieces + start, ARRAY_SIZE(pieces) - start); if (n < 0) { LOG(ERROR) << "Fail to write to fd=" << _fd << ", path: " << _path << berror(); @@ -457,14 +472,28 @@ int CurveSegment::append(const braft::LogEntry* entry) { written += n; for (; start < ARRAY_SIZE(pieces) && pieces[start]->empty(); ++start) {} + if (start == ARRAY_SIZE(pieces)) { + offset += data_hdr; + } } } { BAIDU_SCOPED_LOCK(_mutex); _offset_and_term.push_back(std::make_pair(_meta.bytes, entry->id.term)); - _last_index.fetch_add(1, butil::memory_order_relaxed); + _last_index.fetch_add(1, butil::memory_order_seq_cst); _meta.bytes += to_write; } + LOG(INFO) << "wal append, path: " << _path + << ", entry->id: " << entry->id + << ", entry->type: " << entry->type + << ", old _meta.bytes: " << (_meta.bytes - to_write) + << ", real_length: " << real_length + << ", raw_length: " << raw_length + << ", data_hdr: " << data_hdr + << ", to_write: " << to_write + << ", new _meta.bytes: " << _meta.bytes + << ", _last_index: " << _last_index.load() + << ", _first_index: " << _first_index; return _update_meta_page(); } @@ -551,16 +580,16 @@ braft::LogEntry* CurveSegment::get(const int64_t index) const { int CurveSegment::_get_meta(int64_t index, LogMeta* meta) const { BAIDU_SCOPED_LOCK(_mutex); - if (index > _last_index.load(butil::memory_order_relaxed) + if (index > _last_index.load(butil::memory_order_seq_cst) || index < _first_index) { // out of range BRAFT_VLOG << "_last_index=" - << _last_index.load(butil::memory_order_relaxed) + << _last_index.load(butil::memory_order_seq_cst) << " _first_index=" << _first_index; return -1; } else if (_last_index == _first_index - 1) { BRAFT_VLOG << "_last_index=" - << _last_index.load(butil::memory_order_relaxed) + << _last_index.load(butil::memory_order_seq_cst) << " _first_index=" << _first_index; // empty return -1; @@ -568,7 +597,7 @@ int CurveSegment::_get_meta(int64_t index, LogMeta* meta) const { int64_t meta_index = index - _first_index; int64_t entry_cursor = _offset_and_term[meta_index].first; int64_t next_cursor = (index < - _last_index.load(butil::memory_order_relaxed)) + _last_index.load(butil::memory_order_seq_cst)) ? _offset_and_term[meta_index + 1].first : _meta.bytes; DCHECK_LT(entry_cursor, next_cursor); meta->offset = entry_cursor; @@ -610,6 +639,7 @@ int CurveSegment::close(bool will_sync) { << " last_index: " << _last_index << " raft_sync_segments: " << FLAGS_raftSyncSegments << " will_sync: " << will_sync + << " enableWalDirectWrite: " << FLAGS_enableWalDirectWrite << " path: " << new_path; int ret = 0; if (_last_index > _first_index) { @@ -728,7 +758,7 @@ int CurveSegment::truncate(const int64_t last_index_kept) { lck.lock(); // update memory var _offset_and_term.resize(first_truncate_in_offset); - _last_index.store(last_index_kept, butil::memory_order_relaxed); + _last_index.store(last_index_kept, butil::memory_order_seq_cst); _meta.bytes = truncate_size; return 0; } diff --git a/src/chunkserver/raftlog/curve_segment.h b/src/chunkserver/raftlog/curve_segment.h index f7a3a1ddc7..ef0f4887bc 100644 --- a/src/chunkserver/raftlog/curve_segment.h +++ b/src/chunkserver/raftlog/curve_segment.h @@ -142,15 +142,12 @@ class BAIDU_CACHELINE_ALIGNMENT CurveSegment: } int64_t last_index() const override { - return _last_index.load(butil::memory_order_consume); + return _last_index.load(butil::memory_order_seq_cst); } std::string file_name() override; int currut_fd() override { - if (FLAGS_enableWalDirectWrite) - return _direct_fd; - else return _fd; } bool get_meta_info(const int64_t index, off_t* offset, size_t* length, int64_t* term) override; diff --git a/src/chunkserver/raftlog/curve_segment_log_storage.cpp b/src/chunkserver/raftlog/curve_segment_log_storage.cpp index 453a50ea35..e9fbc8946c 100644 --- a/src/chunkserver/raftlog/curve_segment_log_storage.cpp +++ b/src/chunkserver/raftlog/curve_segment_log_storage.cpp @@ -245,11 +245,11 @@ int CurveSegmentLogStorage::list_segments(bool is_empty) { << " last_log_index: " << last_log_index; return -1; } else if (last_log_index == -1 && - _first_log_index.load(butil::memory_order_acquire) + _first_log_index.load(butil::memory_order_seq_cst) < segment->first_index()) { LOG(WARNING) << "closed segment has hole, path: " << _path << " first_log_index: " - << _first_log_index.load(butil::memory_order_relaxed) + << _first_log_index.load(butil::memory_order_seq_cst) << " first_index: " << segment->first_index() << " last_index: " << segment->last_index(); return -1; @@ -257,7 +257,7 @@ int CurveSegmentLogStorage::list_segments(bool is_empty) { _first_log_index > segment->last_index()) { LOG(WARNING) << "closed segment need discard, path: " << _path << " first_log_index: " - << _first_log_index.load(butil::memory_order_relaxed) + << _first_log_index.load(butil::memory_order_seq_cst) << " first_index: " << segment->first_index() << " last_index: " << segment->last_index(); segment->unlink(); @@ -270,17 +270,17 @@ int CurveSegmentLogStorage::list_segments(bool is_empty) { } if (_open_segment) { if (last_log_index == -1 && - _first_log_index.load(butil::memory_order_relaxed) < + _first_log_index.load(butil::memory_order_seq_cst) < _open_segment->first_index()) { LOG(WARNING) << "open segment has hole, path: " << _path << " first_log_index: " - << _first_log_index.load(butil::memory_order_relaxed) + << _first_log_index.load(butil::memory_order_seq_cst) << " first_index: " << _open_segment->first_index(); } else if (last_log_index != -1 && _open_segment->first_index() != last_log_index + 1) { LOG(WARNING) << "open segment has hole, path: " << _path << " first_log_index: " - << _first_log_index.load(butil::memory_order_relaxed) + << _first_log_index.load(butil::memory_order_seq_cst) << " first_index: " << _open_segment->first_index(); } CHECK_LE(last_log_index, _open_segment->last_index()); @@ -305,7 +305,7 @@ int CurveSegmentLogStorage::load_segments( return ret; } _last_log_index.store(segment->last_index(), - butil::memory_order_release); + butil::memory_order_seq_cst); } // open segment @@ -325,7 +325,7 @@ int CurveSegmentLogStorage::load_segments( _open_segment = NULL; } else { _last_log_index.store(_open_segment->last_index(), - butil::memory_order_release); + butil::memory_order_seq_cst); } do { if (dynamic_cast(_open_segment.get()) != nullptr) { @@ -352,7 +352,7 @@ int CurveSegmentLogStorage::load_segments( } int64_t CurveSegmentLogStorage::last_log_index() { - return _last_log_index.load(butil::memory_order_acquire); + return _last_log_index.load(butil::memory_order_seq_cst); } braft::LogEntry* CurveSegmentLogStorage::get_entry(const int64_t index) { @@ -369,6 +369,8 @@ int CurveSegmentLogStorage::get_segment(int64_t index, int64_t first_index = first_log_index(); int64_t last_index = last_log_index(); if (first_index == last_index + 1) { + LOG(WARNING) << "Log is empty, first_log_index: " << first_index + << " last_log_index: " << last_index; return -1; } if (index < first_index || index > last_index + 1) { @@ -378,6 +380,8 @@ int CurveSegmentLogStorage::get_segment(int64_t index, << " last_log_index: " << last_index; return -1; } else if (index == last_index + 1) { + LOG(WARNING) << "Attempted to access entry " << index + << " which is the next entry of last_log_index: " << last_index; return -1; } @@ -405,7 +409,7 @@ int64_t CurveSegmentLogStorage::get_term(const int64_t index) { int CurveSegmentLogStorage::append_entry(const braft::LogEntry* entry) { scoped_refptr segment = - open_segment(entry->data.size() + kEntryHeaderSize); + open_segment(entry->data.size() + kWalHeaderSize); if (NULL == segment) { return EIO; } @@ -416,7 +420,11 @@ int CurveSegmentLogStorage::append_entry(const braft::LogEntry* entry) { if (EEXIST == ret && entry->id.term != get_term(entry->id.index)) { return EINVAL; } - _last_log_index.fetch_add(1, butil::memory_order_release); + _last_log_index.fetch_add(1, butil::memory_order_seq_cst); + LOG(INFO) << "zyb: log append_entry " << _path << " index: " << entry->id.index + << " term: " << entry->id.term + << " _first_log_index: " << first_log_index() + << " _last_log_index: " << last_log_index(); return segment->sync(_enable_sync); } @@ -426,7 +434,7 @@ int CurveSegmentLogStorage::append_entries( if (entries.empty()) { return 0; } - if (_last_log_index.load(butil::memory_order_relaxed) + 1 + if (_last_log_index.load(butil::memory_order_seq_cst) + 1 != entries.front()->id.index) { LOG(FATAL) << "There's gap between appending entries and" << " _last_log_index path: " << _path; @@ -437,7 +445,7 @@ int CurveSegmentLogStorage::append_entries( braft::LogEntry* entry = entries[i]; scoped_refptr segment = - open_segment(entry->data.size() + kEntryHeaderSize); + open_segment(entry->data.size() + kWalHeaderSize); if (NULL == segment) { return i; } @@ -445,19 +453,24 @@ int CurveSegmentLogStorage::append_entries( if (0 != ret) { return i; } - _last_log_index.fetch_add(1, butil::memory_order_release); + _last_log_index.fetch_add(1, butil::memory_order_seq_cst); last_segment = segment; } last_segment->sync(_enable_sync); + LOG(INFO) << "zyb 2: log append_entry " << _path + << " index begin: " << entries.front()->id.index + << " index end: " << entries.back()->id.index + << " _first_log_index: " << first_log_index() + << " _last_log_index: " << last_log_index(); return entries.size(); } int CurveSegmentLogStorage::truncate_prefix(const int64_t first_index_kept) { // segment files - if (_first_log_index.load(butil::memory_order_acquire) >= + if (_first_log_index.load(butil::memory_order_seq_cst) >= first_index_kept) { BRAFT_VLOG << "Nothing is going to happen since _first_log_index=" - << _first_log_index.load(butil::memory_order_relaxed) + << _first_log_index.load(butil::memory_order_seq_cst) << " >= first_index_kept=" << first_index_kept; return 0; @@ -504,7 +517,7 @@ void CurveSegmentLogStorage::pop_segments( popped->clear(); popped->reserve(32); BAIDU_SCOPED_LOCK(_mutex); - _first_log_index.store(first_index_kept, butil::memory_order_release); + _first_log_index.store(first_index_kept, butil::memory_order_seq_cst); for (SegmentMap::iterator it = _segments.begin(); it != _segments.end();) { scoped_refptr& segment = it->second; if (segment->last_index() < first_index_kept) { @@ -527,6 +540,9 @@ void CurveSegmentLogStorage::pop_segments( // _log_storage is empty _last_log_index.store(first_index_kept - 1); } + LOG(INFO) << "zyb: log pop_segments first_index_kept: " << first_index_kept + << " _first_log_index: " << first_log_index() + << " _last_log_index: " << last_log_index(); } void CurveSegmentLogStorage::pop_segments_from_back( @@ -537,7 +553,7 @@ void CurveSegmentLogStorage::pop_segments_from_back( popped->reserve(32); *last_segment = NULL; BAIDU_SCOPED_LOCK(_mutex); - _last_log_index.store(last_index_kept, butil::memory_order_release); + _last_log_index.store(last_index_kept, butil::memory_order_seq_cst); if (_open_segment) { if (_open_segment->first_index() <= last_index_kept) { *last_segment = _open_segment; @@ -565,8 +581,11 @@ void CurveSegmentLogStorage::pop_segments_from_back( // all the logs have been cleared, the we move _first_log_index to the // next index _first_log_index.store(last_index_kept + 1, - butil::memory_order_release); + butil::memory_order_seq_cst); } + LOG(INFO) << "zyb: log pop_segments_from_back last_index_kept: " << last_index_kept + << " _first_log_index: " << first_log_index() + << " _last_log_index: " << last_log_index(); } int CurveSegmentLogStorage::truncate_suffix(const int64_t last_index_kept) { @@ -578,8 +597,8 @@ int CurveSegmentLogStorage::truncate_suffix(const int64_t last_index_kept) { int ret = -1; if (last_segment) { - if (_first_log_index.load(butil::memory_order_relaxed) <= - _last_log_index.load(butil::memory_order_relaxed)) { + if (_first_log_index.load(butil::memory_order_seq_cst) <= + _last_log_index.load(butil::memory_order_seq_cst)) { truncate_last_segment = true; } else { // trucate_prefix() and truncate_suffix() to discard entire logs @@ -633,8 +652,8 @@ int CurveSegmentLogStorage::reset(const int64_t next_log_index) { popped.push_back(_open_segment); _open_segment = NULL; } - _first_log_index.store(next_log_index, butil::memory_order_relaxed); - _last_log_index.store(next_log_index - 1, butil::memory_order_relaxed); + _first_log_index.store(next_log_index, butil::memory_order_seq_cst); + _last_log_index.store(next_log_index - 1, butil::memory_order_seq_cst); lck.unlock(); // NOTE: see the comments in truncate_prefix if (save_meta(next_log_index) != 0) { diff --git a/src/chunkserver/raftlog/curve_segment_log_storage.h b/src/chunkserver/raftlog/curve_segment_log_storage.h index 87f7abced6..0cd142dad2 100644 --- a/src/chunkserver/raftlog/curve_segment_log_storage.h +++ b/src/chunkserver/raftlog/curve_segment_log_storage.h @@ -124,7 +124,7 @@ class CurveSegmentLogStorage : public braft::LogStorage { // first log index in log virtual int64_t first_log_index() { - return _first_log_index.load(butil::memory_order_acquire); + return _first_log_index.load(butil::memory_order_seq_cst); } // last log index in log @@ -171,12 +171,17 @@ class CurveSegmentLogStorage : public braft::LogStorage { return ptr->currut_fd(); } - bool get_segment_meta_info(const int64_t index, off_t* offset, size_t* length, int64_t* term) { + bool get_segment_meta_info(const int64_t index, int* fd, off_t* offset, size_t* length, int64_t* term) { scoped_refptr ptr; if (get_segment(index, &ptr) != 0) { return false; } - return ptr->get_meta_info(index, offset, length, term); + if (ptr->get_meta_info(index, offset, length, term)) { + *fd = ptr->currut_fd(); + return true; + } else { + return false; + } } private: diff --git a/src/chunkserver/raftlog/define.h b/src/chunkserver/raftlog/define.h index df65fe4814..6f810f3681 100644 --- a/src/chunkserver/raftlog/define.h +++ b/src/chunkserver/raftlog/define.h @@ -58,6 +58,8 @@ namespace chunkserver { const size_t kEntryHeaderSize = 28; +const size_t kWalHeaderSize = 4096; + enum CheckSumType { CHECKSUM_MURMURHASH32 = 0, CHECKSUM_CRC32 = 1, diff --git a/src/fs/wrap_posix.cpp b/src/fs/wrap_posix.cpp index 00b0c78669..0293c44c9a 100644 --- a/src/fs/wrap_posix.cpp +++ b/src/fs/wrap_posix.cpp @@ -101,6 +101,12 @@ int PosixWrapper::ficlonerange(int fd, off_t src_offset, size_t src_length, off_t dest_offset) { + LOG(INFO) << "zyb xfs Posix FICLONERANGE: " + << " fd " << fd + << " src_fd " << src_fd + << " src_offset " << src_offset + << " src_length " << src_length + << " dest_offset " << dest_offset; struct file_clone_range fcr; fcr.src_fd = src_fd; fcr.src_offset = src_offset; From 01abd4c4d23899614b8396f7494cb072d254e1c3 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 2 Nov 2023 06:25:13 +0000 Subject: [PATCH 3/4] xfs 1.2.6 v3 --- src/chunkserver/op_request.cpp | 185 ++++++++---------- src/chunkserver/raftlog/curve_segment.cpp | 47 +++-- .../raftlog/curve_segment_log_storage.cpp | 10 +- src/fs/wrap_posix.cpp | 12 +- 4 files changed, 117 insertions(+), 137 deletions(-) diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp index b0ca66111e..01ddb19dca 100755 --- a/src/chunkserver/op_request.cpp +++ b/src/chunkserver/op_request.cpp @@ -132,10 +132,10 @@ int ChunkOpRequest::Encode(const ChunkRequest *request, if (data != nullptr) { log->append(*data); } - LOG(INFO) << "zyb attachment data_length: " << data->size() - << " encode log_length: " << log->size() - << " request data_length: " << request->size() - << " request data_offset: " << request->offset(); + // LOG(INFO) << "zyb attachment data_length: " << data->size() + // << " encode log_length: " << log->size() + // << " request data_length: " << request->size() + // << " request data_offset: " << request->offset(); return 0; } @@ -445,7 +445,7 @@ void WriteChunkRequest::OnApply(uint64_t index, ::google::protobuf::Closure *done) { brpc::ClosureGuard doneGuard(done); uint32_t cost; - CSErrorCode ret = CSErrorCode::InternalError; + CSErrorCode ret = CSErrorCode::InvalidArgError; std::string cloneSourceLocation; if (existCloneInfo(request_)) { @@ -455,13 +455,13 @@ void WriteChunkRequest::OnApply(uint64_t index, } if (request_->size() < 128 * 4096) { - LOG(INFO) << "zyb OnApply WriteChunk info: " - << " group id: " << ToGroupIdString(request_->logicpoolid(), request_->copysetid()) - << " sn: " << request_->sn() - << " chunkid: " << request_->chunkid() - << " data_length: " << request_->size() - << " data_offset: " << request_->offset() - << " index: " << index; + // LOG(INFO) << "zyb OnApply WriteChunk info: " + // << " group id: " << ToGroupIdString(request_->logicpoolid(), request_->copysetid()) + // << " sn: " << request_->sn() + // << " chunkid: " << request_->chunkid() + // << " data_length: " << request_->size() + // << " data_offset: " << request_->offset() + // << " index: " << index; ret = datastore_->WriteChunk(request_->chunkid(), request_->sn(), @@ -473,6 +473,8 @@ void WriteChunkRequest::OnApply(uint64_t index, } else if (common::is_aligned(request_->offset(), 4096) && common::is_aligned(request_->size(), 4096)) { + int retry = 5; + bool meta_ret = false; int wal_fd = 0; off_t wal_offset =0; size_t wal_length =0; @@ -480,54 +482,38 @@ void WriteChunkRequest::OnApply(uint64_t index, size_t data_length = request_->size(); off_t data_offset = request_->offset(); - bool meta_ret = node_->GetLogStorage()->get_segment_meta_info(index, &wal_fd, &wal_offset, &wal_length, &wal_term); - - LOG(INFO) << "zyb OnApply info: " - << " group id: " << ToGroupIdString(request_->logicpoolid(), request_->copysetid()) - << " sn: " << request_->sn() - << " chunkid: " << request_->chunkid() - << " request data_length: " << request_->size() - << " request data_offset: " << request_->offset() - << " data_size: " << cntl_->request_attachment().size() - << " index: " << index - << " wal_fd: " << wal_fd - << " wal_offset: " << wal_offset - << " wal_length: " << wal_length - << " wal_term: " << wal_term - << " meta_ret: " << meta_ret; - - if (common::is_aligned(wal_offset, 4096) == false || - common::is_aligned(wal_length, 4096) == false || - common::is_aligned(data_offset, 4096) == false || - common::is_aligned(data_length, 4096) == false || - wal_fd <= 0 || - meta_ret == false) { - - LOG(WARNING) << "zyb OnApply pre clone error: " - << " group id: " << ToGroupIdString(request_->logicpoolid(), request_->copysetid()) - << " sn: " << request_->sn() - << " chunkid: " << request_->chunkid() - << " data_length: " << request_->size() - << " data_offset: " << request_->offset() - << " index: " << index - << " wal_fd: " << wal_fd - << " wal_offset: " << wal_offset - << " wal_length: " << wal_length - << " wal_term: " << wal_term - << " meta_ret: " << meta_ret; - response_->set_status( - CHUNK_OP_STATUS::CHUNK_OP_STATUS_FAILURE_UNKNOWN); - return; - } + do { + meta_ret = node_->GetLogStorage()->get_segment_meta_info(index, &wal_fd, &wal_offset, &wal_length, &wal_term); + if (common::is_aligned(wal_offset, 4096) == false || + common::is_aligned(wal_length, 4096) == false || + wal_fd <= 0 || + meta_ret == false) { + LOG(WARNING) << "zyb OnApply pre clone error: " + << " group id: " << ToGroupIdString(request_->logicpoolid(), request_->copysetid()) + << " sn: " << request_->sn() + << " chunkid: " << request_->chunkid() + << " data_length: " << request_->size() + << " data_offset: " << request_->offset() + << " index: " << index + << " wal_fd: " << wal_fd + << " wal_offset: " << wal_offset + << " wal_length: " << wal_length + << " wal_term: " << wal_term + << " meta_ret: " << meta_ret + << " retry: " << retry; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } else { + ret = datastore_->WriteChunkWithClone(request_->chunkid(), + request_->sn(), + wal_fd, + wal_offset + kWalHeaderSize, + wal_length - kWalHeaderSize, + data_offset, + &cost, + cloneSourceLocation); + } + } while (--retry >= 0); - ret = datastore_->WriteChunkWithClone(request_->chunkid(), - request_->sn(), - wal_fd, - wal_offset + kWalHeaderSize, - wal_length - kWalHeaderSize, - data_offset, - &cost, - cloneSourceLocation); } else { uint32_t aoffset = common::align_down(request_->offset(), 4096); uint32_t alength = common::align_down(request_->size(), 4096); @@ -640,7 +626,7 @@ void WriteChunkRequest::OnApplyFromLogIndex(std::shared_ptr datasto const butil::IOBuf &data) { // NOTE: 处理过程中优先使用参数传入的datastore/request uint32_t cost; - CSErrorCode ret = CSErrorCode::InternalError; + CSErrorCode ret = CSErrorCode::InvalidArgError; std::string cloneSourceLocation; if (existCloneInfo(&request)) { auto func = ::curve::common::LocationOperator::GenerateCurveLocation; @@ -662,63 +648,52 @@ void WriteChunkRequest::OnApplyFromLogIndex(std::shared_ptr datasto } else if (common::is_aligned(request.offset(), 4096) && common::is_aligned(request.size(), 4096)) { //only using clone + int retry = 5; int wal_fd = 0; + bool meta_ret = false; off_t wal_offset = 0; size_t wal_length = 0; int64_t wal_term = 0; size_t data_length = request.size(); off_t data_offset = request.offset(); - bool meta_ret = logStorage->get_segment_meta_info(index, &wal_fd, &wal_offset, &wal_length, &wal_term); - - LOG(INFO) << "zyb OnApplyFromLogIndex info: " - << " group id: " << ToGroupIdString(request.logicpoolid(), request.copysetid()) - << " chunkid: " << request.chunkid() - << " sn: " << request.sn() - << " request data_offset: " << request.offset() - << " request data_length: " << request.size() - << " data_size: " << data.size() - << " index: " << index - << " wal_fd: " << wal_fd - << " wal_offset: " << wal_offset - << " wal_length: " << wal_length - << " wal_term: " << wal_term - << " meta_ret: " << meta_ret; - - if (common::is_aligned(wal_offset, 4096) == false || - common::is_aligned(wal_length, 4096) == false || - common::is_aligned(data_offset, 4096) == false || - common::is_aligned(data_length, 4096) == false || - wal_fd <= 0 || - meta_ret == false) { - LOG(WARNING) << "zyb prep clone OnApplyFromLogIndex error: " - << " group id: " << ToGroupIdString(request.logicpoolid(), request.copysetid()) - << " chunkid: " << request.chunkid() - << " sn: " << request.sn() - << " data_offset: " << request.offset() - << " data_length: " << request.size() - << " index: " << index - << " wal_fd: " << wal_fd - << " wal_offset: " << wal_offset - << " wal_length: " << wal_length - << " wal_term: " << wal_term - << " meta_ret: " << meta_ret; - return; - } - ret = datastore->WriteChunkWithClone(request.chunkid(), - request.sn(), - wal_fd, - wal_offset + kWalHeaderSize, - wal_length - kWalHeaderSize, - data_offset, - &cost, - cloneSourceLocation); + do { + meta_ret = logStorage->get_segment_meta_info(index, &wal_fd, &wal_offset, &wal_length, &wal_term); + if (common::is_aligned(wal_offset, 4096) == false || + common::is_aligned(wal_length, 4096) == false || + wal_fd <= 0 || + meta_ret == false) { + LOG(WARNING) << "zyb prep clone OnApplyFromLogIndex error: " + << " group id: " << ToGroupIdString(request.logicpoolid(), request.copysetid()) + << " chunkid: " << request.chunkid() + << " sn: " << request.sn() + << " data_offset: " << request.offset() + << " data_length: " << request.size() + << " index: " << index + << " wal_fd: " << wal_fd + << " wal_offset: " << wal_offset + << " wal_length: " << wal_length + << " wal_term: " << wal_term + << " meta_ret: " << meta_ret + << " retry: " << retry; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } else { + ret = datastore->WriteChunkWithClone(request.chunkid(), + request.sn(), + wal_fd, + wal_offset + kWalHeaderSize, + wal_length - kWalHeaderSize, + data_offset, + &cost, + cloneSourceLocation); + } + } while (--retry >= 0); } else { //split the request, clone for 4096 align part, writechunk for the rest uint32_t aoffset = common::align_down(request.offset(), 4096); uint32_t alength = common::align_down(request.size(), 4096); - LOG(WARNING) << "zyb split OnApplyFromLogIndex error: " + LOG(ERROR) << "zyb split OnApplyFromLogIndex error: " << " group id: " << ToGroupIdString(request.logicpoolid(), request.copysetid()) << " chunkid: " << request.chunkid() << " sn: " << request.sn() diff --git a/src/chunkserver/raftlog/curve_segment.cpp b/src/chunkserver/raftlog/curve_segment.cpp index e345b468bf..a02950c10c 100644 --- a/src/chunkserver/raftlog/curve_segment.cpp +++ b/src/chunkserver/raftlog/curve_segment.cpp @@ -343,7 +343,7 @@ int CurveSegment::_load_entry(off_t offset, EntryHeader* head, *head = tmp; } if (data != NULL) { - if (buf.length() < kWalHeaderSize - data_hdr + data_real_len) { + if (buf.length() < kWalHeaderSize + data_real_len) { const size_t to_read = kWalHeaderSize + data_real_len - buf.length(); const ssize_t n = braft::file_pread(&buf, _fd, @@ -351,11 +351,11 @@ int CurveSegment::_load_entry(off_t offset, EntryHeader* head, if (n != (ssize_t)to_read) { return n < 0 ? -1 : 1; } - } else if (buf.length() > kWalHeaderSize - data_hdr + data_real_len) { + } else if (buf.length() > kWalHeaderSize + data_real_len) { buf.pop_back(buf.length() - kWalHeaderSize - data_real_len); } - CHECK_EQ(buf.length(), kWalHeaderSize - data_hdr + data_real_len); - buf.pop_front(kWalHeaderSize - data_hdr); + CHECK_EQ(buf.length(), kWalHeaderSize + data_real_len); + buf.pop_front(kWalHeaderSize - data_len + data_real_len); if (!verify_checksum(tmp.checksum_type, buf, tmp.data_checksum)) { LOG(ERROR) << "Found corrupted data at offset=" << offset + kWalHeaderSize @@ -408,12 +408,17 @@ int CurveSegment::append(const braft::LogEntry* entry) { metaSize = butil::NetToHost32(metaSize); raw_data.pop_front(metaSize); size_t data_hdr = data.size() - raw_data.size(); - CHECK_EQ(data_hdr, metaSize + sizeof(uint32_t)); uint32_t data_check_sum = get_checksum(_checksum_type, data); - uint32_t real_length = data.length(); - uint32_t raw_length = raw_data.length(); - CHECK_EQ(raw_length, real_length - data_hdr); + uint32_t real_length = data.length() - data_hdr; + // uint32_t raw_length = raw_data.length(); + // LOG(INFO) << "zyb wal append 1: path " << _path + // << ", data_length: " << data.length() + // << ", real_length: " << real_length + // << ", data_hdr: " << data_hdr + // << ", raw_length: " << raw_length + // << ", metaSize: " << metaSize; + //size_t to_write = data.length(); //uint32_t zero_bytes_num = 0; // 4KB alignment @@ -422,7 +427,7 @@ int CurveSegment::append(const braft::LogEntry* entry) { // FLAGS_walAlignSize - real_length; // } //data.resize(data.length() + zero_bytes_num); - size_t to_write = kWalHeaderSize + raw_length; + size_t to_write = kWalHeaderSize + real_length; CHECK_LE(data.length(), 1ul << 56ul); char* write_buf = nullptr; if (FLAGS_enableWalDirectWrite) { @@ -445,7 +450,7 @@ int CurveSegment::append(const braft::LogEntry* entry) { _checksum_type, write_buf, kEntryHeaderSize - 4)); data.copy_to(write_buf + kWalHeaderSize - data_hdr, data_hdr); if (FLAGS_enableWalDirectWrite) { - data.copy_to(write_buf + kWalHeaderSize, raw_length, data_hdr); + data.copy_to(write_buf + kWalHeaderSize, real_length, data_hdr); int ret = ::pwrite(_direct_fd, write_buf, to_write, _meta.bytes); free(write_buf); if (ret != to_write) { @@ -483,17 +488,17 @@ int CurveSegment::append(const braft::LogEntry* entry) { _last_index.fetch_add(1, butil::memory_order_seq_cst); _meta.bytes += to_write; } - LOG(INFO) << "wal append, path: " << _path - << ", entry->id: " << entry->id - << ", entry->type: " << entry->type - << ", old _meta.bytes: " << (_meta.bytes - to_write) - << ", real_length: " << real_length - << ", raw_length: " << raw_length - << ", data_hdr: " << data_hdr - << ", to_write: " << to_write - << ", new _meta.bytes: " << _meta.bytes - << ", _last_index: " << _last_index.load() - << ", _first_index: " << _first_index; + // LOG(INFO) << "zyb wal append 2, path: " << _path + // << ", entry->id: " << entry->id + // << ", entry->type: " << entry->type + // << ", old _meta.bytes: " << (_meta.bytes - to_write) + // << ", data_length: " << data.length() + // << ", real_length: " << real_length + // << ", data_hdr: " << data_hdr + // << ", to_write: " << to_write + // << ", new _meta.bytes: " << _meta.bytes + // << ", _last_index: " << _last_index.load() + // << ", _first_index: " << _first_index; return _update_meta_page(); } diff --git a/src/chunkserver/raftlog/curve_segment_log_storage.cpp b/src/chunkserver/raftlog/curve_segment_log_storage.cpp index e9fbc8946c..14d39fe7d5 100644 --- a/src/chunkserver/raftlog/curve_segment_log_storage.cpp +++ b/src/chunkserver/raftlog/curve_segment_log_storage.cpp @@ -457,11 +457,11 @@ int CurveSegmentLogStorage::append_entries( last_segment = segment; } last_segment->sync(_enable_sync); - LOG(INFO) << "zyb 2: log append_entry " << _path - << " index begin: " << entries.front()->id.index - << " index end: " << entries.back()->id.index - << " _first_log_index: " << first_log_index() - << " _last_log_index: " << last_log_index(); + // LOG(INFO) << "zyb 2: log append_entry " << _path + // << " index begin: " << entries.front()->id.index + // << " index end: " << entries.back()->id.index + // << " _first_log_index: " << first_log_index() + // << " _last_log_index: " << last_log_index(); return entries.size(); } diff --git a/src/fs/wrap_posix.cpp b/src/fs/wrap_posix.cpp index 0293c44c9a..edc3d97d24 100644 --- a/src/fs/wrap_posix.cpp +++ b/src/fs/wrap_posix.cpp @@ -101,12 +101,12 @@ int PosixWrapper::ficlonerange(int fd, off_t src_offset, size_t src_length, off_t dest_offset) { - LOG(INFO) << "zyb xfs Posix FICLONERANGE: " - << " fd " << fd - << " src_fd " << src_fd - << " src_offset " << src_offset - << " src_length " << src_length - << " dest_offset " << dest_offset; + // LOG(INFO) << "zyb xfs Posix FICLONERANGE: " + // << " fd " << fd + // << " src_fd " << src_fd + // << " src_offset " << src_offset + // << " src_length " << src_length + // << " dest_offset " << dest_offset; struct file_clone_range fcr; fcr.src_fd = src_fd; fcr.src_offset = src_offset; From 8a3e1f8c853b16a5e1bbe5362589b97b1b8ab314 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 10 Nov 2023 06:56:35 +0000 Subject: [PATCH 4/4] xfs for unshare + limit --- src/chunkserver/chunkserver.cpp | 13 ++- src/chunkserver/config_info.h | 10 +- src/chunkserver/copyset_node.cpp | 106 ++++++++++++++---- src/chunkserver/copyset_node.h | 42 ++++--- src/chunkserver/copyset_node_manager.cpp | 10 +- .../datastore/chunkserver_chunkfile.cpp | 43 ++++++- .../datastore/chunkserver_chunkfile.h | 32 ++++++ .../datastore/chunkserver_datastore.cpp | 15 +++ .../datastore/chunkserver_datastore.h | 27 ++++- src/chunkserver/op_request.cpp | 8 +- 10 files changed, 259 insertions(+), 47 deletions(-) diff --git a/src/chunkserver/chunkserver.cpp b/src/chunkserver/chunkserver.cpp index 8df5c2cae9..86058acba1 100644 --- a/src/chunkserver/chunkserver.cpp +++ b/src/chunkserver/chunkserver.cpp @@ -37,6 +37,7 @@ #include "src/chunkserver/braft_cli_service.h" #include "src/chunkserver/braft_cli_service2.h" #include "src/chunkserver/chunkserver_helper.h" +#include "src/common/concurrent/task_thread_pool.h" #include "src/chunkserver/uri_paser.h" #include "src/chunkserver/raftsnapshot/curve_snapshot_attachment.h" #include "src/chunkserver/raftsnapshot/curve_file_service.h" @@ -548,15 +549,21 @@ void ChunkServer::InitCopysetNodeOptions( ©setNodeOptions->finishLoadMargin)); LOG_IF(FATAL, !conf->GetUInt32Value("copyset.check_loadmargin_interval_ms", ©setNodeOptions->checkLoadMarginIntervalMs)); + LOG_IF(FATAL, !conf->GetUInt32Value("copyset.sync_concurrency", + ©setNodeOptions->syncConcurrency)); LOG_IF(FATAL, !conf->GetBoolValue( "copyset.enable_odsync_when_open_chunkfile", ©setNodeOptions->enableOdsyncWhenOpenChunkFile)); - if (!copysetNodeOptions->enableOdsyncWhenOpenChunkFile) { - LOG_IF(FATAL, !conf->GetUInt32Value("copyset.synctimer_interval_ms", - ©setNodeOptions->syncTimerIntervalMs)); + if (copysetNodeOptions->enableOdsyncWhenOpenChunkFile) { + LOG_IF(FATAL, !conf->GetUInt64Value("copyset.sync_chunk_limits", + ©setNodeOptions->syncChunkLimit)); + LOG_IF(FATAL, !conf->GetUInt64Value("copyset.sync_threshold", + ©setNodeOptions->syncThreshold)); LOG_IF(FATAL, !conf->GetUInt32Value("copyset.check_syncing_interval_ms", ©setNodeOptions->checkSyncingIntervalMs)); + LOG_IF(FATAL, !conf->GetUInt32Value("copyset.sync_trigger_seconds", + ©setNodeOptions->syncTriggerSeconds)); } } diff --git a/src/chunkserver/config_info.h b/src/chunkserver/config_info.h index b5938d9e73..b892fdcb4f 100644 --- a/src/chunkserver/config_info.h +++ b/src/chunkserver/config_info.h @@ -111,6 +111,10 @@ struct CopysetNodeOptions { // 限制chunkserver启动时copyset并发恢复加载的数量,为0表示不限制 uint32_t loadConcurrency = 0; + // chunkserver sync_thread_pool number of threads. + uint32_t syncConcurrency = 20; + // copyset trigger sync timeout + uint32_t syncTriggerSeconds = 25; // 检查copyset是否加载完成出现异常时的最大重试次数 // 可能的异常:1.当前大多数副本还没起来;2.网络问题等导致无法获取leader // 3.其他的原因导致无法获取到leader的committed index @@ -123,8 +127,10 @@ struct CopysetNodeOptions { // enable O_DSYNC when open chunkfile bool enableOdsyncWhenOpenChunkFile = false; - // sync timer timeout interval - uint32_t syncTimerIntervalMs = 30000u; + // syncChunkLimit default limit + uint64_t syncChunkLimit = 2 * 1024 * 1024; + // syncHighChunkLimit default limit = 64k + uint64_t syncThreshold = 64 * 1024; // check syncing interval uint32_t checkSyncingIntervalMs = 500u; diff --git a/src/chunkserver/copyset_node.cpp b/src/chunkserver/copyset_node.cpp index 379759ed72..f75aae2a63 100755 --- a/src/chunkserver/copyset_node.cpp +++ b/src/chunkserver/copyset_node.cpp @@ -35,10 +35,13 @@ #include #include #include +#include +#include #include "src/chunkserver/raftsnapshot/curve_filesystem_adaptor.h" #include "src/chunkserver/chunk_closure.h" #include "src/chunkserver/op_request.h" +#include "src/common/concurrent/task_thread_pool.h" #include "src/fs/fs_common.h" #include "src/chunkserver/copyset_node_manager.h" #include "src/chunkserver/datastore/define.h" @@ -54,6 +57,10 @@ using curve::fs::FileSystemInfo; const char *kCurveConfEpochFilename = "conf.epoch"; +uint32_t CopysetNode::syncTriggerSeconds_ = 25; +std::shared_ptr> + CopysetNode::copysetSyncPool_ = nullptr; + CopysetNode::CopysetNode(const LogicPoolID &logicPoolId, const CopysetID ©setId, const Configuration &initConf) : @@ -71,7 +78,6 @@ CopysetNode::CopysetNode(const LogicPoolID &logicPoolId, lastSnapshotIndex_(0), configChange_(std::make_shared()), enableOdsyncWhenOpenChunkFile_(false), - syncTimerIntervalMs_(30000), isSyncing_(false), checkSyncingIntervalMs_(500) { } @@ -132,6 +138,16 @@ int CopysetNode::Init(const CopysetNodeOptions &options) { << "Copyset: " << GroupIdString(); return -1; } + enableOdsyncWhenOpenChunkFile_ = options.enableOdsyncWhenOpenChunkFile; + if (enableOdsyncWhenOpenChunkFile_) { + syncThread_.Init(this); + dataStore_->SetCacheCondPtr(syncThread_.cond_); + dataStore_->SetCacheLimits(options.syncChunkLimit, + options.syncThreshold); + LOG(INFO) << "init sync thread success limit = " + << options.syncChunkLimit << + " syncthreshold = " << options.syncThreshold; + } recyclerUri_ = options.recyclerUri; @@ -209,9 +225,7 @@ int CopysetNode::Init(const CopysetNodeOptions &options) { // without using global variables. StoreOptForCurveSegmentLogStorage(lsOptions); - syncTimerIntervalMs_ = options.syncTimerIntervalMs; checkSyncingIntervalMs_ = options.checkSyncingIntervalMs; - enableOdsyncWhenOpenChunkFile_ = options.enableOdsyncWhenOpenChunkFile; return 0; } @@ -224,12 +238,8 @@ int CopysetNode::Run() { return -1; } - if (!enableOdsyncWhenOpenChunkFile_) { - CHECK_EQ(0, syncTimer_.init(this, syncTimerIntervalMs_)); - LOG(INFO) << "Init sync timer success, interval = " - << syncTimerIntervalMs_; - - syncTimer_.start(); + if (enableOdsyncWhenOpenChunkFile_) { + syncThread_.Run(); } LOG(INFO) << "Run copyset success." @@ -238,8 +248,8 @@ int CopysetNode::Run() { } void CopysetNode::Fini() { - if (!enableOdsyncWhenOpenChunkFile_) { - syncTimer_.destroy(); + if (enableOdsyncWhenOpenChunkFile_) { + syncThread_.Stop(); } WaitSnapshotDone(); @@ -346,8 +356,9 @@ void CopysetNode::save_snapshot_background(::braft::SnapshotWriter *writer, */ concurrentapply_->Flush(); - if (!enableOdsyncWhenOpenChunkFile_) { - ForceSyncAllChunks(); + if (enableOdsyncWhenOpenChunkFile_) { + //ForceSyncAllChunks(); + ForceUnshareAllChunks(); } /** @@ -975,10 +986,46 @@ void CopysetNode::HandleSyncTimerOut() { if (isSyncing_.exchange(true)) { return; } - SyncAllChunks(); + //SyncAllChunks(); + UnshareAllChunks(); + isSyncing_ = false; +} + +void CopysetNode::ForceUnshareAllChunks() { + while (isSyncing_.exchange(true)) { + std::this_thread::sleep_for( + std::chrono::milliseconds(checkSyncingIntervalMs_)); + } + UnshareAllChunks(); isSyncing_ = false; } +void CopysetNode::UnshareAllChunks() { + std::deque temp; + { + curve::common::LockGuard lg(chunkIdsLock_); + temp.swap(chunkIdsToSync_); + } + std::set chunkIds; + for (auto chunkId : temp) { + chunkIds.insert(chunkId); + } + LOG(INFO) << "UnshareAllChunks for Copyset: " << GroupIdString() + << " list total: " << temp.size() + << " chunkIds: " << chunkIds.size(); + for (ChunkID chunk : chunkIds) { + copysetSyncPool_->Enqueue([=]() { + CSErrorCode r = dataStore_->UnshareChunk(chunk); + if (r != CSErrorCode::Success) { + LOG(FATAL) << "UnshareAllChunks Chunk failed in Copyset: " + << GroupIdString() + << ", chunkid: " << chunk + << " data store return: " << r; + } + }); + } +} + void CopysetNode::ForceSyncAllChunks() { while (isSyncing_.exchange(true)) { std::this_thread::sleep_for( @@ -1010,16 +1057,33 @@ void CopysetNode::SyncAllChunks() { } } -int SyncTimer::init(CopysetNode *node, int timeoutMs) { - if (RepeatedTimerTask::init(timeoutMs) != 0) { - return -1; - } +void SyncChunkThread::Init(CopysetNode* node) { + running_ = true; node_ = node; - return 0; + cond_ = std::make_shared(); +} + +void SyncChunkThread::Run() { + syncThread_ = std::thread([this](){ + while (running_) { + std::unique_lock lock(mtx_); + cond_->wait_for(lock, + std::chrono::seconds(CopysetNode::syncTriggerSeconds_)); + node_->UnshareAllChunks(); + } + }); +} + +void SyncChunkThread::Stop() { + running_ = false; + if (syncThread_.joinable()) { + cond_->notify_one(); + syncThread_.join(); + } } -void SyncTimer::run() { - node_->HandleSyncTimerOut(); +SyncChunkThread::~SyncChunkThread() { + Stop(); } } // namespace chunkserver diff --git a/src/chunkserver/copyset_node.h b/src/chunkserver/copyset_node.h index 3b2194590f..8ec6c0f4f0 100755 --- a/src/chunkserver/copyset_node.h +++ b/src/chunkserver/copyset_node.h @@ -25,7 +25,10 @@ #include #include +#include + +#include #include #include #include @@ -41,6 +44,7 @@ #include "src/chunkserver/raftsnapshot/define.h" #include "src/chunkserver/raftsnapshot/curve_snapshot_writer.h" #include "src/common/string_util.h" +#include "src/common/concurrent/task_thread_pool.h" #include "src/chunkserver/raft_node.h" #include "proto/heartbeat.pb.h" #include "proto/chunk.pb.h" @@ -53,6 +57,7 @@ using ::google::protobuf::RpcController; using ::google::protobuf::Closure; using ::curve::mds::heartbeat::ConfigChangeType; using ::curve::common::Peer; +using ::curve::common::TaskThreadPool; class CopysetNodeManager; @@ -103,18 +108,20 @@ class ConfigurationChangeDone : public braft::Closure { class CopysetNode; -class SyncTimer : public braft::RepeatedTimerTask { +class SyncChunkThread : public curve::common::Uncopyable { public: - SyncTimer() : node_(nullptr) {} - virtual ~SyncTimer() {} - - int init(CopysetNode *node, int timeoutMs); - - void run() override; - - protected: - void on_destroy() override {} - CopysetNode *node_; + friend class CopysetNode; + SyncChunkThread() = default; + ~SyncChunkThread(); + void Run(); + void Init(CopysetNode* node); + void Stop(); + private: + bool running_; + std::mutex mtx_; + std::shared_ptr cond_; + std::thread syncThread_; + CopysetNode* node_; }; /** @@ -388,6 +395,10 @@ class CopysetNode : public braft::StateMachine, * better for test */ public: + // sync trigger seconds + static uint32_t syncTriggerSeconds_; + // shared to sync pool + static std::shared_ptr> copysetSyncPool_; /** * 从文件中解析copyset配置版本信息 * @param filePath:文件路径 @@ -417,6 +428,9 @@ class CopysetNode : public braft::StateMachine, void ForceSyncAllChunks(); + void UnshareAllChunks(); + void ForceUnshareAllChunks(); + void WaitSnapshotDone(); private: @@ -476,10 +490,8 @@ class CopysetNode : public braft::StateMachine, int64_t lastSnapshotIndex_; // enable O_DSYNC when open file bool enableOdsyncWhenOpenChunkFile_; - // sync chunk timer - SyncTimer syncTimer_; - // sync timer timeout interval - uint32_t syncTimerIntervalMs_; + // sync chunk thread + SyncChunkThread syncThread_; // chunkIds need to sync std::deque chunkIdsToSync_; // lock for chunkIdsToSync_ diff --git a/src/chunkserver/copyset_node_manager.cpp b/src/chunkserver/copyset_node_manager.cpp index ea7e7d1b2a..e03f12b7eb 100755 --- a/src/chunkserver/copyset_node_manager.cpp +++ b/src/chunkserver/copyset_node_manager.cpp @@ -30,6 +30,9 @@ #include #include +#include "src/chunkserver/config_info.h" +#include "src/chunkserver/copyset_node.h" +#include "src/common/concurrent/task_thread_pool.h" #include "src/common/string_util.h" #include "src/common/timeutility.h" #include "src/chunkserver/chunk_service.h" @@ -50,6 +53,9 @@ std::once_flag addServiceFlag; int CopysetNodeManager::Init(const CopysetNodeOptions ©setNodeOptions) { copysetNodeOptions_ = copysetNodeOptions; + CopysetNode::syncTriggerSeconds_ = copysetNodeOptions.syncTriggerSeconds; + CopysetNode::copysetSyncPool_ = + std::make_shared>(); if (copysetNodeOptions_.loadConcurrency > 0) { copysetLoader_ = std::make_shared>(); } else { @@ -62,7 +68,8 @@ int CopysetNodeManager::Run() { if (running_.exchange(true, std::memory_order_acq_rel)) { return 0; } - + CopysetNode::copysetSyncPool_->Start(copysetNodeOptions_.syncConcurrency); + assert(copysetNodeOptions_.syncConcurrency > 0); int ret = 0; // 启动线程池 if (copysetLoader_ != nullptr) { @@ -90,6 +97,7 @@ int CopysetNodeManager::Fini() { } loadFinished_.exchange(false, std::memory_order_acq_rel); + CopysetNode::copysetSyncPool_->Stop(); if (copysetLoader_ != nullptr) { copysetLoader_->Stop(); copysetLoader_ = nullptr; diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.cpp b/src/chunkserver/datastore/chunkserver_chunkfile.cpp index 99910af0a3..e381952c29 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.cpp +++ b/src/chunkserver/datastore/chunkserver_chunkfile.cpp @@ -142,10 +142,15 @@ CSErrorCode ChunkFileMetaPage::decode(const char* buf) { return CSErrorCode::Success; } +uint64_t CSChunkFile::syncChunkLimits_ = 2 * 1024 * 1024; +uint64_t CSChunkFile::syncThreshold_ = 64 * 1024; + CSChunkFile::CSChunkFile(std::shared_ptr lfs, std::shared_ptr chunkFilePool, const ChunkOptions& options) - : fd_(-1), + : cvar_(nullptr), + chunkrate_(nullptr), + fd_(-1), size_(options.chunkSize), pageSize_(options.pageSize), chunkId_(options.id), @@ -539,6 +544,20 @@ CSErrorCode CSChunkFile::WriteWithClone(SequenceNum sn, << ",chunk sn: " << metaPage_.sn; return errorCode; } + if (chunkrate_.get() && cvar_.get()) { + *chunkrate_ += length; + uint64_t res = *chunkrate_; + // if single write size > syncThreshold, for cache friend to + // delay to sync. + auto actualSyncChunkLimits = MayUpdateWriteLimits(res); + if (*chunkrate_ >= actualSyncChunkLimits && + chunkrate_->compare_exchange_weak(res, 0)) { + LOG(INFO) << "chunk unshare, chunkid = " << chunkId_ + << ", clone length = " << length + << ", chunkrate = " << res; + cvar_->notify_one(); + } + } return CSErrorCode::Success; } @@ -553,6 +572,28 @@ CSErrorCode CSChunkFile::Sync() { return CSErrorCode::Success; } +CSErrorCode CSChunkFile::UnshareClone(off_t offset, size_t length) { + WriteLockGuard writeGuard(rwLock_); + int rc = UnshareCloneData(offset, length); + if (rc < 0) { + LOG(ERROR) << "UnshareCloneData failed, " + << "ChunkID:" << chunkId_; + return CSErrorCode::InternalError; + } + return CSErrorCode::Success; +} + +CSErrorCode CSChunkFile::UnshareClone() { + WriteLockGuard writeGuard(rwLock_); + int rc = UnshareCloneData(0, size_); + if (rc < 0) { + LOG(ERROR) << "UnshareCloneData failed, " + << "ChunkID:" << chunkId_; + return CSErrorCode::InternalError; + } + return CSErrorCode::Success; +} + CSErrorCode CSChunkFile::Paste(const char * buf, off_t offset, size_t length) { WriteLockGuard writeGuard(rwLock_); // If it is not a clone chunk, return success directly diff --git a/src/chunkserver/datastore/chunkserver_chunkfile.h b/src/chunkserver/datastore/chunkserver_chunkfile.h index cd10a4b08b..c64f2cdde6 100644 --- a/src/chunkserver/datastore/chunkserver_chunkfile.h +++ b/src/chunkserver/datastore/chunkserver_chunkfile.h @@ -30,11 +30,14 @@ #include #include #include +#include +#include #include "include/curve_compiler_specific.h" #include "include/chunkserver/chunkserver_common.h" #include "src/common/concurrent/rw_lock.h" #include "src/common/crc32.h" +#include "src/common/timeutility.h" #include "src/fs/local_filesystem.h" #include "src/chunkserver/datastore/filename_operator.h" #include "src/chunkserver/datastore/chunkserver_snapshot.h" @@ -51,6 +54,7 @@ using curve::common::RWLock; using curve::common::WriteLockGuard; using curve::common::ReadLockGuard; using curve::common::BitRange; +using curve::common::TimeUtility; class FilePool; class CSSnapshot; @@ -177,6 +181,9 @@ class CSChunkFile { CSErrorCode Sync(); + CSErrorCode UnshareClone(off_t offset, size_t length); + CSErrorCode UnshareClone(); + /** * Write the copied data into Chunk * Only write areas that have not been written, and will not overwrite @@ -247,6 +254,16 @@ class CSChunkFile { size_t length, std::string *hash); + void SetSyncInfo(std::shared_ptr> rate, + std::shared_ptr cond) { + chunkrate_ = rate; + cvar_ = cond; + } + // default synclimit + static uint64_t syncChunkLimits_; + // high threshold limit + static uint64_t syncThreshold_; + private: /** * Determine whether you need to create a new snapshot @@ -381,6 +398,10 @@ class CSChunkFile { return lfs_->Sync(fd_); } + inline int UnshareCloneData(off_t offset, size_t length) { + return lfs_->Fallocate(fd_, FALLOC_FL_UNSHARE_RANGE, offset + pageSize_, length); + } + inline bool CheckOffsetAndLength(off_t offset, size_t len, size_t align) { // Check if offset+len is out of bounds if (offset + len > size_) { @@ -391,7 +412,18 @@ class CSChunkFile { common::is_aligned(len, align); } + uint64_t MayUpdateWriteLimits(uint64_t write_len) { + if (write_len > syncThreshold_) { + return syncChunkLimits_ * 2; + } + return syncChunkLimits_; + } + private: + // to notify syncThread + std::shared_ptr cvar_; + // the sum of every chunkfile length + std::shared_ptr> chunkrate_; // file descriptor of chunk file int fd_; // The logical size of the chunk, not including metapage diff --git a/src/chunkserver/datastore/chunkserver_datastore.cpp b/src/chunkserver/datastore/chunkserver_datastore.cpp index 5ce783665a..33ba54f352 100644 --- a/src/chunkserver/datastore/chunkserver_datastore.cpp +++ b/src/chunkserver/datastore/chunkserver_datastore.cpp @@ -301,6 +301,21 @@ CSErrorCode CSDataStore::WriteChunkWithClone(ChunkID id, return CSErrorCode::Success; } +CSErrorCode CSDataStore::UnshareChunk(ChunkID id) { + auto chunkFile = metaCache_.Get(id); + if (chunkFile == nullptr) { + LOG(WARNING) << "UnshareChunk not exist, ChunkID = " << id; + return CSErrorCode::Success; + } + CSErrorCode errorCode = chunkFile->UnshareClone(); + if (errorCode != CSErrorCode::Success) { + LOG(WARNING) << "UnshareChunk file failed." + << "ChunkID = " << id; + return errorCode; + } + return CSErrorCode::Success; +} + CSErrorCode CSDataStore::SyncChunk(ChunkID id) { auto chunkFile = metaCache_.Get(id); if (chunkFile == nullptr) { diff --git a/src/chunkserver/datastore/chunkserver_datastore.h b/src/chunkserver/datastore/chunkserver_datastore.h index 56bb5ad7e2..cfcd97c815 100644 --- a/src/chunkserver/datastore/chunkserver_datastore.h +++ b/src/chunkserver/datastore/chunkserver_datastore.h @@ -30,6 +30,7 @@ #include #include #include +#include #include "include/curve_compiler_specific.h" #include "include/chunkserver/chunkserver_common.h" @@ -95,7 +96,8 @@ using ChunkMap = std::unordered_map; // use read-write lock to protect the map operation class CSMetaCache { public: - CSMetaCache() {} + CSMetaCache() : cvar_(nullptr), + sumChunkRate_(std::make_shared>()) {} virtual ~CSMetaCache() {} ChunkMap GetMap() { @@ -116,6 +118,7 @@ class CSMetaCache { // When two write requests are concurrently created to create a chunk // file, return the first set chunkFile if (chunkMap_.find(id) == chunkMap_.end()) { + chunkFile->SetSyncInfo(sumChunkRate_, cvar_); chunkMap_[id] = chunkFile; } return chunkMap_[id]; @@ -133,7 +136,19 @@ class CSMetaCache { chunkMap_.clear(); } + void SetCondPtr(std::shared_ptr cond) { + cvar_ = cond; + } + + void SetSyncChunkLimits(const uint64_t limits, const uint64_t threshold) { + CSChunkFile::syncChunkLimits_ = limits; + CSChunkFile::syncThreshold_ = threshold; + } + private: + std::shared_ptr cvar_; + // sum of all chunks rate + std::shared_ptr> sumChunkRate_; RWLock rwLock_; ChunkMap chunkMap_; }; @@ -236,6 +251,8 @@ class CSDataStore { virtual CSErrorCode SyncChunk(ChunkID id); + virtual CSErrorCode UnshareChunk(ChunkID id); + // Deprecated, only use for unit & integration test virtual CSErrorCode WriteChunk( @@ -305,6 +322,14 @@ class CSDataStore { */ virtual DataStoreStatus GetStatus(); + void SetCacheCondPtr(std::shared_ptr cond) { + metaCache_.SetCondPtr(cond); + } + + void SetCacheLimits(const uint64_t limit, const uint64_t threshold) { + metaCache_.SetSyncChunkLimits(limit, threshold); + } + private: CSErrorCode loadChunkFile(ChunkID id); CSErrorCode CreateChunkFile(const ChunkOptions & ops, diff --git a/src/chunkserver/op_request.cpp b/src/chunkserver/op_request.cpp index 01ddb19dca..b53e4bdc4c 100755 --- a/src/chunkserver/op_request.cpp +++ b/src/chunkserver/op_request.cpp @@ -511,8 +511,10 @@ void WriteChunkRequest::OnApply(uint64_t index, data_offset, &cost, cloneSourceLocation); + if (CSErrorCode::Success == ret) + node_->ShipToSync(request_->chunkid()); } - } while (--retry >= 0); + } while (--retry > 0); } else { uint32_t aoffset = common::align_down(request_->offset(), 4096); @@ -569,7 +571,7 @@ void WriteChunkRequest::OnApply(uint64_t index, auto maxIndex = (index > node_->GetAppliedIndex() ? index : node_->GetAppliedIndex()); response_->set_appliedindex(maxIndex); - node_->ShipToSync(request_->chunkid()); + //node_->ShipToSync(request_->chunkid()); } void WriteChunkRequest::OnApplyFromLog(std::shared_ptr datastore, @@ -687,7 +689,7 @@ void WriteChunkRequest::OnApplyFromLogIndex(std::shared_ptr datasto &cost, cloneSourceLocation); } - } while (--retry >= 0); + } while (--retry > 0); } else { //split the request, clone for 4096 align part, writechunk for the rest uint32_t aoffset = common::align_down(request.offset(), 4096);