Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pausing consumers #5066

Merged
merged 3 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 106 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type ConsumerInfo struct {
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
Paused bool `json:"paused,omitempty"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
}
Expand Down Expand Up @@ -104,6 +106,9 @@ type ConsumerConfig struct {

// Metadata is additional metadata for the Consumer.
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil *time.Time `json:"pause_until,omitempty"`
}

// SequenceInfo has both the consumer and the stream sequence and last activity.
Expand Down Expand Up @@ -352,11 +357,12 @@ type consumer struct {
active bool
replay bool
dtmr *time.Timer
uptmr *time.Timer // Unpause timer
gwdtmr *time.Timer
dthresh time.Duration
mch chan struct{}
qch chan struct{}
inch chan bool
mch chan struct{} // Message channel
qch chan struct{} // Quit channel
inch chan bool // Interest change channel
sfreq int32
ackEventT string
nakEventT string
Expand Down Expand Up @@ -1072,6 +1078,34 @@ func (o *consumer) updateInactiveThreshold(cfg *ConsumerConfig) {
}
}

// Updates the paused state. If we are the leader and the pause deadline
// hasn't passed yet then we will start a timer to kick the consumer once
// that deadline is reached. Lock should be held.
func (o *consumer) updatePauseState(cfg *ConsumerConfig) {
if o.uptmr != nil {
stopAndClearTimer(&o.uptmr)
}
if !o.isLeader() {
// Only the leader will run the timer as only the leader will run
// loopAndGatherMsgs.
return
}
if cfg.PauseUntil == nil || cfg.PauseUntil.IsZero() || cfg.PauseUntil.Before(time.Now()) {
// Either the PauseUntil is unset (is effectively zero) or the
neilalexander marked this conversation as resolved.
Show resolved Hide resolved
// deadline has already passed, in which case there is nothing
// to do.
return
}
o.uptmr = time.AfterFunc(time.Until(*cfg.PauseUntil), func() {
o.mu.Lock()
defer o.mu.Unlock()

stopAndClearTimer(&o.uptmr)
o.sendPauseAdvisoryLocked(&o.cfg)
o.signalNewMessages()
})
}

func (o *consumer) consumerAssignment() *consumerAssignment {
o.mu.RLock()
defer o.mu.RUnlock()
Expand Down Expand Up @@ -1265,6 +1299,9 @@ func (o *consumer) setLeader(isLeader bool) {
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}

// Update the consumer pause tracking.
o.updatePauseState(&o.cfg)

// If we are not in ReplayInstant mode mark us as in replay state until resolved.
if o.cfg.ReplayPolicy != ReplayInstant {
o.replay = true
Expand Down Expand Up @@ -1332,7 +1369,8 @@ func (o *consumer) setLeader(isLeader bool) {
}
// Stop any inactivity timers. Should only be running on leaders.
stopAndClearTimer(&o.dtmr)

// Stop any unpause timers. Should only be running on leaders.
stopAndClearTimer(&o.uptmr)
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.rdq = nil
Expand Down Expand Up @@ -1449,6 +1487,32 @@ func (o *consumer) sendCreateAdvisory() {
o.sendAdvisory(subj, j)
}

// sendPauseAdvisoryLocked publishes a consumer pause advisory for this
// consumer on JSAdvisoryConsumerPausePre.<stream>.<consumer>.
//
// The advisory carries cfg.PauseUntil when it is set, and Paused reports
// whether that deadline is still in the future at the time of sending. With
// no PauseUntil the advisory is sent with both fields left at their zero
// values. If the advisory cannot be marshalled nothing is sent.
//
// NOTE(review): the "Locked" suffix suggests the caller already holds o.mu
// (the unpause timer callback calls it that way) — confirm for other callers.
func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
	adv := JSConsumerPauseAdvisory{
		TypedEvent: TypedEvent{
			Type: JSConsumerPauseAdvisoryType,
			ID:   nuid.Next(),
			Time: time.Now().UTC(),
		},
		Stream:   o.stream,
		Consumer: o.name,
		Domain:   o.srv.getOpts().JetStreamDomain,
	}

	if until := cfg.PauseUntil; until != nil {
		adv.PauseUntil = *until
		adv.Paused = time.Now().Before(adv.PauseUntil)
	}

	payload, err := json.Marshal(adv)
	if err == nil {
		o.sendAdvisory(JSAdvisoryConsumerPausePre+"."+o.stream+"."+o.name, payload)
	}
}

// Created returns created time.
func (o *consumer) createdTime() time.Time {
o.mu.Lock()
Expand Down Expand Up @@ -1812,6 +1876,12 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
return err
}

// Make sure we always store PauseUntil in UTC.
if cfg.PauseUntil != nil {
utc := (*cfg.PauseUntil).UTC()
cfg.PauseUntil = &utc
}

if o.store != nil {
// Update local state always.
if err := o.store.UpdateConfig(cfg); err != nil {
Expand Down Expand Up @@ -1860,6 +1930,22 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
}
// Check whether the pause has changed
{
var old, new time.Time
if o.cfg.PauseUntil != nil {
old = *o.cfg.PauseUntil
}
if cfg.PauseUntil != nil {
new = *cfg.PauseUntil
}
if !old.Equal(new) {
o.updatePauseState(cfg)
if o.isLeader() {
o.sendPauseAdvisoryLocked(cfg)
}
}
}

// Check for Subject Filters update.
newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects)
Expand Down Expand Up @@ -2574,6 +2660,12 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
PushBound: o.isPushMode() && o.active,
TimeStamp: time.Now().UTC(),
}
if o.cfg.PauseUntil != nil {
p := *o.cfg.PauseUntil
if info.Paused = time.Now().Before(p); info.Paused {
info.PauseRemaining = time.Until(p)
}
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
if rg != nil && rg.node != nil && !o.isLeader() && o.store != nil {
Expand Down Expand Up @@ -3841,6 +3933,8 @@ func (o *consumer) suppressDeletion() {
}
}

// loopAndGatherMsgs waits for messages for the consumer. qch is the quit channel.
// While the consumer is paused the loop goes back to waiting instead of sending;
// the unpause timer signals it once the PauseUntil deadline is reached.
func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// On startup check to see if we are in a replay situation where replay policy is not instant.
var (
Expand Down Expand Up @@ -3907,6 +4001,13 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// Clear last error.
err = nil

// If the consumer is paused then stop sending.
if o.cfg.PauseUntil != nil && !o.cfg.PauseUntil.IsZero() && time.Now().Before(*o.cfg.PauseUntil) {
// If the consumer is paused and we haven't reached the deadline yet then
// go back to waiting.
goto waitForMsgs
}

// If we are in push mode and not active or under flowcontrol let's stop sending.
if o.isPushMode() {
if !o.active || (o.maxpb > 0 && o.pbytes > o.maxpb) {
Expand Down Expand Up @@ -5220,6 +5321,7 @@ func (o *consumer) switchToEphemeral() {
rr := o.acc.sl.Match(o.cfg.DeliverSubject)
// Setup dthresh.
o.updateInactiveThreshold(&o.cfg)
o.updatePauseState(&o.cfg)
o.mu.Unlock()

// Update interest
Expand Down
Loading