Skip to content

Commit

Permalink
chore: review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
RRashmit committed Oct 29, 2024
1 parent 108e062 commit 40b2c53
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 46 deletions.
34 changes: 17 additions & 17 deletions internal/bus/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
package bus

const (
AddInstancesTopic = "add-instances"
UpdatedInstancesTopic = "updated-instances"
DeletedInstancesTopic = "deleted-instances"
ResourceUpdateTopic = "resource-update"
NginxConfigUpdateTopic = "nginx-config-update"
InstanceHealthTopic = "instance-health"
ConfigUploadRequestTopic = "config-upload-request"
DataPlaneResponseTopic = "data-plane-response"
ConnectionCreatedTopic = "connection-created"
ConfigApplyRequestTopic = "config-apply-request"
WriteConfigSuccessfulTopic = "write-config-successful"
ConfigApplySuccessfulTopic = "config-apply-successful"
ConfigApplyFailedTopic = "config-apply-failed"
RollbackCompleteTopic = "rollback-complete"
RollbackWriteTopic = "rollback-write"
DataplaneHealthTopic = "dataplane-health"
DataplaneHealthProcessTopic = "dataplane-health-process"
AddInstancesTopic = "add-instances"
UpdatedInstancesTopic = "updated-instances"
DeletedInstancesTopic = "deleted-instances"
ResourceUpdateTopic = "resource-update"
NginxConfigUpdateTopic = "nginx-config-update"
InstanceHealthTopic = "instance-health"
ConfigUploadRequestTopic = "config-upload-request"
DataPlaneResponseTopic = "data-plane-response"
ConnectionCreatedTopic = "connection-created"
ConfigApplyRequestTopic = "config-apply-request"
WriteConfigSuccessfulTopic = "write-config-successful"
ConfigApplySuccessfulTopic = "config-apply-successful"
ConfigApplyFailedTopic = "config-apply-failed"
RollbackCompleteTopic = "rollback-complete"
RollbackWriteTopic = "rollback-write"
DataPlaneHealthRequestTopic = "data-plane-health-request"
DataPlaneHealthResponseTopic = "data-plane-health-response"
)
10 changes: 5 additions & 5 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
cp.processResourceUpdate(ctx, msg)
case bus.InstanceHealthTopic:
cp.processInstanceHealth(ctx, msg)
case bus.DataplaneHealthProcessTopic:
cp.processDataplaneHealth(ctx, msg)
case bus.DataPlaneHealthResponseTopic:
cp.processDataPlaneHealth(ctx, msg)
case bus.DataPlaneResponseTopic:
cp.processDataPlaneResponse(ctx, msg)
default:
Expand Down Expand Up @@ -116,7 +116,7 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res
}
}

