diff --git a/internal/command/command_service.go b/internal/command/command_service.go index 9d24c245d..46fc499a9 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -33,14 +33,16 @@ const ( type ( CommandService struct { - commandServiceClient mpi.CommandServiceClient - subscribeClient mpi.CommandService_SubscribeClient - agentConfig *config.Config - isConnected *atomic.Bool - subscribeCancel context.CancelFunc - subscribeChannel chan *mpi.ManagementPlaneRequest - subscribeMutex sync.Mutex - subscribeClientMutex sync.Mutex + commandServiceClient mpi.CommandServiceClient + subscribeClient mpi.CommandService_SubscribeClient + agentConfig *config.Config + isConnected *atomic.Bool + subscribeCancel context.CancelFunc + subscribeChannel chan *mpi.ManagementPlaneRequest + configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID + subscribeMutex sync.Mutex + subscribeClientMutex sync.Mutex + configApplyRequestQueueMutex sync.Mutex } ) @@ -54,10 +56,11 @@ func NewCommandService( isConnected.Store(false) commandService := &CommandService{ - commandServiceClient: commandServiceClient, - agentConfig: agentConfig, - isConnected: isConnected, - subscribeChannel: subscribeChannel, + commandServiceClient: commandServiceClient, + agentConfig: agentConfig, + isConnected: isConnected, + subscribeChannel: subscribeChannel, + configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest), } var subscribeCtx context.Context @@ -165,6 +168,11 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime) defer backoffCancel() + err := cs.handleConfigApplyResponse(ctx, response) + if err != nil { + return err + } + return backoff.Retry( cs.sendDataPlaneResponseCallback(ctx, response), backoffHelpers.Context(backOffCtx, cs.agentConfig.Common), @@ -271,6 +279,87 @@ func (cs *CommandService) sendDataPlaneResponseCallback( } } +func (cs *CommandService) handleConfigApplyResponse( + ctx context.Context, + response *mpi.DataPlaneResponse, +) error { + cs.configApplyRequestQueueMutex.Lock() + defer cs.configApplyRequestQueueMutex.Unlock() + + isConfigApplyResponse := false + var indexOfConfigApplyRequest int + + for index, configApplyRequest := range cs.configApplyRequestQueue[response.GetInstanceId()] { + if configApplyRequest.GetMessageMeta().GetCorrelationId() == response.GetMessageMeta().GetCorrelationId() { + indexOfConfigApplyRequest = index + isConfigApplyResponse = true + + break + } + } + + // TODO: fix this + if isConfigApplyResponse { + err := cs.sendResponseForQueuedConfigApplyRequests(ctx, response, indexOfConfigApplyRequest) + if err != nil { + return err + } + } + + return nil +} + +func (cs *CommandService) sendResponseForQueuedConfigApplyRequests( + ctx context.Context, + response *mpi.DataPlaneResponse, + indexOfConfigApplyRequest int, +) error { + instanceID := response.GetInstanceId() + for i := 0; i < indexOfConfigApplyRequest; i++ { + newResponse := response + + newMessageID, err := uuid.NewV7() + if err != nil { + slog.DebugContext(ctx, "Failed to create new message ID", "error", err) + } else { + newResponse.GetMessageMeta().MessageId = newMessageID.String() + } + + request := cs.configApplyRequestQueue[instanceID][i] + newResponse.GetMessageMeta().CorrelationId = request.GetMessageMeta().GetCorrelationId() + + slog.DebugContext( + ctx, + "Sending data plane response for queued config apply request", + "response", newResponse, + ) + + backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime) + + err = backoff.Retry( + cs.sendDataPlaneResponseCallback(ctx, newResponse), + backoffHelpers.Context(backOffCtx, cs.agentConfig.Common), + ) + if err != nil { + slog.ErrorContext(ctx, "Failed to send data plane response", "error", err) + backoffCancel() + + return err + } + + backoffCancel() + } + + cs.configApplyRequestQueue[instanceID] = cs.configApplyRequestQueue[instanceID][indexOfConfigApplyRequest+1:] + slog.DebugContext(ctx, "Removed config apply requests from queue", "queue", cs.configApplyRequestQueue[instanceID]) + + if len(cs.configApplyRequestQueue[instanceID]) > 0 { + cs.subscribeChannel <- cs.configApplyRequestQueue[instanceID][len(cs.configApplyRequestQueue[instanceID])-1] + } + + return nil +} + // Retry callback for sending a data plane health status to the Management Plane. func (cs *CommandService) dataPlaneHealthCallback( ctx context.Context, @@ -333,7 +422,25 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error { return recvError } - cs.subscribeChannel <- request + switch request.GetRequest().(type) { + case *mpi.ManagementPlaneRequest_ConfigApplyRequest: + cs.configApplyRequestQueueMutex.Lock() + defer cs.configApplyRequestQueueMutex.Unlock() + + instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId() + cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request) + if len(cs.configApplyRequestQueue[instanceID]) == 1 { + cs.subscribeChannel <- request + } else { + slog.DebugContext( + ctx, + "Config apply request is already in progress, queuing new config apply request", + "request", request, + ) + } + default: + cs.subscribeChannel <- request + } return nil } diff --git a/internal/command/command_service_test.go b/internal/command/command_service_test.go index ad8c6f790..4213cb4e3 100644 --- a/internal/command/command_service_test.go +++ b/internal/command/command_service_test.go @@ -11,9 +11,13 @@ import ( "errors" "log/slog" "strings" + "sync" "testing" "time" + "github.com/google/uuid" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/nginx/agent/v3/internal/logger" "github.com/nginx/agent/v3/test/helpers" "github.com/nginx/agent/v3/test/stub" @@ -38,9 +42,41 @@ func (*FakeSubscribeClient) Send(*mpi.DataPlaneResponse) error { // nolint: nilnil func (*FakeSubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) { + time.Sleep(1 * time.Second) + return nil, nil } +type FakeConfigApplySubscribeClient struct { + grpc.ClientStream +} + +func (*FakeConfigApplySubscribeClient) Send(*mpi.DataPlaneResponse) error { + return nil +} + +// nolint: nilnil +func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) { + return &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "1", + CorrelationId: "123", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{}, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + }, nil +} + func TestCommandService_NewCommandService(t *testing.T) { ctx := context.Background() commandServiceClient := &v1fakes.FakeCommandServiceClient{} @@ -62,6 +98,32 @@ func TestCommandService_NewCommandService(t *testing.T) { ) } +func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) { + ctx := context.Background() + fakeSubscribeClient := &FakeConfigApplySubscribeClient{} + + commandServiceClient := &v1fakes.FakeCommandServiceClient{} + commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil) + + commandService := NewCommandService( + ctx, + commandServiceClient, + types.AgentConfig(), + make(chan *mpi.ManagementPlaneRequest), + ) + + defer commandService.CancelSubscription(ctx) + + assert.Eventually( + t, + func() bool { return commandServiceClient.SubscribeCallCount() > 0 }, + 2*time.Second, + 10*time.Millisecond, + ) + + assert.Len(t, commandService.configApplyRequestQueue, 1) +} + func TestCommandService_UpdateDataPlaneStatus(t *testing.T) { ctx := context.Background() @@ -194,3 +256,119 @@ func TestCommandService_SendDataPlaneResponse(t *testing.T) { require.NoError(t, err) } + +func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) { + ctx := context.Background() + commandServiceClient := &v1fakes.FakeCommandServiceClient{} + subscribeClient := &FakeSubscribeClient{} + subscribeChannel := make(chan *mpi.ManagementPlaneRequest) + + commandService := NewCommandService( + ctx, + commandServiceClient, + types.AgentConfig(), + subscribeChannel, + ) + + request1 := &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "1", + CorrelationId: "123", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{}, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + } + + request2 := &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "2", + CorrelationId: "1232", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{}, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + } + + request3 := &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "3", + CorrelationId: "1233", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{}, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + } + + commandService.configApplyRequestQueueMutex.Lock() + commandService.configApplyRequestQueue = map[string][]*mpi.ManagementPlaneRequest{ + "12314": { + request1, + request2, + request3, + }, + } + commandService.configApplyRequestQueueMutex.Unlock() + + commandService.subscribeClientMutex.Lock() + commandService.subscribeClient = subscribeClient + commandService.subscribeClientMutex.Unlock() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + requestFromChannel := <-subscribeChannel + assert.Equal(t, request3, requestFromChannel) + wg.Done() + }() + + err := commandService.SendDataPlaneResponse( + ctx, + &mpi.DataPlaneResponse{ + MessageMeta: &mpi.MessageMeta{ + MessageId: uuid.NewString(), + CorrelationId: "1232", + Timestamp: timestamppb.Now(), + }, + CommandResponse: &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: "Success", + }, + InstanceId: "12314", + }, + ) + + require.NoError(t, err) + + assert.Len(t, commandService.configApplyRequestQueue, 1) + assert.Equal(t, request3, commandService.configApplyRequestQueue["12314"][0]) + wg.Wait() +}