Skip to content

Commit

Permalink
feat: implement ring buffer on the handlers instead
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Aug 6, 2024
1 parent e57ded1 commit 29166d9
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 56 deletions.
13 changes: 5 additions & 8 deletions echo/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/flanksource/commons/http/middlewares"
"github.com/flanksource/commons/logger"
cutils "github.com/flanksource/commons/utils"
dutyAPI "github.com/flanksource/duty/api"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/schema/openapi"
"github.com/flanksource/incident-commander/agent"
Expand All @@ -21,8 +20,8 @@ import (
"github.com/flanksource/incident-commander/catalog"
"github.com/flanksource/incident-commander/connection"
"github.com/flanksource/incident-commander/db"
"github.com/flanksource/incident-commander/events"
"github.com/flanksource/incident-commander/logs"
"github.com/flanksource/incident-commander/notification"
"github.com/flanksource/incident-commander/playbook"
"github.com/flanksource/incident-commander/push"
"github.com/flanksource/incident-commander/rbac"
Expand Down Expand Up @@ -96,12 +95,10 @@ func New(ctx context.Context) *echov4.Echo {
}))

e.GET("/event-log", func(c echov4.Context) error {
l, err := events.ConsumerLogs()
if err != nil {
return dutyAPI.WriteError(c, err)
}

return c.JSON(http.StatusOK, l)
return c.JSON(http.StatusOK, map[string]any{
"notifications": notification.EventRing.Get(),
"playbooks": playbook.EventRing.Get(),
})
}, rbac.Authorization(rbac.ObjectMonitor, rbac.ActionRead))

