diff --git a/callback/callbacks.go b/callback/callbacks.go index 230d598..06dec40 100644 --- a/callback/callbacks.go +++ b/callback/callbacks.go @@ -33,7 +33,7 @@ func Acker(ack acknowledger) Callback { logger.Debugf("Unable to acknowledge message: %s", message) } metrics.AcknowledgedMessage(message, *msg.TopicPartition.Topic) - metrics.ConsumerLatency(time.Since(message.CreatedTime), *msg.TopicPartition.Topic) + metrics.ConsumerLatency(time.Since(message.CreatedTime)) } } } @@ -43,8 +43,8 @@ func MessageSent(msg *kafka.Message) { if err != nil { logger.Debugf("Unable to decode message during message sent callback") } else { - metrics.SentMessage(message, *msg.TopicPartition.Topic) - metrics.ProduceLatency(time.Since(message.CreatedTime), *msg.TopicPartition.Topic) + metrics.SentMessage(message) + metrics.ProduceLatency(time.Since(message.CreatedTime)) } } diff --git a/config/config.go b/config/config.go index 0f11b6e..90c10e0 100644 --- a/config/config.go +++ b/config/config.go @@ -13,6 +13,7 @@ type Application struct { Config Reporter Store + LibrdConfigs } type Config struct { @@ -41,6 +42,7 @@ type Producer struct { WorkerDelayMs int `split_words:"true" default:"50"` Acks int `default:"1"` Librdconfigs LibrdConfigs + ClusterName string `envconfig:"KAFKA_CLUSTER"` } type Consumer struct { diff --git a/config/loader.go b/config/loader.go index d3c7336..9bb2054 100644 --- a/config/loader.go +++ b/config/loader.go @@ -34,8 +34,8 @@ func Load() error { application.Consumer.ssl = consumerSslCfg application.Producer.ssl = producerSslCfg - application.Producer.Librdconfigs = librdConfigs - application.Consumer.LibrdConfigs = librdConfigs + application.Producer.Librdconfigs = application.Librdconfigs + application.Consumer.LibrdConfigs = application.Librdconfigs return nil } diff --git a/consumer/consumer.go b/consumer/consumer.go index d89daf0..5829b25 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -36,7 +36,7 @@ func (c *Consumer) Run(ctx context.Context) { for i, cons := range c.consumers { c.wg.Add(2) msgs := c.consumerWorker(ctx, cons, i) // goroutine producer - metrics.ConsumerCount(c.config.Topic) + metrics.ConsumerCount() go c.processor(msgs, i) } } diff --git a/main.go b/main.go index 0c14ff5..a9c836f 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "log" "os" "os/signal" + "strconv" "sync" "syscall" "time" @@ -146,7 +147,7 @@ func setup(appCfg config.Application) (*application, error) { ctx, cancel := context.WithTimeout(context.Background(), appCfg.RunDuration()) - reporter.Setup(ms, 10, appCfg.Reporter) + reporter.Setup(ms, 10, appCfg.Reporter, appCfg.Producer) app := &application{ msgStore: ms, @@ -159,7 +160,10 @@ func setup(appCfg config.Application) (*application, error) { traceCloser: closer, } if kafkaProducer != nil { - app.Handler = producer.NewHandler(kafkaProducer.Events(), &wg, ms, appCfg.Producer.Topic) + librdTags := reporter.LibrdTags{ClusterName: appCfg.Producer.ClusterName, + Ack: strconv.Itoa(appCfg.Librdconfigs.RequestRequiredAcks), + Topic: appCfg.Producer.Topic} + app.Handler = producer.NewHandler(kafkaProducer.Events(), &wg, ms, librdTags) } go app.registerSignalHandler() return app, nil diff --git a/producer/handler.go b/producer/handler.go index 567c028..3d65fa8 100644 --- a/producer/handler.go +++ b/producer/handler.go @@ -16,7 +16,6 @@ type Handler struct { events <-chan kafka.Event msgStore store.MsgStore librdStatsHandler reporter.LibrdKafkaStatsHandler - topic string } func (h *Handler) Handle() { @@ -26,10 +25,10 @@ func (h *Handler) Handle() { switch ev := e.(type) { case *kafka.Stats: - h.librdStatsHandler.HandleStats(e.String(), h.topic) + h.librdStatsHandler.HandleStats(e.String()) case *kafka.Message: - // TODO: fix this span not availabe in the message + // TODO: fix this span not available in the message // span := tracer.StartSpanFromMessage("kafqa.handler", ev) if ev.TopicPartition.Error != nil { logger.Debugf("Delivery failed: %v", ev.TopicPartition) @@ -52,6 +51,6 @@ func (h *Handler) Handle() { } } -func NewHandler(events <-chan kafka.Event, wg *sync.WaitGroup, msgStore store.MsgStore, topic string) *Handler { - return &Handler{events: events, wg: wg, msgStore: msgStore, librdStatsHandler: reporter.NewlibrdKafkaStat(), topic: topic} +func NewHandler(events <-chan kafka.Event, wg *sync.WaitGroup, msgStore store.MsgStore, librdTags reporter.LibrdTags) *Handler { + return &Handler{events: events, wg: wg, msgStore: msgStore, librdStatsHandler: reporter.NewlibrdKafkaStat(librdTags)} } diff --git a/producer/producer.go b/producer/producer.go index ca2507d..4c90be0 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -82,7 +82,7 @@ func (p Producer) runProducers(ctx context.Context) { for i := 0; i < p.config.Concurrency; i++ { logger.Debugf("running producer %d on brokers: %s for topic %s", i, p.config.KafkaBrokers, p.config.Topic) go p.ProduceWorker(ctx) - metrics.ProducerCount(p.config.Topic) + metrics.ProducerCount() p.wg.Add(1) } } @@ -158,7 +158,7 @@ func (p Producer) Poll(ctx context.Context) { select { case <-ticker.C: chanLength := len(p.kafkaProducer.ProduceChannel()) - metrics.ProducerChannelLength(chanLength, p.config.Topic) + metrics.ProducerChannelLength(chanLength) logger.Debugf("Producer channel length: %v", chanLength) case <-ctx.Done(): ticker.Stop() diff --git a/reporter/librdkafka.go b/reporter/librdkafka.go index 38e4452..c12b6af 100644 --- a/reporter/librdkafka.go +++ b/reporter/librdkafka.go @@ -1,31 +1,42 @@ package reporter +import "fmt" + type LibrdKafkaStatsHandler struct { counters map[string][]string gauges map[string][]string + tags []string +} + +type LibrdTags struct { + ClusterName string + Ack string + Topic string } -func GetStats(stats []string, statsJSON, level, statType, topic string) { +func GetStats(stats, tags []string, statsJSON, level, statType string) { switch level { case "top-level": - TopLevelStats(stats, statsJSON, statType, topic) + TopLevelStats(stats, tags, statsJSON, statType) case "brokers": - BrokersStats(stats, statsJSON, statType, topic) + BrokersStats(stats, tags, statsJSON, statType) } } -func (stats LibrdKafkaStatsHandler) HandleStats(statJSON, topic string) { +func (stats LibrdKafkaStatsHandler) HandleStats(statJSON string) { for k, counterStat := range stats.counters { - GetStats(counterStat, statJSON, k, "counter", topic) + GetStats(counterStat, stats.tags, statJSON, k, "counter") } for k, gaugeStat := range stats.gauges { - GetStats(gaugeStat, statJSON, k, "gauge", topic) + GetStats(gaugeStat, stats.tags, statJSON, k, "gauge") } } -func NewlibrdKafkaStat() LibrdKafkaStatsHandler { - return LibrdKafkaStatsHandler{defaultCounters(), defaultGauges()} +func NewlibrdKafkaStat(tags LibrdTags) LibrdKafkaStatsHandler { + librdtags := []string{fmt.Sprintf("topic:%s", tags.Topic), + fmt.Sprintf("ack:%s", tags.Ack), fmt.Sprintf("kafka_cluster:%s", tags.ClusterName)} + return LibrdKafkaStatsHandler{defaultCounters(), defaultGauges(), librdtags} } func defaultCounters() map[string][]string { diff --git a/reporter/librdstats.go b/reporter/librdstats.go index 219104b..3ba8ec7 100644 --- a/reporter/librdstats.go +++ b/reporter/librdstats.go @@ -18,7 +18,7 @@ func GetValueFromJq(parseStr, statJSON string) ([]byte, error) { return value, err } -func BrokersStats(stats []string, statJSON, statsType, topic string) { +func BrokersStats(stats, tags []string, statJSON, statsType string) { value, _ := GetValueFromJq(".brokers", statJSON) var brokerStat map[string]interface{} err := json.Unmarshal([]byte(string(value)), &brokerStat) @@ -28,7 +28,7 @@ func BrokersStats(stats []string, statJSON, statsType, topic string) { } for brokerName := range brokerStat { - tags := []string{fmt.Sprintf("topic:%s", topic), fmt.Sprintf("broker:%s", brokerName)} + tags = append(tags, fmt.Sprintf("broker:%s", brokerName)) for _, stat := range stats { metricName := fmt.Sprintf("librd.brokers.%s", stat) value, err := GetValueFromJq(fmt.Sprintf(".brokers.%s.%s", brokerName, stat), statJSON) @@ -49,8 +49,7 @@ func BrokersStats(stats []string, statJSON, statsType, topic string) { } } -func TopLevelStats(stats []string, statJSON, statsType, topic string) { - tags := []string{fmt.Sprintf("topic:%s", topic)} +func TopLevelStats(stats, tags []string, statJSON, statsType string) { for _, stat := range stats { metricName := fmt.Sprintf("librd.%s", stat) value, err := GetValueFromJq(fmt.Sprintf(".%s", stat), statJSON) diff --git a/reporter/metrics/prometheus.go b/reporter/metrics/prometheus.go index 5dcb1ae..eb66c50 100644 --- a/reporter/metrics/prometheus.go +++ b/reporter/metrics/prometheus.go @@ -2,6 +2,7 @@ package metrics import ( "net/http" + "strconv" "time" "github.com/gojekfarm/kafqa/config" @@ -12,7 +13,7 @@ import ( ) var ( - tags = []string{"topic", "pod_name", "deployment"} + tags = []string{"topic", "pod_name", "deployment", "kafka_cluster", "ack"} messagesSent = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "kafqa_messages", @@ -47,66 +48,82 @@ var ( ) type promClient struct { - enabled bool - port int - pod string - deployment string + enabled bool + port int +} + +type promTags struct { + topic string + podName string + deployment string + ack string + kafkaCluster string } var prom promClient +var promtags promTags func AcknowledgedMessage(msg creator.Message, topic string) { if prom.enabled { - messagesReceived.WithLabelValues(topic, prom.pod, prom.deployment).Inc() + messagesReceived.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment, + promtags.kafkaCluster, promtags.ack).Inc() } } -func SentMessage(msg creator.Message, topic string) { +func SentMessage(msg creator.Message) { if prom.enabled { - messagesSent.WithLabelValues(topic, prom.pod, prom.deployment).Inc() + messagesSent.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment, + promtags.kafkaCluster, promtags.ack).Inc() } } -func ConsumerLatency(dur time.Duration, topic string) { +func ConsumerLatency(dur time.Duration) { if prom.enabled { ms := dur / time.Millisecond - consumeLatency.WithLabelValues(topic, prom.pod, prom.deployment).Observe(float64(ms)) + consumeLatency.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment, + promtags.kafkaCluster, promtags.ack).Observe(float64(ms)) } } -func ProduceLatency(dur time.Duration, topic string) { +func ProduceLatency(dur time.Duration) { if prom.enabled { ms := dur / time.Millisecond - produceLatency.WithLabelValues(topic, prom.pod, prom.deployment).Observe(float64(ms)) + produceLatency.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment, + promtags.kafkaCluster, promtags.ack).Observe(float64(ms)) } } -func ProducerCount(topic string) { +func ProducerCount() { if prom.enabled { - producerCount.WithLabelValues(topic, prom.pod, prom.deployment).Inc() + producerCount.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment, + promtags.kafkaCluster, promtags.ack).Inc() } } -func ConsumerCount(topic string) { +func ConsumerCount() { if prom.enabled { - consumerCount.WithLabelValues(topic, prom.pod, prom.deployment).Inc() + consumerCount.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment, + promtags.kafkaCluster, promtags.ack).Inc() } } -func ProducerChannelLength(count int, topic string) { +func ProducerChannelLength(count int) { if prom.enabled { - producerChannelCount.WithLabelValues(topic, prom.pod, prom.deployment).Add(float64(count)) + producerChannelCount.WithLabelValues(promtags.topic, promtags.podName, promtags.deployment, + promtags.kafkaCluster, promtags.ack).Add(float64(count)) } } -func Setup(cfg config.Prometheus) { +func Setup(cfg config.Prometheus, producerCfg config.Producer) { defer func() { if err := recover(); err != nil { logger.Errorf("Error creating metrics: %v", err) } }() - prom = promClient{enabled: cfg.Enabled, port: cfg.Port, pod: cfg.PodName, deployment: cfg.Deployment} + promtags = promTags{topic: producerCfg.Topic, ack: strconv.Itoa(producerCfg.Acks), + kafkaCluster: producerCfg.ClusterName, podName: cfg.PodName, deployment: cfg.Deployment} + prom = promClient{enabled: cfg.Enabled, port: cfg.Port} if cfg.Enabled { prometheus.MustRegister(messagesSent) diff --git a/reporter/reporter.go b/reporter/reporter.go index 0e0d86b..1f21f7f 100644 --- a/reporter/reporter.go +++ b/reporter/reporter.go @@ -21,13 +21,13 @@ type reporter struct { var rep reporter -func Setup(sr storeReporter, maxNLatency int, cfg config.Reporter) { +func Setup(sr storeReporter, maxNLatency int, cfg config.Reporter, producerCfg config.Producer) { rep = reporter{ srep: sr, Latency: NewLatencyReporter(maxNLatency), start: time.Now(), } - metrics.Setup(cfg.Prometheus) + metrics.Setup(cfg.Prometheus, producerCfg) } func ConsumptionDelay(t time.Duration) {