REP-5358 Fix hangs when reading change stream and documents. (#77)
This fixes hangs from the change stream and document reader when contexts are canceled.

StartChangeEventHandler no longer needs to send to the error channel; it now just returns its error.

The other channels (writes-off, error, and done) now allow unlimited, non-blocking reads once they’re populated. This works via a new struct, Eventual, which provides the same semantics as Context’s Done() and Err() methods but generalized to any value type. The change stream struct’s writes-off and error channels are now Eventuals. The doneChan remains a channel, but it is now closed rather than written to, so readers can never block on it.
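
For illustration, a minimal reader-side sketch of the Eventual pattern (hypothetical caller code, not part of this commit; it assumes placement inside this repository, since internal/util is only importable from within the module):

package example

import (
	"context"
	"fmt"

	"github.com/10gen/migration-verifier/internal/util"
)

// waitForValue is a hypothetical reader: it waits for an Eventual to be set
// or for the context to be canceled, whichever comes first.
func waitForValue(ctx context.Context) {
	ev := util.NewEventual[int]()

	// One writer calls Set() exactly once.
	go func() { ev.Set(42) }()

	select {
	case <-ev.Ready(): // Ready() closes once the value is set
		// Get() can now be called any number of times without blocking
		// or consuming anything, unlike a receive from a channel.
		fmt.Println(ev.Get(), ev.Get())
	case <-ctx.Done():
		// A canceled context never leaves the writer stuck on a send.
		fmt.Println("gave up:", ctx.Err())
	}
}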

This also refactors a couple of functions for general reuse and fixes TestStartAtTimeNoChanges, which has occasionally been flaky.
FGasper authored Dec 9, 2024
1 parent 09adbd0 commit dd2478c
Showing 12 changed files with 425 additions and 111 deletions.
67 changes: 67 additions & 0 deletions internal/testutil/testutil.go
@@ -1,8 +1,14 @@
package testutil

import (
"context"
"testing"

"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
)

// Marshal wraps `bson.Marshal` with a panic on failure.
@@ -34,3 +40,64 @@ func convertDocsToAnys(docs []bson.D) []any {

return anys
}

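// KillApplicationChangeStreams lists (via $currentOp) all change stream
// cursors opened under the given application name and kills each one with
// killOp.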
func KillApplicationChangeStreams(
ctx context.Context,
t *testing.T,
client *mongo.Client,
appName string,
) error {
// Kill verifier’s change stream.
cursor, err := client.Database(
"admin",
options.Database().SetReadConcern(readconcern.Local()),
).Aggregate(
ctx,
mongo.Pipeline{
{
{"$currentOp", bson.D{
{"idleCursors", true},
}},
},
{
{"$match", bson.D{
{"clientMetadata.application.name", appName},
{"command.collection", "$cmd.aggregate"},
{"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch",
bson.D{{"$type", "object"}},
},
}},
},
},
)
if err != nil {
return errors.Wrapf(err, "failed to list %#q's change streams", appName)
}

ops := []struct {
Opid any
}{}
err = cursor.All(ctx, &ops)
if err != nil {
return errors.Wrapf(err, "failed to decode %#q's change streams", appName)
}

for _, op := range ops {
t.Logf("Killing change stream op %+v", op.Opid)

err :=
client.Database("admin").RunCommand(
ctx,
bson.D{
{"killOp", 1},
{"op", op.Opid},
},
).Err()

if err != nil {
return errors.Wrapf(err, "failed to kill change stream with opId %#q", op.Opid)
}
}

return nil
}
24 changes: 24 additions & 0 deletions internal/util/cluster_time.go
@@ -0,0 +1,24 @@
package util

import (
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)

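// GetClusterTimeFromSession extracts the clusterTime timestamp from the
// session’s $clusterTime document.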
func GetClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) {
ctStruct := struct {
ClusterTime struct {
ClusterTime primitive.Timestamp `bson:"clusterTime"`
} `bson:"$clusterTime"`
}{}

clusterTimeRaw := sess.ClusterTime()
err := bson.Unmarshal(sess.ClusterTime(), &ctStruct)
if err != nil {
return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw)
}

return ctStruct.ClusterTime.ClusterTime, nil
}
63 changes: 63 additions & 0 deletions internal/util/eventual.go
@@ -0,0 +1,63 @@
package util

import (
"sync"

"github.com/10gen/migration-verifier/option"
)

// Eventual solves the “one writer, many readers” problem: a value gets
// written once, then the readers will see that the value is `Ready()` and
// can then `Get()` it.
//
// It’s like how `context.Context`’s `Done()` and `Err()` methods work, but
// generalized to any data type.
type Eventual[T any] struct {
ready chan struct{}
val option.Option[T]
mux sync.RWMutex
}

// NewEventual creates an Eventual and returns a pointer
// to it.
func NewEventual[T any]() *Eventual[T] {
return &Eventual[T]{
ready: make(chan struct{}),
}
}

// Ready returns a channel that closes once the Eventual’s value is ready.
func (e *Eventual[T]) Ready() <-chan struct{} {
return e.ready
}

// Get returns the Eventual’s value if it’s ready.
// It panics otherwise.
func (e *Eventual[T]) Get() T {
e.mux.RLock()
defer e.mux.RUnlock()

val, has := e.val.Get()
if has {
return val
}

panic("Eventual's Get() called before value was ready.")
}

// Set sets the Eventual’s value. It may be called only once;
// if called again it will panic.
func (e *Eventual[T]) Set(val T) {
e.mux.Lock()
defer e.mux.Unlock()

if e.val.IsSome() {
panic("Tried to set an eventual twice!")
}

// NB: This *must* happen before the close(), or else a fast reader may
// not see this value.
e.val = option.Some(val)

close(e.ready)
}
34 changes: 34 additions & 0 deletions internal/util/eventual_test.go
@@ -0,0 +1,34 @@
package util

import (
"time"
)

func (s *UnitTestSuite) TestEventual() {
eventual := NewEventual[int]()

s.Assert().Panics(
func() { eventual.Get() },
"Get() should panic before the value is set",
)

select {
case <-eventual.Ready():
s.Require().Fail("should not be ready")
case <-time.NewTimer(time.Second).C:
}

eventual.Set(123)

select {
case <-eventual.Ready():
case <-time.NewTimer(time.Second).C:
s.Require().Fail("should be ready")
}

s.Assert().Equal(
123,
eventual.Get(),
"Get() should return the value",
)
}
61 changes: 30 additions & 31 deletions internal/verifier/change_stream.go
@@ -44,6 +44,7 @@ type DocKey struct {

const (
minChangeStreamPersistInterval = time.Second * 10
maxChangeStreamAwaitTime = time.Second
metadataChangeStreamCollectionName = "changeStream"
)

@@ -68,8 +69,8 @@ type ChangeStreamReader struct {

changeStreamRunning bool
changeEventBatchChan chan []ParsedEvent
writesOffTsChan chan primitive.Timestamp
errChan chan error
writesOffTs *util.Eventual[primitive.Timestamp]
error *util.Eventual[error]
doneChan chan struct{}

startAtTs *primitive.Timestamp
@@ -87,8 +88,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
clusterInfo: *verifier.srcClusterInfo,
changeStreamRunning: false,
changeEventBatchChan: make(chan []ParsedEvent),
writesOffTsChan: make(chan primitive.Timestamp),
errChan: make(chan error),
writesOffTs: util.NewEventual[primitive.Timestamp](),
error: util.NewEventual[error](),
doneChan: make(chan struct{}),
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
}
@@ -101,8 +102,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
clusterInfo: *verifier.dstClusterInfo,
changeStreamRunning: false,
changeEventBatchChan: make(chan []ParsedEvent),
writesOffTsChan: make(chan primitive.Timestamp),
errChan: make(chan error),
writesOffTs: util.NewEventual[primitive.Timestamp](),
error: util.NewEventual[error](),
doneChan: make(chan struct{}),
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
}
@@ -123,7 +124,6 @@ func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *C
verifier.logger.Trace().Msgf("Verifier is handling a change event batch from %s: %v", reader, batch)
err := verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType)
if err != nil {
reader.errChan <- err
return err
}
}
@@ -268,6 +268,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
eventsRead := 0
var changeEventBatch []ParsedEvent

latestEvent := option.None[ParsedEvent]()

for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
gotEvent := cs.TryNext(ctx)

@@ -293,7 +295,9 @@ if changeEventBatch[eventsRead].ClusterTime != nil &&
if changeEventBatch[eventsRead].ClusterTime != nil &&
(csr.lastChangeEventTime == nil ||
csr.lastChangeEventTime.Before(*changeEventBatch[eventsRead].ClusterTime)) {

csr.lastChangeEventTime = changeEventBatch[eventsRead].ClusterTime
latestEvent = option.Some(changeEventBatch[eventsRead])
}

eventsRead++
@@ -305,6 +309,12 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
return nil
}

if event, has := latestEvent.Get(); has {
csr.logger.Trace().
Interface("event", event).
Msg("Updated lastChangeEventTime.")
}

var curTs primitive.Timestamp
curTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
if err == nil {
@@ -355,7 +365,9 @@ func (csr *ChangeStreamReader) iterateChangeStream(
// source writes are ended and the migration tool is finished / committed.
// This means we should exit rather than continue reading the change stream
// since there should be no more events.
case writesOffTs := <-csr.writesOffTsChan:
case <-csr.writesOffTs.Ready():
writesOffTs := csr.writesOffTs.Get()

csr.logger.Debug().
Interface("writesOffTimestamp", writesOffTs).
Msgf("%s thread received writesOff timestamp. Finalizing change stream.", csr)
@@ -408,7 +420,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
}
// since we have started Recheck, we must signal that we have
// finished the change stream changes so that Recheck can continue.
csr.doneChan <- struct{}{}
close(csr.doneChan)
break
}
}
@@ -430,7 +442,7 @@
) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) {
pipeline := csr.GetChangeStreamFilter()
opts := options.ChangeStream().
SetMaxAwaitTime(1 * time.Second).
SetMaxAwaitTime(maxChangeStreamAwaitTime).
SetFullDocument(options.UpdateLookup)

if csr.clusterInfo.VersionArray[0] >= 6 {
@@ -487,11 +499,17 @@
// With sharded clusters the resume token might lead the cluster time
// by 1 increment. In that case we need the actual cluster time;
// otherwise we will get errors.
clusterTime, err := getClusterTimeFromSession(sess)
clusterTime, err := util.GetClusterTimeFromSession(sess)
if err != nil {
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
}

csr.logger.Debug().
Interface("resumeTokenTimestamp", startTs).
Interface("clusterTime", clusterTime).
Stringer("changeStreamReader", csr).
Msg("Using earlier time as start timestamp.")

if startTs.After(clusterTime) {
startTs = clusterTime
}
@@ -542,10 +560,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
).Run(ctx, csr.logger)

if err != nil {
// NB: This failure always happens after the initial change stream
// creation.
csr.errChan <- err
close(csr.errChan)
csr.error.Set(err)
}
}()

@@ -661,19 +676,3 @@ func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp,

return resumeTokenTime, nil
}

func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) {
ctStruct := struct {
ClusterTime struct {
ClusterTime primitive.Timestamp `bson:"clusterTime"`
} `bson:"$clusterTime"`
}{}

clusterTimeRaw := sess.ClusterTime()
err := bson.Unmarshal(sess.ClusterTime(), &ctStruct)
if err != nil {
return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw)
}

return ctStruct.ClusterTime.ClusterTime, nil
}