From 4d5bee44fdcb51bea5a911cb18c272ed98953389 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 16 Dec 2024 12:46:59 +0100 Subject: [PATCH] chore(backup_test): adjust tests to Scylla backup API Some tests used interceptor for given paths in order to wait/block/check some API calls. Those interceptors were updated to also look for Scylla backup API paths. --- .../backup/service_backup_integration_test.go | 80 +++++++++++-------- 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/pkg/service/backup/service_backup_integration_test.go b/pkg/service/backup/service_backup_integration_test.go index 8cd5d4485..328949074 100644 --- a/pkg/service/backup/service_backup_integration_test.go +++ b/pkg/service/backup/service_backup_integration_test.go @@ -15,7 +15,6 @@ import ( "os" "path" "strings" - "sync" "testing" "time" @@ -142,27 +141,37 @@ func defaultConfig() backup.Config { return c } -func (h *backupTestHelper) setInterceptorBlockEndpointOnFirstHost(method string, path string) { - var ( - brokenHost string - mu sync.Mutex - ) +func (h *backupTestHelper) setInterceptorBlockPathOnFirstHost(paths ...string) { + brokenHost := atomic.NewString("") h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - if req.Method == method && req.URL.Path == path { - mu.Lock() - defer mu.Unlock() - - if brokenHost == "" { - h.T.Log("Setting broken host", req.Host) - brokenHost = req.Host + for _, p := range paths { + if strings.HasPrefix(req.URL.Path, p) { + if brokenHost.CompareAndSwap("", req.Host) { + h.T.Log("Setting broken host", req.Host) + } + if brokenHost.Load() == req.Host { + return nil, errors.New("dial error") + } } + } + return nil, nil + })) +} - if brokenHost == req.Host { - return nil, errors.New("dial error") +func (h *backupTestHelper) setInterceptorWaitPath(paths ...string) chan struct{} { + guard := atomic.NewBool(false) + wait := make(chan struct{}) + h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + for _, p := range paths { + if strings.HasPrefix(req.URL.Path, p) { + if guard.CompareAndSwap(false, true) { + close(wait) + } } } return nil, nil })) + return wait } func (h *backupTestHelper) listS3Files() (manifests, schemas, files []string) { @@ -1002,6 +1011,8 @@ func TestBackupResumeIntegration(t *testing.T) { done = make(chan struct{}) ) + upload := h.setInterceptorWaitPath("/storage_service/backup", "/agent/rclone/sync/movedir") + if err := h.service.InitTarget(ctx, h.ClusterID, &target); err != nil { t.Fatal(err) } @@ -1010,20 +1021,19 @@ func TestBackupResumeIntegration(t *testing.T) { defer close(done) Print("When: backup is running") err := h.service.Backup(ctx, h.ClusterID, h.TaskID, h.RunID, target) - if err == nil { - t.Error("Expected error on run but got nil") - } else { - if !strings.Contains(err.Error(), "context") { - t.Errorf("Expected context error but got: %+v", err) - } + if !errors.Is(err, context.Canceled) { + t.Errorf("Expected %q error but got: %q", context.Canceled, err) } }() - h.waitTransfersStarted() - - Print("And: context is canceled") - cancel() - <-ctx.Done() + select { + case <-time.After(backupTimeout): + t.Fatalf("Backup failed to complete in under %s", backupTimeout) + case <-upload: + Print("And: context is canceled") + cancel() + <-ctx.Done() + } select { case <-time.After(backupTimeout): @@ -1059,6 +1069,8 @@ func TestBackupResumeIntegration(t *testing.T) { ) defer cancel() + upload := h.setInterceptorWaitPath("/storage_service/backup", "/agent/rclone/sync/movedir") + if err := h.service.InitTarget(ctx, h.ClusterID, &target); err != nil { t.Fatal(err) } @@ -1072,16 +1084,18 @@ func TestBackupResumeIntegration(t *testing.T) { close(done) }() - h.waitTransfersStarted() - - Print("And: we restart the agents") - restartAgents(h.CommonTestHelper) + select { + case <-time.After(backupTimeout * 3): + t.Fatalf("Backup failed to complete in under %s", backupTimeout*3) + case <-upload: + Print("And: we restart the agents") + restartAgents(h.CommonTestHelper) + } select { case <-time.After(backupTimeout * 3): t.Fatalf("Backup failed to complete in under %s", backupTimeout*3) case <-done: - Print("Then: backup completed execution") } Print("And: nothing is transferring") @@ -1094,7 +1108,7 @@ func TestBackupResumeIntegration(t *testing.T) { t.Run("resume after snapshot failed", func(t *testing.T) { h := newBackupTestHelper(t, session, config, location, nil) Print("Given: snapshot fails on a host") - h.setInterceptorBlockEndpointOnFirstHost(http.MethodPost, "/storage_service/snapshots") + h.setInterceptorBlockPathOnFirstHost("/storage_service/snapshots") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1132,7 +1146,7 @@ func TestBackupResumeIntegration(t *testing.T) { t.Run("resume after upload failed", func(t *testing.T) { h := newBackupTestHelper(t, session, config, location, nil) Print("Given: upload fails on a host") - h.setInterceptorBlockEndpointOnFirstHost(http.MethodPost, "/agent/rclone/job/progress") + h.setInterceptorBlockPathOnFirstHost("/agent/rclone/job/progress", "/task_manager/wait_task") ctx, cancel := context.WithCancel(context.Background()) defer cancel()