Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow custom route extractor on pg notify router #1041

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 47 additions & 10 deletions postq/pg/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,49 @@ import (
"github.com/flanksource/duty/context"
)

type routeExtractorFn func(string) (string, string, error)

func defaultRouteExtractor(payload string) (string, string, error) {
// The original payload is expected to be in the form of
// <route> <...optional payload>
fields := strings.Fields(payload)
route := fields[0]
derivedPayload := strings.Join(fields[1:], " ")
return route, derivedPayload, nil
}

// notifyRouter distributes the pgNotify event to multiple channels
// based on the payload.
type notifyRouter struct {
registry map[string]chan string
registry map[string]chan string
routeExtractor routeExtractorFn
}

func NewNotifyRouter() *notifyRouter {
return &notifyRouter{
registry: make(map[string]chan string),
registry: make(map[string]chan string),
routeExtractor: defaultRouteExtractor,
}
}

func (t *notifyRouter) WithRouteExtractor(routeExtractor routeExtractorFn) *notifyRouter {
t.routeExtractor = routeExtractor
return t
}

// RegisterRoutes creates a single channel for the given routes and returns it.
func (t *notifyRouter) RegisterRoutes(routes ...string) <-chan string {
// If any of the routes already has a channel, we use that
// for all the routes.
// Caution: The caller needs to ensure that the route
// groups do not overlap.
pgNotifyChannel := make(chan string)
for _, we := range routes {
if existing, ok := t.registry[we]; ok {
pgNotifyChannel = existing
}
}

for _, we := range routes {
t.registry[we] = pgNotifyChannel
}
Expand All @@ -32,19 +60,28 @@ func (t *notifyRouter) Run(ctx context.Context, channel string) {
eventQueueNotifyChannel := make(chan string)
go Listen(ctx, channel, eventQueueNotifyChannel)

for payload := range eventQueueNotifyChannel {
if _, ok := t.registry[payload]; !ok || payload == "" {
t.start(eventQueueNotifyChannel)
}

func (t *notifyRouter) start(channel chan string) {
for payload := range channel {
if payload == "" {
continue
}

route, extractedPayload, err := t.routeExtractor(payload)
if err != nil {
continue
}

// The original payload is expected to be in the form of
// <route> <...optional payload>
fields := strings.Fields(payload)
route := fields[0]
derivedPayload := strings.Join(fields[1:], " ")
if _, ok := t.registry[route]; !ok {
continue
}

if ch, ok := t.registry[route]; ok {
ch <- derivedPayload
go func() {
ch <- extractedPayload
}()
}
}
}
63 changes: 63 additions & 0 deletions postq/pg/router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package pg

import (
"sync"
"testing"
"time"
)

func TestPGRouter(t *testing.T) {
// Create & run the router
r := NewNotifyRouter()
pgNotifyChan := make(chan string)
go func() {
r.start(pgNotifyChan)
}()

// Two subscribers
alpha := r.RegisterRoutes("alphaA", "alphaB")
beta := r.RegisterRoutes("beta")

var alphaCount, betaCount int
timeout := time.NewTimer(time.Second * 3)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

for {
select {
case <-alpha:
alphaCount++
if alphaCount+betaCount == 3 {
return
}

case <-beta:
betaCount++
if alphaCount+betaCount == 3 {
return
}

case <-timeout.C:
return
}
}
}()

// Simulate receiving pg notify
go func() {
pgNotifyChan <- "alphaA 1"
pgNotifyChan <- "beta 1"
pgNotifyChan <- "alphaB 1"
}()

wg.Wait()
if alphaCount != 2 {
t.Errorf("Expected alphaCount to be 2, got %d", alphaCount)
}

if betaCount != 1 {
t.Errorf("Expected betaCount to be 1, got %d", betaCount)
}
}
Loading