diff --git a/pkg/bgp/id.go b/pkg/bgp/id.go index 6c9a82a51c..86f2a85774 100644 --- a/pkg/bgp/id.go +++ b/pkg/bgp/id.go @@ -9,7 +9,7 @@ import ( "strconv" "strings" - "github.com/cloudnativelabs/kube-router/v2/pkg/utils" + "github.com/cloudnativelabs/kube-router/v2/pkg" gobgp "github.com/osrg/gobgp/v3/pkg/packet/bgp" ) @@ -21,7 +21,7 @@ const ( // GenerateRouterID will generate a router ID based upon the user's configuration (or lack there of) and the node's // primary IP address if the user has not specified. If the user has configured the router ID as "generate" then we // will generate a router ID based upon fnv hashing the node's primary IP address. -func GenerateRouterID(nodeIPAware utils.NodeIPAware, configRouterID string) (string, error) { +func GenerateRouterID(nodeIPAware pkg.NodeIPAware, configRouterID string) (string, error) { switch { case configRouterID == "generate": h := fnv.New32a() diff --git a/pkg/bgp/path.go b/pkg/bgp/path.go index bace2d0c97..4500011794 100644 --- a/pkg/bgp/path.go +++ b/pkg/bgp/path.go @@ -6,13 +6,20 @@ import ( "k8s.io/klog/v2" ) -func PathChanged(path *gobgpapi.Path, pl PeerLister, rs pkg.RouteSyncer, tc pkg.TunnelCleaner) error { +type PathHandler struct { + PeerLister PeerLister + RouteInjector pkg.RouteInjector + RouteSyncer pkg.RouteSyncer + TunnelCleaner pkg.TunnelCleaner +} + +func (ph *PathHandler) Changed(path *gobgpapi.Path) error { klog.V(2).Infof("Path Looks Like: %s", path.String()) dst, nextHop, err := ParsePath(path) if err != nil { return err } - tunnelName := tc.GenerateTunnelName(nextHop.String()) + tunnelName := ph.TunnelCleaner.GenerateTunnelName(nextHop.String()) // If we've made it this far, then it is likely that the node is holding a destination route for this path already. // If the path we've received from GoBGP is a withdrawal, we should clean up any lingering routes that may exist @@ -23,7 +30,7 @@ func PathChanged(path *gobgpapi.Path, pl PeerLister, rs pkg.RouteSyncer, tc pkg. // The path might be withdrawn because the peer became unestablished or it may be withdrawn because just the // path was withdrawn. Check to see if the peer is still established before deciding whether to clean the // tunnel and tunnel routes or whether to just delete the destination route. - peerEstablished, err := IsPeerEstablished(pl, nextHop.String()) + peerEstablished, err := IsPeerEstablished(ph.PeerLister, nextHop.String()) if err != nil { klog.Errorf("encountered error while checking peer status: %v", err) } @@ -31,15 +38,17 @@ func PathChanged(path *gobgpapi.Path, pl PeerLister, rs pkg.RouteSyncer, tc pkg. klog.V(1).Infof("Peer '%s' was not found any longer, removing tunnel and routes", nextHop.String()) // Also delete route from state map so that it doesn't get re-synced after deletion - rs.DelInjectedRoute(dst) - tc.CleanupTunnel(dst, tunnelName) + ph.RouteSyncer.DelInjectedRoute(dst) + ph.TunnelCleaner.CleanupTunnel(dst, tunnelName) return nil } // Also delete route from state map so that it doesn't get re-synced after deletion - rs.DelInjectedRoute(dst) + ph.RouteSyncer.DelInjectedRoute(dst) return nil } - return nil + // If this is not a withdraw, then we need to process the route. This takes care of creating any necessary tunnels, + // and adding any necessary host routes depending on the user's config + return ph.RouteInjector.InjectRoute(dst, nextHop) } diff --git a/pkg/controllers/netpol/network_policy_controller.go b/pkg/controllers/netpol/network_policy_controller.go index 0d48a41c84..b8c1a6be13 100644 --- a/pkg/controllers/netpol/network_policy_controller.go +++ b/pkg/controllers/netpol/network_policy_controller.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/cloudnativelabs/kube-router/v2/pkg" "github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck" "github.com/cloudnativelabs/kube-router/v2/pkg/metrics" "github.com/cloudnativelabs/kube-router/v2/pkg/options" @@ -63,7 +64,7 @@ var ( // NetworkPolicyController struct to hold information required by NetworkPolicyController type NetworkPolicyController struct { - krNode utils.NodeIPAndFamilyAware + krNode pkg.NodeIPAndFamilyAware serviceClusterIPRanges []net.IPNet serviceExternalIPRanges []net.IPNet serviceLoadBalancerIPRanges []net.IPNet diff --git a/pkg/controllers/proxy/network_services_controller.go b/pkg/controllers/proxy/network_services_controller.go index 45e9d213e2..ee5be3112d 100644 --- a/pkg/controllers/proxy/network_services_controller.go +++ b/pkg/controllers/proxy/network_services_controller.go @@ -13,6 +13,7 @@ import ( "syscall" "time" + "github.com/cloudnativelabs/kube-router/v2/pkg" "github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck" "github.com/cloudnativelabs/kube-router/v2/pkg/metrics" "github.com/cloudnativelabs/kube-router/v2/pkg/options" @@ -108,7 +109,7 @@ const ( // NetworkServicesController struct stores information needed by the controller type NetworkServicesController struct { - krNode utils.NodeAware + krNode pkg.NodeAware syncPeriod time.Duration mu sync.Mutex serviceMap serviceInfoMap diff --git a/pkg/controllers/routing/host_route_sync.go b/pkg/controllers/routing/host_route_sync.go index a8c595c39f..0a9d6bd33e 100644 --- a/pkg/controllers/routing/host_route_sync.go +++ b/pkg/controllers/routing/host_route_sync.go @@ -7,22 +7,22 @@ import ( "sync" "time" + "github.com/cloudnativelabs/kube-router/v2/pkg" "github.com/cloudnativelabs/kube-router/v2/pkg/bgp" "github.com/cloudnativelabs/kube-router/v2/pkg/routes" gobgpapi "github.com/osrg/gobgp/v3/api" - gobgp "github.com/osrg/gobgp/v3/pkg/server" "github.com/vishvananda/netlink" "k8s.io/klog/v2" ) -type BGPServerUnsetError struct{} +type BGPPathListerUnsetError struct{} type BGPListError struct { msg string err error } -func (b BGPServerUnsetError) Error() string { +func (b BGPPathListerUnsetError) Error() string { return "BGP server not yet specified" } @@ -46,13 +46,14 @@ func (b BGPListError) Unwrap() error { // RouteSync is a struct that holds all of the information needed for syncing routes to the kernel's routing table type RouteSync struct { - routeTableStateMap map[string]*netlink.Route - injectedRoutesSyncPeriod time.Duration - mutex sync.Mutex - routeReplacer func(route *netlink.Route) error - routeDeleter func(destinationSubnet *net.IPNet) error - routeAdder func(route *netlink.Route) error - bgpServer *gobgp.BgpServer + routeTableStateMap map[string]*netlink.Route + routeSyncPeriod time.Duration + mutex sync.Mutex + routeReplacer func(route *netlink.Route) error + routeDeleter func(destinationSubnet *net.IPNet) error + routeAdder func(route *netlink.Route) error + routeInjector pkg.RouteInjector + pathLister pkg.BGPPathLister } // addInjectedRoute adds a route to the route map that is regularly synced to the kernel's routing table @@ -77,51 +78,114 @@ func (rs *RouteSync) DelInjectedRoute(dst *net.IPNet) { } } +func (rs *RouteSync) checkState(authoritativeState map[string]*netlink.Route) ([]*netlink.Route, []*netlink.Route) { + // While we're iterating over the state map, we should hold the mutex to prevent any other operations from + // interfering with the state map + rs.mutex.Lock() + defer rs.mutex.Unlock() + + routesToAdd := make([]*netlink.Route, 0) + routesToDelete := make([]*netlink.Route, 0) + + // Compare the routes source of truth from BGP to the routes in our state, searching for any routes that might be + // missing from the state and adding them if they are missing + for dst, route := range authoritativeState { + if _, ok := rs.routeTableStateMap[dst]; ok { + klog.V(3).Infof("Route already exists for destination: %s", dst) + continue + } + + routesToAdd = append(routesToAdd, route) + } + + // Compare the routes in our state to the routes source of truth from BGP, searching for any routes that might be + // missing from BGP and deleting them if they are missing + for dst, route := range rs.routeTableStateMap { + if _, ok := authoritativeState[dst]; ok { + klog.V(3).Infof("Route already exists for destination: %s", dst) + continue + } + + routesToDelete = append(routesToDelete, route) + } + + return routesToAdd, routesToDelete +} + func (rs *RouteSync) checkCacheAgainstBGP() error { convertPathsToRouteMap := func(path []*gobgpapi.Path) map[string]*netlink.Route { routeMap := make(map[string]*netlink.Route, 0) for _, p := range path { klog.V(3).Infof("Path: %v", p) + + // Leave out withdraw paths from the map, we don't need to worry about tracking them because we are going to + // delete any routes not found in the map we're returning anyway + if p.IsWithdraw { + klog.V(3).Infof("Path is a withdrawal, skipping") + continue + } + + // Seems like a valid path, let's parse it dst, nh, err := bgp.ParsePath(p) if err != nil { klog.Warningf("Failed to parse BGP path, not failing so as to not block updating paths that are "+ "valid: %v", err) } + + // Add path to our map routeMap[dst.String()] = &netlink.Route{ Dst: dst, Gw: nh, Protocol: routes.ZebraOriginator, } } + return routeMap } - if rs.bgpServer == nil { - return BGPServerUnsetError{} + // During startup, it is possible for this function to possibly be called before the BGP server has been set on it, + // in this case, return BGPServerUnsetError + if rs.pathLister == nil { + return BGPPathListerUnsetError{} } - rs.mutex.Lock() - defer rs.mutex.Unlock() - allPaths := make([]*gobgpapi.Path, 0) + // Create a var for tracking all of the paths we're about to get + allPaths := make([]*gobgpapi.Path, 0) pathList := func(path *gobgpapi.Destination) { allPaths = append(allPaths, path.Paths...) } + // Call ListPath() for all families passing in our pathList function from above, to set allPaths for _, family := range []*gobgpapi.Family{ {Afi: gobgpapi.Family_AFI_IP, Safi: gobgpapi.Family_SAFI_UNICAST}, {Afi: gobgpapi.Family_AFI_IP6, Safi: gobgpapi.Family_SAFI_UNICAST}} { - err := rs.bgpServer.ListPath(context.Background(), &gobgpapi.ListPathRequest{Family: family}, pathList) + err := rs.pathLister.ListPath(context.Background(), &gobgpapi.ListPathRequest{Family: family}, pathList) if err != nil { return newBGPListError("Failed to list BGP paths", err) } } + // Convert all paths to a map of routes, this serves as our authoritative source of truth for what routes should be bgpRoutes := convertPathsToRouteMap(allPaths) - // REPLACE ME - for dst, route := range bgpRoutes { - if dst != "" && route != nil { - return nil + // Check the state of the routes against the authoritative source of truth + routesToAdd, routesToDelete := rs.checkState(bgpRoutes) + + // Add missing routes + for _, route := range routesToAdd { + klog.Infof("Found route from BGP that did not exist in state, adding to state: %s", route) + err := rs.routeInjector.InjectRoute(route.Dst, route.Gw) + if err != nil { + klog.Errorf("Failed to inject route: %v", err) + } + } + + // Delete routes that are no longer in the authoritative source of truth + for _, route := range routesToDelete { + klog.Infof("Found route in state that did not exist in BGP, deleting from state: %s", route) + err := rs.routeDeleter(route.Dst) + if err != nil { + klog.Errorf("Failed to delete route: %v", err) } } @@ -148,12 +212,26 @@ func (rs *RouteSync) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { wg.Add(1) go func(stopCh <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() - t := time.NewTicker(rs.injectedRoutesSyncPeriod) - defer t.Stop() + t1 := time.NewTicker(rs.routeSyncPeriod) + // Check our local state against BGP once for every 5 route syncs + t2 := time.NewTicker(5 * rs.routeSyncPeriod) + defer t1.Stop() for { select { - case <-t.C: + case <-t1.C: rs.SyncLocalRouteTable() + case <-t2.C: + err := rs.checkCacheAgainstBGP() + if err != nil { + switch err.(type) { + case BGPPathListerUnsetError: + klog.Warningf("BGP server not yet set, cannot check cache against BGP") + case BGPListError: + klog.Errorf("Failed to check cache against BGP due to BGP error: %v", err) + default: + klog.Errorf("Failed to check cache against BGP: %v", err) + } + } case <-stopCh: klog.Infof("Shutting down local route synchronization") return @@ -162,25 +240,24 @@ func (rs *RouteSync) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) { }(stopCh, wg) } -// addBGPServer adds a BGP server to the routeSyncer so that it can be used to advertise routes -// -//nolint:unused // we're going to implement this later -func (rs *RouteSync) addBGPServer(server *gobgp.BgpServer) { - rs.bgpServer = server +// addBGPPathLister adds a BGP server to the routeSyncer so that it can be used to advertise routes +func (rs *RouteSync) AddBGPPathLister(pl pkg.BGPPathLister) { + rs.pathLister = pl } // NewRouteSyncer creates a new routeSyncer that, when run, will sync routes kept in its local state table every // syncPeriod -func NewRouteSyncer(syncPeriod time.Duration) *RouteSync { +func NewRouteSyncer(ri pkg.RouteInjector, syncPeriod time.Duration) *RouteSync { rs := RouteSync{} rs.routeTableStateMap = make(map[string]*netlink.Route) - rs.injectedRoutesSyncPeriod = syncPeriod + rs.routeSyncPeriod = syncPeriod rs.mutex = sync.Mutex{} // We substitute the RouteR* functions here so that we can easily monkey patch it in our unit tests rs.routeReplacer = netlink.RouteReplace rs.routeDeleter = routes.DeleteByDestination rs.routeAdder = netlink.RouteAdd + rs.routeInjector = ri return &rs } diff --git a/pkg/controllers/routing/host_route_sync_test.go b/pkg/controllers/routing/host_route_sync_test.go index 2ff2a6a505..cd48a408b8 100644 --- a/pkg/controllers/routing/host_route_sync_test.go +++ b/pkg/controllers/routing/host_route_sync_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/cloudnativelabs/kube-router/v2/pkg" + "github.com/cloudnativelabs/kube-router/v2/pkg/routes" gobgpapi "github.com/osrg/gobgp/v3/api" gobgp "github.com/osrg/gobgp/v3/pkg/server" "github.com/stretchr/testify/assert" @@ -16,10 +17,10 @@ import ( ) var ( - testRoutes = map[string]string{ - "192.168.0.1": "192.168.0.0/24", - "10.255.0.1": "10.255.0.0/16", - } + // testRoutes = map[string]string{ + // "192.168.0.1": "192.168.0.0/24", + // "10.255.0.1": "10.255.0.0/16", + // } testRoute = map[string]string{ "192.168.0.1": "192.168.0.0/24", } @@ -80,14 +81,15 @@ func (mnl *mockNetlink) waitForSyncLocalRouteToAcquireLock(syncer pkg.RouteSynce } func (mnl *mockNetlink) wasCalled() bool { - return mnl.currentRoute != nil + return mnl.currentRoute != nil || mnl.currentDelSubnet != nil } func testCreateRoutes(routes []testCaseRoute) []testCaseBGPRoute { convertedRoutes := make([]testCaseBGPRoute, len(routes)) for idx, route := range routes { + dstIP, _, _ := net.ParseCIDR(route.dst) nlri, _ := apb.New(&gobgpapi.IPAddressPrefix{ - Prefix: route.dst, + Prefix: dstIP.String(), PrefixLen: route.mask, }) origin, _ := apb.New(&gobgpapi.OriginAttribute{ @@ -145,10 +147,11 @@ func Test_syncLocalRouteTable(t *testing.T) { prepSyncLocalTest := func() (*mockNetlink, *RouteSync) { // Create myNetlink so that it will wait 200 milliseconds on routeReplace and artificially hold its lock myNetlink := mockNetlink{} + myInjector := routes.MockLinuxRouter{} myNetlink.pause = time.Millisecond * 200 // Create a route replacer and seed it with some routes to iterate over - syncer := NewRouteSyncer(15 * time.Second) + syncer := NewRouteSyncer(&myInjector, 15*time.Second) syncer.routeTableStateMap = testGenerateRouteMap(testRoute) // Replace the netlink.RouteReplace function with our own mock function that includes a WaitGroup for syncing @@ -212,8 +215,9 @@ func Test_routeSyncer_run(t *testing.T) { } t.Run("Ensure that run goroutine shuts down correctly on stop", func(t *testing.T) { + myInjector := routes.MockLinuxRouter{} // Setup routeSyncer to run 10 times a second - syncer := NewRouteSyncer(100 * time.Millisecond) + syncer := NewRouteSyncer(&myInjector, 100*time.Millisecond) myNetLink := mockNetlink{} myNetLink.pause = 0 myNetLink.wg = &sync.WaitGroup{} @@ -266,16 +270,18 @@ func Test_routeSyncer_checkAgainstBGPCache(t *testing.T) { return testRouteMap } - testSetup := func() (*mockNetlink, *RouteSync) { + testSetup := func() (*mockNetlink, *routes.MockLinuxRouter, *RouteSync) { + myInjector := routes.MockLinuxRouter{} + // Setup routeSyncer to run 10 times a second - syncer := NewRouteSyncer(100 * time.Millisecond) + syncer := NewRouteSyncer(&myInjector, 100*time.Millisecond) syncer.routeTableStateMap = testGenerateRouteMap(createTestRouteMap(testRoutes)) myNetLink := mockNetlink{} syncer.routeReplacer = myNetLink.mockRouteAction syncer.routeAdder = myNetLink.mockRouteAction syncer.routeDeleter = myNetLink.mockDestinationDelete - return &myNetLink, syncer + return &myNetLink, &myInjector, syncer } testStartBGPServer := func() (*gobgp.BgpServer, error) { @@ -293,15 +299,17 @@ func Test_routeSyncer_checkAgainstBGPCache(t *testing.T) { } t.Run("Ensure proper error when BGP Server is Unset", func(t *testing.T) { + myInjector := routes.MockLinuxRouter{} + // Setup routeSyncer to run 10 times a second - syncer := NewRouteSyncer(100 * time.Millisecond) + syncer := NewRouteSyncer(&myInjector, 100*time.Millisecond) err := syncer.checkCacheAgainstBGP() assert.NotNil(t, err, "Expected an error when BGP server is unset") - assert.IsType(t, BGPServerUnsetError{}, err, "Expected an BGPServerUnsetError error") + assert.IsType(t, BGPPathListerUnsetError{}, err, "Expected an BGPServerUnsetError error") }) t.Run("Ensure no action is taken when routes are in sync", func(t *testing.T) { - myNetLink, syncer := testSetup() + _, myInjector, syncer := testSetup() // Start the BGP server bgpServer, err := testStartBGPServer() @@ -310,7 +318,7 @@ func Test_routeSyncer_checkAgainstBGPCache(t *testing.T) { } defer func() { _ = bgpServer.StopBgp(context.Background(), &gobgpapi.StopBgpRequest{}) }() - syncer.bgpServer = bgpServer + syncer.pathLister = bgpServer bgpRoutes := testCreateRoutes(testRoutes) @@ -326,6 +334,89 @@ func Test_routeSyncer_checkAgainstBGPCache(t *testing.T) { err = syncer.checkCacheAgainstBGP() assert.NoError(t, err, "Expected no error when BGP and local routes are in sync") - assert.False(t, myNetLink.wasCalled(), "Expected no calls to netlink when BGP and local routes are in sync") + myInjector.AssertNotCalled(t, "InjectRoute", + "Expected no calls to InjectRoute when BGP and local routes are in sync") + }) + + t.Run("Ensure add action is taken when routes are missing from state", func(t *testing.T) { + _, myInjector, syncer := testSetup() + + // Start the BGP server + bgpServer, err := testStartBGPServer() + if err != nil { + t.Fatalf("Failed to start BGP server: %v", err) + } + defer func() { _ = bgpServer.StopBgp(context.Background(), &gobgpapi.StopBgpRequest{}) }() + syncer.pathLister = bgpServer + + bgpRoutes := testCreateRoutes(testRoutes) + + for _, route := range bgpRoutes { + _, err := bgpServer.AddPath(context.Background(), &gobgpapi.AddPathRequest{ + Path: route.path, + }) + if err != nil { + t.Fatalf("Failed to advertise route: %v", err) + } + } + + // Find a random route from the route table state + var key string + var route *netlink.Route + var nh net.IP + for key, route = range syncer.routeTableStateMap { + break // Get just one route + } + for _, testRoute := range testRoutes { + if testRoute.dst == key { + nh = net.ParseIP(testRoute.nh) + break + } + } + + // Delete the key from the state map and setup mocks to expect calls + delete(syncer.routeTableStateMap, key) + myInjector.On("InjectRoute", route.Dst, nh).Return(nil) + + err = syncer.checkCacheAgainstBGP() + assert.NoError(t, err, "Expected no error when BGP and local routes are in sync") + + myInjector.AssertCalled(t, "InjectRoute", route.Dst, nh) + }) + + t.Run("Ensure delete action is taken when routes are missing from BGP", func(t *testing.T) { + myNetLink, myInjector, syncer := testSetup() + + // Start the BGP server + bgpServer, err := testStartBGPServer() + if err != nil { + t.Fatalf("Failed to start BGP server: %v", err) + } + defer func() { _ = bgpServer.StopBgp(context.Background(), &gobgpapi.StopBgpRequest{}) }() + + syncer.pathLister = bgpServer + + // Remove a route from the routes added to BGP + removedRoute := testRoutes[0] + updatedRoutes := testRoutes[1:] + bgpRoutes := testCreateRoutes(updatedRoutes) + + for _, route := range bgpRoutes { + _, err := bgpServer.AddPath(context.Background(), &gobgpapi.AddPathRequest{ + Path: route.path, + }) + if err != nil { + t.Fatalf("Failed to advertise route: %v", err) + } + } + + err = syncer.checkCacheAgainstBGP() + assert.NoError(t, err, "Expected no error when BGP and local routes are in sync") + + assert.True(t, myNetLink.wasCalled(), "Expected a call to DeleteByDestination when a route is missing from BGP") + assert.Equal(t, removedRoute.dst, myNetLink.currentDelSubnet.String(), + "Expected the subnet to be deleted to be the one missing from BGP") + myInjector.AssertNotCalled(t, "InjectRoute", + "Expected no calls to InjectRoute when local state has an extra route") }) } diff --git a/pkg/controllers/routing/network_routes_controller.go b/pkg/controllers/routing/network_routes_controller.go index 5934d7b267..8378792946 100644 --- a/pkg/controllers/routing/network_routes_controller.go +++ b/pkg/controllers/routing/network_routes_controller.go @@ -78,7 +78,7 @@ type PolicyBasedRouter interface { // NetworkRoutingController is struct to hold necessary information required by controller type NetworkRoutingController struct { - krNode utils.NodeAware + krNode pkg.NodeAware routerID string activeNodes map[string]bool mu sync.Mutex @@ -133,6 +133,7 @@ type NetworkRoutingController struct { routeSyncer pkg.RouteSyncer pbr PolicyBasedRouter tunneler pkg.Tunneler + routerInjector pkg.RouteInjector nodeLister cache.Indexer svcLister cache.Indexer @@ -301,9 +302,6 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll klog.Infof("Starting network route controller") - // Start route syncer - nrc.routeSyncer.Run(stopCh, wg) - // Wait till we are ready to launch BGP server for { err := nrc.startBgpServer(true) @@ -322,6 +320,13 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll } } + // Start route syncer + nrc.routeSyncer.AddBGPPathLister(nrc.bgpServer) + nrc.routeSyncer.Run(stopCh, wg) + + // Watch for BGP Updates + go nrc.watchBgpUpdates(nrc.routerInjector) + nrc.bgpServerStarted = true if !nrc.bgpGracefulRestart { defer func() { @@ -437,7 +442,13 @@ func (nrc *NetworkRoutingController) updateCNIConfig() { } } -func (nrc *NetworkRoutingController) watchBgpUpdates() { +func (nrc *NetworkRoutingController) watchBgpUpdates(ri pkg.RouteInjector) { + ph := bgp.PathHandler{ + PeerLister: nrc.bgpServer, + RouteInjector: ri, + RouteSyncer: nrc.routeSyncer, + TunnelCleaner: nrc.tunneler, + } pathWatch := func(r *gobgpapi.WatchEventResponse) { if table := r.GetTable(); table != nil { for _, path := range table.Paths { @@ -453,7 +464,7 @@ func (nrc *NetworkRoutingController) watchBgpUpdates() { } klog.V(2).Infof("Processing bgp route advertisement from peer: %s", path.NeighborIp) - if err := bgp.PathChanged(path, nrc.bgpServer, nrc.routeSyncer, nrc.tunneler); err != nil { + if err := ph.Changed(path); err != nil { klog.Errorf("Failed to inject routes due to: " + err.Error()) } } @@ -628,7 +639,7 @@ func (nrc *NetworkRoutingController) Cleanup() { klog.Infof("Successfully cleaned the NetworkRoutesController configuration done by kube-router") } -func (nrc *NetworkRoutingController) syncNodeIPSets(nodeIPAware utils.NodeIPAware) error { +func (nrc *NetworkRoutingController) syncNodeIPSets(nodeIPAware pkg.NodeIPAware) error { var err error start := time.Now() defer func() { @@ -906,8 +917,6 @@ func (nrc *NetworkRoutingController) startBgpServer(grpcServer bool) error { return errors.New("failed to start BGP server due to : " + err.Error()) } - go nrc.watchBgpUpdates() - // If the global routing peer is configured then peer with it // else attempt to get peers from node specific BGP annotations. if len(nrc.globalPeerRouters) == 0 { @@ -1129,7 +1138,6 @@ func NewNetworkRoutingController(clientset kubernetes.Interface, nrc.bgpServerStarted = false nrc.disableSrcDstCheck = kubeRouterConfig.DisableSrcDstCheck nrc.initSrcDstCheckDone = false - nrc.routeSyncer = NewRouteSyncer(kubeRouterConfig.InjectedRoutesSyncPeriod) nrc.bgpHoldtime = kubeRouterConfig.BGPHoldTime.Seconds() if nrc.bgpHoldtime > 65536 || nrc.bgpHoldtime < 3 { @@ -1303,6 +1311,19 @@ func NewNetworkRoutingController(clientset kubernetes.Interface, } nrc.localAddressList = append(nrc.localAddressList, localAddresses...) } + + // Create a new LinuxRouter instance + nrc.routerInjector = &routes.LinuxRouter{ + NodeIPA: nrc.krNode, + Tunneler: nrc.tunneler, + RouteSyncer: nrc.routeSyncer, + OverlayConfig: nrc.overlayConfig, + } + + // Create a new route syncer instance + nrc.routeSyncer = NewRouteSyncer(nrc.routerInjector, kubeRouterConfig.InjectedRoutesSyncPeriod) + + // Kubernetes handler setup nrc.svcLister = svcInformer.GetIndexer() nrc.ServiceEventHandler = nrc.newServiceEventHandler() diff --git a/pkg/routes/linux_routes.go b/pkg/routes/linux_routes.go index 945d99ed22..887d2e0ca5 100644 --- a/pkg/routes/linux_routes.go +++ b/pkg/routes/linux_routes.go @@ -33,13 +33,19 @@ func DeleteByDestination(destinationSubnet *net.IPNet) error { return nil } -func InjectRoute(subnet *net.IPNet, gw net.IP, ipa utils.NodeIPAware, tn pkg.Tunneler, rs pkg.RouteSyncer, - oc pkg.OverlayConfig) error { +type LinuxRouter struct { + NodeIPA pkg.NodeIPAware + Tunneler pkg.Tunneler + RouteSyncer pkg.RouteSyncer + OverlayConfig pkg.OverlayConfig +} + +func (lr *LinuxRouter) InjectRoute(subnet *net.IPNet, gw net.IP) error { var route *netlink.Route var err error var link netlink.Link - tunnelName := tn.GenerateTunnelName(gw.String()) + tunnelName := lr.Tunneler.GenerateTunnelName(gw.String()) checkNHSameSubnet := func(needle net.IP, haystack []net.IP) bool { for _, nodeIP := range haystack { nodeSubnet, _, err := utils.GetNodeSubnet(nodeIP, nil) @@ -58,23 +64,23 @@ func InjectRoute(subnet *net.IPNet, gw net.IP, ipa utils.NodeIPAware, tn pkg.Tun // Determine if we are in the same subnet as the gateway (next hop) var sameSubnet bool if gw.To4() != nil { - sameSubnet = checkNHSameSubnet(gw, ipa.GetNodeIPv4Addrs()) + sameSubnet = checkNHSameSubnet(gw, lr.NodeIPA.GetNodeIPv4Addrs()) } else if gw.To16() != nil { - sameSubnet = checkNHSameSubnet(gw, ipa.GetNodeIPv6Addrs()) + sameSubnet = checkNHSameSubnet(gw, lr.NodeIPA.GetNodeIPv6Addrs()) } // create IPIP tunnels only when node is not in same subnet or overlay-type is set to 'full' // if the user has disabled overlays, don't create tunnels. If we're not creating a tunnel, check to see if there is // any cleanup that needs to happen. - if shouldCreateTunnel(oc, sameSubnet) { - link, err = tn.SetupOverlayTunnel(tunnelName, gw, subnet) + if shouldCreateTunnel(lr.OverlayConfig, sameSubnet) { + link, err = lr.Tunneler.SetupOverlayTunnel(tunnelName, gw, subnet) if err != nil { return err } } else { // knowing that a tunnel shouldn't exist for this route, check to see if there are any lingering tunnels / // routes that need to be cleaned up. - tn.CleanupTunnel(subnet, tunnelName) + lr.Tunneler.CleanupTunnel(subnet, tunnelName) } switch { @@ -82,10 +88,10 @@ func InjectRoute(subnet *net.IPNet, gw net.IP, ipa utils.NodeIPAware, tn pkg.Tun // if we set up an overlay tunnel link, then use it for destination routing var bestIPForFamily net.IP if subnet.IP.To4() != nil { - bestIPForFamily = ipa.FindBestIPv4NodeAddress() + bestIPForFamily = lr.NodeIPA.FindBestIPv4NodeAddress() } else { // Need to activate the ip command in IPv6 mode - bestIPForFamily = ipa.FindBestIPv6NodeAddress() + bestIPForFamily = lr.NodeIPA.FindBestIPv6NodeAddress() } if bestIPForFamily == nil { return fmt.Errorf("not able to find an appropriate configured IP address on node for destination "+ @@ -120,9 +126,9 @@ func InjectRoute(subnet *net.IPNet, gw net.IP, ipa utils.NodeIPAware, tn pkg.Tun // Alright, everything is in place, and we have our route configured, let's add it to the host's routing table klog.V(2).Infof("Inject route: '%s via %s' from peer to routing table", subnet, gw) - rs.AddInjectedRoute(subnet, route) + lr.RouteSyncer.AddInjectedRoute(subnet, route) // Immediately sync the local route table regardless of timer - rs.SyncLocalRouteTable() + lr.RouteSyncer.SyncLocalRouteTable() return nil } diff --git a/pkg/routes/linux_routes_mock.go b/pkg/routes/linux_routes_mock.go new file mode 100644 index 0000000000..44557a0592 --- /dev/null +++ b/pkg/routes/linux_routes_mock.go @@ -0,0 +1,18 @@ +package routes + +import ( + "net" + + "github.com/stretchr/testify/mock" +) + +// Define the mock struct +type MockLinuxRouter struct { + mock.Mock +} + +// Implement the InjectRoute method +func (m *MockLinuxRouter) InjectRoute(subnet *net.IPNet, gw net.IP) error { + args := m.Called(subnet, gw) + return args.Error(0) +} diff --git a/pkg/routes/pbr.go b/pkg/routes/pbr.go index 50b913b64e..19149fe078 100644 --- a/pkg/routes/pbr.go +++ b/pkg/routes/pbr.go @@ -5,6 +5,7 @@ import ( "os/exec" "strings" + "github.com/cloudnativelabs/kube-router/v2/pkg" "github.com/cloudnativelabs/kube-router/v2/pkg/utils" ) @@ -17,13 +18,13 @@ const ( // PolicyBasedRules is a struct that holds all of the information needed for manipulating policy based routing rules type PolicyBasedRules struct { - nfa utils.NodeFamilyAware + nfa pkg.NodeFamilyAware podIPv4CIDRs []string podIPv6CIDRs []string } // NewPolicyBasedRules creates a new PBR object which will be used to manipulate policy based routing rules -func NewPolicyBasedRules(nfa utils.NodeFamilyAware, podIPv4CIDRs, podIPv6CIDRs []string) *PolicyBasedRules { +func NewPolicyBasedRules(nfa pkg.NodeFamilyAware, podIPv4CIDRs, podIPv6CIDRs []string) *PolicyBasedRules { return &PolicyBasedRules{ nfa: nfa, podIPv4CIDRs: podIPv4CIDRs, diff --git a/pkg/tunnels/linux_tunnels.go b/pkg/tunnels/linux_tunnels.go index 61974e6529..3cfeba6270 100644 --- a/pkg/tunnels/linux_tunnels.go +++ b/pkg/tunnels/linux_tunnels.go @@ -10,8 +10,8 @@ import ( "strconv" "strings" + "github.com/cloudnativelabs/kube-router/v2/pkg" "github.com/cloudnativelabs/kube-router/v2/pkg/routes" - "github.com/cloudnativelabs/kube-router/v2/pkg/utils" "github.com/vishvananda/netlink" "k8s.io/klog/v2" ) @@ -76,12 +76,12 @@ func ParseEncapPort(encapPort uint16) (EncapPort, error) { } type OverlayTunnel struct { - krNode utils.NodeIPAware + krNode pkg.NodeIPAware encapPort EncapPort encapType EncapType } -func NewOverlayTunnel(krNode utils.NodeIPAware, encapType EncapType, encapPort EncapPort) *OverlayTunnel { +func NewOverlayTunnel(krNode pkg.NodeIPAware, encapType EncapType, encapPort EncapPort) *OverlayTunnel { return &OverlayTunnel{ krNode: krNode, encapPort: encapPort, diff --git a/pkg/types.go b/pkg/types.go index a9fc3b04c8..42c70796c5 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -1,9 +1,11 @@ package pkg import ( + "context" "net" "sync" + gobgpapi "github.com/osrg/gobgp/v3/api" "github.com/vishvananda/netlink" ) @@ -13,6 +15,7 @@ type RouteSyncer interface { DelInjectedRoute(dst *net.IPNet) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) SyncLocalRouteTable() + AddBGPPathLister(pl BGPPathLister) } // TunnelNamer is an interface that defines the methods needed to generate tunnel names @@ -39,6 +42,59 @@ type Tunneler interface { TunnelCreator } +// NodeIPAware is an interface that provides methods to get the node's IP addresses in various data structures. +type NodeIPAware interface { + FindBestIPv4NodeAddress() net.IP + FindBestIPv6NodeAddress() net.IP + GetNodeIPv4Addrs() []net.IP + GetNodeIPv6Addrs() []net.IP + GetNodeIPAddrs() []net.IP + GetPrimaryNodeIP() net.IP +} + +// NodeInterfaceAware is an interface that provides methods to get the node's interface name, MTU, and subnet. This +// interface is a collection of functions that are only available if you are running on the node itself, as kube-router +// determines this by looking at the node's interfaces and parsing the address data there. If you attempt to call these +// functions on a remote node, they will return nil or an error. +type NodeInterfaceAware interface { + GetNodeInterfaceName() string + GetNodeMTU() (int, error) +} + +// NodeFamilyAware is an interface that provides methods to check if a node is IPv4 or IPv6 capable. +type NodeFamilyAware interface { + IsIPv4Capable() bool + IsIPv6Capable() bool +} + +// NodeNameAware is an interface that provides a method to get the node's name. +type NodeNameAware interface { + GetNodeName() string +} + +// NodeIPAndFamilyAware is an interface that combines the NodeIPAware and NodeFamilyAware interfaces. +type NodeIPAndFamilyAware interface { + NodeIPAware + NodeFamilyAware +} + +// NodeAware is an interface that combines the NodeIPAware, NodeInterfaceAware, NodeFamilyAware, and NodeNameAware +// interfaces. +type NodeAware interface { + NodeIPAware + NodeInterfaceAware + NodeFamilyAware + NodeNameAware +} + +type RouteInjector interface { + InjectRoute(subnet *net.IPNet, gw net.IP) error +} + +type BGPPathLister interface { + ListPath(ctx context.Context, r *gobgpapi.ListPathRequest, fn func(*gobgpapi.Destination)) error +} + type OverlayConfig struct { EnableOverlay bool OverlayType string diff --git a/pkg/utils/node.go b/pkg/utils/node.go index 87da1dd367..36690a1ca1 100644 --- a/pkg/utils/node.go +++ b/pkg/utils/node.go @@ -37,51 +37,6 @@ type LocalKRNode struct { linkQ LocalLinkQuerier } -// NodeIPAware is an interface that provides methods to get the node's IP addresses in various data structures. -type NodeIPAware interface { - FindBestIPv4NodeAddress() net.IP - FindBestIPv6NodeAddress() net.IP - GetNodeIPv4Addrs() []net.IP - GetNodeIPv6Addrs() []net.IP - GetNodeIPAddrs() []net.IP - GetPrimaryNodeIP() net.IP -} - -// NodeInterfaceAware is an interface that provides methods to get the node's interface name, MTU, and subnet. This -// interface is a collection of functions that are only available if you are running on the node itself, as kube-router -// determines this by looking at the node's interfaces and parsing the address data there. If you attempt to call these -// functions on a remote node, they will return nil or an error. -type NodeInterfaceAware interface { - GetNodeInterfaceName() string - GetNodeMTU() (int, error) -} - -// NodeFamilyAware is an interface that provides methods to check if a node is IPv4 or IPv6 capable. -type NodeFamilyAware interface { - IsIPv4Capable() bool - IsIPv6Capable() bool -} - -// NodeNameAware is an interface that provides a method to get the node's name. -type NodeNameAware interface { - GetNodeName() string -} - -// NodeIPAndFamilyAware is an interface that combines the NodeIPAware and NodeFamilyAware interfaces. -type NodeIPAndFamilyAware interface { - NodeIPAware - NodeFamilyAware -} - -// NodeAware is an interface that combines the NodeIPAware, NodeInterfaceAware, NodeFamilyAware, and NodeNameAware -// interfaces. -type NodeAware interface { - NodeIPAware - NodeInterfaceAware - NodeFamilyAware - NodeNameAware -} - // GetNodeIPv4Addrs returns the node's IPv4 addresses as defined by the Kubernetes Node Object. func (n *KRNode) GetNodeIPv4Addrs() []net.IP { var nodeIPs []net.IP