Skip to content

Commit

Permalink
chore: add pr suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
gussf committed Jun 13, 2024
1 parent 5065f28 commit f1a7d5e
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 30 deletions.
7 changes: 4 additions & 3 deletions config/default.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
gracefulShutdownTimeout: 30
apns:
rateLimit.rpm: 100
concurrentWorkers: 300
connectionPoolSize: 1
pushQueueSize: 100
Expand All @@ -14,6 +15,7 @@ apns:
teamID: "ABC123DEFG"
topic: "com.game.test"
gcm:
rateLimit.rpm: 100
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 100
Expand Down Expand Up @@ -94,10 +96,9 @@ feedbackListeners:
database: push
connectionTimeout: 100
rateLimiter:
limit:
rpm: 100
redis:
host: "localhost"
port: 6379
password: ""
test: false
tls:
disabled: false
7 changes: 4 additions & 3 deletions config/docker_test.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
gracefulShutdownTimeout: 10
apns:
rateLimit.rpm: 100
concurrentWorkers: 300
connectionPoolSize: 1
logStatsInterval: 750
Expand All @@ -14,6 +15,7 @@ apns:
responsechannelsize: 100
connectionpoolsize: 10
gcm:
rateLimit.rpm: 100
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 3
Expand Down Expand Up @@ -94,10 +96,9 @@ feedbackListeners:
database: push
connectionTimeout: 100
rateLimiter:
limit:
rpm: 100
redis:
host: "redis"
port: 6379
password: ""
test: true
tls:
disabled: true
7 changes: 4 additions & 3 deletions config/test.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
gracefulShutdownTimeout: 10
apns:
rateLimit.rpm: 100
concurrentWorkers: 300
connectionPoolSize: 1
logStatsInterval: 750
Expand All @@ -12,6 +13,7 @@ apns:
teamID: "ABC123DEFG"
topic: "com.game.test"
gcm:
rateLimit.rpm: 100
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 3
Expand Down Expand Up @@ -92,10 +94,9 @@ feedbackListeners:
database: push
connectionTimeout: 100
rateLimiter:
limit:
rpm: 100
redis:
host: "localhost"
port: 6379
password: ""
test: true
tls:
disabled: true
3 changes: 2 additions & 1 deletion e2e/fcm_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func (s *FcmE2ETestSuite) setupFcmPusher(appName string) (*firebaseMock.MockPush
statsReport, err := extensions.NewStatsD(s.vConfig, logger, statsdClientMock)
s.Require().NoError(err)

rateLimiter := extensions.NewRateLimiter(s.vConfig, logger)
limit := s.vConfig.GetInt("gcm.rateLimit.rpm")
rateLimiter := extensions.NewRateLimiter(limit, s.vConfig, []interfaces.StatsReporter{statsReport}, logger)

pushClient := firebaseMock.NewMockPushClient(ctrl)
gcmPusher.MessageHandler = map[string]interfaces.MessageHandler{
Expand Down
2 changes: 1 addition & 1 deletion extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (a *APNSMessageHandler) HandleMessages(ctx context.Context, message interfa
allowed := a.rateLimiter.Allow(ctx, notification.DeviceToken)
if !allowed {
statsReporterNotificationRateLimitReached(a.StatsReporters, a.appName, "apns")
l.Warn("rate limit reached")
l.WithField("message", message).Warn("rate limit reached")
return
}

Expand Down
6 changes: 6 additions & 0 deletions extensions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func statsReporterNotificationRateLimitReached(statsReporters []interfaces.Stats
}
}

func statsReporterNotificationRateLimitFailed(statsReporters []interfaces.StatsReporter) {
for _, statsReporter := range statsReporters {
statsReporter.NotificationRateLimitFailed()
}
}

func statsReporterReportSendNotificationLatency(statsReporters []interfaces.StatsReporter, latencyMs time.Duration, game string, platform string, labels ...string) {
for _, statsReporter := range statsReporters {
statsReporter.ReportSendNotificationLatency(latencyMs, game, platform, labels...)
Expand Down
9 changes: 9 additions & 0 deletions extensions/datadog_statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ func (s *StatsD) NotificationRateLimitReached(game string, platform string) {
)
}

// NotificationRateLimitFailed stores how many times rate limits failed to be calculated
func (s *StatsD) NotificationRateLimitFailed() {
s.Client.Incr(
"rate_limit_failed",
[]string{},
1,
)
}

// InitializeFailure notifu error when is impossible tho initilizer an app
func (s *StatsD) InitializeFailure(game string, platform string) {
s.Client.Incr("initialize_failure", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1)
Expand Down
2 changes: 1 addition & 1 deletion extensions/gcm_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (g *GCMMessageHandler) sendMessage(message interfaces.KafkaMessage) error {
allowed := g.rateLimiter.Allow(context.Background(), km.To)
if !allowed {
statsReporterNotificationRateLimitReached(g.StatsReporters, message.Game, "gcm")
l.Warn("rate limit reached")
l.WithField("message", message).Warn("rate limit reached")
return errors.New("rate limit reached")
}
l.Debug("sending message to gcm")
Expand Down
2 changes: 1 addition & 1 deletion extensions/handler/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (h *messageHandler) HandleMessages(ctx context.Context, msg interfaces.Kafk
allowed := h.rateLimiter.Allow(ctx, km.To)
if !allowed {
h.reportRateLimitReached(msg.Game)
l.Warn("rate limit reached")
l.WithField("message", msg).Warn("rate limit reached")
return
}

Expand Down
29 changes: 18 additions & 11 deletions extensions/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,40 @@ import (
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/interfaces"
)

type rateLimiter struct {
redis *redis.Client
rpmLimit int
l *logrus.Entry
redis *redis.Client
rpmLimit int
statsReporters []interfaces.StatsReporter
l *logrus.Entry
}

func NewRateLimiter(config *viper.Viper, logger *logrus.Logger) rateLimiter {
func NewRateLimiter(limit int, config *viper.Viper, statsReporters []interfaces.StatsReporter, logger *logrus.Logger) rateLimiter {
host := config.GetString("rateLimiter.redis.host")
port := config.GetInt("rateLimiter.redis.port")
pwd := config.GetString("rateLimiter.redis.password")
limit := config.GetInt("rateLimiter.limit.rpm")
isTest := config.GetBool("rateLimiter.test")
disableTLS := config.GetBool("rateLimiter.tls.disabled")

addr := fmt.Sprintf("%s:%d", host, port)
opts := &redis.Options{
Addr: addr,
Password: pwd,
}
// Setting TLSConfig only for production due to not being able to enable TLS in the integration test container.
if !isTest {

// TLS for integration tests running in containers can raise connection errors.
// Not recommended to disable TLS for production.
if !disableTLS {
opts.TLSConfig = &tls.Config{}
}

rdb := redis.NewClient(opts)

return rateLimiter{
redis: rdb,
rpmLimit: limit,
redis: rdb,
rpmLimit: limit,
statsReporters: statsReporters,
l: logger.WithFields(logrus.Fields{
"extension": "RateLimiter",
"rpmLimit": limit,
Expand All @@ -62,6 +66,7 @@ func (r rateLimiter) Allow(ctx context.Context, device string) bool {
if err != nil && !errors.Is(err, redis.Nil) {
// Something went wrong, return true to avoid blocking notifications.
l.WithError(err).Error("could not get current rate in redis")
statsReporterNotificationRateLimitFailed(r.statsReporters)
return true
}
if errors.Is(err, redis.Nil) {
Expand All @@ -73,6 +78,7 @@ func (r rateLimiter) Allow(ctx context.Context, device string) bool {
if err != nil {
// Something went wrong, return true to avoid blocking notifications.
l.WithError(err).Error("current rate is invalid")
statsReporterNotificationRateLimitFailed(r.statsReporters)
return true
}

Expand All @@ -82,12 +88,13 @@ func (r rateLimiter) Allow(ctx context.Context, device string) bool {

_, err = r.redis.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Incr(ctx, deviceKey)
pipe.Expire(ctx, deviceKey, 1*time.Minute)
pipe.Expire(ctx, deviceKey, time.Minute)
return nil
})
if err != nil {
// Allow the operation even if the transaction fails, to avoid blocking notifications.
l.WithError(err).Error("increment to current rate failed")
statsReporterNotificationRateLimitFailed(r.statsReporters)
}

l.WithField("currentRate", current).Debug("current rate allows message")
Expand Down
20 changes: 17 additions & 3 deletions extensions/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/util"

"github.com/google/uuid"
Expand All @@ -27,10 +28,11 @@ var _ = FDescribe("Rate Limiter", func() {
Expect(err).NotTo(HaveOccurred())
hook.Reset()

statsClients := []interfaces.StatsReporter{}

Describe("Rate limiting", func() {
It("should return not-allowed when rate limit is reached", func() {
rl := NewRateLimiter(config, logger)
rl.rpmLimit = 1
rl := NewRateLimiter(1, config, statsClients, logger)
ctx := context.Background()
device := uuid.NewString()
allowed := rl.Allow(ctx, device)
Expand All @@ -42,7 +44,7 @@ var _ = FDescribe("Rate Limiter", func() {
})

It("should increment current rate if limit is not reached", func() {
rl := NewRateLimiter(config, logger)
rl := NewRateLimiter(10, config, statsClients, logger)
ctx := context.Background()
device := uuid.NewString()
currMin := time.Now().Minute()
Expand All @@ -56,6 +58,18 @@ var _ = FDescribe("Rate Limiter", func() {
Expect(actual).To(BeEquivalentTo("1"))
})

It("should return allowed if redis fails", func() {
wrongConfig, err := util.NewViperWithConfigFile(configFile)
Expect(err).NotTo(HaveOccurred())
wrongConfig.Set("rateLimiter.redis.host", "unreachable")
rl := NewRateLimiter(10, wrongConfig, statsClients, logger)
ctx := context.Background()
device := uuid.NewString()

allowed := rl.Allow(ctx, device)
Expect(allowed).To(BeTrue())
})

})
})
})
1 change: 1 addition & 0 deletions interfaces/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type StatsReporter interface {
ReportMetricGauge(metric string, value float64, game string, platform string)
ReportMetricCount(metric string, value int64, game string, platform string)
NotificationRateLimitReached(game string, platform string)
NotificationRateLimitFailed()
ReportSendNotificationLatency(latencyMs time.Duration, game string, platform string, labels ...string)
ReportFirebaseLatency(latencyMs time.Duration, game string, labels ...string)
}
3 changes: 2 additions & 1 deletion pusher/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB,
keyID := a.ViperConfig.GetString("apns.certs." + k + ".keyID")
teamID := a.ViperConfig.GetString("apns.certs." + k + ".teamID")
topic := a.ViperConfig.GetString("apns.certs." + k + ".topic")
rateLimit := a.ViperConfig.GetInt("apns.rateLimit.rpm")

l.WithFields(logrus.Fields{
"app": k,
Expand All @@ -120,7 +121,7 @@ func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB,
a.feedbackReporters,
queue,
interfaces.ConsumptionManager(q),
extensions.NewRateLimiter(a.ViperConfig, l.Logger),
extensions.NewRateLimiter(rateLimit, a.ViperConfig, a.StatsReporters, l.Logger),
)
if err == nil {
a.MessageHandler[k] = handler
Expand Down
6 changes: 4 additions & 2 deletions pusher/gcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func (g *GCMPusher) createMessageHandlerForApps(ctx context.Context) error {
g.MessageHandler = make(map[string]interfaces.MessageHandler)
for _, app := range g.Config.GetGcmAppsArray() {
credentials := g.ViperConfig.GetString("gcm.firebaseCredentials." + app)
rateLimit := g.ViperConfig.GetInt("gcm.rateLimit.rpm")

l = l.WithField("app", app)
if credentials != "" { // Firebase is configured, use new handler
pushClient, err := client.NewFirebaseClient(ctx, credentials, g.Logger)
Expand All @@ -111,7 +113,7 @@ func (g *GCMPusher) createMessageHandlerForApps(ctx context.Context) error {
pushClient,
g.feedbackReporters,
g.StatsReporters,
extensions.NewRateLimiter(g.ViperConfig, l.Logger),
extensions.NewRateLimiter(rateLimit, g.ViperConfig, g.StatsReporters, l.Logger),
g.Logger,
g.Config.GCM.ConcurrentWorkers,
)
Expand All @@ -124,7 +126,7 @@ func (g *GCMPusher) createMessageHandlerForApps(ctx context.Context) error {
g.Queue.PendingMessagesWaitGroup(),
g.StatsReporters,
g.feedbackReporters,
extensions.NewRateLimiter(g.ViperConfig, l.Logger),
extensions.NewRateLimiter(rateLimit, g.ViperConfig, g.StatsReporters, l.Logger),
)
if err != nil {
l.WithError(err).Error("could not create gcm message handler")
Expand Down

0 comments on commit f1a7d5e

Please sign in to comment.