Skip to content

Commit

Permalink
implement changing labels api
Browse files Browse the repository at this point in the history
  • Loading branch information
atimin committed Aug 6, 2024
1 parent be517c9 commit 91043e1
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 52 deletions.
125 changes: 81 additions & 44 deletions src/reduct/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "reduct/bucket.h"
#define FMT_HEADER_ONLY 1
#include <fmt/core.h>
#include <fmt/ranges.h>
#if CONAN
#include <moodycamel/concurrentqueue.h>
#else
Expand Down Expand Up @@ -136,60 +137,29 @@ class Bucket : public IBucket {
const auto time = options.timestamp ? ToMicroseconds(*options.timestamp) : ToMicroseconds(Time::clock::now());
const auto content_type = options.content_type.empty() ? "application/octet-stream" : options.content_type;

IHttpClient::Headers headers;
for (const auto& [key, value] : options.labels) {
headers.emplace(fmt::format("x-reduct-label-{}", key), value);
}

IHttpClient::Headers headers = MakeHeadersFromLabels(options);
return client_->Post(fmt::format("{}/{}?ts={}", path_, entry_name, time), content_type, record.content_length_,
std::move(headers), std::move(record.callback_));
}

Result<WriteBatchErrors> WriteBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept override {
Batch batch;
callback(&batch);

IHttpClient::Headers headers;
for (const auto& [time, record] : batch.records()) {
std::vector<std::string> labels;
for (const auto& [label_key, label_value] : record.labels) {
if (label_key.find(',') == std::string::npos) {
labels.push_back(fmt::format("{}={}", label_key, label_value));
} else {
labels.push_back(fmt::format("{}=\"{}\"", label_key, label_value));
}
}

const auto key = fmt::format("x-reduct-time-{}", ToMicroseconds(time));
const auto value = fmt::format("{},{},{}", record.size, record.content_type, fmt::join(labels, ","));
headers.emplace(key, value);
}
return WriteOrUpdateBatch(entry_name, std::move(callback), true);
}

const auto content_length = batch.body().size();
auto [resp_headers, err] =
client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length,
std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) {
return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)};
});
if (err) {
return {{}, err};
}
Result<WriteBatchErrors> UpdateBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept override {
return WriteOrUpdateBatch(entry_name, std::move(callback), false);
}

