Skip to content

Commit

Permalink
feat: expose events processed history on /event-log endpoint
Browse files Browse the repository at this point in the history
[skip ci]
  • Loading branch information
adityathebe committed Aug 5, 2024
1 parent 9a536a6 commit 8c070a8
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 4 deletions.
10 changes: 10 additions & 0 deletions echo/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/flanksource/incident-commander/catalog"
"github.com/flanksource/incident-commander/connection"
"github.com/flanksource/incident-commander/db"
"github.com/flanksource/incident-commander/events"
"github.com/flanksource/incident-commander/logs"
"github.com/flanksource/incident-commander/playbook"
"github.com/flanksource/incident-commander/push"
Expand Down Expand Up @@ -93,6 +94,15 @@ func New(ctx context.Context) *echov4.Echo {
AllowOrigins: AllowedCORS,
}))

e.GET("/event-log", func(c echov4.Context) error {
l, err := events.ConsumerLogs()
if err != nil {
return err
}

return c.JSON(http.StatusOK, l)
})

e.GET("/health", func(c echov4.Context) error {
return c.String(http.StatusOK, "OK")
})
Expand Down
38 changes: 37 additions & 1 deletion events/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ const (
// eventQueueUpdateChannel is the channel on which new events on the `event_queue` table
// are notified.
eventQueueUpdateChannel = "event_queue_updates"

// record the last `defaultEventLogSize` events for audit purpose.
defaultEventLogSize = 20
)

type Handler func(ctx context.Context, e postq.Event) error
Expand All @@ -33,6 +36,33 @@ var AsyncHandlers = utils.SyncedMap[string, asyncHandlerData]{}
var consumers []*postq.PGConsumer
var registers []func(ctx context.Context)

type EventRecordProvider interface {
GetRecords() ([]postq.Event, error)
}

var syncConsumers []EventRecordProvider
var asyncConsumers []EventRecordProvider

func getRecords(providers ...EventRecordProvider) (map[string][]postq.Event, error) {
allEvents := map[string][]postq.Event{}
for _, consumer := range providers {
events, err := consumer.GetRecords()
if err != nil {
return nil, err
}

for _, event := range events {
allEvents[event.Name] = append(allEvents[event.Name], event)
}
}

return allEvents, nil
}

func ConsumerLogs() (map[string][]postq.Event, error) {
return getRecords(append(syncConsumers, asyncConsumers...)...)
}

func Register(fn func(ctx context.Context)) {
registers = append(registers, fn)
}
Expand Down Expand Up @@ -80,6 +110,9 @@ func StartConsumers(ctx context.Context) {
ErrorHandler: defaultLoggerErrorHandler,
},
}
consumer.RecordEvents(ctx.Properties().Int("events.audit.size", defaultEventLogSize))
syncConsumers = append(syncConsumers, consumer)

if ec, err := consumer.EventConsumer(); err != nil {
logger.Fatalf("failed to create event consumer: %s", err)
} else {
Expand Down Expand Up @@ -123,6 +156,10 @@ func StartConsumers(ctx context.Context) {
},
},
}

consumer.RecordEvents(ctx.Properties().Int("events.audit.size", defaultEventLogSize))
asyncConsumers = append(asyncConsumers, consumer)

if ec, err := consumer.EventConsumer(); err != nil {
logger.Fatalf("failed to create event consumer: %s", err)
} else {
Expand All @@ -132,7 +169,6 @@ func StartConsumers(ctx context.Context) {
}
}
})

}

// on conflict clause when inserting new events to the `event_queue` table
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,6 @@ require (

// replace github.com/flanksource/duty => ../duty

// replace github.com/flanksource/postq => ../postq
replace github.com/flanksource/postq => ../postq

// replace github.com/flanksource/gomplate/v3 => ../gomplate
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,6 @@ github.com/flanksource/kopper v1.0.9 h1:IZq5HBai5BMnHruB6IdpglZ0GoKWgTK1S+buWBGi
github.com/flanksource/kopper v1.0.9/go.mod h1:6JMCgy+a/mzoAbhN5wo3cbCXb8BdHBFabLgjsIVS6nA=
github.com/flanksource/kubectl-neat v1.0.4 h1:t5/9CqgE84oEtB0KitgJ2+WIeLfD+RhXSxYrqb4X8yI=
github.com/flanksource/kubectl-neat v1.0.4/go.mod h1:Un/Voyh3cmiZNKQrW/TkAl28nAA7vwnwDGVjRErKjOw=
github.com/flanksource/postq v0.1.5 h1:UA5mgIgqkk9Btczh5eWtR57kLXkAzHVQqcHrD2C4PcM=
github.com/flanksource/postq v0.1.5/go.mod h1:AAuaPRhpqxvyF7JPs8X1NMsJVenh80ldpJPDVgWvFf8=
github.com/fluxcd/gitkit v0.6.0 h1:iNg5LTx6ePo+Pl0ZwqHTAkhbUHxGVSY3YCxCdw7VIFg=
github.com/fluxcd/gitkit v0.6.0/go.mod h1:svOHuKi0fO9HoawdK4HfHAJJseZDHHjk7I3ihnCIqNo=
github.com/fluxcd/pkg/gittestserver v0.8.6 h1:YM8prVKB3LC9LBBe+a2p7l1BlfV9erXCgC1em9sbqW4=
Expand Down

0 comments on commit 8c070a8

Please sign in to comment.