From ed0d3af2d064bcef303237062fc4adb8b98d60eb Mon Sep 17 00:00:00 2001 From: Mike Nguyen Date: Wed, 10 Jul 2024 17:37:34 +0100 Subject: [PATCH] fix: scheduler host address passed to runtime (#1421) * fix: scheduler host address passed to runtime Signed-off-by: mikeee * fix: scheduler client stream initialised for 1.14< Signed-off-by: mikeee * fix: modify scheduler host address validation if the scheduler container is not active, the scheduler flag will not be passed to the runtime Signed-off-by: mikeee * fix: lint and refactor Signed-off-by: mikeee --------- Signed-off-by: mikeee --- cmd/annotate.go | 1 - cmd/run.go | 111 ++++++++++-------- go.mod | 6 +- pkg/kubernetes/renew_certificate.go | 2 - pkg/kubernetes/uninstall.go | 1 - pkg/standalone/run.go | 42 ++++--- pkg/standalone/standalone.go | 4 - tests/e2e/common/common.go | 1 - .../standalone/windows_run_template_test.go | 2 - 9 files changed, 91 insertions(+), 79 deletions(-) diff --git a/cmd/annotate.go b/cmd/annotate.go index a5e9b6472..ab77757ba 100644 --- a/cmd/annotate.go +++ b/cmd/annotate.go @@ -221,7 +221,6 @@ func readInputsFromFS(path string) ([]io.Reader, error) { inputs = append(inputs, file) return nil }) - if err != nil { return nil, err } diff --git a/cmd/run.go b/cmd/run.go index 4b5959ec0..f91c772e6 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -24,6 +24,8 @@ import ( "strings" "time" + "golang.org/x/mod/semver" + "github.com/spf13/cobra" "github.com/spf13/viper" @@ -38,34 +40,35 @@ import ( ) var ( - appPort int - profilePort int - appID string - configFile string - port int - grpcPort int - internalGRPCPort int - maxConcurrency int - enableProfiling bool - logLevel string - protocol string - componentsPath string - resourcesPaths []string - appSSL bool - metricsPort int - maxRequestBodySize int - readBufferSize int - unixDomainSocket string - enableAppHealth bool - appHealthPath string - appHealthInterval int - appHealthTimeout int - appHealthThreshold int - enableAPILogging bool - apiListenAddresses string - runFilePath string - appChannelAddress string - enableRunK8s bool + appPort int + profilePort int + appID string + configFile string + port int + grpcPort int + internalGRPCPort int + maxConcurrency int + enableProfiling bool + logLevel string + protocol string + componentsPath string + resourcesPaths []string + appSSL bool + metricsPort int + maxRequestBodySize int + readBufferSize int + unixDomainSocket string + enableAppHealth bool + appHealthPath string + appHealthInterval int + appHealthTimeout int + appHealthThreshold int + enableAPILogging bool + apiListenAddresses string + schedulerHostAddress string + runFilePath string + appChannelAddress string + enableRunK8s bool ) const ( @@ -120,7 +123,6 @@ dapr run --run-file /path/to/directory -k Args: cobra.MinimumNArgs(0), PreRun: func(cmd *cobra.Command, args []string) { viper.BindPFlag("placement-host-address", cmd.Flags().Lookup("placement-host-address")) - viper.BindPFlag("scheduler-host-address", cmd.Flags().Lookup("scheduler-host-address")) }, Run: func(cmd *cobra.Command, args []string) { if len(runFilePath) > 0 { @@ -166,26 +168,26 @@ dapr run --run-file /path/to/directory -k } sharedRunConfig := &standalone.SharedRunConfig{ - ConfigFile: configFile, - EnableProfiling: enableProfiling, - LogLevel: logLevel, - MaxConcurrency: maxConcurrency, - AppProtocol: protocol, - PlacementHostAddr: viper.GetString("placement-host-address"), - SchedulerHostAddr: viper.GetString("scheduler-host-address"), - ComponentsPath: componentsPath, - ResourcesPaths: resourcesPaths, - AppSSL: appSSL, - MaxRequestBodySize: maxRequestBodySize, - HTTPReadBufferSize: readBufferSize, - EnableAppHealth: enableAppHealth, - AppHealthPath: appHealthPath, - AppHealthInterval: appHealthInterval, - AppHealthTimeout: appHealthTimeout, - AppHealthThreshold: appHealthThreshold, - EnableAPILogging: enableAPILogging, - APIListenAddresses: apiListenAddresses, - DaprdInstallPath: daprRuntimePath, + ConfigFile: configFile, + EnableProfiling: enableProfiling, + LogLevel: logLevel, + MaxConcurrency: maxConcurrency, + AppProtocol: protocol, + PlacementHostAddr: viper.GetString("placement-host-address"), + ComponentsPath: componentsPath, + ResourcesPaths: resourcesPaths, + AppSSL: appSSL, + MaxRequestBodySize: maxRequestBodySize, + HTTPReadBufferSize: readBufferSize, + EnableAppHealth: enableAppHealth, + AppHealthPath: appHealthPath, + AppHealthInterval: appHealthInterval, + AppHealthTimeout: appHealthTimeout, + AppHealthThreshold: appHealthThreshold, + EnableAPILogging: enableAPILogging, + APIListenAddresses: apiListenAddresses, + SchedulerHostAddress: schedulerHostAddress, + DaprdInstallPath: daprRuntimePath, } output, err := runExec.NewOutput(&standalone.RunConfig{ AppID: appID, @@ -227,6 +229,15 @@ dapr run --run-file /path/to/directory -k output.DaprHTTPPort, output.DaprGRPCPort) } + + if semver.Compare(fmt.Sprintf("v%v", daprVer.RuntimeVersion), "v1.14.0-rc.1") == -1 { + print.InfoStatusEvent(os.Stdout, "The scheduler is only compatible with dapr runtime 1.14 onwards.") + for i, arg := range output.DaprCMD.Args { + if strings.HasPrefix(arg, "--scheduler-host-address") { + output.DaprCMD.Args[i] = "" + } + } + } print.InfoStatusEvent(os.Stdout, startInfo) output.DaprCMD.Stdout = os.Stdout @@ -456,7 +467,7 @@ func init() { // By marking this as deprecated, the flag will be hidden from the help menu, but will continue to work. It will show a warning message when used. RunCmd.Flags().MarkDeprecated("components-path", "This flag is deprecated and will be removed in the future releases. Use \"resources-path\" flag instead") RunCmd.Flags().String("placement-host-address", "localhost", "The address of the placement service. Format is either for default port or : for custom port") - RunCmd.Flags().String("scheduler-host-address", "localhost", "The address of the scheduler service. Format is either for default port or : for custom port") + RunCmd.Flags().StringVarP(&schedulerHostAddress, "scheduler-host-address", "", "localhost", "The address of the scheduler service. Format is either for default port or : for custom port") // TODO: Remove below flag once the flag is removed in runtime in future release. RunCmd.Flags().BoolVar(&appSSL, "app-ssl", false, "Enable https when Dapr invokes the application") RunCmd.Flags().MarkDeprecated("app-ssl", "This flag is deprecated and will be removed in the future releases. Use \"app-protocol\" flag with https or grpcs values instead") diff --git a/go.mod b/go.mod index ea59e3d7b..146089460 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,10 @@ require ( sigs.k8s.io/yaml v1.4.0 ) -require github.com/Masterminds/semver/v3 v3.2.0 +require ( + github.com/Masterminds/semver/v3 v3.2.0 + golang.org/x/mod v0.14.0 +) require ( github.com/alphadose/haxmap v1.3.1 // indirect @@ -61,7 +64,6 @@ require ( go.mongodb.org/mongo-driver v1.12.1 // indirect go.opentelemetry.io/otel/metric v1.23.1 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/mod v0.14.0 // indirect golang.org/x/tools v0.17.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240116215550-a9fa1716bcac // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 // indirect diff --git a/pkg/kubernetes/renew_certificate.go b/pkg/kubernetes/renew_certificate.go index 8880a7eb3..28dd49e77 100644 --- a/pkg/kubernetes/renew_certificate.go +++ b/pkg/kubernetes/renew_certificate.go @@ -52,7 +52,6 @@ func RenewCertificate(conf RenewCertificateParams) error { conf.RootCertificateFilePath, conf.IssuerCertificateFilePath, conf.IssuerPrivateKeyFilePath) - if err != nil { return err } @@ -60,7 +59,6 @@ func RenewCertificate(conf RenewCertificateParams) error { rootCertBytes, issuerCertBytes, issuerKeyBytes, err = GenerateNewCertificates( conf.ValidUntil, conf.RootPrivateKeyFilePath) - if err != nil { return err } diff --git a/pkg/kubernetes/uninstall.go b/pkg/kubernetes/uninstall.go index 7a0240920..1ab874dd0 100644 --- a/pkg/kubernetes/uninstall.go +++ b/pkg/kubernetes/uninstall.go @@ -54,7 +54,6 @@ func Uninstall(namespace string, uninstallAll bool, uninstallDev bool, timeout u } _, err = uninstallClient.Run(daprReleaseName) - if err != nil { return err } diff --git a/pkg/standalone/run.go b/pkg/standalone/run.go index 6f1f81127..3205b3599 100644 --- a/pkg/standalone/run.go +++ b/pkg/standalone/run.go @@ -14,6 +14,7 @@ limitations under the License. package standalone import ( + "context" "fmt" "net" "os" @@ -23,6 +24,8 @@ import ( "strconv" "strings" + dockerClient "github.com/docker/docker/client" + "github.com/Pallinder/sillyname-go" "github.com/phayes/freeport" "gopkg.in/yaml.v2" @@ -70,8 +73,6 @@ type SharedRunConfig struct { MaxConcurrency int `arg:"app-max-concurrency" annotation:"dapr.io/app-max-concurrerncy" yaml:"appMaxConcurrency" default:"-1"` // Speicifcally omitted from annotations similar to config file path above. PlacementHostAddr string `arg:"placement-host-address" yaml:"placementHostAddress"` - // Must use env for scheduler host address because using arg would cause a sidecar crash in older daprd versions. - SchedulerHostAddr string `env:"DAPR_SCHEDULER_HOST_ADDRESS" yaml:"schedulerHostAddress"` // Speicifcally omitted from annotations similar to config file path above. ComponentsPath string `arg:"components-path"` // Deprecated in run template file: use ResourcesPaths instead. // Speicifcally omitted from annotations similar to config file path above. @@ -89,10 +90,11 @@ type SharedRunConfig struct { AppHealthThreshold int `arg:"app-health-threshold" annotation:"dapr.io/app-health-threshold" ifneq:"0" yaml:"appHealthThreshold"` EnableAPILogging bool `arg:"enable-api-logging" annotation:"dapr.io/enable-api-logging" yaml:"enableApiLogging"` // Specifically omitted from annotations see https://github.com/dapr/cli/issues/1324 . - DaprdInstallPath string `yaml:"runtimePath"` - Env map[string]string `yaml:"env"` - DaprdLogDestination LogDestType `yaml:"daprdLogDestination"` - AppLogDestination LogDestType `yaml:"appLogDestination"` + DaprdInstallPath string `yaml:"runtimePath"` + Env map[string]string `yaml:"env"` + DaprdLogDestination LogDestType `yaml:"daprdLogDestination"` + AppLogDestination LogDestType `yaml:"appLogDestination"` + SchedulerHostAddress string `arg:"scheduler-host-address" yaml:"schedulerHostAddress"` } func (meta *DaprMeta) newAppID() string { @@ -139,18 +141,27 @@ func (config *RunConfig) validatePlacementHostAddr() error { } func (config *RunConfig) validateSchedulerHostAddr() error { - schedulerHostAddr := config.SchedulerHostAddr - if len(schedulerHostAddr) == 0 { - schedulerHostAddr = "localhost" + // If the scheduler isn't running - don't add the flag to the runtime cmd. + docker, err := dockerClient.NewClientWithOpts() + if err != nil { + return err } - if indx := strings.Index(schedulerHostAddr, ":"); indx == -1 { - if runtime.GOOS == daprWindowsOS { - schedulerHostAddr = fmt.Sprintf("%s:6060", schedulerHostAddr) - } else { - schedulerHostAddr = fmt.Sprintf("%s:50006", schedulerHostAddr) + _, err = docker.ContainerInspect(context.Background(), "dapr_scheduler") + if err == nil { + schedulerHostAddr := config.SchedulerHostAddress + if len(schedulerHostAddr) == 0 { + schedulerHostAddr = "localhost" } + if indx := strings.Index(schedulerHostAddr, ":"); indx == -1 { + if runtime.GOOS == daprWindowsOS { + schedulerHostAddr = fmt.Sprintf("%s:6060", schedulerHostAddr) + } else { + schedulerHostAddr = fmt.Sprintf("%s:50006", schedulerHostAddr) + } + } + config.SchedulerHostAddress = schedulerHostAddr + return nil } - config.SchedulerHostAddr = schedulerHostAddr return nil } @@ -237,7 +248,6 @@ func (config *RunConfig) Validate() error { } err = config.validateSchedulerHostAddr() - if err != nil { return err } diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 456ed74aa..775420679 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -415,7 +415,6 @@ func runZipkin(wg *sync.WaitGroup, errorChan chan<- error, info initInfo) { args = append(args, imageName) } _, err = utils.RunCmdAndWait(runtimeCmd, args...) - if err != nil { runError := isContainerRunError(err) if !runError { @@ -481,7 +480,6 @@ func runRedis(wg *sync.WaitGroup, errorChan chan<- error, info initInfo) { args = append(args, imageName) } _, err = utils.RunCmdAndWait(runtimeCmd, args...) - if err != nil { runError := isContainerRunError(err) if !runError { @@ -568,7 +566,6 @@ func runPlacementService(wg *sync.WaitGroup, errorChan chan<- error, info initIn args = append(args, image) _, err = utils.RunCmdAndWait(runtimeCmd, args...) - if err != nil { runError := isContainerRunError(err) if !runError { @@ -668,7 +665,6 @@ func runSchedulerService(wg *sync.WaitGroup, errorChan chan<- error, info initIn args = append(args, image) _, err = utils.RunCmdAndWait(runtimeCmd, args...) - if err != nil { runError := isContainerRunError(err) if !runError { diff --git a/tests/e2e/common/common.go b/tests/e2e/common/common.go index b6c1b5b4a..e52412944 100644 --- a/tests/e2e/common/common.go +++ b/tests/e2e/common/common.go @@ -1210,7 +1210,6 @@ func exportCurrentCertificate(daprPath string) error { os.RemoveAll("./certs") } _, err = spawn.Command(daprPath, "mtls", "export", "-o", "./certs") - if err != nil { return fmt.Errorf("error in exporting certificate %w", err) } diff --git a/tests/e2e/standalone/windows_run_template_test.go b/tests/e2e/standalone/windows_run_template_test.go index 28ccefed9..c570f94c7 100644 --- a/tests/e2e/standalone/windows_run_template_test.go +++ b/tests/e2e/standalone/windows_run_template_test.go @@ -115,7 +115,6 @@ func startAppsWithAppLogDestFile(t *testing.T, file string) { assert.NotContains(t, output, "msg=\"All outstanding components processed\" app_id=emit-metrics") assert.Contains(t, output, "Received signal to stop Dapr and app processes. Shutting down Dapr and app processes.") - } func startAppsWithAppLogDestConsole(t *testing.T, file string) { @@ -139,5 +138,4 @@ func startAppsWithAppLogDestConsole(t *testing.T, file string) { assert.NotContains(t, output, "msg=\"All outstanding components processed\" app_id=emit-metrics") assert.Contains(t, output, "Received signal to stop Dapr and app processes. Shutting down Dapr and app processes.") - }