Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't add cluster to SM DB before validation finishes #4029

Merged
merged 5 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/scyllaclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package scyllaclient

import (
"net/http"
"slices"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -111,6 +112,9 @@ func (c Config) Validate() error {
if len(c.Hosts) == 0 {
err = multierr.Append(err, errors.New("missing hosts"))
}
if slices.Contains(c.Hosts, "") {
err = multierr.Append(err, errors.New("empty host"))
}
if c.Port == "" {
err = multierr.Append(err, errors.New("missing port"))
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/scyllaclient/hostpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ func hostPool(next http.RoundTripper, pool hostpool.HostPool, port string) http.
// Get host from pool
if !ok {
hpr = pool.Get()
if hpr == nil {
return nil, errors.New("empty host pool response")
}
h = hpr.Host()
}

Expand Down
17 changes: 14 additions & 3 deletions pkg/service/cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,19 @@ func (s *Service) discoverAndSetClusterHosts(ctx context.Context, c *Cluster) er

func (s *Service) discoverClusterHosts(ctx context.Context, c *Cluster) ([]string, error) {
var contactPoints []string
contactPoints = append(contactPoints, c.Host) // Go with the designated contact point first
if c.Host != "" {
contactPoints = append(contactPoints, c.Host) // Go with the designated contact point first
} else {
s.logger.Error(ctx, "Missing --host flag. Using only previously discovered hosts instead", "cluster ID", c.ID)
}
karol-kokoszka marked this conversation as resolved.
Show resolved Hide resolved
contactPoints = append(contactPoints, c.KnownHosts...) // In case it failed, try to contact previously discovered hosts

for _, cp := range contactPoints {
if cp == "" {
s.logger.Error(ctx, "Empty contact point", "cluster ID", c.ID, "contact points", contactPoints)
continue
}

config := scyllaclient.DefaultConfigWithTimeout(s.timeoutConfig)
if c.Port != 0 {
config.Port = fmt.Sprint(c.Port)
Expand Down Expand Up @@ -483,9 +492,11 @@ func (s *Service) validateHostsConnectivity(ctx context.Context, c *Cluster) err
return errors.Wrap(err, "load known hosts")
}

if err := s.discoverAndSetClusterHosts(ctx, c); err != nil {
return errors.Wrap(err, "discover and set cluster hosts")
knownHosts, err := s.discoverClusterHosts(ctx, c)
if err != nil {
return errors.Wrap(err, "discover cluster hosts")
}
c.KnownHosts = knownHosts

config := s.clientConfig(c)
client, err := scyllaclient.NewClient(config, s.logger.Named("client"))
Expand Down
73 changes: 72 additions & 1 deletion pkg/service/cluster/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package cluster_test

import (
"context"
"fmt"
"net"
"os"
"strconv"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -105,7 +107,11 @@ func TestServiceStorageIntegration(t *testing.T) {

secretsStore := store.NewTableStore(session, table.Secrets)

s, err := cluster.NewService(session, metrics.NewClusterMetrics(), secretsStore, scyllaclient.DefaultTimeoutConfig(),
cfg := scyllaclient.DefaultTimeoutConfig()
cfg.Timeout = 2 * time.Second
cfg.Backoff.WaitMax = 2 * time.Second
cfg.Backoff.MaxRetries = 1
s, err := cluster.NewService(session, metrics.NewClusterMetrics(), secretsStore, cfg,
server.DefaultConfig().ClientCacheTimeout, log.NewDevelopment())
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -532,6 +538,71 @@ func TestServiceStorageIntegration(t *testing.T) {
}
})

// Verifies that a cluster whose connectivity validation fails is not left
// behind in SM DB: PutCluster must return an error and neither ListClusters
// nor a direct COUNT(*) on the cluster table may report any entry.
t.Run("failed connectivity check", func(t *testing.T) {
	if IsIPV6Network() {
		t.Skip("DB node do not have ip6tables and related modules to make it work properly")
	}

	setup(t)
	hosts := ManagedClusterHosts()
	if len(hosts) < 2 {
		t.Skip("not enough nodes in the cluster")
	}
	h1 := hosts[0]
	h2 := hosts[1]

	// h1 is used as the contact point, while the REST API of h2 is blocked
	// so that the connectivity check run by PutCluster cannot succeed.
	c := validCluster()
	c.Host = h1
	if err := RunIptablesCommand(h2, CmdBlockScyllaREST); err != nil {
		t.Fatal(err)
	}
	// Always lift the block, and report a failure to do so: a node that
	// stays blocked would break the subtests that run afterwards.
	defer func() {
		if err := RunIptablesCommand(h2, CmdUnblockScyllaREST); err != nil {
			t.Errorf("unblock Scylla REST on host %s: %s", h2, err)
		}
	}()

	if err := s.PutCluster(ctx, c); err == nil {
		t.Fatal("expected put cluster to fail because of connectivity issues")
	} else {
		t.Logf("put cluster ended with expected error: %s", err)
	}

	// The service API must not expose the rejected cluster.
	clusters, err := s.ListClusters(ctx, &cluster.Filter{})
	if err != nil {
		t.Fatalf("list clusters: %s", err)
	}
	if len(clusters) != 0 {
		t.Fatalf("expected no clusters to be listed, got: %v", clusters)
	}

	// Double check directly in SM DB, bypassing the service layer.
	var cnt int
	if err := session.Query("SELECT COUNT(*) FROM cluster", nil).GetRelease(&cnt); err != nil {
		t.Fatalf("check SM DB cluster table entries: %s", err)
	}
	if cnt != 0 {
		t.Fatalf("expected no entries in SM DB cluster table, got: %d", cnt)
	}
})

// Checks that a client can still be created and used for a cluster whose
// host column in SM DB has been overwritten with an empty string.
t.Run("no --host in SM DB", func(t *testing.T) {
	setup(t)

	// Store a valid cluster first, then blank its host directly in SM DB.
	c := validCluster()
	if err := s.PutCluster(ctx, c); err != nil {
		t.Fatal(err)
	}

	stmt := fmt.Sprintf("UPDATE cluster SET host = '' WHERE id = %s", c.ID)
	err := session.Query(stmt, nil).ExecRelease()
	if err != nil {
		t.Fatalf("remove --host from SM DB: %s", err)
	}

	client, err := s.CreateClientNoCache(context.Background(), c.ID)
	if err != nil {
		t.Fatal(err)
	}

	// Every managed host has to answer through the freshly created client.
	for _, host := range ManagedClusterHosts() {
		_, rackErr := client.HostRack(ctx, host)
		if rackErr != nil {
			t.Fatalf("test client by getting rack of host %s: %s", host, rackErr)
		}
	}
})

t.Run("list nodes", func(t *testing.T) {
setup(t)

Expand Down
Loading