-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for pausing consumers #5066
Conversation
d773766
to
81b0221
Compare
Have tested:
Good stuff |
e5255cc
to
27e87b6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM functionality wise
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks really good, few comments.
27e87b6
to
4b945c1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - Let's figure out client piece.
In general when I introduce new things to the server I simply bypass the client and do them all manually (See direct get batch etc). And place a note to change over once client supports it. I suggest we do that here.
4b945c1
to
d4a3758
Compare
The unit tests have been updated to use new Therefore they no longer rely on upstream changes in nats.go and the |
server/consumer.go
Outdated
@@ -1860,6 +1930,22 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { | |||
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) | |||
} | |||
} | |||
// heck whether the pause has changed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops, fixed & rebased!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - One nit on spelling.
Nice work!
…ers. It can either be set when the consumer is created (but not via a standard consumer update) or it can be changed later by using the new `$JS.API.CONSUMER.PAUSE.*.*` API endpoint by sending: ``` {"pause_until": "2024-02-08T19:00:00Z"} ``` Any time that is in the past, or a zero timestamp, is considered as "unpaused". Once the consumer reaches the `pause_until` time, messages will start flowing again automatically. The consumer info will additionally include `paused` (type `bool`) and, if paused, a `pause_remaining` (type `time.Duration`) to report the pause status. Also adds `$JS.EVENT.ADVISORY.CONSUMER.PAUSE.*.*` advisory messages which are sent when pausing and unpausing (i.e. reaching the pause deadline). Idle heartbeats continue to be sent while the consumer is paused to satisfy liveness checks. Signed-off-by: Neil Twigg <[email protected]>
Signed-off-by: Neil Twigg <[email protected]>
Signed-off-by: Neil Twigg <[email protected]>
a25f743
to
88b3027
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This adds a new
pause_until
configuration option for pausing consumers.It can either be set when the consumer is created (but not via a standard consumer update) or it can be changed later by using the new
$JS.API.CONSUMER.PAUSE.*.*
API endpoint by sending:Any time that is in the past, or a zero timestamp, is considered as "unpaused". Once the consumer reaches the
pause_until
time, messages will start flowing again automatically.The consumer info will additionally include
paused
(typebool
) and, if paused, apause_remaining
(typetime.Duration
) to report the pause status.Also adds
$JS.EVENT.ADVISORY.CONSUMER.PAUSE.*.*
advisory messages which are sent when pausing and unpausing (i.e. reaching the pause deadline).Idle heartbeats continue to be sent while the consumer is paused to satisfy liveness checks.
Before merge: nats-io/nats.go#1554 and
go.mod
updated.Signed-off-by: Neil Twigg [email protected]