Skip to content

Commit

Permalink
connection_pool: picking the right connection and reconnecting
Browse files Browse the repository at this point in the history
Reconnection keeps being rescheduled until every shard has the desired
number of connections.

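The other change is that, instead of picking the 'least busy'
connection from the host's whole pool, we pick the least busy one from
the pool of connections to the shard that owns the token, falling back
to the whole pool when no usable connection to that shard exists.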
  • Loading branch information
jul-stas committed Aug 21, 2020
1 parent 690db35 commit ddec859
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 36 deletions.
105 changes: 70 additions & 35 deletions src/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "utils.hpp"

#include <algorithm>
#include <numeric>

using namespace datastax;
using namespace datastax::internal::core;
Expand Down Expand Up @@ -77,46 +78,70 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
set_pointer_keys(reconnection_schedules_);
set_pointer_keys(to_flush_);

if (host->sharding_info()) {
const auto hosts_shard_cnt = host->sharding_info()->get_shards_count();
connections_by_shard_.resize(hosts_shard_cnt);
num_connections_per_shard_ = settings_.num_connections_per_host / hosts_shard_cnt
+ (settings_.num_connections_per_host % hosts_shard_cnt ? 1u : 0u);
} else {
connections_by_shard_.resize(1);
num_connections_per_shard_ = settings_.num_connections_per_host;
}

for (Connection::Vec::const_iterator it = connections.begin(), end = connections.end(); it != end;
++it) {
const Connection::Ptr& connection(*it);
if (!connection->is_closing()) {
add_connection(PooledConnection::Ptr(new PooledConnection(this, connection)));
if (connections_by_shard_[connection->shard_id()].size() < num_connections_per_shard_) {
add_connection(PooledConnection::Ptr(new PooledConnection(this, connection)));
} else {
connection->close();
}
}
}

notify_up_or_down();

// We had non-critical errors or some connections closed
assert(connections.size() <= settings_.num_connections_per_host);
size_t needed = settings_.num_connections_per_host - connections_.size();
size_t needed = num_connections_per_shard_ * connections_by_shard_.size()
- std::accumulate(connections_by_shard_.begin(), connections_by_shard_.end(), 0u,
[] (size_t acc, const PooledConnection::Vec& v) {
return acc + v.size();
});
for (size_t i = 0; i < needed; ++i) {
schedule_reconnect();
}
}

// Picks a pooled connection for a request carrying `token`; a default-constructed
// (null) Ptr is returned when no non-closing connection can be found.
//
// NOTE(review): this span is a diff hunk whose +/- markers were stripped, so
// removed and added lines are interleaved and the braces do not balance as shown.
// Statements that use `connections_` appear to be the removed (pre-commit) code and
// statements that use `connections_by_shard_` the added code -- confirm against the
// repository before relying on this reading.
PooledConnection::Ptr ConnectionPool::find_least_busy(int64_t token) const {
// (apparently removed) Old placeholder-token path: minimum over the flat
// `connections_` vector under least_busy_comp.
if (token == CASS_INT64_MIN) {
PooledConnection::Vec::const_iterator it =
std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
if (it == connections_.end() || (*it)->is_closing()) {
return PooledConnection::Ptr();
// (apparently added) New whole-pool path; also taken when the host has no sharding info.
if (token == CASS_INT64_MIN || !host_->sharding_info()) {
// We got a placeholder token, or a sensible token that is useless without the sharding info.
// In both cases we return the least busy connection of the *entire pool* (or NULL).
PooledConnection::Ptr least_busy; // NULL by default
for (const auto& shard_pool : connections_by_shard_) {
for (const auto& conn : shard_pool) {
// Closing connections are never candidates; among the rest keep whichever
// orders first under least_busy_comp.
if (!conn->is_closing()) {
least_busy = least_busy ? std::min(least_busy, conn, least_busy_comp) : conn;
}
}
}
return *it;
return least_busy;
}

// (apparently removed) Old shard lookup: the first connection whose shard_id()
// equals the token's shard, regardless of how busy it is.
const auto desired_shard_num = host_->sharding_info()->shard_id(token);
const auto conn_it = std::find_if(connections_.begin(), connections_.end(), [desired_shard_num] (const PooledConnection::Ptr& c) {
return c->shard_id() == desired_shard_num;
});
if (conn_it != connections_.end()) {
return *conn_it;
// Otherwise, find the least busy connection pointing to the right shard (if possible)
// NOTE(review): the index below is not range-checked, whereas on_reconnect() compares
// connections_by_shard_.size() with the shard id before indexing -- confirm that
// shard_id(token) is always smaller than connections_by_shard_.size().
const auto& shard_pool = connections_by_shard_[host_->sharding_info()->shard_id(token)];
PooledConnection::Vec::const_iterator it =
std::min_element(shard_pool.begin(), shard_pool.end(), least_busy_comp);
// Empty shard pool, or the minimum element is closing: retry with the placeholder
// token, which selects from the whole pool.
if (it == shard_pool.end() || (*it)->is_closing()) {
return find_least_busy(CASS_INT64_MIN);
}

return find_least_busy(CASS_INT64_MIN);
return *it;
}

// NOTE(review): diff hunk with the +/- markers stripped. The one-line definition
// using `connections_` appears to be the removed version and the definition after it
// the added one -- confirm against the repository.
bool ConnectionPool::has_connections() const { return !connections_.empty(); }
// Reports whether at least one per-shard connection vector is non-empty.
// Only emptiness is tested; is_closing() is not consulted here.
bool ConnectionPool::has_connections() const {
return std::any_of(connections_by_shard_.begin(), connections_by_shard_.end(),
[] (const PooledConnection::Vec& v) { return !v.empty(); });
}

void ConnectionPool::flush() {
for (DenseHashSet<PooledConnection*>::const_iterator it = to_flush_.begin(),
Expand Down Expand Up @@ -154,8 +179,9 @@ void ConnectionPool::close_connection(PooledConnection* connection, Protected) {
if (metrics_) {
metrics_->total_connections.dec();
}
connections_.erase(std::remove(connections_.begin(), connections_.end(), connection),
connections_.end());
auto& shard_pool = connections_by_shard_[connection->shard_id()];
shard_pool.erase(std::remove(shard_pool.begin(), shard_pool.end(), connection),
shard_pool.end());
to_flush_.erase(connection);

if (close_state_ != CLOSE_STATE_OPEN) {
Expand All @@ -173,16 +199,16 @@ void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) {
if (metrics_) {
metrics_->total_connections.inc();
}
connections_.push_back(connection);
const size_t new_connections_shard = connection->shard_id();
LOG_INFO("add_connection: to host %s to shard %ld", host_->address_string().c_str(), new_connections_shard);
connections_by_shard_[new_connections_shard].push_back(connection);
}

void ConnectionPool::notify_up_or_down() {
if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) &&
connections_.empty()) {
if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && !has_connections()) {
notify_state_ = NOTIFY_STATE_DOWN;
listener_->on_pool_down(host_->address());
} else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) &&
!connections_.empty()) {
} else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && has_connections()) {
notify_state_ = NOTIFY_STATE_UP;
listener_->on_pool_up(host_->address());
}
Expand Down Expand Up @@ -223,11 +249,12 @@ void ConnectionPool::internal_close() {
// Make copies of connection/connector data structures to prevent iterator
// invalidation.

PooledConnection::Vec connections(connections_);
for (PooledConnection::Vec::iterator it = connections.begin(), end = connections.end();
it != end; ++it) {
(*it)->close();
}
auto connections_per_shards = connections_by_shard_;
std::for_each(connections_per_shards.begin(), connections_per_shards.end(), [] (PooledConnection::Vec& v) {
for (auto& c : v) {
c->close();
}
});

DelayedConnector::Vec pending_connections(pending_connections_);
for (DelayedConnector::Vec::iterator it = pending_connections.begin(),
Expand All @@ -244,8 +271,7 @@ void ConnectionPool::internal_close() {
void ConnectionPool::maybe_closed() {
// Remove the pool once all current connections and pending connections
// are terminated.
if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty() &&
pending_connections_.empty()) {
if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && !has_connections() && pending_connections_.empty()) {
close_state_ = CLOSE_STATE_CLOSED;
// Only mark DOWN if it's UP otherwise we might get multiple DOWN events
// when connecting the pool.
Expand Down Expand Up @@ -275,9 +301,18 @@ void ConnectionPool::on_reconnect(DelayedConnector* connector) {
}

if (connector->is_ok()) {
add_connection(
PooledConnection::Ptr(new PooledConnection(this, connector->release_connection())));
notify_up_or_down();
PooledConnection::Ptr pooled_conn {new PooledConnection(this, connector->release_connection())};
const size_t new_connections_shard = pooled_conn->shard_id();
if (connections_by_shard_.size() > new_connections_shard
&& connections_by_shard_[new_connections_shard].size() < num_connections_per_shard_) {
add_connection(pooled_conn);
notify_up_or_down();
} else {
LOG_INFO("Reconnection to host %s connected us to shard %ld, reconnecting again",
address().to_string().c_str(), new_connections_shard);
pooled_conn->close();
schedule_reconnect(schedule.release());
}
} else if (!connector->is_canceled()) {
if (connector->is_critical_error()) {
LOG_ERROR("Closing established connection pool to host %s because of the following error: %s",
Expand Down
6 changes: 5 additions & 1 deletion src/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
private:
void notify_up_or_down();
void notify_critical_error(Connector::ConnectionError code, const String& message);

/** Adds connection to the pool. It's the caller's responsibility
* to keep track of the connections count. */
void add_connection(const PooledConnection::Ptr& connection);
void schedule_reconnect(ReconnectionSchedule* schedule = NULL);
void internal_close();
Expand All @@ -232,7 +235,8 @@ class ConnectionPool : public RefCounted<ConnectionPool> {

CloseState close_state_;
NotifyState notify_state_;
PooledConnection::Vec connections_;
std::vector<PooledConnection::Vec> connections_by_shard_; /// Index is the shard ID
size_t num_connections_per_shard_;
DelayedConnector::Vec pending_connections_;
DenseHashSet<PooledConnection*> to_flush_;
};
Expand Down

0 comments on commit ddec859

Please sign in to comment.