From 93ec7db1e4804d14e633b01ef214318288d459fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 23 Sep 2024 14:51:50 +0200 Subject: [PATCH] fix(restore): rely on run.units instead of target.keyspace when iterating over manifests Target.keyspace is calculated at the beginning of each restore task run. It is vulnerable to changes occurring during the restore process, e.g.: - restore targeting some views is started - target.keyspace excludes restored views (checked via CQL query) - restore is paused - restore is resumed - recalculated target.keyspace no longer excludes restored views Units are calculated only once at the beginning of the restore, so they are not vulnerable to the mentioned changes and should be used to determine which tables should be restored. Fixes #4037 --- pkg/service/restore/model.go | 9 +++++++++ pkg/service/restore/schema_worker.go | 6 +++++- pkg/service/restore/tables_worker.go | 12 ++++++++++-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 4ce4fb195e..39a7d4a282 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -4,6 +4,7 @@ package restore import ( "reflect" + "slices" "sort" "time" @@ -111,6 +112,14 @@ func (u *Unit) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte) error return gocql.Unmarshal(info, data, f.Addr().Interface()) } +func unitsContainTable(units []Unit, ks, tab string) bool { + idx := slices.IndexFunc(units, func(u Unit) bool { return u.Keyspace == ks }) + if idx < 0 { + return false + } + return slices.ContainsFunc(units[idx].Tables, func(t Table) bool { return t.Table == tab }) +} + // Table represents restored table, its size and original tombstone_gc mode. 
type Table struct { Table string `json:"table" db:"table_name"` diff --git a/pkg/service/restore/schema_worker.go b/pkg/service/restore/schema_worker.go index 831578f404..bf1048f7f3 100644 --- a/pkg/service/restore/schema_worker.go +++ b/pkg/service/restore/schema_worker.go @@ -191,6 +191,10 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc w.run.Location = location.String() tableDownloadHandler := func(fm FilesMeta) error { + if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { + return nil + } + w.logger.Info(ctx, "Downloading schema table", "keyspace", fm.Keyspace, "table", fm.Table) defer w.logger.Info(ctx, "Downloading schema table finished", "keyspace", fm.Keyspace, "table", fm.Table) @@ -208,7 +212,7 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc w.run.ManifestPath = miwc.Path() w.insertRun(ctx) - return miwc.ForEachIndexIterWithError(w.target.Keyspace, tableDownloadHandler) + return miwc.ForEachIndexIterWithError(nil, tableDownloadHandler) } return w.forEachManifest(ctx, location, manifestDownloadHandler) diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index 25bd2c07ba..774b581de7 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -182,7 +182,7 @@ func (w *tablesWorker) restoreLocation(ctx context.Context, location Location) e w.logger.Info(ctx, "Restoring manifest", "manifest", miwc.ManifestInfo) defer w.logger.Info(ctx, "Restoring manifest finished", "manifest", miwc.ManifestInfo) - return miwc.ForEachIndexIterWithError(w.target.Keyspace, w.restoreDir(ctx, miwc)) + return miwc.ForEachIndexIterWithError(nil, w.restoreDir(ctx, miwc)) } return w.forEachManifest(ctx, location, restoreManifest) @@ -190,6 +190,10 @@ func (w *tablesWorker) restoreLocation(ctx context.Context, location Location) e func (w *tablesWorker) restoreDir(ctx context.Context, miwc ManifestInfoWithContent) func(fm FilesMeta) 
error { return func(fm FilesMeta) error { + if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { + return nil + } + if !w.alreadyResumed { if w.run.Keyspace != fm.Keyspace || w.run.Table != fm.Table { w.logger.Info(ctx, "Skipping table", "keyspace", fm.Keyspace, "table", fm.Table) @@ -279,8 +283,12 @@ func (w *tablesWorker) initRestoreMetrics(ctx context.Context) { func(miwc ManifestInfoWithContent) error { sizePerTableAndKeyspace := make(map[string]map[string]int64) err := miwc.ForEachIndexIterWithError( - w.target.Keyspace, + nil, func(fm FilesMeta) error { + if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { + return nil + } + if sizePerTableAndKeyspace[fm.Keyspace] == nil { sizePerTableAndKeyspace[fm.Keyspace] = make(map[string]int64) }