From 8dcb69ee20e140e32c9315ebacc3856f707bac67 Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Wed, 1 Jul 2020 16:52:39 +0200 Subject: [PATCH] connection_pool: picking the right connection and reconnecting The reconnection is being scheduled until we reach the desired number of connections per shard. The other change is that instead of picking the 'least busy' connection from the host's pool, we pick the least busy one from the "shards pool". --- src/connection_pool.cpp | 99 +++++++++++++++++++++++++++-------------- src/connection_pool.hpp | 3 +- 2 files changed, 67 insertions(+), 35 deletions(-) diff --git a/src/connection_pool.cpp b/src/connection_pool.cpp index 105b3a0e0..b65092177 100644 --- a/src/connection_pool.cpp +++ b/src/connection_pool.cpp @@ -22,6 +22,7 @@ #include "utils.hpp" #include +#include using namespace datastax; using namespace datastax::internal::core; @@ -77,10 +78,21 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo set_pointer_keys(reconnection_schedules_); set_pointer_keys(to_flush_); + if (host->sharding_info()) { + const auto hosts_shard_cnt = host->sharding_info()->get_shards_count(); + connections_by_shard_.resize(hosts_shard_cnt); + num_connections_per_shard_ = settings_.num_connections_per_host / hosts_shard_cnt + + (settings_.num_connections_per_host % hosts_shard_cnt ? 1u : 0u); + } else { + connections_by_shard_.resize(1); + num_connections_per_shard_ = settings_.num_connections_per_host; + } + for (Connection::Vec::const_iterator it = connections.begin(), end = connections.end(); it != end; ++it) { const Connection::Ptr& connection(*it); - if (!connection->is_closing()) { + if (!connection->is_closing() + && connections_by_shard_[connection->shard_id().value_or(0)].size() < num_connections_per_shard_) { add_connection(PooledConnection::Ptr(new PooledConnection(this, connection))); } } @@ -88,34 +100,43 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo notify_up_or_down(); // We had non-critical errors or some connections closed - assert(connections.size() <= settings_.num_connections_per_host); - size_t needed = settings_.num_connections_per_host - connections_.size(); + size_t needed = num_connections_per_shard_ * connections_by_shard_.size() + - std::accumulate(connections_by_shard_.begin(), connections_by_shard_.end(), 0u, + [] (size_t acc, const PooledConnection::Vec& v) { + return acc + v.size(); + }); for (size_t i = 0; i < needed; ++i) { schedule_reconnect(); } } PooledConnection::Ptr ConnectionPool::find_least_busy(int64_t token) const { - if (token == CASS_INT64_MIN) { - PooledConnection::Vec::const_iterator it = - std::min_element(connections_.begin(), connections_.end(), least_busy_comp); - if (it == connections_.end() || (*it)->is_closing()) { - return PooledConnection::Ptr(); + if (token == CASS_INT64_MIN || !host_->sharding_info()) { + // We got a placeholder token, or a sensible token that is useless without the sharding info. + // In both cases we return the least busy connection of the *entire pool* (or NULL). + PooledConnection::Ptr least_busy; // NULL by default + for (const auto& shard_pool : connections_by_shard_) { + for (const auto& conn : shard_pool) { + least_busy = least_busy ? std::min(least_busy, conn, least_busy_comp) : conn; + } } - return *it; + return least_busy; } - const auto desired_shard_num = host_->sharding_info()->shard_id(token); - for (const auto& conn : connections_) { - if (conn->connection()->shard_id() == desired_shard_num) { - return conn; - } + // Otherwise, find the least busy connection pointing to the right shard + const auto& shard_pool = connections_by_shard_[host_->sharding_info()->shard_id(token)]; + PooledConnection::Vec::const_iterator it = + std::min_element(shard_pool.begin(), shard_pool.end(), least_busy_comp); + if (it == shard_pool.end() || (*it)->is_closing()) { + return PooledConnection::Ptr(); } - - return find_least_busy(CASS_INT64_MIN); + return *it; } -bool ConnectionPool::has_connections() const { return !connections_.empty(); } +bool ConnectionPool::has_connections() const { + return !std::all_of(connections_by_shard_.begin(), connections_by_shard_.end(), + [] (const PooledConnection::Vec& v) { return v.empty(); }); +} void ConnectionPool::flush() { for (DenseHashSet::const_iterator it = to_flush_.begin(), @@ -153,8 +174,9 @@ void ConnectionPool::close_connection(PooledConnection* connection, Protected) { if (metrics_) { metrics_->total_connections.dec(); } - connections_.erase(std::remove(connections_.begin(), connections_.end(), connection), - connections_.end()); + auto& shard_pool = connections_by_shard_[connection->connection()->shard_id().value_or(0)]; + shard_pool.erase(std::remove(shard_pool.begin(), shard_pool.end(), connection), + shard_pool.end()); to_flush_.erase(connection); if (close_state_ != CLOSE_STATE_OPEN) { @@ -172,16 +194,16 @@ void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) { if (metrics_) { metrics_->total_connections.inc(); } - connections_.push_back(connection); + const size_t new_connections_shard = connection->connection()->shard_id().value_or(0u); + LOG_INFO("add_connection: to host %s to shard %ld", host_->address_string().c_str(), new_connections_shard); + connections_by_shard_[new_connections_shard].push_back(connection); } void ConnectionPool::notify_up_or_down() { - if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && - connections_.empty()) { + if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && !has_connections()) { notify_state_ = NOTIFY_STATE_DOWN; listener_->on_pool_down(host_->address()); - } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && - !connections_.empty()) { + } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && has_connections()) { notify_state_ = NOTIFY_STATE_UP; listener_->on_pool_up(host_->address()); } @@ -222,11 +244,13 @@ void ConnectionPool::internal_close() { // Make copies of connection/connector data structures to prevent iterator // invalidation. - PooledConnection::Vec connections(connections_); - for (PooledConnection::Vec::iterator it = connections.begin(), end = connections.end(); - it != end; ++it) { - (*it)->close(); - } + auto connections_per_shards = connections_by_shard_; + std::for_each(connections_per_shards.begin(), connections_per_shards.end(), [] (PooledConnection::Vec& v) { + for (PooledConnection::Vec::iterator it = v.begin(), end = v.end(); + it != end; ++it) { + (*it)->close(); + } + }); DelayedConnector::Vec pending_connections(pending_connections_); for (DelayedConnector::Vec::iterator it = pending_connections.begin(), @@ -243,8 +267,7 @@ void ConnectionPool::internal_close() { void ConnectionPool::maybe_closed() { // Remove the pool once all current connections and pending connections // are terminated. - if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty() && - pending_connections_.empty()) { + if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && !has_connections() && pending_connections_.empty()) { close_state_ = CLOSE_STATE_CLOSED; // Only mark DOWN if it's UP otherwise we might get multiple DOWN events // when connecting the pool. @@ -274,9 +297,17 @@ void ConnectionPool::on_reconnect(DelayedConnector* connector) { } if (connector->is_ok()) { - add_connection( - PooledConnection::Ptr(new PooledConnection(this, connector->release_connection()))); - notify_up_or_down(); + PooledConnection::Ptr pooled_conn {new PooledConnection(this, connector->release_connection())}; + const size_t new_connections_shard = pooled_conn->connection()->shard_id().value_or(0u); + if (connections_by_shard_.size() > new_connections_shard + && connections_by_shard_[new_connections_shard].size() < num_connections_per_shard_) { + add_connection(pooled_conn); + notify_up_or_down(); + } else { + LOG_INFO("Reconnection to host %s connected us to shard %ld, reconnecting again", + address().to_string().c_str(), new_connections_shard); + schedule_reconnect(schedule.release()); + } } else if (!connector->is_canceled()) { if (connector->is_critical_error()) { LOG_ERROR("Closing established connection pool to host %s because of the following error: %s", diff --git a/src/connection_pool.hpp b/src/connection_pool.hpp index db3e2c184..952fc53c4 100644 --- a/src/connection_pool.hpp +++ b/src/connection_pool.hpp @@ -232,7 +232,8 @@ class ConnectionPool : public RefCounted { CloseState close_state_; NotifyState notify_state_; - PooledConnection::Vec connections_; + std::vector connections_by_shard_; /// Index is the shard ID + size_t num_connections_per_shard_; DelayedConnector::Vec pending_connections_; DenseHashSet to_flush_; };