Skip to content

Commit

Permalink
RS-31: Implement API to change labels (#72)
Browse files Browse the repository at this point in the history
* update dependencies

* update dependencies

* implement changing labels api

* update CHANGELOG

* fix build

* fix tag
  • Loading branch information
atimin authored Aug 6, 2024
1 parent e4d7b9a commit 60d2415
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
- reductstore_version: "main"
exclude_api_version_tag: ""
- reductstore_version: "latest"
exclude_api_version_tag: "~[1_10]"
exclude_api_version_tag: "~[1_11]"
- license_file: ""
exclude_license_tag: "~[license]"

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- RS-31: `Bucket::Update` and `Bucket::UpdateBatch` methods for changing labels, [PR-72](https://github.com/reductstore/reduct-cpp/pull/72)

## [1.10.0] - 2022-06-11

### Added
Expand Down
8 changes: 4 additions & 4 deletions cmake/InstallDependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ else ()
include(FetchContent)
FetchContent_Declare(
fmt
URL https://github.com/fmtlib/fmt/archive/refs/tags/10.2.1.zip
URL_HASH MD5=1bba4e8bdd7b0fa98f207559ffa380a3
URL https://github.com/fmtlib/fmt/archive/refs/tags/11.0.2.zip
URL_HASH MD5=6e20923e12c4b78a99e528c802f459ef
)

FetchContent_Declare(
Expand All @@ -39,8 +39,8 @@ else ()

FetchContent_Declare(
httplib
URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.14.3.zip
URL_HASH MD5=af82eb38506ca531b6d1d53524ff7912
URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.16.0.zip
URL_HASH MD5=c5367889819d677bd06d6c7739896b2b
)

FetchContent_Declare(
Expand Down
6 changes: 3 additions & 3 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ class DriftFrameworkConan(ConanFile):
"date:header_only": True}
generators = "cmake"

requires = ("fmt/10.2.1",
"cpp-httplib/0.14.3",
requires = ("fmt/11.0.2",
"cpp-httplib/0.16.0",
"nlohmann_json/3.11.3",
"openssl/3.2.0",
"openssl/3.2.2",
"concurrentqueue/1.0.4",
"date/3.0.1")

Expand Down
125 changes: 81 additions & 44 deletions src/reduct/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "reduct/bucket.h"
#define FMT_HEADER_ONLY 1
#include <fmt/core.h>
#include <fmt/ranges.h>
#if CONAN
#include <moodycamel/concurrentqueue.h>
#else
Expand Down Expand Up @@ -136,60 +137,29 @@ class Bucket : public IBucket {
const auto time = options.timestamp ? ToMicroseconds(*options.timestamp) : ToMicroseconds(Time::clock::now());
const auto content_type = options.content_type.empty() ? "application/octet-stream" : options.content_type;

IHttpClient::Headers headers;
for (const auto& [key, value] : options.labels) {
headers.emplace(fmt::format("x-reduct-label-{}", key), value);
}

IHttpClient::Headers headers = MakeHeadersFromLabels(options);
return client_->Post(fmt::format("{}/{}?ts={}", path_, entry_name, time), content_type, record.content_length_,
std::move(headers), std::move(record.callback_));
}

Result<WriteBatchErrors> WriteBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept override {
Batch batch;
callback(&batch);

IHttpClient::Headers headers;
for (const auto& [time, record] : batch.records()) {
std::vector<std::string> labels;
for (const auto& [label_key, label_value] : record.labels) {
if (label_key.find(',') == std::string::npos) {
labels.push_back(fmt::format("{}={}", label_key, label_value));
} else {
labels.push_back(fmt::format("{}=\"{}\"", label_key, label_value));
}
}

const auto key = fmt::format("x-reduct-time-{}", ToMicroseconds(time));
const auto value = fmt::format("{},{},{}", record.size, record.content_type, fmt::join(labels, ","));
headers.emplace(key, value);
}
return WriteOrUpdateBatch(entry_name, std::move(callback), true);
}

const auto content_length = batch.body().size();
auto [resp_headers, err] =
client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length,
std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) {
return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)};
});
if (err) {
return {{}, err};
}
Result<WriteBatchErrors> UpdateBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept override {
return WriteOrUpdateBatch(entry_name, std::move(callback), false);
}

