Skip to content

Commit

Permalink
nsqd: add experiment topology-aware-consumption for preferred msg con…
Browse files Browse the repository at this point in the history
…sumption
  • Loading branch information
jehiah authored and zoemccormick committed Jan 21, 2025
1 parent 8e7f8d1 commit 1cd6297
Show file tree
Hide file tree
Showing 26 changed files with 717 additions and 172 deletions.
9 changes: 9 additions & 0 deletions apps/nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")
flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request")
flagSet.String("topology-region", opts.TopologyRegion, "A region represents a larger domain, made up of one or more zones for preferring closer consumer")
flagSet.String("topology-zone", opts.TopologyZone, "A zone represents a logical failure domain for preferring closer consumer")

// diskqueue options
flagSet.String("data-path", opts.DataPath, "path to store disk-backed messages")
Expand Down Expand Up @@ -197,5 +199,12 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)")

experiments := app.StringArray{}
var validExperiments []string
for _, e := range nsqd.AllExperiments {
validExperiments = append(validExperiments, fmt.Sprintf("%q", string(e)))
}
flagSet.Var(&experiments, "enable-experiment", fmt.Sprintf("enable experimental feature (may be given multiple times) (valid options: %s)", strings.Join(validExperiments, ", ")))

return flagSet
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/sys v0.10.0 // indirect
)

Expand Down
17 changes: 11 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2ioR0=
Expand Down Expand Up @@ -29,13 +27,20 @@ github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQT
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70 h1:SeSEfdIxyvwGJliREIJhRPPXvW6sDlLT+UQ3B0hD0NA=
golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
12 changes: 12 additions & 0 deletions internal/clusterinfo/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error
Hostname string `json:"hostname"`
HTTPPort int `json:"http_port"`
TCPPort int `json:"tcp_port"`
TopologyZone string `json:"topology_zone,omitempty"`
TopologyRegion string `json:"topology_region,omitempty"`
}

type statsRespType struct {
Expand Down Expand Up @@ -409,6 +411,8 @@ func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error
HTTPPort: infoResp.HTTPPort,
TCPPort: infoResp.TCPPort,
Topics: producerTopics,
TopologyZone: infoResp.TopologyZone,
TopologyRegion: infoResp.TopologyRegion,
})
}(addr)
}
Expand Down Expand Up @@ -437,6 +441,8 @@ func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string
Hostname string `json:"hostname"`
HTTPPort int `json:"http_port"`
TCPPort int `json:"tcp_port"`
TopologyZone string `json:"topology_zone,omitempty"`
TopologyRegion string `json:"topology_region,omitempty"`
}

type statsRespType struct {
Expand Down Expand Up @@ -508,6 +514,8 @@ func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string
HTTPPort: infoResp.HTTPPort,
TCPPort: infoResp.TCPPort,
Topics: producerTopics,
TopologyZone: infoResp.TopologyZone,
TopologyRegion: infoResp.TopologyRegion,
})
lock.Unlock()

