Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RS-31: Implement API to change labels #72

Merged
merged 6 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
- reductstore_version: "main"
exclude_api_version_tag: ""
- reductstore_version: "latest"
exclude_api_version_tag: "~[1_10]"
exclude_api_version_tag: "~[1_11]"
- license_file: ""
exclude_license_tag: "~[license]"

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- RS-31: `Bucket::Update` and `Bucket::UpdateBatch` methods for changing labels, [PR-72](https://github.com/reductstore/reduct-cpp/pull/72)

## [1.10.0] - 2022-06-11

### Added
Expand Down
8 changes: 4 additions & 4 deletions cmake/InstallDependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ else ()
include(FetchContent)
FetchContent_Declare(
fmt
URL https://github.com/fmtlib/fmt/archive/refs/tags/10.2.1.zip
URL_HASH MD5=1bba4e8bdd7b0fa98f207559ffa380a3
URL https://github.com/fmtlib/fmt/archive/refs/tags/11.0.2.zip
URL_HASH MD5=6e20923e12c4b78a99e528c802f459ef
)

FetchContent_Declare(
Expand All @@ -39,8 +39,8 @@ else ()

FetchContent_Declare(
httplib
URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.14.3.zip
URL_HASH MD5=af82eb38506ca531b6d1d53524ff7912
URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.16.0.zip
URL_HASH MD5=c5367889819d677bd06d6c7739896b2b
)

FetchContent_Declare(
Expand Down
6 changes: 3 additions & 3 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ class DriftFrameworkConan(ConanFile):
"date:header_only": True}
generators = "cmake"

requires = ("fmt/10.2.1",
"cpp-httplib/0.14.3",
requires = ("fmt/11.0.2",
"cpp-httplib/0.16.0",
"nlohmann_json/3.11.3",
"openssl/3.2.0",
"openssl/3.2.2",
"concurrentqueue/1.0.4",
"date/3.0.1")

Expand Down
125 changes: 81 additions & 44 deletions src/reduct/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "reduct/bucket.h"
#define FMT_HEADER_ONLY 1
#include <fmt/core.h>
#include <fmt/ranges.h>
#if CONAN
#include <moodycamel/concurrentqueue.h>
#else
Expand Down Expand Up @@ -136,60 +137,29 @@ class Bucket : public IBucket {
const auto time = options.timestamp ? ToMicroseconds(*options.timestamp) : ToMicroseconds(Time::clock::now());
const auto content_type = options.content_type.empty() ? "application/octet-stream" : options.content_type;

IHttpClient::Headers headers;
for (const auto& [key, value] : options.labels) {
headers.emplace(fmt::format("x-reduct-label-{}", key), value);
}

IHttpClient::Headers headers = MakeHeadersFromLabels(options);
return client_->Post(fmt::format("{}/{}?ts={}", path_, entry_name, time), content_type, record.content_length_,
std::move(headers), std::move(record.callback_));
}

Result<WriteBatchErrors> WriteBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept override {
Batch batch;
callback(&batch);

IHttpClient::Headers headers;
for (const auto& [time, record] : batch.records()) {
std::vector<std::string> labels;
for (const auto& [label_key, label_value] : record.labels) {
if (label_key.find(',') == std::string::npos) {
labels.push_back(fmt::format("{}={}", label_key, label_value));
} else {
labels.push_back(fmt::format("{}=\"{}\"", label_key, label_value));
}
}

const auto key = fmt::format("x-reduct-time-{}", ToMicroseconds(time));
const auto value = fmt::format("{},{},{}", record.size, record.content_type, fmt::join(labels, ","));
headers.emplace(key, value);
}
return WriteOrUpdateBatch(entry_name, std::move(callback), true);
}

const auto content_length = batch.body().size();
auto [resp_headers, err] =
client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length,
std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) {
return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)};
});
if (err) {
return {{}, err};
}
Result<WriteBatchErrors> UpdateBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept override {
return WriteOrUpdateBatch(entry_name, std::move(callback), false);
}

