diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go
index d233b668f..497097eb4 100644
--- a/pkg/service/restore/restore_integration_test.go
+++ b/pkg/service/restore/restore_integration_test.go
@@ -498,15 +498,69 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	Print("Fill setup")
 	fillTable(t, h.srcCluster.rootSession, 100, ks, tab)
 
+	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int) {
+		// Validate tombstone_gc mode
+		if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got {
+			t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
+		}
+		// Validate compaction
+		for _, host := range ch.Client.Config().Hosts {
+			enabled, err := ch.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "check compaction on host %s", host))
+			}
+			if compaction != enabled {
+				t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host)
+			}
+		}
+		// Validate transfers
+		for _, host := range ch.Client.Config().Hosts {
+			got, err := ch.Client.RcloneGetTransfers(context.Background(), host)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "check transfers on host %s", host))
+			}
+			if transfers != got {
+				t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host)
+			}
+		}
+	}
+
+	shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost())
+	if err != nil {
+		t.Fatal(err)
+	}
+	transfers0 := 2 * int(shardCnt)
+
+	// Set initial transfers
+	for _, host := range ManagedClusterHosts() {
+		err := h.dstCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
+		if err != nil {
+			t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
+		}
+	}
+	for _, host := range ManagedSecondClusterHosts() {
+		err := h.srcCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
+		if err != nil {
+			t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
+		}
+	}
+
+	Print("Validate state before backup")
+	validateState(h.srcCluster, "repair", true, 10)
+
 	Print("Run backup")
 	loc := []Location{testLocation("preparation", "")}
 	S3InitBucket(t, loc[0].Path)
 	ksFilter := []string{ks}
 	tag := h.runBackup(t, map[string]any{
-		"location": loc,
-		"keyspace": ksFilter,
+		"location":  loc,
+		"keyspace":  ksFilter,
+		"transfers": 3,
 	})
 
+	Print("Validate state after backup")
+	validateState(h.srcCluster, "repair", true, 3)
+
 	runRestore := func(ctx context.Context, finishedRestore chan error) {
 		grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
 		h.dstCluster.RunID = uuid.NewTime()
@@ -514,6 +568,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 			"location":       loc,
 			"keyspace":       ksFilter,
 			"snapshot_tag":   tag,
+			"transfers":      0,
 			"restore_tables": true,
 		})
 		if err != nil {
@@ -522,23 +577,6 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 		finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps)
 	}
 
-	validateState := func(tombstone string, compaction bool) {
-		// Validate tombstone_gc mode
-		if got := tombstoneGCMode(t, h.dstCluster.rootSession, ks, tab); tombstone != got {
-			t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
-		}
-		// Validate compaction
-		for _, host := range ManagedClusterHosts() {
-			enabled, err := h.dstCluster.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab)
-			if err != nil {
-				t.Fatal(errors.Wrapf(err, "check compaction on host %s", host))
-			}
-			if compaction != enabled {
-				t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host)
-			}
-		}
-	}
-
 	makeCopyPathsHang := func(reachedDataStage *atomic.Bool, reachedDataStageChan, hangCopyPaths chan struct{}) {
 		h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
 			if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") {
@@ -561,6 +599,9 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	Print("Make copy paths hang")
 	makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan)
 
+	Print("Validate state before restore")
+	validateState(h.dstCluster, "repair", true, 10)
+
 	Print("Run restore")
 	finishedRestore := make(chan error)
 	restoreCtx, restoreCancel := context.WithCancel(context.Background())
@@ -570,7 +611,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	<-reachedDataStageChan
 
 	Print("Validate state during restore data")
-	validateState("disabled", false)
+	validateState(h.dstCluster, "disabled", false, transfers0)
 
 	Print("Pause restore")
 	restoreCancel()
@@ -579,13 +620,13 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	close(hangCopyPathsChan)
 
 	Print("Wait for restore")
-	err := <-finishedRestore
+	err = <-finishedRestore
 	if !errors.Is(err, context.Canceled) {
 		t.Fatalf("Expected restore to be paused, got: %s", err)
 	}
 
 	Print("Validate state during pause")
-	validateState("disabled", true)
+	validateState(h.dstCluster, "disabled", true, transfers0)
 
 	reachedDataStage = &atomic.Bool{}
 	reachedDataStageChan = make(chan struct{})
@@ -601,7 +642,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	<-reachedDataStageChan
 
 	Print("Validate state during restore data after pause")
-	validateState("disabled", false)
+	validateState(h.dstCluster, "disabled", false, transfers0)
 
 	Print("Release copy paths")
 	close(hangCopyPathsChan)
@@ -613,7 +654,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state after restore success")
-	validateState("repair", true)
+	validateState(h.dstCluster, "repair", true, transfers0)
 
 	Print("Validate table contents")
 	h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})