diff --git a/docker-compose.yml b/docker-compose.yml index e38a5d0..00f79c1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -88,7 +88,7 @@ services: retries: 10 kafka-ui: - image: docker.redpanda.com/vectorized/console:latest + image: docker.redpanda.com/redpandadata/console:latest ports: - 9000:8080 depends_on: diff --git a/e2e/apns_e2e_test.go b/e2e/apns_e2e_test.go index 87981d2..f07fe86 100644 --- a/e2e/apns_e2e_test.go +++ b/e2e/apns_e2e_test.go @@ -454,7 +454,6 @@ func (s *ApnsE2ETestSuite) TestConsumeMessagesBeforeExiting() { } func (s *ApnsE2ETestSuite) TestConsumeMessagesBeforeExitingWithRetries() { - app, p, mockApnsClient, statsdClientMock, responsesChannel := s.setupApnsPusher() ctx := context.Background() ctx, cancel := context.WithCancel(ctx) diff --git a/extensions/common.go b/extensions/common.go index bd0ea08..03e5b3f 100644 --- a/extensions/common.go +++ b/extensions/common.go @@ -31,7 +31,7 @@ import ( "github.com/topfreegames/pusher/interfaces" ) -var topicRegex = regexp.MustCompile("push-([^-_]+)[-_]([^-_]+)") +var topicRegex = regexp.MustCompile("^push-([\\w]+(?:[_-][\\w]+)*)[-_](gcm|apns)") // ParsedTopic contains game and platform extracted from topic name type ParsedTopic struct { @@ -39,7 +39,8 @@ type ParsedTopic struct { Game string } -func getGameAndPlatformFromTopic(topic string) ParsedTopic { +// GetGameAndPlatformFromTopic returns the game and platform specified in the Kafka topic +func GetGameAndPlatformFromTopic(topic string) ParsedTopic { res := topicRegex.FindStringSubmatch(topic) return ParsedTopic{ Platform: res[2], @@ -47,11 +48,6 @@ func getGameAndPlatformFromTopic(topic string) ParsedTopic { } } -// GetGameAndPlatformFromTopic returns the game and plaform specified in the Kafka topic -func GetGameAndPlatformFromTopic(topic string) ParsedTopic { - return getGameAndPlatformFromTopic(topic) -} - func SendToFeedbackReporters(feedbackReporters []interfaces.FeedbackReporter, res interface{}, topic ParsedTopic) error { jres, err := json.Marshal(res) if err != nil { diff --git a/extensions/kafka_consumer.go b/extensions/kafka_consumer.go index 42179da..2014f4c 100644 --- a/extensions/kafka_consumer.go +++ b/extensions/kafka_consumer.go @@ -25,6 +25,7 @@ package extensions import ( "context" "fmt" + "regexp" "sync" "time" @@ -212,6 +213,15 @@ func (q *KafkaConsumer) ConsumeLoop(ctx context.Context) error { "topics": q.Topics, }) + parsedTopicsLists := make([]string, len(q.Topics)) + for i, t := range q.Topics { + rgx, err := regexp.Compile(t) + if err != nil { + parsedTopicsLists[i] = t + } + parsedTopicsLists[i] = rgx.String() + } + err := q.Consumer.SubscribeTopics(q.Topics, func(_ *kafka.Consumer, event kafka.Event) error { l.WithField("event", event.String()).Debug("got event from Kafka") return nil @@ -264,7 +274,7 @@ func (q *KafkaConsumer) receiveMessage(topicPartition kafka.TopicPartition, valu } message := interfaces.KafkaMessage{ - Game: getGameAndPlatformFromTopic(*topicPartition.Topic).Game, + Game: GetGameAndPlatformFromTopic(*topicPartition.Topic).Game, Topic: *topicPartition.Topic, Value: value, } diff --git a/pusher/apns.go b/pusher/apns.go index 40829b5..94fde06 100644 --- a/pusher/apns.go +++ b/pusher/apns.go @@ -25,6 +25,7 @@ package pusher import ( "errors" "fmt" + "slices" "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -88,9 +89,22 @@ func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB, a.Logger, a.stopChannel, ) + if err != nil { return err } + + for _, a := range a.Config.GetApnsAppsArray() { + singleTopic := fmt.Sprintf("push-%s_apns-single", a) + if !slices.Contains(q.Topics, singleTopic) { + q.Topics = append(q.Topics, singleTopic) + } + massiveTopic := fmt.Sprintf("push-%s_apns-massive", a) + if !slices.Contains(q.Topics, massiveTopic) { + q.Topics = append(q.Topics, massiveTopic) + } + } + a.MessageHandler = make(map[string]interfaces.MessageHandler) a.Queue = q l.Info("Configuring messageHandler") @@ -129,7 +143,7 @@ func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB, for _, statsReporter := range a.StatsReporters { statsReporter.InitializeFailure(k, "apns") } - return fmt.Errorf("failed to initialize apns firebase for %s", k) + return fmt.Errorf("failed to initialize apns firebase for %s: %w", k, err) } } if len(a.MessageHandler) == 0 { diff --git a/pusher/gcm.go b/pusher/gcm.go index 1c51514..52fad2c 100644 --- a/pusher/gcm.go +++ b/pusher/gcm.go @@ -25,14 +25,14 @@ package pusher import ( "context" "fmt" - "github.com/topfreegames/pusher/extensions/firebase/client" - "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/topfreegames/pusher/config" "github.com/topfreegames/pusher/extensions" "github.com/topfreegames/pusher/extensions/firebase" + "github.com/topfreegames/pusher/extensions/firebase/client" "github.com/topfreegames/pusher/interfaces" + "slices" ) // GCMPusher struct for GCM pusher @@ -81,6 +81,16 @@ func NewGCMPusher( return nil, fmt.Errorf("could not create kafka consumer: %w", err) } g.Queue = q + for _, a := range g.Config.GetGcmAppsArray() { + singleTopic := fmt.Sprintf("push-%s_gcm-single", a) + if !slices.Contains(q.Topics, singleTopic) { + q.Topics = append(q.Topics, singleTopic) + } + massiveTopic := fmt.Sprintf("push-%s_gcm-massive", a) + if !slices.Contains(q.Topics, massiveTopic) { + q.Topics = append(q.Topics, massiveTopic) + } + } err = g.createMessageHandlerForApps(ctx) if err != nil {