WriteBatchErrors errors;
for (const auto& [key, value] : resp_headers) {
if (key.starts_with("x-reduct-error-")) {
auto pos = value.find(',');
if (pos == std::string::npos) {
continue;
}
auto status = std::stoi(value.substr(0, pos));
auto message = value.substr(pos + 1);
errors.emplace(FromMicroseconds(key.substr(15)), Error{.code = status, .message = message});
}
Error Update(std::string_view entry_name, const WriteOptions& options) const noexcept override {
if (!options.timestamp) {
return Error{.code = 400, .message = "Timestamp is required"};
}

return {errors, Error::kOk};
const auto time = ToMicroseconds(*options.timestamp);
IHttpClient::Headers headers = MakeHeadersFromLabels(options);
return client_->Patch(fmt::format("{}/{}?ts={}", path_, entry_name, time), "", std::move(headers));
}

Error Read(std::string_view entry_name, std::optional<Time> ts, ReadRecordCallback callback) const noexcept override {
Expand Down Expand Up @@ -525,6 +495,73 @@ class Bucket : public IBucket {

static Time FromMicroseconds(const std::string& ts) { return Time() + std::chrono::microseconds(std::stoul(ts)); }

IHttpClient::Headers MakeHeadersFromLabels(const WriteOptions& options) const {
IHttpClient::Headers headers;
for (const auto& [key, value] : options.labels) {
headers.emplace(fmt::format("x-reduct-label-{}", key), value);
}
return headers;
}

Result<WriteBatchErrors> WriteOrUpdateBatch(std::string_view entry_name, WriteBatchCallback callback,
bool write) const noexcept {
Batch batch;
callback(&batch);

IHttpClient::Headers headers;
for (const auto& [time, record] : batch.records()) {
std::vector<std::string> labels;
for (const auto& [label_key, label_value] : record.labels) {
if (label_key.find(',') == std::string::npos) {
labels.push_back(fmt::format("{}={}", label_key, label_value));
} else {
labels.push_back(fmt::format("{}=\"{}\"", label_key, label_value));
}
}

const auto key = fmt::format("x-reduct-time-{}", ToMicroseconds(time));

if (write) {
const auto value = fmt::format("{},{},{}", record.size, record.content_type, fmt::join(labels, ","));
headers.emplace(key, value);
} else {
const auto value = fmt::format("0,,{}", fmt::join(labels, ","));
headers.emplace(key, value);
}
}

Result<IHttpClient::Headers> resp;
if (write) {
const auto content_length = batch.body().size();
resp = client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length,
std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) {
return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)};
});
} else {
resp = client_->Patch(fmt::format("{}/{}/batch", path_, entry_name), "", std::move(headers));
}

auto [resp_headers, err] = resp;
if (err) {
return {{}, err};
}

WriteBatchErrors errors;
for (const auto& [key, value] : resp_headers) {
if (key.starts_with("x-reduct-error-")) {
auto pos = value.find(',');
if (pos == std::string::npos) {
continue;
}
auto status = std::stoi(value.substr(0, pos));
auto message = value.substr(pos + 1);
errors.emplace(FromMicroseconds(key.substr(15)), Error{.code = status, .message = message});
}
}

return {errors, Error::kOk};
}

std::unique_ptr<internal::IHttpClient> client_;
std::string path_;
std::thread worker_;
Expand Down
29 changes: 29 additions & 0 deletions src/reduct/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class IBucket {
body_ += data;
}

void AddOnlyLabels(Time timestamp, LabelMap labels) {
records_[timestamp] = Record{timestamp, 0, "", std::move(labels)};
}

[[nodiscard]] const std::map<Time, Record>& records() const { return records_; }
[[nodiscard]] const std::string& body() const { return body_; }

