-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathk8s.go
279 lines (254 loc) · 7.99 KB
/
k8s.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
package main
import (
"context"
"fmt"
"os"
"strings"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
// Neighbors reacts to Kubernetes node events and mirrors eligible
// nodes as neighbors on an A10 device.
type Neighbors struct {
// ctx bounds the informer lifetime; its cancellation stops StartInformer.
ctx context.Context
// clientset is the Kubernetes API client used by the informer factory.
clientset *kubernetes.Clientset
// a10 is the A10 device client used to add/remove neighbors.
a10 *A10
// label is the "key=value" node label that marks eligible nodes.
label string
}
// InformerManager describes the node-informer lifecycle: starting the
// informer and handling the add/update/delete events it emits.
type InformerManager interface {
// StartInformer runs the informer until its context is cancelled.
StartInformer()
// add handles a node creation event.
add(obj interface{})
// update handles a node modification event.
update(_ interface{}, obj interface{})
// delete handles a node removal event.
delete(obj interface{})
}
// add handles a node add event from the informer.
// It checks whether the node is eligible (see nodeEligible) and, if so,
// registers its external address as a neighbor on the A10 device.
func (n *Neighbors) add(obj interface{}) {
	// Guard the type assertion: a panicking event handler would crash
	// the informer goroutine.
	node, ok := obj.(*v1.Node)
	if !ok {
		logger.Error("Unexpected object type in add event")
		return
	}
	logger := logger.With(
		"node", node.Name,
	)
	logger.Info("Node add event")
	eligible, address := nodeEligible(node, n.label)
	if eligible {
		logger.Info("Node should be added")
		if err := n.a10.AddNeighbor(address, node.Name); err != nil {
			logger.Error("Error adding neighbor to A10:", "error", err)
		}
	}
}
// update handles a node update event from the informer.
// An eligible node is (re-)added as a neighbor on the A10 device;
// a node that is no longer eligible is removed from it.
func (n *Neighbors) update(_ interface{}, obj interface{}) {
	// Guard the type assertion: a panicking event handler would crash
	// the informer goroutine.
	node, ok := obj.(*v1.Node)
	if !ok {
		logger.Error("Unexpected object type in update event")
		return
	}
	logger := logger.With(
		"node", node.Name,
	)
	logger.Info("Node update event")
	eligible, address := nodeEligible(node, n.label)
	if eligible {
		logger.Info("Node should be added")
		if err := n.a10.AddNeighbor(address, node.Name); err != nil {
			logger.Error("Error adding neighbor to A10:", "error", err)
		}
	} else {
		logger.Info("Node should be removed")
		if err := n.a10.RemoveNeighbor(nodeExternalAddress(node), node.Name); err != nil {
			logger.Error("Error removing neighbor from A10:", "error", err)
		}
	}
}
// delete handles a node delete event from the informer.
// If the node carries the configured label, it is removed as a
// neighbor from the A10 device.
func (n *Neighbors) delete(obj interface{}) {
	node, ok := obj.(*v1.Node)
	if !ok {
		// When the watch misses the delete, client-go delivers a
		// DeletedFinalStateUnknown tombstone wrapping the last known
		// object state instead of the node itself.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			logger.Error("Unexpected object type in delete event")
			return
		}
		node, ok = tombstone.Obj.(*v1.Node)
		if !ok {
			logger.Error("Tombstone contained unexpected object type")
			return
		}
	}
	logger := logger.With(
		"node", node.Name,
	)
	logger.Info("Node delete event")
	if nodeLabeled(node, n.label) {
		logger.Info("Node should be removed")
		if err := n.a10.RemoveNeighbor(nodeExternalAddress(node), node.Name); err != nil {
			logger.Error("Error removing neighbor from A10:", "error", err)
		}
	}
}
// StartInformer wires a shared Node informer to the add/update/delete
// handlers and blocks until the context is cancelled.
func (n *Neighbors) StartInformer() {
	// Shared informer factory backed by the Kubernetes client; the full
	// node list is resynced every 10 minutes.
	factory := informers.NewSharedInformerFactory(n.clientset, 10*time.Minute)
	// We only care about Node resources.
	informer := factory.Core().V1().Nodes().Informer()
	// client-go utility that recovers from panics raised in callbacks.
	defer runtime.HandleCrash()
	// Hook our event handlers into the informer.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    n.add,
		UpdateFunc: n.update,
		DeleteFunc: n.delete,
	})
	// The informer runs in the background and stops when ctx is done.
	go informer.Run(n.ctx.Done())
	// Bail out if the initial cache sync never completes.
	if !cache.WaitForCacheSync(n.ctx.Done(), informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}
	// Block until the surrounding context is cancelled.
	<-n.ctx.Done()
}
// nodeEligible reports whether a node qualifies to be added to the A10
// device, returning its external address alongside the verdict.
// A node qualifies when it is Ready, not cordoned, has an external
// address, and carries the configured label.
func nodeEligible(node *v1.Node, label string) (bool, string) {
	logger := logger.With(
		"node", node.Name,
	)
	logger.Debug("Checking node eligibility")
	address := nodeExternalAddress(node)
	// Short-circuit evaluation: later checks are skipped as soon as
	// one requirement fails, same as the original chained condition.
	eligible := nodeReady(node) &&
		!nodeCordoned(node) &&
		address != "" &&
		nodeLabeled(node, label)
	logger.Info("Node eligible to add to A10", "eligible", eligible)
	return eligible, address
}
// nodeReady reports whether the node's Ready condition is True.
// Returns false when the condition is absent.
func nodeReady(node *v1.Node) bool {
	logger := logger.With(
		"node", node.Name,
	)
	logger.Debug("Checking node readiness")
	ready := false
	for _, condition := range node.Status.Conditions {
		// Compare against the typed constant instead of the raw
		// "Ready" string literal.
		if condition.Type == v1.NodeReady {
			ready = condition.Status == v1.ConditionTrue
			break // the API reports at most one Ready condition
		}
	}
	logger.Info("Node readiness", "ready", ready)
	return ready
}
// nodeCordoned reports whether the node is cordoned, i.e. marked
// unschedulable in its spec.
func nodeCordoned(node *v1.Node) bool {
	logger := logger.With(
		"node", node.Name,
	)
	cordoned := node.Spec.Unschedulable
	logger.Info("Node cordoned", "cordoned", cordoned)
	return cordoned
}
// nodeLabeled reports whether the node carries the given label.
// The label must be in "key=value" form; any other shape is logged as
// an error and treated as not labeled.
func nodeLabeled(node *v1.Node, label string) bool {
	logger := logger.With(
		"label", label,
		"node", node.Name,
	)
	// Exactly one "=" separator is accepted, matching the original
	// strict key=value parsing.
	parts := strings.Split(label, "=")
	if len(parts) != 2 {
		logger.Error("Invalid label format")
		return false
	}
	wantKey, wantValue := parts[0], parts[1]
	logger.Debug("Node labels", "labels", node.Labels)
	labeled := node.Labels[wantKey] == wantValue
	logger.Info("Node labeled", "key", wantKey, "value", wantValue, "labeled", labeled)
	return labeled
}
// nodeExternalAddress returns the node's first ExternalIP address,
// or the empty string when the node has none.
func nodeExternalAddress(node *v1.Node) string {
	logger := logger.With(
		"name", node.Name,
	)
	logger.Debug("Getting node external address")
	for _, address := range node.Status.Addresses {
		// Compare against the typed constant instead of the raw
		// "ExternalIP" string literal.
		if address.Type == v1.NodeExternalIP {
			logger.Info("Node external address", "address", address.Address)
			return address.Address
		}
	}
	logger.Debug("Node external address not found")
	return ""
}
// getKubernetesClient builds a Kubernetes clientset, preferring an
// out-of-cluster kubeconfig (pointed to by KUBECONFIG) and falling
// back to the in-cluster service-account configuration.
func getKubernetesClient() (*kubernetes.Clientset, error) {
	logger.Info("Getting Kubernetes client")
	var (
		config *rest.Config
		err    error
	)
	// KUBECONFIG set means we run outside the cluster; otherwise
	// assume we are inside a pod.
	if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig != "" {
		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, fmt.Errorf("error loading kubeconfig: %w", err)
		}
	} else {
		config, err = rest.InClusterConfig()
		if err != nil {
			return nil, fmt.Errorf("error creating in-cluster config: %w", err)
		}
	}
	// Build the clientset from whichever config was selected above.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("error creating Kubernetes client: %w", err)
	}
	return clientset, nil
}
// KubeNodes lists cluster nodes matching a label selector and keeps
// the external addresses of the eligible ones.
type KubeNodes struct {
// clientset is the Kubernetes API client used for node listing.
clientset *kubernetes.Clientset
// label is the "key=value" selector identifying candidate nodes.
label string
// Nodes holds the external addresses collected by GetNodes.
Nodes []string
}
// KubeNodesManager abstracts fetching eligible nodes from a cluster.
type KubeNodesManager interface {
// GetNodes refreshes the node list; returns an error on API failure.
GetNodes() error
}
// GetNodes lists cluster nodes matching the configured label selector
// and stores the external addresses of the eligible ones in n.Nodes.
// Returns an error if the Kubernetes list call fails.
func (n *KubeNodes) GetNodes() error {
	logger.Info("Getting nodes from k8s")
	nodes, err := n.clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{
		LabelSelector: n.label,
	})
	if err != nil {
		return fmt.Errorf("error fetching nodes: %w", err)
	}
	// Reset the result slice so repeated calls reflect the current
	// cluster state instead of accumulating stale addresses.
	n.Nodes = n.Nodes[:0]
	// Nodes that are ready, not drained and have an external address
	// are BGP neighbors.
	for i := range nodes.Items {
		node := &nodes.Items[i]
		logger.Debug("Checking node", "name", node.Name)
		if eligible, address := nodeEligible(node, n.label); eligible {
			n.Nodes = append(n.Nodes, address)
		}
	}
	return nil
}