From 50dff9330fe6d56c5bea41f457c1b4c7a1532dec Mon Sep 17 00:00:00 2001 From: RRashmit Date: Wed, 23 Oct 2024 14:00:11 +0100 Subject: [PATCH 01/14] feat: added health responses --- Makefile | 6 +-- internal/bus/topics.go | 32 ++++++------ internal/command/command_plugin.go | 51 +++++++++++++++++++ internal/config/defaults.go | 1 + .../watcher/health/health_watcher_service.go | 10 ++++ internal/watcher/watcher_plugin.go | 9 ++++ nginx-agent.conf | 5 ++ pkg/config/features.go | 1 + 8 files changed, 97 insertions(+), 18 deletions(-) diff --git a/Makefile b/Makefile index 8307a677af..9b5eaa2ab4 100644 --- a/Makefile +++ b/Makefile @@ -62,9 +62,9 @@ DEB_PACKAGE := ./build/$(PACKAGE_NAME).deb RPM_PACKAGE := ./build/$(PACKAGE_NAME).rpm MOCK_MANAGEMENT_PLANE_CONFIG_DIRECTORY ?= -MOCK_MANAGEMENT_PLANE_LOG_LEVEL ?= INFO -MOCK_MANAGEMENT_PLANE_GRPC_ADDRESS ?= 127.0.0.1:0 -MOCK_MANAGEMENT_PLANE_API_ADDRESS ?= 127.0.0.1:0 +MOCK_MANAGEMENT_PLANE_LOG_LEVEL ?= DEBUG +MOCK_MANAGEMENT_PLANE_GRPC_ADDRESS ?= 127.0.0.1:64863 +MOCK_MANAGEMENT_PLANE_API_ADDRESS ?= 127.0.0.1:64862 OLD_BENCHMARK_RESULTS_FILE ?= $(TEST_BUILD_DIR)/benchmark.txt uname_m := $(shell uname -m) diff --git a/internal/bus/topics.go b/internal/bus/topics.go index 8099189b51..469fe0b8b8 100644 --- a/internal/bus/topics.go +++ b/internal/bus/topics.go @@ -6,19 +6,21 @@ package bus const ( - AddInstancesTopic = "add-instances" - UpdatedInstancesTopic = "updated-instances" - DeletedInstancesTopic = "deleted-instances" - ResourceUpdateTopic = "resource-update" - NginxConfigUpdateTopic = "nginx-config-update" - InstanceHealthTopic = "instance-health" - ConfigUploadRequestTopic = "config-upload-request" - DataPlaneResponseTopic = "data-plane-response" - ConnectionCreatedTopic = "connection-created" - ConfigApplyRequestTopic = "config-apply-request" - WriteConfigSuccessfulTopic = "write-config-successful" - ConfigApplySuccessfulTopic = "config-apply-successful" - ConfigApplyFailedTopic = "config-apply-failed" - RollbackCompleteTopic = "rollback-complete" - RollbackWriteTopic = "rollback-write" + AddInstancesTopic = "add-instances" + UpdatedInstancesTopic = "updated-instances" + DeletedInstancesTopic = "deleted-instances" + ResourceUpdateTopic = "resource-update" + NginxConfigUpdateTopic = "nginx-config-update" + InstanceHealthTopic = "instance-health" + ConfigUploadRequestTopic = "config-upload-request" + DataPlaneResponseTopic = "data-plane-response" + ConnectionCreatedTopic = "connection-created" + ConfigApplyRequestTopic = "config-apply-request" + WriteConfigSuccessfulTopic = "write-config-successful" + ConfigApplySuccessfulTopic = "config-apply-successful" + ConfigApplyFailedTopic = "config-apply-failed" + RollbackCompleteTopic = "rollback-complete" + RollbackWriteTopic = "rollback-write" + DataplaneHealthTopic = "dataplane-health" + DataplaneHealthProcessTopic = "dataplane-health-process" ) diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index e81fdfebf8..3d3b6ef6af 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -9,6 +9,10 @@ import ( "context" "log/slog" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/google/uuid" + mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" "github.com/nginx/agent/v3/internal/bus" "github.com/nginx/agent/v3/internal/config" @@ -77,6 +81,8 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) { cp.processResourceUpdate(ctx, msg) case bus.InstanceHealthTopic: cp.processInstanceHealth(ctx, msg) + case bus.DataplaneHealthProcessTopic: + cp.processDataplaneHealth(ctx, msg) case bus.DataPlaneResponseTopic: cp.processDataPlaneResponse(ctx, msg) default: @@ -110,6 +116,26 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res } } +func (cp *CommandPlugin) processDataplaneHealth(ctx context.Context, msg *bus.Message) { + if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok { + err := cp.commandService.UpdateDataPlaneHealth(ctx, instances) + correlationID := logger.GetCorrelationID(ctx) + if err != nil { + slog.ErrorContext(ctx, "Unable to update data plane health", "error", err) + cp.messagePipe.Process(ctx, &bus.Message{ + Topic: bus.DataPlaneResponseTopic, + Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, + "Failed to send the health status update", instances[0].GetInstanceId(), err.Error()), + }) + } + cp.messagePipe.Process(ctx, &bus.Message{ + Topic: bus.DataPlaneResponseTopic, + Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, + "Successfully sent the health status update", instances[0].GetInstanceId(), ""), + }) + } +} + func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Message) { if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok { err := cp.commandService.UpdateDataPlaneHealth(ctx, instances) @@ -132,6 +158,7 @@ func (cp *CommandPlugin) Subscriptions() []string { return []string{ bus.ResourceUpdateTopic, bus.InstanceHealthTopic, + bus.DataplaneHealthProcessTopic, bus.DataPlaneResponseTopic, } } @@ -154,6 +181,8 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) { cp.handleConfigUploadRequest(newCtx, message) case *mpi.ManagementPlaneRequest_ConfigApplyRequest: cp.handleConfigApplyRequest(newCtx, message) + case *mpi.ManagementPlaneRequest_HealthRequest: + cp.handleHealthRequest(newCtx) default: slog.DebugContext(newCtx, "Management plane request not implemented yet") } @@ -210,3 +239,25 @@ func (cp *CommandPlugin) handleConfigUploadRequest(newCtx context.Context, messa } } } + +func (cp *CommandPlugin) handleHealthRequest(newCtx context.Context) { + cp.messagePipe.Process(newCtx, &bus.Message{Topic: bus.DataplaneHealthTopic}) +} + +func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, + message, instanceID, err string, +) *mpi.DataPlaneResponse { + return &mpi.DataPlaneResponse{ + MessageMeta: &mpi.MessageMeta{ + MessageId: uuid.NewString(), + CorrelationId: correlationID, + Timestamp: timestamppb.Now(), + }, + CommandResponse: &mpi.CommandResponse{ + Status: status, + Message: message, + Error: err, + }, + InstanceId: instanceID, + } +} diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 526ea914c9..49166c6f49 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -79,6 +79,7 @@ func DefaultFeatures() []string { pkg.FeatureConnection, pkg.FeatureMetrics, pkg.FeatureFileWatcher, + pkg.FeatureDataplaneHealth, } } diff --git a/internal/watcher/health/health_watcher_service.go b/internal/watcher/health/health_watcher_service.go index d66a30f482..ace8ea394d 100644 --- a/internal/watcher/health/health_watcher_service.go +++ b/internal/watcher/health/health_watcher_service.go @@ -40,6 +40,16 @@ type ( } ) +func (hw *HealthWatcherService) GetCache() []*mpi.InstanceHealth { + healthList := make([]*mpi.InstanceHealth, 0, len(hw.cache)) + + for _, health := range hw.cache { + healthList = append(healthList, health) + } + + return healthList +} + func NewHealthWatcherService(agentConfig *config.Config) *HealthWatcherService { return &HealthWatcherService{ watchers: make(map[string]healthWatcherOperator), diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 255df0b115..322cfe30c5 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -113,6 +113,8 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) { w.handleConfigApplySuccess(ctx, msg) case bus.RollbackCompleteTopic: w.handleRollbackComplete(ctx, msg) + case bus.DataplaneHealthTopic: + w.handleHealthRequest(ctx) default: slog.DebugContext(ctx, "Watcher plugin unknown topic", "topic", msg.Topic) } @@ -123,6 +125,7 @@ func (*Watcher) Subscriptions() []string { bus.ConfigApplyRequestTopic, bus.ConfigApplySuccessfulTopic, bus.RollbackCompleteTopic, + bus.DataplaneHealthTopic, } } @@ -168,6 +171,12 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message w.instanceWatcherService.ReparseConfig(ctx, data) } +func (w *Watcher) handleHealthRequest(ctx context.Context) { + w.messagePipe.Process(ctx, &bus.Message{ + Topic: bus.DataplaneHealthProcessTopic, Data: w.healthWatcherService.GetCache(), + }) +} + func (w *Watcher) handleRollbackComplete(ctx context.Context, msg *bus.Message) { instanceID, ok := msg.Data.(string) if !ok { diff --git a/nginx-agent.conf b/nginx-agent.conf index 24dd050694..b9e6805745 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -17,6 +17,11 @@ allowed_directories: - /var/run/nginx - /var/log/nginx +command: + server: + host: localhost + port: 64863 + type: grpc ## command server settings # command: # server: diff --git a/pkg/config/features.go b/pkg/config/features.go index 4bc857ca24..522428683d 100644 --- a/pkg/config/features.go +++ b/pkg/config/features.go @@ -15,4 +15,5 @@ const ( FeatureMetricsInstance = "metrics-instance" FeatureFileWatcher = "file-watcher" FeatureAgentAPI = "agent-api" + FeatureDataplaneHealth = "dataplane-health" ) From ad00ab9b2dcfe128c7a0608a78015d81867a9d75 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Wed, 23 Oct 2024 14:07:23 +0100 Subject: [PATCH 02/14] chore: fix unit tests --- Makefile | 6 +++--- internal/command/command_plugin_test.go | 1 + internal/watcher/watcher_plugin_test.go | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 9b5eaa2ab4..8307a677af 100644 --- a/Makefile +++ b/Makefile @@ -62,9 +62,9 @@ DEB_PACKAGE := ./build/$(PACKAGE_NAME).deb RPM_PACKAGE := ./build/$(PACKAGE_NAME).rpm MOCK_MANAGEMENT_PLANE_CONFIG_DIRECTORY ?= -MOCK_MANAGEMENT_PLANE_LOG_LEVEL ?= DEBUG -MOCK_MANAGEMENT_PLANE_GRPC_ADDRESS ?= 127.0.0.1:64863 -MOCK_MANAGEMENT_PLANE_API_ADDRESS ?= 127.0.0.1:64862 +MOCK_MANAGEMENT_PLANE_LOG_LEVEL ?= INFO +MOCK_MANAGEMENT_PLANE_GRPC_ADDRESS ?= 127.0.0.1:0 +MOCK_MANAGEMENT_PLANE_API_ADDRESS ?= 127.0.0.1:0 OLD_BENCHMARK_RESULTS_FILE ?= $(TEST_BUILD_DIR)/benchmark.txt uname_m := $(shell uname -m) diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index 36513d386f..f408a71fc9 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -39,6 +39,7 @@ func TestCommandPlugin_Subscriptions(t *testing.T) { []string{ bus.ResourceUpdateTopic, bus.InstanceHealthTopic, + bus.DataplaneHealthProcessTopic, bus.DataPlaneResponseTopic, }, subscriptions, diff --git a/internal/watcher/watcher_plugin_test.go b/internal/watcher/watcher_plugin_test.go index 28dcc59616..06a19bed5e 100644 --- a/internal/watcher/watcher_plugin_test.go +++ b/internal/watcher/watcher_plugin_test.go @@ -170,6 +170,7 @@ func TestWatcher_Subscriptions(t *testing.T) { bus.ConfigApplyRequestTopic, bus.ConfigApplySuccessfulTopic, bus.RollbackCompleteTopic, + bus.DataplaneHealthTopic, }, watcherPlugin.Subscriptions(), ) From 890567a8369144658c6333da6edeac61545e0a4d Mon Sep 17 00:00:00 2001 From: RRashmit Date: Wed, 23 Oct 2024 14:58:14 +0100 Subject: [PATCH 03/14] chore: unit tests --- internal/command/command_plugin.go | 7 ++-- internal/command/command_plugin_test.go | 34 +++++++++++++++++++ .../watcher/health/health_watcher_service.go | 20 +++++------ .../health/health_watcher_service_test.go | 29 +++++++++++++++- internal/watcher/watcher_plugin.go | 2 +- 5 files changed, 76 insertions(+), 16 deletions(-) diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index 3d3b6ef6af..5fcbd4b592 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -125,13 +125,13 @@ func (cp *CommandPlugin) processDataplaneHealth(ctx context.Context, msg *bus.Me cp.messagePipe.Process(ctx, &bus.Message{ Topic: bus.DataPlaneResponseTopic, Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Failed to send the health status update", instances[0].GetInstanceId(), err.Error()), + "Failed to send the health status update", err.Error()), }) } cp.messagePipe.Process(ctx, &bus.Message{ Topic: bus.DataPlaneResponseTopic, Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, - "Successfully sent the health status update", instances[0].GetInstanceId(), ""), + "Successfully sent the health status update", ""), }) } } @@ -245,7 +245,7 @@ func (cp *CommandPlugin) handleHealthRequest(newCtx context.Context) { } func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, - message, instanceID, err string, + message, err string, ) *mpi.DataPlaneResponse { return &mpi.DataPlaneResponse{ MessageMeta: &mpi.MessageMeta{ @@ -258,6 +258,5 @@ func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mp Message: message, Error: err, }, - InstanceId: instanceID, } } diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index f408a71fc9..acaa15da3a 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -95,6 +95,9 @@ func TestCommandPlugin_Process(t *testing.T) { commandPlugin.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: protos.OKDataPlaneResponse()}) require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount()) + + commandPlugin.Process(ctx, &bus.Message{Topic: bus.DataplaneHealthProcessTopic, Data: protos.OKDataPlaneResponse()}) + require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount()) } func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { @@ -132,6 +135,37 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { require.NotNil(t, request.GetConfigUploadRequest()) } +func TestCommandPlugin_monitorHealthRequest(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + messagePipe := bus.NewFakeMessagePipe() + + commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) + err := commandPlugin.Init(ctx, messagePipe) + require.NoError(t, err) + defer commandPlugin.Close(ctx) + + go commandPlugin.monitorSubscribeChannel(ctx) + + commandPlugin.subscribeChannel <- &mpi.ManagementPlaneRequest{ + Request: &mpi.ManagementPlaneRequest_HealthRequest{ + HealthRequest: &mpi.HealthRequest{}, + }, + } + + assert.Eventually( + t, + func() bool { return len(messagePipe.GetMessages()) == 1 }, + 2*time.Second, + 10*time.Millisecond, + ) + + messages := messagePipe.GetMessages() + assert.Len(t, messages, 1) + assert.Equal(t, bus.DataplaneHealthTopic, messages[0].Topic) +} + func TestMonitorSubscribeChannel(t *testing.T) { ctx, cncl := context.WithCancel(context.Background()) defer cncl() diff --git a/internal/watcher/health/health_watcher_service.go b/internal/watcher/health/health_watcher_service.go index ace8ea394d..2e2cb2455c 100644 --- a/internal/watcher/health/health_watcher_service.go +++ b/internal/watcher/health/health_watcher_service.go @@ -40,16 +40,6 @@ type ( } ) -func (hw *HealthWatcherService) GetCache() []*mpi.InstanceHealth { - healthList := make([]*mpi.InstanceHealth, 0, len(hw.cache)) - - for _, health := range hw.cache { - healthList = append(healthList, health) - } - - return healthList -} - func NewHealthWatcherService(agentConfig *config.Config) *HealthWatcherService { return &HealthWatcherService{ watchers: make(map[string]healthWatcherOperator), @@ -90,6 +80,16 @@ func (hw *HealthWatcherService) DeleteHealthWatcher(instances []*mpi.Instance) { } } +func (hw *HealthWatcherService) GetInstancesHealth() []*mpi.InstanceHealth { + healthList := make([]*mpi.InstanceHealth, 0, len(hw.cache)) + + for _, health := range hw.cache { + healthList = append(healthList, health) + } + + return healthList +} + func (hw *HealthWatcherService) Watch(ctx context.Context, ch chan<- InstanceHealthMessage) { monitoringFrequency := hw.agentConfig.Watchers.InstanceHealthWatcher.MonitoringFrequency slog.DebugContext(ctx, "Starting health watcher monitoring", "monitoring_frequency", monitoringFrequency) diff --git a/internal/watcher/health/health_watcher_service_test.go b/internal/watcher/health/health_watcher_service_test.go index fef3c44494..baf898e402 100644 --- a/internal/watcher/health/health_watcher_service_test.go +++ b/internal/watcher/health/health_watcher_service_test.go @@ -208,7 +208,7 @@ func TestHealthWatcherService_compareCache(t *testing.T) { }, }, { - name: "Test 1: No change to instance list", + name: "Test 2: No change to instance list", expectedHealth: []*mpi.InstanceHealth{ protos.GetHealthyInstanceHealth(), }, @@ -235,3 +235,30 @@ func TestHealthWatcherService_compareCache(t *testing.T) { }) } } + +func TestHealthWatcherService_getCache(t *testing.T) { + ossInstance := protos.GetNginxOssInstance([]string{}) + plusInstance := protos.GetNginxPlusInstance([]string{}) + healthCache := map[string]*mpi.InstanceHealth{ + ossInstance.GetInstanceMeta().GetInstanceId(): protos.GetHealthyInstanceHealth(), + plusInstance.GetInstanceMeta().GetInstanceId(): { + InstanceId: plusInstance.GetInstanceMeta().GetInstanceId(), + InstanceHealthStatus: mpi.InstanceHealth_INSTANCE_HEALTH_STATUS_HEALTHY, + }, + } + + expectedInstancesHealth := []*mpi.InstanceHealth{ + protos.GetHealthyInstanceHealth(), + { + InstanceId: plusInstance.GetInstanceMeta().GetInstanceId(), + InstanceHealthStatus: mpi.InstanceHealth_INSTANCE_HEALTH_STATUS_HEALTHY, + }, + } + agentConfig := types.AgentConfig() + healthWatcher := NewHealthWatcherService(agentConfig) + healthWatcher.cache = healthCache + + result := healthWatcher.GetInstancesHealth() + + assert.Equal(t, expectedInstancesHealth, result) +} diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 322cfe30c5..8f4bd2df3e 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -173,7 +173,7 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message func (w *Watcher) handleHealthRequest(ctx context.Context) { w.messagePipe.Process(ctx, &bus.Message{ - Topic: bus.DataplaneHealthProcessTopic, Data: w.healthWatcherService.GetCache(), + Topic: bus.DataplaneHealthProcessTopic, Data: w.healthWatcherService.GetInstancesHealth(), }) } From 67632703746120b9d423605d9c3c29c3ec8ef5d9 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Thu, 24 Oct 2024 10:09:20 +0100 Subject: [PATCH 04/14] chore: integration test for health req --- .../grpc_management_plane_api_test.go | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/test/integration/grpc_management_plane_api_test.go b/test/integration/grpc_management_plane_api_test.go index 9ee12e604f..a852809aa1 100644 --- a/test/integration/grpc_management_plane_api_test.go +++ b/test/integration/grpc_management_plane_api_test.go @@ -354,6 +354,37 @@ func TestGrpc_FileWatcher(t *testing.T) { verifyUpdateDataPlaneStatus(t) } +func TestGrpc_DataplaneHealthRequest(t *testing.T) { + teardownTest := setupConnectionTest(t, true) + defer teardownTest(t) + + verifyConnection(t) + assert.False(t, t.Failed()) + + request := `{ + "message_meta": { + "message_id": "5d0fa83e-351c-4009-90cd-1f2acce2d184", + "correlation_id": "79794c1c-8e91-47c1-a92c-b9a0c3f1a263", + "timestamp": "2023-01-15T01:30:15.01Z" + }, + "health_request": {} + }` + + client := resty.New() + client.SetRetryCount(retryCount).SetRetryWaitTime(retryWaitTime).SetRetryMaxWaitTime(retryMaxWaitTime) + + url := fmt.Sprintf("http://%s/api/v1/requests", mockManagementPlaneAPIAddress) + resp, err := client.R().EnableTrace().SetBody(request).Post(url) + + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode()) + + responses := getManagementPlaneResponses(t, 2) + + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[1].GetCommandResponse().GetStatus()) + assert.Equal(t, "Successfully sent the health status update", responses[1].GetCommandResponse().GetMessage()) +} + func performConfigApply(t *testing.T, nginxInstanceID string) { t.Helper() From d47bb8320a295c57ce678aca1cc8514fa9e1c640 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Thu, 24 Oct 2024 11:00:00 +0100 Subject: [PATCH 05/14] chore: unit test --- .../health/health_watcher_service_test.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/internal/watcher/health/health_watcher_service_test.go b/internal/watcher/health/health_watcher_service_test.go index baf898e402..06b7272bc0 100644 --- a/internal/watcher/health/health_watcher_service_test.go +++ b/internal/watcher/health/health_watcher_service_test.go @@ -239,20 +239,17 @@ func TestHealthWatcherService_compareCache(t *testing.T) { func TestHealthWatcherService_getCache(t *testing.T) { ossInstance := protos.GetNginxOssInstance([]string{}) plusInstance := protos.GetNginxPlusInstance([]string{}) + ossInstanceHealth := protos.GetHealthyInstanceHealth() + plusInstanceHealth := protos.GetUnhealthyInstanceHealth() + healthCache := map[string]*mpi.InstanceHealth{ - ossInstance.GetInstanceMeta().GetInstanceId(): protos.GetHealthyInstanceHealth(), - plusInstance.GetInstanceMeta().GetInstanceId(): { - InstanceId: plusInstance.GetInstanceMeta().GetInstanceId(), - InstanceHealthStatus: mpi.InstanceHealth_INSTANCE_HEALTH_STATUS_HEALTHY, - }, + ossInstance.GetInstanceMeta().GetInstanceId(): ossInstanceHealth, + plusInstance.GetInstanceMeta().GetInstanceId(): plusInstanceHealth, } expectedInstancesHealth := []*mpi.InstanceHealth{ - protos.GetHealthyInstanceHealth(), - { - InstanceId: plusInstance.GetInstanceMeta().GetInstanceId(), - InstanceHealthStatus: mpi.InstanceHealth_INSTANCE_HEALTH_STATUS_HEALTHY, - }, + ossInstanceHealth, + plusInstanceHealth, } agentConfig := types.AgentConfig() healthWatcher := NewHealthWatcherService(agentConfig) From 2a6181554ae07c138da37494e8d7de0fb5bd6cc5 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Thu, 24 Oct 2024 11:36:09 +0100 Subject: [PATCH 06/14] chore: more assertions --- internal/command/command_plugin_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index acaa15da3a..9fbf2233cc 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -96,7 +96,8 @@ func TestCommandPlugin_Process(t *testing.T) { commandPlugin.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: protos.OKDataPlaneResponse()}) require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount()) - commandPlugin.Process(ctx, &bus.Message{Topic: bus.DataplaneHealthProcessTopic, Data: protos.OKDataPlaneResponse()}) + commandPlugin.Process(ctx, &bus.Message{Topic: bus.DataplaneHealthProcessTopic, Data: protos.GetHealthyInstanceHealth()}) + require.Equal(t, 1, fakeCommandService.UpdateDataPlaneHealthCallCount()) require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount()) } From b35cc811f7cb679c712dd68c12e0f724bde4c9a5 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Thu, 24 Oct 2024 12:24:45 +0100 Subject: [PATCH 07/14] chore: lint --- internal/command/command_plugin_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index 9fbf2233cc..f32ace4841 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -96,7 +96,10 @@ func TestCommandPlugin_Process(t *testing.T) { commandPlugin.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: protos.OKDataPlaneResponse()}) require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount()) - commandPlugin.Process(ctx, &bus.Message{Topic: bus.DataplaneHealthProcessTopic, Data: protos.GetHealthyInstanceHealth()}) + commandPlugin.Process(ctx, &bus.Message{ + Topic: bus.DataplaneHealthProcessTopic, + Data: protos.GetHealthyInstanceHealth(), + }) require.Equal(t, 1, fakeCommandService.UpdateDataPlaneHealthCallCount()) require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount()) } From 10f4c2152f6d21a8df86229fddb664a85d45a208 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Thu, 24 Oct 2024 14:32:04 +0100 Subject: [PATCH 08/14] chore: remove extra line --- internal/config/defaults.go | 1 - internal/watcher/health/health_watcher_service_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 49166c6f49..526ea914c9 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -79,7 +79,6 @@ func DefaultFeatures() []string { pkg.FeatureConnection, pkg.FeatureMetrics, pkg.FeatureFileWatcher, - pkg.FeatureDataplaneHealth, } } diff --git a/internal/watcher/health/health_watcher_service_test.go b/internal/watcher/health/health_watcher_service_test.go index 06b7272bc0..b5b28dcf7c 100644 --- a/internal/watcher/health/health_watcher_service_test.go +++ b/internal/watcher/health/health_watcher_service_test.go @@ -257,5 +257,5 @@ func TestHealthWatcherService_getCache(t *testing.T) { result := healthWatcher.GetInstancesHealth() - assert.Equal(t, expectedInstancesHealth, result) + assert.ElementsMatch(t, expectedInstancesHealth, result) } From 48e1fcdba978f4751f1c1f2ff95aef85e35ebd72 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Thu, 24 Oct 2024 15:31:51 +0100 Subject: [PATCH 09/14] chore: race condition --- nginx-agent.conf | 5 ----- test/integration/grpc_management_plane_api_test.go | 9 ++++++++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/nginx-agent.conf b/nginx-agent.conf index b9e6805745..24dd050694 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -17,11 +17,6 @@ allowed_directories: - /var/run/nginx - /var/log/nginx -command: - server: - host: localhost - port: 64863 - type: grpc ## command server settings # command: # server: diff --git a/test/integration/grpc_management_plane_api_test.go b/test/integration/grpc_management_plane_api_test.go index a852809aa1..89da5d5d15 100644 --- a/test/integration/grpc_management_plane_api_test.go +++ b/test/integration/grpc_management_plane_api_test.go @@ -381,8 +381,15 @@ func TestGrpc_DataplaneHealthRequest(t *testing.T) { responses := getManagementPlaneResponses(t, 2) + var allMessages []string + for _, response := range responses { + message := response.GetCommandResponse().GetMessage() + allMessages = append(allMessages, message) + } + + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[1].GetCommandResponse().GetStatus()) - assert.Equal(t, "Successfully sent the health status update", responses[1].GetCommandResponse().GetMessage()) + assert.Contains(t, allMessages, "Successfully sent the health status update") } func performConfigApply(t *testing.T, nginxInstanceID string) { From afb3ca056972cc771ff767716829d5659412aec9 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Thu, 24 Oct 2024 15:54:03 +0100 Subject: [PATCH 10/14] chore: lint --- test/integration/grpc_management_plane_api_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/grpc_management_plane_api_test.go b/test/integration/grpc_management_plane_api_test.go index 89da5d5d15..2b422d9fa8 100644 --- a/test/integration/grpc_management_plane_api_test.go +++ b/test/integration/grpc_management_plane_api_test.go @@ -381,7 +381,7 @@ func TestGrpc_DataplaneHealthRequest(t *testing.T) { responses := getManagementPlaneResponses(t, 2) - var allMessages []string + allMessages := make([]string, 0, 2) for _, response := range responses { message := response.GetCommandResponse().GetMessage() allMessages = append(allMessages, message) From 108e062062b0679bc76562cb2681d682261eeb55 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Fri, 25 Oct 2024 11:05:08 +0100 Subject: [PATCH 11/14] chore: added config apply test --- internal/command/command_plugin_test.go | 131 +++++++++++++----------- 1 file changed, 72 insertions(+), 59 deletions(-) diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index f32ace4841..4c4f401457 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -105,69 +105,82 @@ func TestCommandPlugin_Process(t *testing.T) { } func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - messagePipe := bus.NewFakeMessagePipe() - - commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) - err := commandPlugin.Init(ctx, messagePipe) - require.NoError(t, err) - defer commandPlugin.Close(ctx) - - go commandPlugin.monitorSubscribeChannel(ctx) - - commandPlugin.subscribeChannel <- &mpi.ManagementPlaneRequest{ - Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{ - ConfigUploadRequest: &mpi.ConfigUploadRequest{}, + tests := []struct { + mpiRequest *mpi.ManagementPlaneRequest + expectedTopic *bus.Message + name string + isUploadRequest bool + isApplyRequest bool + }{ + { + name: "Test 1: Config Upload Request", + mpiRequest: &mpi.ManagementPlaneRequest{ + Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{ + ConfigUploadRequest: &mpi.ConfigUploadRequest{}, + }, + }, + expectedTopic: &bus.Message{Topic: bus.ConfigUploadRequestTopic}, + isUploadRequest: true, }, - } - - assert.Eventually( - t, - func() bool { return len(messagePipe.GetMessages()) == 1 }, - 2*time.Second, - 10*time.Millisecond, - ) - - messages := messagePipe.GetMessages() - assert.Len(t, messages, 1) - assert.Equal(t, bus.ConfigUploadRequestTopic, messages[0].Topic) - - request, ok := messages[0].Data.(*mpi.ManagementPlaneRequest) - assert.True(t, ok) - require.NotNil(t, request.GetConfigUploadRequest()) -} - -func TestCommandPlugin_monitorHealthRequest(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - messagePipe := bus.NewFakeMessagePipe() - - commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) - err := commandPlugin.Init(ctx, messagePipe) - require.NoError(t, err) - defer commandPlugin.Close(ctx) - - go commandPlugin.monitorSubscribeChannel(ctx) - - commandPlugin.subscribeChannel <- &mpi.ManagementPlaneRequest{ - Request: &mpi.ManagementPlaneRequest_HealthRequest{ - HealthRequest: &mpi.HealthRequest{}, + { + name: "Test 2: Config Apply Request", + mpiRequest: &mpi.ManagementPlaneRequest{ + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{}, + }, + }, + expectedTopic: &bus.Message{Topic: bus.ConfigApplyRequestTopic}, + isApplyRequest: true, + }, + { + name: "Test 3: Config Health Request", + mpiRequest: &mpi.ManagementPlaneRequest{ + Request: &mpi.ManagementPlaneRequest_HealthRequest{ + HealthRequest: &mpi.HealthRequest{}, + }, + }, + expectedTopic: &bus.Message{Topic: bus.DataplaneHealthTopic}, }, } - assert.Eventually( - t, - func() bool { return len(messagePipe.GetMessages()) == 1 }, - 2*time.Second, - 10*time.Millisecond, - ) - - messages := messagePipe.GetMessages() - assert.Len(t, messages, 1) - assert.Equal(t, bus.DataplaneHealthTopic, messages[0].Topic) + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + messagePipe := bus.NewFakeMessagePipe() + + commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) + err := commandPlugin.Init(ctx, messagePipe) + require.NoError(t, err) + defer commandPlugin.Close(ctx) + + go commandPlugin.monitorSubscribeChannel(ctx) + + commandPlugin.subscribeChannel <- test.mpiRequest + + assert.Eventually( + t, + func() bool { return len(messagePipe.GetMessages()) == 1 }, + 2*time.Second, + 10*time.Millisecond, + ) + + messages := messagePipe.GetMessages() + assert.Len(t, messages, 1) + assert.Equal(t, test.expectedTopic.Topic, messages[0].Topic) + + _, ok := messages[0].Data.(*mpi.ManagementPlaneRequest) + + if test.isUploadRequest { + assert.True(t, ok) + require.NotNil(t, test.mpiRequest.GetConfigUploadRequest()) + } + if test.isApplyRequest { + assert.True(t, ok) + require.NotNil(t, test.mpiRequest.GetConfigApplyRequest()) + } + }) + } } func TestMonitorSubscribeChannel(t *testing.T) { From 40b2c53264a98f553efea93d13ee2379b1ce3149 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Tue, 29 Oct 2024 12:52:28 +0000 Subject: [PATCH 12/14] chore: review feedback --- internal/bus/topics.go | 34 +++++++++---------- internal/command/command_plugin.go | 10 +++--- internal/command/command_plugin_test.go | 30 ++++++++-------- .../watcher/health/health_watcher_service.go | 22 +++++++++--- .../health/health_watcher_service_test.go | 2 +- internal/watcher/watcher_plugin.go | 6 ++-- internal/watcher/watcher_plugin_test.go | 2 +- 7 files changed, 60 insertions(+), 46 deletions(-) diff --git a/internal/bus/topics.go b/internal/bus/topics.go index 469fe0b8b8..46a32e24fb 100644 --- a/internal/bus/topics.go +++ b/internal/bus/topics.go @@ -6,21 +6,21 @@ package bus const ( - AddInstancesTopic = "add-instances" - UpdatedInstancesTopic = "updated-instances" - DeletedInstancesTopic = "deleted-instances" - ResourceUpdateTopic = "resource-update" - NginxConfigUpdateTopic = "nginx-config-update" - InstanceHealthTopic = "instance-health" - ConfigUploadRequestTopic = "config-upload-request" - DataPlaneResponseTopic = "data-plane-response" - ConnectionCreatedTopic = "connection-created" - ConfigApplyRequestTopic = "config-apply-request" - WriteConfigSuccessfulTopic = "write-config-successful" - ConfigApplySuccessfulTopic = "config-apply-successful" - ConfigApplyFailedTopic = "config-apply-failed" - RollbackCompleteTopic = "rollback-complete" - RollbackWriteTopic = "rollback-write" - DataplaneHealthTopic = "dataplane-health" - DataplaneHealthProcessTopic = "dataplane-health-process" + AddInstancesTopic = "add-instances" + UpdatedInstancesTopic = "updated-instances" + DeletedInstancesTopic = "deleted-instances" + ResourceUpdateTopic = "resource-update" + NginxConfigUpdateTopic = "nginx-config-update" + InstanceHealthTopic = "instance-health" + ConfigUploadRequestTopic = "config-upload-request" + DataPlaneResponseTopic = "data-plane-response" + ConnectionCreatedTopic = "connection-created" + ConfigApplyRequestTopic = "config-apply-request" + WriteConfigSuccessfulTopic = "write-config-successful" + ConfigApplySuccessfulTopic = "config-apply-successful" + ConfigApplyFailedTopic = "config-apply-failed" + RollbackCompleteTopic = "rollback-complete" + RollbackWriteTopic = "rollback-write" + DataPlaneHealthRequestTopic = "data-plane-health-request" + DataPlaneHealthResponseTopic = "data-plane-health-response" ) diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index 5fcbd4b592..be66f8823d 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -81,8 +81,8 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) { cp.processResourceUpdate(ctx, msg) case bus.InstanceHealthTopic: cp.processInstanceHealth(ctx, msg) - case bus.DataplaneHealthProcessTopic: - cp.processDataplaneHealth(ctx, msg) + case bus.DataPlaneHealthResponseTopic: + cp.processDataPlaneHealth(ctx, msg) case bus.DataPlaneResponseTopic: cp.processDataPlaneResponse(ctx, msg) default: @@ -116,7 +116,7 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res } } -func (cp *CommandPlugin) processDataplaneHealth(ctx context.Context, msg *bus.Message) { +func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Message) { if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok { err := cp.commandService.UpdateDataPlaneHealth(ctx, instances) correlationID := logger.GetCorrelationID(ctx) @@ -158,7 +158,7 @@ func (cp *CommandPlugin) Subscriptions() []string { return []string{ bus.ResourceUpdateTopic, bus.InstanceHealthTopic, - bus.DataplaneHealthProcessTopic, + bus.DataPlaneHealthResponseTopic, bus.DataPlaneResponseTopic, } } @@ -241,7 +241,7 @@ func (cp *CommandPlugin) handleConfigUploadRequest(newCtx context.Context, messa } func (cp *CommandPlugin) handleHealthRequest(newCtx context.Context) { - cp.messagePipe.Process(newCtx, &bus.Message{Topic: bus.DataplaneHealthTopic}) + cp.messagePipe.Process(newCtx, &bus.Message{Topic: bus.DataPlaneHealthRequestTopic}) } func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index 4c4f401457..6851195604 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -39,7 +39,7 @@ func TestCommandPlugin_Subscriptions(t *testing.T) { []string{ bus.ResourceUpdateTopic, bus.InstanceHealthTopic, - bus.DataplaneHealthProcessTopic, + bus.DataPlaneHealthResponseTopic, bus.DataPlaneResponseTopic, }, subscriptions, @@ -97,7 +97,7 @@ func TestCommandPlugin_Process(t *testing.T) { require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount()) commandPlugin.Process(ctx, &bus.Message{ - Topic: bus.DataplaneHealthProcessTopic, + Topic: bus.DataPlaneHealthResponseTopic, Data: protos.GetHealthyInstanceHealth(), }) require.Equal(t, 1, fakeCommandService.UpdateDataPlaneHealthCallCount()) @@ -106,15 +106,15 @@ func TestCommandPlugin_Process(t *testing.T) { func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { tests := []struct { - mpiRequest *mpi.ManagementPlaneRequest - expectedTopic *bus.Message - name string - isUploadRequest bool - isApplyRequest bool + managementPlaneRequest *mpi.ManagementPlaneRequest + expectedTopic *bus.Message + name string + isUploadRequest bool + isApplyRequest bool }{ { name: "Test 1: Config Upload Request", - mpiRequest: &mpi.ManagementPlaneRequest{ + managementPlaneRequest: &mpi.ManagementPlaneRequest{ Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{ ConfigUploadRequest: &mpi.ConfigUploadRequest{}, }, @@ -124,7 +124,7 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { }, { name: "Test 2: Config Apply Request", - mpiRequest: &mpi.ManagementPlaneRequest{ + managementPlaneRequest: &mpi.ManagementPlaneRequest{ Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ ConfigApplyRequest: &mpi.ConfigApplyRequest{}, }, @@ -133,13 +133,13 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { isApplyRequest: true, }, { - name: "Test 3: Config Health Request", - mpiRequest: &mpi.ManagementPlaneRequest{ + name: "Test 3: Health Request", + managementPlaneRequest: &mpi.ManagementPlaneRequest{ Request: &mpi.ManagementPlaneRequest_HealthRequest{ HealthRequest: &mpi.HealthRequest{}, }, }, - expectedTopic: &bus.Message{Topic: bus.DataplaneHealthTopic}, + expectedTopic: &bus.Message{Topic: bus.DataPlaneHealthRequestTopic}, }, } @@ -156,7 +156,7 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { go commandPlugin.monitorSubscribeChannel(ctx) - commandPlugin.subscribeChannel <- test.mpiRequest + commandPlugin.subscribeChannel <- test.managementPlaneRequest assert.Eventually( t, @@ -173,11 +173,11 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { if test.isUploadRequest { assert.True(t, ok) - require.NotNil(t, test.mpiRequest.GetConfigUploadRequest()) + require.NotNil(t, test.managementPlaneRequest.GetConfigUploadRequest()) } if test.isApplyRequest { assert.True(t, ok) - require.NotNil(t, test.mpiRequest.GetConfigApplyRequest()) + require.NotNil(t, test.managementPlaneRequest.GetConfigApplyRequest()) } }) } diff --git a/internal/watcher/health/health_watcher_service.go b/internal/watcher/health/health_watcher_service.go index 2e2cb2455c..011f18d6ef 100644 --- a/internal/watcher/health/health_watcher_service.go +++ b/internal/watcher/health/health_watcher_service.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "log/slog" + "sync" "time" "google.golang.org/protobuf/proto" @@ -28,10 +29,11 @@ type ( } HealthWatcherService struct { - agentConfig *config.Config - cache map[string]*mpi.InstanceHealth // key is instanceID - watchers map[string]healthWatcherOperator // key is instanceID - instances map[string]*mpi.Instance // key is instanceID + agentConfig *config.Config + cache map[string]*mpi.InstanceHealth // key is instanceID + watchers map[string]healthWatcherOperator // key is instanceID + instances map[string]*mpi.Instance // key is instanceID + healthWatcherMutex sync.Mutex } InstanceHealthMessage struct { @@ -81,6 +83,9 @@ func (hw *HealthWatcherService) DeleteHealthWatcher(instances []*mpi.Instance) { } func (hw *HealthWatcherService) GetInstancesHealth() []*mpi.InstanceHealth { + hw.healthWatcherMutex.Lock() + defer hw.healthWatcherMutex.Unlock() + healthList := make([]*mpi.InstanceHealth, 0, len(hw.cache)) for _, health := range hw.cache { @@ -150,6 +155,9 @@ func (hw *HealthWatcherService) health(ctx context.Context) (updatedStatuses []* // update the cache with the most recent instance healths func (hw *HealthWatcherService) updateCache(currentHealth map[string]*mpi.InstanceHealth) { + hw.healthWatcherMutex.Lock() + defer hw.healthWatcherMutex.Unlock() + for instanceID, healthStatus := range currentHealth { hw.cache[instanceID] = healthStatus } @@ -166,6 +174,9 @@ func (hw *HealthWatcherService) updateCache(currentHealth map[string]*mpi.Instan // compare the cache with the current list of instances to check if an instance has been deleted // if an instance has been deleted add an UNHEALTHY health status to the list of instances for that instance func (hw *HealthWatcherService) compareCache(healthStatuses []*mpi.InstanceHealth) []*mpi.InstanceHealth { + hw.healthWatcherMutex.Lock() + defer hw.healthWatcherMutex.Unlock() + if len(hw.cache) != len(hw.instances) { for instanceID := range hw.cache { if _, ok := hw.instances[instanceID]; !ok { @@ -185,6 +196,9 @@ func (hw *HealthWatcherService) compareCache(healthStatuses []*mpi.InstanceHealt // compare current health with cached health to see if the health of an instance has changed func (hw *HealthWatcherService) compareHealth(currentHealth map[string]*mpi.InstanceHealth) bool { + hw.healthWatcherMutex.Lock() + defer hw.healthWatcherMutex.Unlock() + if len(currentHealth) != len(hw.cache) { return true } diff --git a/internal/watcher/health/health_watcher_service_test.go b/internal/watcher/health/health_watcher_service_test.go index b5b28dcf7c..fbe3ed3d79 100644 --- a/internal/watcher/health/health_watcher_service_test.go +++ b/internal/watcher/health/health_watcher_service_test.go @@ -236,7 +236,7 @@ func TestHealthWatcherService_compareCache(t *testing.T) { } } -func TestHealthWatcherService_getCache(t *testing.T) { +func TestHealthWatcherService_GetInstancesHealth(t *testing.T) { ossInstance := protos.GetNginxOssInstance([]string{}) plusInstance := protos.GetNginxPlusInstance([]string{}) ossInstanceHealth := protos.GetHealthyInstanceHealth() diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 8f4bd2df3e..08537c33a8 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -113,7 +113,7 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) { w.handleConfigApplySuccess(ctx, msg) case bus.RollbackCompleteTopic: w.handleRollbackComplete(ctx, msg) - case bus.DataplaneHealthTopic: + case bus.DataPlaneHealthRequestTopic: w.handleHealthRequest(ctx) default: slog.DebugContext(ctx, "Watcher plugin unknown topic", "topic", msg.Topic) @@ -125,7 +125,7 @@ func (*Watcher) Subscriptions() []string { bus.ConfigApplyRequestTopic, bus.ConfigApplySuccessfulTopic, bus.RollbackCompleteTopic, - bus.DataplaneHealthTopic, + bus.DataPlaneHealthRequestTopic, } } @@ -173,7 +173,7 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message func (w *Watcher) handleHealthRequest(ctx context.Context) { w.messagePipe.Process(ctx, &bus.Message{ - Topic: bus.DataplaneHealthProcessTopic, Data: w.healthWatcherService.GetInstancesHealth(), + Topic: bus.DataPlaneHealthResponseTopic, Data: w.healthWatcherService.GetInstancesHealth(), }) } diff --git a/internal/watcher/watcher_plugin_test.go b/internal/watcher/watcher_plugin_test.go index 06a19bed5e..3551ddcb72 100644 --- a/internal/watcher/watcher_plugin_test.go +++ b/internal/watcher/watcher_plugin_test.go @@ -170,7 +170,7 @@ func TestWatcher_Subscriptions(t *testing.T) { bus.ConfigApplyRequestTopic, bus.ConfigApplySuccessfulTopic, bus.RollbackCompleteTopic, - bus.DataplaneHealthTopic, + bus.DataPlaneHealthRequestTopic, }, watcherPlugin.Subscriptions(), ) From 7195603baaaf410606a485e7c9048badbe75037c Mon Sep 17 00:00:00 2001 From: RRashmit Date: Tue, 29 Oct 2024 14:58:16 +0000 Subject: [PATCH 13/14] chore: pr feedback --- .../grpc_management_plane_api_test.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/test/integration/grpc_management_plane_api_test.go b/test/integration/grpc_management_plane_api_test.go index 2b422d9fa8..6d4bd0cada 100644 --- a/test/integration/grpc_management_plane_api_test.go +++ b/test/integration/grpc_management_plane_api_test.go @@ -359,6 +359,11 @@ func TestGrpc_DataplaneHealthRequest(t *testing.T) { defer teardownTest(t) verifyConnection(t) + + responses := getManagementPlaneResponses(t, 1) + assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) + assert.Equal(t, "Successfully updated all files", responses[0].GetCommandResponse().GetMessage()) + assert.False(t, t.Failed()) request := `{ @@ -379,17 +384,10 @@ func TestGrpc_DataplaneHealthRequest(t *testing.T) { require.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode()) - responses := getManagementPlaneResponses(t, 2) - - allMessages := make([]string, 0, 2) - for _, response := range responses { - message := response.GetCommandResponse().GetMessage() - allMessages = append(allMessages, message) - } + responses = getManagementPlaneResponses(t, 2) - assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[0].GetCommandResponse().GetStatus()) assert.Equal(t, mpi.CommandResponse_COMMAND_STATUS_OK, responses[1].GetCommandResponse().GetStatus()) - assert.Contains(t, allMessages, "Successfully sent the health status update") + assert.Equal(t, "Successfully sent the health status update", responses[1].GetCommandResponse().GetMessage()) } func performConfigApply(t *testing.T, nginxInstanceID string) { From 9ca51850a7cf6053223ae142f8857733d184aa11 Mon Sep 17 00:00:00 2001 From: RRashmit Date: Wed, 6 Nov 2024 16:54:56 +0000 Subject: [PATCH 14/14] chore: remove extra --- pkg/config/features.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/config/features.go b/pkg/config/features.go index 522428683d..4bc857ca24 100644 --- a/pkg/config/features.go +++ b/pkg/config/features.go @@ -15,5 +15,4 @@ const ( FeatureMetricsInstance = "metrics-instance" FeatureFileWatcher = "file-watcher" FeatureAgentAPI = "agent-api" - FeatureDataplaneHealth = "dataplane-health" )