From 5d96861d89c0979832eb22547606fc3f5c058457 Mon Sep 17 00:00:00 2001 From: Lincoln Lo Date: Mon, 22 Feb 2021 15:48:48 -0500 Subject: [PATCH] Add RowsVerified to InlineVerifier and EventsVerified to TargetVerifier --- copydb/copydb.go | 5 ++++- ferry.go | 8 +++++++- inline_verifier.go | 12 ++++++++++++ sharding/sharding.go | 8 +++++--- target_verifier.go | 4 +++- test/integration/inline_verifier_test.rb | 22 +++++++++++++++++----- 6 files changed, 48 insertions(+), 11 deletions(-) diff --git a/copydb/copydb.go b/copydb/copydb.go index 6cd72d936..3b5157d87 100644 --- a/copydb/copydb.go +++ b/copydb/copydb.go @@ -109,7 +109,10 @@ func (this *CopydbFerry) Run() { // should be identical. copyWG.Wait() - this.Ferry.StopTargetVerifier() + err := this.Ferry.StopTargetVerifier() + if err != nil { + this.Ferry.ErrorHandler.Fatal("target_verifier", err) + } // This is where you cutover from using the source database to // using the target database. diff --git a/ferry.go b/ferry.go index cd4ff1a3b..7bab15d1b 100644 --- a/ferry.go +++ b/ferry.go @@ -844,11 +844,17 @@ func (f *Ferry) FlushBinlogAndStopStreaming() { f.BinlogStreamer.FlushAndStop() } -func (f *Ferry) StopTargetVerifier() { +func (f *Ferry) StopTargetVerifier() error { if !f.Config.SkipTargetVerification { f.TargetVerifier.BinlogStreamer.FlushAndStop() f.targetVerifierWg.Wait() + if f.TargetVerifier.EventsChecked == 0 { + err := fmt.Errorf("no events checked") + f.logger.WithField("error", err).Errorf("target verifier did not check any events") + return err + } } + return nil } func (f *Ferry) SerializeStateToJSON() (string, error) { diff --git a/inline_verifier.go b/inline_verifier.go index d221479a3..c19e8e6d4 100644 --- a/inline_verifier.go +++ b/inline_verifier.go @@ -248,6 +248,7 @@ type InlineVerifier struct { reverifyStore *BinlogVerifyStore verifyDuringCutoverStarted AtomicBoolean + rowsChecked int sourceStmtCache *StmtCache targetStmtCache *StmtCache logger *logrus.Entry @@ -435,6 +436,13 @@ func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) { return VerificationResult{}, err } + if v.rowsChecked == 0 { + return VerificationResult{ + DataCorrect: false, + Message: "cutover verification failed, no rows were compared", + }, nil + } + if !mismatchFound { return VerificationResult{ DataCorrect: true, @@ -563,6 +571,7 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin if !bytes.Equal(sourceHash, targetHash) || !exists { mismatchSet[paginationKey] = struct{}{} } + v.rowsChecked += 1 } for paginationKey, sourceHash := range source { @@ -570,6 +579,7 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin if !bytes.Equal(sourceHash, targetHash) || !exists { mismatchSet[paginationKey] = struct{}{} } + v.rowsChecked += 1 } return mismatchSet @@ -587,6 +597,7 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s for colName, targetData := range targetDecompressedColumns { sourceData, exists := sourceDecompressedColumns[colName] + v.rowsChecked += 1 if !exists || !bytes.Equal(sourceData, targetData) { mismatchSet[paginationKey] = struct{}{} break // no need to compare other columns @@ -603,6 +614,7 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s for colName, sourceData := range sourceDecompressedColumns { targetData, exists := targetDecompressedColumns[colName] + v.rowsChecked += 1 if !exists || !bytes.Equal(sourceData, targetData) { mismatchSet[paginationKey] = struct{}{} break diff --git a/sharding/sharding.go b/sharding/sharding.go index 04bac6fe7..f8ced8dbf 100644 --- a/sharding/sharding.go +++ b/sharding/sharding.go @@ -157,11 +157,10 @@ func (r *ShardingFerry) Run() { r.logger.WithField("error", err).Errorf("verification encountered an error, aborting run") r.Ferry.ErrorHandler.Fatal("inline_verifier", err) } else if !verificationResult.DataCorrect { - err = fmt.Errorf("verifier detected data discrepancy: %s", verificationResult.Message) + err = fmt.Errorf("verifier detected error: %s", verificationResult.Message) r.logger.WithField("error", err).Errorf("verification failed, aborting run") r.Ferry.ErrorHandler.Fatal("inline_verifier", err) } - metrics.Measure("CopyPrimaryKeyTables", nil, 1.0, func() { err = r.copyPrimaryKeyTables() }) @@ -172,7 +171,10 @@ func (r *ShardingFerry) Run() { r.Ferry.Throttler.SetDisabled(false) - r.Ferry.StopTargetVerifier() + err = r.Ferry.StopTargetVerifier() + if err != nil { + r.Ferry.ErrorHandler.Fatal("target_verifier", err) + } metrics.Measure("CutoverUnlock", nil, 1.0, func() { err = r.config.CutoverUnlock.Post(&client) diff --git a/target_verifier.go b/target_verifier.go index 1ffe99eb0..f050d2626 100644 --- a/target_verifier.go +++ b/target_verifier.go @@ -12,6 +12,7 @@ type TargetVerifier struct { DB *sql.DB BinlogStreamer *BinlogStreamer StateTracker *StateTracker + EventsChecked int } func NewTargetVerifier(targetDB *sql.DB, stateTracker *StateTracker, binlogStreamer *BinlogStreamer) (*TargetVerifier, error) { @@ -36,7 +37,8 @@ func (t *TargetVerifier) BinlogEventListener(evs []DMLEvent) error { return err } - // Ghostferry's annotation will alwaays be the first, if available + t.EventsChecked += 1 + // Ghostferry's annotation will always be the first, if available if annotation == "" || annotation != t.DB.Marginalia { paginationKey, err := ev.PaginationKey() if err != nil { diff --git a/test/integration/inline_verifier_test.rb b/test/integration/inline_verifier_test.rb index 97c209f89..d1d663e26 100644 --- a/test/integration/inline_verifier_test.rb +++ b/test/integration/inline_verifier_test.rb @@ -68,7 +68,7 @@ def test_different_compressed_data_is_detected_inline_with_batch_writer assert verification_ran assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables - assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"] + assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"] end def test_same_decompressed_data_different_compressed_test_passes_inline_verification @@ -430,7 +430,7 @@ def test_positive_negative_zero assert verification_ran assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables - assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"] + assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"] # Now we run the real test case. target_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = -0.0 WHERE id = 1") @@ -484,7 +484,7 @@ def test_null_vs_empty_string assert verification_ran assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables - assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"] + assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"] end def test_null_vs_null_string @@ -507,7 +507,7 @@ def test_null_vs_null_string assert verification_ran assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables - assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"] + assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"] end def test_null_in_different_order @@ -533,7 +533,19 @@ def test_null_in_different_order assert verification_ran assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables - assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"] + assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"] + end + + def test_no_events_verified_on_target_will_log_error + seed_random_data(source_db, number_of_rows: 0) + seed_random_data(target_db, number_of_rows: 0) + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) + + ghostferry.run + + assert_equal "target verifier did not check any events", ghostferry.error_lines.last["msg"] + assert_equal "no events checked", ghostferry.error_lines.last["error"] end ###################