WriteBatchErrors errors;
for (const auto& [key, value] : resp_headers) {
if (key.starts_with("x-reduct-error-")) {
auto pos = value.find(',');
if (pos == std::string::npos) {
continue;
}
auto status = std::stoi(value.substr(0, pos));
auto message = value.substr(pos + 1);
errors.emplace(FromMicroseconds(key.substr(15)), Error{.code = status, .message = message});
}
Error Update(std::string_view entry_name, const WriteOptions& options) const noexcept override {
if (!options.timestamp) {
return Error{.code = 400, .message = "Timestamp is required"};
}

return {errors, Error::kOk};
const auto time = ToMicroseconds(*options.timestamp);
IHttpClient::Headers headers = MakeHeadersFromLabels(options);
return client_->Patch(fmt::format("{}/{}?ts={}", path_, entry_name, time), "", std::move(headers));
}

Error Read(std::string_view entry_name, std::optional<Time> ts, ReadRecordCallback callback) const noexcept override {
Expand Down Expand Up @@ -525,6 +495,73 @@ class Bucket : public IBucket {

static Time FromMicroseconds(const std::string& ts) { return Time() + std::chrono::microseconds(std::stoul(ts)); }

IHttpClient::Headers MakeHeadersFromLabels(const WriteOptions& options) const {
IHttpClient::Headers headers;
for (const auto& [key, value] : options.labels) {
headers.emplace(fmt::format("x-reduct-label-{}", key), value);
}
return headers;
}

Result<WriteBatchErrors> WriteOrUpdateBatch(std::string_view entry_name, WriteBatchCallback callback,
bool write) const noexcept {
Batch batch;
callback(&batch);

IHttpClient::Headers headers;
for (const auto& [time, record] : batch.records()) {
std::vector<std::string> labels;
for (const auto& [label_key, label_value] : record.labels) {
if (label_key.find(',') == std::string::npos) {
labels.push_back(fmt::format("{}={}", label_key, label_value));
} else {
labels.push_back(fmt::format("{}=\"{}\"", label_key, label_value));
}
}

const auto key = fmt::format("x-reduct-time-{}", ToMicroseconds(time));

if (write) {
const auto value = fmt::format("{},{},{}", record.size, record.content_type, fmt::join(labels, ","));
headers.emplace(key, value);
} else {
const auto value = fmt::format("0,,{}", fmt::join(labels, ","));
headers.emplace(key, value);
}
}

Result<IHttpClient::Headers> resp;
if (write) {
const auto content_length = batch.body().size();
resp = client_->Post(fmt::format("{}/{}/batch", path_, entry_name), "application/octet-stream", content_length,
std::move(headers), [batch = std::move(batch)](size_t offset, size_t size) {
return std::pair{batch.body().size() <= offset + size, batch.body().substr(offset, size)};
});
} else {
resp = client_->Patch(fmt::format("{}/{}/batch", path_, entry_name), "", std::move(headers));
}

auto [resp_headers, err] = resp;
if (err) {
return {{}, err};
}

WriteBatchErrors errors;
for (const auto& [key, value] : resp_headers) {
if (key.starts_with("x-reduct-error-")) {
auto pos = value.find(',');
if (pos == std::string::npos) {
continue;
}
auto status = std::stoi(value.substr(0, pos));
auto message = value.substr(pos + 1);
errors.emplace(FromMicroseconds(key.substr(15)), Error{.code = status, .message = message});
}
}

return {errors, Error::kOk};
}

std::unique_ptr<internal::IHttpClient> client_;
std::string path_;
std::thread worker_;
Expand Down
29 changes: 29 additions & 0 deletions src/reduct/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class IBucket {
body_ += data;
}

void AddOnlyLabels(Time timestamp, LabelMap labels) {
records_[timestamp] = Record{timestamp, 0, "", std::move(labels)};
}

[[nodiscard]] const std::map<Time, Record>& records() const { return records_; }
[[nodiscard]] const std::string& body() const { return body_; }

Expand All @@ -191,6 +195,7 @@ class IBucket {
using WriteBatchCallback = std::function<void(Batch*)>;
using WriteBatchErrors = std::map<Time, Error>;


/**
* Read a record in chunks
* @param entry_name entry in bucket
Expand Down Expand Up @@ -248,6 +253,30 @@ class IBucket {
[[nodiscard]] virtual Result<WriteBatchErrors> WriteBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept = 0;


/**
* Write labels of an existing record
*
* Provide a label with empty value to remove it
*
* @param entry_name entry in bucket
* @param options options with timestamp, labels (content type is ignored)
* @return HTTP or communication error
*/
virtual Error Update(std::string_view entry_name,
const WriteOptions& options) const noexcept = 0;

/**
* Update labels of an existing record in a batch
* @param entry_name entry in bucket
* @param callback a callback to add records to batch
* @return HTTP error or map of errors for each record
*/
[[nodiscard]] virtual Result<WriteBatchErrors> UpdateBatch(std::string_view entry_name,
WriteBatchCallback callback) const noexcept = 0;


/**
* Query options
*/
Expand Down
34 changes: 26 additions & 8 deletions src/reduct/internal/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,27 @@ class HttpClient : public IHttpClient {
return {{}, std::move(err)};
}

Headers response_headers;
for (auto& [k, v] : res->headers) {
std::string lowcase_header = k;
std::transform(k.begin(), k.end(), lowcase_header.begin(), [](auto ch) { return std::tolower(ch); });
response_headers[lowcase_header] = v;
}

return {std::move(response_headers), Error::kOk};
return NormilizeHeaders(std::move(res));
}

Error Put(std::string_view path, std::string_view body, std::string_view mime) const noexcept override {
auto res = client_->Put(AddApiPrefix(path).data(), std::string(body), mime.data());
return CheckRequest(res);
}

Result<Headers> Patch(std::string_view path, std::string_view body, Headers headers) const noexcept override {
httplib::Headers httplib_headers;
for (auto& [k, v] : headers) {
httplib_headers.emplace(k, v);
}
auto res = client_->Patch(AddApiPrefix(path).data(), httplib_headers, std::string(body), "");
if (auto err = CheckRequest(res)) {
return {{}, std::move(err)};
}

return NormilizeHeaders(std::move(res));
}

Error Delete(std::string_view path) const noexcept override {
auto res = client_->Delete(AddApiPrefix(path).data());
return CheckRequest(res);
Expand Down Expand Up @@ -206,4 +212,16 @@ bool IsCompatible(std::string_view min, std::string_view version) {

return min_version[0] == current_version[0] && min_version[1] <= current_version[1];
}

Result<IHttpClient::Headers> NormilizeHeaders(httplib::Result res) {
IHttpClient::Headers response_headers;
for (auto& [k, v] : res->headers) {
std::string lowcase_header = k;
std::transform(k.begin(), k.end(), lowcase_header.begin(), [](auto ch) { return tolower(ch); });
response_headers[lowcase_header] = v;
}

return {std::move(response_headers), Error::kOk};
}

} // namespace reduct::internal
2 changes: 2 additions & 0 deletions src/reduct/internal/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class IHttpClient {
virtual Error Put(std::string_view path, std::string_view body,
std::string_view mime = "application/json") const noexcept = 0;

virtual Result<Headers> Patch(std::string_view path, std::string_view body, Headers headers) const noexcept = 0;

virtual Error Delete(std::string_view path) const noexcept = 0;

[[nodiscard]] virtual std::string_view api_version() const noexcept = 0;
Expand Down
46 changes: 46 additions & 0 deletions tests/reduct/entry_api_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,49 @@ TEST_CASE("reduct::IBucket should write batch of records with errors", "[bucket_
REQUIRE(record_errors[t].code == 409);
REQUIRE(record_errors[t].message == "A record with timestamp 0 already exists");
}

TEST_CASE("reduct::IBukcet should update labels", "[bucket_api][1_11]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);

auto t = IBucket::Time();
REQUIRE(bucket->Write("entry-1",
{
.timestamp = t,
.labels = {{"key1", "value1"}, {"key2", "value2"}},
},
[](auto rec) { rec->WriteAll("some_data1"); }) == Error::kOk);

REQUIRE(bucket->Update("entry-1", {
.timestamp = t,
.labels = {{"key1", "value1"}, {"key2", ""}},
}) == Error::kOk);

REQUIRE(bucket->Read("entry-1", t, [](auto record) {
REQUIRE(record.labels == std::map<std::string, std::string>{{"key1", "value1"}});
return true;
}) == Error::kOk);
}

TEST_CASE("reduct::IBukcet should update labels in barch and return errors", "[bucket_api][1_11]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);

auto t = IBucket::Time();
REQUIRE(bucket->Write("entry-1",
{
.timestamp = t,
.labels = {{"key1", "value1"}, {"key2", "value2"}},
},
[](auto rec) { rec->WriteAll("some_data1"); }) == Error::kOk);

auto [record_errors, http_error] = bucket->UpdateBatch("entry-1", [t](IBucket::Batch* batch) {
batch->AddOnlyLabels(t, {{"key1", "value1"}, {"key2", ""}});
batch->AddOnlyLabels(t + us(1), {{"key1", "value1"}, {"key2", "value2"}});
});

REQUIRE(http_error == Error::kOk);
REQUIRE(record_errors.size() == 1);
REQUIRE(record_errors[t + us(1)].code == 404);
REQUIRE(record_errors[t + us(1)].message == "No record with timestamp 1");
}

0 comments on commit 91043e1

Please sign in to comment.