diff --git a/src/cluster.cpp b/src/cluster.cpp
index f03eaabd0..63b3e621c 100644
--- a/src/cluster.cpp
+++ b/src/cluster.cpp
@@ -25,6 +25,8 @@
 #include "speculative_execution.hpp"
 #include "utils.hpp"
 
+#include <iostream>
+
 using namespace datastax;
 using namespace datastax::internal::core;
 
@@ -235,6 +237,12 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list
     , local_dc_(local_dc)
     , supported_options_(supported_options)
     , is_recording_events_(settings.disable_events_on_startup) {
+  // TODO(review): temporary banner identifying the optimized build; a library should not
+  // write to stdout unconditionally, so drop the std::cout line before release.
+  static const auto optimized_msg = "===== Using optimized driver!!! =====\n";
+  std::cout << optimized_msg;
+  LOG_INFO("%s", optimized_msg);
+
   inc_ref();
   connection_->set_listener(this);
 
diff --git a/src/connection_pool.cpp b/src/connection_pool.cpp
index ced2c4502..2fdf43425 100644
--- a/src/connection_pool.cpp
+++ b/src/connection_pool.cpp
@@ -22,6 +22,7 @@
 #include "utils.hpp"
 
 #include <algorithm>
+#include <numeric>
 
 using namespace datastax;
 using namespace datastax::internal::core;
@@ -77,10 +78,25 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
   set_pointer_keys(reconnection_schedules_);
   set_pointer_keys(to_flush_);
 
+  // One bucket per shard. The per-shard quota is num_connections_per_host divided by
+  // the shard count, rounded up.
+  if (host->sharding_info()) {
+    const auto hosts_shard_cnt = host->sharding_info()->get_shards_count();
+    connections_by_shard_.resize(hosts_shard_cnt);
+    num_connections_per_shard_ = settings_.num_connections_per_host / hosts_shard_cnt
+        + (settings_.num_connections_per_host % hosts_shard_cnt ? 1u : 0u);
+  } else {
+    connections_by_shard_.resize(1);
+    num_connections_per_shard_ = settings_.num_connections_per_host;
+  }
+
   for (Connection::Vec::const_iterator it = connections.begin(), end = connections.end();
        it != end; ++it) {
     const Connection::Ptr& connection(*it);
-    if (!connection->is_closing()) {
+    // Bounds-check the shard ID before indexing, the same way on_reconnect() does.
+    const size_t shard = connection->shard_id();
+    if (!connection->is_closing() && shard < connections_by_shard_.size()
+        && connections_by_shard_[shard].size() < num_connections_per_shard_) {
       add_connection(PooledConnection::Ptr(new PooledConnection(this, connection)));
     }
   }
@@ -88,35 +104,47 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
   notify_up_or_down();
 
   // We had non-critical errors or some connections closed
-  assert(connections.size() <= settings_.num_connections_per_host);
-  size_t needed = settings_.num_connections_per_host - connections_.size();
+  size_t needed = num_connections_per_shard_ * connections_by_shard_.size()
+      - std::accumulate(connections_by_shard_.begin(), connections_by_shard_.end(), size_t(0),
+                        [] (size_t acc, const PooledConnection::Vec& v) {
+                          return acc + v.size();
+                        });
   for (size_t i = 0; i < needed; ++i) {
     schedule_reconnect();
   }
 }
 
 PooledConnection::Ptr ConnectionPool::find_least_busy(int64_t token) const {
-  if (token == CASS_INT64_MIN) {
-    PooledConnection::Vec::const_iterator it =
-        std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
-    if (it == connections_.end() || (*it)->is_closing()) {
-      return PooledConnection::Ptr();
+  if (token == CASS_INT64_MIN || !host_->sharding_info()) {
+    // We got a placeholder token, or a sensible token that is useless without the sharding info.
+    // In both cases we return the least busy connection of the *entire pool* (or NULL).
+    PooledConnection::Ptr least_busy; // NULL by default
+    for (const auto& shard_pool : connections_by_shard_) {
+      for (const auto& conn : shard_pool) {
+        least_busy = least_busy ? std::min(least_busy, conn, least_busy_comp) : conn;
+      }
     }
-    return *it;
+    // Mirror the per-shard path below: never hand out a connection that is closing.
+    if (least_busy && least_busy->is_closing()) {
+      return PooledConnection::Ptr();
+    }
+    return least_busy;
   }
 
-  const auto desired_shard_num = host_->sharding_info()->shard_id(token);
-  const auto conn_it = std::find_if(connections_.begin(), connections_.end(), [desired_shard_num] (const PooledConnection::Ptr& c) {
-    return c->shard_id() == desired_shard_num;
-  });
-  if (conn_it != connections_.end()) {
-    return *conn_it;
+  // Otherwise, find the least busy connection pointing to the right shard
+  const auto& shard_pool = connections_by_shard_[host_->sharding_info()->shard_id(token)];
+  PooledConnection::Vec::const_iterator it =
+      std::min_element(shard_pool.begin(), shard_pool.end(), least_busy_comp);
+  if (it == shard_pool.end() || (*it)->is_closing()) {
+    return PooledConnection::Ptr();
   }
-
-  return find_least_busy(CASS_INT64_MIN);
+  return *it;
 }
 
-bool ConnectionPool::has_connections() const { return !connections_.empty(); }
+bool ConnectionPool::has_connections() const {
+  return std::any_of(connections_by_shard_.begin(), connections_by_shard_.end(),
+                     [] (const PooledConnection::Vec& v) { return !v.empty(); });
+}
 
 void ConnectionPool::flush() {
   for (DenseHashSet<PooledConnection*>::const_iterator it = to_flush_.begin(),
@@ -154,8 +182,9 @@ void ConnectionPool::close_connection(PooledConnection* connection, Protected) {
   if (metrics_) {
     metrics_->total_connections.dec();
   }
-  connections_.erase(std::remove(connections_.begin(), connections_.end(), connection),
-                     connections_.end());
+  auto& shard_pool = connections_by_shard_[connection->shard_id()];
+  shard_pool.erase(std::remove(shard_pool.begin(), shard_pool.end(), connection),
+                   shard_pool.end());
   to_flush_.erase(connection);
 
   if (close_state_ != CLOSE_STATE_OPEN) {
@@ -173,16 +202,17 @@ void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) {
   if (metrics_) {
     metrics_->total_connections.inc();
   }
-  connections_.push_back(connection);
+  const size_t new_connections_shard = connection->shard_id();
+  LOG_INFO("add_connection: to host %s to shard %lu", host_->address_string().c_str(),
+           static_cast<unsigned long>(new_connections_shard));
+  connections_by_shard_[new_connections_shard].push_back(connection);
 }
 
 void ConnectionPool::notify_up_or_down() {
-  if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) &&
-      connections_.empty()) {
+  if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && !has_connections()) {
     notify_state_ = NOTIFY_STATE_DOWN;
     listener_->on_pool_down(host_->address());
-  } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) &&
-             !connections_.empty()) {
+  } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && has_connections()) {
     notify_state_ = NOTIFY_STATE_UP;
     listener_->on_pool_up(host_->address());
   }
@@ -223,11 +253,12 @@ void ConnectionPool::internal_close() {
     // Make copies of connection/connector data structures to prevent iterator
     // invalidation.
 
-    PooledConnection::Vec connections(connections_);
-    for (PooledConnection::Vec::iterator it = connections.begin(), end = connections.end();
-         it != end; ++it) {
-      (*it)->close();
-    }
+    auto connections_per_shards = connections_by_shard_;
+    std::for_each(connections_per_shards.begin(), connections_per_shards.end(), [] (PooledConnection::Vec& v) {
+      for (auto& c : v) {
+        c->close();
+      }
+    });
 
     DelayedConnector::Vec pending_connections(pending_connections_);
     for (DelayedConnector::Vec::iterator it = pending_connections.begin(),
@@ -244,8 +275,7 @@ void ConnectionPool::internal_close() {
 void ConnectionPool::maybe_closed() {
   // Remove the pool once all current connections and pending connections
   // are terminated.
-  if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty() &&
-      pending_connections_.empty()) {
+  if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && !has_connections() && pending_connections_.empty()) {
     close_state_ = CLOSE_STATE_CLOSED;
     // Only mark DOWN if it's UP otherwise we might get multiple DOWN events
     // when connecting the pool.
@@ -275,9 +305,20 @@ void ConnectionPool::on_reconnect(DelayedConnector* connector) {
   }
 
   if (connector->is_ok()) {
-    add_connection(
-        PooledConnection::Ptr(new PooledConnection(this, connector->release_connection())));
-    notify_up_or_down();
+    PooledConnection::Ptr pooled_conn {new PooledConnection(this, connector->release_connection())};
+    const size_t new_connections_shard = pooled_conn->shard_id();
+    // Only keep the connection if it landed on a known shard that still has room.
+    if (connections_by_shard_.size() > new_connections_shard
+        && connections_by_shard_[new_connections_shard].size() < num_connections_per_shard_) {
+      add_connection(pooled_conn);
+      notify_up_or_down();
+    } else {
+      // NOTE(review): the surplus connection is dropped here without an explicit close();
+      // confirm it is actually torn down and does not linger.
+      LOG_INFO("Reconnection to host %s connected us to shard %lu, reconnecting again",
+               address().to_string().c_str(), static_cast<unsigned long>(new_connections_shard));
+      schedule_reconnect(schedule.release());
+    }
   } else if (!connector->is_canceled()) {
     if (connector->is_critical_error()) {
       LOG_ERROR("Closing established connection pool to host %s because of the following error: %s",
diff --git a/src/connection_pool.hpp b/src/connection_pool.hpp
index db3e2c184..4dac7466e 100644
--- a/src/connection_pool.hpp
+++ b/src/connection_pool.hpp
@@ -213,6 +213,12 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
 private:
   void notify_up_or_down();
   void notify_critical_error(Connector::ConnectionError code, const String& message);
+
+  /**
+   * Adds connection to the pool. It's the caller's responsibility to keep track of the
+   * connections count and to make sure the connection's shard ID is a valid index
+   * into connections_by_shard_.
+   */
   void add_connection(const PooledConnection::Ptr& connection);
   void schedule_reconnect(ReconnectionSchedule* schedule = NULL);
   void internal_close();
@@ -232,7 +238,8 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
   CloseState close_state_;
   NotifyState notify_state_;
 
-  PooledConnection::Vec connections_;
+  std::vector<PooledConnection::Vec> connections_by_shard_; ///< Index is the shard ID
+  size_t num_connections_per_shard_;
   DelayedConnector::Vec pending_connections_;
   DenseHashSet<PooledConnection*> to_flush_;
 };