Skip to content

Commit

Permalink
feat: record events in a ring buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Aug 5, 2024
1 parent 8124b4e commit a1aec4b
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 0 deletions.
23 changes: 23 additions & 0 deletions async_consumer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package postq

import (
"container/ring"
"fmt"
)

// AsyncEventHandlerFunc processes multiple events and returns the failed ones
type AsyncEventHandlerFunc func(Context, Events) Events

type AsyncEventConsumer struct {
eventLog *ring.Ring

// Name of the events in the push queue to watch for.
WatchEvents []string

Expand All @@ -24,6 +27,19 @@ type AsyncEventConsumer struct {
EventFetcherOption *EventFetcherOption
}

// RecordEvents will record all the events fetched by the consumer in a ring buffer.
func (t *AsyncEventConsumer) RecordEvents(size int) {
t.eventLog = ring.New(size)
}

func (t AsyncEventConsumer) GetRecords() ([]Event, error) {
if t.eventLog == nil {
return nil, fmt.Errorf("event log is not initialized")
}

return getRecords(t.eventLog), nil
}

func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) {
tx, err := ctx.Pool().Begin(ctx)
if err != nil {
Expand All @@ -36,6 +52,13 @@ func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) {
return 0, fmt.Errorf("error fetching events: %w", err)
}

for _, event := range events {
if t.eventLog != nil {
t.eventLog.Value = event
t.eventLog = t.eventLog.Next()
}
}

failedEvents := t.Consumer(ctx, events)
if err := failedEvents.Recreate(ctx, tx.Conn()); err != nil {
ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err)
Expand Down
19 changes: 19 additions & 0 deletions ring_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package postq

import (
"container/ring"
)

func getRecords(ringBuffer *ring.Ring) []Event {
events := make([]Event, 0, ringBuffer.Len())
ringBuffer.Do(func(v any) {
if v == nil {
return
}

e := v.(Event)
events = append(events, e)
})

return events
}
20 changes: 20 additions & 0 deletions sync_consumer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package postq

import (
"container/ring"
"fmt"
)

// SyncEventHandlerFunc processes a single event and ONLY makes db changes.
type SyncEventHandlerFunc func(Context, Event) error

type SyncEventConsumer struct {
eventLog *ring.Ring

// Name of the events in the push queue to watch for.
WatchEvents []string

Expand All @@ -22,6 +25,19 @@ type SyncEventConsumer struct {
EventFetchOption *EventFetcherOption
}

// RecordEvents will record all the events fetched by the consumer in a ring buffer.
func (t *SyncEventConsumer) RecordEvents(size int) {
t.eventLog = ring.New(size)
}

func (t SyncEventConsumer) GetRecords() ([]Event, error) {
if t.eventLog == nil {
return nil, fmt.Errorf("event log is not initialized")
}

return getRecords(t.eventLog), nil
}

func (t SyncEventConsumer) EventConsumer() (*PGConsumer, error) {
return NewPGConsumer(t.Handle, t.ConsumerOption)
}
Expand Down Expand Up @@ -68,6 +84,10 @@ func (t *SyncEventConsumer) consumeEvent(ctx Context) (*Event, error) {

// sync consumers always fetch a single event at a time
event := events[0]
if t.eventLog != nil {
t.eventLog.Value = event
t.eventLog = t.eventLog.Next()
}

for _, syncConsumer := range t.Consumers {
if err := syncConsumer(ctx, event); err != nil {
Expand Down

0 comments on commit a1aec4b

Please sign in to comment.