Skip to content

Commit

Permalink
fix(restore): rely on run.units instead of target.keyspace when itera…
Browse files Browse the repository at this point in the history
…ting over manifests

Target.keyspace is calculated at the beginning of each restore task run.
It is vulnerable for changes occurring during restore process, e.g.:
- restore targeting some views is started
- target.keyspace excludes restored views (checked via CQL query)
- restore is paused
- restore is resumed
- recalculated target.keyspace no longer excludes restored views

Units are calculated only once at the beginning of the restore,
so they are not vulnerable to mentioned changes and should
be used to determine which tables should be restored.

Fixes #4037
  • Loading branch information
Michal-Leszczynski committed Sep 26, 2024
1 parent 9cb43f8 commit 93ec7db
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
9 changes: 9 additions & 0 deletions pkg/service/restore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package restore

import (
"reflect"
"slices"
"sort"
"time"

Expand Down Expand Up @@ -111,6 +112,14 @@ func (u *Unit) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte) error
return gocql.Unmarshal(info, data, f.Addr().Interface())
}

func unitsContainTable(units []Unit, ks, tab string) bool {
idx := slices.IndexFunc(units, func(u Unit) bool { return u.Keyspace == ks })
if idx < 0 {
return false
}
return slices.ContainsFunc(units[idx].Tables, func(t Table) bool { return t.Table == tab })
}

// Table represents restored table, its size and original tombstone_gc mode.
type Table struct {
Table string `json:"table" db:"table_name"`
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/restore/schema_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc
w.run.Location = location.String()

tableDownloadHandler := func(fm FilesMeta) error {
if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) {
return nil
}

w.logger.Info(ctx, "Downloading schema table", "keyspace", fm.Keyspace, "table", fm.Table)
defer w.logger.Info(ctx, "Downloading schema table finished", "keyspace", fm.Keyspace, "table", fm.Table)

Expand All @@ -208,7 +212,7 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc
w.run.ManifestPath = miwc.Path()
w.insertRun(ctx)

return miwc.ForEachIndexIterWithError(w.target.Keyspace, tableDownloadHandler)
return miwc.ForEachIndexIterWithError(nil, tableDownloadHandler)
}

return w.forEachManifest(ctx, location, manifestDownloadHandler)
Expand Down
12 changes: 10 additions & 2 deletions pkg/service/restore/tables_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,18 @@ func (w *tablesWorker) restoreLocation(ctx context.Context, location Location) e
w.logger.Info(ctx, "Restoring manifest", "manifest", miwc.ManifestInfo)
defer w.logger.Info(ctx, "Restoring manifest finished", "manifest", miwc.ManifestInfo)

return miwc.ForEachIndexIterWithError(w.target.Keyspace, w.restoreDir(ctx, miwc))
return miwc.ForEachIndexIterWithError(nil, w.restoreDir(ctx, miwc))
}

return w.forEachManifest(ctx, location, restoreManifest)
}

func (w *tablesWorker) restoreDir(ctx context.Context, miwc ManifestInfoWithContent) func(fm FilesMeta) error {
return func(fm FilesMeta) error {
if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) {
return nil
}

if !w.alreadyResumed {
if w.run.Keyspace != fm.Keyspace || w.run.Table != fm.Table {
w.logger.Info(ctx, "Skipping table", "keyspace", fm.Keyspace, "table", fm.Table)
Expand Down Expand Up @@ -279,8 +283,12 @@ func (w *tablesWorker) initRestoreMetrics(ctx context.Context) {
func(miwc ManifestInfoWithContent) error {
sizePerTableAndKeyspace := make(map[string]map[string]int64)
err := miwc.ForEachIndexIterWithError(
w.target.Keyspace,
nil,
func(fm FilesMeta) error {
if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) {
return nil
}

if sizePerTableAndKeyspace[fm.Keyspace] == nil {
sizePerTableAndKeyspace[fm.Keyspace] = make(map[string]int64)
}
Expand Down

0 comments on commit 93ec7db

Please sign in to comment.