Skip to content

Commit

Permalink
Merge pull request #258 from Shopify/reverse-rewrites
Browse files Browse the repository at this point in the history
Fix BinlogStreamer for TargetVerifier by correctly handling rewrites
  • Loading branch information
fjordan authored Feb 23, 2021
2 parents 69b55cc + b09e36b commit 7b03a45
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 18 deletions.
24 changes: 21 additions & 3 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ type BinlogStreamer struct {
binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer

// These rewrite structures are used specifically for the Target
// Verifier as it needs to map events streamed from the Target back
// to the TableSchemaCache of the Source
//
// See https://github.com/Shopify/ghostferry/pull/258 for details
DatabaseRewrites map[string]string
TableRewrites map[string]string

lastStreamedBinlogPosition mysql.Position
lastResumableBinlogPosition mysql.Position
stopAtBinlogPosition mysql.Position
Expand Down Expand Up @@ -341,12 +349,22 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent, query []by
Pos: ev.Header.LogPos,
}

table := s.TableSchema.Get(string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table))
if table == nil {
db := string(rowsEvent.Table.Schema)
if rewrittenDBName, exists := s.DatabaseRewrites[db]; exists {
db = rewrittenDBName
}

table := string(rowsEvent.Table.Table)
if rewrittenTableName, exists := s.TableRewrites[table]; exists {
table = rewrittenTableName
}

tableFromSchemaCache := s.TableSchema.Get(db, table)
if tableFromSchemaCache == nil {
return nil
}

dmlEvs, err := NewBinlogDMLEvents(table, ev, pos, s.lastResumableBinlogPosition, query)
dmlEvs, err := NewBinlogDMLEvents(tableFromSchemaCache, ev, pos, s.lastResumableBinlogPosition, query)
if err != nil {
return err
}
Expand Down
44 changes: 34 additions & 10 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,37 @@ func (f *Ferry) NewDataIteratorWithoutStateTracker() *DataIterator {
return dataIterator
}

func (f *Ferry) NewBinlogStreamer(db *sql.DB, dbConf *DatabaseConfig) *BinlogStreamer {
func (f *Ferry) NewSourceBinlogStreamer() *BinlogStreamer {
return f.newBinlogStreamer(f.SourceDB, f.Config.Source, nil, nil, "source_binlog_streamer")
}

func (f *Ferry) NewTargetBinlogStreamer() (*BinlogStreamer, error) {
schemaRewrites, err := TargetToSourceRewrites(f.Config.DatabaseRewrites)
if err != nil {
return nil, err
}

tableRewrites, err := TargetToSourceRewrites(f.Config.TableRewrites)
if err != nil {
return nil, err
}

return f.newBinlogStreamer(f.TargetDB, f.Config.Target, schemaRewrites, tableRewrites, "target_binlog_streamer"), nil
}

func (f *Ferry) newBinlogStreamer(db *sql.DB, dbConf *DatabaseConfig, schemaRewrites, tableRewrites map[string]string, logTag string) *BinlogStreamer {
f.ensureInitialized()

return &BinlogStreamer{
DB: db,
DBConfig: dbConf,
MyServerId: f.Config.MyServerId,
ErrorHandler: f.ErrorHandler,
Filter: f.CopyFilter,
TableSchema: f.Tables,
DB: db,
DBConfig: dbConf,
MyServerId: f.Config.MyServerId,
ErrorHandler: f.ErrorHandler,
Filter: f.CopyFilter,
TableSchema: f.Tables,
LogTag: logTag,
DatabaseRewrites: schemaRewrites,
TableRewrites: tableRewrites,
}
}

Expand Down Expand Up @@ -452,11 +473,14 @@ func (f *Ferry) Initialize() (err error) {

// The iterative verifier needs the binlog streamer so this has to be first.
// Eventually this can be moved below the verifier initialization.
f.BinlogStreamer = f.NewBinlogStreamer(f.SourceDB, f.Config.Source)
f.BinlogStreamer = f.NewSourceBinlogStreamer()

if !f.Config.SkipTargetVerification {
targetBinlogStreamer := f.NewBinlogStreamer(f.TargetDB, f.Config.Target)
targetBinlogStreamer.LogTag = "tgt_binlog_streamer"
targetBinlogStreamer, err := f.NewTargetBinlogStreamer()
if err != nil {
return err
}

targetVerifier, err := NewTargetVerifier(f.TargetDB, f.StateTracker, targetBinlogStreamer)
if err != nil {
return err
Expand Down
17 changes: 16 additions & 1 deletion table_schema_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package ghostferry

import (
sqlorig "database/sql"
"errors"
"fmt"
sql "github.com/Shopify/ghostferry/sqlwrapper"
"strings"

sql "github.com/Shopify/ghostferry/sqlwrapper"

sq "github.com/Masterminds/squirrel"
"github.com/siddontang/go-mysql/schema"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -302,6 +304,19 @@ func (c TableSchemaCache) Get(database, table string) *TableSchema {
return c[fullTableName(database, table)]
}

func TargetToSourceRewrites(databaseRewrites map[string]string) (map[string]string, error) {
targetToSourceRewrites := make(map[string]string)

for sourceVal, targetVal := range databaseRewrites {
if _, exists := targetToSourceRewrites[targetVal]; exists {
return nil, errors.New("duplicate target to source rewrite detected")
}
targetToSourceRewrites[targetVal] = sourceVal
}

return targetToSourceRewrites, nil
}

// Helper to sort a given map of tables with a second list giving a priority.
// If an element is present in the input and the priority lists, the item will
// appear first (in the order of the priority list), all other items appear in
Expand Down
45 changes: 45 additions & 0 deletions test/go/binlog_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,51 @@ func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsBinlogPositionOnDMLEv
this.Require().True(eventAsserted)
}

func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsBinlogPositionOnDMLEventWithRewrites() {
testSchemaRewrite := "gftest_target"
testTableRewrite := "target_test_table_1"

testhelpers.SeedInitialData(this.sourceDB, testSchemaRewrite, testTableRewrite, 0)

databaseRewrites := make(map[string]string)
databaseRewrites[testSchemaRewrite] = "gftest"
this.binlogStreamer.DatabaseRewrites = databaseRewrites

tableRewrites := make(map[string]string)
tableRewrites[testTableRewrite] = "test_table_1"
this.binlogStreamer.TableRewrites = tableRewrites

_, err := this.binlogStreamer.ConnectBinlogStreamerToMysql()
this.Require().Nil(err)

eventAsserted := false

this.binlogStreamer.AddEventListener(func(evs []ghostferry.DMLEvent) error {
eventAsserted = true
this.Require().Equal(1, len(evs))
this.Require().True(strings.HasPrefix(evs[0].BinlogPosition().Name, "mysql-bin."))
this.Require().True(evs[0].BinlogPosition().Pos > 0)
this.binlogStreamer.FlushAndStop()
return nil
})

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
this.binlogStreamer.Run()
}()

_, err = this.sourceDB.Exec(fmt.Sprintf("INSERT INTO %s.%s VALUES (null, 'testdata')", testSchemaRewrite, testTableRewrite))
this.Require().Nil(err)

wg.Wait()
this.Require().True(eventAsserted)

_, err = this.Ferry.SourceDB.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", testSchemaRewrite))
this.Require().Nil(err)
}

func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsQueryEventOnRowsEvent() {
_, err := this.binlogStreamer.ConnectBinlogStreamerToMysql()
this.Require().Nil(err)
Expand Down
20 changes: 18 additions & 2 deletions test/go/table_schema_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,7 @@ func getMultiTableMap() *ghostferry.TableSchemaCache {

func (this *TableSchemaCacheTestSuite) TestGetTableListWithPriorityNil() {
tables := getMultiTableMap()
// make sure we are not losing any elements, even if the priority does not
// mater
// make sure we are not losing any elements, even if the priority doesn't matter
creationOrder := tables.GetTableListWithPriority(nil)
this.Require().Equal(len(creationOrder), 3)
this.Require().ElementsMatch(creationOrder, tables.AllTableNames())
Expand All @@ -428,6 +427,23 @@ func (this *TableSchemaCacheTestSuite) TestGetTableListWithPriorityIgnoreUnknown
this.Require().Equal(creationOrder[0], "schema.table2")
}

func (this *TableSchemaCacheTestSuite) TestTargetToSourceRewritesErrorsOnDuplicateRewriteValue() {
rewrites := make(map[string]string)
rewrites["source"] = "target"

reversed, err := ghostferry.TargetToSourceRewrites(rewrites)
this.Require().Nil(err)
this.Require().Equal(len(reversed), 1)
this.Require().Equal(reversed["target"], "source")

dupRewrites := make(map[string]string)
dupRewrites["source1"] = "target"
dupRewrites["source2"] = "target"

_, err = ghostferry.TargetToSourceRewrites(dupRewrites)
this.Require().Equal(err.Error(), "duplicate target to source rewrite detected")
}

func TestTableSchemaCache(t *testing.T) {
testhelpers.SetupTest()
suite.Run(t, &TableSchemaCacheTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}})
Expand Down
5 changes: 3 additions & 2 deletions testhelpers/data_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package testhelpers

import (
"fmt"
sql "github.com/Shopify/ghostferry/sqlwrapper"
"math/rand"
"sync"

sql "github.com/Shopify/ghostferry/sqlwrapper"

sq "github.com/Masterminds/squirrel"
)

Expand Down Expand Up @@ -65,7 +66,7 @@ func SeedInitialData(db *sql.DB, dbname, tablename string, numberOfRows int) {
_, err = db.Exec(query)
PanicIfError(err)

query = "CREATE TABLE %s.%s (id bigint(20) not null auto_increment, data TEXT, primary key(id))"
query = "CREATE TABLE IF NOT EXISTS %s.%s (id bigint(20) not null auto_increment, data TEXT, primary key(id))"
query = fmt.Sprintf(query, dbname, tablename)

_, err = db.Exec(query)
Expand Down

0 comments on commit 7b03a45

Please sign in to comment.