From 1a17cd54c1fe35cb2c155f30f64e63d0677e7cc0 Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Mon, 6 Jan 2025 15:22:14 +0000 Subject: [PATCH 01/22] support reading token from file via config --- internal/config/config.go | 11 ++++++ internal/config/defaults.go | 1 + internal/config/flags.go | 1 + internal/config/types.go | 3 +- internal/grpc/grpc.go | 77 ++++++++++++++++++++++++++++++++----- internal/grpc/token | 0 6 files changed, 82 insertions(+), 11 deletions(-) create mode 100644 internal/grpc/token diff --git a/internal/config/config.go b/internal/config/config.go index 4bd76679ca..e081c0b1ad 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -239,6 +239,12 @@ func registerCommandFlags(fs *flag.FlagSet) { DefCommandAuthTokenKey, "The token used in the authentication handshake with the command server endpoint for command and control.", ) + fs.String( + CommandAuthTokenPathKey, + DefCommandAuthTokenPathKey, + "Path to the token used in the authentication handshake with the command server endpoint "+ + "for command and control.", + ) fs.String( CommandTLSCertKey, DefCommandTLSCertKey, @@ -649,6 +655,11 @@ func resolveCommand() *Command { } } + if viperInstance.IsSet(CommandAuthTokenPathKey) { + command.Auth = &AuthConfig{ + TokenPath: viperInstance.GetString(CommandAuthTokenPathKey), + } + } if areTLSSettingsSet() { command.TLS = &TLSConfig{ Cert: viperInstance.GetString(CommandTLSCertKey), diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 1716320b34..3c1c729e85 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -20,6 +20,7 @@ const ( DefCommandServerPortKey = 0 DefCommandServerTypeKey = "grpc" DefCommandAuthTokenKey = "" + DefCommandAuthTokenPathKey = "" DefCommandTLSCertKey = "" DefCommandTLSKeyKey = "" DefCommandTLSCaKey = "" diff --git a/internal/config/flags.go b/internal/config/flags.go index 8cfa72b97a..5b813a2d43 100644 --- a/internal/config/flags.go +++ b/internal/config/flags.go @@ -69,6 +69,7 @@ var ( CollectorLogPathKey = pre(CollectorLogKey) + "path" CommandAuthKey = pre(CommandRootKey) + "auth" CommandAuthTokenKey = pre(CommandAuthKey) + "token" + CommandAuthTokenPathKey = pre(CommandAuthKey) + "token-path" CommandServerHostKey = pre(CommandServerKey) + "host" CommandServerKey = pre(CommandRootKey) + "server" CommandServerPortKey = pre(CommandServerKey) + "port" diff --git a/internal/config/types.go b/internal/config/types.go index 39fe789251..c1cefc5a1b 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -247,7 +247,8 @@ type ( } AuthConfig struct { - Token string `yaml:"-" mapstructure:"token"` + Token string `yaml:"-" mapstructure:"token"` + TokenPath string `yaml:"-" mapstructure:"token-path"` } TLSConfig struct { diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 4a558cad66..f3b7d44bcc 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -6,12 +6,14 @@ package grpc import ( + "bytes" "context" "crypto/tls" "errors" "fmt" "log/slog" "net" + "os" "sync" "github.com/cenkalti/backoff/v4" @@ -158,7 +160,6 @@ func (w *wrappedStream) SendMsg(message any) error { } func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOption { - skipToken := false streamClientInterceptors := []grpc.StreamClientInterceptor{grpcRetry.StreamClientInterceptor()} unaryClientInterceptors := []grpc.UnaryClientInterceptor{grpcRetry.UnaryClientInterceptor()} @@ -208,6 +209,17 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp opts = append(opts, sendRecOpts...) + opts, skipToken := addTransportCredentials(agentConfig, opts) + + if agentConfig.Command.Auth != nil && !skipToken { + opts = addPerRPCCredentials(agentConfig, resourceID, opts) + } + + return opts +} + +func addTransportCredentials(agentConfig *config.Config, opts []grpc.DialOption) ([]grpc.DialOption, bool) { + skipToken := false transportCredentials, err := getTransportCredentials(agentConfig) if err == nil { slog.Debug("Adding transport credentials to gRPC dial options") @@ -215,27 +227,72 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp grpc.WithTransportCredentials(transportCredentials), ) } else { + slog.Error("Unable to add transport credentials to gRPC dial options", "error", err) slog.Debug("Adding default transport credentials to gRPC dial options") opts = append(opts, grpc.WithTransportCredentials(defaultCredentials), ) skipToken = true } + return opts, skipToken +} - if agentConfig.Command.Auth != nil && !skipToken { - slog.Debug("Adding token to RPC credentials") - opts = append(opts, - grpc.WithPerRPCCredentials( - &PerRPCCredentials{ - Token: agentConfig.Command.Auth.Token, - ID: resourceID, - }), - ) +func addPerRPCCredentials(agentConfig *config.Config, resourceID string, opts []grpc.DialOption) []grpc.DialOption { + key := agentConfig.Command.Auth.Token + + if agentConfig.Command.Auth.TokenPath != "" { + var err error + key, err = validateTokenFile(agentConfig.Command.Auth.TokenPath) + if err != nil { + slog.Error("Unable to add token to gRPC dial options, token will be empty", "error", err) + } } + slog.Debug("Adding token to RPC credentials") + opts = append(opts, + grpc.WithPerRPCCredentials( + &PerRPCCredentials{ + Token: key, + ID: resourceID, + }), + ) + return opts } +func validateTokenFile(path string) (string, error) { + if path == "" { + slog.Error("Token file path is empty") + return "", errors.New("token file path is empty") + } + + slog.Debug("Checking token file", "path", path) + _, err := os.Stat(path) + if err != nil { + slog.Error("Unable to find token file", "path", path, "error", err) + return "", err + } + + slog.Debug("Reading dataplane key from file", "path", path) + var keyVal string + keyBytes, err := os.ReadFile(path) + if err != nil { + slog.Error("Unable to read token from file", "error", err) + return "", err + } + + keyBytes = bytes.TrimSpace(keyBytes) + keyBytes = bytes.TrimRight(keyBytes, "\n") + keyVal = string(keyBytes) + + if keyVal == "" { + slog.Error("failed to load token, please check agent configuration") + return "", errors.New("failed to load token, please check agent configuration") + } + + return keyVal, nil +} + // Have to create our own UnaryClientInterceptor function since protovalidate only provides a UnaryServerInterceptor // https://pkg.go.dev/github.com/grpc-ecosystem/go-grpc-middleware/v2@v2.1.0/interceptors/protovalidate func ProtoValidatorUnaryClientInterceptor() (grpc.UnaryClientInterceptor, error) { diff --git a/internal/grpc/token b/internal/grpc/token new file mode 100644 index 0000000000..e69de29bb2 From c4b962deb3192944a661d967987b4b9f1a6f5f51 Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Mon, 6 Jan 2025 16:18:06 +0000 Subject: [PATCH 02/22] remove empty file --- internal/grpc/token | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 internal/grpc/token diff --git a/internal/grpc/token b/internal/grpc/token deleted file mode 100644 index e69de29bb2..0000000000 From d68fe7e2f9ccd7eecacc3fc23481453b8abc30bf Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Mon, 6 Jan 2025 17:16:43 +0000 Subject: [PATCH 03/22] simplify token validation and add unit tests --- internal/grpc/grpc.go | 7 ---- internal/grpc/grpc_test.go | 69 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 7 deletions(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index f3b7d44bcc..5185163279 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -266,13 +266,6 @@ func validateTokenFile(path string) (string, error) { return "", errors.New("token file path is empty") } - slog.Debug("Checking token file", "path", path) - _, err := os.Stat(path) - if err != nil { - slog.Error("Unable to find token file", "path", path, "error", err) - return "", err - } - slog.Debug("Reading dataplane key from file", "path", path) var keyVal string keyBytes, err := os.ReadFile(path) diff --git a/internal/grpc/grpc_test.go b/internal/grpc/grpc_test.go index 4feaa0e3d8..997001baec 100644 --- a/internal/grpc/grpc_test.go +++ b/internal/grpc/grpc_test.go @@ -8,6 +8,7 @@ package grpc import ( "context" "fmt" + "os" "testing" "github.com/cenkalti/backoff/v4" @@ -352,3 +353,71 @@ func Test_ValidateGrpcError(t *testing.T) { result = ValidateGrpcError(status.Errorf(codes.InvalidArgument, "error")) assert.IsType(t, &backoff.PermanentError{}, result) } + +func Test_validateTokenFile(t *testing.T) { + type args struct { + path string + } + tests := []struct { + name string + createToken bool + args args + want string + wantErrMsg string + }{ + { + name: "File exists", + createToken: true, + args: args{ + path: "test-tkn", + }, + want: "test-tkn", + wantErrMsg: "", + }, + { + name: "File does not exist", + createToken: false, + args: args{ + path: "test-tkn", + }, + want: "", + wantErrMsg: "open test-tkn: no such file or directory", + }, + { + name: "Empty path", + createToken: false, + args: args{ + path: "", + }, + want: "", + wantErrMsg: "token file path is empty", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer func() { + if tt.createToken { + err := os.Remove(tt.args.path) + if err != nil { + t.Log(err) + } + } + }() + + if tt.createToken { + err := os.WriteFile(tt.args.path, []byte(tt.args.path), 0666) + if err != nil { + t.Fatal(err) + } + } + + got, err := validateTokenFile(tt.args.path) + if err != nil { + if err.Error() != tt.wantErrMsg { + t.Errorf("validateTokenFile() error = %v, wantErr %v", err, tt.wantErrMsg) + } + } + assert.Equalf(t, tt.want, got, "validateTokenFile(%v)", tt.args.path) + }) + } +} From 1b2b0293b72ba7b57491ae94869324c4635fbc43 Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Tue, 7 Jan 2025 13:17:20 +0000 Subject: [PATCH 04/22] add unit tests for transport credentials funtions --- api/grpc/mpi/v1/command.pb.go | 2 +- api/grpc/mpi/v1/common.pb.go | 2 +- api/grpc/mpi/v1/files.pb.go | 2 +- internal/grpc/grpc.go | 18 ++++----- internal/grpc/grpc_test.go | 75 ++++++++++++++++++++++------------- 5 files changed, 60 insertions(+), 39 deletions(-) diff --git a/api/grpc/mpi/v1/command.pb.go b/api/grpc/mpi/v1/command.pb.go index 7a29c30dd7..6e46a0c118 100644 --- a/api/grpc/mpi/v1/command.pb.go +++ b/api/grpc/mpi/v1/command.pb.go @@ -8,7 +8,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.0 +// protoc-gen-go v1.36.1 // protoc (unknown) // source: mpi/v1/command.proto diff --git a/api/grpc/mpi/v1/common.pb.go b/api/grpc/mpi/v1/common.pb.go index 493030c7db..8d51fa6b93 100644 --- a/api/grpc/mpi/v1/common.pb.go +++ b/api/grpc/mpi/v1/common.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.0 +// protoc-gen-go v1.36.1 // protoc (unknown) // source: mpi/v1/common.proto diff --git a/api/grpc/mpi/v1/files.pb.go b/api/grpc/mpi/v1/files.pb.go index 38592679c8..896e045753 100644 --- a/api/grpc/mpi/v1/files.pb.go +++ b/api/grpc/mpi/v1/files.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.0 +// protoc-gen-go v1.36.1 // protoc (unknown) // source: mpi/v1/files.proto diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 5185163279..9a11c709b1 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -219,22 +219,22 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp } func addTransportCredentials(agentConfig *config.Config, opts []grpc.DialOption) ([]grpc.DialOption, bool) { - skipToken := false transportCredentials, err := getTransportCredentials(agentConfig) - if err == nil { - slog.Debug("Adding transport credentials to gRPC dial options") - opts = append(opts, - grpc.WithTransportCredentials(transportCredentials), - ) - } else { + if err != nil { slog.Error("Unable to add transport credentials to gRPC dial options", "error", err) slog.Debug("Adding default transport credentials to gRPC dial options") opts = append(opts, grpc.WithTransportCredentials(defaultCredentials), ) - skipToken = true + + return opts, true } - return opts, skipToken + slog.Debug("Adding transport credentials to gRPC dial options") + opts = append(opts, + grpc.WithTransportCredentials(transportCredentials), + ) + + return opts, false } func addPerRPCCredentials(agentConfig *config.Config, resourceID string, opts []grpc.DialOption) []grpc.DialOption { diff --git a/internal/grpc/grpc_test.go b/internal/grpc/grpc_test.go index 997001baec..403541f26a 100644 --- a/internal/grpc/grpc_test.go +++ b/internal/grpc/grpc_test.go @@ -11,6 +11,8 @@ import ( "os" "testing" + "google.golang.org/grpc/credentials" + "github.com/cenkalti/backoff/v4" "github.com/nginx/agent/v3/test/helpers" "github.com/nginx/agent/v3/test/protos" @@ -354,50 +356,42 @@ func Test_ValidateGrpcError(t *testing.T) { assert.IsType(t, &backoff.PermanentError{}, result) } +// nolint:revive,gocognit func Test_validateTokenFile(t *testing.T) { - type args struct { - path string - } tests := []struct { name string - createToken bool - args args + path string want string wantErrMsg string + createToken bool }{ { - name: "File exists", + name: "Test 1: File exists", createToken: true, - args: args{ - path: "test-tkn", - }, - want: "test-tkn", - wantErrMsg: "", + path: "test-tkn", + want: "test-tkn", + wantErrMsg: "", }, { - name: "File does not exist", + name: "Test 2: File does not exist", createToken: false, - args: args{ - path: "test-tkn", - }, - want: "", - wantErrMsg: "open test-tkn: no such file or directory", + path: "test-tkn", + want: "", + wantErrMsg: "open test-tkn: no such file or directory", }, { - name: "Empty path", + name: "Test 3: Empty path", createToken: false, - args: args{ - path: "", - }, - want: "", - wantErrMsg: "token file path is empty", + path: "", + want: "", + wantErrMsg: "token file path is empty", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { defer func() { if tt.createToken { - err := os.Remove(tt.args.path) + err := os.Remove(tt.path) if err != nil { t.Log(err) } @@ -405,19 +399,46 @@ func Test_validateTokenFile(t *testing.T) { }() if tt.createToken { - err := os.WriteFile(tt.args.path, []byte(tt.args.path), 0666) + err := os.WriteFile(tt.path, []byte(tt.path), 0o600) if err != nil { t.Fatal(err) } } - got, err := validateTokenFile(tt.args.path) + got, err := validateTokenFile(tt.path) if err != nil { if err.Error() != tt.wantErrMsg { t.Errorf("validateTokenFile() error = %v, wantErr %v", err, tt.wantErrMsg) } } - assert.Equalf(t, tt.want, got, "validateTokenFile(%v)", tt.args.path) + assert.Equalf(t, tt.want, got, "validateTokenFile(%v)", tt.path) + }) + } +} + +func Test_getTransportCredentials(t *testing.T) { + tests := []struct { + want credentials.TransportCredentials + conf *config.Config + wantErr assert.ErrorAssertionFunc + name string + }{ + { + name: "No TLS config returns default credentials", + conf: &config.Config{ + Command: &config.Command{}, + }, + want: defaultCredentials, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getTransportCredentials(tt.conf) + if !tt.wantErr(t, err, fmt.Sprintf("getTransportCredentials(%v)", tt.conf)) { + return + } + assert.Equalf(t, tt.want, got, "getTransportCredentials(%v)", tt.conf) }) } } From a43cb5380c83777508f33e107fe9f5ad131d4a7e Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Fri, 10 Jan 2025 11:13:52 +0000 Subject: [PATCH 05/22] address PR feedback --- internal/grpc/grpc.go | 15 ++++++--------- internal/grpc/grpc_test.go | 6 +++--- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 9a11c709b1..e8073c2138 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -221,8 +221,7 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp func addTransportCredentials(agentConfig *config.Config, opts []grpc.DialOption) ([]grpc.DialOption, bool) { transportCredentials, err := getTransportCredentials(agentConfig) if err != nil { - slog.Error("Unable to add transport credentials to gRPC dial options", "error", err) - slog.Debug("Adding default transport credentials to gRPC dial options") + slog.Error("Unable to get transport credentials from agent configuration, adding default transport credentials to gRPC dial options", "error", err) opts = append(opts, grpc.WithTransportCredentials(defaultCredentials), ) @@ -238,11 +237,11 @@ func addTransportCredentials(agentConfig *config.Config, opts []grpc.DialOption) } func addPerRPCCredentials(agentConfig *config.Config, resourceID string, opts []grpc.DialOption) []grpc.DialOption { - key := agentConfig.Command.Auth.Token + token := agentConfig.Command.Auth.Token if agentConfig.Command.Auth.TokenPath != "" { var err error - key, err = validateTokenFile(agentConfig.Command.Auth.TokenPath) + token, err = retrieveTokenFile(agentConfig.Command.Auth.TokenPath) if err != nil { slog.Error("Unable to add token to gRPC dial options, token will be empty", "error", err) } @@ -252,7 +251,7 @@ func addPerRPCCredentials(agentConfig *config.Config, resourceID string, opts [] opts = append(opts, grpc.WithPerRPCCredentials( &PerRPCCredentials{ - Token: key, + Token: token, ID: resourceID, }), ) @@ -260,9 +259,8 @@ func addPerRPCCredentials(agentConfig *config.Config, resourceID string, opts [] return opts } -func validateTokenFile(path string) (string, error) { +func retrieveTokenFile(path string) (string, error) { if path == "" { - slog.Error("Token file path is empty") return "", errors.New("token file path is empty") } @@ -279,8 +277,7 @@ func validateTokenFile(path string) (string, error) { keyVal = string(keyBytes) if keyVal == "" { - slog.Error("failed to load token, please check agent configuration") - return "", errors.New("failed to load token, please check agent configuration") + return "", errors.New("failed to retrieve token, token file is empty") } return keyVal, nil diff --git a/internal/grpc/grpc_test.go b/internal/grpc/grpc_test.go index 403541f26a..5de547e2f6 100644 --- a/internal/grpc/grpc_test.go +++ b/internal/grpc/grpc_test.go @@ -405,13 +405,13 @@ func Test_validateTokenFile(t *testing.T) { } } - got, err := validateTokenFile(tt.path) + got, err := retrieveTokenFile(tt.path) if err != nil { if err.Error() != tt.wantErrMsg { - t.Errorf("validateTokenFile() error = %v, wantErr %v", err, tt.wantErrMsg) + t.Errorf("retrieveTokenFile() error = %v, wantErr %v", err, tt.wantErrMsg) } } - assert.Equalf(t, tt.want, got, "validateTokenFile(%v)", tt.path) + assert.Equalf(t, tt.want, got, "retrieveTokenFile(%v)", tt.path) }) } } From 2ecf10290ee708f5b2e6ba9b41f8a09c8b636367 Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Fri, 10 Jan 2025 11:15:22 +0000 Subject: [PATCH 06/22] proto updates --- api/grpc/mpi/v1/command.pb.go | 2 +- api/grpc/mpi/v1/common.pb.go | 2 +- api/grpc/mpi/v1/files.pb.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/api/grpc/mpi/v1/command.pb.go b/api/grpc/mpi/v1/command.pb.go index 6e46a0c118..94368f7d33 100644 --- a/api/grpc/mpi/v1/command.pb.go +++ b/api/grpc/mpi/v1/command.pb.go @@ -8,7 +8,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.1 +// protoc-gen-go v1.36.2 // protoc (unknown) // source: mpi/v1/command.proto diff --git a/api/grpc/mpi/v1/common.pb.go b/api/grpc/mpi/v1/common.pb.go index 8d51fa6b93..763aae6c0c 100644 --- a/api/grpc/mpi/v1/common.pb.go +++ b/api/grpc/mpi/v1/common.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.1 +// protoc-gen-go v1.36.2 // protoc (unknown) // source: mpi/v1/common.proto diff --git a/api/grpc/mpi/v1/files.pb.go b/api/grpc/mpi/v1/files.pb.go index 896e045753..cd41aa7389 100644 --- a/api/grpc/mpi/v1/files.pb.go +++ b/api/grpc/mpi/v1/files.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.1 +// protoc-gen-go v1.36.2 // protoc (unknown) // source: mpi/v1/files.proto From 687220727619c317885bdea423e92c2ff3806f40 Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Fri, 10 Jan 2025 11:15:38 +0000 Subject: [PATCH 07/22] fix function name --- internal/grpc/grpc.go | 4 ++-- internal/grpc/grpc_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index e8073c2138..90dd74db37 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -241,7 +241,7 @@ func addPerRPCCredentials(agentConfig *config.Config, resourceID string, opts [] if agentConfig.Command.Auth.TokenPath != "" { var err error - token, err = retrieveTokenFile(agentConfig.Command.Auth.TokenPath) + token, err = retrieveTokenFromFile(agentConfig.Command.Auth.TokenPath) if err != nil { slog.Error("Unable to add token to gRPC dial options, token will be empty", "error", err) } @@ -259,7 +259,7 @@ func addPerRPCCredentials(agentConfig *config.Config, resourceID string, opts [] return opts } -func retrieveTokenFile(path string) (string, error) { +func retrieveTokenFromFile(path string) (string, error) { if path == "" { return "", errors.New("token file path is empty") } diff --git a/internal/grpc/grpc_test.go b/internal/grpc/grpc_test.go index 5de547e2f6..388b46a841 100644 --- a/internal/grpc/grpc_test.go +++ b/internal/grpc/grpc_test.go @@ -405,13 +405,13 @@ func Test_validateTokenFile(t *testing.T) { } } - got, err := retrieveTokenFile(tt.path) + got, err := retrieveTokenFromFile(tt.path) if err != nil { if err.Error() != tt.wantErrMsg { - t.Errorf("retrieveTokenFile() error = %v, wantErr %v", err, tt.wantErrMsg) + t.Errorf("retrieveTokenFromFile() error = %v, wantErr %v", err, tt.wantErrMsg) } } - assert.Equalf(t, tt.want, got, "retrieveTokenFile(%v)", tt.path) + assert.Equalf(t, tt.want, got, "retrieveTokenFromFile(%v)", tt.path) }) } } From 2c30d2c8b47d132143943db6bc70a3fcd076a53a Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Fri, 10 Jan 2025 11:27:05 +0000 Subject: [PATCH 08/22] fix lint error: lll --- internal/grpc/grpc.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 90dd74db37..4521551b3a 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -221,7 +221,8 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp func addTransportCredentials(agentConfig *config.Config, opts []grpc.DialOption) ([]grpc.DialOption, bool) { transportCredentials, err := getTransportCredentials(agentConfig) if err != nil { - slog.Error("Unable to get transport credentials from agent configuration, adding default transport credentials to gRPC dial options", "error", err) + slog.Error("Unable to get transport credentials from agent configuration, "+ + "adding default transport credentials to gRPC dial options", "error", err) opts = append(opts, grpc.WithTransportCredentials(defaultCredentials), ) From 2ee40ed861205ad9a981489ee26ce83175d7deef Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Fri, 10 Jan 2025 11:29:58 +0000 Subject: [PATCH 09/22] add missing PR feedback --- internal/grpc/grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 4521551b3a..31ea79f00a 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -265,7 +265,7 @@ func retrieveTokenFromFile(path string) (string, error) { return "", errors.New("token file path is empty") } - slog.Debug("Reading dataplane key from file", "path", path) + slog.Debug("Reading token from file", "path", path) var keyVal string keyBytes, err := os.ReadFile(path) if err != nil { From 3cbf80a08c1dde6c89b8e533c504fd72ebb13fab Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Tue, 14 Jan 2025 13:07:15 +0000 Subject: [PATCH 10/22] remove error log message --- internal/grpc/grpc.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 31ea79f00a..402aab21c3 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -269,8 +269,7 @@ func retrieveTokenFromFile(path string) (string, error) { var keyVal string keyBytes, err := os.ReadFile(path) if err != nil { - slog.Error("Unable to read token from file", "error", err) - return "", err + return "", fmt.Errorf("unable to read token from file: %w", err) } keyBytes = bytes.TrimSpace(keyBytes) From 95403c9408450b0307f30b7d35398ed7603210ca Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Tue, 14 Jan 2025 16:04:27 +0000 Subject: [PATCH 11/22] fix unit test --- internal/grpc/grpc_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/grpc/grpc_test.go b/internal/grpc/grpc_test.go index 388b46a841..5dee66471b 100644 --- a/internal/grpc/grpc_test.go +++ b/internal/grpc/grpc_test.go @@ -377,7 +377,7 @@ func Test_validateTokenFile(t *testing.T) { createToken: false, path: "test-tkn", want: "", - wantErrMsg: "open test-tkn: no such file or directory", + wantErrMsg: "unable to read token from file: open test-tkn: no such file or directory", }, { name: "Test 3: Empty path", From b36e5fd95cbddf2ee5de3069b0a8e52b7bbadb5f Mon Sep 17 00:00:00 2001 From: Sean Breen <101327931+sean-breen@users.noreply.github.com> Date: Tue, 14 Jan 2025 14:49:54 +0000 Subject: [PATCH 12/22] Fix apk test package naming (#961) * modify alpine package name: nginx-agent-3.0.0_1234 -> nginx-agent-3.0.0.1234 * protoc-gen update --- Makefile.packaging | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile.packaging b/Makefile.packaging index 000ab6c1d8..35bc95bf8e 100644 --- a/Makefile.packaging +++ b/Makefile.packaging @@ -10,7 +10,7 @@ BINARY_PATH := $(BUILD_DIR)/$(BINARY_NAME) GPG_PUBLIC_KEY := .key PACKAGE_BUILD ?= 1 PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v')-$(PACKAGE_BUILD) -APK_PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v')_$(PACKAGE_BUILD) +APK_PACKAGE_VERSION := $(shell echo ${VERSION} | tr -d 'v').$(PACKAGE_BUILD) TARBALL_NAME := $(PACKAGE_PREFIX)v3.tar.gz DEB_DISTROS ?= ubuntu-noble-24.04 ubuntu-jammy-22.04 ubuntu-focal-20.04 debian-bookworm-12 debian-bullseye-11 From 36f9e3cae5f9456639654b5ba50f2eed36306f28 Mon Sep 17 00:00:00 2001 From: aphralG <108004222+aphralG@users.noreply.github.com> Date: Wed, 15 Jan 2025 11:48:03 +0000 Subject: [PATCH 13/22] Update agent config defaults and format (#959) * update config defaults and format --- internal/backoff/backoff.go | 6 +- internal/backoff/backoff_test.go | 20 +-- internal/collector/otel_collector_plugin.go | 4 +- internal/command/command_service.go | 34 +++-- internal/config/config.go | 128 +++++++++++++----- internal/config/config_test.go | 103 +++++++++++--- internal/config/defaults.go | 19 ++- internal/config/flags.go | 22 ++- internal/config/testdata/nginx-agent.conf | 21 ++- internal/config/types.go | 46 ++++--- internal/file/file_manager_service.go | 13 +- internal/grpc/grpc.go | 16 +-- internal/resource/resource_plugin_test.go | 4 +- .../watcher/instance/nginx_config_parser.go | 4 +- test/mock/grpc/cmd/main.go | 6 +- test/mock/grpc/mock_management_server.go | 15 +- test/types/config.go | 29 ++-- 17 files changed, 327 insertions(+), 163 deletions(-) diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go index c525fbe775..0ceef8c730 100644 --- a/internal/backoff/backoff.go +++ b/internal/backoff/backoff.go @@ -48,7 +48,7 @@ import ( // Information from https://pkg.go.dev/github.com/cenkalti/backoff/v4#section-readme func WaitUntil( ctx context.Context, - backoffSettings *config.CommonSettings, + backoffSettings *config.BackOff, operation backoff.Operation, ) error { eb := backoff.NewExponentialBackOff() @@ -68,7 +68,7 @@ func WaitUntil( // nolint: ireturn func WaitUntilWithData[T any]( ctx context.Context, - backoffSettings *config.CommonSettings, + backoffSettings *config.BackOff, operation backoff.OperationWithData[T], ) (T, error) { backoffWithContext := Context(ctx, backoffSettings) @@ -77,7 +77,7 @@ func WaitUntilWithData[T any]( } // nolint: ireturn -func Context(ctx context.Context, backoffSettings *config.CommonSettings) backoff.BackOffContext { +func Context(ctx context.Context, backoffSettings *config.BackOff) backoff.BackOffContext { eb := backoff.NewExponentialBackOff() eb.InitialInterval = backoffSettings.InitialInterval eb.MaxInterval = backoffSettings.MaxInterval diff --git a/internal/backoff/backoff_test.go b/internal/backoff/backoff_test.go index 43e794292a..60cbc75a9b 100644 --- a/internal/backoff/backoff_test.go +++ b/internal/backoff/backoff_test.go @@ -82,14 +82,16 @@ func TestWaitUntil(t *testing.T) { for _, test := range tests { invocations = 0 - settings := &config.CommonSettings{ - InitialInterval: test.initialInterval, - MaxInterval: test.maxInterval, - MaxElapsedTime: test.maxElapsedTime, - RandomizationFactor: config.DefBackoffRandomizationFactor, - Multiplier: config.DefBackoffMultiplier, + settings := &config.Client{ + Backoff: &config.BackOff{ + InitialInterval: test.initialInterval, + MaxInterval: test.maxInterval, + MaxElapsedTime: test.maxElapsedTime, + RandomizationFactor: config.DefBackoffRandomizationFactor, + Multiplier: config.DefBackoffMultiplier, + }, } - result := WaitUntil(test.context, settings, test.operation) + result := WaitUntil(test.context, settings.Backoff, test.operation) if test.expectedError { assert.Errorf(t, result, test.name) @@ -164,7 +166,7 @@ func TestWaitUntilWithData(t *testing.T) { } for _, test := range tests { - settings := &config.CommonSettings{ + settings := &config.BackOff{ InitialInterval: test.initialInterval, MaxInterval: test.maxInterval, MaxElapsedTime: test.maxElapsedTime, @@ -185,7 +187,7 @@ func TestWaitUntilWithData(t *testing.T) { } func TestContext(t *testing.T) { - settings := &config.CommonSettings{ + settings := &config.BackOff{ InitialInterval: 10 * time.Millisecond, MaxInterval: 10 * time.Millisecond, MaxElapsedTime: 10 * time.Millisecond, diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index 07f4252360..cb3429be63 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -203,9 +203,9 @@ func (oc *Collector) Close(ctx context.Context) error { oc.service.Shutdown() oc.cancel() - settings := oc.config.Common + settings := oc.config.Client.Backoff settings.MaxElapsedTime = maxTimeToWaitForShutdown - err := backoff.WaitUntil(ctx, oc.config.Common, func() error { + err := backoff.WaitUntil(ctx, settings, func() error { if oc.service.GetState() == otelcol.StateClosed { return nil } diff --git a/internal/command/command_service.go b/internal/command/command_service.go index 5c991e76d2..419b198c9d 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -11,7 +11,6 @@ import ( "log/slog" "sync" "sync/atomic" - "time" "github.com/cenkalti/backoff/v4" @@ -29,7 +28,6 @@ import ( var _ commandService = (*CommandService)(nil) const ( - retryInterval = 5 * time.Second createConnectionMaxElapsedTime = 0 ) @@ -95,7 +93,7 @@ func (cs *CommandService) UpdateDataPlaneStatus( Resource: resource, } - backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() sendDataPlaneStatus := func() (*mpi.UpdateDataPlaneStatusResponse, error) { @@ -119,7 +117,7 @@ func (cs *CommandService) UpdateDataPlaneStatus( response, err := backoff.RetryWithData( sendDataPlaneStatus, - backoffHelpers.Context(backOffCtx, cs.agentConfig.Common), + backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff), ) if err != nil { return err @@ -145,12 +143,12 @@ func (cs *CommandService) UpdateDataPlaneHealth(ctx context.Context, instanceHea InstanceHealths: instanceHealths, } - backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() response, err := backoff.RetryWithData( cs.dataPlaneHealthCallback(ctx, request), - backoffHelpers.Context(backOffCtx, cs.agentConfig.Common), + backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff), ) if err != nil { return err @@ -164,12 +162,12 @@ func (cs *CommandService) UpdateDataPlaneHealth(ctx context.Context, instanceHea func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *mpi.DataPlaneResponse) error { slog.DebugContext(ctx, "Sending data plane response", "response", response) - backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() return backoff.Retry( cs.sendDataPlaneResponseCallback(ctx, response), - backoffHelpers.Context(backOffCtx, cs.agentConfig.Common), + backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff), ) } @@ -184,12 +182,12 @@ func (cs *CommandService) CancelSubscription(ctx context.Context) { } func (cs *CommandService) subscribe(ctx context.Context) { - commonSettings := &config.CommonSettings{ - InitialInterval: cs.agentConfig.Common.InitialInterval, - MaxInterval: cs.agentConfig.Common.MaxInterval, + commonSettings := &config.BackOff{ + InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval, + MaxInterval: cs.agentConfig.Client.Backoff.MaxInterval, MaxElapsedTime: createConnectionMaxElapsedTime, - RandomizationFactor: cs.agentConfig.Common.RandomizationFactor, - Multiplier: cs.agentConfig.Common.Multiplier, + RandomizationFactor: cs.agentConfig.Client.Backoff.RandomizationFactor, + Multiplier: cs.agentConfig.Client.Backoff.Multiplier, } for { @@ -223,12 +221,12 @@ func (cs *CommandService) CreateConnection( Resource: resource, } - commonSettings := &config.CommonSettings{ - InitialInterval: cs.agentConfig.Common.InitialInterval, - MaxInterval: cs.agentConfig.Common.MaxInterval, + commonSettings := &config.BackOff{ + InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval, + MaxInterval: cs.agentConfig.Client.Backoff.MaxInterval, MaxElapsedTime: createConnectionMaxElapsedTime, - RandomizationFactor: cs.agentConfig.Common.RandomizationFactor, - Multiplier: cs.agentConfig.Common.Multiplier, + RandomizationFactor: cs.agentConfig.Client.Backoff.RandomizationFactor, + Multiplier: cs.agentConfig.Client.Backoff.Multiplier, } slog.DebugContext(ctx, "Sending create connection request", "request", request) diff --git a/internal/config/config.go b/internal/config/config.go index e081c0b1ad..aa8fa07116 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,7 +14,6 @@ import ( "path/filepath" "slices" "strings" - "time" selfsignedcerts "github.com/nginx/agent/v3/pkg/tls" uuidLibrary "github.com/nginx/agent/v3/pkg/uuid" @@ -101,7 +100,6 @@ func ResolveConfig() (*Config, error) { AllowedDirectories: allowedDirs, Collector: collector, Command: resolveCommand(), - Common: resolveCommon(), Watchers: resolveWatchers(), Features: viperInstance.GetStringSlice(FeaturesKey), } @@ -155,7 +153,6 @@ func registerFlags() { "collection or error monitoring", ) - fs.Duration(ClientTimeoutKey, time.Minute, "Client timeout") fs.StringSlice(AllowedDirectoriesKey, DefaultAllowedDirectories(), "A comma-separated list of paths that you want to grant NGINX Agent read/write access to") @@ -178,24 +175,6 @@ func registerFlags() { "How often the NGINX Agent will check for file changes.", ) - fs.Int( - ClientMaxMessageSizeKey, - DefMaxMessageSize, - "The value used, if not 0, for both max_message_send_size and max_message_receive_size", - ) - - fs.Int( - ClientMaxMessageReceiveSizeKey, - DefMaxMessageRecieveSize, - "Updates the client grpc setting MaxRecvMsgSize with the specific value in MB.", - ) - - fs.Int( - ClientMaxMessageSendSizeKey, - DefMaxMessageSendSize, - "Updates the client grpc setting MaxSendMsgSize with the specific value in MB.", - ) - fs.StringSlice( FeaturesKey, DefaultFeatures(), @@ -204,6 +183,7 @@ func registerFlags() { registerCommandFlags(fs) registerCollectorFlags(fs) + registerClientFlags(fs) fs.SetNormalizeFunc(normalizeFunc) @@ -218,6 +198,76 @@ func registerFlags() { }) } +func registerClientFlags(fs *flag.FlagSet) { + // HTTP Flags + fs.Duration( + ClientHTTPTimeoutKey, + DefHTTPTimeout, + "The client HTTP Timeout, value in seconds") + + // Backoff Flags + fs.Duration( + ClientBackoffInitialIntervalKey, + DefBackoffInitialInterval, + "The client backoff initial interval, value in seconds") + + fs.Duration( + ClientBackoffMaxIntervalKey, + DefBackoffMaxInterval, + "The client backoff max interval, value in seconds") + + fs.Duration( + ClientBackoffMaxElapsedTimeKey, + DefBackoffMaxElapsedTime, + "The client backoff max elapsed time, value in seconds") + + fs.Float64( + ClientBackoffRandomizationFactorKey, + DefBackoffRandomizationFactor, + "The client backoff randomization factor, value float") + + fs.Float64( + ClientBackoffMultiplierKey, + DefBackoffMultiplier, + "The client backoff multiplier, value float") + + // GRPC Flags + fs.Duration( + ClientKeepAliveTimeoutKey, + DefGRPCKeepAliveTimeout, + "Updates the client grpc setting, KeepAlive Timeout with the specific value in seconds.", + ) + + fs.Duration( + ClientKeepAliveTimeKey, + DefGRPCKeepAliveTime, + "Updates the client grpc setting, KeepAlive Time with the specific value in seconds.", + ) + + fs.Bool( + ClientKeepAlivePermitWithoutStreamKey, + DefGRPCKeepAlivePermitWithoutStream, + "Update the client grpc setting, KeepAlive PermitWithoutStream value") + + fs.Int( + ClientGRPCMaxMessageSizeKey, + DefMaxMessageSize, + "The value used, if not 0, for both max_message_send_size and max_message_receive_size", + ) + + fs.Int( + ClientGRPCMaxMessageReceiveSizeKey, + DefMaxMessageRecieveSize, + "Updates the client grpc setting MaxRecvMsgSize with the specific value in MB.", + ) + + fs.Int( + ClientGRPCMaxMessageSendSizeKey, + DefMaxMessageSendSize, + "Updates the client grpc setting MaxSendMsgSize with the specific value in MB.", + ) +} + func registerCommandFlags(fs *flag.FlagSet) { fs.String( CommandServerHostKey, @@ -424,12 +474,26 @@ func resolveDataPlaneConfig() *DataPlaneConfig { func resolveClient() *Client { return &Client{ - Timeout: viperInstance.GetDuration(ClientTimeoutKey), - Time: viperInstance.GetDuration(ClientTimeKey), - PermitWithoutStream: viperInstance.GetBool(ClientPermitWithoutStreamKey), - MaxMessageSize: viperInstance.GetInt(ClientMaxMessageSizeKey), - MaxMessageRecieveSize: viperInstance.GetInt(ClientMaxMessageReceiveSizeKey), - MaxMessageSendSize: viperInstance.GetInt(ClientMaxMessageSendSizeKey), + HTTP: &HTTP{ + Timeout: viperInstance.GetDuration(ClientHTTPTimeoutKey), + }, + Grpc: &GRPC{ + KeepAlive: &KeepAlive{ + Timeout: viperInstance.GetDuration(ClientKeepAliveTimeoutKey), + Time: viperInstance.GetDuration(ClientKeepAliveTimeKey), + PermitWithoutStream: viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey), + }, + MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey), + MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey), + MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey), + }, + Backoff: &BackOff{ + InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey), + MaxInterval: viperInstance.GetDuration(ClientBackoffMaxIntervalKey), + MaxElapsedTime: viperInstance.GetDuration(ClientBackoffMaxElapsedTimeKey), + RandomizationFactor: viperInstance.GetFloat64(ClientBackoffRandomizationFactorKey), + Multiplier: viperInstance.GetFloat64(ClientBackoffMultiplierKey), + }, } } @@ -697,16 +761,6 @@ func arePrometheusExportTLSSettingsSet() bool { viperInstance.IsSet(CollectorPrometheusExporterTLSServerNameKey) } -func resolveCommon() *CommonSettings { - return &CommonSettings{ - InitialInterval: DefBackoffInitialInterval, - MaxInterval: DefBackoffMaxInterval, - MaxElapsedTime: DefBackoffMaxElapsedTime, - RandomizationFactor: DefBackoffRandomizationFactor, - Multiplier: DefBackoffMultiplier, - } -} - func resolveWatchers() *Watchers { return &Watchers{ InstanceWatcher: InstanceWatcher{ diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 9cfb9d36e6..70501d76ff 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -73,7 +73,23 @@ func TestResolveConfig(t *testing.T) { assert.NotEmpty(t, actual.Collector.Exporters) assert.NotEmpty(t, actual.Collector.Extensions) - assert.Equal(t, 10*time.Second, actual.Client.Timeout) + // Client GRPC Settings + assert.Equal(t, 15*time.Second, actual.Client.Grpc.KeepAlive.Timeout) + assert.Equal(t, 10*time.Second, actual.Client.Grpc.KeepAlive.Time) + assert.False(t, actual.Client.Grpc.KeepAlive.PermitWithoutStream) + assert.Equal(t, 1048575, actual.Client.Grpc.MaxMessageSize) + assert.Equal(t, 1048575, actual.Client.Grpc.MaxMessageReceiveSize) + assert.Equal(t, 1048575, actual.Client.Grpc.MaxMessageSendSize) + + // Client HTTP Settings + assert.Equal(t, 15*time.Second, actual.Client.HTTP.Timeout) + + // Client Backoff Settings + assert.Equal(t, 200*time.Millisecond, actual.Client.Backoff.InitialInterval) + assert.Equal(t, 10*time.Second, actual.Client.Backoff.MaxInterval) + assert.Equal(t, 25*time.Second, actual.Client.Backoff.MaxElapsedTime) + assert.InDelta(t, 1.5, actual.Client.Backoff.RandomizationFactor, 0.01) + assert.InDelta(t, 2.5, actual.Client.Backoff.Multiplier, 0.01) assert.Equal(t, allowedDir, @@ -101,12 +117,35 @@ func TestRegisterFlags(t *testing.T) { t.Setenv("NGINX_AGENT_PROCESS_MONITOR_MONITORING_FREQUENCY", "10s") t.Setenv("NGINX_AGENT_DATA_PLANE_API_HOST", "example.com") t.Setenv("NGINX_AGENT_DATA_PLANE_API_PORT", "9090") - t.Setenv("NGINX_AGENT_CLIENT_TIMEOUT", "10s") + t.Setenv("NGINX_AGENT_CLIENT_GRPC_KEEPALIVE_TIMEOUT", "10s") registerFlags() assert.Equal(t, "warn", viperInstance.GetString(LogLevelKey)) assert.Equal(t, "/var/log/test/agent.log", viperInstance.GetString(LogPathKey)) - assert.Equal(t, 10*time.Second, viperInstance.GetDuration(ClientTimeoutKey)) + assert.Equal(t, 10*time.Second, viperInstance.GetDuration(ClientKeepAliveTimeoutKey)) + + checkDefaultsClientValues(t, viperInstance) +} + +func checkDefaultsClientValues(t *testing.T, viperInstance *viper.Viper) { + t.Helper() + + assert.Equal(t, DefHTTPTimeout, viperInstance.GetDuration(ClientHTTPTimeoutKey)) + + assert.Equal(t, DefBackoffInitialInterval, viperInstance.GetDuration(ClientBackoffInitialIntervalKey)) + assert.Equal(t, DefBackoffMaxInterval, viperInstance.GetDuration(ClientBackoffMaxIntervalKey)) + assert.InDelta(t, DefBackoffRandomizationFactor, viperInstance.GetFloat64(ClientBackoffRandomizationFactorKey), + 0.01) + assert.InDelta(t, DefBackoffMultiplier, viperInstance.GetFloat64(ClientBackoffMultiplierKey), 0.01) + assert.Equal(t, DefBackoffMaxElapsedTime, viperInstance.GetDuration(ClientBackoffMaxElapsedTimeKey)) + + assert.Equal(t, DefGRPCKeepAliveTimeout, viperInstance.GetDuration(ClientKeepAliveTimeoutKey)) + assert.Equal(t, DefGRPCKeepAliveTime, viperInstance.GetDuration(ClientKeepAliveTimeKey)) + assert.Equal(t, DefGRPCKeepAlivePermitWithoutStream, viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey)) + + assert.Equal(t, DefMaxMessageSize, viperInstance.GetInt(ClientGRPCMaxMessageSizeKey)) + assert.Equal(t, DefMaxMessageRecieveSize, viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey)) + assert.Equal(t, DefMaxMessageSendSize, viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey)) } func TestSeekFileInPaths(t *testing.T) { @@ -140,7 +179,7 @@ func TestLoadPropertiesFromFile(t *testing.T) { assert.Equal(t, "debug", viperInstance.GetString(LogLevelKey)) assert.Equal(t, "./", viperInstance.GetString(LogPathKey)) - assert.Equal(t, 10*time.Second, viperInstance.GetDuration(ClientTimeoutKey)) + assert.Equal(t, 15*time.Second, viperInstance.GetDuration(ClientKeepAliveTimeoutKey)) err = loadPropertiesFromFile("./testdata/unknown.conf") require.Error(t, err) @@ -165,10 +204,10 @@ func TestResolveLog(t *testing.T) { func TestResolveClient(t *testing.T) { viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter)) - viperInstance.Set(ClientTimeoutKey, time.Hour) + viperInstance.Set(ClientKeepAliveTimeoutKey, time.Hour) result := resolveClient() - assert.Equal(t, time.Hour, result.Timeout) + assert.Equal(t, time.Hour, result.Grpc.KeepAlive.Timeout) } func TestResolveCollector(t *testing.T) { @@ -269,18 +308,26 @@ func TestClient(t *testing.T) { viperInstance = viper.NewWithOptions(viper.KeyDelimiter(KeyDelimiter)) expected := getAgentConfig().Client - viperInstance.Set(ClientMaxMessageSizeKey, expected.MaxMessageSize) - viperInstance.Set(ClientPermitWithoutStreamKey, expected.PermitWithoutStream) - viperInstance.Set(ClientTimeKey, expected.Time) - viperInstance.Set(ClientTimeoutKey, expected.Timeout) + viperInstance.Set(ClientGRPCMaxMessageSizeKey, expected.Grpc.MaxMessageSize) + viperInstance.Set(ClientKeepAlivePermitWithoutStreamKey, expected.Grpc.KeepAlive.PermitWithoutStream) + viperInstance.Set(ClientKeepAliveTimeKey, expected.Grpc.KeepAlive.Time) + viperInstance.Set(ClientKeepAliveTimeoutKey, expected.Grpc.KeepAlive.Timeout) + + viperInstance.Set(ClientHTTPTimeoutKey, expected.HTTP.Timeout) + + viperInstance.Set(ClientBackoffMaxIntervalKey, expected.Backoff.MaxInterval) + viperInstance.Set(ClientBackoffMultiplierKey, expected.Backoff.Multiplier) + viperInstance.Set(ClientBackoffMaxElapsedTimeKey, expected.Backoff.MaxElapsedTime) + viperInstance.Set(ClientBackoffInitialIntervalKey, expected.Backoff.InitialInterval) + viperInstance.Set(ClientBackoffRandomizationFactorKey, expected.Backoff.RandomizationFactor) // root keys for sections are set appropriately - assert.True(t, viperInstance.IsSet(ClientMaxMessageSizeKey)) - assert.False(t, viperInstance.IsSet(ClientMaxMessageReceiveSizeKey)) - assert.False(t, viperInstance.IsSet(ClientMaxMessageSendSizeKey)) + assert.True(t, viperInstance.IsSet(ClientGRPCMaxMessageSizeKey)) + assert.False(t, viperInstance.IsSet(ClientGRPCMaxMessageReceiveSizeKey)) + assert.False(t, viperInstance.IsSet(ClientGRPCMaxMessageSendSizeKey)) - viperInstance.Set(ClientMaxMessageReceiveSizeKey, expected.MaxMessageRecieveSize) - viperInstance.Set(ClientMaxMessageSendSizeKey, expected.MaxMessageSendSize) + viperInstance.Set(ClientGRPCMaxMessageReceiveSizeKey, expected.Grpc.MaxMessageReceiveSize) + viperInstance.Set(ClientGRPCMaxMessageSendSizeKey, expected.Grpc.MaxMessageSendSize) result := resolveClient() @@ -294,12 +341,26 @@ func getAgentConfig() *Config { Path: "", Log: &Log{}, Client: &Client{ - Timeout: 5 * time.Second, - Time: 4 * time.Second, - PermitWithoutStream: true, - MaxMessageSize: 1, - MaxMessageRecieveSize: 20, - MaxMessageSendSize: 40, + HTTP: &HTTP{ + Timeout: 10 * time.Second, + }, + Grpc: &GRPC{ + KeepAlive: &KeepAlive{ + Timeout: 5 * time.Second, + Time: 4 * time.Second, + PermitWithoutStream: true, + }, + MaxMessageSize: 1, + MaxMessageReceiveSize: 20, + MaxMessageSendSize: 40, + }, + Backoff: &BackOff{ + InitialInterval: 500 * time.Millisecond, + MaxInterval: 5 * time.Second, + MaxElapsedTime: 30 * time.Second, + RandomizationFactor: 0.5, + Multiplier: 1.5, + }, }, AllowedDirectories: []string{ "/etc/nginx", "/usr/local/etc/nginx", "/var/run/nginx", "/var/log/nginx", "/usr/share/nginx/modules", diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 3c1c729e85..9cc7f822e5 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -27,16 +27,25 @@ const ( DefCommandTLSSkipVerifyKey = false DefCommandTLServerNameKey = "" + // Client GRPC Settings DefMaxMessageSize = 0 // 0 = unset DefMaxMessageRecieveSize = 4194304 // default 4 MB DefMaxMessageSendSize = math.MaxInt32 - // Backoff defaults - DefBackoffInitialInterval = 50 * time.Millisecond - DefBackoffRandomizationFactor = 0.1 // the value is 0 <= and < 1 + // Client HTTP Settings + DefHTTPTimeout = 10 * time.Second + + // Client GRPC Keep Alive Settings + DefGRPCKeepAliveTimeout = 10 * time.Second + DefGRPCKeepAliveTime = 20 * time.Second + DefGRPCKeepAlivePermitWithoutStream = true + + // Client Backoff defaults + DefBackoffInitialInterval = 500 * time.Millisecond + DefBackoffRandomizationFactor = 0.5 // the value is 0 <= and < 1 DefBackoffMultiplier = 1.5 - DefBackoffMaxInterval = 200 * time.Millisecond - DefBackoffMaxElapsedTime = 3 * time.Second + DefBackoffMaxInterval = 5 * time.Second + DefBackoffMaxElapsedTime = 30 * time.Second // Watcher defaults DefInstanceWatcherMonitoringFrequency = 5 * time.Second diff --git a/internal/config/flags.go b/internal/config/flags.go index 5b813a2d43..7b4bb793df 100644 --- a/internal/config/flags.go +++ b/internal/config/flags.go @@ -26,12 +26,22 @@ const ( var ( // child flags saved as vars to enable easier prefixing. - ClientPermitWithoutStreamKey = pre(ClientRootKey) + "permit_without_stream" - ClientTimeKey = pre(ClientRootKey) + "time" - ClientTimeoutKey = pre(ClientRootKey) + "timeout" - ClientMaxMessageSendSizeKey = pre(ClientRootKey) + "max_message_send_size" - ClientMaxMessageReceiveSizeKey = pre(ClientRootKey) + "max_message_receive_size" - ClientMaxMessageSizeKey = pre(ClientRootKey) + "max_message_size" + GrpcKeepAlive = pre(ClientRootKey) + "grpc_keepalive" + ClientKeepAlivePermitWithoutStreamKey = pre(GrpcKeepAlive) + "permit_without_stream" + ClientKeepAliveTimeKey = pre(GrpcKeepAlive) + "time" + ClientKeepAliveTimeoutKey = pre(GrpcKeepAlive) + "timeout" + + ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout" + ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size" + ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size" + ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size" + + ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval" + ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval" + ClientBackoffMaxElapsedTimeKey = pre(ClientRootKey) + "backoff_max_elapsed_time" + ClientBackoffRandomizationFactorKey = pre(ClientRootKey) + "backoff_randomization_factor" + ClientBackoffMultiplierKey = pre(ClientRootKey) + "backoff_multiplier" + CollectorConfigPathKey = pre(CollectorRootKey) + "config_path" CollectorExportersKey = pre(CollectorRootKey) + "exporters" CollectorAttributeProcessorKey = pre(CollectorProcessorsKey) + "attribute" diff --git a/internal/config/testdata/nginx-agent.conf b/internal/config/testdata/nginx-agent.conf index 70fa0aa3cb..e63ece84fb 100644 --- a/internal/config/testdata/nginx-agent.conf +++ b/internal/config/testdata/nginx-agent.conf @@ -17,9 +17,24 @@ data_plane_config: exclude_logs: - /var/log/nginx/error.log - /var/log/nginx/access.log -client: - timeout: 10s - +client: + http: + timeout: 15s + grpc: + keepalive: + timeout: 15s + time: 10s + permit_without_stream: false + max_message_size: 1048575 + max_message_receive_size: 1048575 + max_message_send_size: 1048575 + backoff: + initial_interval: 200ms + max_interval: 10s + max_elapsed_time: 25s + randomization_factor: 1.5 + multiplier: 2.5 + collector: config_path: "/etc/nginx-agent/nginx-agent-otelcol.yaml" receivers: diff --git a/internal/config/types.go b/internal/config/types.go index c1cefc5a1b..fe910bdc58 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -38,7 +38,6 @@ type ( Client *Client `yaml:"-" mapstructure:"client"` Collector *Collector `yaml:"-" mapstructure:"collector"` File *File `yaml:"-" mapstructure:"file"` - Common *CommonSettings `yaml:"-"` Watchers *Watchers `yaml:"-"` Version string `yaml:"-"` Path string `yaml:"-"` @@ -63,16 +62,38 @@ type ( } Client struct { - Timeout time.Duration `yaml:"-" mapstructure:"timeout"` - Time time.Duration `yaml:"-" mapstructure:"time"` - PermitWithoutStream bool `yaml:"-" mapstructure:"permit_without_stream"` + HTTP *HTTP `yaml:"http" mapstructure:"http"` + Grpc *GRPC `yaml:"grpc" mapstructure:"grpc"` + Backoff *BackOff `yaml:"backoff" mapstructure:"backoff"` + } + + HTTP struct { + Timeout time.Duration `yaml:"-" mapstructure:"timeout"` + } + + BackOff struct { + InitialInterval time.Duration `yaml:"-" mapstructure:"initial_interval"` + MaxInterval time.Duration `yaml:"-" mapstructure:"max_interval"` + MaxElapsedTime time.Duration `yaml:"-" mapstructure:"max_elapsed_time"` + RandomizationFactor float64 `yaml:"-" mapstructure:"randomization_factor"` + Multiplier float64 `yaml:"-" mapstructure:"multiplier"` + } + + GRPC struct { + KeepAlive *KeepAlive `yaml:"-" mapstructure:"target"` // if MaxMessageSize is size set then we use that value, // otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings MaxMessageSize int `yaml:"-" mapstructure:"max_message_size"` - MaxMessageRecieveSize int `yaml:"-" mapstructure:"max_message_receive_size"` + MaxMessageReceiveSize int `yaml:"-" mapstructure:"max_message_receive_size"` MaxMessageSendSize int `yaml:"-" mapstructure:"max_message_send_size"` } + KeepAlive struct { + Timeout time.Duration `yaml:"-" mapstructure:"timeout"` + Time time.Duration `yaml:"-" mapstructure:"time"` + PermitWithoutStream bool `yaml:"-" mapstructure:"permit_without_stream"` + } + Collector struct { ConfigPath string `yaml:"-" mapstructure:"config_path"` Log *Log `yaml:"-" mapstructure:"log"` @@ -227,13 +248,6 @@ type ( MemoryScraper struct{} NetworkScraper struct{} - GRPC struct { - Target string `yaml:"-" mapstructure:"target"` - ConnTimeout time.Duration `yaml:"-" mapstructure:"connection_timeout"` - MinConnTimeout time.Duration `yaml:"-" mapstructure:"minimum_connection_timeout"` - BackoffDelay time.Duration `yaml:"-" mapstructure:"backoff_delay"` - } - Command struct { Server *ServerConfig `yaml:"-" mapstructure:"server"` Auth *AuthConfig `yaml:"-" mapstructure:"auth"` @@ -274,14 +288,6 @@ type ( Location string `yaml:"-" mapstructure:"location"` } - CommonSettings struct { - InitialInterval time.Duration `yaml:"-" mapstructure:"initial_interval"` - MaxInterval time.Duration `yaml:"-" mapstructure:"max_interval"` - MaxElapsedTime time.Duration `yaml:"-" mapstructure:"max_elapsed_time"` - RandomizationFactor float64 `yaml:"-" mapstructure:"randomization_factor"` - Multiplier float64 `yaml:"-" mapstructure:"multiplier"` - } - Watchers struct { InstanceWatcher InstanceWatcher `yaml:"-" mapstructure:"instance_watcher"` InstanceHealthWatcher InstanceHealthWatcher `yaml:"-" mapstructure:"instance_health_watcher"` diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index af9f782b76..6d9bb1114d 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -117,7 +117,7 @@ func (fms *FileManagerService) UpdateOverview( }, } - backOffCtx, backoffCancel := context.WithTimeout(newCtx, fms.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(newCtx, fms.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() sendUpdateOverview := func() (*mpi.UpdateOverviewResponse, error) { @@ -146,7 +146,7 @@ func (fms *FileManagerService) UpdateOverview( response, err := backoff.RetryWithData( sendUpdateOverview, - backoffHelpers.Context(backOffCtx, fms.agentConfig.Common), + backoffHelpers.Context(backOffCtx, fms.agentConfig.Client.Backoff), ) if err != nil { return err @@ -230,7 +230,7 @@ func (fms *FileManagerService) UpdateFile( }, } - backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() sendUpdateFile := func() (*mpi.UpdateFileResponse, error) { @@ -256,7 +256,8 @@ func (fms *FileManagerService) UpdateFile( return response, nil } - response, err := backoff.RetryWithData(sendUpdateFile, backoffHelpers.Context(backOffCtx, fms.agentConfig.Common)) + response, err := backoff.RetryWithData(sendUpdateFile, backoffHelpers.Context(backOffCtx, + fms.agentConfig.Client.Backoff)) if err != nil { return err } @@ -375,7 +376,7 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) error { } func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) error { - backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Common.MaxElapsedTime) + backOffCtx, backoffCancel := context.WithTimeout(ctx, fms.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() getFile := func() (*mpi.GetFileResponse, error) { @@ -391,7 +392,7 @@ func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) e getFileResp, getFileErr := backoff.RetryWithData( getFile, - backoffHelpers.Context(backOffCtx, fms.agentConfig.Common), + backoffHelpers.Context(backOffCtx, fms.agentConfig.Client.Backoff), ) if getFileErr != nil { diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 402aab21c3..85984ea837 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -179,21 +179,21 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp sendRecOpts := []grpc.DialOption{} if agentConfig.Client != nil { - if agentConfig.Client.MaxMessageSize != 0 { + if agentConfig.Client.Grpc.MaxMessageSize != 0 { sendRecOpts = append(sendRecOpts, grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(agentConfig.Client.MaxMessageSize), - grpc.MaxCallSendMsgSize(agentConfig.Client.MaxMessageSize), + grpc.MaxCallRecvMsgSize(agentConfig.Client.Grpc.MaxMessageSize), + grpc.MaxCallSendMsgSize(agentConfig.Client.Grpc.MaxMessageSize), )) } else { sendRecOpts = append(sendRecOpts, grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(agentConfig.Client.MaxMessageRecieveSize), - grpc.MaxCallSendMsgSize(agentConfig.Client.MaxMessageSendSize), + grpc.MaxCallRecvMsgSize(agentConfig.Client.Grpc.MaxMessageReceiveSize), + grpc.MaxCallSendMsgSize(agentConfig.Client.Grpc.MaxMessageSendSize), )) } keepAlive := keepalive.ClientParameters{ - Time: agentConfig.Client.Time, - Timeout: agentConfig.Client.Timeout, - PermitWithoutStream: agentConfig.Client.PermitWithoutStream, + Time: agentConfig.Client.Grpc.KeepAlive.Time, + Timeout: agentConfig.Client.Grpc.KeepAlive.Timeout, + PermitWithoutStream: agentConfig.Client.Grpc.KeepAlive.PermitWithoutStream, } sendRecOpts = append(sendRecOpts, diff --git a/internal/resource/resource_plugin_test.go b/internal/resource/resource_plugin_test.go index 643cd0c1a7..807b380f28 100644 --- a/internal/resource/resource_plugin_test.go +++ b/internal/resource/resource_plugin_test.go @@ -335,7 +335,7 @@ func TestResource_Process_APIAction_UpdateHTTPUpstreams(t *testing.T) { upstreams []client.UpstreamServer }{ { - name: "Test 1: Success, Update Http Upstream Servers", + name: "Test 1: Success, Update HTTP Upstream Servers", message: &bus.Message{ Topic: bus.APIActionRequestTopic, Data: protos.CreatAPIActionRequestNginxPlusUpdateHTTPServers("test_upstream", @@ -359,7 +359,7 @@ func TestResource_Process_APIAction_UpdateHTTPUpstreams(t *testing.T) { expectedLog: "Successfully updated http upstream", }, { - name: "Test 2: Fail, Update Http Upstream Servers", + name: "Test 2: Fail, Update HTTP Upstream Servers", message: &bus.Message{ Topic: bus.APIActionRequestTopic, Data: protos.CreatAPIActionRequestNginxPlusUpdateHTTPServers("test_upstream", diff --git a/internal/watcher/instance/nginx_config_parser.go b/internal/watcher/instance/nginx_config_parser.go index 5ca41c4995..505c41098e 100644 --- a/internal/watcher/instance/nginx_config_parser.go +++ b/internal/watcher/instance/nginx_config_parser.go @@ -482,7 +482,7 @@ func (ncp *NginxConfigParser) pingAPIEndpoint(ctx context.Context, statusAPIDeta if strings.HasPrefix(listen, "unix:") { httpClient = ncp.SocketClient(strings.TrimPrefix(listen, "unix:")) } else { - httpClient = http.Client{Timeout: ncp.agentConfig.Client.Timeout} + httpClient = http.Client{Timeout: ncp.agentConfig.Client.Grpc.KeepAlive.Timeout} } req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusAPI, nil) if err != nil { @@ -674,7 +674,7 @@ func (ncp *NginxConfigParser) isPort(value string) bool { func (ncp *NginxConfigParser) SocketClient(socketPath string) http.Client { return http.Client{ - Timeout: ncp.agentConfig.Client.Timeout, + Timeout: ncp.agentConfig.Client.Grpc.KeepAlive.Timeout, Transport: &http.Transport{ DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { return net.Dial("unix", socketPath) diff --git a/test/mock/grpc/cmd/main.go b/test/mock/grpc/cmd/main.go index 620b869f3a..1b0fcc7615 100644 --- a/test/mock/grpc/cmd/main.go +++ b/test/mock/grpc/cmd/main.go @@ -54,9 +54,9 @@ func main() { agentConfig.Command.Server.Port = portInt agentConfig.Command.Auth = nil agentConfig.Command.TLS = nil - agentConfig.Common.MaxElapsedTime = *sleepDuration - agentConfig.Client.MaxMessageRecieveSize = 4194304 - agentConfig.Client.MaxMessageSendSize = math.MaxInt + agentConfig.Client.Backoff.MaxElapsedTime = *sleepDuration + agentConfig.Client.Grpc.MaxMessageReceiveSize = 4194304 + agentConfig.Client.Grpc.MaxMessageSendSize = math.MaxInt newLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ Level: logger.GetLogLevel(*logLevel), diff --git a/test/mock/grpc/mock_management_server.go b/test/mock/grpc/mock_management_server.go index 45ada71c64..8459c71da1 100644 --- a/test/mock/grpc/mock_management_server.go +++ b/test/mock/grpc/mock_management_server.go @@ -158,15 +158,14 @@ func getServerOptions(agentConfig *config.Config) []grpc.ServerOption { } if agentConfig.Client != nil { - if agentConfig.Client.MaxMessageSize != 0 { - slog.Info("grpc MaxMessageSize") - opts = append(opts, grpc.MaxSendMsgSize(agentConfig.Client.MaxMessageSize), - grpc.MaxRecvMsgSize(agentConfig.Client.MaxMessageSize), + if agentConfig.Client.Grpc.MaxMessageSize != 0 { + opts = append(opts, grpc.MaxSendMsgSize(agentConfig.Client.Grpc.MaxMessageSize), + grpc.MaxRecvMsgSize(agentConfig.Client.Grpc.MaxMessageSize), ) } else { // both are defulted to math.MaxInt for ServerOption - opts = append(opts, grpc.MaxSendMsgSize(agentConfig.Client.MaxMessageSendSize), - grpc.MaxRecvMsgSize(agentConfig.Client.MaxMessageRecieveSize), + opts = append(opts, grpc.MaxSendMsgSize(agentConfig.Client.Grpc.MaxMessageSendSize), + grpc.MaxRecvMsgSize(agentConfig.Client.Grpc.MaxMessageReceiveSize), ) } } @@ -217,10 +216,10 @@ func createListener(apiAddress string, agentConfig *config.Config) (net.Listener func reportHealth(healthcheck *health.Server, agentConfig *config.Config) { var sleep time.Duration var serverName string - if agentConfig.Common == nil { + if agentConfig.Client.Backoff == nil { sleep = maxElapsedTime } else { - sleep = agentConfig.Common.MaxElapsedTime + sleep = agentConfig.Client.Backoff.MaxElapsedTime } if agentConfig.Command.TLS == nil { diff --git a/test/types/config.go b/test/types/config.go index 681aa0cc02..60ec80e37b 100644 --- a/test/types/config.go +++ b/test/types/config.go @@ -20,6 +20,8 @@ const ( clientTime = 50 * time.Second clientTimeout = 5 * time.Second + clientHTTPTimeout = 5 * time.Second + commonInitialInterval = 100 * time.Microsecond commonMaxInterval = 1000 * time.Microsecond commonMaxElapsedTime = 10 * time.Millisecond @@ -37,9 +39,23 @@ func AgentConfig() *config.Config { Path: "/etc/nginx-agent", Log: &config.Log{}, Client: &config.Client{ - Timeout: clientTimeout, - Time: clientTime, - PermitWithoutStream: clientPermitWithoutStream, + HTTP: &config.HTTP{ + Timeout: clientHTTPTimeout, + }, + Grpc: &config.GRPC{ + KeepAlive: &config.KeepAlive{ + Timeout: clientTimeout, + Time: clientTime, + PermitWithoutStream: clientPermitWithoutStream, + }, + }, + Backoff: &config.BackOff{ + InitialInterval: commonInitialInterval, + MaxInterval: commonMaxInterval, + MaxElapsedTime: commonMaxElapsedTime, + RandomizationFactor: commonRandomizationFactor, + Multiplier: commonMultiplier, + }, }, AllowedDirectories: []string{"/tmp/"}, Collector: &config.Collector{ @@ -128,13 +144,6 @@ func AgentConfig() *config.Config { }, }, File: &config.File{}, - Common: &config.CommonSettings{ - InitialInterval: commonInitialInterval, - MaxInterval: commonMaxInterval, - MaxElapsedTime: commonMaxElapsedTime, - RandomizationFactor: commonRandomizationFactor, - Multiplier: commonMultiplier, - }, DataPlaneConfig: &config.DataPlaneConfig{ Nginx: &config.NginxDataPlaneConfig{ TreatWarningsAsErrors: true, From 81a70e1d46cfab4bdb2088839a799b7801dea94b Mon Sep 17 00:00:00 2001 From: Donal Hurley Date: Wed, 15 Jan 2025 15:06:02 +0000 Subject: [PATCH 14/22] Add config apply request queue (#949) --- internal/command/command_service.go | 127 +++++++++++++-- internal/command/command_service_test.go | 196 +++++++++++++++++++++++ 2 files changed, 310 insertions(+), 13 deletions(-) diff --git a/internal/command/command_service.go b/internal/command/command_service.go index 419b198c9d..bb7e92bf00 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -33,14 +33,16 @@ const ( type ( CommandService struct { - commandServiceClient mpi.CommandServiceClient - subscribeClient mpi.CommandService_SubscribeClient - agentConfig *config.Config - isConnected *atomic.Bool - subscribeCancel context.CancelFunc - subscribeChannel chan *mpi.ManagementPlaneRequest - subscribeMutex sync.Mutex - subscribeClientMutex sync.Mutex + commandServiceClient mpi.CommandServiceClient + subscribeClient mpi.CommandService_SubscribeClient + agentConfig *config.Config + isConnected *atomic.Bool + subscribeCancel context.CancelFunc + subscribeChannel chan *mpi.ManagementPlaneRequest + configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID + subscribeMutex sync.Mutex + subscribeClientMutex sync.Mutex + configApplyRequestQueueMutex sync.Mutex } ) @@ -54,10 +56,11 @@ func NewCommandService( isConnected.Store(false) commandService := &CommandService{ - commandServiceClient: commandServiceClient, - agentConfig: agentConfig, - isConnected: isConnected, - subscribeChannel: subscribeChannel, + commandServiceClient: commandServiceClient, + agentConfig: agentConfig, + isConnected: isConnected, + subscribeChannel: subscribeChannel, + configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest), } var subscribeCtx context.Context @@ -165,6 +168,11 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) defer backoffCancel() + err := cs.handleConfigApplyResponse(ctx, response) + if err != nil { + return err + } + return backoff.Retry( cs.sendDataPlaneResponseCallback(ctx, response), backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff), @@ -271,6 +279,81 @@ func (cs *CommandService) sendDataPlaneResponseCallback( } } +func (cs *CommandService) handleConfigApplyResponse( + ctx context.Context, + response *mpi.DataPlaneResponse, +) error { + cs.configApplyRequestQueueMutex.Lock() + defer cs.configApplyRequestQueueMutex.Unlock() + + isConfigApplyResponse := false + var indexOfConfigApplyRequest int + + for index, configApplyRequest := range cs.configApplyRequestQueue[response.GetInstanceId()] { + if configApplyRequest.GetMessageMeta().GetCorrelationId() == response.GetMessageMeta().GetCorrelationId() { + indexOfConfigApplyRequest = index + isConfigApplyResponse = true + + break + } + } + + if isConfigApplyResponse { + err := cs.sendResponseForQueuedConfigApplyRequests(ctx, response, indexOfConfigApplyRequest) + if err != nil { + return err + } + } + + return nil +} + +func (cs *CommandService) sendResponseForQueuedConfigApplyRequests( + ctx context.Context, + response *mpi.DataPlaneResponse, + indexOfConfigApplyRequest int, +) error { + instanceID := response.GetInstanceId() + for i := 0; i < indexOfConfigApplyRequest; i++ { + newResponse := response + + newResponse.GetMessageMeta().MessageId = proto.GenerateMessageID() + + request := cs.configApplyRequestQueue[instanceID][i] + newResponse.GetMessageMeta().CorrelationId = request.GetMessageMeta().GetCorrelationId() + + slog.DebugContext( + ctx, + "Sending data plane response for queued config apply request", + "response", newResponse, + ) + + backOffCtx, backoffCancel := context.WithTimeout(ctx, cs.agentConfig.Client.Backoff.MaxElapsedTime) + + err := backoff.Retry( + cs.sendDataPlaneResponseCallback(ctx, newResponse), + backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff), + ) + if err != nil { + slog.ErrorContext(ctx, "Failed to send data plane response", "error", err) + backoffCancel() + + return err + } + + backoffCancel() + } + + cs.configApplyRequestQueue[instanceID] = cs.configApplyRequestQueue[instanceID][indexOfConfigApplyRequest+1:] + slog.DebugContext(ctx, "Removed config apply requests from queue", "queue", cs.configApplyRequestQueue[instanceID]) + + if len(cs.configApplyRequestQueue[instanceID]) > 0 { + cs.subscribeChannel <- cs.configApplyRequestQueue[instanceID][len(cs.configApplyRequestQueue[instanceID])-1] + } + + return nil +} + // Retry callback for sending a data plane health status to the Management Plane. func (cs *CommandService) dataPlaneHealthCallback( ctx context.Context, @@ -333,7 +416,25 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error { return recvError } - cs.subscribeChannel <- request + switch request.GetRequest().(type) { + case *mpi.ManagementPlaneRequest_ConfigApplyRequest: + cs.configApplyRequestQueueMutex.Lock() + defer cs.configApplyRequestQueueMutex.Unlock() + + instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId() + cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request) + if len(cs.configApplyRequestQueue[instanceID]) == 1 { + cs.subscribeChannel <- request + } else { + slog.DebugContext( + ctx, + "Config apply request is already in progress, queuing new config apply request", + "request", request, + ) + } + default: + cs.subscribeChannel <- request + } return nil } diff --git a/internal/command/command_service_test.go b/internal/command/command_service_test.go index 9534414dbd..fc7f58139b 100644 --- a/internal/command/command_service_test.go +++ b/internal/command/command_service_test.go @@ -10,9 +10,13 @@ import ( "context" "errors" "log/slog" + "sync" "testing" "time" + "github.com/google/uuid" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/nginx/agent/v3/internal/logger" "github.com/nginx/agent/v3/test/helpers" "github.com/nginx/agent/v3/test/stub" @@ -37,9 +41,41 @@ func (*FakeSubscribeClient) Send(*mpi.DataPlaneResponse) error { // nolint: nilnil func (*FakeSubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) { + time.Sleep(1 * time.Second) + return nil, nil } +type FakeConfigApplySubscribeClient struct { + grpc.ClientStream +} + +func (*FakeConfigApplySubscribeClient) Send(*mpi.DataPlaneResponse) error { + return nil +} + +// nolint: nilnil +func (*FakeConfigApplySubscribeClient) Recv() (*mpi.ManagementPlaneRequest, error) { + protos.CreateManagementPlaneRequest() + return &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "1", + CorrelationId: "123", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + }, nil +} + func TestCommandService_NewCommandService(t *testing.T) { ctx := context.Background() commandServiceClient := &v1fakes.FakeCommandServiceClient{} @@ -61,6 +97,46 @@ func TestCommandService_NewCommandService(t *testing.T) { ) } +func TestCommandService_receiveCallback_configApplyRequest(t *testing.T) { + ctx := context.Background() + fakeSubscribeClient := &FakeConfigApplySubscribeClient{} + + commandServiceClient := &v1fakes.FakeCommandServiceClient{} + commandServiceClient.SubscribeReturns(fakeSubscribeClient, nil) + + subscribeChannel := make(chan *mpi.ManagementPlaneRequest) + + commandService := NewCommandService( + ctx, + commandServiceClient, + types.AgentConfig(), + subscribeChannel, + ) + + defer commandService.CancelSubscription(ctx) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + requestFromChannel := <-subscribeChannel + assert.NotNil(t, requestFromChannel) + wg.Done() + }() + + assert.Eventually( + t, + func() bool { return commandServiceClient.SubscribeCallCount() > 0 }, + 2*time.Second, + 10*time.Millisecond, + ) + + commandService.configApplyRequestQueueMutex.Lock() + defer commandService.configApplyRequestQueueMutex.Unlock() + assert.Len(t, commandService.configApplyRequestQueue, 1) + wg.Wait() +} + func TestCommandService_UpdateDataPlaneStatus(t *testing.T) { ctx := context.Background() @@ -193,3 +269,123 @@ func TestCommandService_SendDataPlaneResponse(t *testing.T) { require.NoError(t, err) } + +func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) { + ctx := context.Background() + commandServiceClient := &v1fakes.FakeCommandServiceClient{} + subscribeClient := &FakeSubscribeClient{} + subscribeChannel := make(chan *mpi.ManagementPlaneRequest) + + commandService := NewCommandService( + ctx, + commandServiceClient, + types.AgentConfig(), + subscribeChannel, + ) + + defer commandService.CancelSubscription(ctx) + + request1 := &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "1", + CorrelationId: "123", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{}, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + } + + request2 := &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "2", + CorrelationId: "1232", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{}, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + } + + request3 := &mpi.ManagementPlaneRequest{ + MessageMeta: &mpi.MessageMeta{ + MessageId: "3", + CorrelationId: "1233", + Timestamp: timestamppb.Now(), + }, + Request: &mpi.ManagementPlaneRequest_ConfigApplyRequest{ + ConfigApplyRequest: &mpi.ConfigApplyRequest{ + Overview: &mpi.FileOverview{ + Files: []*mpi.File{}, + ConfigVersion: &mpi.ConfigVersion{ + InstanceId: "12314", + Version: "4215432", + }, + }, + }, + }, + } + + commandService.configApplyRequestQueueMutex.Lock() + commandService.configApplyRequestQueue = map[string][]*mpi.ManagementPlaneRequest{ + "12314": { + request1, + request2, + request3, + }, + } + commandService.configApplyRequestQueueMutex.Unlock() + + commandService.subscribeClientMutex.Lock() + commandService.subscribeClient = subscribeClient + commandService.subscribeClientMutex.Unlock() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + requestFromChannel := <-subscribeChannel + assert.Equal(t, request3, requestFromChannel) + wg.Done() + }() + + err := commandService.SendDataPlaneResponse( + ctx, + &mpi.DataPlaneResponse{ + MessageMeta: &mpi.MessageMeta{ + MessageId: uuid.NewString(), + CorrelationId: "1232", + Timestamp: timestamppb.Now(), + }, + CommandResponse: &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: "Success", + }, + InstanceId: "12314", + }, + ) + + require.NoError(t, err) + + commandService.configApplyRequestQueueMutex.Lock() + defer commandService.configApplyRequestQueueMutex.Unlock() + assert.Len(t, commandService.configApplyRequestQueue, 1) + assert.Equal(t, request3, commandService.configApplyRequestQueue["12314"][0]) + wg.Wait() +} From e096397c997916f5dde905a1fe2f3b0b28b1a8b3 Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Tue, 7 Jan 2025 13:17:20 +0000 Subject: [PATCH 15/22] add unit tests for transport credentials funtions --- api/grpc/mpi/v1/command.pb.go | 2 +- api/grpc/mpi/v1/common.pb.go | 2 +- api/grpc/mpi/v1/files.pb.go | 2 +- internal/grpc/grpc.go | 4 ++-- internal/grpc/grpc_test.go | 3 +++ 5 files changed, 8 insertions(+), 5 deletions(-) diff --git a/api/grpc/mpi/v1/command.pb.go b/api/grpc/mpi/v1/command.pb.go index 94368f7d33..6e46a0c118 100644 --- a/api/grpc/mpi/v1/command.pb.go +++ b/api/grpc/mpi/v1/command.pb.go @@ -8,7 +8,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.1 // protoc (unknown) // source: mpi/v1/command.proto diff --git a/api/grpc/mpi/v1/common.pb.go b/api/grpc/mpi/v1/common.pb.go index 763aae6c0c..8d51fa6b93 100644 --- a/api/grpc/mpi/v1/common.pb.go +++ b/api/grpc/mpi/v1/common.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.1 // protoc (unknown) // source: mpi/v1/common.proto diff --git a/api/grpc/mpi/v1/files.pb.go b/api/grpc/mpi/v1/files.pb.go index cd41aa7389..896e045753 100644 --- a/api/grpc/mpi/v1/files.pb.go +++ b/api/grpc/mpi/v1/files.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.1 // protoc (unknown) // source: mpi/v1/files.proto diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 85984ea837..34172160c8 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -221,8 +221,8 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp func addTransportCredentials(agentConfig *config.Config, opts []grpc.DialOption) ([]grpc.DialOption, bool) { transportCredentials, err := getTransportCredentials(agentConfig) if err != nil { - slog.Error("Unable to get transport credentials from agent configuration, "+ - "adding default transport credentials to gRPC dial options", "error", err) + slog.Error("Unable to add transport credentials to gRPC dial options", "error", err) + slog.Debug("Adding default transport credentials to gRPC dial options") opts = append(opts, grpc.WithTransportCredentials(defaultCredentials), ) diff --git a/internal/grpc/grpc_test.go b/internal/grpc/grpc_test.go index 5dee66471b..2d65f96774 100644 --- a/internal/grpc/grpc_test.go +++ b/internal/grpc/grpc_test.go @@ -358,6 +358,9 @@ func Test_ValidateGrpcError(t *testing.T) { // nolint:revive,gocognit func Test_validateTokenFile(t *testing.T) { + type args struct { + path string + } tests := []struct { name string path string From df3b79356753940fe339846f26c25c548caf99a5 Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Wed, 15 Jan 2025 16:24:15 +0000 Subject: [PATCH 16/22] fix test name --- api/grpc/mpi/v1/command.pb.go | 2 +- api/grpc/mpi/v1/common.pb.go | 2 +- api/grpc/mpi/v1/files.pb.go | 2 +- internal/grpc/grpc_test.go | 5 +---- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/api/grpc/mpi/v1/command.pb.go b/api/grpc/mpi/v1/command.pb.go index 6e46a0c118..b3cdd5cf1f 100644 --- a/api/grpc/mpi/v1/command.pb.go +++ b/api/grpc/mpi/v1/command.pb.go @@ -8,7 +8,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.1 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mpi/v1/command.proto diff --git a/api/grpc/mpi/v1/common.pb.go b/api/grpc/mpi/v1/common.pb.go index 8d51fa6b93..d2cb5d35b5 100644 --- a/api/grpc/mpi/v1/common.pb.go +++ b/api/grpc/mpi/v1/common.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.1 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mpi/v1/common.proto diff --git a/api/grpc/mpi/v1/files.pb.go b/api/grpc/mpi/v1/files.pb.go index 896e045753..2cbfb57ec1 100644 --- a/api/grpc/mpi/v1/files.pb.go +++ b/api/grpc/mpi/v1/files.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.1 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mpi/v1/files.proto diff --git a/internal/grpc/grpc_test.go b/internal/grpc/grpc_test.go index 2d65f96774..05ce16ca18 100644 --- a/internal/grpc/grpc_test.go +++ b/internal/grpc/grpc_test.go @@ -357,10 +357,7 @@ func Test_ValidateGrpcError(t *testing.T) { } // nolint:revive,gocognit -func Test_validateTokenFile(t *testing.T) { - type args struct { - path string - } +func Test_retrieveTokenFromFile(t *testing.T) { tests := []struct { name string path string From a44ca08322c149302d812b9f5b40291e5b0876a4 Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Thu, 16 Jan 2025 09:50:25 +0000 Subject: [PATCH 17/22] fix error message --- internal/grpc/grpc.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 34172160c8..de099dcbce 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -221,8 +221,7 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp func addTransportCredentials(agentConfig *config.Config, opts []grpc.DialOption) ([]grpc.DialOption, bool) { transportCredentials, err := getTransportCredentials(agentConfig) if err != nil { - slog.Error("Unable to add transport credentials to gRPC dial options", "error", err) - slog.Debug("Adding default transport credentials to gRPC dial options") + slog.Error("Unable to add transport credentials to gRPC dial options, adding default transport credentials", "error", err) opts = append(opts, grpc.WithTransportCredentials(defaultCredentials), ) @@ -265,11 +264,12 @@ func retrieveTokenFromFile(path string) (string, error) { return "", errors.New("token file path is empty") } - slog.Debug("Reading token from file", "path", path) + slog.Debug("Reading dataplane key from file", "path", path) var keyVal string keyBytes, err := os.ReadFile(path) if err != nil { - return "", fmt.Errorf("unable to read token from file: %w", err) + slog.Error("Unable to read token from file", "error", err) + return "", err } keyBytes = bytes.TrimSpace(keyBytes) @@ -277,7 +277,8 @@ func retrieveTokenFromFile(path string) (string, error) { keyVal = string(keyBytes) if keyVal == "" { - return "", errors.New("failed to retrieve token, token file is empty") + slog.Error("failed to load token, please check agent configuration") + return "", errors.New("failed to load token, please check agent configuration") } return keyVal, nil From 8d61593141fe843d2b48c30673a42539a1816af6 Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Thu, 16 Jan 2025 10:15:04 +0000 Subject: [PATCH 18/22] fix lint error: lll --- internal/grpc/grpc.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index de099dcbce..e27f88e751 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -221,7 +221,8 @@ func GetDialOptions(agentConfig *config.Config, resourceID string) []grpc.DialOp func addTransportCredentials(agentConfig *config.Config, opts []grpc.DialOption) ([]grpc.DialOption, bool) { transportCredentials, err := getTransportCredentials(agentConfig) if err != nil { - slog.Error("Unable to add transport credentials to gRPC dial options, adding default transport credentials", "error", err) + slog.Error("Unable to add transport credentials to gRPC dial options, adding "+ + "default transport credentials", "error", err) opts = append(opts, grpc.WithTransportCredentials(defaultCredentials), ) From 459c0d56e1a4d77178d164cc7777920c8992e7cb Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Thu, 16 Jan 2025 15:27:34 +0000 Subject: [PATCH 19/22] modify error messages --- internal/grpc/grpc.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index e27f88e751..8f05cd5dc4 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -265,7 +265,7 @@ func retrieveTokenFromFile(path string) (string, error) { return "", errors.New("token file path is empty") } - slog.Debug("Reading dataplane key from file", "path", path) + slog.Debug("Reading token from file", "path", path) var keyVal string keyBytes, err := os.ReadFile(path) if err != nil { @@ -278,8 +278,7 @@ func retrieveTokenFromFile(path string) (string, error) { keyVal = string(keyBytes) if keyVal == "" { - slog.Error("failed to load token, please check agent configuration") - return "", errors.New("failed to load token, please check agent configuration") + return "", errors.New("failed to load token, token file is empty") } return keyVal, nil From f26c45e2fedbd7dae3c9310c043701f38e2645c5 Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Thu, 16 Jan 2025 15:33:44 +0000 Subject: [PATCH 20/22] remove error logging and modify messages --- internal/grpc/grpc.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 8f05cd5dc4..dec07d0afa 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -269,8 +269,7 @@ func retrieveTokenFromFile(path string) (string, error) { var keyVal string keyBytes, err := os.ReadFile(path) if err != nil { - slog.Error("Unable to read token from file", "error", err) - return "", err + return "", fmt.Errorf("unable to read token from file: %w", err) } keyBytes = bytes.TrimSpace(keyBytes) From 371c72bc62d3f0264d9f4810927fe176ca0080ec Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Mon, 20 Jan 2025 15:26:31 +0000 Subject: [PATCH 21/22] fall back to token field if error occurs when reading file --- internal/grpc/grpc.go | 9 +++++---- test/types/config.go | 3 ++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index dec07d0afa..a273578661 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -241,10 +241,11 @@ func addPerRPCCredentials(agentConfig *config.Config, resourceID string, opts [] token := agentConfig.Command.Auth.Token if agentConfig.Command.Auth.TokenPath != "" { - var err error - token, err = retrieveTokenFromFile(agentConfig.Command.Auth.TokenPath) - if err != nil { - slog.Error("Unable to add token to gRPC dial options, token will be empty", "error", err) + tk, err := retrieveTokenFromFile(agentConfig.Command.Auth.TokenPath) + if err == nil { + token = tk + } else { + slog.Error("Unable to add token to gRPC dial options", "error", err) } } diff --git a/test/types/config.go b/test/types/config.go index 60ec80e37b..4419b07853 100644 --- a/test/types/config.go +++ b/test/types/config.go @@ -133,7 +133,8 @@ func AgentConfig() *config.Config { Type: config.Grpc, }, Auth: &config.AuthConfig{ - Token: "1234", + Token: "1234", + TokenPath: "", }, TLS: &config.TLSConfig{ Cert: "cert.pem", From c9d792d266c93957bc22f529e792d3cf56d8311b Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Mon, 20 Jan 2025 16:23:04 +0000 Subject: [PATCH 22/22] fix bad merge --- internal/command/command_service.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/internal/command/command_service.go b/internal/command/command_service.go index 1139c187bf..78bd7df084 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -435,25 +435,6 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error { cs.subscribeChannel <- request } } - switch request.GetRequest().(type) { - case *mpi.ManagementPlaneRequest_ConfigApplyRequest: - cs.configApplyRequestQueueMutex.Lock() - defer cs.configApplyRequestQueueMutex.Unlock() - - instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId() - cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request) - if len(cs.configApplyRequestQueue[instanceID]) == 1 { - cs.subscribeChannel <- request - } else { - slog.DebugContext( - ctx, - "Config apply request is already in progress, queuing new config apply request", - "request", request, - ) - } - default: - cs.subscribeChannel <- request - } return nil }