Skip to content

Commit

Permalink
Add logs to CleanMetadataCache (#60)
Browse files Browse the repository at this point in the history
* Add logs to CleanMetadataCache

* Add timing metrics for sending the notifications
  • Loading branch information
miguelreiswildlife authored May 29, 2024
1 parent 4d6c50b commit 50e410d
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 12 deletions.
13 changes: 13 additions & 0 deletions e2e/apns_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (s *ApnsE2ETestSuite) TestSimpleNotification() {
return nil
})

statsdClientMock.EXPECT().
Timing("send_notification_latency", gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)

err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Expand Down Expand Up @@ -200,6 +204,10 @@ func (s *ApnsE2ETestSuite) TestNotificationRetry() {
done <- true
return nil
})
statsdClientMock.EXPECT().
Timing("send_notification_latency", gomock.Any(), gomock.Any(), gomock.Any()).
Times(2).
Return(nil)

err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Expand Down Expand Up @@ -273,6 +281,11 @@ func (s *ApnsE2ETestSuite) TestMultipleNotifications() {
return nil
})

statsdClientMock.EXPECT().
Timing("send_notification_latency", gomock.Any(), gomock.Any(), gomock.Any()).
Times(notificationsToSend).
Return(nil)

for i := 0; i < notificationsToSend; i++ {
err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Expand Down
9 changes: 9 additions & 0 deletions e2e/fcm_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ func (s *FcmE2ETestSuite) TestSimpleNotification() {
return nil
})

statsdClientMock.EXPECT().
Timing("send_notification_latency", gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)

err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Expand Down Expand Up @@ -177,6 +181,11 @@ func (s *FcmE2ETestSuite) TestMultipleNotifications() {
return nil
})

statsdClientMock.EXPECT().
Timing("send_notification_latency", gomock.Any(), gomock.Any(), gomock.Any()).
Times(notificationsToSend).
Return(nil)