WriteBatchErrors errors;
for (const auto& [key, value] : resp_headers) {
if (key.starts_with("x-reduct-error-")) {
auto pos = value.find(',');
if (pos == std::string::npos) {
continue;
}
auto status = std::stoi(value.substr(0, pos));
auto message = value.substr(pos + 1);
errors.emplace(FromMicroseconds(key.substr(15)), Error{.code = status, .message = message});
}
Error Update(std::string_view entry_name, const WriteOptions& options) const noexcept override {
if (!options.timestamp) {
return Error{.code = 400, .message = "Timestamp is required"};
}

return {errors, Error::kOk};
const auto time = ToMicroseconds(*options.timestamp);
IHttpClient::Headers headers = MakeHeadersFromLabels(options);
return client_->Patch(fmt::format("{}/{}?ts={}", path_, entry_name, time), "", std::move(headers));
}

Error Read(std::string_view entry_name, std::optional<Time> ts, ReadRecordCallback callback) const noexcept override {
Expand Down Expand Up @@ -525,6 +495,73 @@ class Bucket : public IBucket {

static Time FromMicroseconds(const std::string& ts) { return Time() + std::chrono::microseconds(std::stoul(ts)); }

IHttpClient::Headers MakeHeadersFromLabels(const WriteOptions& options) const {
IHttpClient::Headers headers;
for (const auto& [key, value] : options.labels) {
headers.emplace(fmt::format("x-reduct-label-{}", key), value);
}
return headers;
}

Result<WriteBatchErrors> WriteOrUpdateBatch(std::string_view entry_name, WriteBatchCallback callback,
bool write) const noexcept {
Batch batch;
callback(&batch);

IHttpClient::Headers headers;
for (const auto& [time, record] : batch.records()) {
std::vector<std::string> labels;
for (const auto& [label_key, label_value] : record.labels) {
if (label_key.find(',') == std::string::npos) {
labels.push_back(fmt::format("{}={}", label_key, label_value));
} else {
labels.push_back(fmt::format("{}=\"{}\"", label_key, label_value));
}
}

const auto key = fmt::format("x-reduct-time-{}", ToMicroseconds(time));

if (write) {
const auto value = fmt::format("{},{},{}", record.size, record.content_type, fmt::join(labels, ","));
headers.emplace(key, value);
} else {
const auto value = fmt::format("0,,{}", fmt::join(labels, ","));
headers.emplace(key, value);
}
}

Result<IHttpClient::Headers> resp;
if (write) {
const auto content_length = batch.body().size();
resp = client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length,
std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) {
return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)};
});
} else {
resp = client_->Patch(fmt::format("{}/{}/batch", path_, entry_name), "", std::move(headers));
}

auto [resp_headers, err] = resp;
if (err) {
return {{}, err};
}

WriteBatchErrors errors;
for (const auto& [key, value] : resp_headers) {
if (key.starts_with("x-reduct-error-")) {
auto pos = value.find(',');
if (pos == std::string::npos) {
continue;
}
auto status = std::stoi(value.substr(0, pos));
auto message = value.substr(pos + 1);
errors.emplace(FromMicroseconds(key.substr(15)), Error{.code = status, .message = message});
}
}

return {errors, Error::kOk};
}

std::unique_ptr<internal::IHttpClient> client_;
std::string path_;
std::thread worker_;
Expand Down
29 changes: 29 additions & 0 deletions src/reduct/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class IBucket {
body_ += data;
}

void AddOnlyLabels(Time timestamp, LabelMap labels) {
records_[timestamp] = Record{timestamp, 0, "", std::move(labels)};
}

[[nodiscard]] const std::map<Time, Record>& records() const { return records_; }
[[nodiscard]] const std::string& body() const { return body_; }

