Skip to content

Commit

Permalink
Code cleanup (#63)
Browse files Browse the repository at this point in the history
* Remove inflight map + remove clean goroutine + fix tests

* Use different consumer groups for each test

* Fix error wrap

* re-add code I removed by mistake

* Remove logs stats and refactor apns tests

* Fix feedback reporting for fcm and refactor tests

* Fix startup

* Fix initialization on fcm e2e test

* Remove wait
  • Loading branch information
miguelreiswildlife authored Jun 26, 2024
1 parent 138cc3e commit 5020846
Show file tree
Hide file tree
Showing 30 changed files with 1,220 additions and 2,855 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,10 @@ integration-test-container-dev: build-image-dev start-deps-container-dev test-db

.PHONY: mocks
mocks:
$(MOCKGENERATE) -source=interfaces/client.go -destination=mocks/firebase/client.go
$(MOCKGENERATE) -source=interfaces/client.go -destination=mocks/interfaces/client.go
$(MOCKGENERATE) -source=interfaces/apns.go -destination=mocks/interfaces/apns.go
$(MOCKGENERATE) -source=interfaces/statsd.go -destination=mocks/interfaces/statsd.go
$(MOCKGENERATE) -source=interfaces/stats_reporter.go -destination=mocks/interfaces/stats_reporter.go
$(MOCKGENERATE) -source=interfaces/feedback_reporter.go -destination=mocks/interfaces/feedback_reporter.go
$(MOCKGENERATE) -source=interfaces/message_handler.go -destination=mocks/interfaces/message_handler.go
$(MOCKGENERATE) -source=interfaces/rate_limiter.go -destination=mocks/interfaces/rate_limiter.go
10 changes: 5 additions & 5 deletions e2e/fcm_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ import (
"github.com/stretchr/testify/suite"
"github.com/topfreegames/pusher/config"
"github.com/topfreegames/pusher/extensions"
"github.com/topfreegames/pusher/extensions/handler"
"github.com/topfreegames/pusher/extensions/firebase"
"github.com/topfreegames/pusher/interfaces"
firebaseMock "github.com/topfreegames/pusher/mocks/firebase"
mocks "github.com/topfreegames/pusher/mocks/interfaces"
"github.com/topfreegames/pusher/pusher"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -46,7 +45,7 @@ func (s *FcmE2ETestSuite) SetupSuite() {
s.vConfig = v
}

func (s *FcmE2ETestSuite) setupFcmPusher(appName string) (*firebaseMock.MockPushClient, *mocks.MockStatsDClient) {
func (s *FcmE2ETestSuite) setupFcmPusher(appName string) (*mocks.MockPushClient, *mocks.MockStatsDClient) {
ctrl := gomock.NewController(s.T())

statsdClientMock := mocks.NewMockStatsDClient(ctrl)
Expand Down Expand Up @@ -74,14 +73,15 @@ func (s *FcmE2ETestSuite) setupFcmPusher(appName string) (*firebaseMock.MockPush
limit := s.vConfig.GetInt("gcm.rateLimit.rpm")
rateLimiter := extensions.NewRateLimiter(limit, s.vConfig, []interfaces.StatsReporter{statsReport}, logger)

pushClient := firebaseMock.NewMockPushClient(ctrl)
pushClient := mocks.NewMockPushClient(ctrl)
gcmPusher.MessageHandler = map[string]interfaces.MessageHandler{
appName: handler.NewMessageHandler(
appName: firebase.NewMessageHandler(
appName,
pushClient,
[]interfaces.FeedbackReporter{},
[]interfaces.StatsReporter{statsReport},
rateLimiter,
nil,
logger,
s.config.GCM.ConcurrentWorkers,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package extensions
package apns

import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/topfreegames/pusher/extensions"
"os"
"sync"
"time"
Expand All @@ -40,8 +41,6 @@ import (
"github.com/topfreegames/pusher/structs"
)

var apnsResMutex sync.Mutex

// pusherAPNSKafkaMessage is the notification format received in Kafka messages.
type pusherAPNSKafkaMessage struct {
ApnsID string
Expand Down Expand Up @@ -72,10 +71,8 @@ type APNSMessageHandler struct {
sentMessages int64
ignoredMessages int64
successesReceived int64
requestsHeap *TimeoutHeap
CacheCleaningInterval int
IsProduction bool
consumptionManager interfaces.ConsumptionManager
retryInterval time.Duration
maxRetryAttempts uint
rateLimiter interfaces.RateLimiter
Expand All @@ -93,31 +90,22 @@ func NewAPNSMessageHandler(
statsReporters []interfaces.StatsReporter,
feedbackReporters []interfaces.FeedbackReporter,
pushQueue interfaces.APNSPushQueue,
consumptionManager interfaces.ConsumptionManager,
rateLimiter interfaces.RateLimiter,
) (*APNSMessageHandler, error) {
a := &APNSMessageHandler{
authKeyPath: authKeyPath,
keyID: keyID,
teamID: teamID,
ApnsTopic: topic,
appName: appName,
Config: config,
failuresReceived: 0,
feedbackReporters: feedbackReporters,
IsProduction: isProduction,
Logger: logger,
pendingMessagesWG: pendingMessagesWG,
ignoredMessages: 0,
inFlightNotificationsMapLock: &sync.Mutex{},
responsesReceived: 0,
sentMessages: 0,
StatsReporters: statsReporters,
successesReceived: 0,
requestsHeap: NewTimeoutHeap(config),
PushQueue: pushQueue,
consumptionManager: consumptionManager,
rateLimiter: rateLimiter,
authKeyPath: authKeyPath,
keyID: keyID,
teamID: teamID,
ApnsTopic: topic,
appName: appName,
Config: config,
feedbackReporters: feedbackReporters,
IsProduction: isProduction,
Logger: logger,
pendingMessagesWG: pendingMessagesWG,
StatsReporters: statsReporters,
PushQueue: pushQueue,
rateLimiter: rateLimiter,
}

if a.Logger != nil {
Expand Down Expand Up @@ -196,35 +184,26 @@ func (a *APNSMessageHandler) HandleMessages(ctx context.Context, message interfa
if err != nil {
l.WithError(err).Error("error parsing kafka message")
a.waitGroupDone()
apnsResMutex.Lock()
a.ignoredMessages++
apnsResMutex.Unlock()
return
}
l = l.WithField("notification", parsedNotification)

allowed := a.rateLimiter.Allow(ctx, parsedNotification.DeviceToken, a.appName, "apns")
if !allowed {
extensions.StatsReporterNotificationRateLimitReached(a.StatsReporters, a.appName, "apns")
a.waitGroupDone()
return
}

n, err := a.buildAndValidateNotification(parsedNotification)
if err != nil {
l.WithError(err).Error("notification is invalid")
a.waitGroupDone()
apnsResMutex.Lock()
a.ignoredMessages++
apnsResMutex.Unlock()
return
}

allowed := a.rateLimiter.Allow(ctx, parsedNotification.DeviceToken, a.appName, "apns")
if !allowed {
statsReporterNotificationRateLimitReached(a.StatsReporters, a.appName, "apns")
l.WithField("message", message).Warn("rate limit reached")
return
}

a.sendNotification(n)
statsReporterHandleNotificationSent(a.StatsReporters, a.appName, "apns")

apnsResMutex.Lock()
a.sentMessages++
apnsResMutex.Unlock()
extensions.StatsReporterHandleNotificationSent(a.StatsReporters, a.appName, "apns")
}

func (a *APNSMessageHandler) parseKafkaMessage(message interfaces.KafkaMessage) (*pusherAPNSKafkaMessage, error) {
Expand All @@ -240,8 +219,8 @@ func (a *APNSMessageHandler) parseKafkaMessage(message interfaces.KafkaMessage)
}
notification.Metadata["game"] = a.appName
notification.Metadata["deviceToken"] = notification.DeviceToken
hostname, err := os.Hostname()

hostname, err := os.Hostname()
if err != nil {
a.Logger.WithError(err).Error("error retrieving hostname")
} else {
Expand All @@ -253,7 +232,7 @@ func (a *APNSMessageHandler) parseKafkaMessage(message interfaces.KafkaMessage)
}

func (a *APNSMessageHandler) buildAndValidateNotification(notification *pusherAPNSKafkaMessage) (*structs.ApnsNotification, error) {
if notification.PushExpiry > 0 && notification.PushExpiry < MakeTimestamp() {
if notification.PushExpiry > 0 && notification.PushExpiry < extensions.MakeTimestamp() {
return nil, errors.New("push message has expired")
}

Expand All @@ -277,7 +256,7 @@ func (a *APNSMessageHandler) buildAndValidateNotification(notification *pusherAP

func (a *APNSMessageHandler) sendNotification(notification *structs.ApnsNotification) {
before := time.Now()
defer statsReporterReportSendNotificationLatency(a.StatsReporters, time.Since(before), a.appName, "apns", "client", "apns")
defer extensions.StatsReporterReportSendNotificationLatency(a.StatsReporters, time.Since(before), a.appName, "apns", "client", "apns")

notification.SendAttempts += 1
a.PushQueue.Push(notification)
Expand Down Expand Up @@ -311,106 +290,38 @@ func (a *APNSMessageHandler) handleAPNSResponse(responseWithMetadata *structs.Re
}
delete(responseWithMetadata.Metadata, "timestamp")

apnsResMutex.Lock()
a.responsesReceived++
apnsResMutex.Unlock()

parsedTopic := ParsedTopic{
parsedTopic := extensions.ParsedTopic{
Game: a.appName,
Platform: "apns",
}

a.waitGroupDone()

if responseWithMetadata.Reason == "" {
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic)
if sendFeedbackErr != nil {
l.WithError(sendFeedbackErr).Error("error sending feedback to reporter")
}
apnsResMutex.Lock()
a.successesReceived++
apnsResMutex.Unlock()
statsReporterHandleNotificationSuccess(a.StatsReporters, a.appName, "apns")
extensions.StatsReporterHandleNotificationSuccess(a.StatsReporters, a.appName, "apns")
return nil
}

apnsResMutex.Lock()
a.failuresReceived++
apnsResMutex.Unlock()

pErr := pusher_errors.NewPushError(a.mapErrorReason(responseWithMetadata.Reason), responseWithMetadata.Reason)
responseWithMetadata.Err = pErr
statsReporterHandleNotificationFailure(a.StatsReporters, a.appName, "apns", pErr)
err := pErr
l.Info("notification failed")
extensions.StatsReporterHandleNotificationFailure(a.StatsReporters, a.appName, "apns", pErr)

switch responseWithMetadata.Reason {
case apns2.ReasonBadDeviceToken, apns2.ReasonUnregistered, apns2.ReasonTopicDisallowed, apns2.ReasonDeviceTokenNotForTopic:
// https://developer.apple.com/library/content/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/CommunicatingwithAPNs.html
l.WithFields(log.Fields{
"category": "TokenError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
if responseWithMetadata.Metadata != nil {
responseWithMetadata.Metadata["deleteToken"] = true
if responseWithMetadata.Metadata == nil {
responseWithMetadata.Metadata = map[string]interface{}{}
}
case apns2.ReasonBadCertificate, apns2.ReasonBadCertificateEnvironment, apns2.ReasonForbidden:
l.WithFields(log.Fields{
"category": "CertificateError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
case apns2.ReasonExpiredProviderToken, apns2.ReasonInvalidProviderToken, apns2.ReasonMissingProviderToken:
l.WithFields(log.Fields{
"category": "ProviderTokenError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
case apns2.ReasonMissingTopic:
l.WithFields(log.Fields{
"category": "TopicError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
case apns2.ReasonIdleTimeout, apns2.ReasonShutdown, apns2.ReasonInternalServerError, apns2.ReasonServiceUnavailable:
l.WithFields(log.Fields{
"category": "AppleError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
default:
l.WithFields(log.Fields{
"category": "DefaultError",
log.ErrorKey: responseWithMetadata.Reason,
}).Debug("received an error")
responseWithMetadata.Metadata["deleteToken"] = true
}
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic)

sendFeedbackErr := extensions.SendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic)
if sendFeedbackErr != nil {
l.WithError(sendFeedbackErr).Error("error sending feedback to reporter")
}
return err
}

// LogStats from time to time.
func (a *APNSMessageHandler) LogStats() {
l := a.Logger.WithFields(log.Fields{
"method": "apnsMessageHandler.logStats",
"interval(ns)": a.LogStatsInterval,
})

ticker := time.NewTicker(a.LogStatsInterval)
for range ticker.C {
apnsResMutex.Lock()
if a.sentMessages > 0 || a.responsesReceived > 0 || a.ignoredMessages > 0 || a.successesReceived > 0 || a.failuresReceived > 0 {
l.WithFields(log.Fields{
"sentMessages": a.sentMessages,
"ignoredMessages": a.ignoredMessages,
"responsesReceived": a.responsesReceived,
"successesReceived": a.successesReceived,
"failuresReceived": a.failuresReceived,
}).Info("flushing stats")
a.sentMessages = 0
a.responsesReceived = 0
a.ignoredMessages = 0
a.successesReceived = 0
a.failuresReceived = 0
}
apnsResMutex.Unlock()
}
return nil
}

func (a *APNSMessageHandler) mapErrorReason(reason string) string {
Expand Down
Loading

0 comments on commit 5020846

Please sign in to comment.