From a0fcb7e7c5d68a281119de96f6987df0fd816c99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan-Luis=20de=20Sousa-Valadas=20Casta=C3=B1o?= Date: Wed, 20 Nov 2024 16:51:44 +0100 Subject: [PATCH 1/5] Add tcpproxy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tcpproxy is a subset of github.com/inetaf/tcpproxy with some modifications: 1- Implement the method SetRoutes to allow setting routes in bulk. Also allows deletion of routes which otherwise would be impossible. 2- Implement round robin load balancing (there was no load balancing at all). 3- Remove unused code. 4- Append Mirantis copyright for the modifications. Additionally, this PR had to make some modifications to the copyright linter script and `.golangci.yml` because these files are copied and modified. This is required by the Apache 2.0 redistribution terms. Signed-off-by: Juan-Luis de Sousa-Valadas Castaño --- .golangci.yml | 6 + hack/copyright.sh | 4 + .../controller/cplb/tcpproxy/tcpproxy.go | 492 ++++++++++++++++++ .../controller/cplb/tcpproxy/tcpproxy_test.go | 226 ++++++++ 4 files changed, 728 insertions(+) create mode 100644 pkg/component/controller/cplb/tcpproxy/tcpproxy.go create mode 100644 pkg/component/controller/cplb/tcpproxy/tcpproxy_test.go diff --git a/.golangci.yml b/.golangci.yml index 2b75b3a143c9..c24f2e179ecd 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -109,3 +109,9 @@ issues: - linters: - staticcheck text: "^SA1019:" + + # tcpproxy is copied from https://github.com/inetaf/tcpproxy/, as per + # Apache 2.0 license section 4 (Redistribution) we must keep the original header. + - path: "pkg/component/controller/cplb/tcpproxy/.*" + linters: + - goheader diff --git a/hack/copyright.sh b/hack/copyright.sh index 5a494c13e05b..dd4d701d35e2 100755 --- a/hack/copyright.sh +++ b/hack/copyright.sh @@ -45,6 +45,10 @@ has_date_copyright(){ # Copyright notice aren't related to the date of the document. 
for i in $(find cmd hack internal inttest pkg static -type f -name '*.go' -not -name 'zz_generated*'); do case "$i" in + pkg/component/controller/cplb/tcpproxy/*) + # These files have a special copyright due to being copied + # from github.com/inetaf/tcpproxy + ;; pkg/client/clientset/*) if ! has_basic_copyright "$i"; then echo "ERROR: $i doesn't have a proper copyright notice" 1>&2 diff --git a/pkg/component/controller/cplb/tcpproxy/tcpproxy.go b/pkg/component/controller/cplb/tcpproxy/tcpproxy.go new file mode 100644 index 000000000000..23b4b7fc52bd --- /dev/null +++ b/pkg/component/controller/cplb/tcpproxy/tcpproxy.go @@ -0,0 +1,492 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Modifications made by Mirantis Inc., 2024. +// Copyright 2017 Google Inc. +// +// Copyright 2024 Mirantis, Inc. + +// Package tcpproxy lets users build TCP proxies + +package tcpproxy + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "log" + "net" + "time" +) + +// Proxy is a proxy. Its zero value is a valid proxy that does +// nothing. Call methods to add routes before calling Start or Run. +// +// The order that routes are added in matters; each is matched in the order +// registered. 
+type Proxy struct { + configs map[string]*config // ip:port => config + + lns []net.Listener + donec chan struct{} // closed before err + err error // any error from listening + routesChan chan route + + // ListenFunc optionally specifies an alternate listen + // function. If nil, net.Listen is used. + // The provided net is always "tcp". + ListenFunc func(net, laddr string) (net.Listener, error) +} + +// Matcher reports whether hostname matches the Matcher's criteria. +type Matcher func(ctx context.Context, hostname string) bool + +// config contains the proxying state for one listener. +type config struct { + routes []route +} + +// A route matches a connection to a target. +type route interface { + // match examines the initial bytes of a connection, looking for a + // match. If a match is found, match returns a non-nil Target to + // which the stream should be proxied. match returns nil if the + // connection doesn't match. + // + // match must not consume bytes from the given bufio.Reader, it + // can only Peek. + // + // If an sni or host header was parsed successfully, that will be + // returned as the second return value. + match(*bufio.Reader) (Target, string) +} + +func (p *Proxy) netListen() func(net, laddr string) (net.Listener, error) { + if p.ListenFunc != nil { + return p.ListenFunc + } + return net.Listen +} + +func (p *Proxy) configFor(ipPort string) *config { + if p.configs == nil { + p.configs = make(map[string]*config) + } + if p.configs[ipPort] == nil { + p.configs[ipPort] = &config{} + } + return p.configs[ipPort] +} + +func (p *Proxy) addRoute(ipPort string, r route) { + cfg := p.configFor(ipPort) + cfg.routes = append(cfg.routes, r) +} + +// AddRoute appends an always-matching route to the ipPort listener, +// directing any connection to dest. +// +// This is generally used as either the only rule (for simple TCP +// proxies), or as the final fallback rule for an ipPort. +// +// The ipPort is any valid net.Listen TCP address. 
+func (p *Proxy) AddRoute(ipPort string, dest Target) { + p.addRoute(ipPort, fixedTarget{dest}) +} + +func (p *Proxy) setRoutes(ipPort string, targets []Target) { + var routes []route + for _, target := range targets { + routes = append(routes, fixedTarget{target}) + } + + cfg := p.configFor(ipPort) + cfg.routes = routes +} + +// SetRoutes replaces routes for the ipPort. +// +// It's possible that the old routes are still used once after this +// function is called. If an empty slice is passed, the routes are +// preserved in order to avoid an infinite loop. +func (p *Proxy) SetRoutes(ipPort string, targets []Target) { + if len(targets) == 0 { + return + } + p.setRoutes(ipPort, targets) +} + +type fixedTarget struct { + t Target +} + +func (m fixedTarget) match(*bufio.Reader) (Target, string) { return m.t, "" } + +// Run calls Start, and then Wait. +// +// It blocks until there's an error. The return value is always +// non-nil. +func (p *Proxy) Run() error { + if err := p.Start(); err != nil { + return err + } + return p.Wait() +} + +// Wait waits for the Proxy to finish running. Currently this can only +// happen if a Listener is closed, or Close is called on the proxy. +// +// It is only valid to call Wait after a successful call to Start. +func (p *Proxy) Wait() error { + <-p.donec + return p.err +} + +// Close closes all the proxy's self-opened listeners. +func (p *Proxy) Close() error { + for _, c := range p.lns { + c.Close() + } + return nil +} + +// Start creates a TCP listener for each unique ipPort from the +// previously created routes and starts the proxy. It returns any +// error from starting listeners. +// +// If it returns a non-nil error, any successfully opened listeners +// are closed. 
+func (p *Proxy) Start() error { + if p.donec != nil { + return errors.New("already started") + } + p.donec = make(chan struct{}) + errc := make(chan error, len(p.configs)) + p.lns = make([]net.Listener, 0, len(p.configs)) + for ipPort, config := range p.configs { + ln, err := p.netListen()("tcp", ipPort) + if err != nil { + p.Close() + return err + } + p.lns = append(p.lns, ln) + p.routesChan = make(chan route) + go p.serveListener(errc, ln, config) + } + go p.awaitFirstError(errc) + return nil +} + +func (p *Proxy) awaitFirstError(errc <-chan error) { + p.err = <-errc + close(p.donec) +} + +func (p *Proxy) serveListener(ret chan<- error, ln net.Listener, cfg *config) { + go p.roundRobin(cfg) + for { + c, err := ln.Accept() + if err != nil { + ret <- err + return + } + go p.serveConn(c) + } +} + +// serveConn runs in its own goroutine and matches c against routes. +// It returns whether it matched purely for testing. +func (p *Proxy) serveConn(c net.Conn) bool { + br := bufio.NewReader(c) + for route := range p.routesChan { + if target, hostName := route.match(br); target != nil { + if n := br.Buffered(); n > 0 { + peeked, _ := br.Peek(br.Buffered()) + c = &Conn{ + HostName: hostName, + Peeked: peeked, + Conn: c, + } + } + target.HandleConn(c) + return true + } + } + // TODO: hook for this? + log.Printf("tcpproxy: no routes matched conn %v/%v; closing", c.RemoteAddr().String(), c.LocalAddr().String()) + c.Close() + return false +} + +// roundRobin writes to a channel the next route to use. +func (p *Proxy) roundRobin(cfg *config) { + for { + for _, route := range cfg.routes { + p.routesChan <- route + } + } +} + +// Conn is an incoming connection that has had some bytes read from it +// to determine how to route the connection. The Read method stitches +// the peeked bytes and unread bytes back together. +type Conn struct { + // HostName is the hostname field that was sent to the request router. 
+ // In the case of TLS, this is the SNI header, in the case of HTTPHost + // route, it will be the host header. In the case of a fixed + // route, i.e. those created with AddRoute(), this will always be + // empty. This can be useful in the case where further routing decisions + // need to be made in the Target implementation. + HostName string + + // Peeked are the bytes that have been read from Conn for the + // purposes of route matching, but have not yet been consumed + // by Read calls. It is set to nil by Read when fully consumed. + Peeked []byte + + // Conn is the underlying connection. + // It can be type asserted against *net.TCPConn or other types + // as needed. It should not be read from directly unless + // Peeked is nil. + net.Conn +} + +func (c *Conn) Read(p []byte) (n int, err error) { + if len(c.Peeked) > 0 { + n = copy(p, c.Peeked) + c.Peeked = c.Peeked[n:] + if len(c.Peeked) == 0 { + c.Peeked = nil + } + return n, nil + } + return c.Conn.Read(p) +} + +// Target is what an incoming matched connection is sent to. +type Target interface { + // HandleConn is called when an incoming connection is + // matched. After the call to HandleConn, the tcpproxy + // package never touches the conn again. Implementations are + // responsible for closing the connection when needed. + // + // The concrete type of conn will be of type *Conn if any + // bytes have been consumed for the purposes of route + // matching. + HandleConn(net.Conn) +} + +// To is a shorthand way of writing &tcpproxy.DialProxy{Addr: addr}. +func To(addr string) *DialProxy { + return &DialProxy{Addr: addr} +} + +// DialProxy implements Target by dialing a new connection to Addr +// and then proxying data back and forth. +// +// The To func is a shorthand way of creating a DialProxy. +type DialProxy struct { + // Addr is the TCP address to proxy to. + Addr string + + // KeepAlivePeriod sets the period between TCP keep alives. + // If zero, a default is used. To disable, use a negative number. 
+ // The keep-alive is used for both the client connection and the connection to Addr. + KeepAlivePeriod time.Duration + + // DialTimeout optionally specifies a dial timeout. + // If zero, a default is used. + // If negative, the timeout is disabled. + DialTimeout time.Duration + + // DialContext optionally specifies an alternate dial function + // for TCP targets. If nil, the standard + // net.Dialer.DialContext method is used. + DialContext func(ctx context.Context, network, address string) (net.Conn, error) + + // OnDialError optionally specifies an alternate way to handle errors dialing Addr. + // If nil, the error is logged and src is closed. + // If non-nil, src is not closed automatically. + OnDialError func(src net.Conn, dstDialErr error) + + // ProxyProtocolVersion optionally specifies the version of + // HAProxy's PROXY protocol to use. The PROXY protocol provides + // connection metadata to the DialProxy target, via a header + // inserted ahead of the client's traffic. The DialProxy target + // must explicitly support and expect the PROXY header; there is + // no graceful downgrade. + // If zero, no PROXY header is sent. Currently, version 1 is supported. + ProxyProtocolVersion int +} + +// UnderlyingConn returns c.Conn if c is of type *Conn, +// otherwise it returns c. +func UnderlyingConn(c net.Conn) net.Conn { + if wrap, ok := c.(*Conn); ok { + return wrap.Conn + } + return c +} + +func tcpConn(c net.Conn) (t *net.TCPConn, ok bool) { + if c, ok := UnderlyingConn(c).(*net.TCPConn); ok { + return c, ok + } + if c, ok := c.(*net.TCPConn); ok { + return c, ok + } + return nil, false +} + +func goCloseConn(c net.Conn) { go c.Close() } + +func closeRead(c net.Conn) { + if c, ok := tcpConn(c); ok { + _ = c.CloseRead() + } +} + +func closeWrite(c net.Conn) { + if c, ok := tcpConn(c); ok { + _ = c.CloseWrite() + } +} + +// HandleConn implements the Target interface. 
+func (dp *DialProxy) HandleConn(src net.Conn) { + ctx := context.Background() + var cancel context.CancelFunc + if dp.DialTimeout >= 0 { + ctx, cancel = context.WithTimeout(ctx, dp.dialTimeout()) + } + dst, err := dp.dialContext()(ctx, "tcp", dp.Addr) + if cancel != nil { + cancel() + } + if err != nil { + dp.onDialError()(src, err) + return + } + defer goCloseConn(dst) + + if err = dp.sendProxyHeader(dst, src); err != nil { + dp.onDialError()(src, err) + return + } + defer goCloseConn(src) + + if ka := dp.keepAlivePeriod(); ka > 0 { + for _, c := range []net.Conn{src, dst} { + if c, ok := tcpConn(c); ok { + _ = c.SetKeepAlive(true) + _ = c.SetKeepAlivePeriod(ka) + } + } + } + + errc := make(chan error, 2) + go proxyCopy(errc, src, dst) + go proxyCopy(errc, dst, src) + <-errc + <-errc +} + +func (dp *DialProxy) sendProxyHeader(w io.Writer, src net.Conn) error { + switch dp.ProxyProtocolVersion { + case 0: + return nil + case 1: + var srcAddr, dstAddr *net.TCPAddr + if a, ok := src.RemoteAddr().(*net.TCPAddr); ok { + srcAddr = a + } + if a, ok := src.LocalAddr().(*net.TCPAddr); ok { + dstAddr = a + } + + if srcAddr == nil || dstAddr == nil { + _, err := io.WriteString(w, "PROXY UNKNOWN\r\n") + return err + } + + family := "TCP4" + if srcAddr.IP.To4() == nil { + family = "TCP6" + } + _, err := fmt.Fprintf(w, "PROXY %s %s %s %d %d\r\n", family, srcAddr.IP, dstAddr.IP, srcAddr.Port, dstAddr.Port) + return err + default: + return fmt.Errorf("PROXY protocol version %d not supported", dp.ProxyProtocolVersion) + } +} + +// proxyCopy is the function that copies bytes around. +// It's a named function instead of a func literal so users get +// named goroutines in debug goroutine stack dumps. +func proxyCopy(errc chan<- error, dst, src net.Conn) { + defer closeRead(src) + defer closeWrite(dst) + + // Before we unwrap src and/or dst, copy any buffered data. 
+ if wc, ok := src.(*Conn); ok && len(wc.Peeked) > 0 { + if _, err := dst.Write(wc.Peeked); err != nil { + errc <- err + return + } + wc.Peeked = nil + } + + // Unwrap the src and dst from *Conn to *net.TCPConn so Go + // 1.11's splice optimization kicks in. + src = UnderlyingConn(src) + dst = UnderlyingConn(dst) + + _, err := io.Copy(dst, src) + errc <- err +} + +func (dp *DialProxy) keepAlivePeriod() time.Duration { + if dp.KeepAlivePeriod != 0 { + return dp.KeepAlivePeriod + } + return time.Minute +} + +func (dp *DialProxy) dialTimeout() time.Duration { + if dp.DialTimeout > 0 { + return dp.DialTimeout + } + return 10 * time.Second +} + +var defaultDialer = new(net.Dialer) + +func (dp *DialProxy) dialContext() func(ctx context.Context, network, address string) (net.Conn, error) { + if dp.DialContext != nil { + return dp.DialContext + } + return defaultDialer.DialContext +} + +func (dp *DialProxy) onDialError() func(src net.Conn, dstDialErr error) { + if dp.OnDialError != nil { + return dp.OnDialError + } + return func(src net.Conn, dstDialErr error) { + log.Printf("tcpproxy: for incoming conn %v, error dialing %q: %v", src.RemoteAddr().String(), dp.Addr, dstDialErr) + src.Close() + } +} diff --git a/pkg/component/controller/cplb/tcpproxy/tcpproxy_test.go b/pkg/component/controller/cplb/tcpproxy/tcpproxy_test.go new file mode 100644 index 000000000000..777a5c95e4ed --- /dev/null +++ b/pkg/component/controller/cplb/tcpproxy/tcpproxy_test.go @@ -0,0 +1,226 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// +// Modifications made by Mirantis Inc., 2024. +// Copyright 2017 Google Inc. +// +// Copyright 2024 Mirantis, Inc. + +package tcpproxy + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "testing" +) + +func TestProxyStartNone(t *testing.T) { + var p Proxy + if err := p.Start(); err != nil { + t.Fatal(err) + } +} + +func newLocalListener(t *testing.T) net.Listener { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + ln, err = net.Listen("tcp", "[::1]:0") + if err != nil { + t.Fatal(err) + } + } + return ln +} + +const testFrontAddr = "1.2.3.4:567" + +func testListenFunc(t *testing.T, ln net.Listener) func(network, laddr string) (net.Listener, error) { + return func(network, laddr string) (net.Listener, error) { + if network != "tcp" { + t.Errorf("got Listen call with network %q, not tcp", network) + return nil, errors.New("invalid network") + } + if laddr != testFrontAddr { + t.Fatalf("got Listen call with laddr %q, want %q", laddr, testFrontAddr) + panic("bogus address") + } + return ln, nil + } +} + +func testProxy(t *testing.T, front net.Listener) *Proxy { + return &Proxy{ + ListenFunc: testListenFunc(t, front), + } +} + +func TestBufferedClose(t *testing.T) { + front := newLocalListener(t) + defer front.Close() + back := newLocalListener(t) + defer back.Close() + + p := testProxy(t, front) + p.AddRoute(testFrontAddr, To(back.Addr().String())) + if err := p.Start(); err != nil { + t.Fatal(err) + } + + toFront, err := net.Dial("tcp", front.Addr().String()) + if err != nil { + t.Fatal(err) + } + defer toFront.Close() + + fromProxy, err := back.Accept() + if err != nil { + t.Fatal(err) + } + defer fromProxy.Close() + const msg = "message" + if _, err := io.WriteString(toFront, msg); err != nil { + t.Fatal(err) + } + // actively close toFront, the write should still make to the back. 
+ toFront.Close() + + buf := make([]byte, len(msg)) + if _, err := io.ReadFull(fromProxy, buf); err != nil { + t.Fatal(err) + } + if string(buf) != msg { + t.Fatalf("got %q; want %q", buf, msg) + } +} + +func TestProxyAlwaysMatch(t *testing.T) { + front := newLocalListener(t) + defer front.Close() + back := newLocalListener(t) + defer back.Close() + + p := testProxy(t, front) + p.AddRoute(testFrontAddr, To(back.Addr().String())) + if err := p.Start(); err != nil { + t.Fatal(err) + } + + toFront, err := net.Dial("tcp", front.Addr().String()) + if err != nil { + t.Fatal(err) + } + defer toFront.Close() + + fromProxy, err := back.Accept() + if err != nil { + t.Fatal(err) + } + defer fromProxy.Close() + const msg = "message" + _, _ = io.WriteString(toFront, msg) + + buf := make([]byte, len(msg)) + if _, err := io.ReadFull(fromProxy, buf); err != nil { + t.Fatal(err) + } + if string(buf) != msg { + t.Fatalf("got %q; want %q", buf, msg) + } +} + +func TestProxyPROXYOut(t *testing.T) { + front := newLocalListener(t) + defer front.Close() + back := newLocalListener(t) + defer back.Close() + + p := testProxy(t, front) + p.AddRoute(testFrontAddr, &DialProxy{ + Addr: back.Addr().String(), + ProxyProtocolVersion: 1, + }) + if err := p.Start(); err != nil { + t.Fatal(err) + } + + toFront, err := net.Dial("tcp", front.Addr().String()) + if err != nil { + t.Fatal(err) + } + + _, _ = io.WriteString(toFront, "foo") + toFront.Close() + + fromProxy, err := back.Accept() + if err != nil { + t.Fatal(err) + } + + bs, err := ioutil.ReadAll(fromProxy) + if err != nil { + t.Fatal(err) + } + + want := fmt.Sprintf("PROXY TCP4 %s %s %d %d\r\nfoo", toFront.LocalAddr().(*net.TCPAddr).IP, toFront.RemoteAddr().(*net.TCPAddr).IP, toFront.LocalAddr().(*net.TCPAddr).Port, toFront.RemoteAddr().(*net.TCPAddr).Port) + if string(bs) != want { + t.Fatalf("got %q; want %q", bs, want) + } +} + +func TestSetRoutes(t *testing.T) { + + var p Proxy + ipPort := ":8080" + p.AddRoute(ipPort, To("127.0.0.2:8080")) 
+ cfg := p.configFor(ipPort) + + expectedAddrsList := [][]string{ + {"127.0.0.1:80"}, + {"127.0.0.1:80", "127.0.0.1:443"}, + {}, + {"127.0.0.1:80"}, + } + + for _, expectedAddrs := range expectedAddrsList { + p.setRoutes(ipPort, stringsToTargets(expectedAddrs)) + if !equalRoutes(cfg.routes, expectedAddrs) { + t.Fatalf("got %v; want %v", cfg.routes, expectedAddrs) + } + } +} + +func stringsToTargets(s []string) []Target { + targets := make([]Target, len(s)) + for i, v := range s { + targets[i] = To(v) + } + + return targets +} +func equalRoutes(routes []route, expectedAddrs []string) bool { + if len(routes) != len(expectedAddrs) { + return false + } + + for i := range routes { + addr := routes[i].(fixedTarget).t.(*DialProxy).Addr + if addr != expectedAddrs[i] { + return false + } + } + return true +} From 3b3a1757c3d5f83496e290ccb246534b0afc2015 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan-Luis=20de=20Sousa-Valadas=20Casta=C3=B1o?= Date: Wed, 20 Nov 2024 20:12:04 +0100 Subject: [PATCH 2/5] Implement CPLB userspace reverse proxy LB MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IPVS is problematic for many reasons, implement a userspace load balancer which should get the job done and should be far less problematic. 
Signed-off-by: Juan-Luis de Sousa-Valadas Castaño --- cmd/controller/controller.go | 7 + cmd/worker/worker.go | 11 +- inttest/Makefile.variables | 3 +- .../cplbipvs_test.go} | 16 +- inttest/cplb-userspace/cplbuserspace_test.go | 223 ++++++++++++++++++ inttest/customports/customports_test.go | 2 +- pkg/apis/k0s/v1beta1/cplb.go | 14 ++ pkg/component/controller/cplb/cplb_linux.go | 130 ++++++++-- pkg/config/cli.go | 1 + .../k0s/k0s.k0sproject.io_clusterconfigs.yaml | 9 + 10 files changed, 380 insertions(+), 36 deletions(-) rename inttest/{cplb/cplb_test.go => cplb-ipvs/cplbipvs_test.go} (92%) create mode 100644 inttest/cplb-userspace/cplbuserspace_test.go diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index 15226f20a9bf..bbe25a388d94 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -47,6 +47,7 @@ import ( "github.com/k0sproject/k0s/pkg/component/controller/cplb" "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" "github.com/k0sproject/k0s/pkg/component/controller/workerconfig" + "github.com/k0sproject/k0s/pkg/component/iptables" "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/component/prober" "github.com/k0sproject/k0s/pkg/component/status" @@ -238,6 +239,11 @@ func (c *command) start(ctx context.Context) error { // Assume a single active controller during startup numActiveControllers := value.NewLatest[uint](1) + nodeComponents.Add(ctx, &iptables.Component{ + IPTablesMode: c.WorkerOptions.IPTablesMode, + BinDir: c.K0sVars.BinDir, + }) + if cplbCfg := nodeConfig.Spec.Network.ControlPlaneLoadBalancing; cplbCfg != nil && cplbCfg.Enabled { if c.SingleNode { return errors.New("control plane load balancing cannot be used in a single-node cluster") @@ -665,6 +671,7 @@ func (c *command) startWorker(ctx context.Context, profile string, nodeConfig *v wc.TokenArg = bootstrapConfig wc.WorkerProfile = profile wc.Labels = append(wc.Labels, 
fields.OneTermEqualSelector(constant.K0SNodeRoleLabel, "control-plane").String()) + wc.DisableIPTables = true if !c.SingleNode && !c.NoTaints { key := path.Join(constant.NodeRoleLabelNamespace, "master") taint := fields.OneTermEqualSelector(key, ":NoSchedule") diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index 6a22aa8b2981..6b0aefe92e24 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -148,11 +148,12 @@ func (c *Command) Start(ctx context.Context) error { c.WorkerProfile = "default-windows" } - componentManager.Add(ctx, &iptables.Component{ - IPTablesMode: c.WorkerOptions.IPTablesMode, - BinDir: c.K0sVars.BinDir, - }) - + if !c.DisableIPTables { + componentManager.Add(ctx, &iptables.Component{ + IPTablesMode: c.WorkerOptions.IPTablesMode, + BinDir: c.K0sVars.BinDir, + }) + } componentManager.Add(ctx, &worker.Kubelet{ CRISocket: c.CriSocket, diff --git a/inttest/Makefile.variables b/inttest/Makefile.variables index 2194e5612212..856fa1c64438 100644 --- a/inttest/Makefile.variables +++ b/inttest/Makefile.variables @@ -24,7 +24,8 @@ smoketests := \ check-cnichange \ check-configchange \ check-containerdimports \ - check-cplb \ + check-cplb-ipvs \ + check-cplb-userspace \ check-ctr \ check-custom-cidrs \ check-customca \ diff --git a/inttest/cplb/cplb_test.go b/inttest/cplb-ipvs/cplbipvs_test.go similarity index 92% rename from inttest/cplb/cplb_test.go rename to inttest/cplb-ipvs/cplbipvs_test.go index 0b7063fd05c2..8a3e18bf140f 100644 --- a/inttest/cplb/cplb_test.go +++ b/inttest/cplb-ipvs/cplbipvs_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/suite" ) -type keepalivedSuite struct { +type cplbIPVSSuite struct { common.BootlooseSuite } @@ -50,7 +50,7 @@ spec: // SetupTest prepares the controller and filesystem, getting it into a consistent // state which we can run tests against. 
-func (s *keepalivedSuite) TestK0sGetsUp() { +func (s *cplbIPVSSuite) TestK0sGetsUp() { lb := s.getLBAddress() ctx := s.Context() var joinToken string @@ -111,7 +111,7 @@ func (s *keepalivedSuite) TestK0sGetsUp() { // getLBAddress returns the IP address of the controller 0 and it adds 100 to // the last octet unless it's bigger or equal to 154, in which case it // subtracts 100. Theoretically this could result in an invalid IP address. -func (s *keepalivedSuite) getLBAddress() string { +func (s *cplbIPVSSuite) getLBAddress() string { ip := s.GetIPAddress(s.ControllerNode(0)) parts := strings.Split(ip, ".") if len(parts) != 4 { @@ -130,7 +130,7 @@ func (s *keepalivedSuite) getLBAddress() string { // validateRealServers checks that the real servers are present in the // ipvsadm output. -func (s *keepalivedSuite) validateRealServers(ctx context.Context, node string, vip string) { +func (s *cplbIPVSSuite) validateRealServers(ctx context.Context, node string, vip string) { ssh, err := s.SSH(ctx, node) s.Require().NoError(err) defer ssh.Disconnect() @@ -151,7 +151,7 @@ func (s *keepalivedSuite) validateRealServers(ctx context.Context, node string, // checkDummy checks that the dummy interface is present on the given node and // that it has only the virtual IP address. -func (s *keepalivedSuite) checkDummy(ctx context.Context, node string, vip string) { +func (s *cplbIPVSSuite) checkDummy(ctx context.Context, node string, vip string) { ssh, err := s.SSH(ctx, node) s.Require().NoError(err) defer ssh.Disconnect() @@ -167,7 +167,7 @@ func (s *keepalivedSuite) checkDummy(ctx context.Context, node string, vip strin // hasVIP checks that the dummy interface is present on the given node and // that it has only the virtual IP address. 
-func (s *keepalivedSuite) hasVIP(ctx context.Context, node string, vip string) bool { +func (s *cplbIPVSSuite) hasVIP(ctx context.Context, node string, vip string) bool { ssh, err := s.SSH(ctx, node) s.Require().NoError(err) defer ssh.Disconnect() @@ -180,8 +180,8 @@ func (s *keepalivedSuite) hasVIP(ctx context.Context, node string, vip string) b // TestKeepAlivedSuite runs the keepalived test suite. It verifies that the // virtual IP is working by joining a node to the cluster using the VIP. -func TestKeepAlivedSuite(t *testing.T) { - suite.Run(t, &keepalivedSuite{ +func TestCPLBIPVSSuite(t *testing.T) { + suite.Run(t, &cplbIPVSSuite{ common.BootlooseSuite{ ControllerCount: 3, WorkerCount: 1, diff --git a/inttest/cplb-userspace/cplbuserspace_test.go b/inttest/cplb-userspace/cplbuserspace_test.go new file mode 100644 index 000000000000..3dc567c82346 --- /dev/null +++ b/inttest/cplb-userspace/cplbuserspace_test.go @@ -0,0 +1,223 @@ +// Copyright 2024 k0s authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package keepalived + +import ( + "context" + "crypto/tls" + "encoding/hex" + "errors" + "fmt" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "testing" + "time" + + "github.com/k0sproject/k0s/inttest/common" + + "github.com/stretchr/testify/suite" +) + +type CPLBUserSpaceSuite struct { + common.BootlooseSuite +} + +const haControllerConfig = ` +spec: + network: + controlPlaneLoadBalancing: + enabled: true + type: Keepalived + keepalived: + vrrpInstances: + - virtualIPs: ["%s/16"] + authPass: "123456" + nodeLocalLoadBalancing: + enabled: true + type: EnvoyProxy +` + +// SetupTest prepares the controller and filesystem, getting it into a consistent +// state which we can run tests against. +func (s *CPLBUserSpaceSuite) TestK0sGetsUp() { + lb := s.getLBAddress() + ctx := s.Context() + var joinToken string + + for idx := range s.BootlooseSuite.ControllerCount { + s.Require().NoError(s.WaitForSSH(s.ControllerNode(idx), 2*time.Minute, 1*time.Second)) + s.PutFile(s.ControllerNode(idx), "/tmp/k0s.yaml", fmt.Sprintf(haControllerConfig, lb)) + + // Note that the token is intentionally empty for the first controller + s.Require().NoError(s.InitController(idx, "--config=/tmp/k0s.yaml", "--disable-components=metrics-server", "--enable-worker", joinToken)) + s.Require().NoError(s.WaitJoinAPI(s.ControllerNode(idx))) + + // With the primary controller running, create the join token for subsequent controllers. 
+ if idx == 0 { + token, err := s.GetJoinToken("controller") + s.Require().NoError(err) + joinToken = token + } + } + + // Final sanity -- ensure all nodes see each other according to etcd + for idx := range s.BootlooseSuite.ControllerCount { + s.Require().Len(s.GetMembers(idx), s.BootlooseSuite.ControllerCount) + } + + // Create a worker join token + workerJoinToken, err := s.GetJoinToken("worker") + s.Require().NoError(err) + + // Start the workers using the join token + s.Require().NoError(s.RunWorkersWithToken(workerJoinToken)) + + client, err := s.KubeClient(s.ControllerNode(0)) + s.Require().NoError(err) + + for idx := range s.BootlooseSuite.ControllerCount { + s.Require().NoError(s.WaitForNodeReady(s.ControllerNode(idx), client)) + } + s.Require().NoError(s.WaitForNodeReady(s.WorkerNode(0), client)) + + // Verify that none of the servers has the dummy interface + for idx := range s.BootlooseSuite.ControllerCount { + s.checkDummy(ctx, s.ControllerNode(idx)) + } + + // Verify that only one controller has the VIP in eth0 + count := 0 + for idx := range s.BootlooseSuite.ControllerCount { + if s.hasVIP(ctx, s.ControllerNode(idx), lb) { + count++ + } + } + s.Require().Equal(1, count, "Expected exactly one controller to have the VIP") + + // Verify that controller+worker nodes are working normally. + s.T().Log("waiting to see CNI pods ready") + s.Require().NoError(common.WaitForKubeRouterReady(s.Context(), client), "kube router did not start") + s.T().Log("waiting to see konnectivity-agent pods ready") + s.Require().NoError(common.WaitForDaemonSet(s.Context(), client, "konnectivity-agent", "kube-system"), "konnectivity-agent did not start") + s.T().Log("waiting to get logs from pods") + s.Require().NoError(common.WaitForPodLogs(s.Context(), client, "kube-system")) + + s.T().Log("Testing that the load balancer is actually balancing the load") + // Other stuff may be querying the controller, running the HTTPS request 15 times + // should be more than we need. 
+ signatures := make(map[string]int) + url := url.URL{Scheme: "https", Host: net.JoinHostPort(lb, strconv.Itoa(6443))} + for range 15 { + signature, err := getServerCertSignature(url.String()) + s.Require().NoError(err) + signatures[signature] = 1 + } + + s.Require().Len(signatures, 3, "Expected 3 different signatures, got %d", len(signatures)) +} + +// getLBAddress returns the IP address of the controller 0 and it adds 100 to +// the last octet unless it's bigger or equal to 154, in which case it +// subtracts 100. Theoretically this could result in an invalid IP address. +func (s *CPLBUserSpaceSuite) getLBAddress() string { + ip := s.GetIPAddress(s.ControllerNode(0)) + parts := strings.Split(ip, ".") + if len(parts) != 4 { + s.T().Fatalf("Invalid IP address: %q", ip) + } + lastOctet, err := strconv.Atoi(parts[3]) + s.Require().NoErrorf(err, "Failed to convert last octet %q to int", parts[3]) + if lastOctet >= 154 { + lastOctet -= 100 + } else { + lastOctet += 100 + } + + return fmt.Sprintf("%s.%d", strings.Join(parts[:3], "."), lastOctet) +} + +// checkDummy checks that the dummy interface isn't present in the node. +func (s *CPLBUserSpaceSuite) checkDummy(ctx context.Context, node string) { + ssh, err := s.SSH(ctx, node) + s.Require().NoError(err) + defer ssh.Disconnect() + + _, err = ssh.ExecWithOutput(ctx, "ip --oneline addr show dummyvip0") + s.Require().Error(err) +} + +// hasVIP checks that the dummy interface is present on the given node and +// that it has only the virtual IP address. +func (s *CPLBUserSpaceSuite) hasVIP(ctx context.Context, node string, vip string) bool { + ssh, err := s.SSH(ctx, node) + s.Require().NoError(err) + defer ssh.Disconnect() + + output, err := ssh.ExecWithOutput(ctx, "ip --oneline addr show eth0") + s.Require().NoError(err) + + return strings.Contains(output, fmt.Sprintf("inet %s/16", vip)) +} + +// getServerCertSignature connects to the given HTTPS URL and returns the server certificate signature. 
+func getServerCertSignature(url string) (string, error) { + // Create a custom HTTP client with a custom TLS configuration + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, // Skip verification for demonstration purposes + }, + }, + } + + // Make a request to the URL + resp, err := client.Get(url) + if err != nil { + return "", err + } + defer resp.Body.Close() + + // Get the TLS connection state + connState := resp.TLS + if connState == nil { + return "", errors.New("no TLS connection state") + } + + // Get the server certificate + if len(connState.PeerCertificates) == 0 { + return "", errors.New("no server certificate found") + } + cert := connState.PeerCertificates[0] + + // Get the certificate signature + signature := cert.Signature + + // Return the signature as a hex string + return hex.EncodeToString(signature), nil +} + +// TestCPLBUserSpaceSuite runs the CPLB userspace test suite. It verifies that the +// virtual IP is working by joining a node to the cluster using the VIP.
+func TestCPLBUserSpaceSuite(t *testing.T) { + suite.Run(t, &CPLBUserSpaceSuite{ + common.BootlooseSuite{ + ControllerCount: 3, + WorkerCount: 1, + }, + }) +} diff --git a/inttest/customports/customports_test.go b/inttest/customports/customports_test.go index 9ff005449485..1d8cb6e37961 100644 --- a/inttest/customports/customports_test.go +++ b/inttest/customports/customports_test.go @@ -131,7 +131,7 @@ func (s *customPortsSuite) TestControllerJoinsWithCustomPort() { s.AssertSomeKubeSystemPods(kc) s.T().Log("waiting to see CNI pods ready") - s.Require().NoError(common.WaitForKubeRouterReady(s.Context(), kc), "calico did not start") + s.Require().NoError(common.WaitForKubeRouterReady(s.Context(), kc), "kube-router did not start") s.T().Log("waiting to see konnectivity-agent pods ready") s.Require().NoError(common.WaitForDaemonSet(s.Context(), kc, "konnectivity-agent", "kube-system"), "konnectivity-agent did not start") diff --git a/pkg/apis/k0s/v1beta1/cplb.go b/pkg/apis/k0s/v1beta1/cplb.go index 0689a769212c..7fe9512d3178 100644 --- a/pkg/apis/k0s/v1beta1/cplb.go +++ b/pkg/apis/k0s/v1beta1/cplb.go @@ -67,6 +67,14 @@ type KeepalivedSpec struct { // Configuration options related to the virtual servers. This is an array // which allows to configure multiple load balancers. VirtualServers VirtualServers `json:"virtualServers,omitempty"` + // UserspaceProxyPort is the port where the userspace proxy will bind + // to. This port is only exposed on the localhost interface and is only + // used internally. Defaults to 6444. + // +kubebuilder:default=6444 + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +optional + UserSpaceProxyPort int `json:"userSpaceProxyBindPort,omitempty"` } // VRRPInstances is a list of VRRPInstance @@ -293,6 +301,12 @@ func (k *KeepalivedSpec) Validate(externalAddress string) (errs []error) { errs = append(errs, k.validateVRRPInstances(nil)...) errs = append(errs, k.validateVirtualServers()...) 
+ if k.UserSpaceProxyPort == 0 { + k.UserSpaceProxyPort = 6444 + } else if k.UserSpaceProxyPort < 1 || k.UserSpaceProxyPort > 65535 { + errs = append(errs, errors.New("UserSpaceProxyPort must be in the range of 1-65535")) + } + // CPLB reconciler relies in watching kubernetes.default.svc endpoints if externalAddress != "" && len(k.VirtualServers) > 0 { errs = append(errs, errors.New(".spec.api.externalAddress and virtual servers cannot be used together")) diff --git a/pkg/component/controller/cplb/cplb_linux.go b/pkg/component/controller/cplb/cplb_linux.go index 1803435be105..f17a21e79e78 100644 --- a/pkg/component/controller/cplb/cplb_linux.go +++ b/pkg/component/controller/cplb/cplb_linux.go @@ -22,8 +22,11 @@ import ( "errors" "fmt" "net" + "os/exec" "path/filepath" "slices" + "strconv" + "strings" "syscall" "text/template" "time" @@ -32,6 +35,7 @@ import ( "github.com/k0sproject/k0s/internal/pkg/users" k0sAPI "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/assets" + "github.com/k0sproject/k0s/pkg/component/controller/cplb/tcpproxy" "github.com/k0sproject/k0s/pkg/config" "github.com/k0sproject/k0s/pkg/constant" "github.com/k0sproject/k0s/pkg/supervisor" @@ -39,6 +43,11 @@ import ( "github.com/vishvananda/netlink" ) +const ( + iptablesCommandAppend = "-A" + iptablesCommandDelete = "-D" +) + // Keepalived is the controller for the keepalived process in the control plane load balancing type Keepalived struct { K0sVars *config.CfgVars @@ -55,6 +64,7 @@ type Keepalived struct { reconciler *CPLBReconciler updateCh chan struct{} reconcilerDone chan struct{} + proxy tcpproxy.Proxy } // Init extracts the needed binaries and creates the directories @@ -83,13 +93,14 @@ func (k *Keepalived) Start(_ context.Context) error { return nil } - if len(k.Config.VRRPInstances) > 0 { + // We only need the dummy interface when using IPVS. 
+ if len(k.Config.VirtualServers) > 0 { if err := k.configureDummy(); err != nil { return fmt.Errorf("failed to configure dummy interface: %w", err) } } - if len(k.Config.VirtualServers) > 0 { + if len(k.Config.VRRPInstances) > 0 || len(k.Config.VirtualServers) > 0 { k.log.Info("Starting CPLB reconciler") updateCh := make(chan struct{}, 1) k.reconciler = NewCPLBReconciler(k.KubeConfigPath, updateCh) @@ -140,7 +151,14 @@ func (k *Keepalived) Start(_ context.Context) error { k.reconcilerDone = reconcilerDone go func() { defer close(reconcilerDone) - k.watchReconcilerUpdates() + if len(k.Config.VirtualServers) > 0 { + k.watchReconcilerUpdatesKeepalived() + } else { + + if err := k.watchReconcilerUpdatesReverseProxy(); err != nil { + k.log.WithError(err).Error("failed to watch reconciler updates") + } + } }() } return k.supervisor.Supervise() @@ -150,29 +168,37 @@ func (k *Keepalived) Start(_ context.Context) error { // k0s controller is stopped, it can still reach the other APIservers on the VIP func (k *Keepalived) Stop() error { if k.reconciler != nil { - k.log.Infof("Stopping cplb-reconciler") + k.log.Info("Stopping cplb-reconciler") k.reconciler.Stop() close(k.updateCh) <-k.reconcilerDone } - k.log.Infof("Stopping keepalived") + k.log.Info("Stopping keepalived") k.supervisor.Stop() - k.log.Infof("Deleting dummy interface") - link, err := netlink.LinkByName(dummyLinkName) - if err != nil { - if errors.As(err, &netlink.LinkNotFoundError{}) { - return nil - } - k.log.Errorf("failed to get link by name %s. Attempting to delete it anyway: %v", dummyLinkName, err) - link = &netlink.Dummy{ - LinkAttrs: netlink.LinkAttrs{ - Name: dummyLinkName, - }, + if len(k.Config.VirtualServers) > 0 { + k.log.Info("Deleting dummy interface") + link, err := netlink.LinkByName(dummyLinkName) + if err != nil { + if errors.As(err, &netlink.LinkNotFoundError{}) { + return nil + } + k.log.Errorf("failed to get link by name %s. 
Attempting to delete it anyway: %v", dummyLinkName, err) + link = &netlink.Dummy{ + LinkAttrs: netlink.LinkAttrs{ + Name: dummyLinkName, + }, + } } + return netlink.LinkDel(link) + } + if err := k.proxy.Close(); err != nil { + return fmt.Errorf("failed to close proxy: %w", err) } - return netlink.LinkDel(link) + + // Only clean iptables rules if we are using the userspace reverse proxy + return k.redirectToProxyIPTables(iptablesCommandDelete) } // configureDummy creates the dummy interface and sets the virtual IPs on it. @@ -321,7 +347,69 @@ func (k *Keepalived) generateKeepalivedTemplate() error { return nil } -func (k *Keepalived) watchReconcilerUpdates() { +func (k *Keepalived) watchReconcilerUpdatesReverseProxy() error { + k.proxy = tcpproxy.Proxy{} + // We don't know how long until we get the first update, so initially we + // forward everything to localhost + k.proxy.AddRoute(fmt.Sprintf(":%d", k.Config.UserSpaceProxyPort), tcpproxy.To(fmt.Sprintf("127.0.0.1:%d", k.APIPort))) + + if err := k.proxy.Start(); err != nil { + return fmt.Errorf("failed to start proxy: %w", err) + } + + fmt.Println("Waiting for updateCh") + <-k.updateCh + k.setProxyRoutes() + + // Do not create the iptables rules until we have the first update and the + // proxy is running + if err := k.redirectToProxyIPTables(iptablesCommandAppend); err != nil { + k.log.Fatal(err) + } + + for range k.updateCh { + k.setProxyRoutes() + } + return nil +} + +func (k *Keepalived) setProxyRoutes() { + routes := []tcpproxy.Target{} + for _, addr := range k.reconciler.GetIPs() { + routes = append(routes, tcpproxy.To(fmt.Sprintf("%s:%d", addr, k.APIPort))) + } + + k.proxy.SetRoutes(fmt.Sprintf(":%d", k.Config.UserSpaceProxyPort), routes) +} + +func (k *Keepalived) redirectToProxyIPTables(op string) error { + for _, vrrp := range k.Config.VRRPInstances { + for _, vipCIDR := range vrrp.VirtualIPs { + vip := strings.Split(vipCIDR, "/")[0] + + cmdArgs := []string{ + "-t", "nat", op, "PREROUTING", "-p", "tcp", + 
"-d", vip, "--dport", strconv.Itoa(k.APIPort), + "-j", "REDIRECT", "--to-port", strconv.Itoa(k.Config.UserSpaceProxyPort), + } + + if op == iptablesCommandAppend { + k.log.Infof("Adding iptables rule to redirect %s", vip) + } else if op == iptablesCommandDelete { + k.log.Infof("Deleting iptables rule to redirect %s", vip) + } + + cmd := exec.Command(filepath.Join(k.K0sVars.BinDir, "iptables"), cmdArgs...) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to execute iptables command: %w, output: %s", err, output) + } + } + } + return nil +} + +func (k *Keepalived) watchReconcilerUpdatesKeepalived() { // Wait for the supervisor to start keepalived before // watching for endpoint changes process := k.supervisor.GetProcess() @@ -385,9 +473,9 @@ vrrp_instance k0s-vip-{{$i}} { auth_pass {{ $instance.AuthPass }} } virtual_ipaddress { - {{ range $instance.VirtualIPs }} - {{ . }} - {{ end }} + {{ range $instance.VirtualIPs }} + {{ . }} + {{ end }} } } {{ end }} diff --git a/pkg/config/cli.go b/pkg/config/cli.go index 2cca62e56b25..4e4f4b75aba3 100644 --- a/pkg/config/cli.go +++ b/pkg/config/cli.go @@ -85,6 +85,7 @@ type WorkerOptions struct { TokenArg string WorkerProfile string IPTablesMode string + DisableIPTables bool } func (o *ControllerOptions) Normalize() error { diff --git a/static/_crds/k0s/k0s.k0sproject.io_clusterconfigs.yaml b/static/_crds/k0s/k0s.k0sproject.io_clusterconfigs.yaml index 0b3aa2b8938d..a9a3da692bb7 100644 --- a/static/_crds/k0s/k0s.k0sproject.io_clusterconfigs.yaml +++ b/static/_crds/k0s/k0s.k0sproject.io_clusterconfigs.yaml @@ -508,6 +508,15 @@ spec: Keepalived contains configuration options related to the "Keepalived" type of load balancing. properties: + userSpaceProxyBindPort: + default: 6444 + description: |- + UserspaceProxyPort is the port where the userspace proxy will bind + to. This port is only exposed on the localhost interface and is only + used internally. Defaults to 6444. 
+ maximum: 65535 + minimum: 1 + type: integer virtualServers: description: |- Configuration options related to the virtual servers. This is an array From ccc2a71ad49b497f16b59a2288e8d51503857937 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan-Luis=20de=20Sousa-Valadas=20Casta=C3=B1o?= Date: Fri, 22 Nov 2024 13:59:44 +0100 Subject: [PATCH 3/5] Move the check for CPLB and externalAddress to cmd MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The only reason why we block externalAddress is that we need a disabled endpoint-reconciler. It doesn't make sense to move all this logic and these parameters to the config validation, instead do it all in cmd. Signed-off-by: Juan-Luis de Sousa-Valadas Castaño --- cmd/controller/controller.go | 10 ++- inttest/Makefile | 3 + inttest/Makefile.variables | 1 + inttest/cplb-userspace/cplbuserspace_test.go | 94 +++++++++++++++----- pkg/apis/k0s/v1beta1/clusterconfig_types.go | 2 +- pkg/apis/k0s/v1beta1/cplb.go | 11 +-- 6 files changed, 86 insertions(+), 35 deletions(-) diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index bbe25a388d94..23a7dda23232 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -244,11 +244,19 @@ func (c *command) start(ctx context.Context) error { BinDir: c.K0sVars.BinDir, }) + enableK0sEndpointReconciler := nodeConfig.Spec.API.ExternalAddress != "" && + !slices.Contains(c.DisableComponents, constant.APIEndpointReconcilerComponentName) + if cplbCfg := nodeConfig.Spec.Network.ControlPlaneLoadBalancing; cplbCfg != nil && cplbCfg.Enabled { if c.SingleNode { return errors.New("control plane load balancing cannot be used in a single-node cluster") } + if enableK0sEndpointReconciler { + enableK0sEndpointReconciler = false + logrus.Warn("Disabling k0s endpoint reconciler as it is incompatible with control plane load balancing") + } + nodeComponents.Add(ctx, &cplb.Keepalived{ K0sVars: c.K0sVars, Config: cplbCfg.Keepalived, @@
-260,8 +268,6 @@ func (c *command) start(ctx context.Context) error { } enableKonnectivity := !c.SingleNode && !slices.Contains(c.DisableComponents, constant.KonnectivityServerComponentName) - enableK0sEndpointReconciler := nodeConfig.Spec.API.ExternalAddress != "" && - !slices.Contains(c.DisableComponents, constant.APIEndpointReconcilerComponentName) if enableKonnectivity { nodeComponents.Add(ctx, &controller.Konnectivity{ diff --git a/inttest/Makefile b/inttest/Makefile index 62fadd3e35e0..1fd60717752e 100644 --- a/inttest/Makefile +++ b/inttest/Makefile @@ -87,6 +87,9 @@ check-customports-dynamicconfig: TEST_PACKAGE=customports check-kubeletcertrotate: TIMEOUT=15m +check-cplb-userspace-extaddr: export K0S_USE_EXTERNAL_ADDRESS=yes +check-cplb-userspace-extaddr: TEST_PACKAGE=cplb-userspace + check-dualstack-calico: export K0S_NETWORK=calico check-dualstack-calico: TEST_PACKAGE=dualstack diff --git a/inttest/Makefile.variables b/inttest/Makefile.variables index 856fa1c64438..6f2c63453507 100644 --- a/inttest/Makefile.variables +++ b/inttest/Makefile.variables @@ -26,6 +26,7 @@ smoketests := \ check-containerdimports \ check-cplb-ipvs \ check-cplb-userspace \ + check-cplb-userspace-extaddr \ check-ctr \ check-custom-cidrs \ check-customca \ diff --git a/inttest/cplb-userspace/cplbuserspace_test.go b/inttest/cplb-userspace/cplbuserspace_test.go index 3dc567c82346..1b2da563d107 100644 --- a/inttest/cplb-userspace/cplbuserspace_test.go +++ b/inttest/cplb-userspace/cplbuserspace_test.go @@ -23,12 +23,14 @@ import ( "net" "net/http" "net/url" + "os" "strconv" "strings" "testing" "time" "github.com/k0sproject/k0s/inttest/common" + "github.com/k0sproject/k0s/pkg/token" "github.com/stretchr/testify/suite" ) @@ -37,7 +39,7 @@ type CPLBUserSpaceSuite struct { common.BootlooseSuite } -const haControllerConfig = ` +const nllbControllerConfig = ` spec: network: controlPlaneLoadBalancing: @@ -52,19 +54,46 @@ spec: type: EnvoyProxy ` +const extAddrControllerConfig = ` +spec: + 
api: + externalAddress: %s + network: + provider: calico + controlPlaneLoadBalancing: + enabled: true + type: Keepalived + keepalived: + vrrpInstances: + - virtualIPs: ["%s/16"] + authPass: "123456" +` + // SetupTest prepares the controller and filesystem, getting it into a consistent // state which we can run tests against. func (s *CPLBUserSpaceSuite) TestK0sGetsUp() { + useExtAddr := os.Getenv("K0S_USE_EXTERNAL_ADDRESS") == "yes" + if useExtAddr { + s.T().Log("Using external address") + } else { + s.T().Log("Using CPLB + NLLB") + } lb := s.getLBAddress() ctx := s.Context() var joinToken string for idx := range s.BootlooseSuite.ControllerCount { s.Require().NoError(s.WaitForSSH(s.ControllerNode(idx), 2*time.Minute, 1*time.Second)) - s.PutFile(s.ControllerNode(idx), "/tmp/k0s.yaml", fmt.Sprintf(haControllerConfig, lb)) + if useExtAddr { + s.PutFile(s.ControllerNode(idx), "/tmp/k0s.yaml", fmt.Sprintf(extAddrControllerConfig, lb, lb)) + // Note that the token is intentionally empty for the first controller + s.Require().NoError(s.InitController(idx, "--config=/tmp/k0s.yaml", "--disable-components=endpoint-reconciler", "--enable-worker", joinToken)) + } else { + s.PutFile(s.ControllerNode(idx), "/tmp/k0s.yaml", fmt.Sprintf(nllbControllerConfig, lb)) + // Note that the token is intentionally empty for the first controller + s.Require().NoError(s.InitController(idx, "--config=/tmp/k0s.yaml", "--enable-worker", joinToken)) + } - // Note that the token is intentionally empty for the first controller - s.Require().NoError(s.InitController(idx, "--config=/tmp/k0s.yaml", "--disable-components=metrics-server", "--enable-worker", joinToken)) s.Require().NoError(s.WaitJoinAPI(s.ControllerNode(idx))) // With the primary controller running, create the join token for subsequent controllers. 
@@ -81,7 +110,7 @@ func (s *CPLBUserSpaceSuite) TestK0sGetsUp() { } // Create a worker join token - workerJoinToken, err := s.GetJoinToken("worker") + workerJoinToken, err := s.GetJoinToken(token.RoleWorker) s.Require().NoError(err) // Start the workers using the join token @@ -111,11 +140,16 @@ func (s *CPLBUserSpaceSuite) TestK0sGetsUp() { // Verify that controller+worker nodes are working normally. s.T().Log("waiting to see CNI pods ready") - s.Require().NoError(common.WaitForKubeRouterReady(s.Context(), client), "kube router did not start") + if useExtAddr { + s.Require().NoError(common.WaitForDaemonSet(ctx, client, "calico-node", "kube-system"), "calico-node did not start") + } else { + s.Require().NoError(common.WaitForKubeRouterReady(ctx, client), "kube router did not start") + } + s.T().Log("waiting to see konnectivity-agent pods ready") - s.Require().NoError(common.WaitForDaemonSet(s.Context(), client, "konnectivity-agent", "kube-system"), "konnectivity-agent did not start") + s.Require().NoError(common.WaitForDaemonSet(ctx, client, "konnectivity-agent", "kube-system"), "konnectivity-agent did not start") s.T().Log("waiting to get logs from pods") - s.Require().NoError(common.WaitForPodLogs(s.Context(), client, "kube-system")) + s.Require().NoError(common.WaitForPodLogs(ctx, client, "kube-system")) s.T().Log("Testing that the load balancer is actually balancing the load") // Other stuff may be querying the controller, running the HTTPS request 15 times @@ -123,7 +157,7 @@ func (s *CPLBUserSpaceSuite) TestK0sGetsUp() { signatures := make(map[string]int) url := url.URL{Scheme: "https", Host: net.JoinHostPort(lb, strconv.Itoa(6443))} for range 15 { - signature, err := getServerCertSignature(url.String()) + signature, err := getServerCertSignature(ctx, url.String()) s.Require().NoError(err) signatures[signature] = 1 } @@ -134,21 +168,22 @@ func (s *CPLBUserSpaceSuite) TestK0sGetsUp() { // getLBAddress returns the IP address of the controller 0 and it adds 100 
to // the last octet unless it's bigger or equal to 154, in which case it // subtracts 100. Theoretically this could result in an invalid IP address. +// This is so that we get a virtual IP in the same subnet as the controller. func (s *CPLBUserSpaceSuite) getLBAddress() string { ip := s.GetIPAddress(s.ControllerNode(0)) - parts := strings.Split(ip, ".") - if len(parts) != 4 { - s.T().Fatalf("Invalid IP address: %q", ip) - } - lastOctet, err := strconv.Atoi(parts[3]) - s.Require().NoErrorf(err, "Failed to convert last octet %q to int", parts[3]) - if lastOctet >= 154 { - lastOctet -= 100 + addr := net.ParseIP(ip) + ipv4 := addr.To4() + if ipv4 == nil { + s.T().Fatalf("This test doesn't support IPv6: %q", ip) + } + + if ipv4[3] >= 154 { + ipv4[3] -= 100 } else { - lastOctet += 100 + ipv4[3] += 100 } - return fmt.Sprintf("%s.%d", strings.Join(parts[:3], "."), lastOctet) + return ipv4.String() } // checkDummy checks that the dummy interface isn't present in the node. @@ -157,8 +192,7 @@ func (s *CPLBUserSpaceSuite) checkDummy(ctx context.Context, node string) { s.Require().NoError(err) defer ssh.Disconnect() - _, err = ssh.ExecWithOutput(ctx, "ip --oneline addr show dummyvip0") - s.Require().Error(err) + s.Require().Error(ssh.Exec(ctx, "ip --oneline addr show dummyvip0", common.SSHStreams{})) } // hasVIP checks that the dummy interface is present on the given node and @@ -175,7 +209,7 @@ func (s *CPLBUserSpaceSuite) hasVIP(ctx context.Context, node string, vip string } // getServerCertSignature connects to the given HTTPS URL and returns the server certificate signature.
-func getServerCertSignature(url string) (string, error) { +func getServerCertSignature(ctx context.Context, url string) (string, error) { // Create a custom HTTP client with a custom TLS configuration client := &http.Client{ Transport: &http.Transport{ @@ -184,14 +218,26 @@ func getServerCertSignature(url string) (string, error) { }, }, } + defer client.CloseIdleConnections() // Make a request to the URL - resp, err := client.Get(url) + + // Make a request to the URL with the context + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return "", err + } + + resp, err := client.Do(req) if err != nil { return "", err } defer resp.Body.Close() + if err != nil { + return "", err + } + // Get the TLS connection state connState := resp.TLS if connState == nil { @@ -199,7 +245,7 @@ func getServerCertSignature(url string) (string, error) { } // Get the server certificate - if len(connState.PeerCertificates) == 0 { + if len(connState.PeerCertificates) != 1 { return "", errors.New("no server certificate found") } cert := connState.PeerCertificates[0] diff --git a/pkg/apis/k0s/v1beta1/clusterconfig_types.go b/pkg/apis/k0s/v1beta1/clusterconfig_types.go index b9d411a42a43..1f4dcb9fec41 100644 --- a/pkg/apis/k0s/v1beta1/clusterconfig_types.go +++ b/pkg/apis/k0s/v1beta1/clusterconfig_types.go @@ -366,7 +366,7 @@ func (s *ClusterSpec) Validate() (errs []error) { } if s.Network != nil && s.Network.ControlPlaneLoadBalancing != nil { - for _, err := range s.Network.ControlPlaneLoadBalancing.Validate(s.API.ExternalAddress) { + for _, err := range s.Network.ControlPlaneLoadBalancing.Validate() { errs = append(errs, fmt.Errorf("controlPlaneLoadBalancing: %w", err)) } } diff --git a/pkg/apis/k0s/v1beta1/cplb.go b/pkg/apis/k0s/v1beta1/cplb.go index 7fe9512d3178..f38bdd504a3e 100644 --- a/pkg/apis/k0s/v1beta1/cplb.go +++ b/pkg/apis/k0s/v1beta1/cplb.go @@ -277,7 +277,7 @@ func (k *KeepalivedSpec) validateVirtualServers() []error { } // Validate 
validates the ControlPlaneLoadBalancingSpec -func (c *ControlPlaneLoadBalancingSpec) Validate(externalAddress string) (errs []error) { +func (c *ControlPlaneLoadBalancingSpec) Validate() (errs []error) { if c == nil { return nil } @@ -290,11 +290,11 @@ func (c *ControlPlaneLoadBalancingSpec) Validate(externalAddress string) (errs [ errs = append(errs, fmt.Errorf("unsupported CPLB type: %s. Only allowed value: %s", c.Type, CPLBTypeKeepalived)) } - return append(errs, c.Keepalived.Validate(externalAddress)...) + return append(errs, c.Keepalived.Validate()...) } // Validate validates the KeepalivedSpec -func (k *KeepalivedSpec) Validate(externalAddress string) (errs []error) { +func (k *KeepalivedSpec) Validate() (errs []error) { if k == nil { return nil } @@ -307,10 +307,5 @@ func (k *KeepalivedSpec) Validate(externalAddress string) (errs []error) { errs = append(errs, errors.New("UserSpaceProxyPort must be in the range of 1-65535")) } - // CPLB reconciler relies in watching kubernetes.default.svc endpoints - if externalAddress != "" && len(k.VirtualServers) > 0 { - errs = append(errs, errors.New(".spec.api.externalAddress and virtual servers cannot be used together")) - } - return errs } From 4a06ba3fd80b3acf7509866c31a6678514df99d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan-Luis=20de=20Sousa-Valadas=20Casta=C3=B1o?= Date: Wed, 20 Nov 2024 21:53:08 +0100 Subject: [PATCH 4/5] Overhaul CPLB docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1- Included userspace reverse proxy and made it the default option. 2- Explained better the difference between virtual IP and a load balancer, it was confusing for many users. 3- Added a whole troubleshooting section. 
Signed-off-by: Juan-Luis de Sousa-Valadas Castaño --- docs/cplb.md | 338 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 250 insertions(+), 88 deletions(-) diff --git a/docs/cplb.md b/docs/cplb.md index 3b1e3055b267..f510f5c2d986 100644 --- a/docs/cplb.md +++ b/docs/cplb.md @@ -3,60 +3,70 @@ For clusters that don't have an [externally managed load balancer](high-availability.md#load-balancer) for the k0s control plane, there is another option to get a highly available control plane called control plane load balancing (CPLB). -CPLB has two features that are independent, but normally will be used together: VRRP Instances, which allows -automatic assignation of predefined IP addresses using VRRP across control plane nodes. VirtualServers allows to -do Load Balancing to the other control plane nodes. +CPLB provides clusters a highly available VIP (virtual IP) and load balancing for **accessing the cluster externally**. +For internal traffic (nodes to control plane) k0s provides [NLLB](nllb.md). Both features are fully compatible and it's +recommended to use both together if you don't have an external load balancer. -This feature is intended to be used for external traffic. This feature is fully compatible with -[node-local load balancing (NLLB)](nllb.md) which means CPLB can be used for external traffic and NLLB for -internal traffic at the same time. +Load balancing means that an IP address will forward the traffic to every control plane node, Virtual IPs mean that +this IP address will be present on at least one node at a time. -## Technical functionality +CPLB relies on [keepalived](https://www.keepalived.org) for highly available VIPs. Internally, Keepalived uses the +[VRRP protocol](https://datatracker.ietf.org/doc/html/rfc3768). Load Balancing can be done through either userspace +reverse proxy implemented in k0s (recommended for simplicity), or it can use Keepalived's virtual servers feature, +which ultimately relies on IPVS. 
-The k0s control plane load balancer provides k0s with virtual IPs and TCP -load Balancing on each controller node. This allows the control plane to -be highly available using VRRP (Virtual Router Redundancy Protocol) and -IPVS long as the network infrastructure allows multicast and GARP. +## Compatibility -[Keepalived](https://www.keepalived.org/) is the only load balancer that is -supported so far. Currently there are no plans to support other alternatives. +CPLB depends on multiple technologies to work together as a whole, making it difficult to work on every single scenario. -## VRRP Instances +### Single node -VRRP, or Virtual Router Redundancy Protocol, is a protocol that allows several -routers to utilize the same virtual IP address. A VRRP instance refers to a -specific configuration of this protocol. +CPLB is incompatible with running as a [single node](k0s-single-node.md). This means k0s must not be started using the `--single` flag. -Each VRRP instance must have a unique virtualRouterID, at least one IP address, -one unique password (which is sent in plain text across your network, this is -to prevent accidental conflicts between VRRP instances) and one network -interface. +### Controller + worker -Except for the network interface, all the fields of a VRRP instance must have -the same value on all the control plane nodes. +K0s only supports the userspace reverse proxy load balancer. Keepalived's VirtualServers are not supported with controller + worker. -Usually, users will define multiple VRRP instances when they need k0s to be -highly available on multiple network interfaces. +Both Kube-Router and Calico managed by k0s are supported with the userspace reverse proxy load balancer, however, k0s creates iptables +rules in the control plane nodes which may be incompatible with custom CNI plugins. 
-## Enabling in a cluster +### External address -In order to use control plane load balancing, the cluster needs to comply with the -following: +If [`spec.api.externalAddress`](configuration.md#specapi) is defined it's mandatory to disable +the [`endpoint-reconciler` component](configuration.md#disabling-controller-components) using the flag `--disable-components=endpoint-reconciler`. -* K0s isn't running as a [single node](k0s-single-node.md), i.e. it isn't - started using the `--single` flag. -* The cluster should have multiple controller nodes. Technically CPLB also works - with a single controller node, but is only useful in conjunction with a highly - available control plane. -* Unique virtualRouterID and authPass for each VRRP Instance in the same broadcast domain. - These do not provide any sort of security against ill-intentioned attacks, they are - safety features to prevent accidental conflicts between VRRP instances in the same - network segment. -* If `VirtualServers` are used, the cluster configuration mustn't specify a non-empty - [`spec.api.externalAddress`][specapi]. If only `VRRPInstances` are specified, a - non-empty [`spec.api.externalAddress`][specapi] may be specified. +### Node Local Load Balancing -Add the following to the cluster configuration (`k0s.yaml`): +CPLB is fully compatible with [NLLB](nllb.md), however NLLB is incompatible with [`spec.api.externalAddress`](configuration.md#specapi). + +## Virtual IPs - High availability + +### What is a VIP (virtual IP) + +A virtual IP is an IP address that isn't tied to a single network interface, +instead it floats between multiple servers. This is a failover mechanism that +grants that there is always at least a functioning server and removes a single +point of failure. + +### Configuring VIPs + +CPLB relies internally on Keepalived's VRRP Instances. A VRRP Instance is a +server that will manage one or more VIPs. 
Most users will need exactly one +VRRP instance with exactly one VIP, however k0s allows multiple VRRP servers +with multiple VIPs for more advanced use cases such as network segmentation. + +A virtual IP requires: + +1. A user-defined CIDR address which must be routable in the network. For most installations, this will be in the same CIDR as the physical interface. +**WARNING:** K0s is not aware of external IP address management and the administrator is responsible for ensuring that IP addresses aren't colliding. +2. A user-defined password which should be unique for each cluster. This password is a mechanism to prevent accidental conflicts. It's not encrypted +and doesn't prevent malicious attacks in any way. +3. A virtual router ID, which defaults to 51. This virtual router ID **must be unique** in the broadcast domain. +4. A network interface, if not defined, k0s will choose the network interface that owns the default route. + +Except for the network interface, all the other fields must be equal on every control plane node. + +This is a minimal example: ```yaml spec: @@ -66,37 +76,59 @@ spec: type: Keepalived keepalived: vrrpInstances: - - virtualIPs: ["/ - virtualServers: - - ipAddress: "ipAddress" + - virtualIPs: ["/"] # for instance ["172.16.0.100/16"] + authPass: "" ``` -Or alternatively, if using [`k0sctl`](k0sctl-install.md), add the following to -the k0sctl configuration (`k0sctl.yaml`): +## Load Balancing + +Currently k0s allows choosing one of two load balancing mechanisms: + +1. A userspace reverse proxy running in the k0s process. This is the default and recommended setting. +2. For users who may need extra performance or more flexible algorithms, k0s can use the keepalived virtual servers load balancer feature. + +All control plane nodes must use the same load balancing mechanism. Mixing different load balancing mechanisms +is not supported and has undefined behavior.
+ +### Load Balancing - Userspace Reverse Proxy + +This is the default behavior. In order to enable it, simply configure a VIP +using a VRRP instance. ```yaml spec: - k0s: - config: - spec: - network: - controlPlaneLoadBalancing: - enabled: true - type: Keepalived - keepalived: - vrrpInstances: - - virtualIPs: ["/"] - authPass: - virtualServers: - - ipAddress: "" + network: + controlPlaneLoadBalancing: + enabled: true + type: Keepalived + keepalived: + vrrpInstances: + - virtualIPs: ["/"] # for instance ["172.16.0.100/16"] + authPass: "" ``` -Because this is a feature intended to configure the apiserver, CPLB does not -support dynamic configuration and in order to make changes you need to restart -the k0s controllers to make changes. +### Keepalived Virtual Servers Load Balancing + +The Keepalived virtual servers Load Balancing is more performant than the userspace reverse proxy load balancer. However, it's + not recommended because it has some drawbacks: -[specapi]: configuration.md#specapi +1. It's incompatible with controller+worker. +2. May not work on every infrastructure. +3. Troubleshooting is significantly more complex. + +```yaml +spec: + network: + controlPlaneLoadBalancing: + enabled: true + type: Keepalived + keepalived: + vrrpInstances: + - virtualIPs: ["/"] # for instance ["172.16.0.100/16"] + authPass: "" + virtualServers: + - ipAddress: "" # for instance 172.16.0.100 +``` ## Full example using `k0sctl` @@ -156,9 +188,6 @@ spec: version: v{{{ extra.k8s_version }}}+k0s.0 config: spec: - api: - sans: - - 192.168.122.200 network: controlPlaneLoadBalancing: enabled: true @@ -167,8 +196,9 @@ spec: vrrpInstances: - virtualIPs: ["192.168.122.200/24"] authPass: Example - virtualServers: - - ipAddress: "" + nodeLocalLoadBalancing: # optional, but CPLB will often be used with NLLB.
+ enabled: true + type: EnvoyProxy ``` Save the above configuration into a file called `k0sctl.yaml` and apply it in @@ -287,7 +317,7 @@ level=info msg=" k0sctl kubeconfig" The cluster with the two nodes should be available by now. Setup the kubeconfig file in order to interact with it: -```shell +```console k0sctl kubeconfig > k0s-kubeconfig export KUBECONFIG=$(pwd)/k0s-kubeconfig ``` @@ -302,21 +332,17 @@ worker-1.k0s.lab Ready 8m51s v{{{ extra.k8s_version } worker-2.k0s.lab Ready 8m51s v{{{ extra.k8s_version }}}+k0s ``` -Each controller node has a dummy interface with the VIP and /32 netmask, -but only one has it in the real nic: +Only one controller has the VIP: ```console -$ for i in controller-{0..2} ; do echo $i ; ssh $i -- ip -4 --oneline addr show | grep -e eth0 -e dummyvip0; done +$ for i in controller-{0..2} ; do echo $i ; ssh $i -- ip -4 --oneline addr show | grep eth0; done controller-0 2: eth0 inet 192.168.122.37/24 brd 192.168.122.255 scope global dynamic noprefixroute eth0\ valid_lft 2381sec preferred_lft 2381sec 2: eth0 inet 192.168.122.200/24 scope global secondary eth0\ valid_lft forever preferred_lft forever -3: dummyvip0 inet 192.168.122.200/32 scope global dummyvip0\ valid_lft forever preferred_lft forever controller-1 2: eth0 inet 192.168.122.185/24 brd 192.168.122.255 scope global dynamic noprefixroute eth0\ valid_lft 2390sec preferred_lft 2390sec -3: dummyvip0 inet 192.168.122.200/32 scope global dummyvip0\ valid_lft forever preferred_lft forever controller-2 2: eth0 inet 192.168.122.87/24 brd 192.168.122.255 scope global dynamic noprefixroute eth0\ valid_lft 2399sec preferred_lft 2399sec -3: dummyvip0 inet 192.168.122.200/32 scope global dummyvip0\ valid_lft forever preferred_lft forever ``` The cluster is using control plane load balancing and is able to tolerate the @@ -331,23 +357,12 @@ Connection to 192.168.122.37 closed by remote host. 
Control plane load balancing provides high availability, the VIP will have moved to a different node: ```console -$ for i in controller-{0..2} ; do echo $i ; ssh $i -- ip -4 --oneline addr show | grep -e eth0 -e dummyvip0; done +$ for i in controller-{1..2} ; do echo $i ; ssh $i -- ip -4 --oneline addr show | grep eth0; done controller-1 2: eth0 inet 192.168.122.185/24 brd 192.168.122.255 scope global dynamic noprefixroute eth0\ valid_lft 2173sec preferred_lft 2173sec 2: eth0 inet 192.168.122.200/24 scope global secondary eth0\ valid_lft forever preferred_lft forever -3: dummyvip0 inet 192.168.122.200/32 scope global dummyvip0\ valid_lft forever preferred_lft forever controller-2 2: eth0 inet 192.168.122.87/24 brd 192.168.122.255 scope global dynamic noprefixroute eth0\ valid_lft 2182sec preferred_lft 2182sec -3: dummyvip0 inet 192.168.122.200/32 scope global dummyvip0\ valid_lft forever preferred_lft forever - -$ for i in controller-{0..2} ; do echo $i ; ipvsadm --save -n; done -IP Virtual Server version 1.2.1 (size=4096) -Prot LocalAddress:Port Scheduler Flags - -> RemoteAddress:Port Forward Weight ActiveConn InActConn -TCP 192.168.122.200:6443 rr persistent 360 - -> 192.168.122.185:6443 Route 1 0 0 - -> 192.168.122.87:6443 Route 1 0 0 - -> 192.168.122.122:6443 Route 1 0 0 ```` And the cluster will be working normally: @@ -359,3 +374,150 @@ worker-0.k0s.lab Ready 8m51s v{{{ extra.k8s_version } worker-1.k0s.lab Ready 8m51s v{{{ extra.k8s_version }}}+k0s worker-2.k0s.lab Ready 8m51s v{{{ extra.k8s_version }}}+k0s ``` + +## Troubleshooting + +Although Virtual IPs and Load Balancing work together and are closely related, these are two independent +processes and must be troubleshot as two independent features.
+ +### Troubleshooting Virtual IPs + +The first thing to check is that the VIP is present in exactly one node at a time, +for instance if a cluster has an `172.17.0.102/16` address and the interface is `eth0`, +the expected output is similar to: + +```console +controller0:/# ip a s eth0 +53: eth0@if54: mtu 1500 qdisc noqueue state UP + link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff + inet 172.17.0.2/16 brd 172.17.255.255 scope global eth0 + valid_lft forever preferred_lft forever + inet 172.17.0.102/16 scope global secondary eth0 + valid_lft forever preferred_lft forever +``` + +```console +controller1:/# ip a s eth0 +55: eth0@if56: mtu 1500 qdisc noqueue state UP + link/ether 02:42:ac:11:00:03 brd ff:ff:ff:ff:ff:ff + inet 172.17.0.3/16 brd 172.17.255.255 scope global eth0 + valid_lft forever preferred_lft forever +``` + +If the virtualServers feature is used, there must be a dummy interface on the node +called `dummyvip0` which has the VIP but with `32` netmask. This isn't the VIP and +has to be there even if the VIP is held by another node. + +```console +controller0:/# ip a s dummyvip0 | grep 172.17.0.102 + inet 172.17.0.102/32 scope global dummyvip0 +``` + +```console +controller1:/# ip a s dummyvip0 | grep 172.17.0.102 + inet 172.17.0.102/32 scope global dummyvip0 +``` + +If this isn't present in the nodes, keepalived logs can be seen in the k0s-logs, and +can be filtered with `component=keepalived`. + +```console +controller0:/# journalctl -u k0scontroller | grep component=keepalived +time="2024-11-19 12:56:11" level=info msg="Starting to supervise" component=keepalived +time="2024-11-19 12:56:11" level=info msg="Started successfully, go nuts pid 409" component=keepalived +time="2024-11-19 12:56:11" level=info msg="Tue Nov 19 12:56:11 2024: Starting Keepalived v2.2.8 (04/04,2023), git commit v2.2.7-154-g292b299e+" component=keepalived stream=stderr +[...] 
+``` + +The keepalived configuration is stored in a file called keepalived.conf in the k0s run +directory, by default `/run/k0s/keepalived.conf`. In this file there should be a +`vrrp_instance` section for each `vrrpInstance`. + +Finally, k0s should have two keepalived processes running. + +### Troubleshooting the Load Balancer's Endpoint List + +Both the userspace reverse proxy load balancer and Keepalived's virtual servers need an endpoint list to +do the load balancing. They share a component called `cplb-reconciler` which is responsible for setting the +load balancer's endpoint list. This component constantly monitors the endpoint `kubernetes` in the +`default` namespace: + +```console +controller0:/# kubectl get ep kubernetes -n default +NAME ENDPOINTS AGE +kubernetes 172.17.0.6:6443,172.17.0.7:6443,172.17.0.8:6443 9m14s +``` + +You can see the `cplb-reconciler` updates by running: + +```console +controller0:/# journalctl -u k0scontroller | grep component=cplb-reconciler +time="2024-11-20 20:29:28" level=error msg="Failed to watch API server endpoints, last observed version is \"\", starting over in 10s ..." component=cplb-reconciler error="Get \"https://172.17.0.6:6443/api/v1/namespaces/default/endpoints?fieldSelector=metadata.name%3Dkubernetes&timeout=30s&timeoutSeconds=30\": dial tcp 172.17.0.6:6443: connect: connection refused" +time="2024-11-20 20:29:38" level=info msg="Updated the list of IPs: [172.17.0.6]" component=cplb-reconciler +time="2024-11-20 20:29:55" level=info msg="Updated the list of IPs: [172.17.0.6 172.17.0.7]" component=cplb-reconciler +time="2024-11-20 20:29:59" level=info msg="Updated the list of IPs: [172.17.0.6 172.17.0.7 172.17.0.8]" component=cplb-reconciler +``` + +### Troubleshooting the Userspace Reverse Proxy Load Balancer + +The userspace reverse proxy load balancer runs in the k0s process.
It listens on a separate socket, by default on port 6444: + +```console +controller0:/# netstat -tlpn | grep 6444 +tcp 0 0 :::6444 :::* LISTEN 345/k0s +``` + +Then the requests to the VIP on the apiserver port are forwarded to this socket using one iptables rule: + +```console +-A PREROUTING -d <VIP>/32 -p tcp -m tcp --dport <apiserver port> -j REDIRECT --to-ports <userspace proxy port> +``` + +A real life example of a cluster using the VIP `172.17.0.102` looks like: + +```console +controller0:/# /var/lib/k0s/bin/iptables-save | grep 6444 +-A PREROUTING -d 172.17.0.102/32 -p tcp -m tcp --dport 6443 -j REDIRECT --to-ports 6444 +``` + +If the load balancer is not load balancing for whatever reason, you can establish connections to it directly. A good way to see if it's actually load balancing is checking the serving certificate: + +```console +controller0:/# ip -o addr s eth0 +43: eth0 inet 172.17.0.2/16 brd 172.17.255.255 scope global eth0\ valid_lft forever preferred_lft forever +43: eth0 inet 172.17.0.102/16 scope global secondary eth0\ valid_lft forever preferred_lft forever + +controller0:/# openssl s_client -connect 172.17.0.102:6444 </dev/null 2>/dev/null | openssl x509 -noout -fingerprint +SHA1 Fingerprint=B7:90:E6:E4:E1:EE:5B:19:72:99:02:28:54:36:D9:84:D5:39:67:8B +controller0:/# openssl s_client -connect 172.17.0.102:6444 </dev/null 2>/dev/null | openssl x509 -noout -fingerprint +SHA1 Fingerprint=89:94:5C:E5:50:7E:40:B2:E5:20:E7:70:E8:58:91:ED:63:B0:EC:65 +controller0:/# openssl s_client -connect 172.17.0.102:6444 </dev/null 2>/dev/null | openssl x509 -noout -fingerprint +SHA1 Fingerprint=49:0D:79:FD:79:6F:A0:E4:9D:BA:A1:65:9C:C5:54:CF:E5:20:BF:A8 +controller0:/# openssl s_client -connect 172.17.0.102:6444 </dev/null 2>/dev/null | openssl x509 -noout -fingerprint +SHA1 Fingerprint=B7:90:E6:E4:E1:EE:5B:19:72:99:02:28:54:36:D9:84:D5:39:67:8B +``` + +Note: You can't query the port 6444 on the localhost address, there is an iptables conflict.
You are expected to +be able to reach the port 6443 on any address and the port 6444 on any address except localhost. + +### Troubleshooting Keepalived Virtual Servers + +You can verify keepalived's logs and configuration file using the steps described in the section +[troubleshooting virtual IPs](#troubleshooting-virtual-ips) above. Additionally, you can check +the actual IPVS configuration using `ipvsadm`: + +```console +controller0:/# ipvsadm --save -n +IP Virtual Server version 1.2.1 (size=4096) +Prot LocalAddress:Port Scheduler Flags + -> RemoteAddress:Port Forward Weight ActiveConn InActConn +TCP 192.168.122.200:6443 rr persistent 360 + -> 192.168.122.185:6443 Route 1 0 0 + -> 192.168.122.87:6443 Route 1 0 0 + -> 192.168.122.122:6443 Route 1 0 0 + ``` + + In this example `192.168.122.200` is the virtual IP, and `192.168.122.185`, `192.168.122.87` + and `192.168.122.122` are the control plane nodes. + + All control plane nodes are expected to have the same output. \ No newline at end of file From b1337696cb92b285dce176cf22db570b2682b7f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan-Luis=20de=20Sousa-Valadas=20Casta=C3=B1o?= Date: Thu, 19 Dec 2024 12:06:14 +0100 Subject: [PATCH 5/5] Multiple refactors on cplb.tcpproxy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1- Simplify tcpproxy by removing useless interfaces: The original tcpproxy allowed different types of routes and needed to do a bunch of interfacing for it to work. Since we only implement one kind of route and one kind of target, we remove all the interfaces and merge both structs into a single struct. 2- Remove proxy.AddRoute: We only used it once and setRoutes can cover that use case 3- Lock tcpproxy.Proxy when modifying routes to make it thread safe. Prior to this we relied on the proxy being called only from one goroutine. Now it can be called concurrently, not that we expect to do that but a lock gives us extra safety.
4- Panic if tcpproxy.SetRoutes gets an empty route list. We now check this in cplb_linux.go. 5- Remove the route feeding goroutine for round robin, since we added a lock to make proxy.SetRoutes threadsafe we don't need that anymore and it can be made much simpler by adding a lock. Signed-off-by: Juan-Luis de Sousa-Valadas Castaño --- inttest/cplb-userspace/cplbuserspace_test.go | 7 +- pkg/component/controller/cplb/cplb_linux.go | 24 ++- .../controller/cplb/tcpproxy/tcpproxy.go | 176 ++++++------------ .../controller/cplb/tcpproxy/tcpproxy_test.go | 18 +- 4 files changed, 86 insertions(+), 139 deletions(-) diff --git a/inttest/cplb-userspace/cplbuserspace_test.go b/inttest/cplb-userspace/cplbuserspace_test.go index 1b2da563d107..349f6f659362 100644 --- a/inttest/cplb-userspace/cplbuserspace_test.go +++ b/inttest/cplb-userspace/cplbuserspace_test.go @@ -154,15 +154,16 @@ func (s *CPLBUserSpaceSuite) TestK0sGetsUp() { s.T().Log("Testing that the load balancer is actually balancing the load") // Other stuff may be querying the controller, running the HTTPS request 15 times // should be more than we need.
+ attempt := 0 signatures := make(map[string]int) url := url.URL{Scheme: "https", Host: net.JoinHostPort(lb, strconv.Itoa(6443))} - for range 15 { + for len(signatures) < 3 { signature, err := getServerCertSignature(ctx, url.String()) s.Require().NoError(err) signatures[signature] = 1 + attempt++ + s.Require().LessOrEqual(attempt, 15, "Failed to get a signature from all controllers") } - - s.Require().Len(signatures, 3, "Expected 3 different signatures, got %d", len(signatures)) } // getLBAddress returns the IP address of the controller 0 and it adds 100 to diff --git a/pkg/component/controller/cplb/cplb_linux.go b/pkg/component/controller/cplb/cplb_linux.go index f17a21e79e78..0496ffa08c48 100644 --- a/pkg/component/controller/cplb/cplb_linux.go +++ b/pkg/component/controller/cplb/cplb_linux.go @@ -87,7 +87,7 @@ func (k *Keepalived) Init(_ context.Context) error { } // Start generates the keepalived configuration and starts the keepalived process -func (k *Keepalived) Start(_ context.Context) error { +func (k *Keepalived) Start(ctx context.Context) error { if k.Config == nil || (len(k.Config.VRRPInstances) == 0 && len(k.Config.VirtualServers) == 0) { k.log.Warn("No VRRP instances or virtual servers defined, skipping keepalived start") return nil @@ -154,8 +154,7 @@ func (k *Keepalived) Start(_ context.Context) error { if len(k.Config.VirtualServers) > 0 { k.watchReconcilerUpdatesKeepalived() } else { - - if err := k.watchReconcilerUpdatesReverseProxy(); err != nil { + if err := k.watchReconcilerUpdatesReverseProxy(ctx); err != nil { k.log.WithError(err).Error("failed to watch reconciler updates") } } @@ -347,18 +346,23 @@ func (k *Keepalived) generateKeepalivedTemplate() error { return nil } -func (k *Keepalived) watchReconcilerUpdatesReverseProxy() error { +func (k *Keepalived) watchReconcilerUpdatesReverseProxy(ctx context.Context) error { k.proxy = tcpproxy.Proxy{} // We don't know how long until we get the first update, so initially we // forward everything to 
localhost - k.proxy.AddRoute(fmt.Sprintf(":%d", k.Config.UserSpaceProxyPort), tcpproxy.To(fmt.Sprintf("127.0.0.1:%d", k.APIPort))) + k.proxy.SetRoutes(fmt.Sprintf(":%d", k.Config.UserSpaceProxyPort), []tcpproxy.Route{tcpproxy.To(fmt.Sprintf("127.0.0.1:%d", k.APIPort))}) if err := k.proxy.Start(); err != nil { return fmt.Errorf("failed to start proxy: %w", err) } - fmt.Println("Waiting for updateCh") - <-k.updateCh + k.log.Info("Waiting for the first cplb-reconciler update") + + select { + case <-ctx.Done(): + return errors.New("context cancelled while starting the reverse proxy") + case <-k.updateCh: + } k.setProxyRoutes() // Do not create the iptables rules until we have the first update and the @@ -374,11 +378,15 @@ func (k *Keepalived) watchReconcilerUpdatesReverseProxy() error { } func (k *Keepalived) setProxyRoutes() { - routes := []tcpproxy.Target{} + routes := []tcpproxy.Route{} for _, addr := range k.reconciler.GetIPs() { routes = append(routes, tcpproxy.To(fmt.Sprintf("%s:%d", addr, k.APIPort))) } + if len(routes) == 0 { + k.log.Error("No API servers available, leave previous configuration") + return + } k.proxy.SetRoutes(fmt.Sprintf(":%d", k.Config.UserSpaceProxyPort), routes) } diff --git a/pkg/component/controller/cplb/tcpproxy/tcpproxy.go b/pkg/component/controller/cplb/tcpproxy/tcpproxy.go index 23b4b7fc52bd..2bdbffe50490 100644 --- a/pkg/component/controller/cplb/tcpproxy/tcpproxy.go +++ b/pkg/component/controller/cplb/tcpproxy/tcpproxy.go @@ -27,9 +27,11 @@ import ( "errors" "fmt" "io" - "log" "net" + "sync" "time" + + "github.com/sirupsen/logrus" ) // Proxy is a proxy. Its zero value is a valid proxy that does @@ -38,16 +40,18 @@ import ( // The order that routes are added in matters; each is matched in the order // registered. 
type Proxy struct { + mux sync.RWMutex configs map[string]*config // ip:port => config lns []net.Listener donec chan struct{} // closed before err err error // any error from listening - routesChan chan route + connNumber int // connection number counter, used for round robin // ListenFunc optionally specifies an alternate listen // function. If nil, net.Dial is used. - // The provided net is always "tcp". + // The provided net is always "tcp". This is to match + // the signature of net.Listen. ListenFunc func(net, laddr string) (net.Listener, error) } @@ -56,22 +60,7 @@ type Matcher func(ctx context.Context, hostname string) bool // config contains the proxying state for one listener. type config struct { - routes []route -} - -// A route matches a connection to a target. -type route interface { - // match examines the initial bytes of a connection, looking for a - // match. If a match is found, match returns a non-nil Target to - // which the stream should be proxied. match returns nil if the - // connection doesn't match. - // - // match must not consume bytes from the given bufio.Reader, it - // can only Peek. - // - // If an sni or host header was parsed successfully, that will be - // returned as the second parameter. - match(*bufio.Reader) (Target, string) + routes []Route } func (p *Proxy) netListen() func(net, laddr string) (net.Listener, error) { @@ -91,28 +80,7 @@ func (p *Proxy) configFor(ipPort string) *config { return p.configs[ipPort] } -func (p *Proxy) addRoute(ipPort string, r route) { - cfg := p.configFor(ipPort) - cfg.routes = append(cfg.routes, r) -} - -// AddRoute appends an always-matching route to the ipPort listener, -// directing any connection to dest. -// -// This is generally used as either the only rule (for simple TCP -// proxies), or as the final fallback rule for an ipPort. -// -// The ipPort is any valid net.Listen TCP address. 
-func (p *Proxy) AddRoute(ipPort string, dest Target) { - p.addRoute(ipPort, fixedTarget{dest}) -} - -func (p *Proxy) setRoutes(ipPort string, targets []Target) { - var routes []route - for _, target := range targets { - routes = append(routes, fixedTarget{target}) - } - +func (p *Proxy) setRoutes(ipPort string, routes []Route) { cfg := p.configFor(ipPort) cfg.routes = routes } @@ -122,19 +90,15 @@ func (p *Proxy) setRoutes(ipPort string, targets []Target) { // It's possible that the old routes are still used once after this // function is called. If an empty slice is passed, the routes are // preserved in order to avoid an infinite loop. -func (p *Proxy) SetRoutes(ipPort string, targets []Target) { +func (p *Proxy) SetRoutes(ipPort string, targets []Route) { + p.mux.Lock() + defer p.mux.Unlock() if len(targets) == 0 { - return + panic("SetRoutes with empty targets") } p.setRoutes(ipPort, targets) } -type fixedTarget struct { - t Target -} - -func (m fixedTarget) match(*bufio.Reader) (Target, string) { return m.t, "" } - // Run is calls Start, and then Wait. // // It blocks until there's an error. The return value is always @@ -183,7 +147,6 @@ func (p *Proxy) Start() error { return err } p.lns = append(p.lns, ln) - p.routesChan = make(chan route) go p.serveListener(errc, ln, config) } go p.awaitFirstError(errc) @@ -196,48 +159,35 @@ func (p *Proxy) awaitFirstError(errc <-chan error) { } func (p *Proxy) serveListener(ret chan<- error, ln net.Listener, cfg *config) { - go p.roundRobin(cfg) for { c, err := ln.Accept() if err != nil { ret <- err return } - go p.serveConn(c) + go p.serveConn(c, cfg) } } // serveConn runs in its own goroutine and matches c against routes. // It returns whether it matched purely for testing. 
-func (p *Proxy) serveConn(c net.Conn) bool { +func (p *Proxy) serveConn(c net.Conn, cfg *config) bool { br := bufio.NewReader(c) - for route := range p.routesChan { - if target, hostName := route.match(br); target != nil { - if n := br.Buffered(); n > 0 { - peeked, _ := br.Peek(br.Buffered()) - c = &Conn{ - HostName: hostName, - Peeked: peeked, - Conn: c, - } - } - target.HandleConn(c) - return true - } - } - // TODO: hook for this? - log.Printf("tcpproxy: no routes matched conn %v/%v; closing", c.RemoteAddr().String(), c.LocalAddr().String()) - c.Close() - return false -} -// roundRobin writes to a channel the next route to use. -func (p *Proxy) roundRobin(cfg *config) { - for { - for _, route := range cfg.routes { - p.routesChan <- route + p.mux.RLock() + p.connNumber++ + route := cfg.routes[p.connNumber%(len(cfg.routes))] + p.mux.RUnlock() + + if n := br.Buffered(); n > 0 { + peeked, _ := br.Peek(br.Buffered()) + c = &Conn{ + Peeked: peeked, + Conn: c, } } + route.HandleConn(c) + return true } // Conn is an incoming connection that has had some bytes read from it @@ -276,29 +226,17 @@ func (c *Conn) Read(p []byte) (n int, err error) { return c.Conn.Read(p) } -// Target is what an incoming matched connection is sent to. -type Target interface { - // HandleConn is called when an incoming connection is - // matched. After the call to HandleConn, the tcpproxy - // package never touches the conn again. Implementations are - // responsible for closing the connection when needed. - // - // The concrete type of conn will be of type *Conn if any - // bytes have been consumed for the purposes of route - // matching. - HandleConn(net.Conn) -} - // To is shorthand way of writing &tcpproxy.DialProxy{Addr: addr}. -func To(addr string) *DialProxy { - return &DialProxy{Addr: addr} +func To(addr string) Route { + return Route{Addr: addr} } -// DialProxy implements Target by dialing a new connection to Addr +// Route is what an incoming connection is sent to. 
+// It handles them by dialing a new connection to Addr // and then proxying data back and forth. // -// The To func is a shorthand way of creating a DialProxy. -type DialProxy struct { +// The To func is a shorthand way of creating a Route. +type Route struct { // Addr is the TCP address to proxy to. Addr string @@ -366,29 +304,29 @@ func closeWrite(c net.Conn) { } // HandleConn implements the Target interface. -func (dp *DialProxy) HandleConn(src net.Conn) { +func (r *Route) HandleConn(src net.Conn) { ctx := context.Background() var cancel context.CancelFunc - if dp.DialTimeout >= 0 { - ctx, cancel = context.WithTimeout(ctx, dp.dialTimeout()) + if r.DialTimeout >= 0 { + ctx, cancel = context.WithTimeout(ctx, r.dialTimeout()) } - dst, err := dp.dialContext()(ctx, "tcp", dp.Addr) + dst, err := r.dialContext()(ctx, "tcp", r.Addr) if cancel != nil { cancel() } if err != nil { - dp.onDialError()(src, err) + r.onDialError()(src, err) return } defer goCloseConn(dst) - if err = dp.sendProxyHeader(dst, src); err != nil { - dp.onDialError()(src, err) + if err = r.sendProxyHeader(dst, src); err != nil { + r.onDialError()(src, err) return } defer goCloseConn(src) - if ka := dp.keepAlivePeriod(); ka > 0 { + if ka := r.keepAlivePeriod(); ka > 0 { for _, c := range []net.Conn{src, dst} { if c, ok := tcpConn(c); ok { _ = c.SetKeepAlive(true) @@ -404,8 +342,8 @@ func (dp *DialProxy) HandleConn(src net.Conn) { <-errc } -func (dp *DialProxy) sendProxyHeader(w io.Writer, src net.Conn) error { - switch dp.ProxyProtocolVersion { +func (r *Route) sendProxyHeader(w io.Writer, src net.Conn) error { + switch r.ProxyProtocolVersion { case 0: return nil case 1: @@ -429,7 +367,7 @@ func (dp *DialProxy) sendProxyHeader(w io.Writer, src net.Conn) error { _, err := fmt.Fprintf(w, "PROXY %s %s %s %d %d\r\n", family, srcAddr.IP, dstAddr.IP, srcAddr.Port, dstAddr.Port) return err default: - return fmt.Errorf("PROXY protocol version %d not supported", dp.ProxyProtocolVersion) + return 
fmt.Errorf("PROXY protocol version %d not supported", r.ProxyProtocolVersion) } } @@ -458,35 +396,35 @@ func proxyCopy(errc chan<- error, dst, src net.Conn) { errc <- err } -func (dp *DialProxy) keepAlivePeriod() time.Duration { - if dp.KeepAlivePeriod != 0 { - return dp.KeepAlivePeriod +func (r *Route) keepAlivePeriod() time.Duration { + if r.KeepAlivePeriod != 0 { + return r.KeepAlivePeriod } return time.Minute } -func (dp *DialProxy) dialTimeout() time.Duration { - if dp.DialTimeout > 0 { - return dp.DialTimeout +func (r *Route) dialTimeout() time.Duration { + if r.DialTimeout > 0 { + return r.DialTimeout } return 10 * time.Second } var defaultDialer = new(net.Dialer) -func (dp *DialProxy) dialContext() func(ctx context.Context, network, address string) (net.Conn, error) { - if dp.DialContext != nil { - return dp.DialContext +func (r *Route) dialContext() func(ctx context.Context, network, address string) (net.Conn, error) { + if r.DialContext != nil { + return r.DialContext } return defaultDialer.DialContext } -func (dp *DialProxy) onDialError() func(src net.Conn, dstDialErr error) { - if dp.OnDialError != nil { - return dp.OnDialError +func (r *Route) onDialError() func(src net.Conn, dstDialErr error) { + if r.OnDialError != nil { + return r.OnDialError } return func(src net.Conn, dstDialErr error) { - log.Printf("tcpproxy: for incoming conn %v, error dialing %q: %v", src.RemoteAddr().String(), dp.Addr, dstDialErr) + logrus.WithFields(logrus.Fields{"component": "tcpproxy"}).Errorf("for incoming conn %v, error dialing %q: %v", src.RemoteAddr().String(), r.Addr, dstDialErr) src.Close() } } diff --git a/pkg/component/controller/cplb/tcpproxy/tcpproxy_test.go b/pkg/component/controller/cplb/tcpproxy/tcpproxy_test.go index 777a5c95e4ed..ff9f4be6b557 100644 --- a/pkg/component/controller/cplb/tcpproxy/tcpproxy_test.go +++ b/pkg/component/controller/cplb/tcpproxy/tcpproxy_test.go @@ -75,7 +75,7 @@ func TestBufferedClose(t *testing.T) { defer back.Close() p := 
testProxy(t, front) - p.AddRoute(testFrontAddr, To(back.Addr().String())) + p.SetRoutes(testFrontAddr, []Route{To(back.Addr().String())}) if err := p.Start(); err != nil { t.Fatal(err) } @@ -114,7 +114,7 @@ func TestProxyAlwaysMatch(t *testing.T) { defer back.Close() p := testProxy(t, front) - p.AddRoute(testFrontAddr, To(back.Addr().String())) + p.setRoutes(testFrontAddr, []Route{To(back.Addr().String())}) if err := p.Start(); err != nil { t.Fatal(err) } @@ -149,10 +149,10 @@ func TestProxyPROXYOut(t *testing.T) { defer back.Close() p := testProxy(t, front) - p.AddRoute(testFrontAddr, &DialProxy{ + p.SetRoutes(testFrontAddr, []Route{{ Addr: back.Addr().String(), ProxyProtocolVersion: 1, - }) + }}) if err := p.Start(); err != nil { t.Fatal(err) } @@ -185,7 +185,7 @@ func TestSetRoutes(t *testing.T) { var p Proxy ipPort := ":8080" - p.AddRoute(ipPort, To("127.0.0.2:8080")) + p.setRoutes(ipPort, []Route{To("127.0.0.2:8080")}) cfg := p.configFor(ipPort) expectedAddrsList := [][]string{ @@ -203,21 +203,21 @@ func TestSetRoutes(t *testing.T) { } } -func stringsToTargets(s []string) []Target { - targets := make([]Target, len(s)) +func stringsToTargets(s []string) []Route { + targets := make([]Route, len(s)) for i, v := range s { targets[i] = To(v) } return targets } -func equalRoutes(routes []route, expectedAddrs []string) bool { +func equalRoutes(routes []Route, expectedAddrs []string) bool { if len(routes) != len(expectedAddrs) { return false } for i := range routes { - addr := routes[i].(fixedTarget).t.(*DialProxy).Addr + addr := routes[i].Addr if addr != expectedAddrs[i] { return false }