diff --git a/async_consumer.go b/async_consumer.go index 3a1b2eb..39e9829 100644 --- a/async_consumer.go +++ b/async_consumer.go @@ -1,6 +1,7 @@ package postq import ( + "container/ring" "fmt" ) @@ -8,6 +9,8 @@ import ( type AsyncEventHandlerFunc func(Context, Events) Events type AsyncEventConsumer struct { + eventLog *ring.Ring + // Name of the events in the push queue to watch for. WatchEvents []string @@ -24,6 +27,19 @@ type AsyncEventConsumer struct { EventFetcherOption *EventFetcherOption } +// RecordEvents will record all the events fetched by the consumer in a ring buffer. +func (t *AsyncEventConsumer) RecordEvents(size int) { + t.eventLog = ring.New(size) +} + +func (t AsyncEventConsumer) GetRecords() ([]Event, error) { + if t.eventLog == nil { + return nil, fmt.Errorf("event log is not initialized") + } + + return getRecords(t.eventLog), nil +} + func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) { tx, err := ctx.Pool().Begin(ctx) if err != nil { @@ -36,6 +52,13 @@ func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) { return 0, fmt.Errorf("error fetching events: %w", err) } + if t.eventLog != nil { + for _, event := range events { + t.eventLog.Value = event + t.eventLog = t.eventLog.Next() + } + } + failedEvents := t.Consumer(ctx, events) if err := failedEvents.Recreate(ctx, tx.Conn()); err != nil { ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err) diff --git a/ring_buffer.go b/ring_buffer.go new file mode 100644 index 0000000..5745d3e --- /dev/null +++ b/ring_buffer.go @@ -0,0 +1,19 @@ +package postq + +import ( + "container/ring" +) + +func getRecords(ringBuffer *ring.Ring) []Event { + events := make([]Event, 0, ringBuffer.Len()) + ringBuffer.Do(func(v any) { + if v == nil { + return + } + + e := v.(Event) + events = append(events, e) + }) + + return events +} diff --git a/sync_consumer.go b/sync_consumer.go index 86ef025..74abb58 100644 --- a/sync_consumer.go +++ b/sync_consumer.go @@ -1,6 +1,7 @@ package postq import ( + "container/ring" "fmt" ) @@ -8,6 +9,8 @@ import ( type SyncEventHandlerFunc func(Context, Event) error type SyncEventConsumer struct { + eventLog *ring.Ring + // Name of the events in the push queue to watch for. WatchEvents []string @@ -22,6 +25,19 @@ type SyncEventConsumer struct { EventFetchOption *EventFetcherOption } +// RecordEvents will record all the events fetched by the consumer in a ring buffer. +func (t *SyncEventConsumer) RecordEvents(size int) { + t.eventLog = ring.New(size) +} + +func (t SyncEventConsumer) GetRecords() ([]Event, error) { + if t.eventLog == nil { + return nil, fmt.Errorf("event log is not initialized") + } + + return getRecords(t.eventLog), nil +} + func (t SyncEventConsumer) EventConsumer() (*PGConsumer, error) { return NewPGConsumer(t.Handle, t.ConsumerOption) } @@ -68,6 +84,10 @@ func (t *SyncEventConsumer) consumeEvent(ctx Context) (*Event, error) { // sync consumers always fetch a single event at a time event := events[0] + if t.eventLog != nil { + t.eventLog.Value = event + t.eventLog = t.eventLog.Next() + } for _, syncConsumer := range t.Consumers { if err := syncConsumer(ctx, event); err != nil {