Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not rely on regex for topic list #66

Merged
merged 7 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ services:
retries: 10

kafka-ui:
image: docker.redpanda.com/vectorized/console:latest
image: docker.redpanda.com/redpandadata/console:latest
ports:
- 9000:8080
depends_on:
Expand Down
1 change: 0 additions & 1 deletion e2e/apns_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ func (s *ApnsE2ETestSuite) TestConsumeMessagesBeforeExiting() {
}

func (s *ApnsE2ETestSuite) TestConsumeMessagesBeforeExitingWithRetries() {

app, p, mockApnsClient, statsdClientMock, responsesChannel := s.setupApnsPusher()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
Expand Down
10 changes: 3 additions & 7 deletions extensions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,23 @@ import (
"github.com/topfreegames/pusher/interfaces"
)

var topicRegex = regexp.MustCompile("push-([^-_]+)[-_]([^-_]+)")
var topicRegex = regexp.MustCompile("^push-([\\w]+(?:[_-][\\w]+)*)[-_](gcm|apns)")

// ParsedTopic contains game and platform extracted from topic name
type ParsedTopic struct {
Platform string
Game string
}

func getGameAndPlatformFromTopic(topic string) ParsedTopic {
// GetGameAndPlatformFromTopic returns the game and platform specified in the Kafka topic
func GetGameAndPlatformFromTopic(topic string) ParsedTopic {
res := topicRegex.FindStringSubmatch(topic)
return ParsedTopic{
Platform: res[2],
Game: res[1],
}
}

// GetGameAndPlatformFromTopic returns the game and plaform specified in the Kafka topic
func GetGameAndPlatformFromTopic(topic string) ParsedTopic {
return getGameAndPlatformFromTopic(topic)
}

func SendToFeedbackReporters(feedbackReporters []interfaces.FeedbackReporter, res interface{}, topic ParsedTopic) error {
jres, err := json.Marshal(res)
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion extensions/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package extensions
import (
"context"
"fmt"
"regexp"
"sync"
"time"

Expand Down Expand Up @@ -212,6 +213,15 @@ func (q *KafkaConsumer) ConsumeLoop(ctx context.Context) error {
"topics": q.Topics,
})

parsedTopicsLists := make([]string, len(q.Topics))
for i, t := range q.Topics {
rgx, err := regexp.Compile(t)
if err != nil {
parsedTopicsLists[i] = t
}
parsedTopicsLists[i] = rgx.String()
}

err := q.Consumer.SubscribeTopics(q.Topics, func(_ *kafka.Consumer, event kafka.Event) error {
l.WithField("event", event.String()).Debug("got event from Kafka")
return nil
Expand Down Expand Up @@ -264,7 +274,7 @@ func (q *KafkaConsumer) receiveMessage(topicPartition kafka.TopicPartition, valu
}

message := interfaces.KafkaMessage{
Game: getGameAndPlatformFromTopic(*topicPartition.Topic).Game,
Game: GetGameAndPlatformFromTopic(*topicPartition.Topic).Game,
Topic: *topicPartition.Topic,
Value: value,
}
Expand Down
16 changes: 15 additions & 1 deletion pusher/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package pusher
import (
"errors"
"fmt"
"slices"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand Down Expand Up @@ -88,9 +89,22 @@ func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB,
a.Logger,
a.stopChannel,
)

if err != nil {
return err
}

for _, a := range a.Config.GetApnsAppsArray() {
singleTopic := fmt.Sprintf("push-%s_apns-single", a)
if !slices.Contains(q.Topics, singleTopic) {
q.Topics = append(q.Topics, singleTopic)
}
massiveTopic := fmt.Sprintf("push-%s_apns-massive", a)
if !slices.Contains(q.Topics, massiveTopic) {
q.Topics = append(q.Topics, massiveTopic)
}
}

a.MessageHandler = make(map[string]interfaces.MessageHandler)
a.Queue = q
l.Info("Configuring messageHandler")
Expand Down Expand Up @@ -129,7 +143,7 @@ func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB,
for _, statsReporter := range a.StatsReporters {
statsReporter.InitializeFailure(k, "apns")
}
return fmt.Errorf("failed to initialize apns firebase for %s", k)
return fmt.Errorf("failed to initialize apns firebase for %s: %w", k, err)
}
}
if len(a.MessageHandler) == 0 {
Expand Down
14 changes: 12 additions & 2 deletions pusher/gcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ package pusher
import (
"context"
"fmt"
"github.com/topfreegames/pusher/extensions/firebase/client"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/config"
"github.com/topfreegames/pusher/extensions"
"github.com/topfreegames/pusher/extensions/firebase"
"github.com/topfreegames/pusher/extensions/firebase/client"
"github.com/topfreegames/pusher/interfaces"
"slices"
)

// GCMPusher struct for GCM pusher
Expand Down Expand Up @@ -81,6 +81,16 @@ func NewGCMPusher(
return nil, fmt.Errorf("could not create kafka consumer: %w", err)
}
g.Queue = q
for _, a := range g.Config.GetGcmAppsArray() {
singleTopic := fmt.Sprintf("push-%s_gcm-single", a)
if !slices.Contains(q.Topics, singleTopic) {
q.Topics = append(q.Topics, singleTopic)
}
massiveTopic := fmt.Sprintf("push-%s_gcm-massive", a)
if !slices.Contains(q.Topics, massiveTopic) {
q.Topics = append(q.Topics, massiveTopic)
}
}

err = g.createMessageHandlerForApps(ctx)
if err != nil {
Expand Down
Loading