Skip to content

Commit

Permalink
Added routing to shards on top of TokenAware policy
Browse files Browse the repository at this point in the history
At this point metrics indicate that cross-shard ops are reduced,
but the implementation is still raw. The reasons:
When connection-to-the-right-shard is being searched among per-host
connections, it is done by linear search. Connecting logic does not
attempt to reconnect until all shards are hit. Topology change
events are not accounted for.
  • Loading branch information
jul-stas committed Jun 26, 2020
1 parent 0438856 commit 540021e
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 13 deletions.
23 changes: 17 additions & 6 deletions src/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,24 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
}
}

PooledConnection::Ptr ConnectionPool::find_least_busy() const {
PooledConnection::Vec::const_iterator it =
std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
if (it == connections_.end() || (*it)->is_closing()) {
return PooledConnection::Ptr();
PooledConnection::Ptr ConnectionPool::find_least_busy(int64_t token) const {
if (token == CASS_INT64_MIN) {
PooledConnection::Vec::const_iterator it =
std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
if (it == connections_.end() || (*it)->is_closing()) {
return PooledConnection::Ptr();
}
return *it;
}
return *it;

const auto desired_shard_num = host_->sharding_info()->shard_id(token);
for (const auto& conn : connections_) {
if (conn->connection()->shard_id() == desired_shard_num) {
return conn;
}
}

return find_least_busy(CASS_INT64_MIN);
}

bool ConnectionPool::has_connections() const { return !connections_.empty(); }
Expand Down
2 changes: 1 addition & 1 deletion src/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
*
* @return The least busy connection or null if no connection is available.
*/
PooledConnection::Ptr find_least_busy() const;
PooledConnection::Ptr find_least_busy(int64_t token) const;

/**
* Determine if the pool has any valid connections.
Expand Down
4 changes: 2 additions & 2 deletions src/connection_pool_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ ConnectionPoolManager::ConnectionPoolManager(const ConnectionPool::Map& pools, u
}
}

PooledConnection::Ptr ConnectionPoolManager::find_least_busy(const Address& address) const {
PooledConnection::Ptr ConnectionPoolManager::find_least_busy(const Address& address, int64_t token) const {
ConnectionPool::Map::const_iterator it = pools_.find(address);
if (it == pools_.end()) {
return PooledConnection::Ptr();
}
return it->second->find_least_busy();
return it->second->find_least_busy(token);
}

bool ConnectionPoolManager::has_connections(const Address& address) const {
Expand Down
2 changes: 1 addition & 1 deletion src/connection_pool_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class ConnectionPoolManager
* @return The least busy connection for a host or null if no connections are
* available.
*/
PooledConnection::Ptr find_least_busy(const Address& address) const;
PooledConnection::Ptr find_least_busy(const Address& address, int64_t token) const;

/**
* Determine if a pool has any valid connections.
Expand Down
2 changes: 2 additions & 0 deletions src/pooled_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class PooledConnection
*/
bool is_closing() const;

const Connection::Ptr connection() const { return connection_; }

public:
const String& keyspace() const { return connection_->keyspace(); } // Test only

Expand Down
10 changes: 9 additions & 1 deletion src/request_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "result_response.hpp"
#include "row.hpp"
#include "session.hpp"
#include "token_map_impl.hpp"

#include <uv.h>

Expand Down Expand Up @@ -357,8 +358,15 @@ void RequestHandler::internal_retry(RequestExecution* request_execution) {

bool is_done = false;
while (!is_done && request_execution->current_host()) {
int64_t token = CASS_INT64_MIN;
if (const RoutableRequest* routable_req = dynamic_cast<const RoutableRequest*>(request()); routable_req) {
String routing_key;
routable_req->get_routing_key(&routing_key);
token = Murmur3Partitioner::hash(routing_key);
}

PooledConnection::Ptr connection =
manager_->find_least_busy(request_execution->current_host()->address());
manager_->find_least_busy(request_execution->current_host()->address(), token);
if (connection) {
int32_t result = connection->write(request_execution);

Expand Down
4 changes: 2 additions & 2 deletions src/request_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ bool RequestProcessor::on_prepare_all(const RequestHandler::Ptr& request_handler
PrepareAllCallback::Ptr prepare_all_callback(
new PrepareAllCallback(address, prepare_all_handler));

PooledConnection::Ptr connection(connection_pool_manager_->find_least_busy(address));
PooledConnection::Ptr connection(connection_pool_manager_->find_least_busy(address, CASS_INT64_MIN));
if (connection) {
connection->write(prepare_all_callback.get());
}
Expand Down Expand Up @@ -580,7 +580,7 @@ bool RequestProcessor::write_wait_callback(const RequestHandler::Ptr& request_ha
const Host::Ptr& current_host,
const RequestCallback::Ptr& callback) {
PooledConnection::Ptr connection(
connection_pool_manager_->find_least_busy(current_host->address()));
connection_pool_manager_->find_least_busy(current_host->address(), CASS_INT64_MIN));
if (connection && connection->write(callback.get()) > 0) {
// Stop the original request timer now that we have a response and
// are waiting for the maximum wait time of the handler.
Expand Down

0 comments on commit 540021e

Please sign in to comment.