Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Management Plane Health Request #910

Merged
merged 14 commits into from
Nov 7, 2024
32 changes: 17 additions & 15 deletions internal/bus/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@
package bus

const (
AddInstancesTopic = "add-instances"
UpdatedInstancesTopic = "updated-instances"
DeletedInstancesTopic = "deleted-instances"
ResourceUpdateTopic = "resource-update"
NginxConfigUpdateTopic = "nginx-config-update"
InstanceHealthTopic = "instance-health"
ConfigUploadRequestTopic = "config-upload-request"
DataPlaneResponseTopic = "data-plane-response"
ConnectionCreatedTopic = "connection-created"
ConfigApplyRequestTopic = "config-apply-request"
WriteConfigSuccessfulTopic = "write-config-successful"
ConfigApplySuccessfulTopic = "config-apply-successful"
ConfigApplyFailedTopic = "config-apply-failed"
RollbackCompleteTopic = "rollback-complete"
RollbackWriteTopic = "rollback-write"
AddInstancesTopic = "add-instances"
UpdatedInstancesTopic = "updated-instances"
DeletedInstancesTopic = "deleted-instances"
ResourceUpdateTopic = "resource-update"
NginxConfigUpdateTopic = "nginx-config-update"
InstanceHealthTopic = "instance-health"
ConfigUploadRequestTopic = "config-upload-request"
DataPlaneResponseTopic = "data-plane-response"
ConnectionCreatedTopic = "connection-created"
ConfigApplyRequestTopic = "config-apply-request"
WriteConfigSuccessfulTopic = "write-config-successful"
ConfigApplySuccessfulTopic = "config-apply-successful"
ConfigApplyFailedTopic = "config-apply-failed"
RollbackCompleteTopic = "rollback-complete"
RollbackWriteTopic = "rollback-write"
DataPlaneHealthRequestTopic = "data-plane-health-request"
DataPlaneHealthResponseTopic = "data-plane-health-response"
)
50 changes: 50 additions & 0 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"context"
"log/slog"

"google.golang.org/protobuf/types/known/timestamppb"

"github.com/google/uuid"

mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"github.com/nginx/agent/v3/internal/bus"
"github.com/nginx/agent/v3/internal/config"
Expand Down Expand Up @@ -77,6 +81,8 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
cp.processResourceUpdate(ctx, msg)
case bus.InstanceHealthTopic:
cp.processInstanceHealth(ctx, msg)
case bus.DataPlaneHealthResponseTopic:
cp.processDataPlaneHealth(ctx, msg)
case bus.DataPlaneResponseTopic:
cp.processDataPlaneResponse(ctx, msg)
default:
Expand Down Expand Up @@ -110,6 +116,26 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res
}
}

func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Message) {
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
correlationID := logger.GetCorrelationID(ctx)
if err != nil {
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err)
cp.messagePipe.Process(ctx, &bus.Message{
Topic: bus.DataPlaneResponseTopic,
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"Failed to send the health status update", err.Error()),
})
}
cp.messagePipe.Process(ctx, &bus.Message{
Topic: bus.DataPlaneResponseTopic,
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
"Successfully sent the health status update", ""),
})
}
}

func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Message) {
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
Expand All @@ -132,6 +158,7 @@ func (cp *CommandPlugin) Subscriptions() []string {
return []string{
bus.ResourceUpdateTopic,
bus.InstanceHealthTopic,
bus.DataPlaneHealthResponseTopic,
bus.DataPlaneResponseTopic,
}
}
Expand All @@ -154,6 +181,8 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
cp.handleConfigUploadRequest(newCtx, message)
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
cp.handleConfigApplyRequest(newCtx, message)
case *mpi.ManagementPlaneRequest_HealthRequest:
cp.handleHealthRequest(newCtx)
default:
slog.DebugContext(newCtx, "Management plane request not implemented yet")
}
Expand Down Expand Up @@ -210,3 +239,24 @@ func (cp *CommandPlugin) handleConfigUploadRequest(newCtx context.Context, messa
}
}
}

func (cp *CommandPlugin) handleHealthRequest(newCtx context.Context) {
cp.messagePipe.Process(newCtx, &bus.Message{Topic: bus.DataPlaneHealthRequestTopic})
}

func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
message, err string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
MessageId: uuid.NewString(),
CorrelationId: correlationID,
Timestamp: timestamppb.Now(),
},
CommandResponse: &mpi.CommandResponse{
Status: status,
Message: message,
Error: err,
},
}
}
110 changes: 81 additions & 29 deletions internal/command/command_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestCommandPlugin_Subscriptions(t *testing.T) {
[]string{
bus.ResourceUpdateTopic,
bus.InstanceHealthTopic,
bus.DataPlaneHealthResponseTopic,
bus.DataPlaneResponseTopic,
},
subscriptions,
Expand Down Expand Up @@ -94,41 +95,92 @@ func TestCommandPlugin_Process(t *testing.T) {

commandPlugin.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: protos.OKDataPlaneResponse()})
require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount())

