Skip to content

Commit

Permalink
jetstream: introduce experimental package using new API (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexCuse authored Oct 29, 2023
1 parent 23497d0 commit 18eb1cb
Show file tree
Hide file tree
Showing 18 changed files with 1,034 additions and 557 deletions.
36 changes: 30 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,46 @@ up:
docker-compose up -d

test:
go test -parallel 20 ./...
go test -parallel 20 ./pkg/nats...

test_v:
go test -parallel 20 -v ./...
go test -parallel 20 -v ./pkg/nats...

test_short:
go test -parallel 20 ./... -short
go test -parallel 20 ./pkg/nats... -short

test_race:
go test ./... -short -race
go test ./pkg/nats... -short -race

test_stress:
STRESS_TEST_COUNT=4 go test -tags=stress -parallel 30 -timeout=45m ./...
STRESS_TEST_COUNT=4 go test -tags=stress -parallel 30 -timeout=45m ./pkg/nats...

test_codecov:
go test -coverprofile=coverage.out -covermode=atomic ./pkg/nats...

test_reconnect:
go test -tags=reconnect ./...
go test -tags=reconnect ./pkg/nats...

jetstream_test:
go test -parallel 20 ./pkg/jetstream...

jetstream_test_v:
go test -parallel 20 -v ./pkg/jetstream...

jetstream_test_short:
go test -parallel 20 ./pkg/jetstream... -short

jetstream_test_race:
go test ./pkg/jetstream... -short -race

jetstream_test_stress:
STRESS_TEST_COUNT=4 go test -tags=stress -parallel 30 -timeout=45m ./pkg/jetstream...

jetstream_test_reconnect:
go test -tags=reconnect ./pkg/jetstream...

jetstream_test_codecov:
go test -coverprofile=coverage.out -covermode=atomic ./pkg/jetstream...

BENCHCNT := 1

Expand Down
15 changes: 9 additions & 6 deletions _examples/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module main

go 1.17
go 1.21

toolchain go1.21.0

replace github.com/ThreeDotsLabs/watermill-nats/v2 => ../

Expand All @@ -9,24 +11,25 @@ require (
github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.0
github.com/google/uuid v1.3.0
github.com/nats-io/nats-server/v2 v2.9.8
github.com/nats-io/nats.go v1.23.0
github.com/nats-io/nats.go v1.31.0
github.com/stretchr/testify v1.8.1
google.golang.org/protobuf v1.28.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/klauspost/compress v1.17.1 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
28 changes: 18 additions & 10 deletions _examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g=
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
Expand All @@ -24,10 +27,11 @@ github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.9.8 h1:jgxZsv+A3Reb3MgwxaINcNq/za8xZInKhDg9Q0cGN1o=
github.com/nats-io/nats-server/v2 v2.9.8/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g=
github.com/nats-io/nats.go v1.23.0 h1:lR28r7IX44WjYgdiKz9GmUeW0uh/m33uD3yEjLZ2cOE=
github.com/nats-io/nats.go v1.23.0/go.mod h1:ki/Scsa23edbh8IRZbCuNXR9TDcbvfaSijKtaqQgw+Q=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand All @@ -37,6 +41,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand All @@ -45,15 +50,17 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y=
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -64,6 +71,7 @@ google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Binary file added _examples/jetstream_new
Binary file not shown.
162 changes: 162 additions & 0 deletions _examples/jetstream_new.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
natsJS "github.com/nats-io/nats.go/jetstream"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

svr, err := server.NewServer(&server.Options{
Port: 42223,
JetStream: true,
})

if err != nil {
panic(err)
}

svr.Start()
defer svr.Shutdown()

topic := "example_topic"
// this is the default watermill will look for if no namer func passed
consumer := fmt.Sprintf("watermill__%s", topic)
var namer jetstream.ConsumerConfigurator

// test a custom namer
/*
consumer := fmt.Sprintf("test__%s", topic)
namer := func(s, _ string) string {
return fmt.Sprintf("test__%s", s)
}
*/

logger := watermill.NewStdLogger(true, true)
mainLogFields := watermill.LogFields{"topic": topic, "consumer": consumer, "url": svr.ClientURL()}

// create an existing jetstream consumer and connect explicitly
conn, natsConnectErr := nats.Connect(svr.ClientURL())
if natsConnectErr != nil {
panic(err)
}
js, jsErr := natsJS.New(conn)
if jsErr != nil {
panic(jsErr)
}
s, se := js.CreateStream(ctx, natsJS.StreamConfig{
Name: topic,
Subjects: []string{topic},
})
if se != nil {
panic(se)
}
_, ce := s.CreateOrUpdateConsumer(ctx, natsJS.ConsumerConfig{
Name: consumer,
AckPolicy: natsJS.AckExplicitPolicy,
})
if ce != nil {
panic(ce)
}

subscriber, err := jetstream.NewSubscriber(jetstream.SubscriberConfig{
URL: svr.ClientURL(),
Logger: logger,
AckWaitTimeout: 5 * time.Second,
ResourceInitializer: jetstream.ExistingConsumer(namer, ""),
})
if err != nil {
panic(err)
}

stopPublisher := make(chan struct{}, 1)

messages, err := subscriber.Subscribe(ctx, "example_topic")
if err != nil {
panic(err)
}

go func() {
for msg := range messages {
logger.Info(fmt.Sprintf("received message: %s (test: %s), payload: %s", msg.UUID, msg.Metadata.Get("test"), string(msg.Payload)), mainLogFields)

// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg.Ack()
}
logger.Info("subscriber stopped", mainLogFields)
}()

publisher, err := jetstream.NewPublisher(jetstream.PublisherConfig{
URL: svr.ClientURL(),
Logger: logger,
})
if err != nil {
panic(err)
}

logger.Info("starting", mainLogFields)

go func() {
for {
select {
case <-stopPublisher:
if closeErr := publisher.Close(); err != nil {
logger.Error("failed to close publisher", closeErr, mainLogFields)
}
logger.Info("publisher stopped", mainLogFields)

case <-time.After(time.Second):
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
msg.Metadata.Set("test", watermill.NewShortUUID())
if err := publisher.Publish(topic, msg); err != nil {
logger.Error("publish failed", err, mainLogFields)
}
}
}
}()

c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

for {
select {

case sig := <-c:
signal.Ignore(sig)
logger.Info("Ctrl+C pressed in Terminal - shutting down", mainLogFields)
stopPublisher <- struct{}{}
if closeErr := subscriber.Close(); closeErr != nil {
logger.Error("failed to close subscriber", closeErr, mainLogFields)
} else {
logger.Info("subscriber closed", mainLogFields)
}
if cde := s.DeleteConsumer(ctx, consumer); cde != nil {
logger.Error("failed to delete consumer", cde, mainLogFields)
} else {
logger.Info("consumer deleted", mainLogFields)
}
if sde := js.DeleteStream(ctx, topic); sde != nil {
logger.Error("failed to delete stream", sde, mainLogFields)
} else {
logger.Info("stream deleted", mainLogFields)
}

logger.Info("done", mainLogFields)
return
}
}
}
1 change: 1 addition & 0 deletions _examples/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func process(messages <-chan *message.Message) {

// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
// Note that ack is a no-op under the hood for core-nats but it is still needed to fit watermill flow.
msg.Ack()
}
}
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
version: '3'
services:
nats:
image: nats:2
image: nats:2.10
ports:
- "0.0.0.0:4222:4222"
restart: unless-stopped
Expand Down
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/ThreeDotsLabs/watermill-nats/v2

go 1.17
go 1.19

require (
github.com/ThreeDotsLabs/watermill v1.2.0
github.com/google/uuid v1.3.0
github.com/nats-io/nats.go v1.23.0
github.com/nats-io/nats.go v1.31.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.1
)
Expand All @@ -14,15 +14,17 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/klauspost/compress v1.17.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/nats-io/nats-server/v2 v2.9.8 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 18eb1cb

Please sign in to comment.