Skip to content

Commit

Permalink
Use only auto commit (#56)
Browse files Browse the repository at this point in the history
* Use only auto commit

* Nil check
  • Loading branch information
miguelreiswildlife authored May 16, 2024
1 parent fc2fffd commit e8b7c8c
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 29 deletions.
5 changes: 4 additions & 1 deletion extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ func (a *APNSMessageHandler) handleAPNSResponse(responseWithMetadata *structs.Re
"apnsID": responseWithMetadata.ApnsID,
}).Debug("retrying notification")
inFlightNotificationInstance.sendAttempts.Add(1)
if a.pendingMessagesWG != nil {
a.pendingMessagesWG.Add(1)
}
<-time.After(a.retryInterval)
if err := a.sendNotification(inFlightNotificationInstance.notification); err == nil {
return nil
Expand Down Expand Up @@ -419,7 +422,7 @@ func (a *APNSMessageHandler) handleAPNSResponse(responseWithMetadata *structs.Re
// LogStats from time to time.
func (a *APNSMessageHandler) LogStats() {
l := a.Logger.WithFields(log.Fields{
"method": "logStats",
"method": "apnsMessageHandler.logStats",
"interval(ns)": a.LogStatsInterval,
})

Expand Down
2 changes: 1 addition & 1 deletion extensions/gcm_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (g *GCMMessageHandler) HandleMessages(_ context.Context, msg interfaces.Kaf
// LogStats from time to time
func (g *GCMMessageHandler) LogStats() {
l := g.Logger.WithFields(logrus.Fields{
"method": "logStats",
"method": "gcmMessageHandler.logStats",
"interval(ns)": g.LogStatsInterval,
})

Expand Down
14 changes: 0 additions & 14 deletions extensions/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,6 @@ func (q *KafkaConsumer) PendingMessagesWaitGroup() *sync.WaitGroup {
// StopConsuming stops consuming messages from the queue
func (q *KafkaConsumer) StopConsuming() {
close(q.stopChannel)
_, err := q.Consumer.Commit()
if err != nil {
q.Logger.
WithField("method", "extensions.StopConsuming").
WithError(err).
Error("error committing messages")
}
}

func (q *KafkaConsumer) Pause(topic string) error {
Expand Down Expand Up @@ -251,13 +244,6 @@ func (q *KafkaConsumer) ConsumeLoop(ctx context.Context) error {
}
l.Debug("got message from Kafka")
q.receiveMessage(message.TopicPartition, message.Value)

_, err = q.Consumer.CommitMessage(message)
if err != nil {
l.WithError(err).
WithField("message", string(message.Value)).
Error("error committing message")
}
}
}

Expand Down
13 changes: 0 additions & 13 deletions feedback/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,6 @@ func (q *KafkaConsumer) PendingMessagesWaitGroup() *sync.WaitGroup {
// StopConsuming stops consuming messages from the queue
func (q *KafkaConsumer) StopConsuming() {
q.stopFunc()
_, err := q.Consumer.Commit()
if err != nil {
q.Logger.
WithField("method", "feedback.StopConsuming").
WithError(err).
Error("error committing messages")
}
}

// MessagesChannel returns the channel that will receive all messages got from kafka
Expand Down Expand Up @@ -227,12 +220,6 @@ func (q *KafkaConsumer) ConsumeLoop(ctx context.Context) error {
continue
}
q.receiveMessage(message.TopicPartition, message.Value)
_, err = q.Consumer.CommitMessage(message)
if err != nil {
l.WithError(err).
WithField("message", string(message.Value)).
Error("error committing message")
}
}
}
}
Expand Down

0 comments on commit e8b7c8c

Please sign in to comment.