Skip to content

Commit

Permalink
Signal usage of shard-aware driver
Browse files Browse the repository at this point in the history
  • Loading branch information
jul-stas committed Aug 18, 2020
1 parent 138e475 commit 0178646
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 36 deletions.
6 changes: 6 additions & 0 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "speculative_execution.hpp"
#include "utils.hpp"

#include <iostream>

using namespace datastax;
using namespace datastax::internal::core;

Expand Down Expand Up @@ -235,6 +237,10 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list
, local_dc_(local_dc)
, supported_options_(supported_options)
, is_recording_events_(settings.disable_events_on_startup) {
static const auto optimized_msg = "===== Using optimized driver!!! =====\n";
std::cout << optimized_msg;
LOG_INFO(optimized_msg);

inc_ref();
connection_->set_listener(this);

Expand Down
99 changes: 64 additions & 35 deletions src/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "utils.hpp"

#include <algorithm>
#include <numeric>

using namespace datastax;
using namespace datastax::internal::core;
Expand Down Expand Up @@ -77,46 +78,65 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
set_pointer_keys(reconnection_schedules_);
set_pointer_keys(to_flush_);

if (host->sharding_info()) {
const auto hosts_shard_cnt = host->sharding_info()->get_shards_count();
connections_by_shard_.resize(hosts_shard_cnt);
num_connections_per_shard_ = settings_.num_connections_per_host / hosts_shard_cnt
+ (settings_.num_connections_per_host % hosts_shard_cnt ? 1u : 0u);
} else {
connections_by_shard_.resize(1);
num_connections_per_shard_ = settings_.num_connections_per_host;
}

for (Connection::Vec::const_iterator it = connections.begin(), end = connections.end(); it != end;
++it) {
const Connection::Ptr& connection(*it);
if (!connection->is_closing()) {
if (!connection->is_closing()
&& connections_by_shard_[connection->shard_id()].size() < num_connections_per_shard_) {
add_connection(PooledConnection::Ptr(new PooledConnection(this, connection)));
}
}

notify_up_or_down();

// We had non-critical errors or some connections closed
assert(connections.size() <= settings_.num_connections_per_host);
size_t needed = settings_.num_connections_per_host - connections_.size();
size_t needed = num_connections_per_shard_ * connections_by_shard_.size()
- std::accumulate(connections_by_shard_.begin(), connections_by_shard_.end(), 0u,
[] (size_t acc, const PooledConnection::Vec& v) {
return acc + v.size();
});
for (size_t i = 0; i < needed; ++i) {
schedule_reconnect();
}
}

PooledConnection::Ptr ConnectionPool::find_least_busy(int64_t token) const {
if (token == CASS_INT64_MIN) {
PooledConnection::Vec::const_iterator it =
std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
if (it == connections_.end() || (*it)->is_closing()) {
return PooledConnection::Ptr();
if (token == CASS_INT64_MIN || !host_->sharding_info()) {
// We got a placeholder token, or a sensible token that is useless without the sharding info.
// In both cases we return the least busy connection of the *entire pool* (or NULL).
PooledConnection::Ptr least_busy; // NULL by default
for (const auto& shard_pool : connections_by_shard_) {
for (const auto& conn : shard_pool) {
least_busy = least_busy ? std::min(least_busy, conn, least_busy_comp) : conn;
}
}
return *it;
return least_busy;
}

const auto desired_shard_num = host_->sharding_info()->shard_id(token);
const auto conn_it = std::find_if(connections_.begin(), connections_.end(), [desired_shard_num] (const PooledConnection::Ptr& c) {
return c->shard_id() == desired_shard_num;
});
if (conn_it != connections_.end()) {
return *conn_it;
// Otherwise, find the least busy connection pointing to the right shard
const auto& shard_pool = connections_by_shard_[host_->sharding_info()->shard_id(token)];
PooledConnection::Vec::const_iterator it =
std::min_element(shard_pool.begin(), shard_pool.end(), least_busy_comp);
if (it == shard_pool.end() || (*it)->is_closing()) {
return PooledConnection::Ptr();
}

return find_least_busy(CASS_INT64_MIN);
return *it;
}

bool ConnectionPool::has_connections() const { return !connections_.empty(); }
bool ConnectionPool::has_connections() const {
return std::any_of(connections_by_shard_.begin(), connections_by_shard_.end(),
[] (const PooledConnection::Vec& v) { return !v.empty(); });
}

void ConnectionPool::flush() {
for (DenseHashSet<PooledConnection*>::const_iterator it = to_flush_.begin(),
Expand Down Expand Up @@ -154,8 +174,9 @@ void ConnectionPool::close_connection(PooledConnection* connection, Protected) {
if (metrics_) {
metrics_->total_connections.dec();
}
connections_.erase(std::remove(connections_.begin(), connections_.end(), connection),
connections_.end());
auto& shard_pool = connections_by_shard_[connection->shard_id()];
shard_pool.erase(std::remove(shard_pool.begin(), shard_pool.end(), connection),
shard_pool.end());
to_flush_.erase(connection);

if (close_state_ != CLOSE_STATE_OPEN) {
Expand All @@ -173,16 +194,16 @@ void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) {
if (metrics_) {
metrics_->total_connections.inc();
}
connections_.push_back(connection);
const size_t new_connections_shard = connection->shard_id();
LOG_INFO("add_connection: to host %s to shard %ld", host_->address_string().c_str(), new_connections_shard);
connections_by_shard_[new_connections_shard].push_back(connection);
}

void ConnectionPool::notify_up_or_down() {
if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) &&
connections_.empty()) {
if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && !has_connections()) {
notify_state_ = NOTIFY_STATE_DOWN;
listener_->on_pool_down(host_->address());
} else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) &&
!connections_.empty()) {
} else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && has_connections()) {
notify_state_ = NOTIFY_STATE_UP;
listener_->on_pool_up(host_->address());
}
Expand Down Expand Up @@ -223,11 +244,12 @@ void ConnectionPool::internal_close() {
// Make copies of connection/connector data structures to prevent iterator
// invalidation.

PooledConnection::Vec connections(connections_);
for (PooledConnection::Vec::iterator it = connections.begin(), end = connections.end();
it != end; ++it) {
(*it)->close();
}
auto connections_per_shards = connections_by_shard_;
std::for_each(connections_per_shards.begin(), connections_per_shards.end(), [] (PooledConnection::Vec& v) {
for (auto& c : v) {
c->close();
}
});

DelayedConnector::Vec pending_connections(pending_connections_);
for (DelayedConnector::Vec::iterator it = pending_connections.begin(),
Expand All @@ -244,8 +266,7 @@ void ConnectionPool::internal_close() {
void ConnectionPool::maybe_closed() {
// Remove the pool once all current connections and pending connections
// are terminated.
if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty() &&
pending_connections_.empty()) {
if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && !has_connections() && pending_connections_.empty()) {
close_state_ = CLOSE_STATE_CLOSED;
// Only mark DOWN if it's UP otherwise we might get multiple DOWN events
// when connecting the pool.
Expand Down Expand Up @@ -275,9 +296,17 @@ void ConnectionPool::on_reconnect(DelayedConnector* connector) {
}

if (connector->is_ok()) {
add_connection(
PooledConnection::Ptr(new PooledConnection(this, connector->release_connection())));
notify_up_or_down();
PooledConnection::Ptr pooled_conn {new PooledConnection(this, connector->release_connection())};
const size_t new_connections_shard = pooled_conn->shard_id();
if (connections_by_shard_.size() > new_connections_shard
&& connections_by_shard_[new_connections_shard].size() < num_connections_per_shard_) {
add_connection(pooled_conn);
notify_up_or_down();
} else {
LOG_INFO("Reconnection to host %s connected us to shard %ld, reconnecting again",
address().to_string().c_str(), new_connections_shard);
schedule_reconnect(schedule.release());
}
} else if (!connector->is_canceled()) {
if (connector->is_critical_error()) {
LOG_ERROR("Closing established connection pool to host %s because of the following error: %s",
Expand Down
6 changes: 5 additions & 1 deletion src/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
private:
void notify_up_or_down();
void notify_critical_error(Connector::ConnectionError code, const String& message);

/** Adds connection to the pool. It's the caller's responsibility
* to keep track of the connections count. */
void add_connection(const PooledConnection::Ptr& connection);
void schedule_reconnect(ReconnectionSchedule* schedule = NULL);
void internal_close();
Expand All @@ -232,7 +235,8 @@ class ConnectionPool : public RefCounted<ConnectionPool> {

CloseState close_state_;
NotifyState notify_state_;
PooledConnection::Vec connections_;
std::vector<PooledConnection::Vec> connections_by_shard_; /// Index is the shard ID
size_t num_connections_per_shard_;
DelayedConnector::Vec pending_connections_;
DenseHashSet<PooledConnection*> to_flush_;
};
Expand Down

0 comments on commit 0178646

Please sign in to comment.