From 8cc82dc463bbed810862c2eb278a89c7212454f9 Mon Sep 17 00:00:00 2001 From: Pedro Soares Date: Fri, 17 May 2024 10:15:52 -0300 Subject: [PATCH] [APNS] Make pushQueue and responseChannel buffered (#57) * feat(apns): make pushQ and responseChan buffered The pushQueue and responseChannels are used by multiple goroutines and are not buffered, thus deadlocks might happen and this would throttle whoever is feeding or consuming from it. Thus, make them buffered and configurable * chore(apns): log retries as info They are important for us to check that this event is happening and check if there's any throttling/deadlock happening on channel consumption --- config/default.yaml | 2 ++ extensions/apns_message_handler.go | 2 +- extensions/apns_push_queue.go | 13 +++++++++++-- extensions/apns_push_queue_test.go | 7 +++++++ 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/config/default.yaml b/config/default.yaml index 585257a..5de15a0 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -3,6 +3,8 @@ gracefulShutdownTimeout: 30 apns: concurrentWorkers: 300 connectionPoolSize: 1 + pushQueueSize: 100 + responseChannelSize: 100 logStatsInterval: 10000 apps: "game" certs: diff --git a/extensions/apns_message_handler.go b/extensions/apns_message_handler.go index 2ad2c9a..03507c8 100644 --- a/extensions/apns_message_handler.go +++ b/extensions/apns_message_handler.go @@ -323,7 +323,7 @@ func (a *APNSMessageHandler) handleAPNSResponse(responseWithMetadata *structs.Re "sendAttempts": sendAttempts, "maxRetries": a.maxRetryAttempts, "apnsID": responseWithMetadata.ApnsID, - }).Debug("retrying notification") + }).Info("retrying notification") inFlightNotificationInstance.sendAttempts.Add(1) if a.pendingMessagesWG != nil { a.pendingMessagesWG.Add(1) diff --git a/extensions/apns_push_queue.go b/extensions/apns_push_queue.go index d5d890c..6573c6e 100644 --- a/extensions/apns_push_queue.go +++ b/extensions/apns_push_queue.go @@ -64,6 +64,12 @@ func NewAPNSPushQueue( } } +func (p *APNSPushQueue) loadConfigDefault() { + p.Config.SetDefault("apns.connectionPoolSize", 1) + p.Config.SetDefault("apns.pushQueueSize", 100) + p.Config.SetDefault("apns.responseChannelSize", 100) +} + // Configure configures queues and token func (p *APNSPushQueue) Configure() error { l := p.Logger.WithFields(log.Fields{ @@ -75,7 +81,10 @@ func (p *APNSPushQueue) Configure() error { return err } p.Closed = false + p.loadConfigDefault() connectionPoolSize := p.Config.GetInt("apns.connectionPoolSize") + pushQueueSize := p.Config.GetInt("apns.pushQueueSize") + respChannelSize := p.Config.GetInt("apns.responseChannelSize") p.clients = make(chan *apns2.Client, connectionPoolSize) for i := 0; i < connectionPoolSize; i++ { client := apns2.NewTokenClient(p.token) @@ -89,8 +98,8 @@ func (p *APNSPushQueue) Configure() error { p.clients <- client } l.Debug("clients configured") - p.pushChannel = make(chan *apns2.Notification) - p.responseChannel = make(chan *structs.ResponseWithMetadata) + p.pushChannel = make(chan *apns2.Notification, pushQueueSize) + p.responseChannel = make(chan *structs.ResponseWithMetadata, respChannelSize) for i := 0; i < p.Config.GetInt("apns.concurrentWorkers"); i++ { go p.pushWorker() diff --git a/extensions/apns_push_queue_test.go b/extensions/apns_push_queue_test.go index fed1e77..6ce99d0 100644 --- a/extensions/apns_push_queue_test.go +++ b/extensions/apns_push_queue_test.go @@ -75,6 +75,13 @@ var _ = Describe("APNS Push Queue", func() { Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("open ./invalid-certficate.pem: no such file or directory")) }) + + It("should load default configs", func() { + err := queue.Configure() + Expect(err).NotTo(HaveOccurred()) + Expect(cap(queue.pushChannel)).To(Equal(100)) + Expect(cap(queue.responseChannel)).To(Equal(100)) + }) }) Describe("Configuring Certificate", func() {