Skip to content

Commit

Permalink
refactor: improve startup
Browse files Browse the repository at this point in the history
  • Loading branch information
FabianKramm committed Jan 26, 2024
1 parent 0e8e71a commit de318c7
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 51 deletions.
56 changes: 43 additions & 13 deletions pkg/etcd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"crypto/tls"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/loft-sh/vcluster/pkg/certs"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -20,11 +18,43 @@ const (
waitForClientTimeout = time.Minute * 10
)

func WaitForEtcdClient(parentCtx context.Context, certificatesDir string, endpoints ...string) (*clientv3.Client, error) {
type Certificates struct {
CaCert string
ServerCert string
ServerKey string
}

func EndpointsAndCertificatesFromFlags(flags []string) ([]string, *Certificates, error) {
certificates := &Certificates{}
endpoints := []string{}

// parse flags
for _, arg := range flags {
if strings.HasPrefix(arg, "--etcd-servers=") {
endpoints = strings.Split(strings.TrimPrefix(arg, "--etcd-servers="), ",")
} else if strings.HasPrefix(arg, "--etcd-cafile=") {
certificates.CaCert = strings.TrimPrefix(arg, "--etcd-cafile=")
} else if strings.HasPrefix(arg, "--etcd-certfile=") {
certificates.ServerCert = strings.TrimPrefix(arg, "--etcd-certfile=")
} else if strings.HasPrefix(arg, "--etcd-keyfile=") {
certificates.ServerKey = strings.TrimPrefix(arg, "--etcd-keyfile=")
}
}

// fail if etcd servers is not found
if len(endpoints) == 0 {
return nil, nil, fmt.Errorf("couldn't find flag --etcd-servers within api-server flags")
} else if certificates.CaCert == "" || certificates.ServerCert == "" || certificates.ServerKey == "" {
return endpoints, nil, nil
}
return endpoints, certificates, nil
}

func WaitForEtcdClient(parentCtx context.Context, certificates *Certificates, endpoints ...string) (*clientv3.Client, error) {
var etcdClient *clientv3.Client
var err error
waitErr := wait.PollUntilContextTimeout(parentCtx, time.Second, waitForClientTimeout, true, func(ctx context.Context) (bool, error) {
etcdClient, err = GetEtcdClient(parentCtx, certificatesDir, endpoints...)
etcdClient, err = GetEtcdClient(parentCtx, certificates, endpoints...)
if err == nil {
_, err = etcdClient.MemberList(ctx)
if err == nil {
Expand All @@ -49,8 +79,8 @@ func WaitForEtcdClient(parentCtx context.Context, certificatesDir string, endpoi
// If the runtime config does not list any endpoints, the default endpoint is used.
// The returned client should be closed when no longer needed, in order to avoid leaking GRPC
// client goroutines.
func GetEtcdClient(ctx context.Context, certificatesDir string, endpoints ...string) (*clientv3.Client, error) {
cfg, err := getClientConfig(ctx, certificatesDir, endpoints...)
func GetEtcdClient(ctx context.Context, certificates *Certificates, endpoints ...string) (*clientv3.Client, error) {
cfg, err := getClientConfig(ctx, certificates, endpoints...)
if err != nil {
return nil, err
}
Expand All @@ -60,7 +90,7 @@ func GetEtcdClient(ctx context.Context, certificatesDir string, endpoints ...str

// getClientConfig generates an etcd client config connected to the specified endpoints.
// If no endpoints are provided, getEndpoints is called to provide defaults.
func getClientConfig(ctx context.Context, certificatesDir string, endpoints ...string) (*clientv3.Config, error) {
func getClientConfig(ctx context.Context, certificates *Certificates, endpoints ...string) (*clientv3.Config, error) {
config := &clientv3.Config{
Endpoints: endpoints,
Context: ctx,
Expand All @@ -73,22 +103,22 @@ func getClientConfig(ctx context.Context, certificatesDir string, endpoints ...s
}

var err error
if strings.HasPrefix(endpoints[0], "https://") && certificatesDir != "" {
config.TLS, err = toTLSConfig(certificatesDir)
if strings.HasPrefix(endpoints[0], "https://") && certificates != nil {
config.TLS, err = toTLSConfig(certificates)
}
return config, err
}

func toTLSConfig(certificatesDir string) (*tls.Config, error) {
func toTLSConfig(certificates *Certificates) (*tls.Config, error) {
clientCert, err := tls.LoadX509KeyPair(
filepath.Join(certificatesDir, certs.APIServerEtcdClientCertName),
filepath.Join(certificatesDir, certs.APIServerEtcdClientKeyName),
certificates.ServerCert,
certificates.ServerKey,
)
if err != nil {
return nil, err
}

pool, err := certutil.NewPool(filepath.Join(certificatesDir, certs.EtcdCACertName))
pool, err := certutil.NewPool(certificates.CaCert)
if err != nil {
return nil, err
}
Expand Down
53 changes: 31 additions & 22 deletions pkg/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,83 +20,92 @@ import (
"k8s.io/klog/v2"
)

const apiserverCmd = "APISERVER_COMMAND"
const apiServerCmd = "APISERVER_COMMAND"
const schedulerCmd = "SCHEDULER_COMMAND"
const controllerCmd = "CONTROLLER_COMMAND"

type command struct {
Command []string `json:"command,omitempty"`
}

func StartK8S(ctx context.Context, apiUp chan struct{}, releaseName, serviceCIDR string) error {
// we need to retry the functions because etcd is started after the syncer, so
// the apiservers always fail when we start until etcd is up and running
// the controller needs the apiserver service to respond to start successfully
// and the service can only be reachable once the syncer can reach the api servers

serviceCIDRarg := fmt.Sprintf("--service-cluster-ip-range=%s", serviceCIDR)
func StartK8S(ctx context.Context, serviceCIDR string) error {
serviceCIDRArg := fmt.Sprintf("--service-cluster-ip-range=%s", serviceCIDR)
eg := &errgroup.Group{}

apiCommand := &command{}
apiEnv, ok := os.LookupEnv(apiserverCmd)
// start api server first
apiEnv, ok := os.LookupEnv(apiServerCmd)
if ok {
apiCommand := &command{}
err := yaml.Unmarshal([]byte(apiEnv), apiCommand)
if err != nil {
return fmt.Errorf("parsing apiserver command %s: %w", apiEnv, err)
}
apiCommand.Command = append(apiCommand.Command, serviceCIDRarg)

apiCommand.Command = append(apiCommand.Command, serviceCIDRArg)
eg.Go(func() error {
_, err := etcd.WaitForEtcdClient(ctx, "/pki", "https://"+releaseName+"-etcd:2379")
// get etcd endpoints and certificates from flags
endpoints, certificates, err := etcd.EndpointsAndCertificatesFromFlags(apiCommand.Command)
if err != nil {
return fmt.Errorf("get etcd certificates and endpoint: %w", err)
}

// wait until etcd is up and running
_, err = etcd.WaitForEtcdClient(ctx, certificates, endpoints...)
if err != nil {
return err
}
return RunCommand(ctx, *apiCommand, "apiserver")

// now start the api server
return RunCommand(ctx, apiCommand, "apiserver")
})
}

// wait for api server to be up as otherwise controller and scheduler might fail
isUp := waitForAPI(ctx)
if !isUp {
return errors.New("waited until timeout for the api to be up, but it never did")
}
close(apiUp)

controllerCommand := &command{}
// start controller command
controllerEnv, ok := os.LookupEnv(controllerCmd)
if ok {
controllerCommand := &command{}
err := yaml.Unmarshal([]byte(controllerEnv), controllerCommand)
if err != nil {
return fmt.Errorf("parsing controller command %s: %w", controllerEnv, err)
}
controllerCommand.Command = append(controllerCommand.Command, serviceCIDRarg)

controllerCommand.Command = append(controllerCommand.Command, serviceCIDRArg)
eg.Go(func() error {
return RunCommand(ctx, *controllerCommand, "controller")
return RunCommand(ctx, controllerCommand, "controller")
})
}

schedulerCommand := &command{}
// start scheduler command
schedulerEnv, ok := os.LookupEnv(schedulerCmd)
if ok {
schedulerCommand := &command{}
err := yaml.Unmarshal([]byte(schedulerEnv), schedulerCommand)
if err != nil {
return fmt.Errorf("parsing scheduler command %s: %w", schedulerEnv, err)
}

eg.Go(func() error {
return RunCommand(ctx, *schedulerCommand, "scheduler")
return RunCommand(ctx, schedulerCommand, "scheduler")
})
}

err := eg.Wait()

// regular stop case, will return as soon as a component returns an error.
// we don't expect the components to stop by themselves since they're supposed
// to run until killed or until they fail
err := eg.Wait()
if err == nil || err.Error() == "signal: killed" {
return nil
}
return err
}

func RunCommand(ctx context.Context, command command, component string) error {
func RunCommand(ctx context.Context, command *command, component string) error {
reader, writer := io.Pipe()

done := make(chan struct{})
Expand Down
23 changes: 7 additions & 16 deletions pkg/setup/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package setup
import (
"context"
"fmt"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/loft-sh/vcluster/pkg/certs"
Expand All @@ -31,12 +31,6 @@ func Initialize(
vClusterName string,
options *options.VirtualClusterOptions,
) error {
// check if we should create certificates
certificatesDir := ""
if strings.HasPrefix(options.ServerCaCert, "/pki/") {
certificatesDir = "/pki"
}

// Ensure that service CIDR range is written into the expected location
err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 2*time.Minute, true, func(waitCtx context.Context) (bool, error) {
err := initialize(
Expand All @@ -48,7 +42,6 @@ func Initialize(
currentNamespace,
vClusterName,
options,
certificatesDir,
)
if err != nil {
klog.Errorf("error initializing service cidr, certs and token: %v", err)
Expand Down Expand Up @@ -76,7 +69,6 @@ func initialize(
currentNamespace,
vClusterName string,
options *options.VirtualClusterOptions,
certificatesDir string,
) error {
distro := constants.GetVClusterDistro()

Expand Down Expand Up @@ -125,28 +117,27 @@ func initialize(
}
}()
case constants.K8SDistro, constants.EKSDistro:
if certificatesDir != "" {
// generate k8s certificates
// try to generate k8s certificates
certificatesDir := filepath.Dir(options.ServerCaCert)
if certificatesDir == "/pki" {
err := GenerateK8sCerts(ctx, currentNamespaceClient, vClusterName, currentNamespace, serviceCIDR, certificatesDir, options.ClusterDomain)
if err != nil {
return err
}
}

// start k8s
apiUp := make(chan struct{})
go func() {
// we need to run this with the parent ctx as otherwise this context will be cancelled by the wait
// loop in Initialize
err := k8s.StartK8S(parentCtx, apiUp, options.Name, serviceCIDR)
err := k8s.StartK8S(parentCtx, serviceCIDR)
if err != nil {
klog.Fatalf("Error running k8s: %v", err)
}
}()
klog.Info("waiting for the api to be up")
<-apiUp
case constants.Unknown:
if certificatesDir != "" {
certificatesDir := filepath.Dir(options.ServerCaCert)
if certificatesDir == "/pki" {
// generate k8s certificates
err := GenerateK8sCerts(ctx, currentNamespaceClient, vClusterName, currentNamespace, serviceCIDR, certificatesDir, options.ClusterDomain)
if err != nil {
Expand Down

0 comments on commit de318c7

Please sign in to comment.