Skip to content

Commit

Permalink
Add config apply request queue (#949)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhurley authored and sean-breen committed Jan 15, 2025
1 parent 36f9e3c commit 81a70e1
Show file tree
Hide file tree
Showing 2 changed files with 310 additions and 13 deletions.
127 changes: 114 additions & 13 deletions internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ const (

type (
CommandService struct {
commandServiceClient mpi.CommandServiceClient
subscribeClient mpi.CommandService_SubscribeClient
agentConfig *config.Config
isConnected *atomic.Bool
subscribeCancel context.CancelFunc
subscribeChannel chan *mpi.ManagementPlaneRequest
subscribeMutex sync.Mutex
subscribeClientMutex sync.Mutex
commandServiceClient mpi.CommandServiceClient
subscribeClient mpi.CommandService_SubscribeClient
agentConfig *config.Config
isConnected *atomic.Bool
subscribeCancel context.CancelFunc
subscribeChannel chan *mpi.ManagementPlaneRequest
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
subscribeMutex sync.Mutex
subscribeClientMutex sync.Mutex
configApplyRequestQueueMutex sync.Mutex
}
)

Expand All @@ -54,10 +56,11 @@ func NewCommandService(
isConnected.Store(false)

commandService := &CommandService{
commandServiceClient: commandServiceClient,
agentConfig: agentConfig,
isConnected: isConnected,
subscribeChannel: subscribeChannel,
commandServiceClient: commandServiceClient,
agentConfig: agentConfig,
isConnected: isConnected,
subscribeChannel: subscribeChannel,
configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
}

var subscribeCtx context.Context
Expand Down Expand Up @@ -165,6 +168,11 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m
backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime)
defer backoffCancel()

err := cs.handleConfigApplyResponse(ctx, response)
if err != nil {
return err
}

return backoff.Retry(
cs.sendDataPlaneResponseCallback(ctx, response),
backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff),
Expand Down Expand Up @@ -271,6 +279,81 @@ func (cs *CommandService) sendDataPlaneResponseCallback(
}
}

func (cs *CommandService) handleConfigApplyResponse(
ctx context.Context,
response *mpi.DataPlaneResponse,
) error {
cs.configApplyRequestQueueMutex.Lock()
defer cs.configApplyRequestQueueMutex.Unlock()

isConfigApplyResponse := false
var indexOfConfigApplyRequest int

for index, configApplyRequest := range cs.configApplyRequestQueue[response.GetInstanceId()] {
if configApplyRequest.GetMessageMeta().GetCorrelationId() == response.GetMessageMeta().GetCorrelationId() {
indexOfConfigApplyRequest = index
isConfigApplyResponse = true

break
}
}

if isConfigApplyResponse {
err := cs.sendResponseForQueuedConfigApplyRequests(ctx, response, indexOfConfigApplyRequest)
if err != nil {
return err
}
}

return nil
}

func (cs *CommandService) sendResponseForQueuedConfigApplyRequests(
ctx context.Context,
response *mpi.DataPlaneResponse,
indexOfConfigApplyRequest int,
) error {
instanceID := response.GetInstanceId()
for i := 0; i < indexOfConfigApplyRequest; i++ {
newResponse := response

newResponse.GetMessageMeta().MessageId = proto.GenerateMessageID()

request := cs.configApplyRequestQueue[instanceID][i]
newResponse.GetMessageMeta().CorrelationId = request.GetMessageMeta().GetCorrelationId()

slog.DebugContext(
ctx,
"Sending data plane response for queued config apply request",
"response", newResponse,
)

backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime)

err := backoff.Retry(
cs.sendDataPlaneResponseCallback(ctx, newResponse),
backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff),
)
if err != nil {
slog.ErrorContext(ctx, "Failed to send data plane response", "error", err)
backoffCancel()

return err
}

backoffCancel()
}

cs.configApplyRequestQueue[instanceID] = cs.configApplyRequestQueue[instanceID][indexOfConfigApplyRequest+1:]
slog.DebugContext(ctx, "Removed config apply requests from queue", "queue", cs.configApplyRequestQueue[instanceID])

if len(cs.configApplyRequestQueue[instanceID]) > 0 {
cs.subscribeChannel <- cs.configApplyRequestQueue[instanceID][len(cs.configApplyRequestQueue[instanceID])-1]
}

return nil
}

// Retry callback for sending a data plane health status to the Management Plane.
func (cs *CommandService) dataPlaneHealthCallback(
ctx context.Context,
Expand Down Expand Up @@ -333,7 +416,25 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
return recvError
}

cs.subscribeChannel <- request
switch request.GetRequest().(type) {
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
cs.configApplyRequestQueueMutex.Lock()
defer cs.configApplyRequestQueueMutex.Unlock()

instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request)
if len(cs.configApplyRequestQueue[instanceID]) == 1 {
cs.subscribeChannel <- request
} else {
slog.DebugContext(
ctx,
"Config apply request is already in progress, queuing new config apply request",
"request", request,
)
}
default:
cs.subscribeChannel <- request
}