func (cp *CommandPlugin) processDataplaneHealth(ctx context.Context, msg *bus.Message) {
func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Message) {
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
correlationID := logger.GetCorrelationID(ctx)
Expand Down Expand Up @@ -158,7 +158,7 @@ func (cp *CommandPlugin) Subscriptions() []string {
return []string{
bus.ResourceUpdateTopic,
bus.InstanceHealthTopic,
bus.DataplaneHealthProcessTopic,
bus.DataPlaneHealthResponseTopic,
bus.DataPlaneResponseTopic,
}
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func (cp *CommandPlugin) handleConfigUploadRequest(newCtx context.Context, messa
}

func (cp *CommandPlugin) handleHealthRequest(newCtx context.Context) {
cp.messagePipe.Process(newCtx, &bus.Message{Topic: bus.DataplaneHealthTopic})
cp.messagePipe.Process(newCtx, &bus.Message{Topic: bus.DataPlaneHealthRequestTopic})
}

func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
Expand Down
30 changes: 15 additions & 15 deletions internal/command/command_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestCommandPlugin_Subscriptions(t *testing.T) {
[]string{
bus.ResourceUpdateTopic,
bus.InstanceHealthTopic,
bus.DataplaneHealthProcessTopic,
bus.DataPlaneHealthResponseTopic,
bus.DataPlaneResponseTopic,
},
subscriptions,
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestCommandPlugin_Process(t *testing.T) {
require.Equal(t, 1, fakeCommandService.SendDataPlaneResponseCallCount())

commandPlugin.Process(ctx, &bus.Message{
Topic: bus.DataplaneHealthProcessTopic,
Topic: bus.DataPlaneHealthResponseTopic,
Data: protos.GetHealthyInstanceHealth(),
})
require.Equal(t, 1, fakeCommandService.UpdateDataPlaneHealthCallCount())
Expand All @@ -106,15 +106,15 @@ func TestCommandPlugin_Process(t *testing.T) {

func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {
tests := []struct {
mpiRequest *mpi.ManagementPlaneRequest
expectedTopic *bus.Message
name string
isUploadRequest bool
isApplyRequest bool
managementPlaneRequest *mpi.ManagementPlaneRequest
expectedTopic *bus.Message
name string
isUploadRequest bool
isApplyRequest bool
}{
{
name: "Test 1: Config Upload Request",
mpiRequest: &mpi.ManagementPlaneRequest{
managementPlaneRequest: &mpi.ManagementPlaneRequest{
Request: &mpi.ManagementPlaneRequest_ConfigUploadRequest{
ConfigUploadRequest: &mpi.ConfigUploadRequest{},
},
Expand All @@ -124,7 +124,7 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {
},
{
name: "Test 2: Config Apply Request",
mpiRequest: &mpi.ManagementPlaneRequest{
managementPlaneRequest: &mpi.ManagementPlaneRequest{
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
ConfigApplyRequest: &mpi.ConfigApplyRequest{},
},
Expand All @@ -133,13 +133,13 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {
isApplyRequest: true,
},
{
name: "Test 3: Config Health Request",
mpiRequest: &mpi.ManagementPlaneRequest{
name: "Test 3: Health Request",
managementPlaneRequest: &mpi.ManagementPlaneRequest{
Request: &mpi.ManagementPlaneRequest_HealthRequest{
HealthRequest: &mpi.HealthRequest{},
},
},
expectedTopic: &bus.Message{Topic: bus.DataplaneHealthTopic},
expectedTopic: &bus.Message{Topic: bus.DataPlaneHealthRequestTopic},
},
}

Expand All @@ -156,7 +156,7 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {

go commandPlugin.monitorSubscribeChannel(ctx)

commandPlugin.subscribeChannel <- test.mpiRequest
commandPlugin.subscribeChannel <- test.managementPlaneRequest

assert.Eventually(
t,
Expand All @@ -173,11 +173,11 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {

if test.isUploadRequest {
assert.True(t, ok)
require.NotNil(t, test.mpiRequest.GetConfigUploadRequest())
require.NotNil(t, test.managementPlaneRequest.GetConfigUploadRequest())
}
if test.isApplyRequest {
assert.True(t, ok)
require.NotNil(t, test.mpiRequest.GetConfigApplyRequest())
require.NotNil(t, test.managementPlaneRequest.GetConfigApplyRequest())
}
})
}
Expand Down
22 changes: 18 additions & 4 deletions internal/watcher/health/health_watcher_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"log/slog"
"sync"
"time"

"google.golang.org/protobuf/proto"
Expand All @@ -28,10 +29,11 @@ type (
}

HealthWatcherService struct {
agentConfig *config.Config
cache map[string]*mpi.InstanceHealth // key is instanceID
watchers map[string]healthWatcherOperator // key is instanceID
instances map[string]*mpi.Instance // key is instanceID
agentConfig *config.Config
cache map[string]*mpi.InstanceHealth // key is instanceID
watchers map[string]healthWatcherOperator // key is instanceID
instances map[string]*mpi.Instance // key is instanceID
healthWatcherMutex sync.Mutex
}

InstanceHealthMessage struct {
Expand Down Expand Up @@ -81,6 +83,9 @@ func (hw *HealthWatcherService) DeleteHealthWatcher(instances []*mpi.Instance) {
}

func (hw *HealthWatcherService) GetInstancesHealth() []*mpi.InstanceHealth {
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()

healthList := make([]*mpi.InstanceHealth, 0, len(hw.cache))

for _, health := range hw.cache {
Expand Down Expand Up @@ -150,6 +155,9 @@ func (hw *HealthWatcherService) health(ctx context.Context) (updatedStatuses []*

// update the cache with the most recent instance healths
func (hw *HealthWatcherService) updateCache(currentHealth map[string]*mpi.InstanceHealth) {
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()

for instanceID, healthStatus := range currentHealth {
hw.cache[instanceID] = healthStatus
}
Expand All @@ -166,6 +174,9 @@ func (hw *HealthWatcherService) updateCache(currentHealth map[string]*mpi.Instan
// compare the cache with the current list of instances to check if an instance has been deleted
// if an instance has been deleted add an UNHEALTHY health status to the list of instances for that instance
func (hw *HealthWatcherService) compareCache(healthStatuses []*mpi.InstanceHealth) []*mpi.InstanceHealth {
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()

if len(hw.cache) != len(hw.instances) {
for instanceID := range hw.cache {
if _, ok := hw.instances[instanceID]; !ok {
Expand All @@ -185,6 +196,9 @@ func (hw *HealthWatcherService) compareCache(healthStatuses []*mpi.InstanceHealt

// compare current health with cached health to see if the health of an instance has changed
func (hw *HealthWatcherService) compareHealth(currentHealth map[string]*mpi.InstanceHealth) bool {
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()

if len(currentHealth) != len(hw.cache) {
return true
}
Expand Down
2 changes: 1 addition & 1 deletion internal/watcher/health/health_watcher_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestHealthWatcherService_compareCache(t *testing.T) {
}
}

func TestHealthWatcherService_getCache(t *testing.T) {
func TestHealthWatcherService_GetInstancesHealth(t *testing.T) {
ossInstance := protos.GetNginxOssInstance([]string{})
plusInstance := protos.GetNginxPlusInstance([]string{})
ossInstanceHealth := protos.GetHealthyInstanceHealth()
Expand Down
6 changes: 3 additions & 3 deletions internal/watcher/watcher_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) {
w.handleConfigApplySuccess(ctx, msg)
case bus.RollbackCompleteTopic:
w.handleRollbackComplete(ctx, msg)
case bus.DataplaneHealthTopic:
case bus.DataPlaneHealthRequestTopic:
w.handleHealthRequest(ctx)
default:
slog.DebugContext(ctx, "Watcher plugin unknown topic", "topic", msg.Topic)
Expand All @@ -125,7 +125,7 @@ func (*Watcher) Subscriptions() []string {
bus.ConfigApplyRequestTopic,
bus.ConfigApplySuccessfulTopic,
bus.RollbackCompleteTopic,
bus.DataplaneHealthTopic,
bus.DataPlaneHealthRequestTopic,
}
}

Expand Down Expand Up @@ -173,7 +173,7 @@ func (w *Watcher) handleConfigApplySuccess(ctx context.Context, msg *bus.Message

func (w *Watcher) handleHealthRequest(ctx context.Context) {
w.messagePipe.Process(ctx, &bus.Message{
Topic: bus.DataplaneHealthProcessTopic, Data: w.healthWatcherService.GetInstancesHealth(),
Topic: bus.DataPlaneHealthResponseTopic, Data: w.healthWatcherService.GetInstancesHealth(),
})
}

Expand Down
2 changes: 1 addition & 1 deletion internal/watcher/watcher_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestWatcher_Subscriptions(t *testing.T) {
bus.ConfigApplyRequestTopic,
bus.ConfigApplySuccessfulTopic,
bus.RollbackCompleteTopic,
bus.DataplaneHealthTopic,
bus.DataPlaneHealthRequestTopic,
},
watcherPlugin.Subscriptions(),
)
Expand Down

0 comments on commit 40b2c53

Please sign in to comment.