e.GET("/health", func(c echov4.Context) error {
Expand Down
36 changes: 2 additions & 34 deletions events/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ const (
// are notified.
eventQueueUpdateChannel = "event_queue_updates"

// record the last `defaultEventLogSize` events for audit purpose.
defaultEventLogSize = 20
// record the last `DefaultEventLogSize` events for audit purpose.
DefaultEventLogSize = 20
)

type Handler func(ctx context.Context, e postq.Event) error
Expand All @@ -36,33 +36,6 @@ var AsyncHandlers = utils.SyncedMap[string, asyncHandlerData]{}
var consumers []*postq.PGConsumer
var registers []func(ctx context.Context)

type EventRecordProvider interface {
GetRecords() ([]postq.Event, error)
}

var syncConsumers []EventRecordProvider
var asyncConsumers []EventRecordProvider

func getRecords(providers ...EventRecordProvider) (map[string][]postq.Event, error) {
allEvents := map[string][]postq.Event{}
for _, consumer := range providers {
events, err := consumer.GetRecords()
if err != nil {
return nil, err
}

for _, event := range events {
allEvents[event.Name] = append(allEvents[event.Name], event)
}
}

return allEvents, nil
}

func ConsumerLogs() (map[string][]postq.Event, error) {
return getRecords(append(syncConsumers, asyncConsumers...)...)
}

func Register(fn func(ctx context.Context)) {
registers = append(registers, fn)
}
Expand Down Expand Up @@ -110,8 +83,6 @@ func StartConsumers(ctx context.Context) {
ErrorHandler: defaultLoggerErrorHandler,
},
}
consumer.RecordEvents(ctx.Properties().Int("events.audit.size", defaultEventLogSize))
syncConsumers = append(syncConsumers, consumer)

if ec, err := consumer.EventConsumer(); err != nil {
logger.Fatalf("failed to create event consumer: %s", err)
Expand Down Expand Up @@ -157,9 +128,6 @@ func StartConsumers(ctx context.Context) {
},
}

consumer.RecordEvents(ctx.Properties().Int("events.audit.size", defaultEventLogSize))
asyncConsumers = append(asyncConsumers, consumer)

if ec, err := consumer.EventConsumer(); err != nil {
logger.Fatalf("failed to create event consumer: %s", err)
} else {
Expand Down
43 changes: 43 additions & 0 deletions events/ring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package events

import (
"sync"

"github.com/flanksource/postq"
)

type EventRing struct {
size int
events map[string][]map[string]any
mu *sync.RWMutex
}

func NewEventRing(size int) *EventRing {
return &EventRing{
size: size,
events: make(map[string][]map[string]any),
mu: &sync.RWMutex{},
}
}

func (t *EventRing) Add(event postq.Event, env map[string]any) {
t.mu.Lock()
defer t.mu.Unlock()

if _, ok := t.events[event.Name]; !ok {
t.events[event.Name] = make([]map[string]any, 0)
}

t.events[event.Name] = append(t.events[event.Name], map[string]any{
"event": event,
"env": env,
})

if len(t.events[event.Name]) > t.size {
t.events[event.Name] = t.events[event.Name][1:]
}
}

func (t *EventRing) Get() map[string][]map[string]any {
return t.events
}
15 changes: 13 additions & 2 deletions notification/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ var (

RateLimitWindow = time.Hour * 4
MaxNotificationsPerWindow = 50

EventRing *events.EventRing
)

func init() {
events.Register(RegisterEvents)
}

func RegisterEvents(ctx context.Context) {
events.RegisterSyncHandler(addNotificationEvent, append(api.EventStatusGroup, api.EventIncidentGroup...)...)
EventRing = events.NewEventRing(ctx.Properties().Int("events.audit.size", events.DefaultEventLogSize))
nh := notificationHandler{Ring: EventRing}
events.RegisterSyncHandler(nh.addNotificationEvent, append(api.EventStatusGroup, api.EventIncidentGroup...)...)

events.RegisterAsyncHandler(sendNotifications, 1, 5, api.EventNotificationSend)
}

Expand Down Expand Up @@ -74,10 +79,14 @@ func getOrCreateRateLimiter(ctx context.Context, notificationID string) (*sw.Lim
return rateLimiter, nil
}

type notificationHandler struct {
Ring *events.EventRing
}

// addNotificationEvent responds to a event that can possibly generate a notification.
// If a notification is found for the given event and passes all the filters, then
// a new `notification.send` event is created.
func addNotificationEvent(ctx context.Context, event postq.Event) error {
func (t *notificationHandler) addNotificationEvent(ctx context.Context, event postq.Event) error {
notificationIDs, err := GetNotificationIDsForEvent(ctx, event.Name)
if err != nil {
return err
Expand All @@ -92,6 +101,8 @@ func addNotificationEvent(ctx context.Context, event postq.Event) error {
return err
}

t.Ring.Add(event, celEnv)

for _, id := range notificationIDs {
n, err := GetNotification(ctx, id)
if err != nil {
Expand Down
50 changes: 38 additions & 12 deletions playbook/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ var eventToSpecEvent = map[string]PlaybookSpecEvent{
api.EventComponentUnknown: {"component", "unknown"},
}

var eventPlaybooksCache = cache.New(time.Hour*1, time.Hour*1)
var (
eventPlaybooksCache = cache.New(time.Hour*1, time.Hour*1)

EventRing *events.EventRing
)

func eventPlaybookCacheKey(eventClass, event string) string {
return fmt.Sprintf("%s::%s", eventClass, event)
Expand All @@ -60,7 +64,10 @@ func init() {
}

func RegisterEvents(ctx context.Context) {
events.RegisterSyncHandler(SchedulePlaybookRun, api.EventStatusGroup...)
EventRing = events.NewEventRing(ctx.Properties().Int("events.audit.size", events.DefaultEventLogSize))
nh := playbookScheduler{Ring: EventRing}
events.RegisterSyncHandler(nh.Handle, api.EventStatusGroup...)

events.RegisterSyncHandler(onApprovalUpdated, api.EventPlaybookSpecApprovalUpdated)
events.RegisterSyncHandler(onPlaybookRunNewApproval, api.EventPlaybookApprovalInserted)

Expand All @@ -80,16 +87,32 @@ type EventResource struct {
}

func (t *EventResource) AsMap() map[string]any {
return map[string]any{
"component": t.Component,
"check": t.Check,
"config": t.Config,
"canary": t.Canary,
"check_summary": t.CheckSummary,
output := map[string]any{}

if t.Component != nil {
output["component"] = t.Component.AsMap()
}
if t.Config != nil {
output["config"] = t.Config.AsMap()
}
if t.Check != nil {
output["check"] = t.Check.AsMap()
}
if t.Canary != nil {
output["canary"] = t.Canary.AsMap()
}
if t.CheckSummary != nil {
output["check_summary"] = t.CheckSummary.AsMap()
}

return output
}

func SchedulePlaybookRun(ctx context.Context, event postq.Event) error {
type playbookScheduler struct {
Ring *events.EventRing
}

func (t *playbookScheduler) Handle(ctx context.Context, event postq.Event) error {
specEvent, ok := eventToSpecEvent[event.Name]
if !ok {
return nil
Expand Down Expand Up @@ -139,6 +162,9 @@ func SchedulePlaybookRun(ctx context.Context, event postq.Event) error {
}

for _, p := range playbooks {
celEnv := eventResource.AsMap()
t.Ring.Add(event, celEnv)

playbook, err := v1.PlaybookFromModel(p)
if err != nil {
logger.Errorf("error converting playbook model to spec: %s", err)
Expand All @@ -158,7 +184,7 @@ func SchedulePlaybookRun(ctx context.Context, event postq.Event) error {
switch specEvent.Class {
case "canary":
run.CheckID = &eventResource.Check.ID
if ok, err := matchResource(eventResource.Check.Labels, eventResource.AsMap(), playbook.Spec.On.Canary); err != nil {
if ok, err := matchResource(eventResource.Check.Labels, celEnv, playbook.Spec.On.Canary); err != nil {
logToJobHistory(ctx, p.ID.String(), err.Error())
continue
} else if ok {
Expand All @@ -168,7 +194,7 @@ func SchedulePlaybookRun(ctx context.Context, event postq.Event) error {
}
case "component":
run.ComponentID = &eventResource.Component.ID
if ok, err := matchResource(eventResource.Component.Labels, eventResource.AsMap(), playbook.Spec.On.Component); err != nil {
if ok, err := matchResource(eventResource.Component.Labels, celEnv, playbook.Spec.On.Component); err != nil {
logToJobHistory(ctx, p.ID.String(), err.Error())
continue
} else if ok {
Expand All @@ -178,7 +204,7 @@ func SchedulePlaybookRun(ctx context.Context, event postq.Event) error {
}
case "config":
run.ConfigID = &eventResource.Config.ID
if ok, err := matchResource(eventResource.Config.Tags, eventResource.AsMap(), playbook.Spec.On.Config); err != nil {
if ok, err := matchResource(eventResource.Config.Tags, celEnv, playbook.Spec.On.Config); err != nil {
logToJobHistory(ctx, p.ID.String(), err.Error())
continue
} else if ok {
Expand Down

0 comments on commit 29166d9

Please sign in to comment.