diff --git a/consumer_test.go b/consumer_test.go index 945f5c0c..7363ef09 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -70,6 +70,21 @@ func SendMessage(t *testing.T, port int, topic string, method string, body []byt resp.Body.Close() } +func SendDeferMessage(t *testing.T, port int, topic string, ds time.Duration, body []byte) { + httpclient := &http.Client{} + endpoint := fmt.Sprintf("http://127.0.0.1:%d/pub?topic=%s&defer=%d", port, topic, ds/time.Millisecond) + req, _ := http.NewRequest("POST", endpoint, bytes.NewBuffer(body)) + resp, err := httpclient.Do(req) + if err != nil { + t.Fatalf(err.Error()) + return + } + if resp.StatusCode != 200 { + t.Fatalf("status code: %d", resp.StatusCode) + } + resp.Body.Close() +} + func TestConsumer(t *testing.T) { consumerTest(t, nil) } @@ -258,3 +273,39 @@ func consumerTest(t *testing.T, cb func(c *Config)) { t.Fatal("failed message not done") } } + +func TestChannelStats(t *testing.T) { + config := NewConfig() + laddr := "127.0.0.1" + // so that the test can simulate binding consumer to specified address + config.LocalAddr, _ = net.ResolveTCPAddr("tcp", laddr+":0") + // so that the test can simulate reaching max requeues and a call to LogFailedMessage + config.DefaultRequeueDelay = 0 + // so that the test wont timeout from backing off + config.MaxBackoffDuration = time.Millisecond * 50 + topicName := "channel_stats_test" + q, _ := NewConsumer(topicName, "ch", config) + + h := &MyTestHandler{ + t: t, + q: q, + } + q.AddHandler(h) + + // SendMessage(t, 4151, topicName, "mpub", []byte("{\"msg\":\"double\"}\n{\"msg\":\"double\"}")) + // SendDeferMessage(t, 4151, topicName, time.Minute, []byte(`{"msg":"single"}`)) + // time.Sleep(time.Second) + + addr := "127.0.0.1:4150" + err := q.ConnectToNSQD(addr) + if err != nil { + t.Fatal(err) + } + + m, err := q.ChannelStats(time.Second) + if err != nil { + t.Fatal(err) + } + b, _ := json.Marshal(m) + fmt.Println(string(b)) +}