Skip to content

Commit

Permalink
fix: handling failed events
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Mar 20, 2024
1 parent 899edea commit cd0246f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
8 changes: 1 addition & 7 deletions async_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,7 @@ func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) {
}

failedEvents := t.Consumer(ctx, events)

for i := range failedEvents {
e := failedEvents[i]
e.Attempts += 1
}

if err := failedEvents.Update(ctx, tx.Conn()); err != nil {
if err := failedEvents.Recreate(ctx, tx.Conn()); err != nil {
ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err)
}

Expand Down
16 changes: 11 additions & 5 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Event struct {
LastAttempt *time.Time `json:"last_attempt"`
Properties map[string]string `json:"properties"`
CreatedAt time.Time `json:"created_at"`
Priority int `json:"priority"`
}

func (t Event) TableName() string {
Expand All @@ -39,6 +40,7 @@ func (t *Event) Scan(rows pgx.Row) error {
&t.Error,
&t.LastAttempt,
&t.Attempts,
&t.Priority,
)
if err != nil {
return err
Expand All @@ -49,16 +51,20 @@ func (t *Event) Scan(rows pgx.Row) error {

type Events []Event

// Update updates the events in batches.
func (events Events) Update(ctx Context, tx *pgx.Conn) error {
// Recreate creates the given failed events in batches after updating the
// attempts count.
func (events Events) Recreate(ctx Context, tx *pgx.Conn) error {
if len(events) == 0 {
return nil
}

var batch pgx.Batch
for _, event := range events {
query := `UPDATE event_queue SET error=$1, attempts=$2, last_attempt=NOW() WHERE id=$3`
batch.Queue(query, event.Error, event.Attempts, event.ID)
attempts := event.Attempts + 1
query := `INSERT INTO event_queue
(name, properties, error, last_attempt, attempts, priority)
VALUES($1, $2, $3, NOW(), $4, $5)`
batch.Queue(query, event.Name, event.Properties, event.Error, attempts, event.Priority)
}

br := tx.SendBatch(ctx, &batch)
Expand Down Expand Up @@ -107,7 +113,7 @@ func fetchEvents(ctx Context, tx pgx.Tx, watchEvents []string, batchSize int, op
FOR UPDATE SKIP LOCKED
LIMIT @batchSize
)
RETURNING id, name, created_at, properties, error, last_attempt, attempts
RETURNING id, name, created_at, properties, error, last_attempt, attempts, priority
`

args := pgx.NamedArgs{
Expand Down

0 comments on commit cd0246f

Please sign in to comment.