Skip to content

Commit

Permalink
feat: add rate limiter to apns and gcm (#58)
Browse files Browse the repository at this point in the history
* feat: add rate limiter to apns and gcm

* fix: add rate limiter mock and fix tests

* try to fix tests

* add redis to github worflow

* fix: rate limiter config

* chore: add rate limiter to firebase handler

* fix: add tls config

* chore: log level and config name

* fix: tests

* chore: try to fix pipeline

* try to enable tls in pipeline

* chore: add test flag to config to set tls

* chore: fix port type

* chore: add pr suggestions

* chore: add labels to redis failure metric
  • Loading branch information
gussf authored Jun 19, 2024
1 parent a5fa115 commit 201f0e1
Show file tree
Hide file tree
Showing 24 changed files with 392 additions and 12 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ jobs:
--health-retries 5
statsd:
image: hopsoft/graphite-statsd
redis:
image: redis:6.0.9-alpine
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v2
- name: Set up go vendor cache
Expand Down
9 changes: 9 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
gracefulShutdownTimeout: 30
apns:
rateLimit.rpm: 20
concurrentWorkers: 300
connectionPoolSize: 1
pushQueueSize: 100
Expand All @@ -14,6 +15,7 @@ apns:
teamID: "ABC123DEFG"
topic: "com.game.test"
gcm:
rateLimit.rpm: 20
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 100
Expand Down Expand Up @@ -93,3 +95,10 @@ feedbackListeners:
maxRetries: 3
database: push
connectionTimeout: 100
rateLimiter:
redis:
host: "localhost"
port: 6379
password: ""
tls:
disabled: false
10 changes: 9 additions & 1 deletion config/docker_test.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
gracefulShutdownTimeout: 10
apns:
rateLimit.rpm: 100
concurrentWorkers: 300
connectionPoolSize: 1
logStatsInterval: 750
Expand All @@ -14,6 +15,7 @@ apns:
responsechannelsize: 100
connectionpoolsize: 10
gcm:
rateLimit.rpm: 100
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 3
Expand Down Expand Up @@ -93,4 +95,10 @@ feedbackListeners:
maxRetries: 3
database: push
connectionTimeout: 100

rateLimiter:
redis:
host: "redis"
port: 6379
password: ""
tls:
disabled: true
10 changes: 9 additions & 1 deletion config/test.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
gracefulShutdownTimeout: 10
apns:
rateLimit.rpm: 100
concurrentWorkers: 300
connectionPoolSize: 1
logStatsInterval: 750
Expand All @@ -12,6 +13,7 @@ apns:
teamID: "ABC123DEFG"
topic: "com.game.test"
gcm:
rateLimit.rpm: 100
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 3
Expand Down Expand Up @@ -91,4 +93,10 @@ feedbackListeners:
maxRetries: 3
database: push
connectionTimeout: 100

rateLimiter:
redis:
host: "localhost"
port: 6379
password: ""
tls:
disabled: true
11 changes: 11 additions & 0 deletions docker-compose-container-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,14 @@ services:
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
DYNAMIC_CONFIG_ENABLED: 'true'

redis:
image: redis:6.0.9-alpine
container_name: redis
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 3
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,13 @@ services:
condition: service_healthy
environment:
KAFKA_BROKERS: kafka:9092

redis:
image: redis:6.0.9-alpine
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 3
4 changes: 4 additions & 0 deletions e2e/fcm_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,17 @@ func (s *FcmE2ETestSuite) setupFcmPusher(appName string) (*firebaseMock.MockPush
statsReport, err := extensions.NewStatsD(s.vConfig, logger, statsdClientMock)
s.Require().NoError(err)

limit := s.vConfig.GetInt("gcm.rateLimit.rpm")
rateLimiter := extensions.NewRateLimiter(limit, s.vConfig, []interfaces.StatsReporter{statsReport}, logger)

pushClient := firebaseMock.NewMockPushClient(ctrl)
gcmPusher.MessageHandler = map[string]interfaces.MessageHandler{
appName: handler.NewMessageHandler(
appName,
pushClient,
[]interfaces.FeedbackReporter{},
[]interfaces.StatsReporter{statsReport},
rateLimiter,
logger,
s.config.GCM.ConcurrentWorkers,
),
Expand Down
13 changes: 12 additions & 1 deletion extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type APNSMessageHandler struct {
consumptionManager interfaces.ConsumptionManager
retryInterval time.Duration
maxRetryAttempts uint
rateLimiter interfaces.RateLimiter
}

var _ interfaces.MessageHandler = &APNSMessageHandler{}
Expand All @@ -94,6 +95,7 @@ func NewAPNSMessageHandler(
feedbackReporters []interfaces.FeedbackReporter,
pushQueue interfaces.APNSPushQueue,
consumptionManager interfaces.ConsumptionManager,
rateLimiter interfaces.RateLimiter,
) (*APNSMessageHandler, error) {
a := &APNSMessageHandler{
authKeyPath: authKeyPath,
Expand All @@ -117,6 +119,7 @@ func NewAPNSMessageHandler(
requestsHeap: NewTimeoutHeap(config),
PushQueue: pushQueue,
consumptionManager: consumptionManager,
rateLimiter: rateLimiter,
}

if a.Logger != nil {
Expand Down Expand Up @@ -216,7 +219,7 @@ func (a *APNSMessageHandler) CleanMetadataCache() {
}

// HandleMessages get messages from msgChan and send to APNS.
func (a *APNSMessageHandler) HandleMessages(_ context.Context, message interfaces.KafkaMessage) {
func (a *APNSMessageHandler) HandleMessages(ctx context.Context, message interfaces.KafkaMessage) {
l := a.Logger.WithFields(log.Fields{
"method": "HandleMessages",
"jsonValue": string(message.Value),
Expand All @@ -227,6 +230,14 @@ func (a *APNSMessageHandler) HandleMessages(_ context.Context, message interface
if err != nil {
return
}

allowed := a.rateLimiter.Allow(ctx, notification.DeviceToken, a.appName, "apns")
if !allowed {
statsReporterNotificationRateLimitReached(a.StatsReporters, a.appName, "apns")
l.WithField("message", message).Warn("rate limit reached")
return
}

if err := a.sendNotification(notification); err != nil {
return
}
Expand Down
9 changes: 7 additions & 2 deletions extensions/apns_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"time"

uuid "github.com/satori/go.uuid"
"github.com/sideshow/apns2"
mock_interfaces "github.com/topfreegames/pusher/mocks/interfaces"
"os"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -51,6 +52,7 @@ var _ = FDescribe("APNS Message Handler", func() {
var mockStatsDClient *mocks.StatsDClientMock
var statsClients []interfaces.StatsReporter
mockConsumptionManager := mock_interfaces.NewMockConsumptionManager()
mockRateLimiter := mocks.NewRateLimiterMock()
ctx := context.Background()

configFile := os.Getenv("CONFIG_FILE")
Expand Down Expand Up @@ -98,6 +100,7 @@ var _ = FDescribe("APNS Message Handler", func() {
feedbackClients,
mockPushQueue,
mockConsumptionManager,
mockRateLimiter,
)
Expect(err).NotTo(HaveOccurred())
db.(*mocks.PGMock).RowsReturned = 0
Expand Down Expand Up @@ -668,6 +671,7 @@ var _ = FDescribe("APNS Message Handler", func() {
feedbackClients,
mockPushQueue,
mockConsumptionManager,
mockRateLimiter,
)
Expect(err).NotTo(HaveOccurred())
})
Expand Down Expand Up @@ -880,6 +884,7 @@ var _ = FDescribe("APNS Message Handler", func() {
nil,
nil,
nil,
mockRateLimiter,
)
Expect(err).NotTo(HaveOccurred())
hook.Reset()
Expand Down
12 changes: 12 additions & 0 deletions extensions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ func statsReporterHandleNotificationFailure(
}
}

func statsReporterNotificationRateLimitReached(statsReporters []interfaces.StatsReporter, game string, platform string) {
for _, statsReporter := range statsReporters {
statsReporter.NotificationRateLimitReached(game, platform)
}
}

func statsReporterNotificationRateLimitFailed(statsReporters []interfaces.StatsReporter, game string, platform string) {
for _, statsReporter := range statsReporters {
statsReporter.NotificationRateLimitFailed(game, platform)
}
}

func statsReporterReportSendNotificationLatency(statsReporters []interfaces.StatsReporter, latencyMs time.Duration, game string, platform string, labels ...string) {
for _, statsReporter := range statsReporters {
statsReporter.ReportSendNotificationLatency(latencyMs, game, platform, labels...)
Expand Down
18 changes: 18 additions & 0 deletions extensions/datadog_statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,24 @@ func (s *StatsD) HandleNotificationFailure(game string, platform string, err *er
)
}

// NotificationRateLimitReached stores how many times rate limits were reached for the devices
func (s *StatsD) NotificationRateLimitReached(game string, platform string) {
s.Client.Incr(
"rate_limit_reached",
[]string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)},
1,
)
}

// NotificationRateLimitFailed stores how many times rate limits failed to be calculated
func (s *StatsD) NotificationRateLimitFailed(game string, platform string) {
s.Client.Incr(
"rate_limit_failed",
[]string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)},
1,
)
}

// InitializeFailure notifu error when is impossible tho initilizer an app
func (s *StatsD) InitializeFailure(game string, platform string) {
s.Client.Incr("initialize_failure", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1)
Expand Down
13 changes: 12 additions & 1 deletion extensions/gcm_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type GCMMessageHandler struct {
requestsHeap *TimeoutHeap
CacheCleaningInterval int
IsProduction bool
rateLimiter interfaces.RateLimiter
}

// NewGCMMessageHandler returns a new instance of a GCMMessageHandler
Expand All @@ -87,14 +88,15 @@ func NewGCMMessageHandler(
pendingMessagesWG *sync.WaitGroup,
statsReporters []interfaces.StatsReporter,
feedbackReporters []interfaces.FeedbackReporter,
rateLimiter interfaces.RateLimiter,
) (*GCMMessageHandler, error) {
l := logger.WithFields(logrus.Fields{
"method": "NewGCMMessageHandler",
"game": game,
"isProduction": isProduction,
})

h, err := NewGCMMessageHandlerWithClient(game, isProduction, config, l.Logger, pendingMessagesWG, statsReporters, feedbackReporters, nil)
h, err := NewGCMMessageHandlerWithClient(game, isProduction, config, l.Logger, pendingMessagesWG, statsReporters, feedbackReporters, nil, rateLimiter)
if err != nil {
l.WithError(err).Error("Failed to create a new GCM Message handler.")
return nil, err
Expand All @@ -111,6 +113,7 @@ func NewGCMMessageHandlerWithClient(
statsReporters []interfaces.StatsReporter,
feedbackReporters []interfaces.FeedbackReporter,
client interfaces.GCMClient,
rateLimiter interfaces.RateLimiter,
) (*GCMMessageHandler, error) {
l := logger.WithFields(logrus.Fields{
"method": "NewGCMMessageHandlerWithClient",
Expand All @@ -131,6 +134,7 @@ func NewGCMMessageHandlerWithClient(
requestsHeap: NewTimeoutHeap(config),
StatsReporters: statsReporters,
GCMClient: client,
rateLimiter: rateLimiter,
}

err := g.configure()
Expand Down Expand Up @@ -340,6 +344,13 @@ func (g *GCMMessageHandler) sendMessage(message interfaces.KafkaMessage) error {
}

l = l.WithField("message", km)

allowed := g.rateLimiter.Allow(context.Background(), km.To, message.Game, "gcm")
if !allowed {
statsReporterNotificationRateLimitReached(g.StatsReporters, message.Game, "gcm")
l.WithField("message", message).Warn("rate limit reached")
return errors.New("rate limit reached")
}
l.Debug("sending message to gcm")

var messageID string
Expand Down
10 changes: 7 additions & 3 deletions extensions/gcm_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ package extensions

import (
"encoding/json"
"os"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/suite"
"github.com/topfreegames/pusher/config"
"os"
"testing"
"time"

uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus/hooks/test"
Expand Down Expand Up @@ -73,6 +74,7 @@ func (s *GCMMessageHandlerTestSuite) setupHandler() (
logger, _ := test.NewNullLogger()
mockClient := mocks.NewGCMClientMock()
mockStatsdClient := mocks.NewStatsDClientMock()
mockRateLimiter := mocks.NewRateLimiterMock()

statsD, err := NewStatsD(s.vConfig, logger, mockStatsdClient)
s.Require().NoError(err)
Expand All @@ -92,6 +94,7 @@ func (s *GCMMessageHandlerTestSuite) setupHandler() (
statsClients,
feedbackClients,
mockClient,
mockRateLimiter,
)
s.NoError(err)
s.Require().NotNil(handler)
Expand All @@ -115,6 +118,7 @@ func (s *GCMMessageHandlerTestSuite) TestConfigureHandler() {
nil,
[]interfaces.StatsReporter{},
[]interfaces.FeedbackReporter{},
nil,
)
s.Error(err)
s.Nil(handler)
Expand Down
Loading

0 comments on commit 201f0e1

Please sign in to comment.