From a25f7432c198949bcaaadfaa3238a94373814ca5 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 21 Feb 2024 10:28:17 +0000 Subject: [PATCH] Use helper functions for pause consumer tests Signed-off-by: Neil Twigg --- go.mod | 2 - go.sum | 4 +- server/jetstream_cluster_3_test.go | 74 +++++----------- server/jetstream_super_cluster_test.go | 5 +- server/jetstream_test.go | 112 ++++++++++++++----------- 5 files changed, 86 insertions(+), 111 deletions(-) diff --git a/go.mod b/go.mod index f6d70cc8e68..d4e8dc7a047 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/nats-io/nats-server/v2 go 1.20 -replace github.com/nats-io/nats.go => github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57 - require ( github.com/klauspost/compress v1.17.6 github.com/minio/highwayhash v1.0.2 diff --git a/go.sum b/go.sum index 987a2e58233..78052a45b64 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.5.4 h1:Bz+drKl2GbE30fxTOtb0NYl1BQ5RwZ+Zcqkg3mR5bbI= github.com/nats-io/jwt/v2 v2.5.4/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57 h1:HaDH4TZvaPw437wCaUklGXhnmqBpwJ4KfOmJLWpRl+g= -github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= +github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 3c5eeeae291..032e49cd2a0 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6561,11 +6561,10 @@ func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) { }) require_NoError(t, err) - ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer", Replicas: 3, }) - require_NoError(t, err) sub, err := js.PullSubscribe("foo", "", nats.Bind("TEST", "my_consumer")) require_NoError(t, err) @@ -6592,13 +6591,11 @@ func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) { publish(time.Second) // Now we're going to set the deadline. - ci.Config.PauseUntil = time.Now().Add(time.Second * 3) - ci, err = js.UpdateConsumer("TEST", &ci.Config) - require_NoError(t, err) + deadline := jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", time.Now().Add(time.Second*3)) // It will now take longer than 3 seconds. publish(time.Second * 5) - require_True(t, time.Now().After(ci.Config.PauseUntil)) + require_True(t, time.Now().After(deadline)) // The next set of publishes after the deadline should now be fast. publish(time.Second) @@ -6610,11 +6607,9 @@ func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) { // Now we're going to do an update and then immediately kick the // leader. The pause should still be in effect afterwards. - ci.Config.PauseUntil = time.Now().Add(time.Second * 3) - ci, err = js.UpdateConsumer("TEST", &ci.Config) - require_NoError(t, err) + deadline = jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", time.Now().Add(time.Second*3)) publish(time.Second * 5) - require_True(t, time.Now().After(ci.Config.PauseUntil)) + require_True(t, time.Now().After(deadline)) // The next set of publishes after the deadline should now be fast. publish(time.Second) @@ -6627,19 +6622,6 @@ func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) { nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - pauseReq := func(consumer string, deadline time.Time) time.Time { - j, err := json.Marshal(JSApiConsumerPauseRequest{ - PauseUntil: deadline, - }) - require_NoError(t, err) - msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second*2) - require_NoError(t, err) - var res JSApiConsumerPauseResponse - err = json.Unmarshal(msg.Data, &res) - require_NoError(t, err) - return res.PauseUntil - } - _, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"push", "pull"}, @@ -6668,7 +6650,7 @@ func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) { // Now we'll pause the consumer for 3 seconds. deadline := time.Now().Add(time.Second * 3) - require_True(t, pauseReq("pull_consumer", deadline).Equal(deadline)) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "pull_consumer", deadline).Equal(deadline)) // This should fail as we'll wait for only half of the deadline. for i := 0; i < 10; i++ { @@ -6695,7 +6677,7 @@ func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) { require_NoError(t, err) require_Equal(t, len(msgs), 10) - require_True(t, pauseReq("pull_consumer", time.Time{}).Equal(time.Time{})) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "pull_consumer", time.Time{}).Equal(time.Time{})) // This should succeed as there's no pause, so it definitely // shouldn't take more than a second. @@ -6726,7 +6708,7 @@ func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) { // Now we'll pause the consumer for 3 seconds. deadline := time.Now().Add(time.Second * 3) - require_True(t, pauseReq("push_consumer", deadline).Equal(deadline)) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "push_consumer", deadline).Equal(deadline)) // This should succeed after a short wait, and when we're done, // we should be after the deadline. @@ -6751,7 +6733,7 @@ func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) { require_NotEqual(t, msg, nil) } - require_True(t, pauseReq("push_consumer", time.Time{}).Equal(time.Time{})) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "push_consumer", time.Time{}).Equal(time.Time{})) // This should succeed as there's no pause, so it definitely // shouldn't take more than a second. @@ -6780,12 +6762,12 @@ func TestJetStreamClusterConsumerPauseTimerFollowsLeader(t *testing.T) { }) require_NoError(t, err) - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + deadline := time.Now().Add(time.Hour) + jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer", - PauseUntil: time.Now().Add(time.Hour), + PauseUntil: &deadline, Replicas: 3, }) - require_NoError(t, err) for i := 0; i < 10; i++ { c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") @@ -6829,13 +6811,12 @@ func TestJetStreamClusterConsumerPauseHeartbeats(t *testing.T) { deadline := time.Now().Add(time.Hour) dsubj := "deliver_subj" - ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + ci := jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer", - PauseUntil: deadline, + PauseUntil: &deadline, Heartbeat: time.Millisecond * 100, DeliverSubject: dsubj, }) - require_NoError(t, err) require_True(t, ci.Config.PauseUntil.Equal(deadline)) ch := make(chan *nats.Msg, 10) @@ -6856,19 +6837,6 @@ func TestJetStreamClusterConsumerPauseAdvisories(t *testing.T) { nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - pauseReq := func(consumer string, deadline time.Time) time.Time { - j, err := json.Marshal(JSApiConsumerPauseRequest{ - PauseUntil: deadline, - }) - require_NoError(t, err) - msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second) - require_NoError(t, err) - var res JSApiConsumerPauseResponse - err = json.Unmarshal(msg.Data, &res) - require_NoError(t, err) - return res.PauseUntil - } - checkAdvisory := func(msg *nats.Msg, shouldBePaused bool, deadline time.Time) { t.Helper() var advisory JSConsumerPauseAdvisory @@ -6891,12 +6859,11 @@ func TestJetStreamClusterConsumerPauseAdvisories(t *testing.T) { require_NoError(t, err) deadline := time.Now().Add(time.Second) - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer", - PauseUntil: deadline, + PauseUntil: &deadline, Replicas: 3, }) - require_NoError(t, err) // First advisory should tell us that the consumer was paused // on creation. @@ -6911,7 +6878,7 @@ func TestJetStreamClusterConsumerPauseAdvisories(t *testing.T) { // Now we'll pause the consumer for a second using the API. deadline = time.Now().Add(time.Second) - require_True(t, pauseReq("my_consumer", deadline).Equal(deadline)) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", deadline).Equal(deadline)) // Third advisory should tell us about the pause via the API. msg = require_ChanRead(t, ch, time.Second*2) @@ -6926,7 +6893,7 @@ func TestJetStreamClusterConsumerPauseAdvisories(t *testing.T) { // Now we're going to set the deadline into the future so we can // see what happens when we kick leaders or restart. deadline = time.Now().Add(time.Hour) - require_True(t, pauseReq("my_consumer", deadline).Equal(deadline)) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", deadline).Equal(deadline)) // Setting the deadline should have generated an advisory. msg = require_ChanRead(t, ch, time.Second) @@ -6970,12 +6937,11 @@ func TestJetStreamClusterConsumerPauseSurvivesRestart(t *testing.T) { require_NoError(t, err) deadline := time.Now().Add(time.Hour) - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer", - PauseUntil: deadline, + PauseUntil: &deadline, Replicas: 3, }) - require_NoError(t, err) // First try with just restarting the consumer leader. srv := c.consumerLeader(globalAccountName, "TEST", "my_consumer") diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 01d92256fe9..a000f959aa3 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -4090,12 +4090,11 @@ func TestJetStreamSuperClusterConsumerPauseAdvisories(t *testing.T) { require_NoError(t, err) deadline := time.Now().Add(time.Second) - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer", - PauseUntil: deadline, + PauseUntil: &deadline, Replicas: 3, }) - require_NoError(t, err) // First advisory should tell us that the consumer was paused // on creation. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index eda53d5a385..89dbedec939 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22403,6 +22403,39 @@ func TestJetStreamSubjectFilteredPurgeClearsPendingAcks(t *testing.T) { require_Equal(t, ci.NumAckPending, 10) } +// Helper function for TestJetStreamConsumerPause*, TestJetStreamClusterConsumerPause*, TestJetStreamSuperClusterConsumerPause* +func jsTestPause_CreateOrUpdateConsumer(t *testing.T, nc *nats.Conn, action ConsumerAction, stream string, cc ConsumerConfig) *JSApiConsumerCreateResponse { + t.Helper() + j, err := json.Marshal(CreateConsumerRequest{ + Stream: stream, + Config: cc, + Action: action, + }) + require_NoError(t, err) + subj := fmt.Sprintf("$JS.API.CONSUMER.CREATE.%s.%s", stream, cc.Name) + m, err := nc.Request(subj, j, time.Second*3) + require_NoError(t, err) + var res JSApiConsumerCreateResponse + require_NoError(t, json.Unmarshal(m.Data, &res)) + require_True(t, res.Config != nil) + return &res +} + +// Helper function for TestJetStreamConsumerPause*, TestJetStreamClusterConsumerPause*, TestJetStreamSuperClusterConsumerPause* +func jsTestPause_PauseConsumer(t *testing.T, nc *nats.Conn, stream, consumer string, deadline time.Time) time.Time { + t.Helper() + j, err := json.Marshal(JSApiConsumerPauseRequest{ + PauseUntil: deadline, + }) + require_NoError(t, err) + subj := fmt.Sprintf("$JS.API.CONSUMER.PAUSE.%s.%s", stream, consumer) + msg, err := nc.Request(subj, j, time.Second) + require_NoError(t, err) + var res JSApiConsumerPauseResponse + require_NoError(t, json.Unmarshal(msg.Data, &res)) + return res.PauseUntil +} + func TestJetStreamConsumerPauseViaConfig(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() @@ -22418,27 +22451,35 @@ func TestJetStreamConsumerPauseViaConfig(t *testing.T) { t.Run("CreateShouldSucceed", func(t *testing.T) { deadline := time.Now().Add(time.Hour) - ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + ci := jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer_1", - PauseUntil: deadline, + PauseUntil: &deadline, }) - require_NoError(t, err) + require_True(t, ci != nil) + require_True(t, ci.Config != nil) + require_True(t, ci.Config.PauseUntil != nil) require_True(t, ci.Config.PauseUntil.Equal(deadline)) }) t.Run("UpdateShouldFail", func(t *testing.T) { deadline := time.Now().Add(time.Hour) - ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + ci := jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer_2", }) - require_NoError(t, err) - require_True(t, ci.Config.PauseUntil.Equal(time.Time{})) + require_True(t, ci != nil) + require_True(t, ci.Config != nil) + require_True(t, ci.Config.PauseUntil == nil || ci.Config.PauseUntil.IsZero()) - ci.Config.PauseUntil = deadline - ci, err = js.UpdateConsumer("TEST", &ci.Config) + var cc ConsumerConfig + j, err := json.Marshal(ci.Config) require_NoError(t, err) - require_False(t, ci.Config.PauseUntil.Equal(deadline)) - require_True(t, ci.Config.PauseUntil.Equal(time.Time{})) + require_NoError(t, json.Unmarshal(j, &cc)) + + pauseUntil := time.Now().Add(time.Hour) + cc.PauseUntil = &pauseUntil + ci2 := jsTestPause_CreateOrUpdateConsumer(t, nc, ActionUpdate, "TEST", cc) + require_False(t, ci2.Config.PauseUntil != nil && ci2.Config.PauseUntil.Equal(deadline)) + require_True(t, ci2.Config.PauseUntil == nil || ci2.Config.PauseUntil.Equal(time.Time{})) }) } @@ -22449,19 +22490,6 @@ func TestJetStreamConsumerPauseViaEndpoint(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - pauseReq := func(consumer string, deadline time.Time) time.Time { - j, err := json.Marshal(JSApiConsumerPauseRequest{ - PauseUntil: deadline, - }) - require_NoError(t, err) - msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second) - require_NoError(t, err) - var res JSApiConsumerPauseResponse - err = json.Unmarshal(msg.Data, &res) - require_NoError(t, err) - return res.PauseUntil - } - _, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"push", "pull"}, @@ -22489,7 +22517,7 @@ func TestJetStreamConsumerPauseViaEndpoint(t *testing.T) { // Now we'll pause the consumer for 3 seconds. deadline := time.Now().Add(time.Second * 3) - require_True(t, pauseReq("pull_consumer", deadline).Equal(deadline)) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "pull_consumer", deadline).Equal(deadline)) // This should fail as we'll wait for only half of the deadline. for i := 0; i < 10; i++ { @@ -22516,7 +22544,7 @@ func TestJetStreamConsumerPauseViaEndpoint(t *testing.T) { require_NoError(t, err) require_Equal(t, len(msgs), 10) - require_True(t, pauseReq("pull_consumer", time.Time{}).Equal(time.Time{})) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "pull_consumer", time.Time{}).Equal(time.Time{})) // This should succeed as there's no pause, so it definitely // shouldn't take more than a second. @@ -22547,7 +22575,7 @@ func TestJetStreamConsumerPauseViaEndpoint(t *testing.T) { // Now we'll pause the consumer for 3 seconds. deadline := time.Now().Add(time.Second * 3) - require_True(t, pauseReq("push_consumer", deadline).Equal(deadline)) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "push_consumer", deadline).Equal(deadline)) // This should succeed after a short wait, and when we're done, // we should be after the deadline. @@ -22572,7 +22600,7 @@ func TestJetStreamConsumerPauseViaEndpoint(t *testing.T) { require_NotEqual(t, msg, nil) } - require_True(t, pauseReq("push_consumer", time.Time{}).Equal(time.Time{})) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "push_consumer", time.Time{}).Equal(time.Time{})) // This should succeed as there's no pause, so it definitely // shouldn't take more than a second. @@ -22603,13 +22631,12 @@ func TestJetStreamConsumerPauseHeartbeats(t *testing.T) { deadline := time.Now().Add(time.Hour) dsubj := "deliver_subj" - ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + ci := jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer", - PauseUntil: deadline, + PauseUntil: &deadline, Heartbeat: time.Millisecond * 100, DeliverSubject: dsubj, }) - require_NoError(t, err) require_True(t, ci.Config.PauseUntil.Equal(deadline)) ch := make(chan *nats.Msg, 10) @@ -22630,19 +22657,6 @@ func TestJetStreamConsumerPauseAdvisories(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() - pauseReq := func(consumer string, deadline time.Time) time.Time { - j, err := json.Marshal(JSApiConsumerPauseRequest{ - PauseUntil: deadline, - }) - require_NoError(t, err) - msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second) - require_NoError(t, err) - var res JSApiConsumerPauseResponse - err = json.Unmarshal(msg.Data, &res) - require_NoError(t, err) - return res.PauseUntil - } - checkAdvisory := func(msg *nats.Msg, shouldBePaused bool, deadline time.Time) { t.Helper() var advisory JSConsumerPauseAdvisory @@ -22664,11 +22678,10 @@ func TestJetStreamConsumerPauseAdvisories(t *testing.T) { require_NoError(t, err) deadline := time.Now().Add(time.Second) - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer", - PauseUntil: deadline, + PauseUntil: &deadline, }) - require_NoError(t, err) // First advisory should tell us that the consumer was paused // on creation. @@ -22681,7 +22694,7 @@ func TestJetStreamConsumerPauseAdvisories(t *testing.T) { // Now we'll pause the consumer using the API. deadline = time.Now().Add(time.Second) - require_True(t, pauseReq("my_consumer", deadline).Equal(deadline)) + require_True(t, jsTestPause_PauseConsumer(t, nc, "TEST", "my_consumer", deadline).Equal(deadline)) // Third advisory should tell us about the pause via the API. msg = require_ChanRead(t, ch, time.Second*2) @@ -22706,11 +22719,10 @@ func TestJetStreamConsumerSurvivesRestart(t *testing.T) { require_NoError(t, err) deadline := time.Now().Add(time.Hour) - _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + jsTestPause_CreateOrUpdateConsumer(t, nc, ActionCreate, "TEST", ConsumerConfig{ Name: "my_consumer", - PauseUntil: deadline, + PauseUntil: &deadline, }) - require_NoError(t, err) sd := s.JetStreamConfig().StoreDir s.Shutdown()