Skip to content

Commit

Permalink
Merge branch 'v3' into fix/handle-unknown-instance-id
Browse files Browse the repository at this point in the history
  • Loading branch information
dhurley committed Jan 15, 2025
2 parents 7bf0812 + 9b1bc5b commit 3b2e6a5
Show file tree
Hide file tree
Showing 19 changed files with 645 additions and 180 deletions.
2 changes: 1 addition & 1 deletion Makefile.packaging
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ BINARY_PATH := $(BUILD_DIR)/$(BINARY_NAME)
GPG_PUBLIC_KEY := .key
PACKAGE_BUILD ?= 1
PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v')-$(PACKAGE_BUILD)
APK_PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v')_$(PACKAGE_BUILD)
APK_PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v').$(PACKAGE_BUILD)
TARBALL_NAME := $(PACKAGE_PREFIX)v3.tar.gz

DEB_DISTROS ?= ubuntu-noble-24.04 ubuntu-jammy-22.04 ubuntu-focal-20.04 debian-bookworm-12 debian-bullseye-11
Expand Down
6 changes: 3 additions & 3 deletions internal/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (
// Information from https://pkg.go.dev/github.com/cenkalti/backoff/v4#section-readme
func WaitUntil(
ctx context.Context,
backoffSettings *config.CommonSettings,
backoffSettings *config.BackOff,
operation backoff.Operation,
) error {
eb := backoff.NewExponentialBackOff()
Expand All @@ -68,7 +68,7 @@ func WaitUntil(
// nolint: ireturn
func WaitUntilWithData[T any](
ctx context.Context,
backoffSettings *config.CommonSettings,
backoffSettings *config.BackOff,
operation backoff.OperationWithData[T],
) (T, error) {
backoffWithContext := Context(ctx, backoffSettings)
Expand All @@ -77,7 +77,7 @@ func WaitUntilWithData[T any](
}

// nolint: ireturn
func Context(ctx context.Context, backoffSettings *config.CommonSettings) backoff.BackOffContext {
func Context(ctx context.Context, backoffSettings *config.BackOff) backoff.BackOffContext {
eb := backoff.NewExponentialBackOff()
eb.InitialInterval = backoffSettings.InitialInterval
eb.MaxInterval = backoffSettings.MaxInterval
Expand Down
20 changes: 11 additions & 9 deletions internal/backoff/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ func TestWaitUntil(t *testing.T) {

for _, test := range tests {
invocations = 0
settings := &config.CommonSettings{
InitialInterval: test.initialInterval,
MaxInterval: test.maxInterval,
MaxElapsedTime: test.maxElapsedTime,
RandomizationFactor: config.DefBackoffRandomizationFactor,
Multiplier: config.DefBackoffMultiplier,
settings := &config.Client{
Backoff: &config.BackOff{
InitialInterval: test.initialInterval,
MaxInterval: test.maxInterval,
MaxElapsedTime: test.maxElapsedTime,
RandomizationFactor: config.DefBackoffRandomizationFactor,
Multiplier: config.DefBackoffMultiplier,
},
}
result := WaitUntil(test.context, settings, test.operation)
result := WaitUntil(test.context, settings.Backoff, test.operation)

if test.expectedError {
assert.Errorf(t, result, test.name)
Expand Down Expand Up @@ -164,7 +166,7 @@ func TestWaitUntilWithData(t *testing.T) {
}

for _, test := range tests {
settings := &config.CommonSettings{
settings := &config.BackOff{
InitialInterval: test.initialInterval,
MaxInterval: test.maxInterval,
MaxElapsedTime: test.maxElapsedTime,
Expand All @@ -185,7 +187,7 @@ func TestWaitUntilWithData(t *testing.T) {
}

func TestContext(t *testing.T) {
settings := &config.CommonSettings{
settings := &config.BackOff{
InitialInterval: 10 * time.Millisecond,
MaxInterval: 10 * time.Millisecond,
MaxElapsedTime: 10 * time.Millisecond,
Expand Down
4 changes: 2 additions & 2 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ func (oc *Collector) Close(ctx context.Context) error {
oc.service.Shutdown()
oc.cancel()

settings := oc.config.Common
settings := oc.config.Client.Backoff
settings.MaxElapsedTime = maxTimeToWaitForShutdown
err := backoff.WaitUntil(ctx, oc.config.Common, func() error {
err := backoff.WaitUntil(ctx, settings, func() error {
if oc.service.GetState() == otelcol.StateClosed {
return nil
}
Expand Down
171 changes: 137 additions & 34 deletions internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"log/slog"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"

Expand All @@ -29,22 +28,23 @@ import (
var _ commandService = (*CommandService)(nil)

const (
retryInterval = 5 * time.Second
createConnectionMaxElapsedTime = 0
)

type (
CommandService struct {
commandServiceClient mpi.CommandServiceClient
subscribeClient mpi.CommandService_SubscribeClient
agentConfig *config.Config
isConnected *atomic.Bool
subscribeCancel context.CancelFunc
subscribeChannel chan *mpi.ManagementPlaneRequest
instances []*mpi.Instance
subscribeMutex sync.Mutex
subscribeClientMutex sync.Mutex
instancesMutex sync.Mutex
commandServiceClient mpi.CommandServiceClient
subscribeClient mpi.CommandService_SubscribeClient
agentConfig *config.Config
isConnected *atomic.Bool
subscribeCancel context.CancelFunc
subscribeChannel chan *mpi.ManagementPlaneRequest
configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
instances []*mpi.Instance
subscribeMutex sync.Mutex
subscribeClientMutex sync.Mutex
configApplyRequestQueueMutex sync.Mutex
instancesMutex sync.Mutex
}
)

Expand All @@ -58,11 +58,12 @@ func NewCommandService(
isConnected.Store(false)

commandService := &CommandService{
commandServiceClient: commandServiceClient,
agentConfig: agentConfig,
isConnected: isConnected,
subscribeChannel: subscribeChannel,
instances: []*mpi.Instance{},
commandServiceClient: commandServiceClient,
agentConfig: agentConfig,
isConnected: isConnected,
subscribeChannel: subscribeChannel,
configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
instances: []*mpi.Instance{},
}

var subscribeCtx context.Context
Expand Down Expand Up @@ -98,7 +99,7 @@ func (cs *CommandService) UpdateDataPlaneStatus(
Resource: resource,
}

backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime)
backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime)
defer backoffCancel()

sendDataPlaneStatus := func() (*mpi.UpdateDataPlaneStatusResponse, error) {
Expand All @@ -122,7 +123,7 @@ func (cs *CommandService) UpdateDataPlaneStatus(

response, err := backoff.RetryWithData(
sendDataPlaneStatus,
backoffHelpers.Context(backOffCtx, cs.agentConfig.Common),
backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff),
)
if err != nil {
return err
Expand Down Expand Up @@ -152,12 +153,12 @@ func (cs *CommandService) UpdateDataPlaneHealth(ctx context.Context, instanceHea
InstanceHealths: instanceHealths,
}

backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime)
backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime)
defer backoffCancel()

response, err := backoff.RetryWithData(
cs.dataPlaneHealthCallback(ctx, request),
backoffHelpers.Context(backOffCtx, cs.agentConfig.Common),
backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff),
)
if err != nil {
return err
Expand All @@ -171,12 +172,17 @@ func (cs *CommandService) UpdateDataPlaneHealth(ctx context.Context, instanceHea
func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error {
slog.DebugContext(ctx, "Sending data plane response", "response", response)

backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime)
backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime)
defer backoffCancel()

err := cs.handleConfigApplyResponse(ctx, response)
if err != nil {
return err
}

return backoff.Retry(
cs.sendDataPlaneResponseCallback(ctx, response),
backoffHelpers.Context(backOffCtx, cs.agentConfig.Common),
backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff),
)
}

Expand All @@ -191,12 +197,12 @@ func (cs *CommandService) CancelSubscription(ctx context.Context) {
}

func (cs *CommandService) subscribe(ctx context.Context) {
commonSettings := &config.CommonSettings{
InitialInterval: cs.agentConfig.Common.InitialInterval,
MaxInterval: cs.agentConfig.Common.MaxInterval,
commonSettings := &config.BackOff{
InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval,
MaxInterval: cs.agentConfig.Client.Backoff.MaxInterval,
MaxElapsedTime: createConnectionMaxElapsedTime,
RandomizationFactor: cs.agentConfig.Common.RandomizationFactor,
Multiplier: cs.agentConfig.Common.Multiplier,
RandomizationFactor: cs.agentConfig.Client.Backoff.RandomizationFactor,
Multiplier: cs.agentConfig.Client.Backoff.Multiplier,
}

for {
Expand Down Expand Up @@ -230,12 +236,12 @@ func (cs *CommandService) CreateConnection(
Resource: resource,
}

commonSettings := &config.CommonSettings{
InitialInterval: cs.agentConfig.Common.InitialInterval,
MaxInterval: cs.agentConfig.Common.MaxInterval,
commonSettings := &config.BackOff{
InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval,
MaxInterval: cs.agentConfig.Client.Backoff.MaxInterval,
MaxElapsedTime: createConnectionMaxElapsedTime,
RandomizationFactor: cs.agentConfig.Common.RandomizationFactor,
Multiplier: cs.agentConfig.Common.Multiplier,
RandomizationFactor: cs.agentConfig.Client.Backoff.RandomizationFactor,
Multiplier: cs.agentConfig.Client.Backoff.Multiplier,
}

slog.DebugContext(ctx, "Sending create connection request", "request", request)
Expand Down Expand Up @@ -284,6 +290,81 @@ func (cs *CommandService) sendDataPlaneResponseCallback(
}
}

func (cs *CommandService) handleConfigApplyResponse(
ctx context.Context,
response *mpi.DataPlaneResponse,
) error {
cs.configApplyRequestQueueMutex.Lock()
defer cs.configApplyRequestQueueMutex.Unlock()

isConfigApplyResponse := false
var indexOfConfigApplyRequest int

for index, configApplyRequest := range cs.configApplyRequestQueue[response.GetInstanceId()] {
if configApplyRequest.GetMessageMeta().GetCorrelationId() == response.GetMessageMeta().GetCorrelationId() {
indexOfConfigApplyRequest = index
isConfigApplyResponse = true

break
}
}

if isConfigApplyResponse {
err := cs.sendResponseForQueuedConfigApplyRequests(ctx, response, indexOfConfigApplyRequest)
if err != nil {
return err
}
}

return nil
}

func (cs *CommandService) sendResponseForQueuedConfigApplyRequests(
ctx context.Context,
response *mpi.DataPlaneResponse,
indexOfConfigApplyRequest int,
) error {
instanceID := response.GetInstanceId()
for i := 0; i < indexOfConfigApplyRequest; i++ {
newResponse := response

newResponse.GetMessageMeta().MessageId = proto.GenerateMessageID()

request := cs.configApplyRequestQueue[instanceID][i]
newResponse.GetMessageMeta().CorrelationId = request.GetMessageMeta().GetCorrelationId()

slog.DebugContext(
ctx,
"Sending data plane response for queued config apply request",
"response", newResponse,
)

backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime)

err := backoff.Retry(
cs.sendDataPlaneResponseCallback(ctx, newResponse),
backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff),
)
if err != nil {
slog.ErrorContext(ctx, "Failed to send data plane response", "error", err)
backoffCancel()

return err
}

backoffCancel()
}

cs.configApplyRequestQueue[instanceID] = cs.configApplyRequestQueue[instanceID][indexOfConfigApplyRequest+1:]
slog.DebugContext(ctx, "Removed config apply requests from queue", "queue", cs.configApplyRequestQueue[instanceID])

if len(cs.configApplyRequestQueue[instanceID]) > 0 {
cs.subscribeChannel <- cs.configApplyRequestQueue[instanceID][len(cs.configApplyRequestQueue[instanceID])-1]
}

return nil
}

// Retry callback for sending a data plane health status to the Management Plane.
func (cs *CommandService) dataPlaneHealthCallback(
ctx context.Context,
Expand Down Expand Up @@ -347,13 +428,35 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
}

if cs.isValidRequest(ctx, request) {
cs.subscribeChannel <- request
switch request.GetRequest().(type) {
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
cs.queueConfigApplyRequests(ctx, request)
default:
cs.subscribeChannel <- request
}
}

return nil
}
}

func (cs *CommandService) queueConfigApplyRequests(ctx context.Context, request *mpi.ManagementPlaneRequest) {
cs.configApplyRequestQueueMutex.Lock()
defer cs.configApplyRequestQueueMutex.Unlock()

instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request)
if len(cs.configApplyRequestQueue[instanceID]) == 1 {
cs.subscribeChannel <- request
} else {
slog.DebugContext(
ctx,
"Config apply request is already in progress, queuing new config apply request",
"request", request,
)
}
}

func (cs *CommandService) isValidRequest(ctx context.Context, request *mpi.ManagementPlaneRequest) bool {
var validRequest bool

Expand Down
Loading

0 comments on commit 3b2e6a5

Please sign in to comment.