Skip to content

Commit

Permalink
Use helper functions for pause consumer tests
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Feb 21, 2024
1 parent d4a3758 commit a25f743
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 111 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ module github.com/nats-io/nats-server/v2

go 1.20

replace github.com/nats-io/nats.go => github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57

require (
github.com/klauspost/compress v1.17.6
github.com/minio/highwayhash v1.0.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.4 h1:Bz+drKl2GbE30fxTOtb0NYl1BQ5RwZ+Zcqkg3mR5bbI=
github.com/nats-io/jwt/v2 v2.5.4/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57 h1:HaDH4TZvaPw437wCaUklGXhnmqBpwJ4KfOmJLWpRl+g=
github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
74 changes: 20 additions & 54 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6561,11 +6561,10 @@ func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) {
})
require_NoError(t, err)

ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{
Name: "my_consumer",
Replicas: 3,
})
require_NoError(t, err)

sub, err := js.PullSubscribe("foo", "", nats.Bind("TEST", "my_consumer"))
require_NoError(t, err)
Expand All @@ -6592,13 +6591,11 @@ func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) {
publish(time.Second)

// Now we're going to set the deadline.
ci.Config.PauseUntil = time.Now().Add(time.Second * 3)
ci, err = js.UpdateConsumer("TEST", &ci.Config)
require_NoError(t, err)
deadline := jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", time.Now().Add(time.Second*3))

// It will now take longer than 3 seconds.
publish(time.Second * 5)
require_True(t, time.Now().After(ci.Config.PauseUntil))
require_True(t, time.Now().After(deadline))

// The next set of publishes after the deadline should now be fast.
publish(time.Second)
Expand All @@ -6610,11 +6607,9 @@ func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) {

// Now we're going to do an update and then immediately kick the
// leader. The pause should still be in effect afterwards.
ci.Config.PauseUntil = time.Now().Add(time.Second * 3)
ci, err = js.UpdateConsumer("TEST", &ci.Config)
require_NoError(t, err)
deadline = jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", time.Now().Add(time.Second*3))
publish(time.Second * 5)
require_True(t, time.Now().After(ci.Config.PauseUntil))
require_True(t, time.Now().After(deadline))

// The next set of publishes after the deadline should now be fast.
publish(time.Second)
Expand All @@ -6627,19 +6622,6 @@ func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

pauseReq := func(consumer string, deadline time.Time) time.Time {
j, err := json.Marshal(JSApiConsumerPauseRequest{
PauseUntil: deadline,
})
require_NoError(t, err)
msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second*2)
require_NoError(t, err)
var res JSApiConsumerPauseResponse
err = json.Unmarshal(msg.Data, &res)
require_NoError(t, err)
return res.PauseUntil
}

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"push", "pull"},
Expand Down Expand Up @@ -6668,7 +6650,7 @@ func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) {

// Now we'll pause the consumer for 3 seconds.
deadline := time.Now().Add(time.Second * 3)
require_True(t, pauseReq("pull_consumer", deadline).Equal(deadline))
require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "pull_consumer", deadline).Equal(deadline))

// This should fail as we'll wait for only half of the deadline.
for i := 0; i < 10; i++ {
Expand All @@ -6695,7 +6677,7 @@ func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) {
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

require_True(t, pauseReq("pull_consumer", time.Time{}).Equal(time.Time{}))
require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "pull_consumer", time.Time{}).Equal(time.Time{}))

// This should succeed as there's no pause, so it definitely
// shouldn't take more than a second.
Expand Down Expand Up @@ -6726,7 +6708,7 @@ func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) {

// Now we'll pause the consumer for 3 seconds.
deadline := time.Now().Add(time.Second * 3)
require_True(t, pauseReq("push_consumer", deadline).Equal(deadline))
require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "push_consumer", deadline).Equal(deadline))

// This should succeed after a short wait, and when we're done,
// we should be after the deadline.
Expand All @@ -6751,7 +6733,7 @@ func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) {
require_NotEqual(t, msg, nil)
}

require_True(t, pauseReq("push_consumer", time.Time{}).Equal(time.Time{}))
require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "push_consumer", time.Time{}).Equal(time.Time{}))

