Skip to content

Commit

Permalink
add a pod list cache to continue tailing logs even if the API server …
Browse files Browse the repository at this point in the history
…or kubelet fails
  • Loading branch information
sharkpc138 committed Jan 8, 2025
1 parent 31d45f6 commit 68a10b0
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 16 deletions.
16 changes: 11 additions & 5 deletions pkg/lobster/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"log"
"time"

"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -43,6 +44,7 @@ type Client struct {
hostName string
*kubernetes.Clientset
timeout time.Duration
cache map[string]v1.Pod
}

func New() (Client, error) {
Expand All @@ -56,10 +58,10 @@ func New() (Client, error) {

clientset, err := kubernetes.NewForConfig(config)

return Client{*conf.HostName, clientset, timeout}, err
return Client{*conf.HostName, clientset, timeout, map[string]v1.Pod{}}, err
}

func (c Client) GetPods() (map[string]v1.Pod, error) {
func (c *Client) GetPods() map[string]v1.Pod {
podMap := map[string]v1.Pod{}
podList := v1.PodList{}

Expand All @@ -71,16 +73,20 @@ func (c Client) GetPods() (map[string]v1.Pod, error) {
AbsPath(fmt.Sprintf("/api/v1/nodes/%s/proxy/pods", c.hostName)).
DoRaw(ctx)
if err != nil {
return podMap, err
glog.Warningf("using cached pod information: failed to make a request to the k8s API server or kubelet: %s", err.Error())
return c.cache
}

if err := json.Unmarshal(data, &podList); err != nil {
return podMap, err
glog.Warningf("using cached pod information: failed to unmarshal the pod list response: %s", err.Error())
return c.cache
}

for _, pod := range podList.Items {
podMap[string(pod.UID)] = pod
}

return podMap, nil
c.cache = podMap

return podMap
}
6 changes: 1 addition & 5 deletions pkg/lobster/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,7 @@ func (d *Distributor) Run(stopChan chan struct{}) {
for {
select {
case <-inspectTicker.C:
podMap, err := d.client.GetPods()
if err != nil {
glog.Error(err)
continue
}
podMap := d.client.GetPods()

if len(podMap) == 0 {
panic("no pods found")
Expand Down
7 changes: 1 addition & 6 deletions pkg/lobster/sink/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,7 @@ func (e *LogExporter) Run(stopChan chan struct{}) {
now := time.Now()
current := now.Truncate(time.Second)
newOrders := map[string]order.Order{}

podMap, err := e.client.GetPods()
if err != nil {
glog.Error(err)
continue
}
podMap := e.client.GetPods()

if len(podMap) == 0 {
panic("no pods found")
Expand Down

0 comments on commit 68a10b0

Please sign in to comment.