Skip to content

Commit

Permalink
refactor: refactor Option
Browse files Browse the repository at this point in the history
  • Loading branch information
anthhub committed Jul 29, 2021
1 parent c620beb commit 8a0abfa
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 189 deletions.
45 changes: 45 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package forwarder

import (
v1 "k8s.io/api/core/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
)

type portForwardAPodRequest struct {
RestConfig *rest.Config // RestConfig is the kubernetes config
Pod v1.Pod // Pod is the selected pod for this port forwarding
LocalPort int // LocalPort is the local port that will be selected to expose the PodPort
PodPort int // PodPort is the target port for the pod
Streams genericclioptions.IOStreams // Steams configures where to write or read input from
StopCh <-chan struct{} // StopCh is the channel used to manage the port forward lifecycle
ReadyCh chan struct{} // ReadyCh communicates when the tunnel is ready to receive traffic
}

type carry struct {
StopCh chan struct{} // StopCh is the channel used to manage the port forward lifecycle
ReadyCh chan struct{} // ReadyCh communicates when the tunnel is ready to receive traffic
PF *portforward.PortForwarder // the instance of Portforwarder
}

type PodOption struct {
LocalPort int // the local port for forwarding
PodPort int // the k8s pod port
Pod v1.Pod // the k8s pod metadata
}

type Option struct {
LocalPort int // the local port for forwarding
RemotePort int // the remote port port for forwarding
Namespace string // the k8s namespace metadata
PodName string // the k8s pod metadata
ServiceName string // the k8s service metadata
Source string // the k8s source string, eg: svc/my-nginx-svc po/my-nginx-66b6c48dd5-ttdb2
}

type Result struct {
Close func() // close the port forwarding
Ready func() ([][]portforward.ForwardedPort, error) // block till the forwarding ready
Wait func() // block and listen IOStreams close signal
}
28 changes: 5 additions & 23 deletions example/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (

"github.com/anthhub/forwarder"
"github.com/gin-gonic/gin"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/namsral/flag"
v1 "k8s.io/api/core/v1"
)

func setupRouter() *gin.Engine {
Expand All @@ -38,30 +36,14 @@ func main() {

options := []*forwarder.Option{
{
// the local port for forwarding
LocalPort: 8080,
// the k8s pod port
PodPort: 80,
// the k8s pod metadata
Pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "my-nginx-66b6c48dd5-ttdb2",
Namespace: "default",
},
},
LocalPort: 8080,
RemotePort: 80,
ServiceName: "my-nginx-svc",
},
{
// if local port isn't provided, forwarder will generate a random port number
// LocalPort: 8081,
PodPort: 80,
// the k8s service metadata, it's to forward service
Service: v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "my-nginx-svc",
// the namespace default is "default"
// Namespace: "default",
},
},
// RemotePort: 80,
Source: "po/my-nginx-66b6c48dd5-ttdb2",
},
}