commandPlugin.Process(ctx, &bus.Message{
Topic: bus.DataPlaneHealthResponseTopic,
Data: protos.GetHealthyInstanceHealth(),
})
require.Equal(t, 1, fakeCommandService.UpdateDataPlaneHealthCallCount())
require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount())
}

func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

messagePipe := bus.NewFakeMessagePipe()

commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
err := commandPlugin.Init(ctx, messagePipe)
require.NoError(t, err)
defer commandPlugin.Close(ctx)

go commandPlugin.monitorSubscribeChannel(ctx)

commandPlugin.subscribeChannel <- &mpi.ManagementPlaneRequest{
Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{
ConfigUploadRequest: &mpi.ConfigUploadRequest{},
tests := []struct {
managementPlaneRequest *mpi.ManagementPlaneRequest
expectedTopic *bus.Message
name string
isUploadRequest bool
isApplyRequest bool
}{
{
name: "Test 1: Config Upload Request",
managementPlaneRequest: &mpi.ManagementPlaneRequest{
Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{
ConfigUploadRequest: &mpi.ConfigUploadRequest{},
},
},
expectedTopic: &bus.Message{Topic: bus.ConfigUploadRequestTopic},
isUploadRequest: true,
},
{
name: "Test 2: Config Apply Request",
managementPlaneRequest: &mpi.ManagementPlaneRequest{
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
ConfigApplyRequest: &mpi.ConfigApplyRequest{},
},
},
expectedTopic: &bus.Message{Topic: bus.ConfigApplyRequestTopic},
isApplyRequest: true,
},
{
name: "Test 3: Health Request",
managementPlaneRequest: &mpi.ManagementPlaneRequest{
Request: &mpi.ManagementPlaneRequest_HealthRequest{
HealthRequest: &mpi.HealthRequest{},
},
},
expectedTopic: &bus.Message{Topic: bus.DataPlaneHealthRequestTopic},
},
}

assert.Eventually(
t,
func() bool { return len(messagePipe.GetMessages()) == 1 },
2*time.Second,
10*time.Millisecond,
)

messages := messagePipe.GetMessages()
assert.Len(t, messages, 1)
assert.Equal(t, bus.ConfigUploadRequestTopic, messages[0].Topic)

request, ok := messages[0].Data.(*mpi.ManagementPlaneRequest)
assert.True(t, ok)
require.NotNil(t, request.GetConfigUploadRequest())
for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
messagePipe := bus.NewFakeMessagePipe()

commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
err := commandPlugin.Init(ctx, messagePipe)
require.NoError(t, err)
defer commandPlugin.Close(ctx)

go commandPlugin.monitorSubscribeChannel(ctx)

commandPlugin.subscribeChannel <- test.managementPlaneRequest

assert.Eventually(
t,
func() bool { return len(messagePipe.GetMessages()) == 1 },
2*time.Second,
10*time.Millisecond,
)

messages := messagePipe.GetMessages()
assert.Len(t, messages, 1)
assert.Equal(t, test.expectedTopic.Topic, messages[0].Topic)

_, ok := messages[0].Data.(*mpi.ManagementPlaneRequest)

if test.isUploadRequest {
assert.True(t, ok)
require.NotNil(t, test.managementPlaneRequest.GetConfigUploadRequest())
}
if test.isApplyRequest {
assert.True(t, ok)
require.NotNil(t, test.managementPlaneRequest.GetConfigApplyRequest())
}
})
}
}

