Skip to content

Commit

Permalink
Pointerise PauseUntil in ConsumerConfig
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Feb 20, 2024
1 parent 7c1d4cb commit d4a3758
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 27 deletions.
52 changes: 35 additions & 17 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type ConsumerConfig struct {
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil time.Time `json:"pause_until,omitempty"`
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

// SequenceInfo has both the consumer and the stream sequence and last activity.
Expand Down Expand Up @@ -1090,13 +1090,13 @@ func (o *consumer) updatePauseState(cfg *ConsumerConfig) {
// loopAndGatherMsgs.
return
}
if cfg.PauseUntil.IsZero() || cfg.PauseUntil.Before(time.Now()) {
if cfg.PauseUntil == nil || cfg.PauseUntil.IsZero() || cfg.PauseUntil.Before(time.Now()) {
// Either the PauseUntil is unset (is effectively zero) or the
// deadline has already passed, in which case there is nothing
// to do.
return
}
o.uptmr = time.AfterFunc(time.Until(cfg.PauseUntil), func() {
o.uptmr = time.AfterFunc(time.Until(*cfg.PauseUntil), func() {
o.mu.Lock()
defer o.mu.Unlock()

Expand Down Expand Up @@ -1494,11 +1494,14 @@ func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Stream: o.stream,
Consumer: o.name,
Paused: time.Now().Before(cfg.PauseUntil),
PauseUntil: cfg.PauseUntil,
Domain: o.srv.getOpts().JetStreamDomain,
Stream: o.stream,
Consumer: o.name,
Domain: o.srv.getOpts().JetStreamDomain,
}

if cfg.PauseUntil != nil {
e.PauseUntil = *cfg.PauseUntil
e.Paused = time.Now().Before(e.PauseUntil)
}

j, err := json.Marshal(e)
Expand Down Expand Up @@ -1874,7 +1877,10 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
}

// Make sure we always store PauseUntil in UTC.
cfg.PauseUntil = cfg.PauseUntil.UTC()
if cfg.PauseUntil != nil {
utc := (*cfg.PauseUntil).UTC()
cfg.PauseUntil = &utc
}

if o.store != nil {
// Update local state always.
Expand Down Expand Up @@ -1924,10 +1930,20 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
}
if !cfg.PauseUntil.Equal(o.cfg.PauseUntil) {
o.updatePauseState(cfg)
if o.isLeader() {
o.sendPauseAdvisoryLocked(cfg)
// heck whether the pause has changed
{
var old, new time.Time
if o.cfg.PauseUntil != nil {
old = *o.cfg.PauseUntil
}
if cfg.PauseUntil != nil {
new = *cfg.PauseUntil
}
if !old.Equal(new) {
o.updatePauseState(cfg)
if o.isLeader() {
o.sendPauseAdvisoryLocked(cfg)
}
}
}

Expand Down Expand Up @@ -2643,10 +2659,12 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
TimeStamp: time.Now().UTC(),
Paused: time.Now().Before(o.cfg.PauseUntil),
}
if info.Paused {
info.PauseRemaining = time.Until(o.cfg.PauseUntil)
if o.cfg.PauseUntil != nil {
p := *o.cfg.PauseUntil
if info.Paused = time.Now().Before(p); info.Paused {
info.PauseRemaining = time.Until(p)
}
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
Expand Down Expand Up @@ -3984,7 +4002,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
err = nil

// If the consumer is paused then stop sending.
if !o.cfg.PauseUntil.IsZero() && time.Now().Before(o.cfg.PauseUntil) {
if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
// If the consumer is paused and we haven't reached the deadline yet then
// go back to waiting.
goto waitForMsgs
Expand Down
24 changes: 15 additions & 9 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4016,7 +4016,7 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
resp.ConsumerInfo = o.initialInfo()
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))

if !o.cfg.PauseUntil.IsZero() && time.Now().Before(o.cfg.PauseUntil) {
if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
o.sendPauseAdvisoryLocked(&o.cfg)
}
}
Expand Down Expand Up @@ -4554,13 +4554,16 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
}

nca := *ca
nca.Config.PauseUntil = req.PauseUntil.UTC()
pauseUTC := req.PauseUntil.UTC()
if !pauseUTC.IsZero() {
nca.Config.PauseUntil = &pauseUTC
}
eca := encodeAddConsumerAssignment(&nca)
cc.meta.Propose(eca)

resp.PauseUntil = nca.Config.PauseUntil
if resp.Paused = time.Now().Before(nca.Config.PauseUntil); resp.Paused {
resp.PauseRemaining = time.Until(nca.Config.PauseUntil)
resp.PauseUntil = pauseUTC
if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
resp.PauseRemaining = time.Until(pauseUTC)
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
return
Expand All @@ -4581,7 +4584,10 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
}

ncfg := obs.cfg
ncfg.PauseUntil = req.PauseUntil.UTC()
pauseUTC := req.PauseUntil.UTC()
if !pauseUTC.IsZero() {
ncfg.PauseUntil = &pauseUTC
}

if err := obs.updateConfig(&ncfg); err != nil {
// The only type of error that should be returned here is from o.store,
Expand All @@ -4591,9 +4597,9 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
return
}

resp.PauseUntil = ncfg.PauseUntil
if resp.Paused = time.Now().Before(ncfg.PauseUntil); resp.Paused {
resp.PauseRemaining = time.Until(ncfg.PauseUntil)
resp.PauseUntil = pauseUTC
if resp.Paused = time.Now().Before(pauseUTC); resp.Paused {
resp.PauseRemaining = time.Until(pauseUTC)
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5084,7 +5084,7 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
// Only send a pause advisory on consumer create if we're
// actually paused. The timer would have been kicked by now
// by the call to o.setLeader() above.
if isLeader && !o.cfg.PauseUntil.IsZero() && time.Now().Before(o.cfg.PauseUntil) {
if isLeader && o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
o.sendPauseAdvisoryLocked(&o.cfg)
}

Expand Down

0 comments on commit d4a3758

Please sign in to comment.