Expand Down
137 changes: 9 additions & 128 deletions forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,49 +12,15 @@ import (
"syscall"

"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)

var once sync.Once

type portForwardAPodRequest struct {
RestConfig *rest.Config // RestConfig is the kubernetes config
Pod v1.Pod // Pod is the selected pod for this port forwarding
LocalPort int // LocalPort is the local port that will be selected to expose the PodPort
PodPort int // PodPort is the target port for the pod
Streams genericclioptions.IOStreams // Steams configures where to write or read input from
StopCh <-chan struct{} // StopCh is the channel used to manage the port forward lifecycle
ReadyCh chan struct{} // ReadyCh communicates when the tunnel is ready to receive traffic
}

type carry struct {
StopCh chan struct{}
ReadyCh chan struct{}
PF *portforward.PortForwarder
}

type Option struct {
LocalPort int // the local port for forwarding
PodPort int // the k8s pod port
Pod v1.Pod // the k8s pod metadata
Service v1.Service // the k8s service metadata
}

type Result struct {
Close func() // close the port forwarding
Ready func() ([][]portforward.ForwardedPort, error) // block till the forwarding ready
Wait func() // block and listen IOStreams close signal
}

// It is to forward port for k8s cloud services.
func WithForwarders(ctx context.Context, options []*Option, kubeconfig string) (*Result, error) {
if kubeconfig == "" {
Expand All @@ -66,7 +32,12 @@ func WithForwarders(ctx context.Context, options []*Option, kubeconfig string) (
return nil, err
}

newOptions, err := handleOptions(ctx, options, config)
newOptions, err := parseOptions(options)
if err != nil {
return nil, err
}

podOptions, err := handleOptions(ctx, newOptions, config)
if err != nil {
return nil, err
}
Expand All @@ -77,11 +48,11 @@ func WithForwarders(ctx context.Context, options []*Option, kubeconfig string) (
ErrOut: os.Stderr,
}

carries := make([]*carry, len(newOptions))
carries := make([]*carry, len(podOptions))

var g errgroup.Group

for index, option := range newOptions {
for index, option := range podOptions {
index := index
stopCh := make(chan struct{}, 1)
readyCh := make(chan struct{})
Expand Down Expand Up @@ -148,13 +119,8 @@ func WithForwarders(ctx context.Context, options []*Option, kubeconfig string) (
}

func portForwardAPod(req *portForwardAPodRequest) (*portforward.PortForwarder, error) {
namespace := req.Pod.Namespace
if namespace == "" {
namespace = "default"
}

path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward",
namespace, req.Pod.Name)
req.Pod.Namespace, req.Pod.Name)
hostIP := strings.TrimLeft(req.RestConfig.Host, "htps:/")

transport, upgrader, err := spdy.RoundTripperFor(req.RestConfig)
Expand All @@ -176,88 +142,3 @@ func portForwardAPod(req *portForwardAPodRequest) (*portforward.PortForwarder, e

return fw, nil
}

func handleOptions(ctx context.Context, options []*Option, config *restclient.Config) ([]*Option, error) {
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
newOptions := make([]*Option, len(options))
var g errgroup.Group

for index, option := range options {
option := option
index := index

g.Go(func() error {
podName := option.Pod.ObjectMeta.Name

if podName != "" {
namespace := option.Pod.ObjectMeta.Namespace
if namespace == "" {
namespace = "default"
}
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return err
}
if pod == nil {
return fmt.Errorf("no such pod: %v", podName)
}

newOptions[index] = option
return nil
}

svcName := option.Service.ObjectMeta.Name
if svcName == "" {
return fmt.Errorf("please provide a pod or service")
}
namespace := option.Service.ObjectMeta.Namespace
if namespace == "" {
namespace = "default"
}

svc, err := clientset.CoreV1().Services(namespace).Get(ctx, svcName, metav1.GetOptions{})
if err != nil {
return err
}
if svc == nil {
return fmt.Errorf("no such service: %v", svcName)
}
labels := []string{}
for key, val := range svc.Spec.Selector {
labels = append(labels, key+"="+val)
}
label := strings.Join(labels, ",")

pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: label, Limit: 1})
if err != nil {
return err
}
if len(pods.Items) == 0 {
return fmt.Errorf("no such pods of the service of %v", svcName)
}
pod := pods.Items[0]

fmt.Printf("\nForwarding service: %v to pod %v ...\n\n", svcName, pod.Name)

newOptions[index] = &Option{
LocalPort: option.LocalPort,
PodPort: option.PodPort,
Pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
},
}
return nil
})
}

if err := g.Wait(); err != nil {
return nil, err
}
return newOptions, nil
}
26 changes: 7 additions & 19 deletions forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"testing"

"github.com/namsral/flag"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestBasic(t *testing.T) {
Expand All @@ -21,28 +19,18 @@ func TestBasic(t *testing.T) {
`)
flag.Parse()
fmt.Printf("kubecfg: %v", kubecfg)
fmt.Printf("kubecfg: %v\n", kubecfg)

options := []*Option{
{
// LocalPort: 8080,
PodPort: 80,
Service: v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "my-nginx-svc",
// Namespace: "default",
},
},
LocalPort: 8080,
RemotePort: 80,
ServiceName: "my-nginx-svc",
},
{
LocalPort: 8081,
PodPort: 80,
Pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "my-nginx-66b6c48dd5-ttdb2",
// Namespace: "default",
},
},
// LocalPort: 8081,
// RemotePort: 80,
Source: "po/my-nginx-66b6c48dd5-ttdb2",
},
}

Expand Down
34 changes: 15 additions & 19 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,29 @@ go get github.com/anthhub/forwarder
options := []*forwarder.Option{
{
// the local port for forwarding
LocalPort: 8080,
LocalPort: 8080,
// the k8s pod port
PodPort: 80,
// the k8s pod metadata
Pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-deployment-66b6c48dd5-5pkwm",
Namespace: "default",
},
},
RemotePort: 80,
// the forwarding service name
ServiceName: "my-nginx-svc",
// the k8s source string, eg: svc/my-nginx-svc po/my-nginx-666
// the Source field will be parsed and override ServiceName or RemotePort field
Source: "svc/my-nginx-66b6c48dd5-ttdb2",
// namespace default is "default"
Namespace: "default"
},
{
{
// if local port isn't provided, forwarder will generate a random port number
// LocalPort: 8081,
PodPort: 80,
// the k8s service metadata, it's to forward service
Service: v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "my-nginx-svc",
// the namespace default is "default"
// Namespace: "default",
},
},
//
// if target port isn't provided, forwarder find the first container port of the pod or service
// RemotePort: 80,
Source: "po/my-nginx-66b6c48dd5-ttdb2",
},
}

// it's to create a forwarder, and you need provide a path of kubeconfig
// the path of kubeconfig, default is "~/.kube/config"
ret, err := forwarder.WithForwarders(context.Background(), options, "./kubecfg")
if err != nil {
panic(err)
Expand Down
Loading

0 comments on commit 8a0abfa

Please sign in to comment.