// This should succeed as there's no pause, so it definitely
// shouldn't take more than a second.
Expand Down Expand Up @@ -6780,12 +6762,12 @@ func TestJetStreamClusterConsumerPauseTimerFollowsLeader(t *testing.T) {
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
deadline := time.Now().Add(time.Hour)
jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{
Name: "my_consumer",
PauseUntil: time.Now().Add(time.Hour),
PauseUntil: &deadline,
Replicas: 3,
})
require_NoError(t, err)

for i := 0; i < 10; i++ {
c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer")
Expand Down Expand Up @@ -6829,13 +6811,12 @@ func TestJetStreamClusterConsumerPauseHeartbeats(t *testing.T) {
deadline := time.Now().Add(time.Hour)
dsubj := "deliver_subj"

ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
ci := jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{
Name: "my_consumer",
PauseUntil: deadline,
PauseUntil: &deadline,
Heartbeat: time.Millisecond * 100,
DeliverSubject: dsubj,
})
require_NoError(t, err)
require_True(t, ci.Config.PauseUntil.Equal(deadline))

ch := make(chan *nats.Msg, 10)
Expand All @@ -6856,19 +6837,6 @@ func TestJetStreamClusterConsumerPauseAdvisories(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

pauseReq := func(consumer string, deadline time.Time) time.Time {
j, err := json.Marshal(JSApiConsumerPauseRequest{
PauseUntil: deadline,
})
require_NoError(t, err)
msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second)
require_NoError(t, err)
var res JSApiConsumerPauseResponse
err = json.Unmarshal(msg.Data, &res)
require_NoError(t, err)
return res.PauseUntil
}

checkAdvisory := func(msg *nats.Msg, shouldBePaused bool, deadline time.Time) {
t.Helper()
var advisory JSConsumerPauseAdvisory
Expand All @@ -6891,12 +6859,11 @@ func TestJetStreamClusterConsumerPauseAdvisories(t *testing.T) {
require_NoError(t, err)

deadline := time.Now().Add(time.Second)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{
Name: "my_consumer",
PauseUntil: deadline,
PauseUntil: &deadline,
Replicas: 3,
})
require_NoError(t, err)

// First advisory should tell us that the consumer was paused
// on creation.
Expand All @@ -6911,7 +6878,7 @@ func TestJetStreamClusterConsumerPauseAdvisories(t *testing.T) {

// Now we'll pause the consumer for a second using the API.
deadline = time.Now().Add(time.Second)
require_True(t, pauseReq("my_consumer", deadline).Equal(deadline))
require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", deadline).Equal(deadline))

// Third advisory should tell us about the pause via the API.
msg = require_ChanRead(t, ch, time.Second*2)
Expand All @@ -6926,7 +6893,7 @@ func TestJetStreamClusterConsumerPauseAdvisories(t *testing.T) {
// Now we're going to set the deadline into the future so we can
// see what happens when we kick leaders or restart.
deadline = time.Now().Add(time.Hour)
require_True(t, pauseReq("my_consumer", deadline).Equal(deadline))
require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", deadline).Equal(deadline))

// Setting the deadline should have generated an advisory.
msg = require_ChanRead(t, ch, time.Second)
Expand Down Expand Up @@ -6970,12 +6937,11 @@ func TestJetStreamClusterConsumerPauseSurvivesRestart(t *testing.T) {
require_NoError(t, err)

deadline := time.Now().Add(time.Hour)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{
Name: "my_consumer",
PauseUntil: deadline,
PauseUntil: &deadline,
Replicas: 3,
})
require_NoError(t, err)

// First try with just restarting the consumer leader.
srv := c.consumerLeader(globalAccountName, "TEST", "my_consumer")
Expand Down
5 changes: 2 additions & 3 deletions server/jetstream_super_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4090,12 +4090,11 @@ func TestJetStreamSuperClusterConsumerPauseAdvisories(t *testing.T) {
require_NoError(t, err)

deadline := time.Now().Add(time.Second)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{
Name: "my_consumer",
PauseUntil: deadline,
PauseUntil: &deadline,
Replicas: 3,
})
require_NoError(t, err)

// First advisory should tell us that the consumer was paused
// on creation.
Expand Down
Loading

0 comments on commit a25f743

Please sign in to comment.