Skip to content

Commit

Permalink
fix: onConnStats is processed in an open coroutine manner to avoid bl…
Browse files Browse the repository at this point in the history
…ocking the readLoop
  • Loading branch information
侯尧 committed Aug 1, 2024
1 parent 9733b91 commit cfb82c5
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,13 +760,18 @@ func (r *Consumer) onConnResponse(c *Conn, data []byte) {

func (r *Consumer) onConnStats(c *Conn, data []byte) {
var channelStats *ChannelStats
_ = json.Unmarshal(data, &channelStats)
ctx, cancel := context.WithTimeout(context.Background(), r.channelStatsTimeout)
defer cancel()
select {
case <-ctx.Done():
case r.channelStatsMapChan <- map[string]*ChannelStats{c.String(): channelStats}:
if err := json.Unmarshal(data, &channelStats); err != nil {
r.log(LogLevelError, "(%s) failed to unmarshal channel stats response - %s", c.String(), err)
return
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), r.channelStatsTimeout)
defer cancel()
select {
case <-ctx.Done():
case r.channelStatsMapChan <- map[string]*ChannelStats{c.String(): channelStats}:
}
}()
}

func (r *Consumer) onConnError(c *Conn, data []byte) {}
Expand Down

0 comments on commit cfb82c5

Please sign in to comment.