From d4a3758ddb88b043b72dd2518f7b6beb24ad54f6 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 20 Feb 2024 13:39:00 +0000 Subject: [PATCH] Pointerise `PauseUntil` in `ConsumerConfig` Signed-off-by: Neil Twigg --- server/consumer.go | 52 +++++++++++++++++++++++++------------ server/jetstream_api.go | 24 ++++++++++------- server/jetstream_cluster.go | 2 +- 3 files changed, 51 insertions(+), 27 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 40e3732bcf3..1040c9c7654 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -108,7 +108,7 @@ type ConsumerConfig struct { Metadata map[string]string `json:"metadata,omitempty"` // PauseUntil is for suspending the consumer until the deadline. - PauseUntil time.Time `json:"pause_until,omitempty"` + PauseUntil *time.Time `json:"pause_until,omitempty"` } // SequenceInfo has both the consumer and the stream sequence and last activity. @@ -1090,13 +1090,13 @@ func (o *consumer) updatePauseState(cfg *ConsumerConfig) { // loopAndGatherMsgs. return } - if cfg.PauseUntil.IsZero() || cfg.PauseUntil.Before(time.Now()) { + if cfg.PauseUntil == nil || cfg.PauseUntil.IsZero() || cfg.PauseUntil.Before(time.Now()) { // Either the PauseUntil is unset (is effectively zero) or the // deadline has already passed, in which case there is nothing // to do. return } - o.uptmr = time.AfterFunc(time.Until(cfg.PauseUntil), func() { + o.uptmr = time.AfterFunc(time.Until(*cfg.PauseUntil), func() { o.mu.Lock() defer o.mu.Unlock() @@ -1494,11 +1494,14 @@ func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) { ID: nuid.Next(), Time: time.Now().UTC(), }, - Stream: o.stream, - Consumer: o.name, - Paused: time.Now().Before(cfg.PauseUntil), - PauseUntil: cfg.PauseUntil, - Domain: o.srv.getOpts().JetStreamDomain, + Stream: o.stream, + Consumer: o.name, + Domain: o.srv.getOpts().JetStreamDomain, + } + + if cfg.PauseUntil != nil { + e.PauseUntil = *cfg.PauseUntil + e.Paused = time.Now().Before(e.PauseUntil) } j, err := json.Marshal(e) @@ -1874,7 +1877,10 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { } // Make sure we always store PauseUntil in UTC. - cfg.PauseUntil = cfg.PauseUntil.UTC() + if cfg.PauseUntil != nil { + utc := (*cfg.PauseUntil).UTC() + cfg.PauseUntil = &utc + } if o.store != nil { // Update local state always. @@ -1924,10 +1930,20 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } } - if !cfg.PauseUntil.Equal(o.cfg.PauseUntil) { - o.updatePauseState(cfg) - if o.isLeader() { - o.sendPauseAdvisoryLocked(cfg) + // heck whether the pause has changed + { + var old, new time.Time + if o.cfg.PauseUntil != nil { + old = *o.cfg.PauseUntil + } + if cfg.PauseUntil != nil { + new = *cfg.PauseUntil + } + if !old.Equal(new) { + o.updatePauseState(cfg) + if o.isLeader() { + o.sendPauseAdvisoryLocked(cfg) + } } } @@ -2643,10 +2659,12 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo { NumPending: o.checkNumPending(), PushBound: o.isPushMode() && o.active, TimeStamp: time.Now().UTC(), - Paused: time.Now().Before(o.cfg.PauseUntil), } - if info.Paused { - info.PauseRemaining = time.Until(o.cfg.PauseUntil) + if o.cfg.PauseUntil != nil { + p := *o.cfg.PauseUntil + if info.Paused = time.Now().Before(p); info.Paused { + info.PauseRemaining = time.Until(p) + } } // If we are replicated and we are not the leader we need to pull certain data from our store. @@ -3984,7 +4002,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { err = nil // If the consumer is paused then stop sending. - if !o.cfg.PauseUntil.IsZero() && time.Now().Before(o.cfg.PauseUntil) { + if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) { // If the consumer is paused and we haven't reached the deadline yet then // go back to waiting. goto waitForMsgs diff --git a/server/jetstream_api.go b/server/jetstream_api.go index f46d717b4a5..8dcf6bad9e2 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -4016,7 +4016,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun resp.ConsumerInfo = o.initialInfo() s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) - if !o.cfg.PauseUntil.IsZero() && time.Now().Before(o.cfg.PauseUntil) { + if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) { o.sendPauseAdvisoryLocked(&o.cfg) } } @@ -4554,13 +4554,16 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account } nca := *ca - nca.Config.PauseUntil = req.PauseUntil.UTC() + pauseUTC := req.PauseUntil.UTC() + if !pauseUTC.IsZero() { + nca.Config.PauseUntil = &pauseUTC + } eca := encodeAddConsumerAssignment(&nca) cc.meta.Propose(eca) - resp.PauseUntil = nca.Config.PauseUntil - if resp.Paused = time.Now().Before(nca.Config.PauseUntil); resp.Paused { - resp.PauseRemaining = time.Until(nca.Config.PauseUntil) + resp.PauseUntil = pauseUTC + if resp.Paused = time.Now().Before(pauseUTC); resp.Paused { + resp.PauseRemaining = time.Until(pauseUTC) } s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) return @@ -4581,7 +4584,10 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account } ncfg := obs.cfg - ncfg.PauseUntil = req.PauseUntil.UTC() + pauseUTC := req.PauseUntil.UTC() + if !pauseUTC.IsZero() { + ncfg.PauseUntil = &pauseUTC + } if err := obs.updateConfig(&ncfg); err != nil { // The only type of error that should be returned here is from o.store, @@ -4591,9 +4597,9 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account return } - resp.PauseUntil = ncfg.PauseUntil - if resp.Paused = time.Now().Before(ncfg.PauseUntil); resp.Paused { - resp.PauseRemaining = time.Until(ncfg.PauseUntil) + resp.PauseUntil = pauseUTC + if resp.Paused = time.Now().Before(pauseUTC); resp.Paused { + resp.PauseRemaining = time.Until(pauseUTC) } s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index daa67ca3b32..c775dbc6736 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5084,7 +5084,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err // Only send a pause advisory on consumer create if we're // actually paused. The timer would have been kicked by now // by the call to o.setLeader() above. - if isLeader && !o.cfg.PauseUntil.IsZero() && time.Now().Before(o.cfg.PauseUntil) { + if isLeader && o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) { o.sendPauseAdvisoryLocked(&o.cfg) }