From a1aec4b637b608501b0ba68fe7c608903798df94 Mon Sep 17 00:00:00 2001
From: Aditya Thebe <contact@adityathebe.com>
Date: Mon, 5 Aug 2024 17:18:44 +0545
Subject: [PATCH] feat: record events in a ring buffer

---
 async_consumer.go | 23 +++++++++++++++++++++++
 ring_buffer.go    | 19 +++++++++++++++++++
 sync_consumer.go  | 20 ++++++++++++++++++++
 3 files changed, 62 insertions(+)
 create mode 100644 ring_buffer.go

diff --git a/async_consumer.go b/async_consumer.go
index 3a1b2eb..dff13d5 100644
--- a/async_consumer.go
+++ b/async_consumer.go
@@ -1,6 +1,7 @@
 package postq
 
 import (
+	"container/ring"
 	"fmt"
 )
 
@@ -8,6 +9,8 @@ import (
 type AsyncEventHandlerFunc func(Context, Events) Events
 
 type AsyncEventConsumer struct {
+	eventLog *ring.Ring
+
 	// Name of the events in the push queue to watch for.
 	WatchEvents []string
 
@@ -24,6 +27,19 @@ type AsyncEventConsumer struct {
 	EventFetcherOption *EventFetcherOption
 }
 
+// RecordEvents will record all the events fetched by the consumer in a ring buffer.
+func (t *AsyncEventConsumer) RecordEvents(size int) {
+	t.eventLog = ring.New(size)
+}
+
+func (t AsyncEventConsumer) GetRecords() ([]Event, error) {
+	if t.eventLog == nil {
+		return nil, fmt.Errorf("event log is not initialized")
+	}
+
+	return getRecords(t.eventLog), nil
+}
+
 func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) {
 	tx, err := ctx.Pool().Begin(ctx)
 	if err != nil {
@@ -36,6 +52,13 @@ func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) {
 		return 0, fmt.Errorf("error fetching events: %w", err)
 	}
 
+	for _, event := range events {
+		if t.eventLog != nil {
+			t.eventLog.Value = event
+			t.eventLog = t.eventLog.Next()
+		}
+	}
+
 	failedEvents := t.Consumer(ctx, events)
 	if err := failedEvents.Recreate(ctx, tx.Conn()); err != nil {
 		ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err)
diff --git a/ring_buffer.go b/ring_buffer.go
new file mode 100644
index 0000000..5745d3e
--- /dev/null
+++ b/ring_buffer.go
@@ -0,0 +1,19 @@
+package postq
+
+import (
+	"container/ring"
+)
+
+func getRecords(ringBuffer *ring.Ring) []Event {
+	events := make([]Event, 0, ringBuffer.Len())
+	ringBuffer.Do(func(v any) {
+		if v == nil {
+			return
+		}
+
+		e := v.(Event)
+		events = append(events, e)
+	})
+
+	return events
+}
diff --git a/sync_consumer.go b/sync_consumer.go
index 86ef025..74abb58 100644
--- a/sync_consumer.go
+++ b/sync_consumer.go
@@ -1,6 +1,7 @@
 package postq
 
 import (
+	"container/ring"
 	"fmt"
 )
 
@@ -8,6 +9,8 @@ import (
 type SyncEventHandlerFunc func(Context, Event) error
 
 type SyncEventConsumer struct {
+	eventLog *ring.Ring
+
 	// Name of the events in the push queue to watch for.
 	WatchEvents []string
 
@@ -22,6 +25,19 @@ type SyncEventConsumer struct {
 	EventFetchOption *EventFetcherOption
 }
 
+// RecordEvents will record all the events fetched by the consumer in a ring buffer.
+func (t *SyncEventConsumer) RecordEvents(size int) {
+	t.eventLog = ring.New(size)
+}
+
+func (t SyncEventConsumer) GetRecords() ([]Event, error) {
+	if t.eventLog == nil {
+		return nil, fmt.Errorf("event log is not initialized")
+	}
+
+	return getRecords(t.eventLog), nil
+}
+
 func (t SyncEventConsumer) EventConsumer() (*PGConsumer, error) {
 	return NewPGConsumer(t.Handle, t.ConsumerOption)
 }
@@ -68,6 +84,10 @@ func (t *SyncEventConsumer) consumeEvent(ctx Context) (*Event, error) {
 
 	// sync consumers always fetch a single event at a time
 	event := events[0]
+	if t.eventLog != nil {
+		t.eventLog.Value = event
+		t.eventLog = t.eventLog.Next()
+	}
 
 	for _, syncConsumer := range t.Consumers {
 		if err := syncConsumer(ctx, event); err != nil {