Skip to content

Commit

Permalink
This adds a new pause_until configuration option for pausing consum…
Browse files Browse the repository at this point in the history
…ers.

It can either be set when the consumer is created (but not via a
standard consumer update) or it can be changed later by using the new
`$JS.API.CONSUMER.PAUSE.*.*` API endpoint by sending:
```
{"pause_until": "2024-02-08T19:00:00Z"}
```

Any time that is in the past, or a zero timestamp, is considered as
"unpaused". Once the consumer reaches the `pause_until` time, messages
will start flowing again automatically.

The consumer info will additionally include `paused` (type `bool`) and,
if paused, a `pause_remaining` (type `time.Duration`) to report the pause
status.

Also adds `$JS.EVENT.ADVISORY.CONSUMER.PAUSE.*.*` advisory messages which
are sent when pausing and unpausing (i.e. reaching the pause deadline).

Idle heartbeats continue to be sent while the consumer is paused to
satisfy liveness checks.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Feb 16, 2024
1 parent dc52bc2 commit e5255cc
Show file tree
Hide file tree
Showing 10 changed files with 1,122 additions and 6 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/nats-io/nats-server/v2

go 1.20

replace github.com/nats-io/nats.go => github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57

require (
github.com/klauspost/compress v1.17.6
github.com/minio/highwayhash v1.0.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.4 h1:Bz+drKl2GbE30fxTOtb0NYl1BQ5RwZ+Zcqkg3mR5bbI=
github.com/nats-io/jwt/v2 v2.5.4/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats.go v1.33.0 h1:rRg0l2F29B30n6EPl0j50hl8eYp7rA2ecoJ74E62US8=
github.com/nats-io/nats.go v1.33.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57 h1:HaDH4TZvaPw437wCaUklGXhnmqBpwJ4KfOmJLWpRl+g=
github.com/nats-io/nats.go v1.33.1-0.20240216092839-2c1498a72b57/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
92 changes: 88 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type ConsumerInfo struct {
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
Paused bool `json:"paused,omitempty"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
}
Expand Down Expand Up @@ -104,6 +106,9 @@ type ConsumerConfig struct {

// Metadata is additional metadata for the Consumer.
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil time.Time `json:"pause_until,omitempty"`
}

// SequenceInfo has both the consumer and the stream sequence and last activity.
Expand Down Expand Up @@ -352,11 +357,12 @@ type consumer struct {
active bool
replay bool
dtmr *time.Timer
uptmr *time.Timer // Unpause timer
gwdtmr *time.Timer
dthresh time.Duration
mch chan struct{}
qch chan struct{}
inch chan bool
mch chan struct{} // Message channel
qch chan struct{} // Quit channel
inch chan bool // Interest change channel
sfreq int32
ackEventT string
nakEventT string
Expand Down Expand Up @@ -1072,6 +1078,34 @@ func (o *consumer) updateInactiveThreshold(cfg *ConsumerConfig) {
}
}

// updatePauseState re-arms the unpause timer so that it matches cfg.PauseUntil.
//
// Any previously armed timer is stopped first. A new timer is started only
// when we are the leader (only the leader runs loopAndGatherMsgs) and the
// deadline is still in the future; an unset (zero) or already expired
// PauseUntil needs no timer.
//
// When the timer fires it kicks the delivery loop and sends a pause advisory
// built from the consumer's current configuration.
//
// Lock should be held.
func (o *consumer) updatePauseState(cfg *ConsumerConfig) {
	if o.uptmr != nil {
		stopAndClearTimer(&o.uptmr)
	}
	if !o.isLeader() {
		// Only the leader will run the timer as only the leader will run
		// loopAndGatherMsgs.
		return
	}
	if cfg.PauseUntil.Before(time.Now()) {
		// Either the PauseUntil is unset (is effectively zero) or the
		// deadline has already passed, in which case there is nothing
		// to do.
		return
	}
	// The caller holds o.mu, so the callback cannot get past its Lock() until
	// after tmr has been assigned and installed below.
	var tmr *time.Timer
	tmr = time.AfterFunc(time.Until(cfg.PauseUntil), func() {
		o.mu.Lock()
		defer o.mu.Unlock()

		// Stopping a timer that has already fired does not cancel its
		// callback. If we were waiting on the lock while the pause state was
		// re-armed or cleared (config update, leadership change), we are
		// stale: leave the current timer alone and do not emit an advisory
		// for a deadline that is no longer ours.
		if o.uptmr != tmr {
			return
		}

		stopAndClearTimer(&o.uptmr)
		o.signalNewMessages()
		o.sendPauseAdvisoryLocked(&o.cfg)
	})
	o.uptmr = tmr
}

func (o *consumer) consumerAssignment() *consumerAssignment {
o.mu.RLock()
defer o.mu.RUnlock()
Expand Down Expand Up @@ -1265,6 +1299,9 @@ func (o *consumer) setLeader(isLeader bool) {
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}

// Update the consumer pause tracking.
o.updatePauseState(&o.cfg)

// If we are not in ReplayInstant mode mark us as in replay state until resolved.
if o.cfg.ReplayPolicy != ReplayInstant {
o.replay = true
Expand Down Expand Up @@ -1332,7 +1369,8 @@ func (o *consumer) setLeader(isLeader bool) {
}
// Stop any inactivity timers. Should only be running on leaders.
stopAndClearTimer(&o.dtmr)

// Stop any unpause timers. Should only be running on leaders.
stopAndClearTimer(&o.uptmr)
// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.rdq = nil
Expand Down Expand Up @@ -1449,6 +1487,29 @@ func (o *consumer) sendCreateAdvisory() {
o.sendAdvisory(subj, j)
}

// sendPauseAdvisoryLocked publishes a consumer pause advisory for the pause
// deadline found in cfg. The advisory reports the consumer as paused when the
// deadline is still in the future at the time of the call. If the advisory
// cannot be encoded nothing is sent.
// NOTE(review): the name suggests the consumer lock should be held by the
// caller - confirm at each call site.
func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
	header := TypedEvent{
		Type: JSConsumerPauseAdvisoryType,
		ID:   nuid.Next(),
		Time: time.Now().UTC(),
	}
	deadline := cfg.PauseUntil

	payload, err := json.Marshal(JSConsumerPauseAdvisory{
		TypedEvent: header,
		Stream:     o.stream,
		Consumer:   o.name,
		Paused:     time.Now().Before(deadline),
		PauseUntil: deadline,
		Domain:     o.srv.getOpts().JetStreamDomain,
	})
	if err != nil {
		return
	}

	// The advisory subject is scoped by stream and consumer name.
	o.sendAdvisory(JSAdvisoryConsumerPausePre+"."+o.stream+"."+o.name, payload)
}

// createdTime returns created time.
func (o *consumer) createdTime() time.Time {
o.mu.Lock()
Expand Down Expand Up @@ -1812,6 +1873,9 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
return err
}

// Make sure we always store PauseUntil in UTC.
cfg.PauseUntil = cfg.PauseUntil.UTC()

if o.store != nil {
// Update local state always.
if err := o.store.UpdateConfig(cfg); err != nil {
Expand Down Expand Up @@ -1860,6 +1924,12 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
}
if !cfg.PauseUntil.Equal(o.cfg.PauseUntil) {
o.updatePauseState(cfg)
if o.isLeader() {
o.sendPauseAdvisoryLocked(cfg)
}
}

// Check for Subject Filters update.
newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects)
Expand Down Expand Up @@ -2573,6 +2643,10 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
TimeStamp: time.Now().UTC(),
Paused: time.Now().Before(o.cfg.PauseUntil),
}
if info.Paused {
info.PauseRemaining = time.Until(o.cfg.PauseUntil)
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
Expand Down Expand Up @@ -3841,6 +3915,8 @@ func (o *consumer) suppressDeletion() {
}
}

// loopAndGatherMsgs waits for messages for the consumer. qch is the quit channel.
// While the consumer is paused no messages are sent; the unpause timer signals
// the loop once the PauseUntil deadline is reached.
func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// On startup check to see if we are in a replay situation where replay policy is not instant.
var (
Expand Down Expand Up @@ -3907,6 +3983,13 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// Clear last error.
err = nil

// If the consumer is paused then stop sending.
if time.Now().Before(o.cfg.PauseUntil) {
// If the consumer is paused and we haven't reached the deadline yet then
// go back to waiting.
goto waitForMsgs
}

// If we are in push mode and not active or under flowcontrol let's stop sending.
if o.isPushMode() {
if !o.active || (o.maxpb > 0 && o.pbytes > o.maxpb) {
Expand Down Expand Up @@ -5220,6 +5303,7 @@ func (o *consumer) switchToEphemeral() {
rr := o.acc.sl.Match(o.cfg.DeliverSubject)
// Setup dthresh.
o.updateInactiveThreshold(&o.cfg)
o.updatePauseState(&o.cfg)
o.mu.Unlock()

// Update interest
Expand Down
143 changes: 143 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ const (
JSApiConsumerDelete = "$JS.API.CONSUMER.DELETE.*.*"
JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"

// JSApiConsumerPause is the endpoint to pause or unpause consumers.
// Will return JSON response.
JSApiConsumerPause = "$JS.API.CONSUMER.PAUSE.*.*"
JSApiConsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"

// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"

Expand Down Expand Up @@ -265,6 +270,9 @@ const (
// JSAdvisoryConsumerDeletedPre notification that a consumer was deleted.
JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"

// JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"

// JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"

Expand Down Expand Up @@ -667,6 +675,19 @@ type JSApiConsumerDeleteResponse struct {

const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"

// JSApiConsumerPauseRequest is the request body accepted by the consumer
// pause API. PauseUntil is the deadline until which the consumer should be
// paused; a zero value or a time in the past means the consumer is unpaused.
// NOTE(review): encoding/json's omitempty never omits a struct value such as
// time.Time, so a zero PauseUntil is still encoded - confirm this is intended.
type JSApiConsumerPauseRequest struct {
PauseUntil time.Time `json:"pause_until,omitempty"`
}

// JSApiConsumerPauseResponseType is the type identifier carried in consumer
// pause responses.
const JSApiConsumerPauseResponseType = "io.nats.jetstream.api.v1.consumer_pause_response"

// JSApiConsumerPauseResponse is the response to a consumer pause request.
type JSApiConsumerPauseResponse struct {
ApiResponse
// Paused reports whether PauseUntil was still in the future when the
// response was built.
Paused bool `json:"paused"`
// PauseUntil is the pause deadline that was applied, in UTC.
PauseUntil time.Time `json:"pause_until"`
// PauseRemaining is the time left until PauseUntil; it is only filled in
// when Paused is true.
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

type JSApiConsumerInfoResponse struct {
ApiResponse
*ConsumerInfo
Expand Down Expand Up @@ -928,6 +949,7 @@ func (s *Server) setJetStreamExportSubs() error {
{JSApiConsumerList, s.jsConsumerListRequest},
{JSApiConsumerInfo, s.jsConsumerInfoRequest},
{JSApiConsumerDelete, s.jsConsumerDeleteRequest},
{JSApiConsumerPause, s.jsConsumerPauseRequest},
}

js.mu.Lock()
Expand Down Expand Up @@ -3973,6 +3995,12 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
return
}

// If the consumer already exists then don't allow updating the PauseUntil, just set
// it back to whatever the current configured value is.
if o := stream.lookupConsumer(consumerName); o != nil {
req.Config.PauseUntil = o.cfg.PauseUntil
}

o, err := stream.addConsumerWithAction(&req.Config, req.Action)

if err != nil {
Expand All @@ -3987,6 +4015,10 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun
}
resp.ConsumerInfo = o.initialInfo()
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))

if time.Now().Before(o.cfg.PauseUntil) {
o.sendPauseAdvisoryLocked(&o.cfg)
}
}

// Request for the list of all consumer names.
Expand Down Expand Up @@ -4455,6 +4487,117 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}

// jsConsumerPauseRequest handles a request to pause or unpause a consumer.
//
// The stream and consumer names are taken from the request subject. The body
// is an optional JSApiConsumerPauseRequest; an empty body leaves PauseUntil at
// its zero value. In clustered mode only the meta leader proceeds: it proposes
// an updated consumer assignment and replies immediately. Otherwise the
// consumer's configuration is updated in place. The deadline is always
// converted to UTC before it is applied.
func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
	if c == nil || !s.JetStreamEnabled() {
		return
	}
	ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
	if err != nil {
		s.Warnf(badAPIRequestT, msg)
		return
	}

	var (
		req  JSApiConsumerPauseRequest
		resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}}
	)

	// sendErr replies with the error currently stored in resp.Error.
	sendErr := func() {
		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
	}
	// sendOK fills in the pause status for the applied deadline and replies.
	sendOK := func(until time.Time) {
		resp.PauseUntil = until
		resp.Paused = time.Now().Before(until)
		if resp.Paused {
			resp.PauseRemaining = time.Until(until)
		}
		s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
	}

	if !isEmptyRequest(msg) && json.Unmarshal(msg, &req) != nil {
		resp.Error = NewJSInvalidJSONError()
		sendErr()
		return
	}

	// Determine if we should proceed here when we are in clustered mode.
	isClustered := s.JetStreamIsClustered()
	js, cc := s.getJetStreamCluster()
	if isClustered {
		switch {
		case js == nil || cc == nil:
			return
		case js.isLeaderless():
			resp.Error = NewJSClusterNotAvailError()
			sendErr()
			return
		case !s.JetStreamIsLeader():
			// Only the meta leader answers.
			return
		}
	}

	hasJS, doErr := acc.checkJetStream()
	if !hasJS {
		if doErr {
			resp.Error = NewJSNotEnabledForAccountError()
			sendErr()
		}
		return
	}

	streamName := streamNameFromSubject(subject)
	consumerName := consumerNameFromSubject(subject)

	if isClustered {
		sa := js.streamAssignment(acc.Name, streamName)
		if sa == nil {
			resp.Error = NewJSStreamNotFoundError(Unless(err))
			sendErr()
			return
		}

		// A missing key yields a nil assignment, same as an explicit nil entry.
		ca := sa.consumers[consumerName]
		if ca == nil {
			resp.Error = NewJSConsumerNotFoundError()
			sendErr()
			return
		}

		// NOTE(review): nca is a shallow copy of *ca; if Config is a pointer
		// the write below is also visible through ca - confirm.
		nca := *ca
		nca.Config.PauseUntil = req.PauseUntil.UTC()
		cc.meta.Propose(encodeAddConsumerAssignment(&nca))

		sendOK(nca.Config.PauseUntil)
		return
	}

	mset, err := acc.lookupStream(streamName)
	if err != nil {
		resp.Error = NewJSStreamNotFoundError(Unless(err))
		sendErr()
		return
	}

	obs := mset.lookupConsumer(consumerName)
	if obs == nil {
		resp.Error = NewJSConsumerNotFoundError()
		sendErr()
		return
	}

	ncfg := obs.cfg
	ncfg.PauseUntil = req.PauseUntil.UTC()

	if err := obs.updateConfig(&ncfg); err != nil {
		// The only type of error that should be returned here is from o.store,
		// so use a store failed error type.
		resp.Error = NewJSConsumerStoreFailedError(err)
		sendErr()
		return
	}

	sendOK(ncfg.PauseUntil)
}

// sendJetStreamAPIAuditAdvisor will send the audit event for a given event.
func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, subject, request, response string) {
s.publishAdvisory(acc, JSAuditAdvisory, JSAPIAudit{
Expand Down
Loading

0 comments on commit e5255cc

Please sign in to comment.