feat(restore_test): test restore pause/resume
This commit adds the runPausedRestore function, which runs a restore
that gets interrupted after each of the specified intervals.
Pausing after an arbitrary amount of time might seem flaky,
but we can't always rely on scyllaclient hooks for pausing the restore,
as we are biased about where we put them.

This commit also alters restoreWithResume to include views in the schema
and the backup, so that it can test for #4037.
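
For illustration, a minimal sketch of how a test could drive the new runPausedRestore helper (added below in helper_integration_test.go). The stand-in restore closure, the test name, and the intervals are assumptions for the sketch, not part of this commit; a real caller passes the restore service call instead, as the new integration test does.

func TestPausedRestoreSketch(t *testing.T) {
    // Stand-in for a real restore call such as the restore service's Restore;
    // all it has to do is respect ctx cancellation for the helper to work.
    restore := func(ctx context.Context) error {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(2 * time.Minute):
            return nil
        }
    }
    // Pause (cancel) the restore 5s in, resume it, pause again 30s later,
    // then let the final resume run to completion once the intervals run out.
    if err := runPausedRestore(t, restore, 5*time.Second, 30*time.Second); err != nil {
        t.Fatal(err)
    }
}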
Michal-Leszczynski committed Sep 26, 2024
1 parent 93ec7db commit 6d27036
Showing 3 changed files with 214 additions and 2 deletions.
61 changes: 61 additions & 0 deletions pkg/service/restore/helper_integration_test.go
@@ -476,3 +476,64 @@ func checkAnyConstraint(t *testing.T, client *scyllaclient.Client, constraints .
    }
    return false
}

func createTable(t *testing.T, session gocqlx.Session, keyspace string, tables ...string) {
    for _, tab := range tables {
        ExecStmt(t, session, fmt.Sprintf("CREATE TABLE %q.%q (id int PRIMARY KEY, data int)", keyspace, tab))
    }
}

func fillTable(t *testing.T, session gocqlx.Session, rowCnt int, keyspace string, tables ...string) {
    for _, tab := range tables {
        stmt := fmt.Sprintf("INSERT INTO %q.%q (id, data) VALUES (?, ?)", keyspace, tab)
        q := session.Query(stmt, []string{"id", "data"})

        for i := 0; i < rowCnt; i++ {
            if err := q.Bind(i, i).Exec(); err != nil {
                t.Fatal(err)
            }
        }

        q.Release()
    }
}

func runPausedRestore(t *testing.T, restore func(ctx context.Context) error, intervals ...time.Duration) (err error) {
    t.Helper()

    getInterval := func() time.Duration {
        if len(intervals) == 0 {
            return 24 * time.Hour // Return a huge value when no more ticks are expected
        }
        i := intervals[0]
        intervals = intervals[1:]
        return i
    }

    ctx, cancel := context.WithCancel(context.Background())
    res := make(chan error)
    ticker := time.NewTicker(getInterval())
    go func() {
        res <- restore(ctx)
    }()
    for {
        select {
        case err := <-res:
            cancel()
            return err
        case <-ticker.C:
            t.Log("Pause restore")
            cancel()
            err := <-res
            if err == nil || !errors.Is(err, context.Canceled) {
                return err
            }

            ctx, cancel = context.WithCancel(context.Background())
            ticker.Reset(getInterval())
            go func() {
                res <- restore(ctx)
            }()
        }
    }
}
120 changes: 120 additions & 0 deletions pkg/service/restore/restore_integration_test.go
@@ -7,17 +7,21 @@ package restore_test

import (
    "context"
    "encoding/json"
    "fmt"
    "strings"
    "testing"
    "time"

    "github.com/pkg/errors"
    "github.com/scylladb/scylla-manager/v3/pkg/service/backup"
    . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
    . "github.com/scylladb/scylla-manager/v3/pkg/testutils"
    . "github.com/scylladb/scylla-manager/v3/pkg/testutils/db"
    . "github.com/scylladb/scylla-manager/v3/pkg/testutils/testconfig"
    "github.com/scylladb/scylla-manager/v3/pkg/util/maputil"
    "github.com/scylladb/scylla-manager/v3/pkg/util/query"
    "github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

func TestRestoreTablesUserIntegration(t *testing.T) {
@@ -334,3 +338,119 @@ func TestRestoreTablesVnodeToTabletsIntegration(t *testing.T) {

    validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab, c1, c2)
}

func TestRestoreTablesPausedIntegration(t *testing.T) {
    h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts())

    // Setup:
    // ks1: tab, mv, si
    // ks2: tab1, tab2, mv1

    Print("Keyspace setup")
    ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}"
    ks1 := randomizedName("paused_1_")
    ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks1, 1))
    ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmt, ks1, 1))
    ks2 := randomizedName("paused_2_")
    ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks2, 1))
    ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmt, ks2, 1))

    Print("Table setup")
    tab := randomizedName("tab_")
    createTable(t, h.srcCluster.rootSession, ks1, tab)
    createTable(t, h.dstCluster.rootSession, ks1, tab)
    tab1 := randomizedName("tab_1_")
    createTable(t, h.srcCluster.rootSession, ks2, tab1)
    createTable(t, h.dstCluster.rootSession, ks2, tab1)
    tab2 := randomizedName("tab_2_")
    createTable(t, h.srcCluster.rootSession, ks2, tab2)
    createTable(t, h.dstCluster.rootSession, ks2, tab2)

    Print("View setup")
    mv := randomizedName("mv_")
    CreateMaterializedView(t, h.srcCluster.rootSession, ks1, tab, mv)
    CreateMaterializedView(t, h.dstCluster.rootSession, ks1, tab, mv)
    si := randomizedName("si_")
    CreateSecondaryIndex(t, h.srcCluster.rootSession, ks1, tab, si)
    CreateSecondaryIndex(t, h.dstCluster.rootSession, ks1, tab, si)
    mv1 := randomizedName("mv_1_")
    CreateMaterializedView(t, h.srcCluster.rootSession, ks2, tab1, mv1)
    CreateMaterializedView(t, h.dstCluster.rootSession, ks2, tab1, mv1)

    Print("Fill setup")
    fillTable(t, h.srcCluster.rootSession, 100, ks1, tab)
    fillTable(t, h.srcCluster.rootSession, 100, ks2, tab1, tab2)

    units := []backup.Unit{
        {
            Keyspace:  ks1,
            Tables:    []string{tab, mv, si + "_index"},
            AllTables: true,
        },
        {
            Keyspace:  ks2,
            Tables:    []string{tab1, tab2, mv1},
            AllTables: true,
        },
    }

    Print("Run backup")
    loc := []Location{testLocation("paused", "")}
    S3InitBucket(t, loc[0].Path)

    // Starting from SM 3.3.1, SM does not allow backing up views,
    // but backed up views should still be tested, as older backups might
    // contain them. That's why we manually force the backup target
    // to contain the views.
    ctx := context.Background()
    h.srcCluster.TaskID = uuid.NewTime()
    h.srcCluster.RunID = uuid.NewTime()

    rawProps, err := json.Marshal(map[string]any{"location": loc})
    if err != nil {
        t.Fatal(errors.Wrap(err, "marshal properties"))
    }

    target, err := h.srcBackupSvc.GetTarget(ctx, h.srcCluster.ClusterID, rawProps)
    if err != nil {
        t.Fatal(errors.Wrap(err, "generate target"))
    }
    target.Units = units

    err = h.srcBackupSvc.Backup(ctx, h.srcCluster.ClusterID, h.srcCluster.TaskID, h.srcCluster.RunID, target)
    if err != nil {
        t.Fatal(errors.Wrap(err, "run backup"))
    }

    pr, err := h.srcBackupSvc.GetProgress(ctx, h.srcCluster.ClusterID, h.srcCluster.TaskID, h.srcCluster.RunID)
    if err != nil {
        t.Fatal(errors.Wrap(err, "get progress"))
    }
    tag := pr.SnapshotTag

    Print("Run restore tables")
    grantRestoreTablesPermissions(t, h.dstCluster.rootSession, []string{ks1, ks2}, h.dstUser)
    props := map[string]any{
        "location":       loc,
        "keyspace":       []string{ks1, ks2},
        "snapshot_tag":   tag,
        "restore_tables": true,
    }
    err = runPausedRestore(t, func(ctx context.Context) error {
        h.dstCluster.RunID = uuid.NewTime()
        rawProps, err := json.Marshal(props)
        if err != nil {
            return err
        }
        return h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps)
    }, 5*time.Second, 20*time.Second, 35*time.Second, 20*time.Second, time.Minute)
    if err != nil {
        t.Fatal(err)
    }

    for _, u := range units {
        for _, tb := range u.Tables {
            validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, u.Keyspace, tb, "id", "data")
        }
    }
}
35 changes: 33 additions & 2 deletions pkg/service/restore/service_restore_integration_test.go
@@ -758,6 +758,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
        srcSession = CreateSessionAndDropAllKeyspaces(t, srcH.Client)
        ctx1, cancel1 = context.WithCancel(context.Background())
        ctx2, cancel2 = context.WithCancel(context.Background())
        mv = "mv_resume"
    )

    dstH.shouldSkipTest(target)
@@ -770,10 +771,40 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
    // Recreate schema on destination cluster
    if target.RestoreTables {
        WriteDataSecondClusterSchema(t, dstSession, keyspace, 0, 0)
        CreateMaterializedView(t, dstSession, keyspace, BigTableName, mv)
    }

    srcH.prepareRestoreBackup(srcSession, keyspace, loadCnt, loadSize)
    target.SnapshotTag = srcH.simpleBackup(target.Location[0])
    CreateMaterializedView(t, srcSession, keyspace, BigTableName, mv)

    // Starting from SM 3.3.1, SM does not allow backing up views,
    // but backed up views should still be tested, as older backups might
    // contain them. That's why we manually force the backup target
    // to contain the views.
    backupProps, err := json.Marshal(map[string]any{"location": target.Location})
    if err != nil {
        t.Fatal(err)
    }
    backupTarget, err := srcH.backupSvc.GetTarget(context.Background(), srcH.ClusterID, backupProps)
    if err != nil {
        t.Fatal(err)
    }
    backupTarget.Units = []backup.Unit{
        {
            Keyspace:  keyspace,
            Tables:    []string{BigTableName, mv},
            AllTables: true,
        },
    }
    err = srcH.backupSvc.Backup(context.Background(), srcH.ClusterID, srcH.TaskID, srcH.RunID, backupTarget)
    if err != nil {
        t.Fatal(err)
    }
    backupPr, err := srcH.backupSvc.GetProgress(context.Background(), srcH.ClusterID, srcH.TaskID, srcH.RunID)
    if err != nil {
        t.Fatal(err)
    }
    target.SnapshotTag = backupPr.SnapshotTag

    if target.RestoreTables {
        grantRestoreTablesPermissions(t, dstSession, target.Keyspace, user)
@@ -812,7 +843,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo
    })

    Print("When: run restore and stop it during load and stream")
    err := dstH.service.Restore(ctx1, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target))
    err = dstH.service.Restore(ctx1, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target))
    if err == nil {
        t.Fatal("Expected error on run but got nil")
        return
