From 68a10b0dcefc5371760289e1cfa0a571c0e60dbb Mon Sep 17 00:00:00 2001 From: sharkpc138 Date: Wed, 8 Jan 2025 10:16:18 +0900 Subject: [PATCH] add a pod list cache to continue tailing logs even if the API server or kubelet fails --- pkg/lobster/client/client.go | 16 +++++++++++----- pkg/lobster/distributor/distributor.go | 6 +----- pkg/lobster/sink/exporter/exporter.go | 7 +------ 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/pkg/lobster/client/client.go b/pkg/lobster/client/client.go index ce5a048..85c76bf 100644 --- a/pkg/lobster/client/client.go +++ b/pkg/lobster/client/client.go @@ -24,6 +24,7 @@ import ( "log" "time" + "github.com/golang/glog" v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -43,6 +44,7 @@ type Client struct { hostName string *kubernetes.Clientset timeout time.Duration + cache map[string]v1.Pod } func New() (Client, error) { @@ -56,10 +58,10 @@ func New() (Client, error) { clientset, err := kubernetes.NewForConfig(config) - return Client{*conf.HostName, clientset, timeout}, err + return Client{*conf.HostName, clientset, timeout, map[string]v1.Pod{}}, err } -func (c Client) GetPods() (map[string]v1.Pod, error) { +func (c *Client) GetPods() map[string]v1.Pod { podMap := map[string]v1.Pod{} podList := v1.PodList{} @@ -71,16 +73,20 @@ func (c Client) GetPods() (map[string]v1.Pod, error) { AbsPath(fmt.Sprintf("/api/v1/nodes/%s/proxy/pods", c.hostName)). DoRaw(ctx) if err != nil { - return podMap, err + glog.Warningf("using cached pod information: failed to make a request to the k8s API server or kubelet: %s", err.Error()) + return c.cache } if err := json.Unmarshal(data, &podList); err != nil { - return podMap, err + glog.Warningf("using cached pod information: failed to unmarshal the pod list response: %s", err.Error()) + return c.cache } for _, pod := range podList.Items { podMap[string(pod.UID)] = pod } - return podMap, nil + c.cache = podMap + + return podMap } diff --git a/pkg/lobster/distributor/distributor.go b/pkg/lobster/distributor/distributor.go index 5ee934a..faa2124 100644 --- a/pkg/lobster/distributor/distributor.go +++ b/pkg/lobster/distributor/distributor.go @@ -82,11 +82,7 @@ func (d *Distributor) Run(stopChan chan struct{}) { for { select { case <-inspectTicker.C: - podMap, err := d.client.GetPods() - if err != nil { - glog.Error(err) - continue - } + podMap := d.client.GetPods() if len(podMap) == 0 { panic("no pods found") diff --git a/pkg/lobster/sink/exporter/exporter.go b/pkg/lobster/sink/exporter/exporter.go index 750cc01..9d00e58 100644 --- a/pkg/lobster/sink/exporter/exporter.go +++ b/pkg/lobster/sink/exporter/exporter.go @@ -76,12 +76,7 @@ func (e *LogExporter) Run(stopChan chan struct{}) { now := time.Now() current := now.Truncate(time.Second) newOrders := map[string]order.Order{} - - podMap, err := e.client.GetPods() - if err != nil { - glog.Error(err) - continue - } + podMap := e.client.GetPods() if len(podMap) == 0 { panic("no pods found")