func TestMonitorSubscribeChannel(t *testing.T) {
Expand Down
32 changes: 28 additions & 4 deletions internal/watcher/health/health_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"log/slog"
"sync"
"time"

"google.golang.org/protobuf/proto"
Expand All @@ -28,10 +29,11 @@ type (
}

HealthWatcherService struct {
agentConfig *config.Config
cache map[string]*mpi.InstanceHealth // key is instanceID
watchers map[string]healthWatcherOperator // key is instanceID
instances map[string]*mpi.Instance // key is instanceID
agentConfig *config.Config
cache map[string]*mpi.InstanceHealth // key is instanceID
watchers map[string]healthWatcherOperator // key is instanceID
instances map[string]*mpi.Instance // key is instanceID
healthWatcherMutex sync.Mutex
}

InstanceHealthMessage struct {
Expand Down Expand Up @@ -80,6 +82,19 @@ func (hw *HealthWatcherService) DeleteHealthWatcher(instances []*mpi.Instance) {
}
}

func (hw *HealthWatcherService) GetInstancesHealth() []*mpi.InstanceHealth {
dhurley marked this conversation as resolved.
Show resolved Hide resolved
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()

healthList := make([]*mpi.InstanceHealth, 0, len(hw.cache))

for _, health := range hw.cache {
healthList = append(healthList, health)
}

return healthList
}

func (hw *HealthWatcherService) Watch(ctx context.Context, ch chan<- InstanceHealthMessage) {
monitoringFrequency := hw.agentConfig.Watchers.InstanceHealthWatcher.MonitoringFrequency
slog.DebugContext(ctx, "Starting health watcher monitoring", "monitoring_frequency", monitoringFrequency)
Expand Down Expand Up @@ -140,6 +155,9 @@ func (hw *HealthWatcherService) health(ctx context.Context) (updatedStatuses []*

// update the cache with the most recent instance healths
func (hw *HealthWatcherService) updateCache(currentHealth map[string]*mpi.InstanceHealth) {
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()

for instanceID, healthStatus := range currentHealth {
hw.cache[instanceID] = healthStatus
}
Expand All @@ -156,6 +174,9 @@ func (hw *HealthWatcherService) updateCache(currentHealth map[string]*mpi.Instan
// compare the cache with the current list of instances to check if an instance has been deleted
// if an instance has been deleted add an UNHEALTHY health status to the list of instances for that instance
func (hw *HealthWatcherService) compareCache(healthStatuses []*mpi.InstanceHealth) []*mpi.InstanceHealth {
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()

if len(hw.cache) != len(hw.instances) {
for instanceID := range hw.cache {
if _, ok := hw.instances[instanceID]; !ok {
Expand All @@ -175,6 +196,9 @@ func (hw *HealthWatcherService) compareCache(healthStatuses []*mpi.InstanceHealt

// compare current health with cached health to see if the health of an instance has changed
func (hw *HealthWatcherService) compareHealth(currentHealth map[string]*mpi.InstanceHealth) bool {
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()

if len(currentHealth) != len(hw.cache) {
return true
}
Expand Down
26 changes: 25 additions & 1 deletion internal/watcher/health/health_watcher_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestHealthWatcherService_compareCache(t *testing.T) {
},
},
{
name: "Test 1: No change to instance list",
name: "Test 2: No change to instance list",
expectedHealth: []*mpi.InstanceHealth{
protos.GetHealthyInstanceHealth(),
},
Expand All @@ -235,3 +235,27 @@ func TestHealthWatcherService_compareCache(t *testing.T) {
})
}
}

func TestHealthWatcherService_GetInstancesHealth(t *testing.T) {
ossInstance := protos.GetNginxOssInstance([]string{})
plusInstance := protos.GetNginxPlusInstance([]string{})
ossInstanceHealth := protos.GetHealthyInstanceHealth()
plusInstanceHealth := protos.GetUnhealthyInstanceHealth()

healthCache := map[string]*mpi.InstanceHealth{
ossInstance.GetInstanceMeta().GetInstanceId(): ossInstanceHealth,
plusInstance.GetInstanceMeta().GetInstanceId(): plusInstanceHealth,
}

expectedInstancesHealth := []*mpi.InstanceHealth{
ossInstanceHealth,
plusInstanceHealth,
}
agentConfig := types.AgentConfig()
healthWatcher := NewHealthWatcherService(agentConfig)
healthWatcher.cache = healthCache

result := healthWatcher.GetInstancesHealth()

assert.ElementsMatch(t, expectedInstancesHealth, result)
}
9 changes: 9 additions & 0 deletions internal/watcher/watcher_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) {
w.handleConfigApplySuccess(ctx, msg)
case bus.RollbackCompleteTopic:
w.handleRollbackComplete(ctx, msg)
case bus.DataPlaneHealthRequestTopic:
w.handleHealthRequest(ctx)
default:
slog.DebugContext(ctx, "Watcher plugin unknown topic", "topic", msg.Topic)
}
Expand All @@ -123,6 +125,7 @@ func (*Watcher) Subscriptions() []string {
bus.ConfigApplyRequestTopic,
bus.ConfigApplySuccessfulTopic,
bus.RollbackCompleteTopic,
bus.DataPlaneHealthRequestTopic,
}
}

Expand Down Expand Up @@ -168,6 +171,12 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message
w.instanceWatcherService.ReparseConfig(ctx, data)
}

func (w *Watcher) handleHealthRequest(ctx context.Context) {
w.messagePipe.Process(ctx, &bus.Message{
Topic: bus.DataPlaneHealthResponseTopic, Data: w.healthWatcherService.GetInstancesHealth(),
})
}

func (w *Watcher) handleRollbackComplete(ctx context.Context, msg *bus.Message) {
instanceID, ok := msg.Data.(string)
if !ok {
Expand Down
Loading
Loading