Skip to content

Commit

Permalink
chore(backup_test): adjust tests to Scylla backup API
Browse files Browse the repository at this point in the history
Some tests used interceptor for given paths
in order to wait/block/check some API calls.
Those interceptors were updated to also look
for Scylla backup API paths.
  • Loading branch information
Michal-Leszczynski committed Jan 9, 2025
1 parent 19b00bf commit 4d5bee4
Showing 1 changed file with 47 additions and 33 deletions.
80 changes: 47 additions & 33 deletions pkg/service/backup/service_backup_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"os"
"path"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -142,27 +141,37 @@ func defaultConfig() backup.Config {
return c
}

func (h *backupTestHelper) setInterceptorBlockEndpointOnFirstHost(method string, path string) {
var (
brokenHost string
mu sync.Mutex
)
func (h *backupTestHelper) setInterceptorBlockPathOnFirstHost(paths ...string) {
brokenHost := atomic.NewString("")
h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if req.Method == method && req.URL.Path == path {
mu.Lock()
defer mu.Unlock()

if brokenHost == "" {
h.T.Log("Setting broken host", req.Host)
brokenHost = req.Host
for _, p := range paths {
if strings.HasPrefix(req.URL.Path, p) {
if brokenHost.CompareAndSwap("", req.Host) {
h.T.Log("Setting broken host", req.Host)
}
if brokenHost.Load() == req.Host {
return nil, errors.New("dial error")
}
}
}
return nil, nil
}))
}

if brokenHost == req.Host {
return nil, errors.New("dial error")
func (h *backupTestHelper) setInterceptorWaitPath(paths ...string) chan struct{} {
guard := atomic.NewBool(false)
wait := make(chan struct{})
h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
for _, p := range paths {
if strings.HasPrefix(req.URL.Path, p) {
if guard.CompareAndSwap(false, true) {
close(wait)
}
}
}
return nil, nil
}))
return wait
}

func (h *backupTestHelper) listS3Files() (manifests, schemas, files []string) {
Expand Down Expand Up @@ -1002,6 +1011,8 @@ func TestBackupResumeIntegration(t *testing.T) {
done = make(chan struct{})
)

upload := h.setInterceptorWaitPath("/storage_service/backup", "/agent/rclone/sync/movedir")

if err := h.service.InitTarget(ctx, h.ClusterID, &target); err != nil {
t.Fatal(err)
}
Expand All @@ -1010,20 +1021,19 @@ func TestBackupResumeIntegration(t *testing.T) {
defer close(done)
Print("When: backup is running")
err := h.service.Backup(ctx, h.ClusterID, h.TaskID, h.RunID, target)
if err == nil {
t.Error("Expected error on run but got nil")
} else {
if !strings.Contains(err.Error(), "context") {
t.Errorf("Expected context error but got: %+v", err)
}
if !errors.Is(err, context.Canceled) {
t.Errorf("Expected %q error but got: %q", context.Canceled, err)
}
}()

h.waitTransfersStarted()

Print("And: context is canceled")
cancel()
<-ctx.Done()
select {
case <-time.After(backupTimeout):
t.Fatalf("Backup failed to complete in under %s", backupTimeout)
case <-upload:
Print("And: context is canceled")
cancel()
<-ctx.Done()
}

select {
case <-time.After(backupTimeout):
Expand Down Expand Up @@ -1059,6 +1069,8 @@ func TestBackupResumeIntegration(t *testing.T) {
)
defer cancel()

upload := h.setInterceptorWaitPath("/storage_service/backup", "/agent/rclone/sync/movedir")

if err := h.service.InitTarget(ctx, h.ClusterID, &target); err != nil {
t.Fatal(err)
}
Expand All @@ -1072,16 +1084,18 @@ func TestBackupResumeIntegration(t *testing.T) {
close(done)
}()

h.waitTransfersStarted()

Print("And: we restart the agents")
restartAgents(h.CommonTestHelper)
select {
case <-time.After(backupTimeout * 3):
t.Fatalf("Backup failed to complete in under %s", backupTimeout*3)
case <-upload:
Print("And: we restart the agents")
restartAgents(h.CommonTestHelper)
}

select {
case <-time.After(backupTimeout * 3):
t.Fatalf("Backup failed to complete in under %s", backupTimeout*3)
case <-done:
Print("Then: backup completed execution")
}

Print("And: nothing is transferring")
Expand All @@ -1094,7 +1108,7 @@ func TestBackupResumeIntegration(t *testing.T) {
t.Run("resume after snapshot failed", func(t *testing.T) {
h := newBackupTestHelper(t, session, config, location, nil)
Print("Given: snapshot fails on a host")
h.setInterceptorBlockEndpointOnFirstHost(http.MethodPost, "/storage_service/snapshots")
h.setInterceptorBlockPathOnFirstHost("/storage_service/snapshots")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -1132,7 +1146,7 @@ func TestBackupResumeIntegration(t *testing.T) {
t.Run("resume after upload failed", func(t *testing.T) {
h := newBackupTestHelper(t, session, config, location, nil)
Print("Given: upload fails on a host")
h.setInterceptorBlockEndpointOnFirstHost(http.MethodPost, "/agent/rclone/job/progress")
h.setInterceptorBlockPathOnFirstHost("/agent/rclone/job/progress", "/task_manager/wait_task")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 4d5bee4

Please sign in to comment.