Skip to content

Commit

Permalink
add ConsumeOptions to jetstream config (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
minghsu0107 authored Nov 7, 2023
1 parent 4129c56 commit e3be6f1
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 2 deletions.
4 changes: 4 additions & 0 deletions pkg/jetstream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ThreeDotsLabs/watermill"
"github.com/nats-io/nats.go"
natsJS "github.com/nats-io/nats.go/jetstream"
)

// PublisherConfig defines the watermill configuration for a JetStream publisher
Expand Down Expand Up @@ -51,6 +52,9 @@ type SubscriberConfig struct {
ConfigureStream StreamConfigurator
// ConfigureConsumer is a custom function that can be used to define consumer configuration from a topic. Publisher uses it to calculate publish destination from topic.
ConfigureConsumer ConsumerConfigurator

// ConsumeOptions is the option that adjusts consume behavior
ConsumeOptions []natsJS.PullConsumeOpt
}

// setDefaults sets default values needed for a subscriber if unset
Expand Down
3 changes: 2 additions & 1 deletion pkg/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func createOrUpdateConsumerWithCloser(ctx context.Context,
func consume(ctx context.Context,
closing chan struct{},
consumer jetstream.Consumer,
pullConsumeOpts []jetstream.PullConsumeOpt,
cb handleFunc,
deferred func(),
) (chan *message.Message, error) {
Expand All @@ -129,7 +130,7 @@ func consume(ctx context.Context,
// add support for batching pull consumers using consumer.Fetch / FetchNoWait
cc, err := consumer.Consume(func(msg jetstream.Msg) {
cb(ctx, msg, output)
})
}, pullConsumeOpts...)
if err != nil {
return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/jetstream/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Subscriber struct {
nakDelay Delay
configureStream StreamConfigurator
configureConsumer ConsumerConfigurator
consumeOptions []jetstream.PullConsumeOpt
}

// NewSubscriber creates a new watermill JetStream subscriber.
Expand Down Expand Up @@ -70,6 +71,7 @@ func newSubscriber(nc *nats.Conn, config *SubscriberConfig) (*Subscriber, error)
consumerBuilder: config.ResourceInitializer,
configureStream: config.ConfigureStream,
configureConsumer: config.ConfigureConsumer,
consumeOptions: config.ConsumeOptions,
}, nil
}

Expand Down Expand Up @@ -102,7 +104,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa

s.outputsWg.Add(1)

return consume(ctx, s.closing, consumer, s.handleMsg, cleanup)
return consume(ctx, s.closing, consumer, s.consumeOptions, s.handleMsg, cleanup)
}

// Close closes the subscriber and signals to close any subscriptions it created along with the underlying connection.
Expand Down

0 comments on commit e3be6f1

Please sign in to comment.