From 60d2415d78f7a5937b571ec8f28e6f4679267976 Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Tue, 6 Aug 2024 15:10:40 +0200 Subject: [PATCH] RS-31: Implement API to change labels (#72) * update dependencies * update dependencies * implement changing labels api * update CHANGELOG * fix build * fix tag --- .github/workflows/ci.yml | 2 +- CHANGELOG.md | 4 + cmake/InstallDependencies.cmake | 8 +- conanfile.py | 6 +- src/reduct/bucket.cc | 125 +++++++++++++++++++---------- src/reduct/bucket.h | 29 +++++++ src/reduct/internal/http_client.cc | 34 ++++++-- src/reduct/internal/http_client.h | 2 + tests/reduct/entry_api_test.cc | 46 +++++++++++ 9 files changed, 196 insertions(+), 60 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0c6e910..5ea3d31 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -70,7 +70,7 @@ jobs: - reductstore_version: "main" exclude_api_version_tag: "" - reductstore_version: "latest" - exclude_api_version_tag: "~[1_10]" + exclude_api_version_tag: "~[1_11]" - license_file: "" exclude_license_tag: "~[license]" diff --git a/CHANGELOG.md b/CHANGELOG.md index be9a7cc..a6345b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- RS-31: `Bucket::Update` and `Bucket::UpdateBatch` methods for changing labels, [PR-72](https://github.com/reductstore/reduct-cpp/pull/72) + ## [1.10.0] - 2022-06-11 ### Added diff --git a/cmake/InstallDependencies.cmake b/cmake/InstallDependencies.cmake index 47e29db..820dc80 100644 --- a/cmake/InstallDependencies.cmake +++ b/cmake/InstallDependencies.cmake @@ -27,8 +27,8 @@ else () include(FetchContent) FetchContent_Declare( fmt - URL https://github.com/fmtlib/fmt/archive/refs/tags/10.2.1.zip - URL_HASH MD5=1bba4e8bdd7b0fa98f207559ffa380a3 + URL https://github.com/fmtlib/fmt/archive/refs/tags/11.0.2.zip + URL_HASH MD5=6e20923e12c4b78a99e528c802f459ef ) FetchContent_Declare( @@ -39,8 +39,8 @@ else () FetchContent_Declare( httplib - URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.14.3.zip - URL_HASH MD5=af82eb38506ca531b6d1d53524ff7912 + URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.16.0.zip + URL_HASH MD5=c5367889819d677bd06d6c7739896b2b ) FetchContent_Declare( diff --git a/conanfile.py b/conanfile.py index 1d896c4..9a0a16a 100644 --- a/conanfile.py +++ b/conanfile.py @@ -15,10 +15,10 @@ class DriftFrameworkConan(ConanFile): "date:header_only": True} generators = "cmake" - requires = ("fmt/10.2.1", - "cpp-httplib/0.14.3", + requires = ("fmt/11.0.2", + "cpp-httplib/0.16.0", "nlohmann_json/3.11.3", - "openssl/3.2.0", + "openssl/3.2.2", "concurrentqueue/1.0.4", "date/3.0.1") diff --git a/src/reduct/bucket.cc b/src/reduct/bucket.cc index e41e06d..7d10b6d 100644 --- a/src/reduct/bucket.cc +++ b/src/reduct/bucket.cc @@ -3,6 +3,7 @@ #include "reduct/bucket.h" #define FMT_HEADER_ONLY 1 #include +#include #if CONAN #include #else @@ -136,60 +137,29 @@ class Bucket : public IBucket { const auto time = options.timestamp ? ToMicroseconds(*options.timestamp) : ToMicroseconds(Time::clock::now()); const auto content_type = options.content_type.empty() ? "application/octet-stream" : options.content_type; - IHttpClient::Headers headers; - for (const auto& [key, value] : options.labels) { - headers.emplace(fmt::format("x-reduct-label-{}", key), value); - } - + IHttpClient::Headers headers = MakeHeadersFromLabels(options); return client_->Post(fmt::format("{}/{}?ts={}", path_, entry_name, time), content_type, record.content_length_, std::move(headers), std::move(record.callback_)); } Result WriteBatch(std::string_view entry_name, WriteBatchCallback callback) const noexcept override { - Batch batch; - callback(&batch); - - IHttpClient::Headers headers; - for (const auto& [time, record] : batch.records()) { - std::vector labels; - for (const auto& [label_key, label_value] : record.labels) { - if (label_key.find(',') == std::string::npos) { - labels.push_back(fmt::format("{}={}", label_key, label_value)); - } else { - labels.push_back(fmt::format("{}=\"{}\"", label_key, label_value)); - } - } - - const auto key = fmt::format("x-reduct-time-{}", ToMicroseconds(time)); - const auto value = fmt::format("{},{},{}", record.size, record.content_type, fmt::join(labels, ",")); - headers.emplace(key, value); - } + return WriteOrUpdateBatch(entry_name, std::move(callback), true); + } - const auto content_length = batch.body().size(); - auto [resp_headers, err] = - client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length, - std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) { - return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)}; - }); - if (err) { - return {{}, err}; - } + Result UpdateBatch(std::string_view entry_name, + WriteBatchCallback callback) const noexcept override { + return WriteOrUpdateBatch(entry_name, std::move(callback), false); + } - WriteBatchErrors errors; - for (const auto& [key, value] : resp_headers) { - if (key.starts_with("x-reduct-error-")) { - auto pos = value.find(','); - if (pos == std::string::npos) { - continue; - } - auto status = std::stoi(value.substr(0, pos)); - auto message = value.substr(pos + 1); - errors.emplace(FromMicroseconds(key.substr(15)), Error{.code = status, .message = message}); - } + Error Update(std::string_view entry_name, const WriteOptions& options) const noexcept override { + if (!options.timestamp) { + return Error{.code = 400, .message = "Timestamp is required"}; } - return {errors, Error::kOk}; + const auto time = ToMicroseconds(*options.timestamp); + IHttpClient::Headers headers = MakeHeadersFromLabels(options); + return client_->Patch(fmt::format("{}/{}?ts={}", path_, entry_name, time), "", std::move(headers)); } Error Read(std::string_view entry_name, std::optional