From a694d916dfbfd7a5565ff8bd24b13028cd7d3afa Mon Sep 17 00:00:00 2001 From: AllenXu93 Date: Fri, 13 Dec 2024 15:15:39 +0800 Subject: [PATCH 1/4] fix topology-updater cpu report --- pkg/resourcemonitor/podresourcesscanner.go | 45 ++++++++++++---------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/pkg/resourcemonitor/podresourcesscanner.go b/pkg/resourcemonitor/podresourcesscanner.go index 05d7306406..c8ee39e0fe 100644 --- a/pkg/resourcemonitor/podresourcesscanner.go +++ b/pkg/resourcemonitor/podresourcesscanner.go @@ -63,6 +63,11 @@ func (resMon *PodResourcesScanner) isWatchable(podNamespace string, podName stri return false, false, err } + // Check Pod is guaranteed QOS class and has exclusive CPUs or devices + if pod.Status.QOSClass != corev1.PodQOSGuaranteed { + return false, false, nil + } + isIntegralGuaranteed := hasExclusiveCPUs(pod) if resMon.namespace == "*" && (isIntegralGuaranteed || hasDevice) { @@ -85,9 +90,9 @@ func hasExclusiveCPUs(pod *corev1.Pod) bool { continue } totalCPU += cpuQuantity.Value() - isInitContainerGuaranteed := hasIntegralCPUs(pod, &container) - if !isInitContainerGuaranteed { - return false + isInitContainerGuaranteed := hasIntegralCPUs(&container) + if isInitContainerGuaranteed { + return true } } for _, container := range pod.Spec.Containers { @@ -96,9 +101,9 @@ func hasExclusiveCPUs(pod *corev1.Pod) bool { continue } totalCPU += cpuQuantity.Value() - isAppContainerGuaranteed := hasIntegralCPUs(pod, &container) - if !isAppContainerGuaranteed { - return false + isAppContainerGuaranteed := hasIntegralCPUs(&container) + if isAppContainerGuaranteed { + return true } } @@ -107,7 +112,7 @@ func hasExclusiveCPUs(pod *corev1.Pod) bool { } // hasIntegralCPUs returns true if a container in pod is requesting integral CPUs else returns false -func hasIntegralCPUs(pod *corev1.Pod, container *corev1.Container) bool { +func hasIntegralCPUs(container *corev1.Container) bool { cpuQuantity := container.Resources.Requests[corev1.ResourceCPU] return cpuQuantity.Value()*1000 == cpuQuantity.MilliValue() } @@ -147,7 +152,7 @@ func (resMon *PodResourcesScanner) Scan() (ScanResponse, error) { for _, podResource := range respPodResources { klog.InfoS("scanning pod", "podName", podResource.GetName()) hasDevice := hasDevice(podResource) - isWatchable, isIntegralGuaranteed, err := resMon.isWatchable(podResource.GetNamespace(), podResource.GetName(), hasDevice) + isWatchable, _, err := resMon.isWatchable(podResource.GetNamespace(), podResource.GetName(), hasDevice) if err != nil { return ScanResponse{}, fmt.Errorf("checking if pod in a namespace is watchable, namespace:%v, pod name %v: %w", podResource.GetNamespace(), podResource.GetName(), err) } @@ -165,19 +170,17 @@ func (resMon *PodResourcesScanner) Scan() (ScanResponse, error) { Name: container.Name, } - if isIntegralGuaranteed { - cpuIDs := container.GetCpuIds() - if len(cpuIDs) > 0 { - var resCPUs []string - for _, cpuID := range container.GetCpuIds() { - resCPUs = append(resCPUs, strconv.FormatInt(cpuID, 10)) - } - contRes.Resources = []ResourceInfo{ - { - Name: corev1.ResourceCPU, - Data: resCPUs, - }, - } + cpuIDs := container.GetCpuIds() + if len(cpuIDs) > 0 { + var resCPUs []string + for _, cpuID := range container.GetCpuIds() { + resCPUs = append(resCPUs, strconv.FormatInt(cpuID, 10)) + } + contRes.Resources = []ResourceInfo{ + { + Name: corev1.ResourceCPU, + Data: resCPUs, + }, } } From 17d374d78e05e99e708c2123592d0102c13af032 Mon Sep 17 00:00:00 2001 From: AllenXu93 Date: Fri, 20 Dec 2024 18:58:14 +0800 Subject: [PATCH 2/4] fix for UT --- pkg/resourcemonitor/podresourcesscanner.go | 6 +- .../podresourcesscanner_test.go | 151 ++++++++++++++++++ test/e2e/utils/pod/pod.go | 3 + 3 files changed, 157 insertions(+), 3 deletions(-) diff --git a/pkg/resourcemonitor/podresourcesscanner.go b/pkg/resourcemonitor/podresourcesscanner.go index c8ee39e0fe..67a8e32ce4 100644 --- a/pkg/resourcemonitor/podresourcesscanner.go +++ b/pkg/resourcemonitor/podresourcesscanner.go @@ -108,7 +108,7 @@ func hasExclusiveCPUs(pod *corev1.Pod) bool { } //No CPUs requested in all the containers in the pod - return totalCPU != 0 + return false } // hasIntegralCPUs returns true if a container in pod is requesting integral CPUs else returns false @@ -152,7 +152,7 @@ func (resMon *PodResourcesScanner) Scan() (ScanResponse, error) { for _, podResource := range respPodResources { klog.InfoS("scanning pod", "podName", podResource.GetName()) hasDevice := hasDevice(podResource) - isWatchable, _, err := resMon.isWatchable(podResource.GetNamespace(), podResource.GetName(), hasDevice) + isWatchable, isIntegralGuaranteed, err := resMon.isWatchable(podResource.GetNamespace(), podResource.GetName(), hasDevice) if err != nil { return ScanResponse{}, fmt.Errorf("checking if pod in a namespace is watchable, namespace:%v, pod name %v: %w", podResource.GetNamespace(), podResource.GetName(), err) } @@ -171,7 +171,7 @@ func (resMon *PodResourcesScanner) Scan() (ScanResponse, error) { } cpuIDs := container.GetCpuIds() - if len(cpuIDs) > 0 { + if len(cpuIDs) > 0 && isIntegralGuaranteed { var resCPUs []string for _, cpuID := range container.GetCpuIds() { resCPUs = append(resCPUs, strconv.FormatInt(cpuID, 10)) diff --git a/pkg/resourcemonitor/podresourcesscanner_test.go b/pkg/resourcemonitor/podresourcesscanner_test.go index ee9dfd84ed..1c63070095 100644 --- a/pkg/resourcemonitor/podresourcesscanner_test.go +++ b/pkg/resourcemonitor/podresourcesscanner_test.go @@ -165,6 +165,9 @@ func TestPodScanner(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, } fakeCli := fakeclient.NewSimpleClientset(pod) @@ -280,6 +283,9 @@ func TestPodScanner(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, } fakeCli = fakeclient.NewSimpleClientset(pod) @@ -368,6 +374,9 @@ func TestPodScanner(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, } fakeCli = fakeclient.NewSimpleClientset(pod) resScan.(*PodResourcesScanner).k8sClient = fakeCli @@ -458,6 +467,9 @@ func TestPodScanner(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, } fakeCli = fakeclient.NewSimpleClientset(pod) resScan.(*PodResourcesScanner).k8sClient = fakeCli @@ -536,6 +548,9 @@ func TestPodScanner(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, } fakeCli = fakeclient.NewSimpleClientset(pod) resScan.(*PodResourcesScanner).k8sClient = fakeCli @@ -628,6 +643,9 @@ func TestPodScanner(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, } fakeCli = fakeclient.NewSimpleClientset(pod) resScan.(*PodResourcesScanner).k8sClient = fakeCli @@ -825,6 +843,9 @@ func TestPodScanner(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, } fakeCli = fakeclient.NewSimpleClientset(pod) resScan.(*PodResourcesScanner).k8sClient = fakeCli @@ -1029,6 +1050,9 @@ func TestPodScanner(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, } fakeCli = fakeclient.NewSimpleClientset(pod) resScan.(*PodResourcesScanner).k8sClient = fakeCli @@ -1113,6 +1137,9 @@ func TestPodScanner(t *testing.T) { }, }, }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, } fakeCli = fakeclient.NewSimpleClientset(pod) resScan.(*PodResourcesScanner).k8sClient = fakeCli @@ -1145,5 +1172,129 @@ func TestPodScanner(t *testing.T) { So(reflect.DeepEqual(res.PodResources, expected), ShouldBeTrue) }) + Convey("When I successfully get valid response for guaranteed pods with not cpu pin containers", func() { + resp := &v1.ListPodResourcesResponse{ + PodResources: []*v1.PodResources{ + { + Name: "test-pod-0", + Namespace: "pod-res-test", + Containers: []*v1.ContainerResources{ + { + Name: "test-cnt-0", + CpuIds: []int64{0, 1}, + Devices: []*v1.ContainerDevices{ + { + ResourceName: "fake.io/resource", + DeviceIds: []string{"devA"}, + }, + }, + }, + { + Name: "test-cnt-1", + Devices: []*v1.ContainerDevices{ + { + ResourceName: "fake.io/resource", + DeviceIds: []string{"devA"}, + }, + }, + }, + }, + }, + }, + } + mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil) + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-0", + Namespace: "pod-res-test", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-cnt-0", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + + corev1.ResourceName("fake.io/resource"): *resource.NewQuantity(1, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI), + corev1.ResourceCPU: resource.MustParse("2"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceName("fake.io/resource"): *resource.NewQuantity(1, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI), + corev1.ResourceCPU: resource.MustParse("2"), + }, + }, + }, + { + Name: "test-cnt-1", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + + corev1.ResourceName("fake.io/resource"): *resource.NewQuantity(1, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI), + corev1.ResourceCPU: resource.MustParse("1500m"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceName("fake.io/resource"): *resource.NewQuantity(1, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI), + corev1.ResourceCPU: resource.MustParse("1500m"), + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, + } + fakeCli = fakeclient.NewSimpleClientset(pod) + resScan.(*PodResourcesScanner).k8sClient = fakeCli + res, err := resScan.Scan() + + Convey("Error is nil", func() { + So(err, ShouldBeNil) + }) + Convey("Return PodResources should have values", func() { + So(len(res.PodResources), ShouldBeGreaterThan, 0) + }) + + expected := []PodResources{ + { + Name: "test-pod-0", + Namespace: "pod-res-test", + Containers: []ContainerResources{ + { + Name: "test-cnt-0", + Resources: []ResourceInfo{ + { + Name: corev1.ResourceCPU, + Data: []string{"0", "1"}, + }, + { + Name: "fake.io/resource", + Data: []string{"devA"}, + }, + }, + }, + { + Name: "test-cnt-1", + Resources: []ResourceInfo{ + { + Name: "fake.io/resource", + Data: []string{"devA"}, + }, + }, + }, + }, + }, + } + So(reflect.DeepEqual(res.PodResources, expected), ShouldBeTrue) + }) + }) } diff --git a/test/e2e/utils/pod/pod.go b/test/e2e/utils/pod/pod.go index 1029dca0c1..0eab766c60 100644 --- a/test/e2e/utils/pod/pod.go +++ b/test/e2e/utils/pod/pod.go @@ -61,6 +61,9 @@ func GuaranteedSleeper(opts ...func(pod *corev1.Pod)) *corev1.Pod { }, }, }, + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, } for _, o := range opts { o(p) From f740e1dede26be964284927570f147250f83a3e1 Mon Sep 17 00:00:00 2001 From: AllenXu93 Date: Wed, 8 Jan 2025 11:04:34 +0800 Subject: [PATCH 3/4] update isIntegralGuaranteed --- pkg/resourcemonitor/podresourcesscanner.go | 7 +------ pkg/resourcemonitor/podresourcesscanner_test.go | 3 --- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/pkg/resourcemonitor/podresourcesscanner.go b/pkg/resourcemonitor/podresourcesscanner.go index 67a8e32ce4..8787840702 100644 --- a/pkg/resourcemonitor/podresourcesscanner.go +++ b/pkg/resourcemonitor/podresourcesscanner.go @@ -63,12 +63,7 @@ func (resMon *PodResourcesScanner) isWatchable(podNamespace string, podName stri return false, false, err } - // Check Pod is guaranteed QOS class and has exclusive CPUs or devices - if pod.Status.QOSClass != corev1.PodQOSGuaranteed { - return false, false, nil - } - - isIntegralGuaranteed := hasExclusiveCPUs(pod) + isIntegralGuaranteed := pod.Status.QOSClass == corev1.PodQOSGuaranteed && hasExclusiveCPUs(pod) if resMon.namespace == "*" && (isIntegralGuaranteed || hasDevice) { return true, isIntegralGuaranteed, nil diff --git a/pkg/resourcemonitor/podresourcesscanner_test.go b/pkg/resourcemonitor/podresourcesscanner_test.go index 1c63070095..554f44d36d 100644 --- a/pkg/resourcemonitor/podresourcesscanner_test.go +++ b/pkg/resourcemonitor/podresourcesscanner_test.go @@ -548,9 +548,6 @@ func TestPodScanner(t *testing.T) { }, }, }, - Status: corev1.PodStatus{ - QOSClass: corev1.PodQOSGuaranteed, - }, } fakeCli = fakeclient.NewSimpleClientset(pod) resScan.(*PodResourcesScanner).k8sClient = fakeCli From 5d40f7966ebd38dd59a8d3f898d641f41439fc87 Mon Sep 17 00:00:00 2001 From: AllenXu93 Date: Wed, 8 Jan 2025 17:23:05 +0800 Subject: [PATCH 4/4] rewrite isWatchable in pod scan --- pkg/resourcemonitor/podresourcesscanner.go | 52 +++++++++------------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/pkg/resourcemonitor/podresourcesscanner.go b/pkg/resourcemonitor/podresourcesscanner.go index 8787840702..cbecf9105e 100644 --- a/pkg/resourcemonitor/podresourcesscanner.go +++ b/pkg/resourcemonitor/podresourcesscanner.go @@ -19,10 +19,10 @@ package resourcemonitor import ( "context" "fmt" + "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" "strconv" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" client "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" @@ -57,58 +57,49 @@ func NewPodResourcesScanner(namespace string, podResourceClient podresourcesapi. } // isWatchable tells if the the given namespace should be watched. -func (resMon *PodResourcesScanner) isWatchable(podNamespace string, podName string, hasDevice bool) (bool, bool, error) { - pod, err := resMon.k8sClient.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) +// In Scan(), if watchable is false, this pods scan will skip +// so we can return directly if pod's namespace is not watchable +func (resMon *PodResourcesScanner) isWatchable(podResource *podresourcesapi.PodResources) (bool, bool, error) { + if resMon.namespace != "*" && resMon.namespace != podResource.Namespace { + return false, false, nil + } + + pod, err := resMon.k8sClient.CoreV1().Pods(podResource.Namespace).Get(context.TODO(), podResource.Name, metav1.GetOptions{}) if err != nil { return false, false, err } - isIntegralGuaranteed := pod.Status.QOSClass == corev1.PodQOSGuaranteed && hasExclusiveCPUs(pod) + podHasExclusiveCPUs := hasExclusiveCPUs(pod) + isPodGuaranteed := qos.GetPodQOS(pod) == corev1.PodQOSGuaranteed - if resMon.namespace == "*" && (isIntegralGuaranteed || hasDevice) { - return true, isIntegralGuaranteed, nil - } - // TODO: add an explicit check for guaranteed pods and pods with devices - return resMon.namespace == podNamespace && (isIntegralGuaranteed || hasDevice), isIntegralGuaranteed, nil + return isPodGuaranteed || hasDevice(podResource), podHasExclusiveCPUs, nil } // hasExclusiveCPUs returns true if a guaranteed pod is allocated exclusive CPUs else returns false. // In isWatchable() function we check for the pod QoS and proceed if it is guaranteed (i.e. request == limit) // and hence we only check for request in the function below. func hasExclusiveCPUs(pod *corev1.Pod) bool { - var totalCPU int64 - var cpuQuantity resource.Quantity for _, container := range pod.Spec.InitContainers { - - var ok bool - if cpuQuantity, ok = container.Resources.Requests[corev1.ResourceCPU]; !ok { - continue - } - totalCPU += cpuQuantity.Value() - isInitContainerGuaranteed := hasIntegralCPUs(&container) - if isInitContainerGuaranteed { + if hasIntegralCPUs(&container) { return true } } for _, container := range pod.Spec.Containers { - var ok bool - if cpuQuantity, ok = container.Resources.Requests[corev1.ResourceCPU]; !ok { - continue - } - totalCPU += cpuQuantity.Value() - isAppContainerGuaranteed := hasIntegralCPUs(&container) - if isAppContainerGuaranteed { + if hasIntegralCPUs(&container) { return true } } - //No CPUs requested in all the containers in the pod + //No integralCPUs requested in all the containers of the pod return false } // hasIntegralCPUs returns true if a container in pod is requesting integral CPUs else returns false func hasIntegralCPUs(container *corev1.Container) bool { - cpuQuantity := container.Resources.Requests[corev1.ResourceCPU] + cpuQuantity, ok := container.Resources.Requests[corev1.ResourceCPU] + if !ok { + return false + } return cpuQuantity.Value()*1000 == cpuQuantity.MilliValue() } @@ -146,8 +137,7 @@ func (resMon *PodResourcesScanner) Scan() (ScanResponse, error) { for _, podResource := range respPodResources { klog.InfoS("scanning pod", "podName", podResource.GetName()) - hasDevice := hasDevice(podResource) - isWatchable, isIntegralGuaranteed, err := resMon.isWatchable(podResource.GetNamespace(), podResource.GetName(), hasDevice) + isWatchable, isExclusiveCPUs, err := resMon.isWatchable(podResource) if err != nil { return ScanResponse{}, fmt.Errorf("checking if pod in a namespace is watchable, namespace:%v, pod name %v: %w", podResource.GetNamespace(), podResource.GetName(), err) } @@ -166,7 +156,7 @@ func (resMon *PodResourcesScanner) Scan() (ScanResponse, error) { } cpuIDs := container.GetCpuIds() - if len(cpuIDs) > 0 && isIntegralGuaranteed { + if len(cpuIDs) > 0 && isExclusiveCPUs { var resCPUs []string for _, cpuID := range container.GetCpuIds() { resCPUs = append(resCPUs, strconv.FormatInt(cpuID, 10))