From 6ba7ead8b3b4556c3eaadf124abb292de36725ff Mon Sep 17 00:00:00 2001 From: Fabian Kramm Date: Fri, 26 Jan 2024 16:49:55 +0100 Subject: [PATCH] fix: metrics server --- pkg/metricsapiservice/register.go | 47 +++- pkg/server/filters/metrics_server.go | 335 +++++++++++++++++---------- 2 files changed, 253 insertions(+), 129 deletions(-) diff --git a/pkg/metricsapiservice/register.go b/pkg/metricsapiservice/register.go index e583f406f3..43746faf4e 100644 --- a/pkg/metricsapiservice/register.go +++ b/pkg/metricsapiservice/register.go @@ -6,12 +6,14 @@ import ( "time" "github.com/loft-sh/vcluster/pkg/setup/options" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "k8s.io/metrics/pkg/apis/metrics" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -73,21 +75,50 @@ func deleteOperation(ctrlCtx *options.ControllerContext) wait.ConditionWithConte func createOperation(ctrlCtx *options.ControllerContext) wait.ConditionWithContextFunc { return func(ctx context.Context) (bool, error) { - spec := apiregistrationv1.APIServiceSpec{ - Group: metrics.GroupName, - GroupPriorityMinimum: 100, - Version: MetricsVersion, - VersionPriority: 100, + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metrics-service", + Namespace: "kube-system", + }, + } + _, err := controllerutil.CreateOrUpdate(ctx, ctrlCtx.VirtualManager.GetClient(), service, func() error { + service.Spec.Type = corev1.ServiceTypeExternalName + service.Spec.ExternalName = "localhost" + service.Spec.Ports = []corev1.ServicePort{ + { + Port: 8443, + }, + } + return nil + }) + if err != nil { + if kerrors.IsAlreadyExists(err) { + return true, nil + } + + klog.Errorf("error creating api service %v", err) + return false, nil } + apiServiceSpec := apiregistrationv1.APIServiceSpec{ + Service: &apiregistrationv1.ServiceReference{ + Namespace: "kube-system", + Name: "metrics-service", + Port: ptr.To(int32(8443)), + }, + InsecureSkipTLSVerify: true, + Group: metrics.GroupName, + GroupPriorityMinimum: 100, + Version: MetricsVersion, + VersionPriority: 100, + } apiService := &apiregistrationv1.APIService{ ObjectMeta: metav1.ObjectMeta{ Name: MetricsAPIServiceName, }, } - - _, err := controllerutil.CreateOrUpdate(ctx, ctrlCtx.VirtualManager.GetClient(), apiService, func() error { - apiService.Spec = spec + _, err = controllerutil.CreateOrUpdate(ctx, ctrlCtx.VirtualManager.GetClient(), apiService, func() error { + apiService.Spec = apiServiceSpec return nil }) if err != nil { diff --git a/pkg/server/filters/metrics_server.go b/pkg/server/filters/metrics_server.go index 7b311dcc2c..0e16d3578b 100644 --- a/pkg/server/filters/metrics_server.go +++ b/pkg/server/filters/metrics_server.go @@ -13,6 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/audit" @@ -37,7 +38,6 @@ const ( NodeResource = "nodes" PodResource = "pods" - HeaderContentType = "Content-Type" LabelSelectorQueryParam = "labelSelector" ) @@ -81,13 +81,24 @@ func WithMetricsServerProxy( return } - req.Header.Del("Authorization") - handleAPIResourceListRequest(w, req, proxyHandler, serializer.NewCodecFactory(cachedVirtualClient.Scheme())) + handleAPIResourceListRequest(w, req, proxyHandler, cachedVirtualClient.Scheme()) + return + } + + // is version request? + if isAPIResourceVersionListRequest(info) { + proxyHandler, err := handler.Handler("", hostConfig, nil) + if err != nil { + requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) + return + } + + handleAPIResourceVersionListRequest(w, req, proxyHandler, cachedVirtualClient.Scheme()) return } // is new aggregated list request? - if isNewAPIResourceListRequest(info, req) { + if isNewAPIResourceListRequest(info) { proxyHandler, err := handler.Handler("", virtualConfig, nil) if err != nil { requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) @@ -104,14 +115,18 @@ func WithMetricsServerProxy( }) } -func isNewAPIResourceListRequest(r *request.RequestInfo, req *http.Request) bool { - return r.Path == "/apis" && strings.Contains(req.Header.Get("Accept"), "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList") +func isNewAPIResourceListRequest(r *request.RequestInfo) bool { + return r.Path == "/apis" } func isAPIResourceListRequest(r *request.RequestInfo) bool { return r.Path == "/apis/metrics.k8s.io/v1beta1" } +func isAPIResourceVersionListRequest(r *request.RequestInfo) bool { + return r.Path == "/apis/metrics.k8s.io" +} + func isMetricsServerProxyRequest(r *request.RequestInfo) bool { if !r.IsResourceRequest { return false @@ -122,124 +137,46 @@ func isMetricsServerProxyRequest(r *request.RequestInfo) bool { (r.Resource == NodeResource || r.Resource == PodResource) } -func handleMetricsServerProxyRequest( - w http.ResponseWriter, - req *http.Request, - targetNamespace string, - cachedHostClient, - cachedVirtualClient client.Client, - info *request.RequestInfo, - hostConfig *rest.Config, - multiNamespaceMode bool, -) { - splitted := strings.Split(req.URL.Path, "/") - err := translateLabelSelectors(req) - if err != nil { - klog.Infof("error translating label selectors %v", err) - requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) - return - } - - metricsServerProxy := &MetricsServerProxy{ - request: req, - requestInfo: info, - responseWriter: w, - resourceType: NodeResource, - verb: info.Verb, - - client: cachedHostClient, - } - - // request is for get particular pod - if info.Resource == PodResource && info.Verb == RequestVerbGet { - namespace := translate.Default.PhysicalNamespace(info.Namespace) - name := translate.Default.PhysicalName(info.Name, info.Namespace) - metricsServerProxy.resourceType = PodResource - - // replace the translated name and namespace - splitted[5] = namespace - splitted[7] = name - - req.URL.Path = strings.Join(splitted, "/") - } - - // request is for list pods - if info.Resource == PodResource && info.Verb == RequestVerbList { - // check if its a list request across all namespaces - if info.Namespace != "" { - splitted[5] = translate.Default.PhysicalNamespace(info.Namespace) - } else if !multiNamespaceMode { - // limit to current namespace in host cluster - splitted = append(splitted[:4], append([]string{"namespaces", targetNamespace}, splitted[4:]...)...) - } - - metricsServerProxy.resourceType = PodResource - vPodList, err := getVirtualPodObjectsInNamespace(req.Context(), cachedVirtualClient, info.Namespace) +func handleNewAPIResourceListRequest( + responseWriter http.ResponseWriter, + request *http.Request, + handler http.Handler, + scheme *runtime.Scheme, +) bool { + // try parsing data into api group discovery list + if strings.Contains(request.Header.Get("Accept"), "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList") { + // execute the request + code, _, data, err := executeRequest(request, handler) if err != nil { - klog.Infof("error getting vpods in namespace %v", err) - requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) - return - } - - metricsServerProxy.podsInNamespace = vPodList - req.URL.Path = strings.Join(splitted, "/") - } - - acceptHeader := req.Header.Get("Accept") - if info.Resource == NodeResource { - if strings.Contains(acceptHeader, "as=Table;") { - // respond a 403 for now as we don't want to expose all host nodes with the table response - // TODO: rewrite node table response to only show nodes synced in the vcluster - requestpkg.FailWithStatus(w, req, http.StatusForbidden, fmt.Errorf("cannot list nodes in table response format")) - return + klog.Infof("error executing request %v", err) + return false + } else if code != http.StatusOK { + klog.Infof("error status not ok %v", err) + return false } - // fetch and fill vcluster synced nodes - nodeList, err := getVirtualNodes(req.Context(), cachedVirtualClient) - if err != nil { - requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) - return + if handleAPIGroupDiscoveryList(responseWriter, request, data, scheme) { + return true } - - metricsServerProxy.nodesInVCluster = nodeList } - proxyHandler, err := handler.Handler("", hostConfig, nil) - if err != nil { - requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) - return - } - - req.Header.Del("Authorization") - metricsServerProxy.handler = proxyHandler - metricsServerProxy.HandleRequest() + return false } -func handleNewAPIResourceListRequest( +func handleAPIGroupDiscoveryList( responseWriter http.ResponseWriter, request *http.Request, - handler http.Handler, + data []byte, scheme *runtime.Scheme, ) bool { - // execute the request - code, _, data, err := executeRequest(request, handler) - if err != nil { - klog.Infof("error executing request %v", err) - return false - } else if code != http.StatusOK { - klog.Infof("error status not ok %v", err) - return false - } - - // try parsing data response := &apidiscoveryv2beta1.APIGroupDiscoveryList{} codecFactory := serializer.NewCodecFactory(scheme) - _, _, err = codecFactory.UniversalDeserializer().Decode(data, nil, response) + _, _, err := codecFactory.UniversalDeserializer().Decode(data, nil, response) if err != nil { klog.Infof("error unmarshalling discovery list %v", err) return false } else if response.Kind != "APIGroupDiscoveryList" || response.APIVersion != apidiscoveryv2beta1.SchemeGroupVersion.String() { - klog.Infof("error retrieving discovery list: unexpected kind or apiversion %s %s", response.Kind, response.APIVersion) + klog.Infof("error retrieving discovery list: unexpected kind or apiversion %s %s %s", response.Kind, response.APIVersion, string(data)) return false } @@ -285,12 +222,55 @@ func handleNewAPIResourceListRequest( return true } +func handleAPIResourceVersionListRequest( + responseWriter http.ResponseWriter, + request *http.Request, + handler http.Handler, + scheme *runtime.Scheme, +) { + codecFactory := serializer.NewCodecFactory(scheme) + code, header, data, err := executeRequest(request, handler) + if err != nil { + klog.Infof("error executing request %v", err) + responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) + return + } else if code != http.StatusOK { + klog.Infof("error status not ok %v", err) + writeWithHeader(responseWriter, code, header, data) + return + } + + response := &metav1.APIGroup{} + _, _, err = codecFactory.UniversalDeserializer().Decode(data, nil, response) + if err != nil { + klog.Infof("error unmarshalling resource list %v", err) + responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) + return + } else if response.Kind != "APIGroup" { + err = fmt.Errorf("error retrieving resource version list: unexpected kind or apiversion %s %s %s", response.Kind, response.APIVersion, string(data)) + klog.Info(err.Error()) + responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) + return + } + + // return new data + WriteObjectNegotiatedWithGVK( + responseWriter, + request, + response, + scheme, + corev1.SchemeGroupVersion, + "", + ) +} + func handleAPIResourceListRequest( responseWriter http.ResponseWriter, request *http.Request, handler http.Handler, - codecFactory serializer.CodecFactory, + scheme *runtime.Scheme, ) { + codecFactory := serializer.NewCodecFactory(scheme) code, header, data, err := executeRequest(request, handler) if err != nil { klog.Infof("error executing request %v", err) @@ -302,13 +282,28 @@ func handleAPIResourceListRequest( return } - responseWriter.Header().Set(HeaderContentType, header.Get(HeaderContentType)) - _, err = responseWriter.Write(data) + response := &metav1.APIResourceList{} + _, _, err = codecFactory.UniversalDeserializer().Decode(data, nil, response) if err != nil { - klog.Infof("error writing response %v", err) - requestpkg.FailWithStatus(responseWriter, request, http.StatusInternalServerError, err) + klog.Infof("error unmarshalling resource list %v", err) + responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) + return + } else if response.Kind != "APIResourceList" { + err = fmt.Errorf("error retrieving resource list: unexpected kind or apiversion %s %s %s", response.Kind, response.APIVersion, string(data)) + klog.Info(err.Error()) + responsewriters.ErrorNegotiated(err, codecFactory, corev1.SchemeGroupVersion, responseWriter, request) return } + + // return new data + WriteObjectNegotiatedWithGVK( + responseWriter, + request, + response, + scheme, + corev1.SchemeGroupVersion, + "", + ) } type MetricsServerProxy struct { @@ -326,6 +321,99 @@ type MetricsServerProxy struct { client client.Client } +func handleMetricsServerProxyRequest( + w http.ResponseWriter, + req *http.Request, + targetNamespace string, + cachedHostClient, + cachedVirtualClient client.Client, + info *request.RequestInfo, + hostConfig *rest.Config, + multiNamespaceMode bool, +) { + splitted := strings.Split(req.URL.Path, "/") + err := translateLabelSelectors(req) + if err != nil { + klog.Infof("error translating label selectors %v", err) + requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) + return + } + + metricsServerProxy := &MetricsServerProxy{ + request: req, + requestInfo: info, + responseWriter: w, + resourceType: NodeResource, + verb: info.Verb, + + client: cachedHostClient, + } + + // request is for get particular pod + if info.Resource == PodResource && info.Verb == RequestVerbGet { + namespace := translate.Default.PhysicalNamespace(info.Namespace) + name := translate.Default.PhysicalName(info.Name, info.Namespace) + metricsServerProxy.resourceType = PodResource + + // replace the translated name and namespace + splitted[5] = namespace + splitted[7] = name + + req.URL.Path = strings.Join(splitted, "/") + } + + // request is for list pods + if info.Resource == PodResource && info.Verb == RequestVerbList { + // check if its a list request across all namespaces + if info.Namespace != "" { + splitted[5] = translate.Default.PhysicalNamespace(info.Namespace) + } else if !multiNamespaceMode { + // limit to current namespace in host cluster + splitted = append(splitted[:4], append([]string{"namespaces", targetNamespace}, splitted[4:]...)...) + } + + metricsServerProxy.resourceType = PodResource + vPodList, err := getVirtualPodObjectsInNamespace(req.Context(), cachedVirtualClient, info.Namespace) + if err != nil { + klog.Infof("error getting vpods in namespace %v", err) + requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) + return + } + + metricsServerProxy.podsInNamespace = vPodList + req.URL.Path = strings.Join(splitted, "/") + } + + acceptHeader := req.Header.Get("Accept") + if info.Resource == NodeResource { + if strings.Contains(acceptHeader, "as=Table;") { + // respond a 403 for now as we don't want to expose all host nodes with the table response + // TODO: rewrite node table response to only show nodes synced in the vcluster + requestpkg.FailWithStatus(w, req, http.StatusForbidden, fmt.Errorf("cannot list nodes in table response format")) + return + } + + // fetch and fill vcluster synced nodes + nodeList, err := getVirtualNodes(req.Context(), cachedVirtualClient) + if err != nil { + requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) + return + } + + metricsServerProxy.nodesInVCluster = nodeList + } + + proxyHandler, err := handler.Handler("", hostConfig, nil) + if err != nil { + requestpkg.FailWithStatus(w, req, http.StatusInternalServerError, err) + return + } + + req.Header.Del("Authorization") + metricsServerProxy.handler = proxyHandler + metricsServerProxy.HandleRequest() +} + type RowData struct { Index int Cells []interface{} @@ -621,20 +709,14 @@ func WriteObjectNegotiated(w http.ResponseWriter, req *http.Request, object runt WriteObjectNegotiatedWithMediaType(w, req, object, scheme, "") } -func WriteObjectNegotiatedWithMediaType(w http.ResponseWriter, req *http.Request, object runtime.Object, scheme *runtime.Scheme, overrideMediaType string) { +func WriteObjectNegotiatedWithGVK(w http.ResponseWriter, req *http.Request, object runtime.Object, scheme *runtime.Scheme, groupVersion schema.GroupVersion, overrideMediaType string) { s := serializer.NewCodecFactory(scheme) statusCode := http.StatusOK - gvk, err := apiutil.GVKForObject(object, scheme) - if err != nil { - responsewriters.ErrorNegotiated(err, s, corev1.SchemeGroupVersion, w, req) - return - } - stream, ok := object.(apirest.ResourceStreamer) if ok { requestInfo, _ := request.RequestInfoFrom(req.Context()) metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() { - responsewriters.StreamObject(statusCode, gvk.GroupVersion(), s, stream, w, req) + responsewriters.StreamObject(statusCode, groupVersion, s, stream, w, req) }) return } @@ -646,9 +728,9 @@ func WriteObjectNegotiatedWithMediaType(w http.ResponseWriter, req *http.Request return } - audit.LogResponseObject(req.Context(), object, gvk.GroupVersion(), s) + audit.LogResponseObject(req.Context(), object, groupVersion, s) - encoder := s.EncoderForVersion(serializer.Serializer, gvk.GroupVersion()) + encoder := s.EncoderForVersion(serializer.Serializer, groupVersion) request.TrackSerializeResponseObjectLatency(req.Context(), func() { if overrideMediaType != "" { responsewriters.SerializeObject(overrideMediaType, encoder, w, req, statusCode, object) @@ -657,3 +739,14 @@ func WriteObjectNegotiatedWithMediaType(w http.ResponseWriter, req *http.Request } }) } + +func WriteObjectNegotiatedWithMediaType(w http.ResponseWriter, req *http.Request, object runtime.Object, scheme *runtime.Scheme, overrideMediaType string) { + s := serializer.NewCodecFactory(scheme) + gvk, err := apiutil.GVKForObject(object, scheme) + if err != nil { + responsewriters.ErrorNegotiated(err, s, corev1.SchemeGroupVersion, w, req) + return + } + + WriteObjectNegotiatedWithGVK(w, req, object, scheme, gvk.GroupVersion(), overrideMediaType) +}