Expand Down Expand Up @@ -582,6 +590,7 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers,
topic.Node = addr
topic.Hostname = p.Hostname
topic.MemoryDepth = topic.Depth - topic.BackendDepth
topic.DeliveryMsgCount = topic.ZoneLocalMsgCount + topic.RegionLocalMsgCount + topic.GlobalMsgCount
if selectedTopic != "" && topic.TopicName != selectedTopic {
continue
}
Expand All @@ -592,6 +601,7 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers,
channel.Hostname = p.Hostname
channel.TopicName = topic.TopicName
channel.MemoryDepth = channel.Depth - channel.BackendDepth
channel.DeliveryMsgCount = channel.ZoneLocalMsgCount + channel.RegionLocalMsgCount + channel.GlobalMsgCount
key := channel.ChannelName
if selectedTopic == "" {
key = fmt.Sprintf("%s:%s", topic.TopicName, channel.ChannelName)
Expand All @@ -607,6 +617,8 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers,
}
for _, c := range channel.Clients {
c.Node = addr
c.NodeTopologyRegion = p.TopologyRegion
c.NodeTopologyZone = p.TopologyZone
}
channelStats.Add(channel)
}
Expand Down
147 changes: 101 additions & 46 deletions internal/clusterinfo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Producer struct {
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
TopologyZone string `json:"topology_zone,omitempty"`
TopologyRegion string `json:"topology_region,omitempty"`
VersionObj semver.Version `json:"-"`
Topics ProducerTopics `json:"topics"`
OutOfDate bool `json:"out_of_date"`
Expand All @@ -46,6 +48,8 @@ func (p *Producer) UnmarshalJSON(b []byte) error {
Version string `json:"version"`
Topics []string `json:"topics"`
Tombstoned []bool `json:"tombstones"`
TopologyZone string `json:"topology_zone,omitempty"`
TopologyRegion string `json:"topology_region,omitempty"`
}
if err := json.Unmarshal(b, &r); err != nil {
return err
Expand All @@ -57,6 +61,8 @@ func (p *Producer) UnmarshalJSON(b []byte) error {
TCPPort: r.TCPPort,
HTTPPort: r.HTTPPort,
Version: r.Version,
TopologyZone: r.TopologyZone,
TopologyRegion: r.TopologyRegion,
}
for i, t := range r.Topics {
p.Topics = append(p.Topics, ProducerTopic{Topic: t, Tombstoned: r.Tombstoned[i]})
Expand Down Expand Up @@ -92,16 +98,20 @@ func (p *Producer) IsInconsistent(numLookupd int) bool {
}

type TopicStats struct {
Node string `json:"node"`
Hostname string `json:"hostname"`
TopicName string `json:"topic_name"`
Depth int64 `json:"depth"`
MemoryDepth int64 `json:"memory_depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount int64 `json:"message_count"`
NodeStats []*TopicStats `json:"nodes"`
Channels []*ChannelStats `json:"channels"`
Paused bool `json:"paused"`
Node string `json:"node"`
Hostname string `json:"hostname"`
TopicName string `json:"topic_name"`
Depth int64 `json:"depth"`
MemoryDepth int64 `json:"memory_depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount int64 `json:"message_count"`
DeliveryMsgCount int64 `json:"delivery_msg_count"`
ZoneLocalMsgCount int64 `json:"zone_local_msg_count,omitempty"`
RegionLocalMsgCount int64 `json:"region_local_msg_count,omitempty"`
GlobalMsgCount int64 `json:"global_msg_count,omitempty"`
NodeStats []*TopicStats `json:"nodes"`
Channels []*ChannelStats `json:"channels"`
Paused bool `json:"paused"`

E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
}
Expand All @@ -112,6 +122,10 @@ func (t *TopicStats) Add(a *TopicStats) {
t.MemoryDepth += a.MemoryDepth
t.BackendDepth += a.BackendDepth
t.MessageCount += a.MessageCount
t.DeliveryMsgCount += a.DeliveryMsgCount
t.ZoneLocalMsgCount += a.ZoneLocalMsgCount
t.RegionLocalMsgCount += a.RegionLocalMsgCount
t.GlobalMsgCount += a.GlobalMsgCount
if a.Paused {
t.Paused = a.Paused
}
Expand Down Expand Up @@ -139,23 +153,27 @@ func (t *TopicStats) Add(a *TopicStats) {
}

type ChannelStats struct {
Node string `json:"node"`
Hostname string `json:"hostname"`
TopicName string `json:"topic_name"`
ChannelName string `json:"channel_name"`
Depth int64 `json:"depth"`
MemoryDepth int64 `json:"memory_depth"`
BackendDepth int64 `json:"backend_depth"`
InFlightCount int64 `json:"in_flight_count"`
DeferredCount int64 `json:"deferred_count"`
RequeueCount int64 `json:"requeue_count"`
TimeoutCount int64 `json:"timeout_count"`
MessageCount int64 `json:"message_count"`
ClientCount int `json:"client_count"`
Selected bool `json:"-"`
NodeStats []*ChannelStats `json:"nodes"`
Clients []*ClientStats `json:"clients"`
Paused bool `json:"paused"`
Node string `json:"node"`
Hostname string `json:"hostname"`
TopicName string `json:"topic_name"`
ChannelName string `json:"channel_name"`
Depth int64 `json:"depth"`
MemoryDepth int64 `json:"memory_depth"`
BackendDepth int64 `json:"backend_depth"`
InFlightCount int64 `json:"in_flight_count"`
DeferredCount int64 `json:"deferred_count"`
RequeueCount int64 `json:"requeue_count"`
TimeoutCount int64 `json:"timeout_count"`
MessageCount int64 `json:"message_count"`
DeliveryMsgCount int64 `json:"delivery_msg_count,omitempty"`
ZoneLocalMsgCount int64 `json:"zone_local_msg_count,omitempty"`
RegionLocalMsgCount int64 `json:"region_local_msg_count,omitempty"`
GlobalMsgCount int64 `json:"global_msg_count,omitempty"`
ClientCount int `json:"client_count"`
Selected bool `json:"-"`
NodeStats []*ChannelStats `json:"nodes"`
Clients []*ClientStats `json:"clients"`
Paused bool `json:"paused"`

E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
}
Expand All @@ -170,6 +188,10 @@ func (c *ChannelStats) Add(a *ChannelStats) {
c.RequeueCount += a.RequeueCount
c.TimeoutCount += a.TimeoutCount
c.MessageCount += a.MessageCount
c.DeliveryMsgCount += a.DeliveryMsgCount
c.ZoneLocalMsgCount += a.ZoneLocalMsgCount
c.RegionLocalMsgCount += a.RegionLocalMsgCount
c.GlobalMsgCount += a.GlobalMsgCount
c.ClientCount += a.ClientCount
if a.Paused {
c.Paused = a.Paused
Expand All @@ -189,25 +211,29 @@ func (c *ChannelStats) Add(a *ChannelStats) {
}

type ClientStats struct {
Node string `json:"node"`
RemoteAddress string `json:"remote_address"`
Version string `json:"version"`
ClientID string `json:"client_id"`
Hostname string `json:"hostname"`
UserAgent string `json:"user_agent"`
ConnectTs int64 `json:"connect_ts"`
ConnectedDuration time.Duration `json:"connected"`
InFlightCount int `json:"in_flight_count"`
ReadyCount int `json:"ready_count"`
FinishCount int64 `json:"finish_count"`
RequeueCount int64 `json:"requeue_count"`
MessageCount int64 `json:"message_count"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
Authed bool `json:"authed"`
AuthIdentity string `json:"auth_identity"`
AuthIdentityURL string `json:"auth_identity_url"`
Node string `json:"node"`
RemoteAddress string `json:"remote_address"`
Version string `json:"version"`
ClientID string `json:"client_id"`
Hostname string `json:"hostname"`
UserAgent string `json:"user_agent"`
ConnectTs int64 `json:"connect_ts"`
ConnectedDuration time.Duration `json:"connected"`
InFlightCount int `json:"in_flight_count"`
ReadyCount int `json:"ready_count"`
FinishCount int64 `json:"finish_count"`
RequeueCount int64 `json:"requeue_count"`
MessageCount int64 `json:"message_count"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
Authed bool `json:"authed"`
AuthIdentity string `json:"auth_identity"`
AuthIdentityURL string `json:"auth_identity_url"`
NodeTopologyRegion string `json:"node_topology_region,omitempty"`
NodeTopologyZone string `json:"node_topology_zone,omitempty"`
TopologyRegion string `json:"topology_region,omitempty"`
TopologyZone string `json:"topology_zone,omitempty"`

TLS bool `json:"tls"`
CipherSuite string `json:"tls_cipher_suite"`
Expand Down Expand Up @@ -262,6 +288,35 @@ func (c ClientsByHost) Less(i, j int) bool {
return c.ClientStatsList[i].Hostname < c.ClientStatsList[j].Hostname
}

// ClientStatsByNodeTopology wraps a ClientStatsList (embedded) and supplies
// the Less method that orders clients by node and then by how closely each
// client's topology region/zone matches the topology recorded for its node.
type ClientStatsByNodeTopology struct {
ClientStatsList
}

// Less reports whether the client at index i must sort before the client at
// index j.
//
// Clients on different nodes are ordered by Node. Clients on the same node are
// ordered by proximity to that node's topology:
//
//  1. clients whose region and zone both match the node's (zone-local),
//  2. clients whose region matches the node's (region-local),
//  3. all other clients.
//
// Clients in the same proximity tier are ordered by TopologyRegion and then by
// TopologyZone. Two clients with identical sort keys compare as equal (Less is
// false in both directions), which keeps the ordering consistent with the
// sort.Interface contract.
func (c ClientStatsByNodeTopology) Less(i, j int) bool {
	a, b := c.ClientStatsList[i], c.ClientStatsList[j]
	if a.Node != b.Node {
		return a.Node < b.Node
	}

	// If it's the same node, sort by topology. The node topology is read from
	// the entry at index i; both entries report the same Node here.
	region, zone := a.NodeTopologyRegion, a.NodeTopologyZone

	// tier maps a client's topology to its proximity rank (lower is closer).
	tier := func(clientRegion, clientZone string) int {
		switch {
		case clientRegion == region && clientZone == zone:
			return 0
		case clientRegion == region:
			return 1
		default:
			return 2
		}
	}

	ta := tier(a.TopologyRegion, a.TopologyZone)
	tb := tier(b.TopologyRegion, b.TopologyZone)
	if ta != tb {
		return ta < tb
	}

	// Same tier: fall back to a plain lexical ordering so that equal entries
	// never report Less in both directions.
	if a.TopologyRegion != b.TopologyRegion {
		return a.TopologyRegion < b.TopologyRegion
	}
	return a.TopologyZone < b.TopologyZone
}

// TopicStatsList is a slice of *TopicStats.
type TopicStatsList []*TopicStats

// Len returns the number of entries in the list.
func (t TopicStatsList) Len() int {
	return len(t)
}
Expand Down
3 changes: 3 additions & 0 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"path"
"reflect"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -328,6 +329,8 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps
messages = append(messages, pe.Error())
}

sort.Sort(clusterinfo.ClientStatsByNodeTopology{channelStats[channelName].Clients})

return struct {
*clusterinfo.ChannelStats
Message string `json:"message"`
Expand Down
Loading

0 comments on commit 1cd6297

Please sign in to comment.