diff --git a/pkg/etcd/util.go b/pkg/etcd/util.go index 95c4a60a87..e5e078e5f9 100644 --- a/pkg/etcd/util.go +++ b/pkg/etcd/util.go @@ -4,11 +4,9 @@ import ( "context" "crypto/tls" "fmt" - "path/filepath" "strings" "time" - "github.com/loft-sh/vcluster/pkg/certs" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" @@ -20,11 +18,43 @@ const ( waitForClientTimeout = time.Minute * 10 ) -func WaitForEtcdClient(parentCtx context.Context, certificatesDir string, endpoints ...string) (*clientv3.Client, error) { +type Certificates struct { + CaCert string + ServerCert string + ServerKey string +} + +func EndpointsAndCertificatesFromFlags(flags []string) ([]string, *Certificates, error) { + certificates := &Certificates{} + endpoints := []string{} + + // parse flags + for _, arg := range flags { + if strings.HasPrefix(arg, "--etcd-servers=") { + endpoints = strings.Split(strings.TrimPrefix(arg, "--etcd-servers="), ",") + } else if strings.HasPrefix(arg, "--etcd-cafile=") { + certificates.CaCert = strings.TrimPrefix(arg, "--etcd-cafile=") + } else if strings.HasPrefix(arg, "--etcd-certfile=") { + certificates.ServerCert = strings.TrimPrefix(arg, "--etcd-certfile=") + } else if strings.HasPrefix(arg, "--etcd-keyfile=") { + certificates.ServerKey = strings.TrimPrefix(arg, "--etcd-keyfile=") + } + } + + // fail if etcd servers is not found + if len(endpoints) == 0 { + return nil, nil, fmt.Errorf("couldn't find flag --etcd-servers within api-server flags") + } else if certificates.CaCert == "" || certificates.ServerCert == "" || certificates.ServerKey == "" { + return endpoints, nil, nil + } + return endpoints, certificates, nil +} + +func WaitForEtcdClient(parentCtx context.Context, certificates *Certificates, endpoints ...string) (*clientv3.Client, error) { var etcdClient *clientv3.Client var err error waitErr := wait.PollUntilContextTimeout(parentCtx, time.Second, waitForClientTimeout, true, func(ctx context.Context) (bool, error) { - etcdClient, err = GetEtcdClient(parentCtx, certificatesDir, endpoints...) + etcdClient, err = GetEtcdClient(parentCtx, certificates, endpoints...) if err == nil { _, err = etcdClient.MemberList(ctx) if err == nil { @@ -49,8 +79,8 @@ func WaitForEtcdClient(parentCtx context.Context, certificatesDir string, endpoi // If the runtime config does not list any endpoints, the default endpoint is used. // The returned client should be closed when no longer needed, in order to avoid leaking GRPC // client goroutines. -func GetEtcdClient(ctx context.Context, certificatesDir string, endpoints ...string) (*clientv3.Client, error) { - cfg, err := getClientConfig(ctx, certificatesDir, endpoints...) +func GetEtcdClient(ctx context.Context, certificates *Certificates, endpoints ...string) (*clientv3.Client, error) { + cfg, err := getClientConfig(ctx, certificates, endpoints...) if err != nil { return nil, err } @@ -60,7 +90,7 @@ func GetEtcdClient(ctx context.Context, certificatesDir string, endpoints ...str // getClientConfig generates an etcd client config connected to the specified endpoints. // If no endpoints are provided, getEndpoints is called to provide defaults. -func getClientConfig(ctx context.Context, certificatesDir string, endpoints ...string) (*clientv3.Config, error) { +func getClientConfig(ctx context.Context, certificates *Certificates, endpoints ...string) (*clientv3.Config, error) { config := &clientv3.Config{ Endpoints: endpoints, Context: ctx, @@ -73,22 +103,22 @@ func getClientConfig(ctx context.Context, certificatesDir string, endpoints ...s } var err error - if strings.HasPrefix(endpoints[0], "https://") && certificatesDir != "" { - config.TLS, err = toTLSConfig(certificatesDir) + if strings.HasPrefix(endpoints[0], "https://") && certificates != nil { + config.TLS, err = toTLSConfig(certificates) } return config, err } -func toTLSConfig(certificatesDir string) (*tls.Config, error) { +func toTLSConfig(certificates *Certificates) (*tls.Config, error) { clientCert, err := tls.LoadX509KeyPair( - filepath.Join(certificatesDir, certs.APIServerEtcdClientCertName), - filepath.Join(certificatesDir, certs.APIServerEtcdClientKeyName), + certificates.ServerCert, + certificates.ServerKey, ) if err != nil { return nil, err } - pool, err := certutil.NewPool(filepath.Join(certificatesDir, certs.EtcdCACertName)) + pool, err := certutil.NewPool(certificates.CaCert) if err != nil { return nil, err } diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index f2dad4e9a8..70bf74f52a 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -20,7 +20,7 @@ import ( "k8s.io/klog/v2" ) -const apiserverCmd = "APISERVER_COMMAND" +const apiServerCmd = "APISERVER_COMMAND" const schedulerCmd = "SCHEDULER_COMMAND" const controllerCmd = "CONTROLLER_COMMAND" @@ -28,75 +28,84 @@ type command struct { Command []string `json:"command,omitempty"` } -func StartK8S(ctx context.Context, apiUp chan struct{}, releaseName, serviceCIDR string) error { - // we need to retry the functions because etcd is started after the syncer, so - // the apiservers always fail when we start until etcd is up and running - // the controller needs the apiserver service to respond to start successfully - // and the service can only be reachable once the syncer can reach the api servers - - serviceCIDRarg := fmt.Sprintf("--service-cluster-ip-range=%s", serviceCIDR) +func StartK8S(ctx context.Context, serviceCIDR string) error { + serviceCIDRArg := fmt.Sprintf("--service-cluster-ip-range=%s", serviceCIDR) eg := &errgroup.Group{} - apiCommand := &command{} - apiEnv, ok := os.LookupEnv(apiserverCmd) + // start api server first + apiEnv, ok := os.LookupEnv(apiServerCmd) if ok { + apiCommand := &command{} err := yaml.Unmarshal([]byte(apiEnv), apiCommand) if err != nil { return fmt.Errorf("parsing apiserver command %s: %w", apiEnv, err) } - apiCommand.Command = append(apiCommand.Command, serviceCIDRarg) + + apiCommand.Command = append(apiCommand.Command, serviceCIDRArg) eg.Go(func() error { - _, err := etcd.WaitForEtcdClient(ctx, "/pki", "https://"+releaseName+"-etcd:2379") + // get etcd endpoints and certificates from flags + endpoints, certificates, err := etcd.EndpointsAndCertificatesFromFlags(apiCommand.Command) + if err != nil { + return fmt.Errorf("get etcd certificates and endpoint: %w", err) + } + + // wait until etcd is up and running + _, err = etcd.WaitForEtcdClient(ctx, certificates, endpoints...) if err != nil { return err } - return RunCommand(ctx, *apiCommand, "apiserver") + + // now start the api server + return RunCommand(ctx, apiCommand, "apiserver") }) } + // wait for api server to be up as otherwise controller and scheduler might fail isUp := waitForAPI(ctx) if !isUp { return errors.New("waited until timeout for the api to be up, but it never did") } - close(apiUp) - controllerCommand := &command{} + // start controller command controllerEnv, ok := os.LookupEnv(controllerCmd) if ok { + controllerCommand := &command{} err := yaml.Unmarshal([]byte(controllerEnv), controllerCommand) if err != nil { return fmt.Errorf("parsing controller command %s: %w", controllerEnv, err) } - controllerCommand.Command = append(controllerCommand.Command, serviceCIDRarg) + + controllerCommand.Command = append(controllerCommand.Command, serviceCIDRArg) eg.Go(func() error { - return RunCommand(ctx, *controllerCommand, "controller") + return RunCommand(ctx, controllerCommand, "controller") }) } - schedulerCommand := &command{} + // start scheduler command schedulerEnv, ok := os.LookupEnv(schedulerCmd) if ok { + schedulerCommand := &command{} err := yaml.Unmarshal([]byte(schedulerEnv), schedulerCommand) if err != nil { return fmt.Errorf("parsing scheduler command %s: %w", schedulerEnv, err) } + eg.Go(func() error { - return RunCommand(ctx, *schedulerCommand, "scheduler") + return RunCommand(ctx, schedulerCommand, "scheduler") }) } - err := eg.Wait() - // regular stop case, will return as soon as a component returns an error. // we don't expect the components to stop by themselves since they're supposed // to run until killed or until they fail + err := eg.Wait() if err == nil || err.Error() == "signal: killed" { return nil } return err } -func RunCommand(ctx context.Context, command command, component string) error { +func RunCommand(ctx context.Context, command *command, component string) error { reader, writer := io.Pipe() done := make(chan struct{}) diff --git a/pkg/setup/initialize.go b/pkg/setup/initialize.go index e1a7c41b2e..b68dd691f0 100644 --- a/pkg/setup/initialize.go +++ b/pkg/setup/initialize.go @@ -3,8 +3,8 @@ package setup import ( "context" "fmt" + "path/filepath" "strconv" - "strings" "time" "github.com/loft-sh/vcluster/pkg/certs" @@ -31,12 +31,6 @@ func Initialize( vClusterName string, options *options.VirtualClusterOptions, ) error { - // check if we should create certificates - certificatesDir := "" - if strings.HasPrefix(options.ServerCaCert, "/pki/") { - certificatesDir = "/pki" - } - // Ensure that service CIDR range is written into the expected location err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 2*time.Minute, true, func(waitCtx context.Context) (bool, error) { err := initialize( @@ -48,7 +42,6 @@ func Initialize( currentNamespace, vClusterName, options, - certificatesDir, ) if err != nil { klog.Errorf("error initializing service cidr, certs and token: %v", err) @@ -76,7 +69,6 @@ func initialize( currentNamespace, vClusterName string, options *options.VirtualClusterOptions, - certificatesDir string, ) error { distro := constants.GetVClusterDistro() @@ -125,8 +117,9 @@ func initialize( } }() case constants.K8SDistro, constants.EKSDistro: - if certificatesDir != "" { - // generate k8s certificates + // try to generate k8s certificates + certificatesDir := filepath.Dir(options.ServerCaCert) + if certificatesDir == "/pki" { err := GenerateK8sCerts(ctx, currentNamespaceClient, vClusterName, currentNamespace, serviceCIDR, certificatesDir, options.ClusterDomain) if err != nil { return err @@ -134,19 +127,17 @@ func initialize( } // start k8s - apiUp := make(chan struct{}) go func() { // we need to run this with the parent ctx as otherwise this context will be cancelled by the wait // loop in Initialize - err := k8s.StartK8S(parentCtx, apiUp, options.Name, serviceCIDR) + err := k8s.StartK8S(parentCtx, serviceCIDR) if err != nil { klog.Fatalf("Error running k8s: %v", err) } }() - klog.Info("waiting for the api to be up") - <-apiUp case constants.Unknown: - if certificatesDir != "" { + certificatesDir := filepath.Dir(options.ServerCaCert) + if certificatesDir == "/pki" { // generate k8s certificates err := GenerateK8sCerts(ctx, currentNamespaceClient, vClusterName, currentNamespace, serviceCIDR, certificatesDir, options.ClusterDomain) if err != nil {