Changes to librd and prometheus metrics
goyalbhumika committed Sep 19, 2019
1 parent 3425a04 commit 3acd7e8
Showing 11 changed files with 81 additions and 49 deletions.
6 changes: 3 additions & 3 deletions callback/callbacks.go
@@ -33,7 +33,7 @@ func Acker(ack acknowledger) Callback {
logger.Debugf("Unable to acknowledge message: %s", message)
}
metrics.AcknowledgedMessage(message, *msg.TopicPartition.Topic)
-metrics.ConsumerLatency(time.Since(message.CreatedTime), *msg.TopicPartition.Topic)
+metrics.ConsumerLatency(time.Since(message.CreatedTime))
}
}
}
@@ -43,8 +43,8 @@ func MessageSent(msg *kafka.Message) {
if err != nil {
logger.Debugf("Unable to decode message during message sent callback")
} else {
-metrics.SentMessage(message, *msg.TopicPartition.Topic)
-metrics.ProduceLatency(time.Since(message.CreatedTime), *msg.TopicPartition.Topic)
+metrics.SentMessage(message)
+metrics.ProduceLatency(time.Since(message.CreatedTime))
}
}

2 changes: 2 additions & 0 deletions config/config.go
@@ -13,6 +13,7 @@ type Application struct {
Config
Reporter
Store
+LibrdConfigs
}

type Config struct {
@@ -41,6 +42,7 @@ type Producer struct {
WorkerDelayMs int `split_words:"true" default:"50"`
Acks int `default:"1"`
Librdconfigs LibrdConfigs
+ClusterName string `envconfig:"KAFKA_CLUSTER"`
}

type Consumer struct {
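The new ClusterName field is read from the KAFKA_CLUSTER environment variable. A minimal sketch of how that loading works, assuming the `envconfig` and `split_words` struct tags belong to github.com/kelseyhightower/envconfig (the "producer" prefix below is illustrative; envconfig also falls back to the bare variable name given in the tag):

package main

import (
    "fmt"
    "log"
    "os"

    "github.com/kelseyhightower/envconfig"
)

// Producer mirrors the two fields of interest from the struct above.
type Producer struct {
    Acks        int    `default:"1"`
    ClusterName string `envconfig:"KAFKA_CLUSTER"`
}

func main() {
    os.Setenv("KAFKA_CLUSTER", "kafka-dev")

    var p Producer
    if err := envconfig.Process("producer", &p); err != nil {
        log.Fatal(err)
    }
    // Acks falls back to its default tag; ClusterName comes from the env var.
    fmt.Printf("acks=%d cluster=%s\n", p.Acks, p.ClusterName) // acks=1 cluster=kafka-dev
}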
4 changes: 2 additions & 2 deletions config/loader.go
@@ -34,8 +34,8 @@ func Load() error {

application.Consumer.ssl = consumerSslCfg
application.Producer.ssl = producerSslCfg
-application.Producer.Librdconfigs = librdConfigs
-application.Consumer.LibrdConfigs = librdConfigs
+application.Producer.Librdconfigs = application.Librdconfigs
+application.Consumer.LibrdConfigs = application.Librdconfigs
return nil
}

2 changes: 1 addition & 1 deletion consumer/consumer.go
@@ -36,7 +36,7 @@ func (c *Consumer) Run(ctx context.Context) {
for i, cons := range c.consumers {
c.wg.Add(2)
msgs := c.consumerWorker(ctx, cons, i) // goroutine producer
-metrics.ConsumerCount(c.config.Topic)
+metrics.ConsumerCount()
go c.processor(msgs, i)
}
}
8 changes: 6 additions & 2 deletions main.go
@@ -7,6 +7,7 @@ import (
"log"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
@@ -146,7 +147,7 @@ func setup(appCfg config.Application) (*application, error) {

ctx, cancel := context.WithTimeout(context.Background(), appCfg.RunDuration())

-reporter.Setup(ms, 10, appCfg.Reporter)
+reporter.Setup(ms, 10, appCfg.Reporter, appCfg.Producer)

app := &application{
msgStore: ms,
@@ -159,7 +160,10 @@
traceCloser: closer,
}
if kafkaProducer != nil {
app.Handler = producer.NewHandler(kafkaProducer.Events(), &wg, ms, appCfg.Producer.Topic)
librdTags := reporter.LibrdTags{ClusterName: appCfg.Producer.ClusterName,
Ack: strconv.Itoa(appCfg.Librdconfigs.RequestRequiredAcks),
Topic: appCfg.Producer.Topic}
app.Handler = producer.NewHandler(kafkaProducer.Events(), &wg, ms, librdTags)
}
go app.registerSignalHandler()
return app, nil
9 changes: 4 additions & 5 deletions producer/handler.go
@@ -16,7 +16,6 @@ type Handler struct {
events <-chan kafka.Event
msgStore store.MsgStore
librdStatsHandler reporter.LibrdKafkaStatsHandler
-topic string
}

func (h *Handler) Handle() {
@@ -26,10 +25,10 @@ func (h *Handler) Handle() {
switch ev := e.(type) {

case *kafka.Stats:
-h.librdStatsHandler.HandleStats(e.String(), h.topic)
+h.librdStatsHandler.HandleStats(e.String())

case *kafka.Message:
-// TODO: fix this span not availabe in the message
+// TODO: fix this span not available in the message
// span := tracer.StartSpanFromMessage("kafqa.handler", ev)
if ev.TopicPartition.Error != nil {
logger.Debugf("Delivery failed: %v", ev.TopicPartition)
@@ -52,6 +51,6 @@
}
}

-func NewHandler(events <-chan kafka.Event, wg *sync.WaitGroup, msgStore store.MsgStore, topic string) *Handler {
-return &Handler{events: events, wg: wg, msgStore: msgStore, librdStatsHandler: reporter.NewlibrdKafkaStat(), topic: topic}
+func NewHandler(events <-chan kafka.Event, wg *sync.WaitGroup, msgStore store.MsgStore, librdTags reporter.LibrdTags) *Handler {
+return &Handler{events: events, wg: wg, msgStore: msgStore, librdStatsHandler: reporter.NewlibrdKafkaStat(librdTags)}
}
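Handle drains the producer's event channel and switches on the concrete event type; *kafka.Stats events are only emitted when statistics.interval.ms is set in the librdkafka config. A standalone sketch of that loop against confluent-kafka-go (import path and broker address are illustrative):

package main

import (
    "fmt"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":      "localhost:9092",
        "statistics.interval.ms": 5000, // emit *kafka.Stats every 5s
    })
    if err != nil {
        panic(err)
    }
    defer p.Close()

    for e := range p.Events() {
        switch ev := e.(type) {
        case *kafka.Stats:
            // ev.String() is the raw librdkafka statistics JSON
            // that HandleStats parses above.
            fmt.Println("stats payload:", len(ev.String()), "bytes")
        case *kafka.Message:
            if ev.TopicPartition.Error != nil {
                fmt.Println("delivery failed:", ev.TopicPartition)
            }
        }
    }
}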
4 changes: 2 additions & 2 deletions producer/producer.go
@@ -82,7 +82,7 @@ func (p Producer) runProducers(ctx context.Context) {
for i := 0; i < p.config.Concurrency; i++ {
logger.Debugf("running producer %d on brokers: %s for topic %s", i, p.config.KafkaBrokers, p.config.Topic)
go p.ProduceWorker(ctx)
-metrics.ProducerCount(p.config.Topic)
+metrics.ProducerCount()
p.wg.Add(1)
}
}
@@ -158,7 +158,7 @@ func (p Producer) Poll(ctx context.Context) {
select {
case <-ticker.C:
chanLength := len(p.kafkaProducer.ProduceChannel())
-metrics.ProducerChannelLength(chanLength, p.config.Topic)
+metrics.ProducerChannelLength(chanLength)
logger.Debugf("Producer channel length: %v", chanLength)
case <-ctx.Done():
ticker.Stop()
27 changes: 19 additions & 8 deletions reporter/librdkafka.go
@@ -1,31 +1,42 @@
package reporter

import "fmt"

type LibrdKafkaStatsHandler struct {
counters map[string][]string
gauges map[string][]string
+tags []string
}

+type LibrdTags struct {
+ClusterName string
+Ack string
+Topic string
+}
+
-func GetStats(stats []string, statsJSON, level, statType, topic string) {
+func GetStats(stats, tags []string, statsJSON, level, statType string) {
switch level {
case "top-level":
-TopLevelStats(stats, statsJSON, statType, topic)
+TopLevelStats(stats, tags, statsJSON, statType)
case "brokers":
-BrokersStats(stats, statsJSON, statType, topic)
+BrokersStats(stats, tags, statsJSON, statType)
}
}

-func (stats LibrdKafkaStatsHandler) HandleStats(statJSON, topic string) {
+func (stats LibrdKafkaStatsHandler) HandleStats(statJSON string) {
for k, counterStat := range stats.counters {
-GetStats(counterStat, statJSON, k, "counter", topic)
+GetStats(counterStat, stats.tags, statJSON, k, "counter")
}

for k, gaugeStat := range stats.gauges {
-GetStats(gaugeStat, statJSON, k, "gauge", topic)
+GetStats(gaugeStat, stats.tags, statJSON, k, "gauge")
}
}

-func NewlibrdKafkaStat() LibrdKafkaStatsHandler {
-return LibrdKafkaStatsHandler{defaultCounters(), defaultGauges()}
+func NewlibrdKafkaStat(tags LibrdTags) LibrdKafkaStatsHandler {
+librdtags := []string{fmt.Sprintf("topic:%s", tags.Topic),
+fmt.Sprintf("ack:%s", tags.Ack), fmt.Sprintf("kafka_cluster:%s", tags.ClusterName)}
+return LibrdKafkaStatsHandler{defaultCounters(), defaultGauges(), librdtags}
}

func defaultCounters() map[string][]string {
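NewlibrdKafkaStat now renders the statsd-style key:value tags once at construction instead of threading topic through every call. A self-contained sketch of just that formatting, with defaultCounters/defaultGauges elided:

package main

import "fmt"

type LibrdTags struct {
    ClusterName string
    Ack         string
    Topic       string
}

// renderTags mirrors the fmt.Sprintf calls in NewlibrdKafkaStat.
func renderTags(t LibrdTags) []string {
    return []string{
        fmt.Sprintf("topic:%s", t.Topic),
        fmt.Sprintf("ack:%s", t.Ack),
        fmt.Sprintf("kafka_cluster:%s", t.ClusterName),
    }
}

func main() {
    fmt.Println(renderTags(LibrdTags{ClusterName: "kafka-dev", Ack: "1", Topic: "kafqa_test"}))
    // Output: [topic:kafqa_test ack:1 kafka_cluster:kafka-dev]
}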
7 changes: 3 additions & 4 deletions reporter/librdstats.go
@@ -18,7 +18,7 @@ func GetValueFromJq(parseStr, statJSON string) ([]byte, error) {
return value, err
}

-func BrokersStats(stats []string, statJSON, statsType, topic string) {
+func BrokersStats(stats, tags []string, statJSON, statsType string) {
value, _ := GetValueFromJq(".brokers", statJSON)
var brokerStat map[string]interface{}
err := json.Unmarshal([]byte(string(value)), &brokerStat)
@@ -28,7 +28,7 @@ func BrokersStats(stats []string, statJSON, statsType, topic string) {
}

for brokerName := range brokerStat {
-tags := []string{fmt.Sprintf("topic:%s", topic), fmt.Sprintf("broker:%s", brokerName)}
+tags = append(tags, fmt.Sprintf("broker:%s", brokerName))
for _, stat := range stats {
metricName := fmt.Sprintf("librd.brokers.%s", stat)
value, err := GetValueFromJq(fmt.Sprintf(".brokers.%s.%s", brokerName, stat), statJSON)
@@ -49,8 +49,7 @@ func BrokersStats(stats []string, statJSON, statsType, topic string) {
}
}

-func TopLevelStats(stats []string, statJSON, statsType, topic string) {
-tags := []string{fmt.Sprintf("topic:%s", topic)}
+func TopLevelStats(stats, tags []string, statJSON, statsType string) {
for _, stat := range stats {
metricName := fmt.Sprintf("librd.%s", stat)
value, err := GetValueFromJq(fmt.Sprintf(".%s", stat), statJSON)
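BrokersStats extracts the .brokers object from the librdkafka stats JSON via jq, then appends a broker:<name> tag for each broker it finds. A jq-free sketch of the same traversal over a trimmed payload (outbuf_cnt is a real broker field from librdkafka's STATISTICS.md; the values and tags are made up):

package main

import (
    "encoding/json"
    "fmt"
)

// A trimmed librdkafka statistics payload; the real JSON has many more fields.
const statJSON = `{
    "brokers": {
        "localhost:9092/1": {"outbuf_cnt": 3},
        "localhost:9093/2": {"outbuf_cnt": 0}
    }
}`

func main() {
    var top struct {
        Brokers map[string]map[string]interface{} `json:"brokers"`
    }
    if err := json.Unmarshal([]byte(statJSON), &top); err != nil {
        panic(err)
    }
    baseTags := []string{"topic:kafqa_test", "ack:1", "kafka_cluster:kafka-dev"}
    // Mirror BrokersStats: one tagged metric per broker per stat.
    for name, stats := range top.Brokers {
        tags := append(baseTags, fmt.Sprintf("broker:%s", name))
        fmt.Printf("librd.brokers.outbuf_cnt=%v tags=%v\n", stats["outbuf_cnt"], tags)
    }
}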
57 changes: 37 additions & 20 deletions reporter/metrics/prometheus.go
@@ -2,6 +2,7 @@ package metrics

import (
"net/http"
"strconv"
"time"

"github.com/gojekfarm/kafqa/config"
@@ -12,7 +13,7 @@
)

var (
tags = []string{"topic", "pod_name", "deployment"}
tags = []string{"topic", "pod_name", "deployment", "kafka_cluster", "ack"}

messagesSent = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "kafqa_messages",
@@ -47,66 +48,82 @@
)

type promClient struct {
-enabled bool
-port int
-pod string
-deployment string
+enabled bool
+port int
}

+type promTags struct {
+topic string
+podName string
+deployment string
+ack string
+kafkaCluster string
+}
+
var prom promClient
+var promtags promTags

func AcknowledgedMessage(msg creator.Message, topic string) {
if prom.enabled {
-messagesReceived.WithLabelValues(topic, prom.pod, prom.deployment).Inc()
+messagesReceived.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment,
+promtags.kafkaCluster, promtags.ack).Inc()
}
}

-func SentMessage(msg creator.Message, topic string) {
+func SentMessage(msg creator.Message) {
if prom.enabled {
-messagesSent.WithLabelValues(topic, prom.pod, prom.deployment).Inc()
+messagesSent.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment,
+promtags.kafkaCluster, promtags.ack).Inc()
}
}

-func ConsumerLatency(dur time.Duration, topic string) {
+func ConsumerLatency(dur time.Duration) {
if prom.enabled {
ms := dur / time.Millisecond
-consumeLatency.WithLabelValues(topic, prom.pod, prom.deployment).Observe(float64(ms))
+consumeLatency.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment,
+promtags.kafkaCluster, promtags.ack).Observe(float64(ms))
}
}

-func ProduceLatency(dur time.Duration, topic string) {
+func ProduceLatency(dur time.Duration) {
if prom.enabled {
ms := dur / time.Millisecond
-produceLatency.WithLabelValues(topic, prom.pod, prom.deployment).Observe(float64(ms))
+produceLatency.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment,
+promtags.kafkaCluster, promtags.ack).Observe(float64(ms))
}
}

-func ProducerCount(topic string) {
+func ProducerCount() {
if prom.enabled {
-producerCount.WithLabelValues(topic, prom.pod, prom.deployment).Inc()
+producerCount.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment,
+promtags.kafkaCluster, promtags.ack).Inc()
}
}

-func ConsumerCount(topic string) {
+func ConsumerCount() {
if prom.enabled {
-consumerCount.WithLabelValues(topic, prom.pod, prom.deployment).Inc()
+consumerCount.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment,
+promtags.kafkaCluster, promtags.ack).Inc()
}
}

-func ProducerChannelLength(count int, topic string) {
+func ProducerChannelLength(count int) {
if prom.enabled {
-producerChannelCount.WithLabelValues(topic, prom.pod, prom.deployment).Add(float64(count))
+producerChannelCount.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment,
+promtags.kafkaCluster, promtags.ack).Add(float64(count))
}
}

-func Setup(cfg config.Prometheus) {
+func Setup(cfg config.Prometheus, producerCfg config.Producer) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("Error creating metrics: %v", err)
}
}()

-prom = promClient{enabled: cfg.Enabled, port: cfg.Port, pod: cfg.PodName, deployment: cfg.Deployment}
+promtags = promTags{topic: producerCfg.Topic, ack: strconv.Itoa(producerCfg.Acks),
+kafkaCluster: producerCfg.ClusterName, podName: cfg.PodName, deployment: cfg.Deployment}
+prom = promClient{enabled: cfg.Enabled, port: cfg.Port}
if cfg.Enabled {

prometheus.MustRegister(messagesSent)
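With kafka_cluster and ack added, every label value is known at startup, which is why the metric helpers no longer take arguments: they read the package-level promtags. A minimal sketch of the underlying client_golang pattern; as a small variation on the diff (which calls WithLabelValues on each increment), it pre-binds the child counter once, since the values never change for the life of the process. All names and values here are illustrative:

package main

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
    labels = []string{"topic", "pod_name", "deployment", "kafka_cluster", "ack"}

    sent = prometheus.NewCounterVec(prometheus.CounterOpts{
        Namespace: "kafqa_messages",
        Name:      "sent",
    }, labels)

    // Bound once at setup; zero-argument helpers just increment it.
    sentBound prometheus.Counter
)

func SentMessage() { sentBound.Inc() }

func main() {
    prometheus.MustRegister(sent)
    sentBound = sent.WithLabelValues("kafqa_test", "pod-0", "kafqa", "kafka-dev", "1")

    SentMessage() // kafqa_messages_sent{topic="kafqa_test",...} 1
    http.Handle("/metrics", promhttp.Handler())
    _ = http.ListenAndServe(":9090", nil)
}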
4 changes: 2 additions & 2 deletions reporter/reporter.go
@@ -21,13 +21,13 @@ type reporter struct {

var rep reporter

-func Setup(sr storeReporter, maxNLatency int, cfg config.Reporter) {
+func Setup(sr storeReporter, maxNLatency int, cfg config.Reporter, producerCfg config.Producer) {
rep = reporter{
srep: sr,
Latency: NewLatencyReporter(maxNLatency),
start: time.Now(),
}
-metrics.Setup(cfg.Prometheus)
+metrics.Setup(cfg.Prometheus, producerCfg)
}

func ConsumptionDelay(t time.Duration) {
