Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opentracing examples #72

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 102 additions & 16 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ func main() {
}
```

### Opentracing
To enable integraton with Opentracing. Checkpoint, Consumer are now required to pass in context as first parameter. Context object wraps tracing context within and is required to pass down to other layer. Another change, that should be invisible from user is that, all AWS SDK GO call are now using the version WithContext, e.g. if codebase is using GetID(...), now they are replaced with GetIDWithContext(ctx,...). This is done so we can link the span created for AWS call to spans created upstream within application code.

## Contributing

Please see [CONTRIBUTING.md] for more information. Thank you, [contributors]!
Expand Down
12 changes: 8 additions & 4 deletions checkpoint.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package consumer

import (
"context"
)

// Checkpoint interface used track consumer progress in the stream
type Checkpoint interface {
Get(streamName, shardID string) (string, error)
Set(streamName, shardID, sequenceNumber string) error
Get(ctx context.Context, streamName, shardID string) (string, error)
Set(ctx context.Context, streamName, shardID, sequenceNumber string) error
}

// noopCheckpoint implements the checkpoint interface with discard
type noopCheckpoint struct{}

func (n noopCheckpoint) Set(string, string, string) error { return nil }
func (n noopCheckpoint) Get(string, string) (string, error) { return "", nil }
func (n noopCheckpoint) Set(context.Context, string, string, string) error { return nil }
func (n noopCheckpoint) Get(context.Context, string, string) (string, error) { return "", nil }
49 changes: 34 additions & 15 deletions checkpoint/ddb/ddb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ddb

import (
"context"
"fmt"
"log"
"sync"
Expand All @@ -11,6 +12,8 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)

// Option is used to override defaults when creating a new Checkpoint
Expand Down Expand Up @@ -38,7 +41,7 @@ func WithRetryer(r Retryer) Option {
}

// New returns a checkpoint that uses DynamoDB for underlying storage
func New(appName, tableName string, opts ...Option) (*Checkpoint, error) {
func New(ctx context.Context, appName, tableName string, opts ...Option) (*Checkpoint, error) {
client := dynamodb.New(session.New(aws.NewConfig()))

ck := &Checkpoint{
Expand All @@ -56,7 +59,7 @@ func New(appName, tableName string, opts ...Option) (*Checkpoint, error) {
opt(ck)
}

go ck.loop()
go ck.loop(ctx)

return ck, nil
}
Expand Down Expand Up @@ -87,9 +90,13 @@ type item struct {
// Get determines if a checkpoint for a particular Shard exists.
// Typically used to determine whether we should start processing the shard with
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
func (c *Checkpoint) Get(ctx context.Context, streamName, shardID string) (string, error) {
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)

span, ctx := opentracing.StartSpanFromContext(ctx, "checkpoint.ddb.Get",
opentracing.Tag{Key: "namespace", Value: namespace},
opentracing.Tag{Key: "shardID", Value: shardID},
)
defer span.Finish()
params := &dynamodb.GetItemInput{
TableName: aws.String(c.tableName),
ConsistentRead: aws.Bool(true),
Expand All @@ -103,11 +110,13 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
},
}

resp, err := c.client.GetItem(params)
resp, err := c.client.GetItemWithContext(ctx, params)
if err != nil {
if c.retryer.ShouldRetry(err) {
return c.Get(streamName, shardID)
return c.Get(ctx, streamName, shardID)
}
span.LogKV("checkpoint get item error", err.Error())
ext.Error.Set(span, true)
return "", err
}

Expand All @@ -118,10 +127,14 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {

// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
// Upon failover, record processing is resumed from this point.
func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
func (c *Checkpoint) Set(ctx context.Context, streamName, shardID, sequenceNumber string) error {
c.mu.Lock()
defer c.mu.Unlock()

span, ctx := opentracing.StartSpanFromContext(ctx, "checkpoint.ddb.Set",
opentracing.Tag{Key: "stream.name", Value: streamName},
opentracing.Tag{Key: "shardID", Value: shardID},
)
defer span.Finish()
if sequenceNumber == "" {
return fmt.Errorf("sequence number should not be empty")
}
Expand All @@ -136,29 +149,31 @@ func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
}

// Shutdown the checkpoint. Save any in-flight data.
func (c *Checkpoint) Shutdown() error {
func (c *Checkpoint) Shutdown(ctx context.Context) error {
c.done <- struct{}{}
return c.save()
return c.save(ctx)
}

func (c *Checkpoint) loop() {
func (c *Checkpoint) loop(ctx context.Context) {
tick := time.NewTicker(c.maxInterval)
defer tick.Stop()
defer close(c.done)

for {
select {
case <-tick.C:
c.save()
c.save(ctx)
case <-c.done:
return
}
}
}

func (c *Checkpoint) save() error {
func (c *Checkpoint) save(ctx context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
span, ctx := opentracing.StartSpanFromContext(ctx, "checkpoint.ddb.save")
defer span.Finish()

for key, sequenceNumber := range c.checkpoints {
item, err := dynamodbattribute.MarshalMap(item{
Expand All @@ -168,18 +183,22 @@ func (c *Checkpoint) save() error {
})
if err != nil {
log.Printf("marshal map error: %v", err)
span.LogKV("marshal map error", err.Error())
ext.Error.Set(span, true)
return nil
}

_, err = c.client.PutItem(&dynamodb.PutItemInput{
_, err = c.client.PutItemWithContext(ctx, &dynamodb.PutItemInput{
TableName: aws.String(c.tableName),
Item: item,
})
if err != nil {
if !c.retryer.ShouldRetry(err) {
return err
}
return c.save()
span.LogKV("checkpoint put item error", err.Error())
ext.Error.Set(span, true)
return c.save(ctx)
}
}

Expand Down
5 changes: 3 additions & 2 deletions checkpoint/postgres/postgres.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package postgres

import (
"context"
"database/sql"
"errors"
"fmt"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (c *Checkpoint) GetMaxInterval() time.Duration {
// Get determines if a checkpoint for a particular Shard exists.
// Typically used to determine whether we should start processing the shard with
// TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
func (c *Checkpoint) Get(streamName, shardID string) (string, error) {
func (c *Checkpoint) Get(ctx context.Context, streamName, shardID string) (string, error) {
namespace := fmt.Sprintf("%s-%s", c.appName, streamName)

var sequenceNumber string
Expand All @@ -99,7 +100,7 @@ func (c *Checkpoint) Get(streamName, shardID string) (string, error) {

// Set stores a checkpoint for a shard (e.g. sequence number of last record processed by application).
// Upon failover, record processing is resumed from this point.
func (c *Checkpoint) Set(streamName, shardID, sequenceNumber string) error {
func (c *Checkpoint) Set(ctx context.Context, streamName, shardID, sequenceNumber string) error {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down
Loading