diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index 00384c491..cfe956197 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -11,11 +11,13 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" "os" "path" "regexp" "slices" + "strconv" "strings" "testing" "time" @@ -24,15 +26,18 @@ import ( "github.com/gocql/gocql" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/pkg/errors" "github.com/scylladb/go-log" "github.com/scylladb/gocqlx/v2" "github.com/scylladb/scylla-manager/v3/pkg/metrics" "github.com/scylladb/scylla-manager/v3/pkg/service/backup" "github.com/scylladb/scylla-manager/v3/pkg/service/repair" . "github.com/scylladb/scylla-manager/v3/pkg/service/restore" + "github.com/scylladb/scylla-manager/v3/pkg/sstable" "github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig" . "github.com/scylladb/scylla-manager/v3/pkg/testutils/testhelper" "github.com/scylladb/scylla-manager/v3/pkg/util/jsonutil" + "github.com/scylladb/scylla-manager/v3/pkg/util/query" "go.uber.org/atomic" "go.uber.org/zap/zapcore" @@ -184,6 +189,84 @@ func testLocation(bucket, dc string) Location { } } +// newIntegerSSTablesSnapshotResponseNotifier makes sure that half of the snapshot-ed SSTables +// will be renamed to integer based IDs right after the snapshot has been taken. +func newIntegerSSTablesSnapshotResponseNotifier(client *scyllaclient.Client, s gocqlx.Session) func(*http.Response, error) (*http.Response, error) { + cnt := 0 + // getNewID returns the ID to which SSTable should be renamed. + // In order to simulate mixed ID scenario, it renames half of + // the SSTables to integer ID, and leaves the others with UUID. + mapping := make(map[string]string) + getNewID := func(id string) string { + if newID, ok := mapping[id]; ok { + return newID + } + cnt++ + var newID string + if cnt%2 == 1 { + newID = fmt.Sprint(cnt) + } else { + newID = id + } + mapping[id] = newID + return newID + } + + return func(r *http.Response, err error) (*http.Response, error) { + // Look for successful response to snapshot call + if err != nil || !strings.HasPrefix(r.Request.URL.Path, "/storage_service/snapshots") || r.Request.Method != http.MethodPost { + return nil, nil + } + host, _, err := net.SplitHostPort(r.Request.Host) + if err != nil { + return nil, errors.New("snapshot response notifier error: get response host: " + err.Error()) + } + q := r.Request.URL.Query() + ks := q.Get("kn") + rawTabs := q.Get("cf") + tag := q.Get("tag") + tabs := strings.Split(rawTabs, ",") + if len(tabs) == 0 || slices.Equal(tabs, []string{""}) { + tabs, err = client.Tables(context.Background(), ks) + if err != nil { + return nil, errors.New("snapshot response notifier error: get keyspace tables: " + err.Error()) + } + } + + for _, tab := range tabs { + version, err := query.GetTableVersion(s, ks, tab) + if err != nil { + return nil, errors.New("snapshot response interceptor error: get table version: " + err.Error()) + } + snapshotDir := path.Join(KeyspaceDir(ks), tab+"-"+version, "snapshots", tag) + // Get snapshot files + files := make([]string, 0) + err = client.RcloneListDirIter(context.Background(), host, snapshotDir, nil, func(item *scyllaclient.RcloneListDirItem) { + // Watch out for the non-sstable files (e.g. manifest.json) + if _, err := sstable.ExtractID(item.Path); err != nil { + return + } + files = append(files, item.Path) + }) + if err != nil { + return nil, errors.New("snapshot response interceptor error: list snapshot files: " + err.Error()) + } + // Rename snapshot files + mapping := sstable.RenameSStables(files, getNewID) + for initial, renamed := range mapping { + if initial != renamed { + src := path.Join(snapshotDir, initial) + dst := path.Join(snapshotDir, renamed) + if err := client.RcloneMoveFile(context.Background(), host, dst, src); err != nil { + return nil, errors.New("snapshot response interceptor error: rename SSTable ID: " + err.Error()) + } + } + } + } + return nil, nil + } +} + func TestRestoreGetTargetUnitsViewsIntegration(t *testing.T) { testCases := []struct { name string @@ -893,10 +976,10 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo func TestRestoreTablesVersionedIntegration(t *testing.T) { testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t) const ( - testLoadCnt = 2 + testLoadCnt = 5 testLoadSize = 1 testBatchSize = 1 - testParallel = 3 + testParallel = 0 corruptCnt = 3 ) @@ -920,10 +1003,10 @@ func TestRestoreTablesVersionedIntegration(t *testing.T) { func TestRestoreSchemaVersionedIntegration(t *testing.T) { testBucket, testKeyspace, testUser := getBucketKeyspaceUser(t) const ( - testLoadCnt = 2 + testLoadCnt = 5 testLoadSize = 1 testBatchSize = 1 - testParallel = 3 + testParallel = 0 corruptCnt = 3 ) @@ -973,6 +1056,11 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt, corruptedTable = "keyspaces" } + // Force creation of integer SSTables in the snapshot dir, + // as only integer SSTables can be versioned. + // This also allows us to test scenario with mixed ID type SSTables. + srcH.Hrt.SetRespInterceptor(newIntegerSSTablesSnapshotResponseNotifier(srcH.Client, srcSession)) + // Restore should be performed on user with limited permissions dropNonSuperUsers(t, dstSession) createUser(t, dstSession, user, "pass") @@ -1012,6 +1100,16 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt, if _, err = VersionedFileCreationTime(item.Name); err == nil { t.Fatalf("Versioned file %s present after first backup", path.Join(remoteDir, item.Path)) } + + // Corrupt only integer SSTables + id, err := sstable.ExtractID(item.Name) + if err != nil { + t.Fatal(err) + } + if _, err := strconv.Atoi(id); err != nil { + return + } + if strings.HasSuffix(item.Name, ".db") { switch { case len(firstCorrupt) < corruptCnt: @@ -1026,6 +1124,10 @@ func restoreWithVersions(t *testing.T, target Target, keyspace string, loadCnt, if err != nil { t.Fatal(err) } + if len(firstCorrupt) == 0 || len(bothCorrupt) == 0 || len(secondCorrupt) == 0 { + t.Fatalf("No files to be corrupted, firstCorrupt: %d, bothCorrupt: %d, secondCorrupt: %d", + len(firstCorrupt), len(bothCorrupt), len(secondCorrupt)) + } crc32FileNameFromGivenSSTableFile := func(sstable string) string { // Split the filename by dashes