diff --git a/apps/nsqd/options.go b/apps/nsqd/options.go index f93597013..8f5eb5241 100644 --- a/apps/nsqd/options.go +++ b/apps/nsqd/options.go @@ -159,6 +159,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Int64("max-msg-size", opts.MaxMsgSize, "maximum size of a single message in bytes") flagSet.Duration("max-req-timeout", opts.MaxReqTimeout, "maximum requeuing timeout for a message") flagSet.Int64("max-body-size", opts.MaxBodySize, "maximum size of a single command body") + flagSet.Duration("max-defer-delay", opts.MaxDeferDelay, "maximum duration when deferring a message") // client overridable configuration options flagSet.Duration("max-heartbeat-interval", opts.MaxHeartbeatInterval, "maximum client configurable duration of time between client heartbeats") diff --git a/nsqd/http.go b/nsqd/http.go index c67424187..499d908d2 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -241,7 +241,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout return nil, http_api.Err{400, "INVALID_DEFER"} } deferred = time.Duration(di) * time.Millisecond - if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout { + if deferred < 0 || deferred > s.nsqd.getOpts().MaxDeferDelay { return nil, http_api.Err{400, "INVALID_DEFER"} } } diff --git a/nsqd/options.go b/nsqd/options.go index 8997c7265..74c9dbebb 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -51,6 +51,7 @@ type Options struct { MaxBodySize int64 `flag:"max-body-size"` MaxReqTimeout time.Duration `flag:"max-req-timeout"` ClientTimeout time.Duration + MaxDeferDelay time.Duration `flag:"max-defer-delay"` // client overridable configuration options MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"` @@ -133,6 +134,7 @@ func NewOptions() *Options { MaxBodySize: 5 * 1024 * 1024, MaxReqTimeout: 1 * time.Hour, ClientTimeout: 60 * time.Second, + MaxDeferDelay: 1 * time.Hour, MaxHeartbeatInterval: 60 * time.Second, MaxRdyCount: 2500, diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 8ec422430..e242ce372 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -878,17 +878,17 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("DPUB topic name %q is not valid", topicName)) } - timeoutMs, err := protocol.ByteToBase10(params[2]) + delayMs, err := protocol.ByteToBase10(params[2]) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_INVALID", - fmt.Sprintf("DPUB could not parse timeout %s", params[2])) + fmt.Sprintf("DPUB could not parse defer delay %s", params[2])) } - timeoutDuration := time.Duration(timeoutMs) * time.Millisecond + delayDuration := time.Duration(delayMs) * time.Millisecond - if timeoutDuration < 0 || timeoutDuration > p.nsqd.getOpts().MaxReqTimeout { + if delayDuration < 0 || delayDuration > p.nsqd.getOpts().MaxDeferDelay { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", - fmt.Sprintf("DPUB timeout %d out of range 0-%d", - timeoutMs, p.nsqd.getOpts().MaxReqTimeout/time.Millisecond)) + fmt.Sprintf("DPUB defer delay %d out of range 0-%d", + delayMs, p.nsqd.getOpts().MaxDeferDelay/time.Millisecond)) } bodyLen, err := readLen(client.Reader, client.lenSlice) @@ -918,7 +918,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { topic := p.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) - msg.deferred = timeoutDuration + msg.deferred = delayDuration err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error()) diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 19015d1e9..9aa9ad3f1 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -596,12 +596,12 @@ func TestDPUB(t *testing.T) { test.Equal(t, 1, int(atomic.LoadUint64(&ch.messageCount))) // duration out of range - nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn) + nsq.DeferredPublish(topicName, opts.MaxDeferDelay+100*time.Millisecond, make([]byte, 100)).WriteTo(conn) resp, _ = nsq.ReadResponse(conn) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) - test.Equal(t, "E_INVALID DPUB timeout 3600100 out of range 0-3600000", string(data)) + test.Equal(t, "E_INVALID DPUB defer delay 3600100 out of range 0-3600000", string(data)) } func TestTouch(t *testing.T) { diff --git a/nsqd/protocol_v2_unixsocket_test.go b/nsqd/protocol_v2_unixsocket_test.go index 85820aca6..d1796c5cf 100644 --- a/nsqd/protocol_v2_unixsocket_test.go +++ b/nsqd/protocol_v2_unixsocket_test.go @@ -531,12 +531,12 @@ func TestUnixSocketDPUB(t *testing.T) { test.Equal(t, 1, int(atomic.LoadUint64(&ch.messageCount))) // duration out of range - nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn) + nsq.DeferredPublish(topicName, opts.MaxDeferDelay+100*time.Millisecond, make([]byte, 100)).WriteTo(conn) resp, _ = nsq.ReadResponse(conn) frameType, data, _ = nsq.UnpackResponse(resp) t.Logf("frameType: %d, data: %s", frameType, data) test.Equal(t, frameTypeError, frameType) - test.Equal(t, "E_INVALID DPUB timeout 3600100 out of range 0-3600000", string(data)) + test.Equal(t, "E_INVALID DPUB defer delay 3600100 out of range 0-3600000", string(data)) } func TestUnixSocketTouch(t *testing.T) {