From 7c1d4cb98f13f3bc7209378ca8e9b7752ce9305d Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 30 Jan 2024 16:58:42 +0000 Subject: [PATCH] This adds a new `pause_until` configuration option for pausing consumers. It can either be set when the consumer is created (but not via a standard consumer update) or it can be changed later by using the new `$JS.API.CONSUMER.PAUSE.*.*` API endpoint by sending: ``` {"pause_until": "2024-02-08T19:00:00Z"} ``` Any time that is in the past, or a zero timestamp, is considered as "unpaused". Once the consumer reaches the `pause_until` time, messages will start flowing again automatically. The consumer info will additionally include `paused` (type `bool`) and, if paused, a `pause_remaining` (type `time.Duration`) to report the pause status. Also adds `$JS.EVENT.ADVISORY.CONSUMER.PAUSE.*.*` advisory messages which are sent when pausing and unpausing (i.e. reaching the pause deadline). Idle heartbeats continue to be sent while the consumer is paused to satisfy liveness checks. Signed-off-by: Neil Twigg --- go.mod | 2 + go.sum | 4 +- server/consumer.go | 92 ++++- server/jetstream_api.go | 143 ++++++++ server/jetstream_cluster.go | 11 + server/jetstream_cluster_3_test.go | 450 +++++++++++++++++++++++++ server/jetstream_events.go | 12 + server/jetstream_super_cluster_test.go | 85 +++++ server/jetstream_test.go | 326 ++++++++++++++++++ server/test_test.go | 9 + 10 files changed, 1128 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index d4e8dc7a047..f6d70cc8e68 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/nats-io/nats-server/v2 go 1.20 +replace github.com/nats-io/nats.go => github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57 + require ( github.com/klauspost/compress v1.17.6 github.com/minio/highwayhash v1.0.2 diff --git a/go.sum b/go.sum index 78052a45b64..987a2e58233 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.5.4 h1:Bz+drKl2GbE30fxTOtb0NYl1BQ5RwZ+Zcqkg3mR5bbI= github.com/nats-io/jwt/v2 v2.5.4/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= -github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57 h1:HaDH4TZvaPw437wCaUklGXhnmqBpwJ4KfOmJLWpRl+g= +github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/server/consumer.go b/server/consumer.go index 708772c98bd..40e3732bcf3 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -55,6 +55,8 @@ type ConsumerInfo struct { NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` PushBound bool `json:"push_bound,omitempty"` + Paused bool `json:"paused,omitempty"` + PauseRemaining time.Duration `json:"pause_remaining,omitempty"` // TimeStamp indicates when the info was gathered TimeStamp time.Time `json:"ts"` } @@ -104,6 +106,9 @@ type ConsumerConfig struct { // Metadata is additional metadata for the Consumer. Metadata map[string]string `json:"metadata,omitempty"` + + // PauseUntil is for suspending the consumer until the deadline. + PauseUntil time.Time `json:"pause_until,omitempty"` } // SequenceInfo has both the consumer and the stream sequence and last activity. @@ -352,11 +357,12 @@ type consumer struct { active bool replay bool dtmr *time.Timer + uptmr *time.Timer // Unpause timer gwdtmr *time.Timer dthresh time.Duration - mch chan struct{} - qch chan struct{} - inch chan bool + mch chan struct{} // Message channel + qch chan struct{} // Quit channel + inch chan bool // Interest change channel sfreq int32 ackEventT string nakEventT string @@ -1072,6 +1078,34 @@ func (o *consumer) updateInactiveThreshold(cfg *ConsumerConfig) { } } +// Updates the paused state. If we are the leader and the pause deadline +// hasn't passed yet then we will start a timer to kick the consumer once +// that deadline is reached. Lock should be held. +func (o *consumer) updatePauseState(cfg *ConsumerConfig) { + if o.uptmr != nil { + stopAndClearTimer(&o.uptmr) + } + if !o.isLeader() { + // Only the leader will run the timer as only the leader will run + // loopAndGatherMsgs. + return + } + if cfg.PauseUntil.IsZero() || cfg.PauseUntil.Before(time.Now()) { + // Either the PauseUntil is unset (is effectively zero) or the + // deadline has already passed, in which case there is nothing + // to do. + return + } + o.uptmr = time.AfterFunc(time.Until(cfg.PauseUntil), func() { + o.mu.Lock() + defer o.mu.Unlock() + + stopAndClearTimer(&o.uptmr) + o.sendPauseAdvisoryLocked(&o.cfg) + o.signalNewMessages() + }) +} + func (o *consumer) consumerAssignment() *consumerAssignment { o.mu.RLock() defer o.mu.RUnlock() @@ -1265,6 +1299,9 @@ func (o *consumer) setLeader(isLeader bool) { o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } + // Update the consumer pause tracking. + o.updatePauseState(&o.cfg) + // If we are not in ReplayInstant mode mark us as in replay state until resolved. if o.cfg.ReplayPolicy != ReplayInstant { o.replay = true @@ -1332,7 +1369,8 @@ func (o *consumer) setLeader(isLeader bool) { } // Stop any inactivity timers. Should only be running on leaders. stopAndClearTimer(&o.dtmr) - + // Stop any unpause timers. Should only be running on leaders. + stopAndClearTimer(&o.uptmr) // Make sure to clear out any re-deliver queues stopAndClearTimer(&o.ptmr) o.rdq = nil @@ -1449,6 +1487,29 @@ func (o *consumer) sendCreateAdvisory() { o.sendAdvisory(subj, j) } +func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) { + e := JSConsumerPauseAdvisory{ + TypedEvent: TypedEvent{ + Type: JSConsumerPauseAdvisoryType, + ID: nuid.Next(), + Time: time.Now().UTC(), + }, + Stream: o.stream, + Consumer: o.name, + Paused: time.Now().Before(cfg.PauseUntil), + PauseUntil: cfg.PauseUntil, + Domain: o.srv.getOpts().JetStreamDomain, + } + + j, err := json.Marshal(e) + if err != nil { + return + } + + subj := JSAdvisoryConsumerPausePre + "." + o.stream + "." + o.name + o.sendAdvisory(subj, j) +} + // Created returns created time. func (o *consumer) createdTime() time.Time { o.mu.Lock() @@ -1812,6 +1873,9 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { return err } + // Make sure we always store PauseUntil in UTC. + cfg.PauseUntil = cfg.PauseUntil.UTC() + if o.store != nil { // Update local state always. if err := o.store.UpdateConfig(cfg); err != nil { @@ -1860,6 +1924,12 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } } + if !cfg.PauseUntil.Equal(o.cfg.PauseUntil) { + o.updatePauseState(cfg) + if o.isLeader() { + o.sendPauseAdvisoryLocked(cfg) + } + } // Check for Subject Filters update. newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects) @@ -2573,6 +2643,10 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { NumPending: o.checkNumPending(), PushBound: o.isPushMode() && o.active, TimeStamp: time.Now().UTC(), + Paused: time.Now().Before(o.cfg.PauseUntil), + } + if info.Paused { + info.PauseRemaining = time.Until(o.cfg.PauseUntil) } // If we are replicated and we are not the leader we need to pull certain data from our store. @@ -3841,6 +3915,8 @@ func (o *consumer) suppressDeletion() { } } +// loopAndGatherMsgs waits for messages for the consumer. qch is the quit channel, +// upch is the unpause channel which fires when the PauseUntil deadline is reached. func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { // On startup check to see if we are in a reply situation where replay policy is not instant. var ( @@ -3907,6 +3983,13 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { // Clear last error. err = nil + // If the consumer is paused then stop sending. + if !o.cfg.PauseUntil.IsZero() && time.Now().Before(o.cfg.PauseUntil) { + // If the consumer is paused and we haven't reached the deadline yet then + // go back to waiting. + goto waitForMsgs + } + // If we are in push mode and not active or under flowcontrol let's stop sending. if o.isPushMode() { if !o.active || (o.maxpb > 0 && o.pbytes > o.maxpb) { @@ -5220,6 +5303,7 @@ func (o *consumer) switchToEphemeral() { rr := o.acc.sl.Match(o.cfg.DeliverSubject) // Setup dthresh. o.updateInactiveThreshold(&o.cfg) + o.updatePauseState(&o.cfg) o.mu.Unlock() // Update interest diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 03c55ea5935..f46d717b4a5 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -165,6 +165,11 @@ const ( JSApiConsumerDelete = "$JS.API.CONSUMER.DELETE.*.*" JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s" + // JSApiConsumerPause is the endpoint to pause or unpause consumers. + // Will return JSON response. + JSApiConsumerPause = "$JS.API.CONSUMER.PAUSE.*.*" + JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s" + // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s" @@ -265,6 +270,9 @@ const ( // JSAdvisoryConsumerDeletedPre notification that a template deleted. JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED" + // JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused. + JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE" + // JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created. JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE" @@ -667,6 +675,19 @@ type JSApiConsumerDeleteResponse struct { const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response" +type JSApiConsumerPauseRequest struct { + PauseUntil time.Time `json:"pause_until,omitempty"` +} + +const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response" + +type JSApiConsumerPauseResponse struct { + ApiResponse + Paused bool `json:"paused"` + PauseUntil time.Time `json:"pause_until"` + PauseRemaining time.Duration `json:"pause_remaining,omitempty"` +} + type JSApiConsumerInfoResponse struct { ApiResponse *ConsumerInfo @@ -928,6 +949,7 @@ func (s *Server) setJetStreamExportSubs() error { {JSApiConsumerList, s.jsConsumerListRequest}, {JSApiConsumerInfo, s.jsConsumerInfoRequest}, {JSApiConsumerDelete, s.jsConsumerDeleteRequest}, + {JSApiConsumerPause, s.jsConsumerPauseRequest}, } js.mu.Lock() @@ -3973,6 +3995,12 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun return } + // If the consumer already exists then don't allow updating the PauseUntil, just set + // it back to whatever the current configured value is. + if o := stream.lookupConsumer(consumerName); o != nil { + req.Config.PauseUntil = o.cfg.PauseUntil + } + o, err := stream.addConsumerWithAction(&req.Config, req.Action) if err != nil { @@ -3987,6 +4015,10 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun } resp.ConsumerInfo = o.initialInfo() s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + + if !o.cfg.PauseUntil.IsZero() && time.Now().Before(o.cfg.PauseUntil) { + o.sendPauseAdvisoryLocked(&o.cfg) + } } // Request for the list of all consumer names. @@ -4455,6 +4487,117 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } +// Request to pause or unpause a Consumer. +func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { + if c == nil || !s.JetStreamEnabled() { + return + } + ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) + if err != nil { + s.Warnf(badAPIRequestT, msg) + return + } + + var req JSApiConsumerPauseRequest + var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}} + + if !isEmptyRequest(msg) { + if err := json.Unmarshal(msg, &req); err != nil { + resp.Error = NewJSInvalidJSONError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + } + + // Determine if we should proceed here when we are in clustered mode. + isClustered := s.JetStreamIsClustered() + js, cc := s.getJetStreamCluster() + if isClustered { + if js == nil || cc == nil { + return + } + if js.isLeaderless() { + resp.Error = NewJSClusterNotAvailError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Make sure we are meta leader. + if !s.JetStreamIsLeader() { + return + } + } + + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { + resp.Error = NewJSNotEnabledForAccountError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } + return + } + + stream := streamNameFromSubject(subject) + consumer := consumerNameFromSubject(subject) + + if isClustered { + sa := js.streamAssignment(acc.Name, stream) + if sa == nil { + resp.Error = NewJSStreamNotFoundError(Unless(err)) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + ca, ok := sa.consumers[consumer] + if !ok || ca == nil { + resp.Error = NewJSConsumerNotFoundError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + nca := *ca + nca.Config.PauseUntil = req.PauseUntil.UTC() + eca := encodeAddConsumerAssignment(&nca) + cc.meta.Propose(eca) + + resp.PauseUntil = nca.Config.PauseUntil + if resp.Paused = time.Now().Before(nca.Config.PauseUntil); resp.Paused { + resp.PauseRemaining = time.Until(nca.Config.PauseUntil) + } + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) + return + } + + mset, err := acc.lookupStream(stream) + if err != nil { + resp.Error = NewJSStreamNotFoundError(Unless(err)) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + obs := mset.lookupConsumer(consumer) + if obs == nil { + resp.Error = NewJSConsumerNotFoundError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + ncfg := obs.cfg + ncfg.PauseUntil = req.PauseUntil.UTC() + + if err := obs.updateConfig(&ncfg); err != nil { + // The only type of error that should be returned here is from o.store, + // so use a store failed error type. + resp.Error = NewJSConsumerStoreFailedError(err) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + resp.PauseUntil = ncfg.PauseUntil + if resp.Paused = time.Now().Before(ncfg.PauseUntil); resp.Paused { + resp.PauseRemaining = time.Until(ncfg.PauseUntil) + } + s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) +} + // sendJetStreamAPIAuditAdvisor will send the audit event for a given event. func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, subject, request, response string) { s.publishAdvisory(acc, JSAuditAdvisory, JSAPIAudit{ diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f25ec4efd7f..daa67ca3b32 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5081,6 +5081,13 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err } } + // Only send a pause advisory on consumer create if we're + // actually paused. The timer would have been kicked by now + // by the call to o.setLeader() above. + if isLeader && !o.cfg.PauseUntil.IsZero() && time.Now().Before(o.cfg.PauseUntil) { + o.sendPauseAdvisoryLocked(&o.cfg) + } + return nil } @@ -7191,6 +7198,10 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec Created: time.Now().UTC(), } } else { + // If the consumer already exists then don't allow updating the PauseUntil, just set + // it back to whatever the current configured value is. + cfg.PauseUntil = ca.Config.PauseUntil + nca := ca.copyGroup() rBefore := nca.Config.replicas(sa.Config) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 4d4e386b7cf..3c5eeeae291 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6546,3 +6546,453 @@ func TestJetStreamClusterStreamPlacementDistribution(t *testing.T) { require_Equal(t, jsz.Streams, expectedStreams) } } + +func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer", + Replicas: 3, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("foo", "", nats.Bind("TEST", "my_consumer")) + require_NoError(t, err) + + stepdown := func() { + t.Helper() + _, err := nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "my_consumer"), nil, time.Second) + require_NoError(t, err) + c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") + } + + publish := func(wait time.Duration) { + t.Helper() + for i := 0; i < 5; i++ { + _, err = js.Publish("foo", []byte("OK")) + require_NoError(t, err) + } + msgs, err := sub.Fetch(5, nats.MaxWait(wait)) + require_NoError(t, err) + require_Equal(t, len(msgs), 5) + } + + // This should be fast as there's no deadline. + publish(time.Second) + + // Now we're going to set the deadline. + ci.Config.PauseUntil = time.Now().Add(time.Second * 3) + ci, err = js.UpdateConsumer("TEST", &ci.Config) + require_NoError(t, err) + + // It will now take longer than 3 seconds. + publish(time.Second * 5) + require_True(t, time.Now().After(ci.Config.PauseUntil)) + + // The next set of publishes after the deadline should now be fast. + publish(time.Second) + + // We'll kick the leader, but since we're after the deadline, this + // should still be fast. + stepdown() + publish(time.Second) + + // Now we're going to do an update and then immediately kick the + // leader. The pause should still be in effect afterwards. + ci.Config.PauseUntil = time.Now().Add(time.Second * 3) + ci, err = js.UpdateConsumer("TEST", &ci.Config) + require_NoError(t, err) + publish(time.Second * 5) + require_True(t, time.Now().After(ci.Config.PauseUntil)) + + // The next set of publishes after the deadline should now be fast. + publish(time.Second) +} + +func TestJetStreamClusterConsumerPauseViaEndpoint(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + pauseReq := func(consumer string, deadline time.Time) time.Time { + j, err := json.Marshal(JSApiConsumerPauseRequest{ + PauseUntil: deadline, + }) + require_NoError(t, err) + msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second*2) + require_NoError(t, err) + var res JSApiConsumerPauseResponse + err = json.Unmarshal(msg.Data, &res) + require_NoError(t, err) + return res.PauseUntil + } + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"push", "pull"}, + Replicas: 3, + }) + require_NoError(t, err) + + t.Run("PullConsumer", func(t *testing.T) { + _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "pull_consumer", + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("pull", "", nats.Bind("TEST", "pull_consumer")) + require_NoError(t, err) + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("pull", []byte("OK")) + require_NoError(t, err) + } + msgs, err := sub.Fetch(10, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + + // Now we'll pause the consumer for 3 seconds. + deadline := time.Now().Add(time.Second * 3) + require_True(t, pauseReq("pull_consumer", deadline).Equal(deadline)) + + // This should fail as we'll wait for only half of the deadline. + for i := 0; i < 10; i++ { + _, err = js.Publish("pull", []byte("OK")) + require_NoError(t, err) + } + _, err = sub.Fetch(10, nats.MaxWait(time.Until(deadline)/2)) + require_Error(t, err, nats.ErrTimeout) + + // This should succeed after a short wait, and when we're done, + // we should be after the deadline. + msgs, err = sub.Fetch(10) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + require_True(t, time.Now().After(deadline)) + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("pull", []byte("OK")) + require_NoError(t, err) + } + msgs, err = sub.Fetch(10, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + + require_True(t, pauseReq("pull_consumer", time.Time{}).Equal(time.Time{})) + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("pull", []byte("OK")) + require_NoError(t, err) + } + msgs, err = sub.Fetch(10, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + }) + + t.Run("PushConsumer", func(t *testing.T) { + ch := make(chan *nats.Msg, 100) + _, err = js.ChanSubscribe("push", ch, nats.BindStream("TEST"), nats.ConsumerName("push_consumer")) + require_NoError(t, err) + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("push", []byte("OK")) + require_NoError(t, err) + } + for i := 0; i < 10; i++ { + msg := require_ChanRead(t, ch, time.Second) + require_NotEqual(t, msg, nil) + } + + // Now we'll pause the consumer for 3 seconds. + deadline := time.Now().Add(time.Second * 3) + require_True(t, pauseReq("push_consumer", deadline).Equal(deadline)) + + // This should succeed after a short wait, and when we're done, + // we should be after the deadline. + for i := 0; i < 10; i++ { + _, err = js.Publish("push", []byte("OK")) + require_NoError(t, err) + } + for i := 0; i < 10; i++ { + msg := require_ChanRead(t, ch, time.Second*5) + require_NotEqual(t, msg, nil) + require_True(t, time.Now().After(deadline)) + } + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("push", []byte("OK")) + require_NoError(t, err) + } + for i := 0; i < 10; i++ { + msg := require_ChanRead(t, ch, time.Second) + require_NotEqual(t, msg, nil) + } + + require_True(t, pauseReq("push_consumer", time.Time{}).Equal(time.Time{})) + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("push", []byte("OK")) + require_NoError(t, err) + } + for i := 0; i < 10; i++ { + msg := require_ChanRead(t, ch, time.Second) + require_NotEqual(t, msg, nil) + } + }) +} + +func TestJetStreamClusterConsumerPauseTimerFollowsLeader(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer", + PauseUntil: time.Now().Add(time.Hour), + Replicas: 3, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") + c.waitOnAllCurrent() + + for _, s := range c.servers { + stream, err := s.gacc.lookupStream("TEST") + require_NoError(t, err) + + consumer := stream.lookupConsumer("my_consumer") + require_NotEqual(t, consumer, nil) + + isLeader := s.JetStreamIsConsumerLeader(globalAccountName, "TEST", "my_consumer") + + consumer.mu.RLock() + hasTimer := consumer.uptmr != nil + consumer.mu.RUnlock() + + require_Equal(t, isLeader, hasTimer) + } + + _, err = nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "TEST", "my_consumer"), nil, time.Second) + require_NoError(t, err) + } +} + +func TestJetStreamClusterConsumerPauseHeartbeats(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + deadline := time.Now().Add(time.Hour) + dsubj := "deliver_subj" + + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer", + PauseUntil: deadline, + Heartbeat: time.Millisecond * 100, + DeliverSubject: dsubj, + }) + require_NoError(t, err) + require_True(t, ci.Config.PauseUntil.Equal(deadline)) + + ch := make(chan *nats.Msg, 10) + _, err = nc.ChanSubscribe(dsubj, ch) + require_NoError(t, err) + + for i := 0; i < 20; i++ { + msg := require_ChanRead(t, ch, time.Millisecond*200) + require_Equal(t, msg.Header.Get("Status"), "100") + require_Equal(t, msg.Header.Get("Description"), "Idle Heartbeat") + } +} + +func TestJetStreamClusterConsumerPauseAdvisories(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + pauseReq := func(consumer string, deadline time.Time) time.Time { + j, err := json.Marshal(JSApiConsumerPauseRequest{ + PauseUntil: deadline, + }) + require_NoError(t, err) + msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second) + require_NoError(t, err) + var res JSApiConsumerPauseResponse + err = json.Unmarshal(msg.Data, &res) + require_NoError(t, err) + return res.PauseUntil + } + + checkAdvisory := func(msg *nats.Msg, shouldBePaused bool, deadline time.Time) { + t.Helper() + var advisory JSConsumerPauseAdvisory + require_NoError(t, json.Unmarshal(msg.Data, &advisory)) + require_Equal(t, advisory.Stream, "TEST") + require_Equal(t, advisory.Consumer, "my_consumer") + require_Equal(t, advisory.Paused, shouldBePaused) + require_True(t, advisory.PauseUntil.Equal(deadline)) + } + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + ch := make(chan *nats.Msg, 10) + _, err = nc.ChanSubscribe(JSAdvisoryConsumerPausePre+".TEST.my_consumer", ch) + require_NoError(t, err) + + deadline := time.Now().Add(time.Second) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer", + PauseUntil: deadline, + Replicas: 3, + }) + require_NoError(t, err) + + // First advisory should tell us that the consumer was paused + // on creation. + msg := require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, true, deadline) + require_Len(t, len(ch), 0) // Should only receive one advisory. + + // The second one for the unpause. + msg = require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, false, deadline) + require_Len(t, len(ch), 0) // Should only receive one advisory. + + // Now we'll pause the consumer for a second using the API. + deadline = time.Now().Add(time.Second) + require_True(t, pauseReq("my_consumer", deadline).Equal(deadline)) + + // Third advisory should tell us about the pause via the API. + msg = require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, true, deadline) + require_Len(t, len(ch), 0) // Should only receive one advisory. + + // Finally that should unpause. + msg = require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, false, deadline) + require_Len(t, len(ch), 0) // Should only receive one advisory. + + // Now we're going to set the deadline into the future so we can + // see what happens when we kick leaders or restart. + deadline = time.Now().Add(time.Hour) + require_True(t, pauseReq("my_consumer", deadline).Equal(deadline)) + + // Setting the deadline should have generated an advisory. + msg = require_ChanRead(t, ch, time.Second) + checkAdvisory(msg, true, deadline) + require_Len(t, len(ch), 0) // Should only receive one advisory. + + // Try to kick the consumer leader. + srv := c.consumerLeader(globalAccountName, "TEST", "my_consumer") + srv.JetStreamStepdownConsumer(globalAccountName, "TEST", "my_consumer") + c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") + + // This shouldn't have generated an advisory. + require_NoChanRead(t, ch, time.Second) +} + +func TestJetStreamClusterConsumerPauseSurvivesRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + checkTimer := func(s *Server) { + stream, err := s.gacc.lookupStream("TEST") + require_NoError(t, err) + + consumer := stream.lookupConsumer("my_consumer") + require_NotEqual(t, consumer, nil) + + consumer.mu.RLock() + timer := consumer.uptmr + consumer.mu.RUnlock() + require_True(t, timer != nil) + } + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + deadline := time.Now().Add(time.Hour) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer", + PauseUntil: deadline, + Replicas: 3, + }) + require_NoError(t, err) + + // First try with just restarting the consumer leader. + srv := c.consumerLeader(globalAccountName, "TEST", "my_consumer") + srv.Shutdown() + c.restartServer(srv) + c.waitOnAllCurrent() + c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") + leader := c.consumerLeader(globalAccountName, "TEST", "my_consumer") + require_True(t, leader != nil) + checkTimer(leader) + + // Then try restarting the entire cluster. + c.stopAll() + c.restartAllSamePorts() + c.waitOnAllCurrent() + c.waitOnConsumerLeader(globalAccountName, "TEST", "my_consumer") + leader = c.consumerLeader(globalAccountName, "TEST", "my_consumer") + require_True(t, leader != nil) + checkTimer(leader) +} diff --git a/server/jetstream_events.go b/server/jetstream_events.go index 1852811bb96..b39e7ccaee8 100644 --- a/server/jetstream_events.go +++ b/server/jetstream_events.go @@ -80,6 +80,18 @@ type JSConsumerActionAdvisory struct { const JSConsumerActionAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_action" +// JSConsumerPauseAdvisory indicates that a consumer was paused or unpaused +type JSConsumerPauseAdvisory struct { + TypedEvent + Stream string `json:"stream"` + Consumer string `json:"consumer"` + Paused bool `json:"paused"` + PauseUntil time.Time `json:"pause_until,omitempty"` + Domain string `json:"domain,omitempty"` +} + +const JSConsumerPauseAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_pause" + // JSConsumerAckMetric is a metric published when a user acknowledges a message, the // number of these that will be published is dependent on SampleFrequency type JSConsumerAckMetric struct { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index b7b7f69f12b..01d92256fe9 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -4047,3 +4047,88 @@ func TestJetStreamSuperClusterR1StreamPeerRemove(t *testing.T) { _, err = js.StreamInfo("TEST") require_NoError(t, err) } + +func TestJetStreamSuperClusterConsumerPauseAdvisories(t *testing.T) { + sc := createJetStreamSuperCluster(t, 3, 3) + defer sc.shutdown() + + nc, js := jsClientConnect(t, sc.randomServer()) + defer nc.Close() + + pauseReq := func(consumer string, deadline time.Time) time.Time { + j, err := json.Marshal(JSApiConsumerPauseRequest{ + PauseUntil: deadline, + }) + require_NoError(t, err) + msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second) + require_NoError(t, err) + var res JSApiConsumerPauseResponse + err = json.Unmarshal(msg.Data, &res) + require_NoError(t, err) + return res.PauseUntil + } + + checkAdvisory := func(msg *nats.Msg, shouldBePaused bool, deadline time.Time) { + t.Helper() + var advisory JSConsumerPauseAdvisory + require_NoError(t, json.Unmarshal(msg.Data, &advisory)) + require_Equal(t, advisory.Stream, "TEST") + require_Equal(t, advisory.Consumer, "my_consumer") + require_Equal(t, advisory.Paused, shouldBePaused) + require_True(t, advisory.PauseUntil.Equal(deadline)) + } + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + ch := make(chan *nats.Msg, 10) + _, err = nc.ChanSubscribe(JSAdvisoryConsumerPausePre+".TEST.my_consumer", ch) + require_NoError(t, err) + + deadline := time.Now().Add(time.Second) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer", + PauseUntil: deadline, + Replicas: 3, + }) + require_NoError(t, err) + + // First advisory should tell us that the consumer was paused + // on creation. + msg := require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, true, deadline) + require_Len(t, len(ch), 0) // Should only receive one advisory. + + // The second one for the unpause. + msg = require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, false, deadline) + require_Len(t, len(ch), 0) // Should only receive one advisory. + + // Now we'll pause the consumer for a second using the API. + deadline = time.Now().Add(time.Second) + require_True(t, pauseReq("my_consumer", deadline).Equal(deadline)) + + // Third advisory should tell us about the pause via the API. + msg = require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, true, deadline) + require_Len(t, len(ch), 0) // Should only receive one advisory. + + // Finally that should unpause. + msg = require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, false, deadline) + require_Len(t, len(ch), 0) // Should only receive one advisory. + + // Now we're going to set the deadline into the future so we can + // see what happens when we kick leaders or restart. + deadline = time.Now().Add(time.Hour) + require_True(t, pauseReq("my_consumer", deadline).Equal(deadline)) + + // Setting the deadline should have generated an advisory. + msg = require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, true, deadline) + require_Len(t, len(ch), 0) // Should only receive one advisory. +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 6bbc858c4bc..eda53d5a385 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22402,3 +22402,329 @@ func TestJetStreamSubjectFilteredPurgeClearsPendingAcks(t *testing.T) { require_Equal(t, ci.NumPending, 0) require_Equal(t, ci.NumAckPending, 10) } + +func TestJetStreamConsumerPauseViaConfig(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + t.Run("CreateShouldSucceed", func(t *testing.T) { + deadline := time.Now().Add(time.Hour) + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer_1", + PauseUntil: deadline, + }) + require_NoError(t, err) + require_True(t, ci.Config.PauseUntil.Equal(deadline)) + }) + + t.Run("UpdateShouldFail", func(t *testing.T) { + deadline := time.Now().Add(time.Hour) + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer_2", + }) + require_NoError(t, err) + require_True(t, ci.Config.PauseUntil.Equal(time.Time{})) + + ci.Config.PauseUntil = deadline + ci, err = js.UpdateConsumer("TEST", &ci.Config) + require_NoError(t, err) + require_False(t, ci.Config.PauseUntil.Equal(deadline)) + require_True(t, ci.Config.PauseUntil.Equal(time.Time{})) + }) +} + +func TestJetStreamConsumerPauseViaEndpoint(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + pauseReq := func(consumer string, deadline time.Time) time.Time { + j, err := json.Marshal(JSApiConsumerPauseRequest{ + PauseUntil: deadline, + }) + require_NoError(t, err) + msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second) + require_NoError(t, err) + var res JSApiConsumerPauseResponse + err = json.Unmarshal(msg.Data, &res) + require_NoError(t, err) + return res.PauseUntil + } + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"push", "pull"}, + }) + require_NoError(t, err) + + t.Run("PullConsumer", func(t *testing.T) { + _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "pull_consumer", + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("pull", "", nats.Bind("TEST", "pull_consumer")) + require_NoError(t, err) + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("pull", []byte("OK")) + require_NoError(t, err) + } + msgs, err := sub.Fetch(10, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + + // Now we'll pause the consumer for 3 seconds. + deadline := time.Now().Add(time.Second * 3) + require_True(t, pauseReq("pull_consumer", deadline).Equal(deadline)) + + // This should fail as we'll wait for only half of the deadline. + for i := 0; i < 10; i++ { + _, err = js.Publish("pull", []byte("OK")) + require_NoError(t, err) + } + _, err = sub.Fetch(10, nats.MaxWait(time.Until(deadline)/2)) + require_Error(t, err, nats.ErrTimeout) + + // This should succeed after a short wait, and when we're done, + // we should be after the deadline. + msgs, err = sub.Fetch(10) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + require_True(t, time.Now().After(deadline)) + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("pull", []byte("OK")) + require_NoError(t, err) + } + msgs, err = sub.Fetch(10, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + + require_True(t, pauseReq("pull_consumer", time.Time{}).Equal(time.Time{})) + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("pull", []byte("OK")) + require_NoError(t, err) + } + msgs, err = sub.Fetch(10, nats.MaxWait(time.Second)) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + }) + + t.Run("PushConsumer", func(t *testing.T) { + ch := make(chan *nats.Msg, 100) + _, err = js.ChanSubscribe("push", ch, nats.BindStream("TEST"), nats.ConsumerName("push_consumer")) + require_NoError(t, err) + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("push", []byte("OK")) + require_NoError(t, err) + } + for i := 0; i < 10; i++ { + msg := require_ChanRead(t, ch, time.Second) + require_NotEqual(t, msg, nil) + } + + // Now we'll pause the consumer for 3 seconds. + deadline := time.Now().Add(time.Second * 3) + require_True(t, pauseReq("push_consumer", deadline).Equal(deadline)) + + // This should succeed after a short wait, and when we're done, + // we should be after the deadline. + for i := 0; i < 10; i++ { + _, err = js.Publish("push", []byte("OK")) + require_NoError(t, err) + } + for i := 0; i < 10; i++ { + msg := require_ChanRead(t, ch, time.Second*5) + require_NotEqual(t, msg, nil) + require_True(t, time.Now().After(deadline)) + } + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("push", []byte("OK")) + require_NoError(t, err) + } + for i := 0; i < 10; i++ { + msg := require_ChanRead(t, ch, time.Second) + require_NotEqual(t, msg, nil) + } + + require_True(t, pauseReq("push_consumer", time.Time{}).Equal(time.Time{})) + + // This should succeed as there's no pause, so it definitely + // shouldn't take more than a second. + for i := 0; i < 10; i++ { + _, err = js.Publish("push", []byte("OK")) + require_NoError(t, err) + } + for i := 0; i < 10; i++ { + msg := require_ChanRead(t, ch, time.Second) + require_NotEqual(t, msg, nil) + } + }) +} + +func TestJetStreamConsumerPauseHeartbeats(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + deadline := time.Now().Add(time.Hour) + dsubj := "deliver_subj" + + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer", + PauseUntil: deadline, + Heartbeat: time.Millisecond * 100, + DeliverSubject: dsubj, + }) + require_NoError(t, err) + require_True(t, ci.Config.PauseUntil.Equal(deadline)) + + ch := make(chan *nats.Msg, 10) + _, err = nc.ChanSubscribe(dsubj, ch) + require_NoError(t, err) + + for i := 0; i < 20; i++ { + msg := require_ChanRead(t, ch, time.Millisecond*200) + require_Equal(t, msg.Header.Get("Status"), "100") + require_Equal(t, msg.Header.Get("Description"), "Idle Heartbeat") + } +} + +func TestJetStreamConsumerPauseAdvisories(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + pauseReq := func(consumer string, deadline time.Time) time.Time { + j, err := json.Marshal(JSApiConsumerPauseRequest{ + PauseUntil: deadline, + }) + require_NoError(t, err) + msg, err := nc.Request(fmt.Sprintf(JSApiConsumerPauseT, "TEST", consumer), j, time.Second) + require_NoError(t, err) + var res JSApiConsumerPauseResponse + err = json.Unmarshal(msg.Data, &res) + require_NoError(t, err) + return res.PauseUntil + } + + checkAdvisory := func(msg *nats.Msg, shouldBePaused bool, deadline time.Time) { + t.Helper() + var advisory JSConsumerPauseAdvisory + require_NoError(t, json.Unmarshal(msg.Data, &advisory)) + require_Equal(t, advisory.Stream, "TEST") + require_Equal(t, advisory.Consumer, "my_consumer") + require_Equal(t, advisory.Paused, shouldBePaused) + require_True(t, advisory.PauseUntil.Equal(deadline)) + } + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + ch := make(chan *nats.Msg, 10) + _, err = nc.ChanSubscribe(JSAdvisoryConsumerPausePre+".TEST.my_consumer", ch) + require_NoError(t, err) + + deadline := time.Now().Add(time.Second) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer", + PauseUntil: deadline, + }) + require_NoError(t, err) + + // First advisory should tell us that the consumer was paused + // on creation. + msg := require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, true, deadline) + + // The second one for the unpause. + msg = require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, false, deadline) + + // Now we'll pause the consumer using the API. + deadline = time.Now().Add(time.Second) + require_True(t, pauseReq("my_consumer", deadline).Equal(deadline)) + + // Third advisory should tell us about the pause via the API. + msg = require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, true, deadline) + + // Finally that should unpause. + msg = require_ChanRead(t, ch, time.Second*2) + checkAdvisory(msg, false, deadline) +} + +func TestJetStreamConsumerSurvivesRestart(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + deadline := time.Now().Add(time.Hour) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "my_consumer", + PauseUntil: deadline, + }) + require_NoError(t, err) + + sd := s.JetStreamConfig().StoreDir + s.Shutdown() + s = RunJetStreamServerOnPort(-1, sd) + defer s.Shutdown() + + stream, err := s.gacc.lookupStream("TEST") + require_NoError(t, err) + + consumer := stream.lookupConsumer("my_consumer") + require_NotEqual(t, consumer, nil) + + consumer.mu.RLock() + timer := consumer.uptmr + consumer.mu.RUnlock() + require_True(t, timer != nil) +} diff --git a/server/test_test.go b/server/test_test.go index 35f9c526dc6..8f0d393aff9 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -151,6 +151,15 @@ func require_ChanRead[T any](t *testing.T, ch chan T, timeout time.Duration) T { panic("this shouldn't be possible") } +func require_NoChanRead[T any](t *testing.T, ch chan T, timeout time.Duration) { + t.Helper() + select { + case <-ch: + t.Fatalf("require no read from channel within %v but got something", timeout) + case <-time.After(timeout): + } +} + func checkNatsError(t *testing.T, e *ApiError, id ErrorIdentifier) { t.Helper() ae, ok := ApiErrors[id]