Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
dhurley committed May 17, 2024
1 parent a1d51f1 commit a52437a
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 30 deletions.
20 changes: 17 additions & 3 deletions internal/watcher/instance_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ type (
newInstances []*v1.Instance
deletedInstances []*v1.Instance
}

InstanceUpdatesMessage struct {
correlationID slog.Attr
instanceUpdates InstanceUpdates
}
)

func NewInstanceWatcherService(agentConfig *config.Config) *InstanceWatcherService {
Expand All @@ -55,7 +60,7 @@ func NewInstanceWatcherService(agentConfig *config.Config) *InstanceWatcherServi
}
}

func (iw *InstanceWatcherService) Watch(ctx context.Context, ch chan<- InstanceUpdates) {
func (iw *InstanceWatcherService) Watch(ctx context.Context, ch chan<- InstanceUpdatesMessage) {
monitoringFrequency := iw.agentConfig.Watchers.InstanceWatcher.MonitoringFrequency
slog.DebugContext(ctx, "Starting instance watcher monitoring", "monitoring_frequency", monitoringFrequency)

Expand All @@ -68,12 +73,21 @@ func (iw *InstanceWatcherService) Watch(ctx context.Context, ch chan<- InstanceU
close(ch)
return
case <-instanceWatcherTicker.C:
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, logger.GenerateCorrelationID())
correlationID := logger.GenerateCorrelationID()
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, correlationID)

instanceUpdates, err := iw.updates(newCtx)
if err != nil {
slog.ErrorContext(newCtx, "Instance watcher updates", "error", err)
}

if len(instanceUpdates.newInstances) > 0 || len(instanceUpdates.deletedInstances) > 0 {
ch <- InstanceUpdatesMessage{
correlationID: correlationID,
instanceUpdates: instanceUpdates,
}
} else {
ch <- instanceUpdates
slog.DebugContext(newCtx, "Instance watcher found no instance updates")
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions internal/watcher/nginx_process_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ func NewNginxProcessParser() *NginxProcessParser {
}

func (npp *NginxProcessParser) Parse(ctx context.Context, processes []*model.Process) []*v1.Instance {
processList := []*v1.Instance{}
instanceList := []*v1.Instance{}

nginxProcesses := filterNginxProcesses(processes)

for _, nginxProcess := range nginxProcesses {
// Here we are determining if the nginxProcess is a master process.
// NGINX worker processes are ignored.
_, ok := nginxProcesses[nginxProcess.Ppid]
if !ok {
nginxInfo, err := npp.getInfo(ctx, nginxProcess)
Expand All @@ -71,11 +73,11 @@ func (npp *NginxProcessParser) Parse(ctx context.Context, processes []*model.Pro
continue
}

processList = append(processList, convertInfoToProcess(*nginxInfo))
instanceList = append(instanceList, convertInfoToInstance(*nginxInfo))
}
}

return processList
return instanceList
}

func (npp *NginxProcessParser) getInfo(ctx context.Context, nginxProcess *model.Process) (*Info, error) {
Expand Down Expand Up @@ -145,7 +147,7 @@ func sanitizeExeDeletedPath(exe string) string {
return strings.TrimSpace(exe)
}

func convertInfoToProcess(nginxInfo Info) *v1.Instance {
func convertInfoToInstance(nginxInfo Info) *v1.Instance {
var instanceRuntime *v1.InstanceRuntime
nginxType := v1.InstanceMeta_INSTANCE_TYPE_NGINX
version := nginxInfo.Version
Expand Down
29 changes: 16 additions & 13 deletions internal/watcher/watcher_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (

"github.com/nginx/agent/v3/internal/bus"
"github.com/nginx/agent/v3/internal/config"
"github.com/nginx/agent/v3/internal/logger"
)

type (
Watcher struct {
messagePipe bus.MessagePipeInterface
agentConfig *config.Config
instanceWatcherService *InstanceWatcherService
instanceUpdatesChannel chan InstanceUpdates
instanceUpdatesChannel chan InstanceUpdatesMessage
cancel context.CancelFunc
}
)
Expand All @@ -27,7 +28,7 @@ func NewWatcher(agentConfig *config.Config) *Watcher {
return &Watcher{
agentConfig: agentConfig,
instanceWatcherService: NewInstanceWatcherService(agentConfig),
instanceUpdatesChannel: make(chan InstanceUpdates),
instanceUpdatesChannel: make(chan InstanceUpdatesMessage),
}
}

Expand All @@ -41,7 +42,7 @@ func (w *Watcher) Init(ctx context.Context, messagePipe bus.MessagePipeInterface
w.cancel = cancel

go w.instanceWatcherService.Watch(watcherContext, w.instanceUpdatesChannel)
go w.monitorInstanceUpdates(watcherContext)
go w.monitorWatchers(watcherContext)

return nil
}
Expand All @@ -68,24 +69,26 @@ func (*Watcher) Subscriptions() []string {
return []string{}
}

func (w *Watcher) monitorInstanceUpdates(ctx context.Context) {
func (w *Watcher) monitorWatchers(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case instanceUpdates := <-w.instanceUpdatesChannel:
if len(instanceUpdates.newInstances) > 0 {
slog.DebugContext(ctx, "New instances found", "instances", instanceUpdates.newInstances)
case message := <-w.instanceUpdatesChannel:
newCtx := context.WithValue(ctx, logger.CorrelationIDContextKey, message.correlationID)

if len(message.instanceUpdates.newInstances) > 0 {
slog.DebugContext(newCtx, "New instances found", "instances", message.instanceUpdates.newInstances)
w.messagePipe.Process(
ctx,
&bus.Message{Topic: bus.NewInstancesTopic, Data: instanceUpdates.newInstances},
newCtx,
&bus.Message{Topic: bus.NewInstancesTopic, Data: message.instanceUpdates.newInstances},
)
}
if len(instanceUpdates.deletedInstances) > 0 {
slog.DebugContext(ctx, "Instances deleted", "instances", instanceUpdates.deletedInstances)
if len(message.instanceUpdates.deletedInstances) > 0 {
slog.DebugContext(newCtx, "Instances deleted", "instances", message.instanceUpdates.deletedInstances)
w.messagePipe.Process(
ctx,
&bus.Message{Topic: bus.DeletedInstancesTopic, Data: instanceUpdates.deletedInstances},
newCtx,
&bus.Message{Topic: bus.DeletedInstancesTopic, Data: message.instanceUpdates.deletedInstances},
)
}
}
Expand Down
32 changes: 22 additions & 10 deletions internal/watcher/watcher_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/nginx/agent/v3/api/grpc/mpi/v1"
"github.com/nginx/agent/v3/internal/bus"
"github.com/nginx/agent/v3/internal/logger"
"github.com/nginx/agent/v3/test/protos"
"github.com/nginx/agent/v3/test/types"
"github.com/stretchr/testify/assert"
Expand All @@ -36,22 +37,33 @@ func TestWatcher_Init(t *testing.T) {

assert.Empty(t, messages)

instanceUpdates := InstanceUpdates{
newInstances: []*v1.Instance{
protos.GetNginxOssInstance([]string{}),
},
deletedInstances: []*v1.Instance{
protos.GetNginxPlusInstance([]string{}),
instanceUpdatesMessage := InstanceUpdatesMessage{
correlationID: logger.GenerateCorrelationID(),
instanceUpdates: InstanceUpdates{
newInstances: []*v1.Instance{
protos.GetNginxOssInstance([]string{}),
},
deletedInstances: []*v1.Instance{
protos.GetNginxPlusInstance([]string{}),
},
},
}

watcherPlugin.instanceUpdatesChannel <- instanceUpdates
watcherPlugin.instanceUpdatesChannel <- instanceUpdatesMessage

messages = messagePipe.GetMessages()

assert.Eventually(t, func() bool { return len(messages) == 2 }, 1*time.Second, 10*time.Millisecond)
assert.Equal(t, &bus.Message{Topic: bus.NewInstancesTopic, Data: instanceUpdates.newInstances}, messages[0])
assert.Equal(t, &bus.Message{Topic: bus.DeletedInstancesTopic, Data: instanceUpdates.deletedInstances}, messages[1])
assert.Eventually(t, func() bool { return len(messages) == 2 }, 5*time.Second, 10*time.Millisecond)
assert.Equal(
t,
&bus.Message{Topic: bus.NewInstancesTopic, Data: instanceUpdatesMessage.instanceUpdates.newInstances},
messages[0],
)
assert.Equal(
t,
&bus.Message{Topic: bus.DeletedInstancesTopic, Data: instanceUpdatesMessage.instanceUpdates.deletedInstances},
messages[1],
)
}

func TestWatcher_Info(t *testing.T) {
Expand Down

0 comments on commit a52437a

Please sign in to comment.