Expand All @@ -191,6 +195,7 @@ class IBucket {
using WriteBatchCallback = std::function<void(Batch*)>;
using WriteBatchErrors = std::map<Time, Error>;


/**
* Read a record in chunks
* @param entry_name entry in bucket
Expand Down Expand Up @@ -248,6 +253,30 @@ class IBucket {
[[nodiscard]] virtual Result<WriteBatchErrors> WriteBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept = 0;


/**
* Write labels of an existing record
*
* Provide a label with empty value to remove it
*
* @param entry_name entry in bucket
* @param options options with timestamp, labels (content type is ignored)
* @return HTTP or communication error
*/
virtual Error Update(std::string_view entry_name,
const WriteOptions& options) const noexcept = 0;

/**
* Update labels of an existing record in a batch
* @param entry_name entry in bucket
* @param callback a callback to add records to batch
* @return HTTP error or map of errors for each record
*/
[[nodiscard]] virtual Result<WriteBatchErrors> UpdateBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept = 0;


/**
* Query options
*/
Expand Down
34 changes: 26 additions & 8 deletions src/reduct/internal/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ using httplib::DataSink;

constexpr size_t kMaxChunkSize = 512'000;

Result<IHttpClient::Headers> NormalizeHeaders(httplib::Result res) {
IHttpClient::Headers response_headers;
for (auto& [k, v] : res->headers) {
std::string lowcase_header = k;
std::transform(k.begin(), k.end(), lowcase_header.begin(), [](auto ch) { return tolower(ch); });
response_headers[lowcase_header] = v;
}

return {std::move(response_headers), Error::kOk};
}

class HttpClient : public IHttpClient {
public:
explicit HttpClient(std::string_view url, const HttpOptions& options)
Expand Down Expand Up @@ -121,21 +132,27 @@ class HttpClient : public IHttpClient {
return {{}, std::move(err)};
}

Headers response_headers;
for (auto& [k, v] : res->headers) {
std::string lowcase_header = k;
std::transform(k.begin(), k.end(), lowcase_header.begin(), [](auto ch) { return std::tolower(ch); });
response_headers[lowcase_header] = v;
}

return {std::move(response_headers), Error::kOk};
return NormalizeHeaders(std::move(res));
}

Error Put(std::string_view path, std::string_view body, std::string_view mime) const noexcept override {
auto res = client_->Put(AddApiPrefix(path).data(), std::string(body), mime.data());
return CheckRequest(res);
}

Result<Headers> Patch(std::string_view path, std::string_view body, Headers headers) const noexcept override {
httplib::Headers httplib_headers;
for (auto& [k, v] : headers) {
httplib_headers.emplace(k, v);
}
auto res = client_->Patch(AddApiPrefix(path).data(), httplib_headers, std::string(body), "");
if (auto err = CheckRequest(res)) {
return {{}, std::move(err)};
}

return NormalizeHeaders(std::move(res));
}

Error Delete(std::string_view path) const noexcept override {
auto res = client_->Delete(AddApiPrefix(path).data());
return CheckRequest(res);
Expand Down Expand Up @@ -206,4 +223,5 @@ bool IsCompatible(std::string_view min, std::string_view version) {

return min_version[0] == current_version[0] && min_version[1] <= current_version[1];
}

} // namespace reduct::internal
2 changes: 2 additions & 0 deletions src/reduct/internal/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class IHttpClient {
virtual Error Put(std::string_view path, std::string_view body,
std::string_view mime = "application/json") const noexcept = 0;

virtual Result<Headers> Patch(std::string_view path, std::string_view body, Headers headers) const noexcept = 0;

virtual Error Delete(std::string_view path) const noexcept = 0;

[[nodiscard]] virtual std::string_view api_version() const noexcept = 0;
Expand Down
46 changes: 46 additions & 0 deletions tests/reduct/entry_api_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,49 @@ TEST_CASE("reduct::IBucket should write batch of records with errors", "[bucket_
REQUIRE(record_errors[t].code == 409);
REQUIRE(record_errors[t].message == "A record with timestamp 0 already exists");
}

TEST_CASE("reduct::IBukcet should update labels", "[bucket_api][1_11]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);

auto t = IBucket::Time();
REQUIRE(bucket->Write("entry-1",
{
.timestamp = t,
.labels = {{"key1", "value1"}, {"key2", "value2"}},
},
[](auto rec) { rec->WriteAll("some_data1"); }) == Error::kOk);

REQUIRE(bucket->Update("entry-1", {
.timestamp = t,
.labels = {{"key1", "value1"}, {"key2", ""}},
}) == Error::kOk);

REQUIRE(bucket->Read("entry-1", t, [](auto record) {
REQUIRE(record.labels == std::map<std::string, std::string>{{"key1", "value1"}});
return true;
}) == Error::kOk);
}

TEST_CASE("reduct::IBukcet should update labels in barch and return errors", "[bucket_api][1_11]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);

auto t = IBucket::Time();
REQUIRE(bucket->Write("entry-1",
{
.timestamp = t,
.labels = {{"key1", "value1"}, {"key2", "value2"}},
},
[](auto rec) { rec->WriteAll("some_data1"); }) == Error::kOk);

auto [record_errors, http_error] = bucket->UpdateBatch("entry-1", [t](IBucket::Batch* batch) {
batch->AddOnlyLabels(t, {{"key1", "value1"}, {"key2", ""}});
batch->AddOnlyLabels(t + us(1), {{"key1", "value1"}, {"key2", "value2"}});
});

REQUIRE(http_error == Error::kOk);
REQUIRE(record_errors.size() == 1);
REQUIRE(record_errors[t + us(1)].code == 404);
REQUIRE(record_errors[t + us(1)].message == "No record with timestamp 1");
}

0 comments on commit 60d2415

Please sign in to comment.