return nil
}
Expand Down
196 changes: 196 additions & 0 deletions internal/command/command_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (
"context"
"errors"
"log/slog"
"sync"
"testing"
"time"

"github.com/google/uuid"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/nginx/agent/v3/internal/logger"
"github.com/nginx/agent/v3/test/helpers"
"github.com/nginx/agent/v3/test/stub"
Expand All @@ -37,9 +41,41 @@ func (*FakeSubscribeClient) Send(*mpi.DataPlaneResponse) error {

// nolint: nilnil
func (*FakeSubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) {
time.Sleep(1 * time.Second)

return nil, nil
}

type FakeConfigApplySubscribeClient struct {
grpc.ClientStream
}

func (*FakeConfigApplySubscribeClient) Send(*mpi.DataPlaneResponse) error {
return nil
}

// nolint: nilnil
func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) {
protos.CreateManagementPlaneRequest()
return &mpi.ManagementPlaneRequest{
MessageMeta: &mpi.MessageMeta{
MessageId: "1",
CorrelationId: "123",
Timestamp: timestamppb.Now(),
},
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
ConfigApplyRequest: &mpi.ConfigApplyRequest{
Overview: &mpi.FileOverview{
ConfigVersion: &mpi.ConfigVersion{
InstanceId: "12314",
Version: "4215432",
},
},
},
},
}, nil
}

func TestCommandService_NewCommandService(t *testing.T) {
ctx := context.Background()
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
Expand All @@ -61,6 +97,46 @@ func TestCommandService_NewCommandService(t *testing.T) {
)
}

func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) {
ctx := context.Background()
fakeSubscribeClient := &FakeConfigApplySubscribeClient{}

commandServiceClient := &v1fakes.FakeCommandServiceClient{}
commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil)

subscribeChannel := make(chan *mpi.ManagementPlaneRequest)

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
subscribeChannel,
)

defer commandService.CancelSubscription(ctx)

var wg sync.WaitGroup

wg.Add(1)
go func() {
requestFromChannel := <-subscribeChannel
assert.NotNil(t, requestFromChannel)
wg.Done()
}()

assert.Eventually(
t,
func() bool { return commandServiceClient.SubscribeCallCount() > 0 },
2*time.Second,
10*time.Millisecond,
)

commandService.configApplyRequestQueueMutex.Lock()
defer commandService.configApplyRequestQueueMutex.Unlock()
assert.Len(t, commandService.configApplyRequestQueue, 1)
wg.Wait()
}

func TestCommandService_UpdateDataPlaneStatus(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -193,3 +269,123 @@ func TestCommandService_SendDataPlaneResponse(t *testing.T) {

require.NoError(t, err)
}

func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) {
ctx := context.Background()
commandServiceClient := &v1fakes.FakeCommandServiceClient{}
subscribeClient := &FakeSubscribeClient{}
subscribeChannel := make(chan *mpi.ManagementPlaneRequest)

commandService := NewCommandService(
ctx,
commandServiceClient,
types.AgentConfig(),
subscribeChannel,
)

defer commandService.CancelSubscription(ctx)

request1 := &mpi.ManagementPlaneRequest{
MessageMeta: &mpi.MessageMeta{
MessageId: "1",
CorrelationId: "123",
Timestamp: timestamppb.Now(),
},
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
ConfigApplyRequest: &mpi.ConfigApplyRequest{
Overview: &mpi.FileOverview{
Files: []*mpi.File{},
ConfigVersion: &mpi.ConfigVersion{
InstanceId: "12314",
Version: "4215432",
},
},
},
},
}

request2 := &mpi.ManagementPlaneRequest{
MessageMeta: &mpi.MessageMeta{
MessageId: "2",
CorrelationId: "1232",
Timestamp: timestamppb.Now(),
},
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
ConfigApplyRequest: &mpi.ConfigApplyRequest{
Overview: &mpi.FileOverview{
Files: []*mpi.File{},
ConfigVersion: &mpi.ConfigVersion{
InstanceId: "12314",
Version: "4215432",
},
},
},
},
}

request3 := &mpi.ManagementPlaneRequest{
MessageMeta: &mpi.MessageMeta{
MessageId: "3",
CorrelationId: "1233",
Timestamp: timestamppb.Now(),
},
Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{
ConfigApplyRequest: &mpi.ConfigApplyRequest{
Overview: &mpi.FileOverview{
Files: []*mpi.File{},
ConfigVersion: &mpi.ConfigVersion{
InstanceId: "12314",
Version: "4215432",
},
},
},
},
}

commandService.configApplyRequestQueueMutex.Lock()
commandService.configApplyRequestQueue = map[string][]*mpi.ManagementPlaneRequest{
"12314": {
request1,
request2,
request3,
},
}
commandService.configApplyRequestQueueMutex.Unlock()

commandService.subscribeClientMutex.Lock()
commandService.subscribeClient = subscribeClient
commandService.subscribeClientMutex.Unlock()

var wg sync.WaitGroup

wg.Add(1)
go func() {
requestFromChannel := <-subscribeChannel
assert.Equal(t, request3, requestFromChannel)
wg.Done()
}()

err := commandService.SendDataPlaneResponse(
ctx,
&mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
MessageId: uuid.NewString(),
CorrelationId: "1232",
Timestamp: timestamppb.Now(),
},
CommandResponse: &mpi.CommandResponse{
Status: mpi.CommandResponse_COMMAND_STATUS_OK,
Message: "Success",
},
InstanceId: "12314",
},
)

require.NoError(t, err)

commandService.configApplyRequestQueueMutex.Lock()
defer commandService.configApplyRequestQueueMutex.Unlock()
assert.Len(t, commandService.configApplyRequestQueue, 1)
assert.Equal(t, request3, commandService.configApplyRequestQueue["12314"][0])
wg.Wait()
}

0 comments on commit 81a70e1

Please sign in to comment.