Skip to content

Commit

Permalink
add when condition to IBucket::Query
Browse files Browse the repository at this point in the history
  • Loading branch information
atimin committed Dec 4, 2024
1 parent d03d142 commit ceb80ed
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 30 deletions.
32 changes: 24 additions & 8 deletions src/reduct/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,33 @@ class Bucket : public IBucket {

Error Query(std::string_view entry_name, std::optional<Time> start, std::optional<Time> stop, QueryOptions options,
ReadRecordCallback callback) const noexcept override {
std::string url = BuildQueryUrl(start, stop, entry_name, options);
auto [body, err] = client_->Get(url);
if (err) {
return err;
std::string body;
if (options.when) {
auto [json_payload, json_err] = QueryOptionsToJsonString("QUERY", start, stop, options);
if (json_err) {
return json_err;
}

auto [resp, resp_err] = client_->PostWithResponse(fmt::format("{}/{}/q", path_, entry_name), json_payload.dump());
if (resp_err) {
return resp_err;
}

body = std::move(resp);
} else {
std::string url = BuildQueryUrl(start, stop, entry_name, options);
auto [resp, err] = client_->Get(url);
if (err) {
return err;
}

body = std::move(resp);
}

uint64_t id;
try {
auto data = nlohmann::json::parse(body);
id = data.at("id");
id = data["id"];
} catch (const std::exception& ex) {
return Error{.code = -1, .message = ex.what()};
}
Expand Down Expand Up @@ -235,13 +252,12 @@ class Bucket : public IBucket {
QueryOptions options) const noexcept override {
std::string body;
if (options.when) {
auto [json_payload, json_err] = QueryOptionsToJsonString("REMOVE", options);
auto [json_payload, json_err] = QueryOptionsToJsonString("REMOVE", start, stop, options);
if (json_err) {
return {0, std::move(json_err)};
}

auto [resp, resp_err] =
client_->PostWithResponse(fmt::format("{}/{}/q", path_, entry_name), json_payload.dump());
auto [resp, resp_err] = client_->PostWithResponse(fmt::format("{}/{}/q", path_, entry_name), json_payload.dump());
if (resp_err) {
return {0, std::move(resp_err)};
}
Expand Down
14 changes: 12 additions & 2 deletions src/reduct/internal/serialisation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,20 @@ Result<IClient::FullReplicationInfo> ParseFullReplicationInfo(const nlohmann::js
return {info, Error::kOk};
}

Result<nlohmann::json> QueryOptionsToJsonString(std::string_view type, const IBucket::QueryOptions& options) {
Result<nlohmann::json> QueryOptionsToJsonString(std::string_view type, std::optional<IBucket::Time> start,
std::optional<IBucket::Time> stop,
const IBucket::QueryOptions& options) {
nlohmann::json json_data;
json_data["query_type"] = type;

if (start) {
json_data["start"] = std::chrono::duration_cast<std::chrono::microseconds>(start->time_since_epoch()).count();
}

if (stop) {
json_data["stop"] = std::chrono::duration_cast<std::chrono::microseconds>(stop->time_since_epoch()).count();
}

for (const auto& [key, value] : options.include) {
json_data["include"][key] = value;
}
Expand Down Expand Up @@ -223,7 +233,7 @@ Result<nlohmann::json> QueryOptionsToJsonString(std::string_view type, const IBu
json_data["strict"] = *options.strict;
}

return {json_data, Error::kOk};
return {json_data, Error::kOk};
}

} // namespace reduct::internal
9 changes: 4 additions & 5 deletions src/reduct/internal/serialisation.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

#include <nlohmann/json.hpp>

#include "reduct/client.h"
#include "reduct/bucket.h"
#include "reduct/client.h"

namespace reduct::internal {

Expand All @@ -30,15 +30,13 @@ Result<IBucket::Settings> ParseBucketSettings(const nlohmann::json& json);
*/
Result<IClient::FullTokenInfo> ParseTokenInfo(const nlohmann::json& json);


/**
* @brief Parse list of replication info from JSON string
* @param json
* @return
*/
Result<std::vector<IClient::ReplicationInfo>> ParseReplicationList(const nlohmann::json& data);


/**
* @brief Serialize replication settings
* @param settings to serialize
Expand All @@ -53,8 +51,9 @@ nlohmann::json ReplicationSettingsToJsonString(IClient::ReplicationSettings sett
*/
Result<IClient::FullReplicationInfo> ParseFullReplicationInfo(const nlohmann::json& data);


Result<nlohmann::json> QueryOptionsToJsonString(std::string_view type, const IBucket::QueryOptions& options);
Result<nlohmann::json> QueryOptionsToJsonString(std::string_view type, std::optional<IBucket::Time> start,
std::optional<IBucket::Time> stop,
const IBucket::QueryOptions& options);

}; // namespace reduct::internal

Expand Down
52 changes: 37 additions & 15 deletions tests/reduct/entry_api_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ TEST_CASE("reduct::IBucket should query records", "[entry_api]") {
REQUIRE(bucket->Write("entry",
IBucket::WriteOptions{
.timestamp = ts,
.labels = {{"label1", "value1"}},
.labels = {{"score", "10"}},
},
[](auto rec) { rec->WriteAll("some_data1"); }) == Error::kOk);
REQUIRE(bucket->Write("entry", ts + us(1), [](auto rec) { rec->WriteAll("some_data2"); }) == Error::kOk);
Expand Down Expand Up @@ -189,24 +189,27 @@ TEST_CASE("reduct::IBucket should query records", "[entry_api]") {
REQUIRE(err == Error::kOk);
}

SECTION("include labels") {
auto err = bucket->Query("entry", ts, ts + us(3), IBucket::QueryOptions{.include = {{"label1", "value1"}}},
[&all_data](auto record) {
auto read_err = record.Read([&all_data](auto data) {
all_data.append(data);
return true;
});
SECTION("with condition") {
auto err = bucket->Query("entry", ts, ts + us(3), {.when = R"({"&score": {"$gt": 0}})"}, [&all_data](auto record) {
auto read_err = record.Read([&all_data](auto data) {
all_data.append(data);
return true;
});

REQUIRE(read_err == Error::kOk);
return true;
});
REQUIRE(read_err == Error::kOk);
return true;
});

REQUIRE(err == Error::kOk);
REQUIRE(all_data == "some_data1");
}

SECTION("exclude labels") {
auto err = bucket->Query("entry", ts, ts + us(3), IBucket::QueryOptions{.exclude = {{"label1", "value1"}}},
SECTION("with strict condition") {
auto err = bucket->Query("entry", ts, ts + us(3),
{
.when = R"({"&NOT_EXIST": {"$gt": 0}})",
.strict = true,
},
[&all_data](auto record) {
auto read_err = record.Read([&all_data](auto data) {
all_data.append(data);
Expand All @@ -217,8 +220,27 @@ TEST_CASE("reduct::IBucket should query records", "[entry_api]") {
return true;
});

REQUIRE(err == Error::kOk);
REQUIRE(all_data == "some_data2some_data3");
REQUIRE(err == Error{.code = 404, .message = "Reference 'NOT_EXIST' not found"});
}

SECTION("with non strict condition") {
auto err = bucket->Query("entry", ts, ts + us(3),
{
.when = R"({"&NOT_EXIST": {"$gt": 0}})",
.strict = false,
},
[&all_data](auto record) {
auto read_err = record.Read([&all_data](auto data) {
all_data.append(data);
return true;
});

REQUIRE(read_err == Error::kOk);
return true;
});

REQUIRE(err == Error::kOk);
REQUIRE(all_data.empty());
}
}

Expand Down

0 comments on commit ceb80ed

Please sign in to comment.