Expand All @@ -191,6 +195,7 @@ class IBucket {
using WriteBatchCallback = std::function<void(Batch*)>;
using WriteBatchErrors = std::map<Time, Error>;


/**
* Read a record in chunks
* @param entry_name entry in bucket
Expand Down Expand Up @@ -248,6 +253,30 @@ class IBucket {
[[nodiscard]] virtual Result<WriteBatchErrors> WriteBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept = 0;


/**
* Write labels of an existing record
*
* Provide a label with empty value to remove it
*
* @param entry_name entry in bucket
* @param options options with timestamp, labels (content type is ignored)
* @return HTTP or communication error
*/
virtual Error Update(std::string_view entry_name,
const WriteOptions& options) const noexcept = 0;

/**
* Update labels of an existing record in a batch

* @param entry_name entry in bucket
* @param callback a callback to add records to batch
* @return HTTP error or map of errors for each record
*/
[[nodiscard]] virtual Result<WriteBatchErrors> UpdateBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept = 0;


/**
* Query options
*/
Expand Down
34 changes: 26 additions & 8 deletions src/reduct/internal/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ using httplib::DataSink;

constexpr size_t kMaxChunkSize = 512'000;

Result<IHttpClient::Headers> NormalizeHeaders(httplib::Result res) {
IHttpClient::Headers response_headers;
for (auto& [k, v] : res->headers) {
std::string lowcase_header = k;
std::transform(k.begin(), k.end(), lowcase_header.begin(), [](auto ch) { return tolower(ch); });
response_headers[lowcase_header] = v;
}

return {std::move(response_headers), Error::kOk};
}

class HttpClient : public IHttpClient {
public:
explicit HttpClient(std::string_view url, const HttpOptions& options)
Expand Down Expand Up @@ -121,21 +132,27 @@ class HttpClient : public IHttpClient {
return {{}, std::move(err)};
}

Headers response_headers;
for (auto& [k, v] : res->headers) {
std::string lowcase_header = k;
std::transform(k.begin(), k.end(), lowcase_header.begin(), [](auto ch) { return std::tolower(ch); });
response_headers[lowcase_header] = v;
}

return {std::move(response_headers), Error::kOk};
return NormalizeHeaders(std::move(res));
}

Error Put(std::string_view path, std::string_view body, std::string_view mime) const noexcept override {
auto res = client_->Put(AddApiPrefix(path).data(), std::string(body), mime.data());
return CheckRequest(res);
}

Result<Headers> Patch(std::string_view path, std::string_view body, Headers headers) const noexcept override {
httplib::Headers httplib_headers;
for (auto& [k, v] : headers) {
httplib_headers.emplace(k, v);
}
auto res = client_->Patch(AddApiPrefix(path).data(), httplib_headers, std::string(body), "");
if (auto err = CheckRequest(res)) {
return {{}, std::move(err)};
}

return NormalizeHeaders(std::move(res));
}

Error Delete(std::string_view path) const noexcept override {
auto res = client_->Delete(AddApiPrefix(path).data());
return CheckRequest(res);
Expand Down Expand Up @@ -206,4 +223,5 @@ bool IsCompatible(std::string_view min, std::string_view version) {

return min_version[0] == current_version[0] && min_version[1] <= current_version[1];
}

} // namespace reduct::internal
2 changes: 2 additions & 0 deletions src/reduct/internal/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class IHttpClient {
virtual Error Put(std::string_view path, std::string_view body,
std::string_view mime = "application/json") const noexcept = 0;

virtual Result<Headers> Patch(std::string_view path, std::string_view body, Headers headers) const noexcept = 0;

virtual Error Delete(std::string_view path) const noexcept = 0;

[[nodiscard]] virtual std::string_view api_version() const noexcept = 0;
Expand Down
46 changes: 46 additions & 0 deletions tests/reduct/entry_api_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,49 @@ TEST_CASE("reduct::IBucket should write batch of records with errors", "[bucket_
REQUIRE(record_errors[t].code == 409);
REQUIRE(record_errors[t].message == "A record with timestamp 0 already exists");
}

TEST_CASE("reduct::IBukcet should update labels", "[bucket_api][1_11]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);

auto t = IBucket::Time();
REQUIRE(bucket->Write("entry-1",
{
.timestamp = t,
.labels = {{"key1", "value1"}, {"key2", "value2"}},
},
[](auto rec) { rec->WriteAll("some_data1"); }) == Error::kOk);

REQUIRE(bucket->Update("entry-1", {
.timestamp = t,
.labels = {{"key1", "value1"}, {"key2", ""}},
}) == Error::kOk);

REQUIRE(bucket->Read("entry-1", t, [](auto record) {
REQUIRE(record.labels == std::map<std::string, std::string>{{"key1", "value1"}});
return true;
}) == Error::kOk);
}

TEST_CASE("reduct::IBukcet should update labels in barch and return errors", "[bucket_api][1_11]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);

auto t = IBucket::Time();
REQUIRE(bucket->Write("entry-1",
{
.timestamp = t,
.labels = {{"key1", "value1"}, {"key2", "value2"}},
},
[](auto rec) { rec->WriteAll("some_data1"); }) == Error::kOk);

auto [record_errors, http_error] = bucket->UpdateBatch("entry-1", [t](IBucket::Batch* batch) {
batch->AddOnlyLabels(t, {{"key1", "value1"}, {"key2", ""}});
batch->AddOnlyLabels(t + us(1), {{"key1", "value1"}, {"key2", "value2"}});
});

REQUIRE(http_error == Error::kOk);
REQUIRE(record_errors.size() == 1);
REQUIRE(record_errors[t + us(1)].code == 404);
REQUIRE(record_errors[t + us(1)].message == "No record with timestamp 1");
}
Loading