for i := 0; i < notificationsToSend; i++ {
err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Expand Down
11 changes: 11 additions & 0 deletions extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func (a *APNSMessageHandler) HandleResponses() {

// CleanMetadataCache clears expired requests from memory.
func (a *APNSMessageHandler) CleanMetadataCache() {
l := a.Logger.WithFields(log.Fields{
"method": "CleanMetadataCache",
"interval": a.CacheCleaningInterval,
})

var deviceToken string
var hasIndeed bool
for {
Expand All @@ -195,7 +200,11 @@ func (a *APNSMessageHandler) CleanMetadataCache() {
if a.pendingMessagesWG != nil {
a.pendingMessagesWG.Done()
}

l.WithField("deviceToken", deviceToken).
Info("deleting expired request from in-flight notifications map")
}

delete(a.InFlightNotificationsMap, deviceToken)
deviceToken, hasIndeed = a.requestsHeap.HasExpiredRequest()
}
Expand Down Expand Up @@ -291,6 +300,8 @@ func (a *APNSMessageHandler) sendNotification(notification *Notification) error
return err
}
l.WithField("notification", notification).Debug("adding notification to apns push queue")
before := time.Now()
defer statsReporterReportSendNotificationLatency(a.StatsReporters, time.Since(before), a.appName, "apns", "client", "apns")
a.PushQueue.Push(&apns2.Notification{
Topic: a.ApnsTopic,
DeviceToken: notification.DeviceToken,
Expand Down
7 changes: 7 additions & 0 deletions extensions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package extensions
import (
"encoding/json"
"regexp"
"time"

"github.com/topfreegames/pusher/errors"
"github.com/topfreegames/pusher/interfaces"
Expand Down Expand Up @@ -86,3 +87,9 @@ func statsReporterHandleNotificationFailure(
statsReporter.HandleNotificationFailure(game, platform, err)
}
}

func statsReporterReportSendNotificationLatency(statsReporters []interfaces.StatsReporter, latencyMs time.Duration, game string, platform string, labels ...string) {
for _, statsReporter := range statsReporters {
statsReporter.ReportSendNotificationLatency(latencyMs, game, platform, labels...)
}
}
34 changes: 25 additions & 9 deletions extensions/datadog_statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package extensions
import (
"fmt"
"os"
"time"

"github.com/DataDog/datadog-go/statsd"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -54,6 +55,8 @@ func NewStatsD(config *viper.Viper, logger *logrus.Logger, clientOrNil ...interf
return q, err
}

var _ interfaces.StatsReporter = &StatsD{}

func (s *StatsD) loadConfigurationDefaults() {
s.Config.SetDefault("stats.statsd.host", "localhost:8125")
s.Config.SetDefault("stats.statsd.prefix", "test")
Expand Down Expand Up @@ -89,17 +92,17 @@ func (s *StatsD) configure(client interfaces.StatsDClient) error {
return nil
}

//HandleNotificationSent stores notification count in StatsD
// HandleNotificationSent stores notification count in StatsD
func (s *StatsD) HandleNotificationSent(game string, platform string) {
s.Client.Incr("sent", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1)
}

//HandleNotificationSuccess stores notifications success in StatsD
// HandleNotificationSuccess stores notifications success in StatsD
func (s *StatsD) HandleNotificationSuccess(game string, platform string) {
s.Client.Incr("ack", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1)
}

//HandleNotificationFailure stores each type of failure
// HandleNotificationFailure stores each type of failure
func (s *StatsD) HandleNotificationFailure(game string, platform string, err *errors.PushError) {
s.Client.Incr(
"failed",
Expand All @@ -108,12 +111,12 @@ func (s *StatsD) HandleNotificationFailure(game string, platform string, err *er
)
}

//InitializeFailure notifu error when is impossible tho initilizer an app
// InitializeFailure notifu error when is impossible tho initilizer an app
func (s *StatsD) InitializeFailure(game string, platform string) {
s.Client.Incr("initialize_failure", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1)
}

//ReportGoStats reports go stats in statsd
// ReportGoStats reports go stats in statsd
func (s *StatsD) ReportGoStats(
numGoRoutines int,
allocatedAndNotFreed, heapObjects, nextGCBytes, pauseGCNano uint64,
Expand All @@ -126,10 +129,17 @@ func (s *StatsD) ReportGoStats(
s.Client.Gauge("next_gc_bytes", float64(nextGCBytes), tags, 1)
}

//Cleanup closes statsd connection
func (s *StatsD) Cleanup() error {
s.Client.Close()
return nil
func (s *StatsD) ReportSendNotificationLatency(latencyMs time.Duration, game string, platform string, labels ...string) {
metricLabels := []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}
for i := 0; i < len(labels); i += 2 {
metricLabels = append(labels, fmt.Sprintf("%s:%s", labels[i], labels[i+1]))
}
s.Client.Timing(
"send_notification_latency",
latencyMs,
metricLabels,
1,
)
}

// ReportMetricGauge reports a metric as a Gauge with hostname, game and platform
Expand Down Expand Up @@ -175,3 +185,9 @@ func (s *StatsD) ReportMetricCount(

s.Client.Count(metric, value, tags, 1)
}

// Cleanup closes statsd connection
func (s *StatsD) Cleanup() error {
s.Client.Close()
return nil
}
4 changes: 4 additions & 0 deletions extensions/gcm_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,11 @@ func (g *GCMMessageHandler) sendMessage(message interfaces.KafkaMessage) error {

g.pendingMessages <- true
xmppMessage := toGCMMessage(km.Message)

before := time.Now()
messageID, bytes, err = g.GCMClient.SendXMPP(xmppMessage)
elapsed := time.Since(before)
statsReporterReportSendNotificationLatency(g.StatsReporters, elapsed, g.game, "gcm", "client", "gcm")

if err != nil {
<-g.pendingMessages
Expand Down
10 changes: 9 additions & 1 deletion extensions/handler/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type messageHandler struct {
statsMutex sync.Mutex
feedbackReporters []interfaces.FeedbackReporter
statsReporters []interfaces.StatsReporter
statsDClient extensions.StatsD
sendPushConcurrencyControl chan interface{}
responsesChannel chan struct {
msg interfaces.Message
Expand Down Expand Up @@ -98,7 +99,8 @@ func (h *messageHandler) HandleMessages(ctx context.Context, msg interfaces.Kafk
}
}
}

before := time.Now()
defer h.reportLatency(time.Since(before))
h.sendPush(ctx, km.Message)
}

Expand Down Expand Up @@ -220,6 +222,12 @@ func (h *messageHandler) handleNotificationFailure(err error) {
h.statsMutex.Unlock()
}

func (h *messageHandler) reportLatency(latency time.Duration) {
for _, statsReporter := range h.statsReporters {
statsReporter.ReportSendNotificationLatency(latency, h.app, "gcm", "client", "fcm")
}
}

func translateToPushError(err error) *pushErrors.PushError {
if pusherError, ok := err.(*pushErrors.PushError); ok {
return pusherError
Expand Down
1 change: 1 addition & 0 deletions feedback/invalid_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (i *InvalidTokenHandler) processMessages() {
l.Info("stop processing Invalid Token Handler's in channel")
return
}
time.Sleep(100 * time.Millisecond)
}

}
Expand Down
2 changes: 1 addition & 1 deletion feedback/invalid_token_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ var _ = Describe("InvalidToken Handler", func() {
inChan <- t
}

time.Sleep(200 * time.Millisecond)
time.Sleep(time.Second)
handler.Stop()
time.Sleep(500 * time.Millisecond)

Expand Down
6 changes: 5 additions & 1 deletion interfaces/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@

package interfaces

import "github.com/topfreegames/pusher/errors"
import (
"github.com/topfreegames/pusher/errors"
"time"
)

// StatsReporter interface for making stats reporters pluggable easily.
type StatsReporter interface {
Expand All @@ -33,4 +36,5 @@ type StatsReporter interface {
ReportGoStats(numGoRoutines int, allocatedAndNotFreed, heapObjects, nextGCBytes, pauseGCNano uint64)
ReportMetricGauge(metric string, value float64, game string, platform string)
ReportMetricCount(metric string, value int64, game string, platform string)
ReportSendNotificationLatency(latencyMs time.Duration, game string, platform string, labels ...string)
}

0 comments on commit 50e410d

Please sign in to comment.