From 8a0abfaaad27663737a2c752576f3c9554db9ec7 Mon Sep 17 00:00:00 2001 From: anthhub Date: Thu, 29 Jul 2021 15:32:39 +0800 Subject: [PATCH] refactor: refactor Option --- const.go | 45 +++++++++++++ example/httpserver.go | 28 ++------ forwarder.go | 137 +++------------------------------------- forwarder_test.go | 26 ++------ readme.md | 34 +++++----- utils.go | 144 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 225 insertions(+), 189 deletions(-) create mode 100644 const.go create mode 100644 utils.go diff --git a/const.go b/const.go new file mode 100644 index 0000000..09e8ece --- /dev/null +++ b/const.go @@ -0,0 +1,45 @@ +package forwarder + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" +) + +type portForwardAPodRequest struct { + RestConfig *rest.Config // RestConfig is the kubernetes config + Pod v1.Pod // Pod is the selected pod for this port forwarding + LocalPort int // LocalPort is the local port that will be selected to expose the PodPort + PodPort int // PodPort is the target port for the pod + Streams genericclioptions.IOStreams // Steams configures where to write or read input from + StopCh <-chan struct{} // StopCh is the channel used to manage the port forward lifecycle + ReadyCh chan struct{} // ReadyCh communicates when the tunnel is ready to receive traffic +} + +type carry struct { + StopCh chan struct{} // StopCh is the channel used to manage the port forward lifecycle + ReadyCh chan struct{} // ReadyCh communicates when the tunnel is ready to receive traffic + PF *portforward.PortForwarder // the instance of Portforwarder +} + +type PodOption struct { + LocalPort int // the local port for forwarding + PodPort int // the k8s pod port + Pod v1.Pod // the k8s pod metadata +} + +type Option struct { + LocalPort int // the local port for forwarding + RemotePort int // the remote port port for forwarding + Namespace string // the k8s namespace metadata + PodName string // the k8s pod metadata + ServiceName string // the k8s service metadata + Source string // the k8s source string, eg: svc/my-nginx-svc po/my-nginx-66b6c48dd5-ttdb2 +} + +type Result struct { + Close func() // close the port forwarding + Ready func() ([][]portforward.ForwardedPort, error) // block till the forwarding ready + Wait func() // block and listen IOStreams close signal +} diff --git a/example/httpserver.go b/example/httpserver.go index c110778..4471eca 100644 --- a/example/httpserver.go +++ b/example/httpserver.go @@ -8,10 +8,8 @@ import ( "github.com/anthhub/forwarder" "github.com/gin-gonic/gin" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/namsral/flag" - v1 "k8s.io/api/core/v1" ) func setupRouter() *gin.Engine { @@ -38,30 +36,14 @@ func main() { options := []*forwarder.Option{ { - // the local port for forwarding - LocalPort: 8080, - // the k8s pod port - PodPort: 80, - // the k8s pod metadata - Pod: v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-nginx-66b6c48dd5-ttdb2", - Namespace: "default", - }, - }, + LocalPort: 8080, + RemotePort: 80, + ServiceName: "my-nginx-svc", }, { - // if local port isn't provided, forwarder will generate a random port number // LocalPort: 8081, - PodPort: 80, - // the k8s service metadata, it's to forward service - Service: v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-nginx-svc", - // the namespace default is "default" - // Namespace: "default", - }, - }, + // RemotePort: 80, + Source: "po/my-nginx-66b6c48dd5-ttdb2", }, } diff --git a/forwarder.go b/forwarder.go index a84b638..8bc2094 100644 --- a/forwarder.go +++ b/forwarder.go @@ -12,13 +12,8 @@ import ( "syscall" "golang.org/x/sync/errgroup" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/cli-runtime/pkg/genericclioptions" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" @@ -26,35 +21,6 @@ import ( var once sync.Once -type portForwardAPodRequest struct { - RestConfig *rest.Config // RestConfig is the kubernetes config - Pod v1.Pod // Pod is the selected pod for this port forwarding - LocalPort int // LocalPort is the local port that will be selected to expose the PodPort - PodPort int // PodPort is the target port for the pod - Streams genericclioptions.IOStreams // Steams configures where to write or read input from - StopCh <-chan struct{} // StopCh is the channel used to manage the port forward lifecycle - ReadyCh chan struct{} // ReadyCh communicates when the tunnel is ready to receive traffic -} - -type carry struct { - StopCh chan struct{} - ReadyCh chan struct{} - PF *portforward.PortForwarder -} - -type Option struct { - LocalPort int // the local port for forwarding - PodPort int // the k8s pod port - Pod v1.Pod // the k8s pod metadata - Service v1.Service // the k8s service metadata -} - -type Result struct { - Close func() // close the port forwarding - Ready func() ([][]portforward.ForwardedPort, error) // block till the forwarding ready - Wait func() // block and listen IOStreams close signal -} - // It is to forward port for k8s cloud services. func WithForwarders(ctx context.Context, options []*Option, kubeconfig string) (*Result, error) { if kubeconfig == "" { @@ -66,7 +32,12 @@ func WithForwarders(ctx context.Context, options []*Option, kubeconfig string) ( return nil, err } - newOptions, err := handleOptions(ctx, options, config) + newOptions, err := parseOptions(options) + if err != nil { + return nil, err + } + + podOptions, err := handleOptions(ctx, newOptions, config) if err != nil { return nil, err } @@ -77,11 +48,11 @@ func WithForwarders(ctx context.Context, options []*Option, kubeconfig string) ( ErrOut: os.Stderr, } - carries := make([]*carry, len(newOptions)) + carries := make([]*carry, len(podOptions)) var g errgroup.Group - for index, option := range newOptions { + for index, option := range podOptions { index := index stopCh := make(chan struct{}, 1) readyCh := make(chan struct{}) @@ -148,13 +119,8 @@ func WithForwarders(ctx context.Context, options []*Option, kubeconfig string) ( } func portForwardAPod(req *portForwardAPodRequest) (*portforward.PortForwarder, error) { - namespace := req.Pod.Namespace - if namespace == "" { - namespace = "default" - } - path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", - namespace, req.Pod.Name) + req.Pod.Namespace, req.Pod.Name) hostIP := strings.TrimLeft(req.RestConfig.Host, "htps:/") transport, upgrader, err := spdy.RoundTripperFor(req.RestConfig) @@ -176,88 +142,3 @@ func portForwardAPod(req *portForwardAPodRequest) (*portforward.PortForwarder, e return fw, nil } - -func handleOptions(ctx context.Context, options []*Option, config *restclient.Config) ([]*Option, error) { - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - newOptions := make([]*Option, len(options)) - var g errgroup.Group - - for index, option := range options { - option := option - index := index - - g.Go(func() error { - podName := option.Pod.ObjectMeta.Name - - if podName != "" { - namespace := option.Pod.ObjectMeta.Namespace - if namespace == "" { - namespace = "default" - } - pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { - return err - } - if pod == nil { - return fmt.Errorf("no such pod: %v", podName) - } - - newOptions[index] = option - return nil - } - - svcName := option.Service.ObjectMeta.Name - if svcName == "" { - return fmt.Errorf("please provide a pod or service") - } - namespace := option.Service.ObjectMeta.Namespace - if namespace == "" { - namespace = "default" - } - - svc, err := clientset.CoreV1().Services(namespace).Get(ctx, svcName, metav1.GetOptions{}) - if err != nil { - return err - } - if svc == nil { - return fmt.Errorf("no such service: %v", svcName) - } - labels := []string{} - for key, val := range svc.Spec.Selector { - labels = append(labels, key+"="+val) - } - label := strings.Join(labels, ",") - - pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: label, Limit: 1}) - if err != nil { - return err - } - if len(pods.Items) == 0 { - return fmt.Errorf("no such pods of the service of %v", svcName) - } - pod := pods.Items[0] - - fmt.Printf("\nForwarding service: %v to pod %v ...\n\n", svcName, pod.Name) - - newOptions[index] = &Option{ - LocalPort: option.LocalPort, - PodPort: option.PodPort, - Pod: v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - }, - }, - } - return nil - }) - } - - if err := g.Wait(); err != nil { - return nil, err - } - return newOptions, nil -} diff --git a/forwarder_test.go b/forwarder_test.go index f058090..7103afa 100644 --- a/forwarder_test.go +++ b/forwarder_test.go @@ -7,8 +7,6 @@ import ( "testing" "github.com/namsral/flag" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestBasic(t *testing.T) { @@ -21,28 +19,18 @@ func TestBasic(t *testing.T) { `) flag.Parse() - fmt.Printf("kubecfg: %v", kubecfg) + fmt.Printf("kubecfg: %v\n", kubecfg) options := []*Option{ { - // LocalPort: 8080, - PodPort: 80, - Service: v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-nginx-svc", - // Namespace: "default", - }, - }, + LocalPort: 8080, + RemotePort: 80, + ServiceName: "my-nginx-svc", }, { - LocalPort: 8081, - PodPort: 80, - Pod: v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-nginx-66b6c48dd5-ttdb2", - // Namespace: "default", - }, - }, + // LocalPort: 8081, + // RemotePort: 80, + Source: "po/my-nginx-66b6c48dd5-ttdb2", }, } diff --git a/readme.md b/readme.md index 1a8eb50..2f07c0a 100644 --- a/readme.md +++ b/readme.md @@ -21,33 +21,29 @@ go get github.com/anthhub/forwarder options := []*forwarder.Option{ { // the local port for forwarding - LocalPort: 8080, + LocalPort: 8080, // the k8s pod port - PodPort: 80, - // the k8s pod metadata - Pod: v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "nginx-deployment-66b6c48dd5-5pkwm", - Namespace: "default", - }, - }, + RemotePort: 80, + // the forwarding service name + ServiceName: "my-nginx-svc", + // the k8s source string, eg: svc/my-nginx-svc po/my-nginx-666 + // the Source field will be parsed and override ServiceName or RemotePort field + Source: "svc/my-nginx-66b6c48dd5-ttdb2", + // namespace default is "default" + Namespace: "default" }, - { + { // if local port isn't provided, forwarder will generate a random port number // LocalPort: 8081, - PodPort: 80, - // the k8s service metadata, it's to forward service - Service: v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-nginx-svc", - // the namespace default is "default" - // Namespace: "default", - }, - }, + // + // if target port isn't provided, forwarder find the first container port of the pod or service + // RemotePort: 80, + Source: "po/my-nginx-66b6c48dd5-ttdb2", }, } // it's to create a forwarder, and you need provide a path of kubeconfig + // the path of kubeconfig, default is "~/.kube/config" ret, err := forwarder.WithForwarders(context.Background(), options, "./kubecfg") if err != nil { panic(err) diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..376c99e --- /dev/null +++ b/utils.go @@ -0,0 +1,144 @@ +package forwarder + +import ( + "context" + "fmt" + "strings" + + "golang.org/x/sync/errgroup" + v1 "k8s.io/api/core/v1" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" +) + +func parseSource(source string) (*Option, error) { + list := strings.Split(source, "/") + if len(list) != 2 { + return nil, fmt.Errorf("invalid source: %v", source) + } + + kind := list[0] + name := list[1] + + if kind == "svc" || kind == "service" || kind == "services" { + return &Option{ServiceName: name}, nil + } + if kind == "po" || kind == "pod" || kind == "pods" { + return &Option{PodName: name}, nil + } + + return nil, fmt.Errorf("invalid source: %v", source) +} + +func parseOptions(options []*Option) ([]*Option, error) { + newOptions := []*Option{} + + for _, o := range options { + o := o + if o.Namespace == "" { + o.Namespace = "default" + } + if o.Source != "" { + opt, err := parseSource(o.Source) + if err != nil { + return nil, err + } + if opt.ServiceName != "" { + o.ServiceName = opt.ServiceName + } + if opt.PodName != "" { + o.PodName = opt.PodName + } + } + + if o.PodName == "" && o.ServiceName == "" { + return nil, fmt.Errorf("please provide a name of pod or service") + } + + newOptions = append(newOptions, o) + } + + return newOptions, nil +} + +func handleOptions(ctx context.Context, options []*Option, config *restclient.Config) ([]*PodOption, error) { + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + podOptions := make([]*PodOption, len(options)) + + var g errgroup.Group + + for index, option := range options { + option := option + index := index + + g.Go(func() error { + if option.PodName != "" { + pod, err := clientset.CoreV1().Pods(option.Namespace).Get(ctx, option.PodName, metav1.GetOptions{}) + if err != nil { + return err + } + if pod == nil { + return fmt.Errorf("no such pod: %v", option.PodName) + } + + podOptions[index] = buildPodOption(option, pod) + return nil + } + + svc, err := clientset.CoreV1().Services(option.Namespace).Get(ctx, option.ServiceName, metav1.GetOptions{}) + if err != nil { + return err + } + if svc == nil { + return fmt.Errorf("no such service: %+v", option.ServiceName) + } + + labels := []string{} + for key, val := range svc.Spec.Selector { + labels = append(labels, key+"="+val) + } + label := strings.Join(labels, ",") + + pods, err := clientset.CoreV1().Pods(option.Namespace).List(ctx, metav1.ListOptions{LabelSelector: label, Limit: 1}) + if err != nil { + return err + } + if len(pods.Items) == 0 { + return fmt.Errorf("no such pods of the service of %v", option.ServiceName) + } + pod := pods.Items[0] + + fmt.Printf("Forwarding service: %v to pod %v ...\n", option.ServiceName, pod.Name) + + podOptions[index] = buildPodOption(option, &pod) + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + return podOptions, nil +} + +func buildPodOption(option *Option, pod *v1.Pod) *PodOption { + if option.RemotePort == 0 { + option.RemotePort = int(pod.Spec.Containers[0].Ports[0].ContainerPort) + } + + return &PodOption{ + LocalPort: option.LocalPort, + PodPort: option.RemotePort, + Pod: v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + }, + } +}