From 0c70f9b7917bae87ce5af25b3fb9f02596f72415 Mon Sep 17 00:00:00 2001 From: Saylor Berman Date: Thu, 16 Jan 2025 16:23:19 -0700 Subject: [PATCH] Refactor for writing status and config on connection --- .../templates/tmp-nginx-deployment.yaml | 2 +- deploy/aws-nlb/deploy.yaml | 1 + deploy/azure/deploy.yaml | 1 + deploy/default/deploy.yaml | 1 + deploy/experimental-nginx-plus/deploy.yaml | 1 + deploy/experimental/deploy.yaml | 1 + deploy/nginx-plus/deploy.yaml | 1 + deploy/nodeport/deploy.yaml | 1 + deploy/openshift/deploy.yaml | 1 + .../snippets-filters-nginx-plus/deploy.yaml | 1 + deploy/snippets-filters/deploy.yaml | 1 + go.mod | 2 +- go.sum | 4 +- internal/mode/static/handler.go | 124 ++++--- internal/mode/static/handler_test.go | 72 +++- internal/mode/static/manager.go | 22 +- internal/mode/static/nginx/agent/agent.go | 93 +++--- .../agent/agentfakes/fake_nginx_updater.go | 96 +++--- .../static/nginx/agent/broadcast/broadcast.go | 32 +- .../broadcastfakes/fake_broadcaster.go | 23 +- internal/mode/static/nginx/agent/command.go | 189 ++++++++--- .../mode/static/nginx/agent/deployment.go | 153 ++++++--- internal/mode/static/nginx/agent/file.go | 8 +- .../static/nginx/agent/grpc/connections.go | 17 +- .../fake_agent_connections_tracker.go | 312 ++++++++++++++++++ .../static/nginx/agent/grpc/messenger/doc.go | 4 + .../nginx/agent/grpc/messenger/messenger.go | 96 ++++++ internal/mode/static/nginx/agent/hack/hack.go | 182 ---------- internal/mode/static/nginx/agent/meta/doc.go | 4 - internal/mode/static/nginx/agent/meta/meta.go | 20 -- .../mode/static/nginx/conf/nginx-plus.conf | 18 +- internal/mode/static/state/graph/graph.go | 8 + .../mode/static/status/prepare_requests.go | 14 +- .../static/status/prepare_requests_test.go | 12 +- internal/mode/static/status/queue.go | 66 ++++ 35 files changed, 1048 insertions(+), 535 deletions(-) create mode 100644 internal/mode/static/nginx/agent/grpc/grpcfakes/fake_agent_connections_tracker.go create mode 100644 internal/mode/static/nginx/agent/grpc/messenger/doc.go create mode 100644 internal/mode/static/nginx/agent/grpc/messenger/messenger.go delete mode 100644 internal/mode/static/nginx/agent/hack/hack.go delete mode 100644 internal/mode/static/nginx/agent/meta/doc.go delete mode 100644 internal/mode/static/nginx/agent/meta/meta.go create mode 100644 internal/mode/static/status/queue.go diff --git a/charts/nginx-gateway-fabric/templates/tmp-nginx-deployment.yaml b/charts/nginx-gateway-fabric/templates/tmp-nginx-deployment.yaml index df4a0a9624..c9706d19a4 100644 --- a/charts/nginx-gateway-fabric/templates/tmp-nginx-deployment.yaml +++ b/charts/nginx-gateway-fabric/templates/tmp-nginx-deployment.yaml @@ -77,7 +77,7 @@ spec: - NET_BIND_SERVICE drop: - ALL - # readOnlyRootFilesystem: true + readOnlyRootFilesystem: true runAsUser: 101 runAsGroup: 1001 volumeMounts: diff --git a/deploy/aws-nlb/deploy.yaml b/deploy/aws-nlb/deploy.yaml index dba4bc53d0..1618a1924f 100644 --- a/deploy/aws-nlb/deploy.yaml +++ b/deploy/aws-nlb/deploy.yaml @@ -338,6 +338,7 @@ spec: - NET_BIND_SERVICE drop: - ALL + readOnlyRootFilesystem: true runAsGroup: 1001 runAsUser: 101 seccompProfile: diff --git a/deploy/azure/deploy.yaml b/deploy/azure/deploy.yaml index c5557b11b1..a4f9326b6a 100644 --- a/deploy/azure/deploy.yaml +++ b/deploy/azure/deploy.yaml @@ -337,6 +337,7 @@ spec: - NET_BIND_SERVICE drop: - ALL + readOnlyRootFilesystem: true runAsGroup: 1001 runAsUser: 101 seccompProfile: diff --git a/deploy/default/deploy.yaml b/deploy/default/deploy.yaml index 6b90c66690..f4f53dab71 
100644 --- a/deploy/default/deploy.yaml +++ b/deploy/default/deploy.yaml @@ -335,6 +335,7 @@ spec: - NET_BIND_SERVICE drop: - ALL + readOnlyRootFilesystem: true runAsGroup: 1001 runAsUser: 101 seccompProfile: diff --git a/deploy/experimental-nginx-plus/deploy.yaml b/deploy/experimental-nginx-plus/deploy.yaml index 54bb27fc93..79c188c154 100644 --- a/deploy/experimental-nginx-plus/deploy.yaml +++ b/deploy/experimental-nginx-plus/deploy.yaml @@ -351,6 +351,7 @@ spec: - NET_BIND_SERVICE drop: - ALL + readOnlyRootFilesystem: true runAsGroup: 1001 runAsUser: 101 seccompProfile: diff --git a/deploy/experimental/deploy.yaml b/deploy/experimental/deploy.yaml index 6116b0991f..7aa78ae2b3 100644 --- a/deploy/experimental/deploy.yaml +++ b/deploy/experimental/deploy.yaml @@ -341,6 +341,7 @@ spec: - NET_BIND_SERVICE drop: - ALL + readOnlyRootFilesystem: true runAsGroup: 1001 runAsUser: 101 seccompProfile: diff --git a/deploy/nginx-plus/deploy.yaml b/deploy/nginx-plus/deploy.yaml index 7d545dce91..e38753b14d 100644 --- a/deploy/nginx-plus/deploy.yaml +++ b/deploy/nginx-plus/deploy.yaml @@ -345,6 +345,7 @@ spec: - NET_BIND_SERVICE drop: - ALL + readOnlyRootFilesystem: true runAsGroup: 1001 runAsUser: 101 seccompProfile: diff --git a/deploy/nodeport/deploy.yaml b/deploy/nodeport/deploy.yaml index e5cdc5997b..1be398ef1a 100644 --- a/deploy/nodeport/deploy.yaml +++ b/deploy/nodeport/deploy.yaml @@ -335,6 +335,7 @@ spec: - NET_BIND_SERVICE drop: - ALL + readOnlyRootFilesystem: true runAsGroup: 1001 runAsUser: 101 seccompProfile: diff --git a/deploy/openshift/deploy.yaml b/deploy/openshift/deploy.yaml index f32825a7be..704d39106b 100644 --- a/deploy/openshift/deploy.yaml +++ b/deploy/openshift/deploy.yaml @@ -343,6 +343,7 @@ spec: - NET_BIND_SERVICE drop: - ALL + readOnlyRootFilesystem: true runAsGroup: 1001 runAsUser: 101 seccompProfile: diff --git a/deploy/snippets-filters-nginx-plus/deploy.yaml b/deploy/snippets-filters-nginx-plus/deploy.yaml index 6a7a6640c6..c4a31d5202 100644 --- a/deploy/snippets-filters-nginx-plus/deploy.yaml +++ b/deploy/snippets-filters-nginx-plus/deploy.yaml @@ -348,6 +348,7 @@ spec: - NET_BIND_SERVICE drop: - ALL + readOnlyRootFilesystem: true runAsGroup: 1001 runAsUser: 101 seccompProfile: diff --git a/deploy/snippets-filters/deploy.yaml b/deploy/snippets-filters/deploy.yaml index 910256fb77..2432521fcb 100644 --- a/deploy/snippets-filters/deploy.yaml +++ b/deploy/snippets-filters/deploy.yaml @@ -338,6 +338,7 @@ spec: - NET_BIND_SERVICE drop: - ALL + readOnlyRootFilesystem: true runAsGroup: 1001 runAsUser: 101 seccompProfile: diff --git a/go.mod b/go.mod index 4dcf235cd3..b0662e61ad 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.11.2 - github.com/nginx/agent/v3 v3.0.0-20241220140549-28adb688a8b4 + github.com/nginx/agent/v3 v3.0.0-20250116154650-db82ebd210da github.com/nginxinc/telemetry-exporter v0.1.2 github.com/onsi/ginkgo/v2 v2.22.1 github.com/onsi/gomega v1.36.1 diff --git a/go.sum b/go.sum index 4aa869186d..560883ddeb 100644 --- a/go.sum +++ b/go.sum @@ -133,8 +133,8 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= 
-github.com/nginx/agent/v3 v3.0.0-20241220140549-28adb688a8b4 h1:Tn0SOlxq9uaJuqc6DUGZGYszrtHAHOaLnhbBWMzK1Bs= -github.com/nginx/agent/v3 v3.0.0-20241220140549-28adb688a8b4/go.mod h1:HDi/Je5AKCe5by/hWs2jbzUqi3BN4K32hMD2/hWN5G8= +github.com/nginx/agent/v3 v3.0.0-20250116154650-db82ebd210da h1:O9jpWnCGO0peM5km5GQ4yDXQZeer5hHFvc1ND/MoWUk= +github.com/nginx/agent/v3 v3.0.0-20250116154650-db82ebd210da/go.mod h1:HDi/Je5AKCe5by/hWs2jbzUqi3BN4K32hMD2/hWN5G8= github.com/nginxinc/nginx-plus-go-client/v2 v2.0.1 h1:5VVK38bnELMDWnwfF6dSv57ResXh9AUzeDa72ENj94o= github.com/nginxinc/nginx-plus-go-client/v2 v2.0.1/go.mod h1:He+1izxYxVVO5/C9ZTukwOpvkAx5eS19nRQgKXDhX5I= github.com/nginxinc/telemetry-exporter v0.1.2 h1:97vUGhQYgQ2KEsXKCBmr5gqfuujJCKPHwdg5HKoANUs= diff --git a/internal/mode/static/handler.go b/internal/mode/static/handler.go index 6fb0a3c50a..9e92ba76f5 100644 --- a/internal/mode/static/handler.go +++ b/internal/mode/static/handler.go @@ -2,6 +2,7 @@ package static import ( "context" + "errors" "fmt" "sync" "time" @@ -35,6 +36,7 @@ type handlerMetricsCollector interface { // eventHandlerConfig holds configuration parameters for eventHandlerImpl. type eventHandlerConfig struct { + ctx context.Context // nginxUpdater updates nginx configuration using the NGINX agent. nginxUpdater agent.NginxUpdater // metricsCollector collects metrics for this controller. @@ -59,6 +61,12 @@ type eventHandlerConfig struct { deployCtxCollector licensing.Collector // graphBuiltHealthChecker sets the health of the Pod to Ready once we've built our initial graph. graphBuiltHealthChecker *graphBuiltHealthChecker + // statusQueue contains updates when the handler should write statuses. + statusQueue *status.Queue + // nginxDeployments contains a map of all nginx Deployments, and data about them. + nginxDeployments *agent.DeploymentStore + // logger is the logger for the event handler. + logger logr.Logger // gatewayPodConfig contains information about this Pod. gatewayPodConfig ngfConfig.GatewayPodConfig // controlConfigNSName is the NamespacedName of the NginxGateway config for this controller. @@ -102,7 +110,7 @@ type eventHandlerImpl struct { // objectFilters contains all created objectFilters, with the key being a filterKey objectFilters map[filterKey]objectFilter - latestReloadResult status.NginxReloadResult + // latestReloadResult status.NginxReloadResult cfg eventHandlerConfig lock sync.Mutex @@ -137,6 +145,8 @@ func newEventHandlerImpl(cfg eventHandlerConfig) *eventHandlerImpl { }, } + go handler.waitForStatusUpdates(cfg.ctx) + return handler } @@ -164,10 +174,22 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log h.cfg.graphBuiltHealthChecker.setAsReady() } + // TODO(sberman): hardcode this deployment name until we support provisioning data planes + // If no deployments exist, we should just return without doing anything. + deploymentName := types.NamespacedName{ + Name: "tmp-nginx-deployment", + Namespace: h.cfg.gatewayPodConfig.Namespace, + } + // TODO(sberman): if nginx Deployment is scaled down, we should remove the pod from the ConnectionsTracker + // and Deployment. 
// If fully deleted, then delete the deployment from the Store - var err error var configApplied bool + deployment := h.cfg.nginxDeployments.GetOrStore(ctx, deploymentName) + if deployment == nil { + panic("expected deployment, got nil") + } + switch changeType { case state.NoChange: logger.Info("Handling events didn't result into NGINX configuration changes") @@ -183,16 +205,13 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log h.setLatestConfiguration(&cfg) + deployment.Lock.Lock() if h.cfg.plus { - // TODO(sberman): hardcode this deployment name until we support provisioning data planes - deployment := types.NamespacedName{ - Name: "tmp-nginx-deployment", - Namespace: h.cfg.gatewayPodConfig.Namespace, - } - configApplied, err = h.cfg.nginxUpdater.UpdateUpstreamServers(ctx, deployment, cfg) + configApplied = h.cfg.nginxUpdater.UpdateUpstreamServers(deployment, cfg) } else { - configApplied, err = h.updateNginxConf(ctx, cfg) + configApplied = h.updateNginxConf(deployment, cfg) } + deployment.Lock.Unlock() case state.ClusterStateChange: h.version++ cfg := dataplane.BuildConfiguration(ctx, gr, h.cfg.serviceResolver, h.version) @@ -204,31 +223,53 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log h.setLatestConfiguration(&cfg) - configApplied, err = h.updateNginxConf(ctx, cfg) + deployment.Lock.Lock() + configApplied = h.updateNginxConf(deployment, cfg) + deployment.Lock.Unlock() } - var nginxReloadRes status.NginxReloadResult - switch { - case err != nil: - logger.Error(err, "Failed to update NGINX configuration") - nginxReloadRes.Error = err - case configApplied: - logger.Info("NGINX configuration was successfully updated") - default: - logger.Info("No NGINX instances to configure") + configErr := deployment.GetLatestConfigError() + upstreamErr := deployment.GetLatestUpstreamError() + err := errors.Join(configErr, upstreamErr) + + if configApplied || err != nil { + obj := &status.QueueObject{ + Error: err, + Deployment: deploymentName, + } + h.cfg.statusQueue.Enqueue(obj) } +} - h.latestReloadResult = nginxReloadRes +func (h *eventHandlerImpl) waitForStatusUpdates(ctx context.Context) { + for { + item := h.cfg.statusQueue.Dequeue(ctx) + if item == nil { + return + } - if configApplied || err != nil { - h.updateStatuses(ctx, logger, gr) + var nginxReloadRes graph.NginxReloadResult + switch { + case item.Error != nil: + h.cfg.logger.Error(item.Error, "Failed to update NGINX configuration") + nginxReloadRes.Error = item.Error + default: + h.cfg.logger.Info("NGINX configuration was successfully updated") + } + + // TODO(sberman): once we support multiple Gateways, we'll have to get + // the correct Graph for the Deployment contained in the update message + gr := h.cfg.processor.GetLatestGraph() + gr.LatestReloadResult = nginxReloadRes + + h.updateStatuses(ctx, gr) } } -func (h *eventHandlerImpl) updateStatuses(ctx context.Context, logger logr.Logger, gr *graph.Graph) { +func (h *eventHandlerImpl) updateStatuses(ctx context.Context, gr *graph.Graph) { gwAddresses, err := getGatewayAddresses(ctx, h.cfg.k8sClient, nil, h.cfg.gatewayPodConfig) if err != nil { - logger.Error(err, "Setting GatewayStatusAddress to Pod IP Address") + h.cfg.logger.Error(err, "Setting GatewayStatusAddress to Pod IP Address") } transitionTime := metav1.Now() @@ -241,7 +282,7 @@ func (h *eventHandlerImpl) updateStatuses(ctx context.Context, logger logr.Logge gr.L4Routes, gr.Routes, transitionTime, - h.latestReloadResult, + gr.LatestReloadResult, 
h.cfg.gatewayCtlrName, ) @@ -273,7 +314,7 @@ func (h *eventHandlerImpl) updateStatuses(ctx context.Context, logger logr.Logge gr.IgnoredGateways, transitionTime, gwAddresses, - h.latestReloadResult, + gr.LatestReloadResult, ) h.cfg.statusUpdater.UpdateGroup(ctx, groupGateways, gwReqs...) } @@ -308,30 +349,19 @@ func (h *eventHandlerImpl) parseAndCaptureEvent(ctx context.Context, logger logr } // updateNginxConf updates nginx conf files and reloads nginx. -func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.Configuration) (bool, error) { +func (h *eventHandlerImpl) updateNginxConf( + deployment *agent.Deployment, + conf dataplane.Configuration, +) bool { files := h.cfg.generator.Generate(conf) - - // TODO(sberman): hardcode this deployment name until we support provisioning data planes - deployment := types.NamespacedName{ - Name: "tmp-nginx-deployment", - Namespace: h.cfg.gatewayPodConfig.Namespace, - } - - applied, err := h.cfg.nginxUpdater.UpdateConfig(ctx, deployment, files) - if err != nil { - return false, err - } + applied := h.cfg.nginxUpdater.UpdateConfig(deployment, files) // If using NGINX Plus, update upstream servers using the API. - var plusApplied bool - if h.cfg.plus && applied { - plusApplied, err = h.cfg.nginxUpdater.UpdateUpstreamServers(ctx, deployment, conf) - if err != nil { - return false, err - } + if h.cfg.plus { + h.cfg.nginxUpdater.UpdateUpstreamServers(deployment, conf) } - return applied || plusApplied, nil + return applied } // updateControlPlaneAndSetStatus updates the control plane configuration and then sets the status @@ -508,7 +538,7 @@ func (h *eventHandlerImpl) nginxGatewayServiceUpsert(ctx context.Context, logger gr.IgnoredGateways, transitionTime, gwAddresses, - h.latestReloadResult, + gr.LatestReloadResult, ) h.cfg.statusUpdater.UpdateGroup(ctx, groupGateways, gatewayStatuses...) } @@ -534,7 +564,7 @@ func (h *eventHandlerImpl) nginxGatewayServiceDelete( gr.IgnoredGateways, transitionTime, gwAddresses, - h.latestReloadResult, + gr.LatestReloadResult, ) h.cfg.statusUpdater.UpdateGroup(ctx, groupGateways, gatewayStatuses...) 
} diff --git a/internal/mode/static/handler_test.go b/internal/mode/static/handler_test.go index bc95306256..f7218ab99f 100644 --- a/internal/mode/static/handler_test.go +++ b/internal/mode/static/handler_test.go @@ -27,11 +27,13 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/agentfakes" + agentgrpcfakes "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/grpcfakes" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/configfakes" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/graph" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/statefakes" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/status" ) var _ = Describe("eventHandler", func() { @@ -43,9 +45,12 @@ var _ = Describe("eventHandler", func() { fakeStatusUpdater *statusfakes.FakeGroupUpdater fakeEventRecorder *record.FakeRecorder fakeK8sClient client.WithWatch + queue *status.Queue namespace = "nginx-gateway" configName = "nginx-gateway-config" zapLogLevelSetter zapLogLevelSetter + ctx context.Context + cancel context.CancelFunc ) const nginxGatewayServiceName = "nginx-gateway" @@ -66,10 +71,13 @@ var _ = Describe("eventHandler", func() { Expect(fakeGenerator.GenerateArgsForCall(0)).Should(Equal(expectedConf)) Expect(fakeNginxUpdater.UpdateConfigCallCount()).Should(Equal(1)) - _, _, files := fakeNginxUpdater.UpdateConfigArgsForCall(0) + _, files := fakeNginxUpdater.UpdateConfigArgsForCall(0) Expect(expectedFiles).To(Equal(files)) - Expect(fakeStatusUpdater.UpdateGroupCallCount()).Should(Equal(2)) + Eventually( + func() int { + return fakeStatusUpdater.UpdateGroupCallCount() + }).Should(Equal(2)) _, name, reqs := fakeStatusUpdater.UpdateGroupArgsForCall(0) Expect(name).To(Equal(groupAllExceptGateways)) Expect(reqs).To(BeEmpty()) @@ -80,20 +88,25 @@ var _ = Describe("eventHandler", func() { } BeforeEach(func() { + ctx, cancel = context.WithCancel(context.Background()) //nolint:fatcontext // ignore for test + fakeProcessor = &statefakes.FakeChangeProcessor{} fakeProcessor.ProcessReturns(state.NoChange, &graph.Graph{}) + fakeProcessor.GetLatestGraphReturns(&graph.Graph{}) fakeGenerator = &configfakes.FakeGenerator{} fakeNginxUpdater = &agentfakes.FakeNginxUpdater{} - fakeNginxUpdater.UpdateConfigReturns(true, nil) + fakeNginxUpdater.UpdateConfigReturns(true) fakeStatusUpdater = &statusfakes.FakeGroupUpdater{} fakeEventRecorder = record.NewFakeRecorder(1) zapLogLevelSetter = newZapLogLevelSetter(zap.NewAtomicLevel()) fakeK8sClient = fake.NewFakeClient() + queue = status.NewQueue() // Needed because handler checks the service from the API on every HandleEventBatch Expect(fakeK8sClient.Create(context.Background(), createService(nginxGatewayServiceName))).To(Succeed()) handler = newEventHandlerImpl(eventHandlerConfig{ + ctx: ctx, k8sClient: fakeK8sClient, processor: fakeProcessor, generator: fakeGenerator, @@ -103,6 +116,8 @@ var _ = Describe("eventHandler", func() { eventRecorder: fakeEventRecorder, deployCtxCollector: &licensingfakes.FakeCollector{}, graphBuiltHealthChecker: newGraphBuiltHealthChecker(), + statusQueue: queue, + nginxDeployments: 
agent.NewDeploymentStore(&agentgrpcfakes.FakeAgentConnectionsTracker{}), controlConfigNSName: types.NamespacedName{Namespace: namespace, Name: configName}, gatewayPodConfig: config.GatewayPodConfig{ ServiceName: "nginx-gateway", @@ -114,6 +129,10 @@ var _ = Describe("eventHandler", func() { Expect(handler.cfg.graphBuiltHealthChecker.ready).To(BeFalse()) }) + AfterEach(func() { + cancel() + }) + Describe("Process the Gateway API resources events", func() { fakeCfgFiles := []agent.File{ { @@ -214,7 +233,7 @@ var _ = Describe("eventHandler", func() { }, } - fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{ + gr := &graph.Graph{ GatewayClass: &graph.GatewayClass{ Source: gc, Valid: true, @@ -222,7 +241,10 @@ var _ = Describe("eventHandler", func() { IgnoredGatewayClasses: map[types.NamespacedName]*gatewayv1.GatewayClass{ client.ObjectKeyFromObject(ignoredGC): ignoredGC, }, - }) + } + + fakeProcessor.ProcessReturns(state.ClusterStateChange, gr) + fakeProcessor.GetLatestGraphReturns(gr) e := &events.UpsertEvent{ Resource: &gatewayv1.HTTPRoute{}, // any supported is OK @@ -237,7 +259,10 @@ var _ = Describe("eventHandler", func() { handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) - Expect(fakeStatusUpdater.UpdateGroupCallCount()).To(Equal(2)) + Eventually( + func() int { + return fakeStatusUpdater.UpdateGroupCallCount() + }).Should(Equal(2)) _, name, reqs := fakeStatusUpdater.UpdateGroupArgsForCall(0) Expect(name).To(Equal(groupAllExceptGateways)) @@ -442,6 +467,22 @@ var _ = Describe("eventHandler", func() { }) }) + It("should update status when receiving a queue event", func() { + obj := &status.QueueObject{ + Deployment: types.NamespacedName{}, + Error: errors.New("status error"), + } + queue.Enqueue(obj) + + Eventually( + func() int { + return fakeStatusUpdater.UpdateGroupCallCount() + }).Should(Equal(2)) + + gr := handler.cfg.processor.GetLatestGraph() + Expect(gr.LatestReloadResult.Error.Error()).To(Equal("status error")) + }) + It("should set the health checker status properly", func() { e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}} batch := []interface{}{e} @@ -534,6 +575,17 @@ var _ = Describe("getDeploymentContext", func() { }) When("nginx plus is true", func() { + var ctx context.Context + var cancel context.CancelFunc + + BeforeEach(func() { + ctx, cancel = context.WithCancel(context.Background()) //nolint:fatcontext + }) + + AfterEach(func() { + cancel() + }) + It("returns deployment context", func() { expDepCtx := dataplane.DeploymentContext{ Integration: "ngf", @@ -543,7 +595,9 @@ var _ = Describe("getDeploymentContext", func() { } handler := newEventHandlerImpl(eventHandlerConfig{ - plus: true, + ctx: ctx, + statusQueue: status.NewQueue(), + plus: true, deployCtxCollector: &licensingfakes.FakeCollector{ CollectStub: func(_ context.Context) (dataplane.DeploymentContext, error) { return expDepCtx, nil @@ -559,7 +613,9 @@ var _ = Describe("getDeploymentContext", func() { expErr := errors.New("collect error") handler := newEventHandlerImpl(eventHandlerConfig{ - plus: true, + ctx: ctx, + statusQueue: status.NewQueue(), + plus: true, deployCtxCollector: &licensingfakes.FakeCollector{ CollectStub: func(_ context.Context) (dataplane.DeploymentContext, error) { return dataplane.DeploymentContext{}, expErr diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index 70ba17405d..97901b9259 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -44,7 +44,7 @@ import ( 
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/helpers" "github.com/nginxinc/nginx-gateway-fabric/internal/framework/kinds" "github.com/nginxinc/nginx-gateway-fabric/internal/framework/runnables" - "github.com/nginxinc/nginx-gateway-fabric/internal/framework/status" + frameworkStatus "github.com/nginxinc/nginx-gateway-fabric/internal/framework/status" ngftypes "github.com/nginxinc/nginx-gateway-fabric/internal/framework/types" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/config" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/licensing" @@ -61,6 +61,7 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/graph" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/resolver" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/validation" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/status" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry" ) @@ -158,7 +159,7 @@ func StartManager(cfg config.Config) error { handlerCollector, ok := handlerCollector.(prometheus.Collector) if !ok { - return fmt.Errorf("handlerCollector is not a prometheus.Collector: %w", status.ErrFailedAssert) + return fmt.Errorf("handlerCollector is not a prometheus.Collector: %w", frameworkStatus.ErrFailedAssert) } metrics.Registry.MustRegister( @@ -167,19 +168,25 @@ func StartManager(cfg config.Config) error { ) } - statusUpdater := status.NewUpdater( + statusUpdater := frameworkStatus.NewUpdater( mgr.GetClient(), cfg.Logger.WithName("statusUpdater"), ) - groupStatusUpdater := status.NewLeaderAwareGroupUpdater(statusUpdater) + groupStatusUpdater := frameworkStatus.NewLeaderAwareGroupUpdater(statusUpdater) deployCtxCollector := licensing.NewDeploymentContextCollector(licensing.DeploymentContextCollectorConfig{ K8sClientReader: mgr.GetAPIReader(), PodUID: cfg.GatewayPodConfig.UID, Logger: cfg.Logger.WithName("deployCtxCollector"), }) - nginxUpdater := agent.NewNginxUpdater(cfg.Logger.WithName("nginxUpdater"), mgr.GetAPIReader(), cfg.Plus) + statusQueue := status.NewQueue() + nginxUpdater := agent.NewNginxUpdater( + cfg.Logger.WithName("nginxUpdater"), + mgr.GetAPIReader(), + statusQueue, + cfg.Plus, + ) grpcServer := agentgrpc.NewServer( cfg.Logger.WithName("agentGRPCServer"), @@ -194,8 +201,8 @@ func StartManager(cfg config.Config) error { return fmt.Errorf("cannot register grpc server: %w", err) } - // TODO(sberman): event handler loop should wait on a channel until the grpc server has started eventHandler := newEventHandlerImpl(eventHandlerConfig{ + ctx: ctx, nginxUpdater: nginxUpdater, metricsCollector: handlerCollector, statusUpdater: groupStatusUpdater, @@ -208,6 +215,7 @@ func StartManager(cfg config.Config) error { ), k8sClient: mgr.GetClient(), k8sReader: mgr.GetAPIReader(), + logger: cfg.Logger.WithName("eventHandler"), logLevelSetter: logLevelSetter, eventRecorder: recorder, deployCtxCollector: deployCtxCollector, @@ -217,6 +225,8 @@ func StartManager(cfg config.Config) error { gatewayCtlrName: cfg.GatewayCtlrName, updateGatewayClassStatus: cfg.UpdateGatewayClassStatus, plus: cfg.Plus, + statusQueue: statusQueue, + nginxDeployments: nginxUpdater.NginxDeployments, }) objects, objectLists := prepareFirstEventBatchPreparerArgs(cfg) diff --git a/internal/mode/static/nginx/agent/agent.go b/internal/mode/static/nginx/agent/agent.go index 7e95bd10f9..73a7c42c2e 100644 --- a/internal/mode/static/nginx/agent/agent.go +++ b/internal/mode/static/nginx/agent/agent.go @@ -1,20 +1,19 
@@ package agent import ( - "context" "errors" "fmt" "github.com/go-logr/logr" pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" "google.golang.org/protobuf/types/known/structpb" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast" agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/resolver" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/status" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -23,23 +22,15 @@ import ( // NginxUpdater is an interface for updating NGINX using the NGINX agent. type NginxUpdater interface { - UpdateConfig( - ctx context.Context, - deploymentNsName types.NamespacedName, - files []File, - ) (bool, error) - UpdateUpstreamServers( - ctx context.Context, - deploymentNsName types.NamespacedName, - conf dataplane.Configuration, - ) (bool, error) + UpdateConfig(deployment *Deployment, files []File) bool + UpdateUpstreamServers(deployment *Deployment, conf dataplane.Configuration) bool } // NginxUpdaterImpl implements the NginxUpdater interface. type NginxUpdaterImpl struct { CommandService *commandService FileService *fileService - nginxDeployments *DeploymentStore + NginxDeployments *DeploymentStore logger logr.Logger plus bool } @@ -48,72 +39,76 @@ type NginxUpdaterImpl struct { func NewNginxUpdater( logger logr.Logger, reader client.Reader, + statusQueue *status.Queue, plus bool, ) *NginxUpdaterImpl { connTracker := agentgrpc.NewConnectionsTracker() nginxDeployments := NewDeploymentStore(connTracker) - commandService := newCommandService(logger.WithName("commandService"), reader, nginxDeployments, connTracker) + commandService := newCommandService( + logger.WithName("commandService"), + reader, + nginxDeployments, + connTracker, + statusQueue, + ) fileService := newFileService(logger.WithName("fileService"), nginxDeployments, connTracker) return &NginxUpdaterImpl{ logger: logger, plus: plus, - nginxDeployments: nginxDeployments, + NginxDeployments: nginxDeployments, CommandService: commandService, FileService: fileService, } } // UpdateConfig sends the nginx configuration to the agent. -// Returns whether configuration was applied or not, and any error that occurred. -func (n *NginxUpdaterImpl) UpdateConfig(ctx context.Context, nsName types.NamespacedName, files []File) (bool, error) { +// Returns whether configuration was applied or not. +func (n *NginxUpdaterImpl) UpdateConfig( + deployment *Deployment, + files []File, +) bool { n.logger.Info("Sending nginx configuration to agent") - deployment := n.nginxDeployments.GetOrStore(ctx, nsName) - if deployment == nil { - return false, fmt.Errorf("failed to register nginx deployment %q", nsName.Name) - } - - // TODO(sberman): wait to send config until Deployment pods have all connected. - // If an nginx Pod creation event triggered this update, then we should include that - // pod name in the call to this function. Then we can wait for the DeploymentStore - // to show that this Pod has connected, and proceed with sending the config. 
+ // reset the latest error to nil now that we're applying new config + deployment.SetLatestConfigError(nil) msg := deployment.SetFiles(files) + applied := deployment.GetBroadcaster().Send(msg) - applied, err := deployment.GetBroadcaster().Send(msg) - if err != nil { - return false, fmt.Errorf("could not set nginx files: %w", err) + latestStatus := deployment.GetConfigurationStatus() + if latestStatus != nil { + deployment.SetLatestConfigError(latestStatus) } - return applied, nil + return applied } // UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API. // Only applicable when using NGINX Plus. -// Returns whether configuration was applied or not, and any error that occurred. +// Returns whether configuration was applied or not. func (n *NginxUpdaterImpl) UpdateUpstreamServers( - ctx context.Context, - nsName types.NamespacedName, + deployment *Deployment, conf dataplane.Configuration, -) (bool, error) { +) bool { if !n.plus { - return false, nil + return false } - n.logger.Info("Updating upstream servers using NGINX Plus API") - - deployment := n.nginxDeployments.GetOrStore(ctx, nsName) - if deployment == nil { - return false, fmt.Errorf("failed to register nginx deployment %q", nsName.Name) - } broadcaster := deployment.GetBroadcaster() + // reset the latest error to nil now that we're applying new config + deployment.SetLatestUpstreamError(nil) + var updateErr error var applied bool actions := make([]*pb.NGINXPlusAction, 0, len(conf.Upstreams)) for _, upstream := range conf.Upstreams { + if len(upstream.Endpoints) == 0 { + continue + } + action := &pb.NGINXPlusAction{ Action: &pb.NGINXPlusAction_UpdateHttpUpstreamServers{ UpdateHttpUpstreamServers: buildUpstreamServers(upstream), @@ -126,17 +121,25 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers( NGINXPlusAction: action, } - var err error - applied, err = broadcaster.Send(msg) - if err != nil { + applied = broadcaster.Send(msg) + if err := deployment.GetConfigurationStatus(); err != nil { updateErr = errors.Join(updateErr, fmt.Errorf( "couldn't update upstream %q via the API: %w", upstream.Name, err)) } } + + if updateErr != nil { + deployment.SetLatestUpstreamError(updateErr) + } + + if applied { + n.logger.Info("Updated upstream servers using NGINX Plus API") + } + // Store the most recent actions on the deployment so any new subscribers can apply them when first connecting. 
deployment.SetNGINXPlusActions(actions) - return applied, updateErr + return applied } func buildUpstreamServers(upstream dataplane.Upstream) *pb.UpdateHTTPUpstreamServers { diff --git a/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go b/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go index ee228a3610..606fa67532 100644 --- a/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go +++ b/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go @@ -2,73 +2,64 @@ package agentfakes import ( - "context" "sync" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" - "k8s.io/apimachinery/pkg/types" ) type FakeNginxUpdater struct { - UpdateConfigStub func(context.Context, types.NamespacedName, []agent.File) (bool, error) + UpdateConfigStub func(*agent.Deployment, []agent.File) bool updateConfigMutex sync.RWMutex updateConfigArgsForCall []struct { - arg1 context.Context - arg2 types.NamespacedName - arg3 []agent.File + arg1 *agent.Deployment + arg2 []agent.File } updateConfigReturns struct { result1 bool - result2 error } updateConfigReturnsOnCall map[int]struct { result1 bool - result2 error } - UpdateUpstreamServersStub func(context.Context, types.NamespacedName, dataplane.Configuration) (bool, error) + UpdateUpstreamServersStub func(*agent.Deployment, dataplane.Configuration) bool updateUpstreamServersMutex sync.RWMutex updateUpstreamServersArgsForCall []struct { - arg1 context.Context - arg2 types.NamespacedName - arg3 dataplane.Configuration + arg1 *agent.Deployment + arg2 dataplane.Configuration } updateUpstreamServersReturns struct { result1 bool - result2 error } updateUpstreamServersReturnsOnCall map[int]struct { result1 bool - result2 error } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *FakeNginxUpdater) UpdateConfig(arg1 context.Context, arg2 types.NamespacedName, arg3 []agent.File) (bool, error) { - var arg3Copy []agent.File - if arg3 != nil { - arg3Copy = make([]agent.File, len(arg3)) - copy(arg3Copy, arg3) +func (fake *FakeNginxUpdater) UpdateConfig(arg1 *agent.Deployment, arg2 []agent.File) bool { + var arg2Copy []agent.File + if arg2 != nil { + arg2Copy = make([]agent.File, len(arg2)) + copy(arg2Copy, arg2) } fake.updateConfigMutex.Lock() ret, specificReturn := fake.updateConfigReturnsOnCall[len(fake.updateConfigArgsForCall)] fake.updateConfigArgsForCall = append(fake.updateConfigArgsForCall, struct { - arg1 context.Context - arg2 types.NamespacedName - arg3 []agent.File - }{arg1, arg2, arg3Copy}) + arg1 *agent.Deployment + arg2 []agent.File + }{arg1, arg2Copy}) stub := fake.UpdateConfigStub fakeReturns := fake.updateConfigReturns - fake.recordInvocation("UpdateConfig", []interface{}{arg1, arg2, arg3Copy}) + fake.recordInvocation("UpdateConfig", []interface{}{arg1, arg2Copy}) fake.updateConfigMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3) + return stub(arg1, arg2) } if specificReturn { - return ret.result1, ret.result2 + return ret.result1 } - return fakeReturns.result1, fakeReturns.result2 + return fakeReturns.result1 } func (fake *FakeNginxUpdater) UpdateConfigCallCount() int { @@ -77,64 +68,60 @@ func (fake *FakeNginxUpdater) UpdateConfigCallCount() int { return len(fake.updateConfigArgsForCall) } -func (fake *FakeNginxUpdater) UpdateConfigCalls(stub func(context.Context, types.NamespacedName, []agent.File) (bool, error)) { +func (fake *FakeNginxUpdater) 
UpdateConfigCalls(stub func(*agent.Deployment, []agent.File) bool) { fake.updateConfigMutex.Lock() defer fake.updateConfigMutex.Unlock() fake.UpdateConfigStub = stub } -func (fake *FakeNginxUpdater) UpdateConfigArgsForCall(i int) (context.Context, types.NamespacedName, []agent.File) { +func (fake *FakeNginxUpdater) UpdateConfigArgsForCall(i int) (*agent.Deployment, []agent.File) { fake.updateConfigMutex.RLock() defer fake.updateConfigMutex.RUnlock() argsForCall := fake.updateConfigArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 + return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeNginxUpdater) UpdateConfigReturns(result1 bool, result2 error) { +func (fake *FakeNginxUpdater) UpdateConfigReturns(result1 bool) { fake.updateConfigMutex.Lock() defer fake.updateConfigMutex.Unlock() fake.UpdateConfigStub = nil fake.updateConfigReturns = struct { result1 bool - result2 error - }{result1, result2} + }{result1} } -func (fake *FakeNginxUpdater) UpdateConfigReturnsOnCall(i int, result1 bool, result2 error) { +func (fake *FakeNginxUpdater) UpdateConfigReturnsOnCall(i int, result1 bool) { fake.updateConfigMutex.Lock() defer fake.updateConfigMutex.Unlock() fake.UpdateConfigStub = nil if fake.updateConfigReturnsOnCall == nil { fake.updateConfigReturnsOnCall = make(map[int]struct { result1 bool - result2 error }) } fake.updateConfigReturnsOnCall[i] = struct { result1 bool - result2 error - }{result1, result2} + }{result1} } -func (fake *FakeNginxUpdater) UpdateUpstreamServers(arg1 context.Context, arg2 types.NamespacedName, arg3 dataplane.Configuration) (bool, error) { +func (fake *FakeNginxUpdater) UpdateUpstreamServers(arg1 *agent.Deployment, arg2 dataplane.Configuration) bool { fake.updateUpstreamServersMutex.Lock() ret, specificReturn := fake.updateUpstreamServersReturnsOnCall[len(fake.updateUpstreamServersArgsForCall)] fake.updateUpstreamServersArgsForCall = append(fake.updateUpstreamServersArgsForCall, struct { - arg1 context.Context - arg2 types.NamespacedName - arg3 dataplane.Configuration - }{arg1, arg2, arg3}) + arg1 *agent.Deployment + arg2 dataplane.Configuration + }{arg1, arg2}) stub := fake.UpdateUpstreamServersStub fakeReturns := fake.updateUpstreamServersReturns - fake.recordInvocation("UpdateUpstreamServers", []interface{}{arg1, arg2, arg3}) + fake.recordInvocation("UpdateUpstreamServers", []interface{}{arg1, arg2}) fake.updateUpstreamServersMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3) + return stub(arg1, arg2) } if specificReturn { - return ret.result1, ret.result2 + return ret.result1 } - return fakeReturns.result1, fakeReturns.result2 + return fakeReturns.result1 } func (fake *FakeNginxUpdater) UpdateUpstreamServersCallCount() int { @@ -143,43 +130,40 @@ func (fake *FakeNginxUpdater) UpdateUpstreamServersCallCount() int { return len(fake.updateUpstreamServersArgsForCall) } -func (fake *FakeNginxUpdater) UpdateUpstreamServersCalls(stub func(context.Context, types.NamespacedName, dataplane.Configuration) (bool, error)) { +func (fake *FakeNginxUpdater) UpdateUpstreamServersCalls(stub func(*agent.Deployment, dataplane.Configuration) bool) { fake.updateUpstreamServersMutex.Lock() defer fake.updateUpstreamServersMutex.Unlock() fake.UpdateUpstreamServersStub = stub } -func (fake *FakeNginxUpdater) UpdateUpstreamServersArgsForCall(i int) (context.Context, types.NamespacedName, dataplane.Configuration) { +func (fake *FakeNginxUpdater) UpdateUpstreamServersArgsForCall(i int) (*agent.Deployment, dataplane.Configuration) { 
fake.updateUpstreamServersMutex.RLock() defer fake.updateUpstreamServersMutex.RUnlock() argsForCall := fake.updateUpstreamServersArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 + return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeNginxUpdater) UpdateUpstreamServersReturns(result1 bool, result2 error) { +func (fake *FakeNginxUpdater) UpdateUpstreamServersReturns(result1 bool) { fake.updateUpstreamServersMutex.Lock() defer fake.updateUpstreamServersMutex.Unlock() fake.UpdateUpstreamServersStub = nil fake.updateUpstreamServersReturns = struct { result1 bool - result2 error - }{result1, result2} + }{result1} } -func (fake *FakeNginxUpdater) UpdateUpstreamServersReturnsOnCall(i int, result1 bool, result2 error) { +func (fake *FakeNginxUpdater) UpdateUpstreamServersReturnsOnCall(i int, result1 bool) { fake.updateUpstreamServersMutex.Lock() defer fake.updateUpstreamServersMutex.Unlock() fake.UpdateUpstreamServersStub = nil if fake.updateUpstreamServersReturnsOnCall == nil { fake.updateUpstreamServersReturnsOnCall = make(map[int]struct { result1 bool - result2 error }) } fake.updateUpstreamServersReturnsOnCall[i] = struct { result1 bool - result2 error - }{result1, result2} + }{result1} } func (fake *FakeNginxUpdater) Invocations() map[string][][]interface{} { diff --git a/internal/mode/static/nginx/agent/broadcast/broadcast.go b/internal/mode/static/nginx/agent/broadcast/broadcast.go index ded3e005e0..780f843238 100644 --- a/internal/mode/static/nginx/agent/broadcast/broadcast.go +++ b/internal/mode/static/nginx/agent/broadcast/broadcast.go @@ -2,7 +2,6 @@ package broadcast import ( "context" - "errors" "sync" pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" @@ -16,19 +15,19 @@ import ( // Broadcaster defines an interface for consumers to subscribe to File updates. type Broadcaster interface { Subscribe() SubscriberChannels - Send(NginxAgentMessage) (bool, error) + Send(NginxAgentMessage) bool CancelSubscription(string) } type SubscriberChannels struct { ListenCh <-chan NginxAgentMessage - ResponseCh chan<- error + ResponseCh chan<- struct{} ID string } type storedChannels struct { listenCh chan<- NginxAgentMessage - responseCh <-chan error + responseCh <-chan struct{} id string } @@ -41,7 +40,7 @@ type DeploymentBroadcaster struct { subCh chan storedChannels unsubCh chan string listeners map[string]storedChannels - errorCh chan error + doneCh chan struct{} } // NewDeploymentBroadcaster returns a new instance of a DeploymentBroadcaster. @@ -51,7 +50,7 @@ func NewDeploymentBroadcaster(ctx context.Context) *DeploymentBroadcaster { publishCh: make(chan NginxAgentMessage), subCh: make(chan storedChannels), unsubCh: make(chan string), - errorCh: make(chan error), + doneCh: make(chan struct{}), } go broadcaster.run(ctx) @@ -62,7 +61,7 @@ func NewDeploymentBroadcaster(ctx context.Context) *DeploymentBroadcaster { // to listen on for messages, as well as a channel to respond on. func (b *DeploymentBroadcaster) Subscribe() SubscriberChannels { listenCh := make(chan NginxAgentMessage) - responseCh := make(chan error) + responseCh := make(chan struct{}) id := string(uuid.NewUUID()) subscriberChans := SubscriberChannels{ @@ -81,12 +80,12 @@ func (b *DeploymentBroadcaster) Subscribe() SubscriberChannels { } // Send the message to all listeners. Wait for all listeners to respond. -// Returns true if there were listeners that received the message, and returns any -// responses (nil for success, error for failure). 
-func (b *DeploymentBroadcaster) Send(message NginxAgentMessage) (bool, error) { +// Returns true if there were listeners that received the message. +func (b *DeploymentBroadcaster) Send(message NginxAgentMessage) bool { b.publishCh <- message + <-b.doneCh - return len(b.listeners) > 0, <-b.errorCh + return len(b.listeners) > 0 } // CancelSubscription removes a Subscriber from the channel list. @@ -112,7 +111,6 @@ func (b *DeploymentBroadcaster) run(ctx context.Context) { var wg sync.WaitGroup wg.Add(len(b.listeners)) - responses := make(chan error, len(b.listeners)) for _, channels := range b.listeners { go func() { defer wg.Done() @@ -120,18 +118,12 @@ func (b *DeploymentBroadcaster) run(ctx context.Context) { // send message and wait for it to be read channels.listenCh <- msg // wait for response - res := <-channels.responseCh - // add response to the list of responses - responses <- res + <-channels.responseCh }() } wg.Wait() - var err error - for range len(b.listeners) { - err = errors.Join(err, <-responses) - } - b.errorCh <- err + b.doneCh <- struct{}{} } } } diff --git a/internal/mode/static/nginx/agent/broadcast/broadcastfakes/fake_broadcaster.go b/internal/mode/static/nginx/agent/broadcast/broadcastfakes/fake_broadcaster.go index f9fc63aa0a..95421dc5d6 100644 --- a/internal/mode/static/nginx/agent/broadcast/broadcastfakes/fake_broadcaster.go +++ b/internal/mode/static/nginx/agent/broadcast/broadcastfakes/fake_broadcaster.go @@ -13,18 +13,16 @@ type FakeBroadcaster struct { cancelSubscriptionArgsForCall []struct { arg1 string } - SendStub func(broadcast.NginxAgentMessage) (bool, error) + SendStub func(broadcast.NginxAgentMessage) bool sendMutex sync.RWMutex sendArgsForCall []struct { arg1 broadcast.NginxAgentMessage } sendReturns struct { result1 bool - result2 error } sendReturnsOnCall map[int]struct { result1 bool - result2 error } SubscribeStub func() broadcast.SubscriberChannels subscribeMutex sync.RWMutex @@ -72,7 +70,7 @@ func (fake *FakeBroadcaster) CancelSubscriptionArgsForCall(i int) string { return argsForCall.arg1 } -func (fake *FakeBroadcaster) Send(arg1 broadcast.NginxAgentMessage) (bool, error) { +func (fake *FakeBroadcaster) Send(arg1 broadcast.NginxAgentMessage) bool { fake.sendMutex.Lock() ret, specificReturn := fake.sendReturnsOnCall[len(fake.sendArgsForCall)] fake.sendArgsForCall = append(fake.sendArgsForCall, struct { @@ -86,9 +84,9 @@ func (fake *FakeBroadcaster) Send(arg1 broadcast.NginxAgentMessage) (bool, error return stub(arg1) } if specificReturn { - return ret.result1, ret.result2 + return ret.result1 } - return fakeReturns.result1, fakeReturns.result2 + return fakeReturns.result1 } func (fake *FakeBroadcaster) SendCallCount() int { @@ -97,7 +95,7 @@ func (fake *FakeBroadcaster) SendCallCount() int { return len(fake.sendArgsForCall) } -func (fake *FakeBroadcaster) SendCalls(stub func(broadcast.NginxAgentMessage) (bool, error)) { +func (fake *FakeBroadcaster) SendCalls(stub func(broadcast.NginxAgentMessage) bool) { fake.sendMutex.Lock() defer fake.sendMutex.Unlock() fake.SendStub = stub @@ -110,30 +108,27 @@ func (fake *FakeBroadcaster) SendArgsForCall(i int) broadcast.NginxAgentMessage return argsForCall.arg1 } -func (fake *FakeBroadcaster) SendReturns(result1 bool, result2 error) { +func (fake *FakeBroadcaster) SendReturns(result1 bool) { fake.sendMutex.Lock() defer fake.sendMutex.Unlock() fake.SendStub = nil fake.sendReturns = struct { result1 bool - result2 error - }{result1, result2} + }{result1} } -func (fake *FakeBroadcaster) SendReturnsOnCall(i 
int, result1 bool, result2 error) { +func (fake *FakeBroadcaster) SendReturnsOnCall(i int, result1 bool) { fake.sendMutex.Lock() defer fake.sendMutex.Unlock() fake.SendStub = nil if fake.sendReturnsOnCall == nil { fake.sendReturnsOnCall = make(map[int]struct { result1 bool - result2 error }) } fake.sendReturnsOnCall[i] = struct { result1 bool - result2 error - }{result1, result2} + }{result1} } func (fake *FakeBroadcaster) Subscribe() broadcast.SubscriberChannels { diff --git a/internal/mode/static/nginx/agent/command.go b/internal/mode/static/nginx/agent/command.go index b9e051c317..328bf0266e 100644 --- a/internal/mode/static/nginx/agent/command.go +++ b/internal/mode/static/nginx/agent/command.go @@ -4,14 +4,15 @@ import ( "context" "errors" "fmt" - "strings" + "io" "time" "github.com/go-logr/logr" + "github.com/google/uuid" pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + grpcStatus "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -22,14 +23,16 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast" agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context" - "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/meta" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/messenger" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/status" ) // commandService handles the connection and subscription to the data plane agent. type commandService struct { pb.CommandServiceServer nginxDeployments *DeploymentStore - connTracker *agentgrpc.ConnectionsTracker + connTracker agentgrpc.AgentConnectionsTracker + statusQueue *status.Queue k8sReader client.Reader // TODO(sberman): all logs are at Info level right now. Adjust appropriately. 
logger logr.Logger @@ -39,13 +42,15 @@ func newCommandService( logger logr.Logger, reader client.Reader, depStore *DeploymentStore, - connTracker *agentgrpc.ConnectionsTracker, + connTracker agentgrpc.AgentConnectionsTracker, + statusQueue *status.Queue, ) *commandService { return &commandService{ k8sReader: reader, logger: logger, connTracker: connTracker, nginxDeployments: depStore, + statusQueue: statusQueue, } } @@ -82,7 +87,7 @@ func (cs *commandService) CreateConnection( Error: err.Error(), }, } - return response, status.Errorf(codes.Internal, "error getting pod owner %s", err.Error()) + return response, grpcStatus.Errorf(codes.Internal, "error getting pod owner %s", err.Error()) } conn := agentgrpc.Connection{ @@ -108,8 +113,6 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error return agentgrpc.ErrStatusInvalidConnection } - cs.logger.Info(fmt.Sprintf("Received subscribe request from %q", gi.IPAddress)) - // wait for the agent to report itself and nginx conn, deployment, err := cs.waitForConnection(ctx, gi) if err != nil { @@ -117,39 +120,30 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error return err } - // apply current config before starting listen loop - deployment.RLock() - fileOverviews, configVersion := deployment.GetFileOverviews() - if err = in.Send(buildRequest(fileOverviews, conn.InstanceID, configVersion)); err != nil { - fmt.Printf("ERROR applying initial config: %v\n", err) - // TODO(sberman): how do we write this status? - } + cs.logger.Info(fmt.Sprintf("Successfully connected to nginx agent %s", conn.PodName)) - for _, action := range deployment.GetNGINXPlusActions() { - if err := in.Send(buildPlusAPIRequest(action, conn.InstanceID)); err != nil { - fmt.Printf("ERROR applying initial API config: %v\n", err) - // TODO(sberman): how do we write this status? 
- } - } - deployment.RUnlock() + msgr := messenger.New(in) + go msgr.Run(ctx) - if err == nil { - cs.logger.Info(fmt.Sprintf("Successfully configured nginx for new subscription %q", conn.PodName)) + // apply current config before starting event loop + deployment.Lock.RLock() + if err := cs.setInitialConfig(ctx, deployment, conn, msgr); err != nil { + deployment.Lock.RUnlock() + + return err } // subscribe to the deployment broadcaster to get file updates broadcaster := deployment.GetBroadcaster() channels := broadcaster.Subscribe() defer broadcaster.CancelSubscription(channels.ID) - - go cs.listenForDataPlaneResponse(ctx, in, channels.ResponseCh) + deployment.Lock.RUnlock() for { select { case <-ctx.Done(): - return ctx.Err() + return grpcStatus.Error(codes.Canceled, context.Cause(ctx).Error()) case msg := <-channels.ListenCh: - deployment.RLock() var req *pb.ManagementPlaneRequest switch msg.Type { case broadcast.ConfigApplyRequest: @@ -160,14 +154,28 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error panic(fmt.Sprintf("unknown request type %d", msg.Type)) } - if err := in.Send(req); err != nil { - deployment.RUnlock() + if err := msgr.Send(ctx, req); err != nil { cs.logger.Error(err, "error sending request to agent") - channels.ResponseCh <- err + deployment.SetPodErrorStatus(conn.PodName, err) + channels.ResponseCh <- struct{}{} - return err + return grpcStatus.Error(codes.Internal, err.Error()) + } + case err = <-msgr.Errors(): + cs.logger.Error(err, "connection error") + if errors.Is(err, io.EOF) { + return grpcStatus.Error(codes.Aborted, err.Error()) } - deployment.RUnlock() + return grpcStatus.Error(codes.Internal, err.Error()) + case msg := <-msgr.Messages(): + res := msg.GetCommandResponse() + if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK { + err := fmt.Errorf("bad response from agent: msg: %s; error: %s", res.GetMessage(), res.GetError()) + deployment.SetPodErrorStatus(conn.PodName, err) + } else { + deployment.SetPodErrorStatus(conn.PodName, nil) + } + channels.ResponseCh <- struct{}{} } } } @@ -197,7 +205,7 @@ func (cs *commandService) waitForConnection( case <-ticker.C: if conn, ok := cs.connTracker.ConnectionIsReady(gi.IPAddress); ok { // connection has been established, now ensure that the deployment exists in the store - if deployment, ok := cs.nginxDeployments.Get(conn.Parent); ok { + if deployment := cs.nginxDeployments.Get(conn.Parent); deployment != nil { return &conn, deployment, nil } err = deploymentStoreErr @@ -208,38 +216,109 @@ func (cs *commandService) waitForConnection( } } -func (cs *commandService) listenForDataPlaneResponse( +// setInitialConfig gets the initial configuration for this connection and applies it. +// The caller MUST lock the deployment before calling this. 
+func (cs *commandService) setInitialConfig( + ctx context.Context, + deployment *Deployment, + conn *agentgrpc.Connection, + msgr *messenger.Messenger, +) error { + fileOverviews, configVersion := deployment.GetFileOverviews() + if err := msgr.Send(ctx, buildRequest(fileOverviews, conn.InstanceID, configVersion)); err != nil { + cs.logAndSendErrorStatus(deployment, conn, err) + + return grpcStatus.Error(codes.Internal, err.Error()) + } + + applyErr, connErr := cs.waitForInitialConfigApply(ctx, msgr) + if connErr != nil { + cs.logger.Error(connErr, "error setting initial configuration") + + return connErr + } + + time.Sleep(1 * time.Second) + + var upstreamErr error + for _, action := range deployment.GetNGINXPlusActions() { + if err := msgr.Send(ctx, buildPlusAPIRequest(action, conn.InstanceID)); err != nil { + cs.logAndSendErrorStatus(deployment, conn, err) + + return grpcStatus.Error(codes.Internal, err.Error()) + } + + upstreamApplyErr, connErr := cs.waitForInitialConfigApply(ctx, msgr) + if connErr != nil { + cs.logger.Error(connErr, "error setting initial configuration") + + return connErr + } + + upstreamErr = errors.Join(upstreamErr, upstreamApplyErr) + } + // send the status (error or nil) to the status queue + cs.logAndSendErrorStatus(deployment, conn, errors.Join(applyErr, upstreamErr)) + + return nil +} + +// waitForInitialConfigApply waits for the nginx agent to respond after a Subscriber attempts +// to apply its initial config. +// Two errors are returned +// - applyErr is an error applying the configuration +// - connectionErr is an error with the connection or sending the configuration +// The caller treats a connectionErr as unrecoverable, while the applyErr is used +// to set the status on the Gateway resources. +func (cs *commandService) waitForInitialConfigApply( ctx context.Context, - in pb.CommandService_SubscribeServer, - responseCh chan<- error, -) { + msgr *messenger.Messenger, +) (applyErr error, connectionErr error) { for { select { case <-ctx.Done(): - return - default: - dataPlaneResponse, err := in.Recv() - if err != nil && !strings.Contains(err.Error(), "context canceled") { - cs.logger.Error(err, "failed to receive data plane response") - return + return nil, grpcStatus.Error(codes.Canceled, context.Cause(ctx).Error()) + case err := <-msgr.Errors(): + if errors.Is(err, io.EOF) { + return nil, grpcStatus.Error(codes.Aborted, err.Error()) } - - res := dataPlaneResponse.GetCommandResponse() + return nil, grpcStatus.Error(codes.Internal, err.Error()) + case msg := <-msgr.Messages(): + res := msg.GetCommandResponse() if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK { - err := fmt.Errorf("bad response from agent: %s; error: %s", res.GetMessage(), res.GetError()) - responseCh <- err - } else { - responseCh <- nil + applyErr := fmt.Errorf("bad response from agent: msg: %s; error: %s", res.GetMessage(), res.GetError()) + return applyErr, nil } + + return applyErr, connectionErr } } } +// logAndSendErrorStatus logs an error, sets it on the Deployment object for that Pod, and then sends +// the full Deployment error status to the status queue. This ensures that any other Pod errors that already +// exist on the Deployment are not overwritten. +// If the error is nil, then we just enqueue the nil value and don't log it, which indicates success. 
+func (cs *commandService) logAndSendErrorStatus(deployment *Deployment, conn *agentgrpc.Connection, err error) { + if err != nil { + cs.logger.Error(err, "error sending request to agent") + } else { + cs.logger.Info(fmt.Sprintf("Successfully configured nginx for new subscription %q", conn.PodName)) + } + deployment.SetPodErrorStatus(conn.PodName, err) + + queueObj := &status.QueueObject{ + Deployment: conn.Parent, + Error: deployment.GetConfigurationStatus(), + } + cs.statusQueue.Enqueue(queueObj) +} + func buildRequest(fileOverviews []*pb.File, instanceID, version string) *pb.ManagementPlaneRequest { return &pb.ManagementPlaneRequest{ MessageMeta: &pb.MessageMeta{ - MessageId: meta.GenerateMessageID(), - CorrelationId: meta.GenerateMessageID(), + MessageId: uuid.NewString(), + CorrelationId: uuid.NewString(), Timestamp: timestamppb.Now(), }, Request: &pb.ManagementPlaneRequest_ConfigApplyRequest{ @@ -259,8 +338,8 @@ func buildRequest(fileOverviews []*pb.File, instanceID, version string) *pb.Mana func buildPlusAPIRequest(action *pb.NGINXPlusAction, instanceID string) *pb.ManagementPlaneRequest { return &pb.ManagementPlaneRequest{ MessageMeta: &pb.MessageMeta{ - MessageId: meta.GenerateMessageID(), - CorrelationId: meta.GenerateMessageID(), + MessageId: uuid.NewString(), + CorrelationId: uuid.NewString(), Timestamp: timestamppb.Now(), }, Request: &pb.ManagementPlaneRequest_ActionRequest{ @@ -346,7 +425,7 @@ func (cs *commandService) UpdateDataPlaneStatus( instanceID := getNginxInstanceID(req.GetResource().GetInstances()) if instanceID == "" { - return nil, status.Errorf(codes.InvalidArgument, "request does not contain nginx instanceID") + return nil, grpcStatus.Errorf(codes.InvalidArgument, "request does not contain nginx instanceID") } cs.connTracker.SetInstanceID(gi.IPAddress, instanceID) diff --git a/internal/mode/static/nginx/agent/deployment.go b/internal/mode/static/nginx/agent/deployment.go index b4ecb3013a..217dff480e 100644 --- a/internal/mode/static/nginx/agent/deployment.go +++ b/internal/mode/static/nginx/agent/deployment.go @@ -2,6 +2,7 @@ package agent import ( "context" + "errors" "fmt" "sync" @@ -11,40 +12,61 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast" agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" - "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/hack" ) +// ignoreFiles is a list of static or base files that live in the +// nginx container that should not be touched by the agent. Any files +// that we add directly into the container should be added here. +var ignoreFiles = []string{ + "/etc/nginx/nginx.conf", + "/etc/nginx/mime.types", + "/etc/nginx/grpc-error-locations.conf", + "/etc/nginx/grpc-error-pages.conf", + "/usr/share/nginx/html/50x.html", + "/usr/share/nginx/html/dashboard.html", + "/usr/share/nginx/html/index.html", + "/usr/share/nginx/html/nginx-modules-reference.pdf", +} + +const fileMode = "0644" + // Deployment represents an nginx Deployment. It contains its own nginx configuration files, -// and a broadcaster for sending those files to all of its pods that are subscribed. +// a broadcaster for sending those files to all of its pods that are subscribed, and errors +// that may have occurred while applying configuration. type Deployment struct { + // podStatuses is a map of all Pods for this Deployment and the most recent error + // (or nil if successful) that occurred on a config call to the nginx agent. 
+ podStatuses map[string]error + broadcaster broadcast.Broadcaster - configVersion string + configVersion string + // error that is set if a ConfigApply call failed for a Pod. This is needed + // because if subsequent upstream API calls are made within the same update event, + // and are successful, the previous error would be lost in the podStatuses map. + // It's used to preserve the error for when we write status after fully updating nginx. + latestConfigError error + // error that is set when at least one upstream API call failed for a Pod. + // This is needed because subsequent API calls within the same update event could succeed, + // and therefore the previous error would be lost in the podStatuses map. It's used to preserve + // the error for when we write status after fully updating nginx. + latestUpstreamError error + nginxPlusActions []*pb.NGINXPlusAction fileOverviews []*pb.File files []File - lock sync.RWMutex + Lock sync.RWMutex } // newDeployment returns a new deployment object. func newDeployment(ctx context.Context) *Deployment { return &Deployment{ broadcaster: broadcast.NewDeploymentBroadcaster(ctx), + podStatuses: make(map[string]error), } } -// RLock locks the deployment for reading. Used by the Subscriber to lock the deployment from any file -// changes while updating agent. -func (d *Deployment) RLock() { - d.lock.RLock() -} - -// RUnlock unlocks the deployment from reading. -func (d *Deployment) RUnlock() { - d.lock.RUnlock() -} - // GetBroadcaster returns the deployment's broadcaster. func (d *Deployment) GetBroadcaster() broadcast.Broadcaster { return d.broadcaster @@ -52,25 +74,46 @@ func (d *Deployment) GetBroadcaster() broadcast.Broadcaster { // GetFileOverviews returns the current list of fileOverviews and configVersion for the deployment. func (d *Deployment) GetFileOverviews() ([]*pb.File, string) { - d.lock.RLock() - defer d.lock.RUnlock() + d.Lock.RLock() + defer d.Lock.RUnlock() return d.fileOverviews, d.configVersion } // GetNGINXPlusActions returns the current NGINX Plus API Actions for the deployment. func (d *Deployment) GetNGINXPlusActions() []*pb.NGINXPlusAction { - d.lock.RLock() - defer d.lock.RUnlock() + d.Lock.RLock() + defer d.Lock.RUnlock() return d.nginxPlusActions } +// GetLatestConfigError gets the latest config apply error for the deployment. +func (d *Deployment) GetLatestConfigError() error { + d.Lock.RLock() + defer d.Lock.RUnlock() + + return d.latestConfigError +} + +// GetLatestUpstreamError gets the latest upstream update error for the deployment. +func (d *Deployment) GetLatestUpstreamError() error { + d.Lock.RLock() + defer d.Lock.RUnlock() + + return d.latestUpstreamError +} + +/* +The following functions for the Deployment object are UNLOCKED, meaning that they are unsafe. +Callers of these functions MUST ensure the lock is set before calling. + +These functions are called as part of the ConfigApply or APIRequest processes. These entire processes +are locked by the caller, hence why the functions themselves do not set the locks. +*/ + // GetFile gets the requested file for the deployment and returns its contents. -// This function MUST only be called after Deployment.Lock() has been called. -// This function is called by the agent during a ConfigApplyRequest transaction. -// Since the Deployment must be locked for the duration of the transaction, -// the Subscriber Locks and Unlocks the Deployment. +// Caller MUST lock the deployment before calling this function. 
func (d *Deployment) GetFile(name, hash string) []byte { for _, file := range d.files { if name == file.Meta.GetName() && hash == file.Meta.GetHash() { @@ -82,10 +125,8 @@ func (d *Deployment) GetFile(name, hash string) []byte { } // SetFiles updates the nginx files and fileOverviews for the deployment and returns the message to send. +// Caller MUST lock the deployment before calling this function. func (d *Deployment) SetFiles(files []File) broadcast.NginxAgentMessage { - d.lock.Lock() - defer d.lock.Unlock() - d.files = files fileOverviews := make([]*pb.File, 0, len(files)) @@ -93,20 +134,17 @@ func (d *Deployment) SetFiles(files []File) broadcast.NginxAgentMessage { fileOverviews = append(fileOverviews, &pb.File{FileMeta: file.Meta}) } - // hack to include unchanging static files in the payload so they don't get deleted - staticFiles := hack.GetStaticFiles() - for _, file := range staticFiles { + // add ignored files to the overview as 'unmanaged' so agent doesn't touch them + for _, f := range ignoreFiles { meta := &pb.FileMeta{ - Name: file.Name, - Hash: filesHelper.GenerateHash(file.Contents), - Permissions: file.Permissions, + Name: f, + Permissions: fileMode, } fileOverviews = append(fileOverviews, &pb.File{ - FileMeta: meta, + FileMeta: meta, + Unmanaged: true, }) - - d.files = append(d.files, File{Meta: meta, Contents: file.Contents}) } d.configVersion = filesHelper.GenerateConfigVersion(fileOverviews) @@ -121,31 +159,58 @@ func (d *Deployment) SetFiles(files []File) broadcast.NginxAgentMessage { // SetNGINXPlusActions updates the deployment's latest NGINX Plus Actions to perform if using NGINX Plus. // Used by a Subscriber when it first connects. +// Caller MUST lock the deployment before calling this function. func (d *Deployment) SetNGINXPlusActions(actions []*pb.NGINXPlusAction) { - d.lock.Lock() - defer d.lock.Unlock() - d.nginxPlusActions = actions } +// SetPodErrorStatus sets the error status of a Pod in this Deployment if applying the config failed. +func (d *Deployment) SetPodErrorStatus(pod string, err error) { + d.podStatuses[pod] = err +} + +// SetLatestConfigError sets the latest config apply error for the deployment. +// Caller MUST lock the deployment before calling this function. +func (d *Deployment) SetLatestConfigError(err error) { + d.latestConfigError = err +} + +// SetLatestUpstreamError sets the latest upstream update error for the deployment. +// Caller MUST lock the deployment before calling this function. +func (d *Deployment) SetLatestUpstreamError(err error) { + d.latestUpstreamError = err +} + +// GetConfigurationStatus returns the current config status for this Deployment. It combines +// the most recent errors (if they exist) for all Pods in the Deployment into a single error. +// Caller MUST lock the deployment before calling this function. +func (d *Deployment) GetConfigurationStatus() error { + var err error + for _, statusErr := range d.podStatuses { + err = errors.Join(err, statusErr) + } + + return err +} + // DeploymentStore holds a map of all Deployments. type DeploymentStore struct { - connTracker *agentgrpc.ConnectionsTracker + connTracker agentgrpc.AgentConnectionsTracker deployments sync.Map } // NewDeploymentStore returns a new instance of a DeploymentStore. 
-func NewDeploymentStore(connTracker *agentgrpc.ConnectionsTracker) *DeploymentStore { +func NewDeploymentStore(connTracker agentgrpc.AgentConnectionsTracker) *DeploymentStore { return &DeploymentStore{ connTracker: connTracker, } } // Get returns the desired deployment from the store. -func (d *DeploymentStore) Get(nsName types.NamespacedName) (*Deployment, bool) { +func (d *DeploymentStore) Get(nsName types.NamespacedName) *Deployment { val, ok := d.deployments.Load(nsName) if !ok { - return nil, false + return nil } deployment, ok := val.(*Deployment) @@ -153,13 +218,13 @@ func (d *DeploymentStore) Get(nsName types.NamespacedName) (*Deployment, bool) { panic(fmt.Sprintf("expected Deployment, got type %T", val)) } - return deployment, true + return deployment } // GetOrStore returns the existing value for the key if present. // Otherwise, it stores and returns the given value. func (d *DeploymentStore) GetOrStore(ctx context.Context, nsName types.NamespacedName) *Deployment { - if deployment, ok := d.Get(nsName); ok { + if deployment := d.Get(nsName); deployment != nil { return deployment } diff --git a/internal/mode/static/nginx/agent/file.go b/internal/mode/static/nginx/agent/file.go index d2b1b390f5..113313067a 100644 --- a/internal/mode/static/nginx/agent/file.go +++ b/internal/mode/static/nginx/agent/file.go @@ -24,7 +24,7 @@ type File struct { type fileService struct { pb.FileServiceServer nginxDeployments *DeploymentStore - connTracker *agentgrpc.ConnectionsTracker + connTracker agentgrpc.AgentConnectionsTracker // TODO(sberman): all logs are at Info level right now. Adjust appropriately. logger logr.Logger } @@ -32,7 +32,7 @@ type fileService struct { func newFileService( logger logr.Logger, depStore *DeploymentStore, - connTracker *agentgrpc.ConnectionsTracker, + connTracker agentgrpc.AgentConnectionsTracker, ) *fileService { return &fileService{ logger: logger, @@ -72,8 +72,8 @@ func (fs *fileService) GetFile( return nil, status.Errorf(codes.NotFound, "connection not found") } - deployment, ok := fs.nginxDeployments.Get(conn.Parent) - if !ok { + deployment := fs.nginxDeployments.Get(conn.Parent) + if deployment == nil { return nil, status.Errorf(codes.NotFound, "deployment not found in store") } diff --git a/internal/mode/static/nginx/agent/grpc/connections.go b/internal/mode/static/nginx/agent/grpc/connections.go index 324288382a..3465c3823c 100644 --- a/internal/mode/static/nginx/agent/grpc/connections.go +++ b/internal/mode/static/nginx/agent/grpc/connections.go @@ -6,6 +6,21 @@ import ( "k8s.io/apimachinery/pkg/types" ) +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate + +//counterfeiter:generate . AgentConnectionsTracker + +// AgentConnectionsTracker defines an interface to track all connections between the control plane +// and nginx agents. +type AgentConnectionsTracker interface { + Track(key string, conn Connection) + GetConnection(key string) Connection + ConnectionIsReady(key string) (Connection, bool) + SetInstanceID(key, id string) + UntrackConnectionsForParent(parent types.NamespacedName) +} + +// Connection contains the data about a single nginx agent connection. type Connection struct { PodName string InstanceID string @@ -21,7 +36,7 @@ type ConnectionsTracker struct { } // NewConnectionsTracker returns a new ConnectionsTracker instance. 
-func NewConnectionsTracker() *ConnectionsTracker { +func NewConnectionsTracker() AgentConnectionsTracker { return &ConnectionsTracker{ connections: make(map[string]Connection), } diff --git a/internal/mode/static/nginx/agent/grpc/grpcfakes/fake_agent_connections_tracker.go b/internal/mode/static/nginx/agent/grpc/grpcfakes/fake_agent_connections_tracker.go new file mode 100644 index 0000000000..d58fca0d26 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/grpcfakes/fake_agent_connections_tracker.go @@ -0,0 +1,312 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package grpcfakes + +import ( + "sync" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" + "k8s.io/apimachinery/pkg/types" +) + +type FakeAgentConnectionsTracker struct { + ConnectionIsReadyStub func(string) (grpc.Connection, bool) + connectionIsReadyMutex sync.RWMutex + connectionIsReadyArgsForCall []struct { + arg1 string + } + connectionIsReadyReturns struct { + result1 grpc.Connection + result2 bool + } + connectionIsReadyReturnsOnCall map[int]struct { + result1 grpc.Connection + result2 bool + } + GetConnectionStub func(string) grpc.Connection + getConnectionMutex sync.RWMutex + getConnectionArgsForCall []struct { + arg1 string + } + getConnectionReturns struct { + result1 grpc.Connection + } + getConnectionReturnsOnCall map[int]struct { + result1 grpc.Connection + } + SetInstanceIDStub func(string, string) + setInstanceIDMutex sync.RWMutex + setInstanceIDArgsForCall []struct { + arg1 string + arg2 string + } + TrackStub func(string, grpc.Connection) + trackMutex sync.RWMutex + trackArgsForCall []struct { + arg1 string + arg2 grpc.Connection + } + UntrackConnectionsForParentStub func(types.NamespacedName) + untrackConnectionsForParentMutex sync.RWMutex + untrackConnectionsForParentArgsForCall []struct { + arg1 types.NamespacedName + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeAgentConnectionsTracker) ConnectionIsReady(arg1 string) (grpc.Connection, bool) { + fake.connectionIsReadyMutex.Lock() + ret, specificReturn := fake.connectionIsReadyReturnsOnCall[len(fake.connectionIsReadyArgsForCall)] + fake.connectionIsReadyArgsForCall = append(fake.connectionIsReadyArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.ConnectionIsReadyStub + fakeReturns := fake.connectionIsReadyReturns + fake.recordInvocation("ConnectionIsReady", []interface{}{arg1}) + fake.connectionIsReadyMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeAgentConnectionsTracker) ConnectionIsReadyCallCount() int { + fake.connectionIsReadyMutex.RLock() + defer fake.connectionIsReadyMutex.RUnlock() + return len(fake.connectionIsReadyArgsForCall) +} + +func (fake *FakeAgentConnectionsTracker) ConnectionIsReadyCalls(stub func(string) (grpc.Connection, bool)) { + fake.connectionIsReadyMutex.Lock() + defer fake.connectionIsReadyMutex.Unlock() + fake.ConnectionIsReadyStub = stub +} + +func (fake *FakeAgentConnectionsTracker) ConnectionIsReadyArgsForCall(i int) string { + fake.connectionIsReadyMutex.RLock() + defer fake.connectionIsReadyMutex.RUnlock() + argsForCall := fake.connectionIsReadyArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeAgentConnectionsTracker) ConnectionIsReadyReturns(result1 grpc.Connection, result2 bool) { + fake.connectionIsReadyMutex.Lock() + defer fake.connectionIsReadyMutex.Unlock() + 
fake.ConnectionIsReadyStub = nil + fake.connectionIsReadyReturns = struct { + result1 grpc.Connection + result2 bool + }{result1, result2} +} + +func (fake *FakeAgentConnectionsTracker) ConnectionIsReadyReturnsOnCall(i int, result1 grpc.Connection, result2 bool) { + fake.connectionIsReadyMutex.Lock() + defer fake.connectionIsReadyMutex.Unlock() + fake.ConnectionIsReadyStub = nil + if fake.connectionIsReadyReturnsOnCall == nil { + fake.connectionIsReadyReturnsOnCall = make(map[int]struct { + result1 grpc.Connection + result2 bool + }) + } + fake.connectionIsReadyReturnsOnCall[i] = struct { + result1 grpc.Connection + result2 bool + }{result1, result2} +} + +func (fake *FakeAgentConnectionsTracker) GetConnection(arg1 string) grpc.Connection { + fake.getConnectionMutex.Lock() + ret, specificReturn := fake.getConnectionReturnsOnCall[len(fake.getConnectionArgsForCall)] + fake.getConnectionArgsForCall = append(fake.getConnectionArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.GetConnectionStub + fakeReturns := fake.getConnectionReturns + fake.recordInvocation("GetConnection", []interface{}{arg1}) + fake.getConnectionMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeAgentConnectionsTracker) GetConnectionCallCount() int { + fake.getConnectionMutex.RLock() + defer fake.getConnectionMutex.RUnlock() + return len(fake.getConnectionArgsForCall) +} + +func (fake *FakeAgentConnectionsTracker) GetConnectionCalls(stub func(string) grpc.Connection) { + fake.getConnectionMutex.Lock() + defer fake.getConnectionMutex.Unlock() + fake.GetConnectionStub = stub +} + +func (fake *FakeAgentConnectionsTracker) GetConnectionArgsForCall(i int) string { + fake.getConnectionMutex.RLock() + defer fake.getConnectionMutex.RUnlock() + argsForCall := fake.getConnectionArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeAgentConnectionsTracker) GetConnectionReturns(result1 grpc.Connection) { + fake.getConnectionMutex.Lock() + defer fake.getConnectionMutex.Unlock() + fake.GetConnectionStub = nil + fake.getConnectionReturns = struct { + result1 grpc.Connection + }{result1} +} + +func (fake *FakeAgentConnectionsTracker) GetConnectionReturnsOnCall(i int, result1 grpc.Connection) { + fake.getConnectionMutex.Lock() + defer fake.getConnectionMutex.Unlock() + fake.GetConnectionStub = nil + if fake.getConnectionReturnsOnCall == nil { + fake.getConnectionReturnsOnCall = make(map[int]struct { + result1 grpc.Connection + }) + } + fake.getConnectionReturnsOnCall[i] = struct { + result1 grpc.Connection + }{result1} +} + +func (fake *FakeAgentConnectionsTracker) SetInstanceID(arg1 string, arg2 string) { + fake.setInstanceIDMutex.Lock() + fake.setInstanceIDArgsForCall = append(fake.setInstanceIDArgsForCall, struct { + arg1 string + arg2 string + }{arg1, arg2}) + stub := fake.SetInstanceIDStub + fake.recordInvocation("SetInstanceID", []interface{}{arg1, arg2}) + fake.setInstanceIDMutex.Unlock() + if stub != nil { + fake.SetInstanceIDStub(arg1, arg2) + } +} + +func (fake *FakeAgentConnectionsTracker) SetInstanceIDCallCount() int { + fake.setInstanceIDMutex.RLock() + defer fake.setInstanceIDMutex.RUnlock() + return len(fake.setInstanceIDArgsForCall) +} + +func (fake *FakeAgentConnectionsTracker) SetInstanceIDCalls(stub func(string, string)) { + fake.setInstanceIDMutex.Lock() + defer fake.setInstanceIDMutex.Unlock() + fake.SetInstanceIDStub = stub +} + +func (fake *FakeAgentConnectionsTracker) 
SetInstanceIDArgsForCall(i int) (string, string) { + fake.setInstanceIDMutex.RLock() + defer fake.setInstanceIDMutex.RUnlock() + argsForCall := fake.setInstanceIDArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentConnectionsTracker) Track(arg1 string, arg2 grpc.Connection) { + fake.trackMutex.Lock() + fake.trackArgsForCall = append(fake.trackArgsForCall, struct { + arg1 string + arg2 grpc.Connection + }{arg1, arg2}) + stub := fake.TrackStub + fake.recordInvocation("Track", []interface{}{arg1, arg2}) + fake.trackMutex.Unlock() + if stub != nil { + fake.TrackStub(arg1, arg2) + } +} + +func (fake *FakeAgentConnectionsTracker) TrackCallCount() int { + fake.trackMutex.RLock() + defer fake.trackMutex.RUnlock() + return len(fake.trackArgsForCall) +} + +func (fake *FakeAgentConnectionsTracker) TrackCalls(stub func(string, grpc.Connection)) { + fake.trackMutex.Lock() + defer fake.trackMutex.Unlock() + fake.TrackStub = stub +} + +func (fake *FakeAgentConnectionsTracker) TrackArgsForCall(i int) (string, grpc.Connection) { + fake.trackMutex.RLock() + defer fake.trackMutex.RUnlock() + argsForCall := fake.trackArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeAgentConnectionsTracker) UntrackConnectionsForParent(arg1 types.NamespacedName) { + fake.untrackConnectionsForParentMutex.Lock() + fake.untrackConnectionsForParentArgsForCall = append(fake.untrackConnectionsForParentArgsForCall, struct { + arg1 types.NamespacedName + }{arg1}) + stub := fake.UntrackConnectionsForParentStub + fake.recordInvocation("UntrackConnectionsForParent", []interface{}{arg1}) + fake.untrackConnectionsForParentMutex.Unlock() + if stub != nil { + fake.UntrackConnectionsForParentStub(arg1) + } +} + +func (fake *FakeAgentConnectionsTracker) UntrackConnectionsForParentCallCount() int { + fake.untrackConnectionsForParentMutex.RLock() + defer fake.untrackConnectionsForParentMutex.RUnlock() + return len(fake.untrackConnectionsForParentArgsForCall) +} + +func (fake *FakeAgentConnectionsTracker) UntrackConnectionsForParentCalls(stub func(types.NamespacedName)) { + fake.untrackConnectionsForParentMutex.Lock() + defer fake.untrackConnectionsForParentMutex.Unlock() + fake.UntrackConnectionsForParentStub = stub +} + +func (fake *FakeAgentConnectionsTracker) UntrackConnectionsForParentArgsForCall(i int) types.NamespacedName { + fake.untrackConnectionsForParentMutex.RLock() + defer fake.untrackConnectionsForParentMutex.RUnlock() + argsForCall := fake.untrackConnectionsForParentArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeAgentConnectionsTracker) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.connectionIsReadyMutex.RLock() + defer fake.connectionIsReadyMutex.RUnlock() + fake.getConnectionMutex.RLock() + defer fake.getConnectionMutex.RUnlock() + fake.setInstanceIDMutex.RLock() + defer fake.setInstanceIDMutex.RUnlock() + fake.trackMutex.RLock() + defer fake.trackMutex.RUnlock() + fake.untrackConnectionsForParentMutex.RLock() + defer fake.untrackConnectionsForParentMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeAgentConnectionsTracker) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if 
fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ grpc.AgentConnectionsTracker = new(FakeAgentConnectionsTracker) diff --git a/internal/mode/static/nginx/agent/grpc/messenger/doc.go b/internal/mode/static/nginx/agent/grpc/messenger/doc.go new file mode 100644 index 0000000000..60150e4ad8 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/messenger/doc.go @@ -0,0 +1,4 @@ +/* +Package messenger provides a wrapper around a gRPC stream with the nginx agent. +*/ +package messenger diff --git a/internal/mode/static/nginx/agent/grpc/messenger/messenger.go b/internal/mode/static/nginx/agent/grpc/messenger/messenger.go new file mode 100644 index 0000000000..1388a3d0b0 --- /dev/null +++ b/internal/mode/static/nginx/agent/grpc/messenger/messenger.go @@ -0,0 +1,96 @@ +package messenger + +import ( + "context" + "errors" + + pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" +) + +type Messenger struct { + incoming chan *pb.ManagementPlaneRequest + outgoing chan *pb.DataPlaneResponse + errorCh chan error + server pb.CommandService_SubscribeServer +} + +func New(server pb.CommandService_SubscribeServer) *Messenger { + return &Messenger{ + incoming: make(chan *pb.ManagementPlaneRequest), + outgoing: make(chan *pb.DataPlaneResponse), + errorCh: make(chan error), + server: server, + } +} + +func (m *Messenger) Run(ctx context.Context) { + go m.handleRecv(ctx) + m.handleSend(ctx) +} + +// Send a message, will return error if the context is Done. +func (m *Messenger) Send(ctx context.Context, msg *pb.ManagementPlaneRequest) error { + select { + case <-ctx.Done(): + return ctx.Err() + case m.incoming <- msg: + } + return nil +} + +func (m *Messenger) handleSend(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case msg := <-m.incoming: + err := m.server.Send(msg) + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(ctx.Err(), context.Canceled) { + return + } + m.errorCh <- err + + return + } + } + } +} + +// Messages returns the data plane response channel. +func (m *Messenger) Messages() <-chan *pb.DataPlaneResponse { + return m.outgoing +} + +// Errors returns the error channel. +func (m *Messenger) Errors() <-chan error { + return m.errorCh +} + +// handleRecv handles an incoming message from the nginx agent. +// It blocks until Recv returns. The result from the Recv is either going to Error or Messages channel. +func (m *Messenger) handleRecv(ctx context.Context) { + for { + msg, err := m.server.Recv() + if err != nil { + select { + case <-ctx.Done(): + return + case m.errorCh <- err: + } + return + } + + if msg == nil { + // close the outgoing channel to signal no more messages to be sent + close(m.outgoing) + return + } + + select { + case <-ctx.Done(): + return + case m.outgoing <- msg: + } + } +} diff --git a/internal/mode/static/nginx/agent/hack/hack.go b/internal/mode/static/nginx/agent/hack/hack.go deleted file mode 100644 index 7f158f570f..0000000000 --- a/internal/mode/static/nginx/agent/hack/hack.go +++ /dev/null @@ -1,182 +0,0 @@ -package hack - -// This is here for now to send our base files to agent that NGF normally doesn't send. -// Otherwise, agent will attempt to delete them since they aren't included in the payload. 
- -type File struct { - Name string - Permissions string - Contents []byte -} - -func GetStaticFiles() []File { - return []File{ - { - Name: "/etc/nginx/nginx.conf", - Permissions: "0644", - Contents: nginxConf, - }, - { - Name: "/etc/nginx/mime.types", - Permissions: "0644", - Contents: mimeTypes, - }, - } -} - -var nginxConf = []byte(` -load_module /usr/lib/nginx/modules/ngx_http_js_module.so; -include /etc/nginx/main-includes/*.conf; - -worker_processes auto; - -pid /var/run/nginx/nginx.pid; - -events { - worker_connections 1024; -} - -http { - include /etc/nginx/conf.d/*.conf; - include /etc/nginx/mime.types; - js_import /usr/lib/nginx/modules/njs/httpmatches.js; - - default_type application/octet-stream; - - proxy_headers_hash_bucket_size 512; - proxy_headers_hash_max_size 1024; - server_names_hash_bucket_size 256; - server_names_hash_max_size 1024; - variables_hash_bucket_size 512; - variables_hash_max_size 1024; - - sendfile on; - tcp_nopush on; - - server_tokens off; - - server { - listen unix:/var/run/nginx/nginx-status.sock; - access_log off; - - location /stub_status { - stub_status; - } - } -} - -stream { - variables_hash_bucket_size 512; - variables_hash_max_size 1024; - - map_hash_max_size 2048; - map_hash_bucket_size 256; - - log_format stream-main '$remote_addr [$time_local] ' - '$protocol $status $bytes_sent $bytes_received ' - '$session_time "$ssl_preread_server_name"'; - access_log /dev/stdout stream-main; - include /etc/nginx/stream-conf.d/*.conf; -} - `) - -var mimeTypes = []byte(` -types { - text/html html htm shtml; - text/css css; - text/xml xml; - image/gif gif; - image/jpeg jpeg jpg; - application/javascript js; - application/atom+xml atom; - application/rss+xml rss; - - text/mathml mml; - text/plain txt; - text/vnd.sun.j2me.app-descriptor jad; - text/vnd.wap.wml wml; - text/x-component htc; - - image/avif avif; - image/png png; - image/svg+xml svg svgz; - image/tiff tif tiff; - image/vnd.wap.wbmp wbmp; - image/webp webp; - image/x-icon ico; - image/x-jng jng; - image/x-ms-bmp bmp; - - font/woff woff; - font/woff2 woff2; - - application/java-archive jar war ear; - application/json json; - application/mac-binhex40 hqx; - application/msword doc; - application/pdf pdf; - application/postscript ps eps ai; - application/rtf rtf; - application/vnd.apple.mpegurl m3u8; - application/vnd.google-earth.kml+xml kml; - application/vnd.google-earth.kmz kmz; - application/vnd.ms-excel xls; - application/vnd.ms-fontobject eot; - application/vnd.ms-powerpoint ppt; - application/vnd.oasis.opendocument.graphics odg; - application/vnd.oasis.opendocument.presentation odp; - application/vnd.oasis.opendocument.spreadsheet ods; - application/vnd.oasis.opendocument.text odt; - application/vnd.openxmlformats-officedocument.presentationml.presentation - pptx; - application/vnd.openxmlformats-officedocument.spreadsheetml.sheet - xlsx; - application/vnd.openxmlformats-officedocument.wordprocessingml.document - docx; - application/vnd.wap.wmlc wmlc; - application/wasm wasm; - application/x-7z-compressed 7z; - application/x-cocoa cco; - application/x-java-archive-diff jardiff; - application/x-java-jnlp-file jnlp; - application/x-makeself run; - application/x-perl pl pm; - application/x-pilot prc pdb; - application/x-rar-compressed rar; - application/x-redhat-package-manager rpm; - application/x-sea sea; - application/x-shockwave-flash swf; - application/x-stuffit sit; - application/x-tcl tcl tk; - application/x-x509-ca-cert der pem crt; - application/x-xpinstall xpi; - application/xhtml+xml 
xhtml; - application/xspf+xml xspf; - application/zip zip; - - application/octet-stream bin exe dll; - application/octet-stream deb; - application/octet-stream dmg; - application/octet-stream iso img; - application/octet-stream msi msp msm; - - audio/midi mid midi kar; - audio/mpeg mp3; - audio/ogg ogg; - audio/x-m4a m4a; - audio/x-realaudio ra; - - video/3gpp 3gpp 3gp; - video/mp2t ts; - video/mp4 mp4; - video/mpeg mpeg mpg; - video/quicktime mov; - video/webm webm; - video/x-flv flv; - video/x-m4v m4v; - video/x-mng mng; - video/x-ms-asf asx asf; - video/x-ms-wmv wmv; - video/x-msvideo avi; -} -`) diff --git a/internal/mode/static/nginx/agent/meta/doc.go b/internal/mode/static/nginx/agent/meta/doc.go deleted file mode 100644 index 166cc412c1..0000000000 --- a/internal/mode/static/nginx/agent/meta/doc.go +++ /dev/null @@ -1,4 +0,0 @@ -/* -Package meta contains the functions for creating MessageMeta that is included in requests to the nginx agent. -*/ -package meta diff --git a/internal/mode/static/nginx/agent/meta/meta.go b/internal/mode/static/nginx/agent/meta/meta.go deleted file mode 100644 index 5621f96040..0000000000 --- a/internal/mode/static/nginx/agent/meta/meta.go +++ /dev/null @@ -1,20 +0,0 @@ -package meta - -import ( - "time" - - "github.com/google/uuid" - agentUuid "github.com/nginx/agent/v3/pkg/uuid" -) - -const CorrelationIDKey = "correlation_id" - -// GenerateMessageID generates a unique message ID, falling back to sha256 and timestamp if UUID generation fails. -func GenerateMessageID() string { - uuidv7, err := uuid.NewUUID() - if err != nil { - return agentUuid.Generate("%s", time.Now().String()) - } - - return uuidv7.String() -} diff --git a/internal/mode/static/nginx/conf/nginx-plus.conf b/internal/mode/static/nginx/conf/nginx-plus.conf index 17ac6de4e3..e119a9b454 100644 --- a/internal/mode/static/nginx/conf/nginx-plus.conf +++ b/internal/mode/static/nginx/conf/nginx-plus.conf @@ -28,6 +28,15 @@ http { server_tokens off; + server { + listen unix:/var/run/nginx/nginx-plus-api.sock; + access_log off; + + location /api { + api write=on; + } + } + server { listen 127.0.0.1:8765; root /usr/share/nginx/html; @@ -42,15 +51,6 @@ http { api write=off; } } - - server { - listen unix:/var/run/nginx/nginx-plus-api.sock; - access_log off; - - location /api { - api write=on; - } - } } stream { diff --git a/internal/mode/static/state/graph/graph.go b/internal/mode/static/state/graph/graph.go index 0b56ec1018..40a04598a9 100644 --- a/internal/mode/static/state/graph/graph.go +++ b/internal/mode/static/state/graph/graph.go @@ -83,6 +83,14 @@ type Graph struct { SnippetsFilters map[types.NamespacedName]*SnippetsFilter // PlusSecrets holds the secrets related to NGINX Plus licensing. PlusSecrets map[types.NamespacedName][]PlusSecretFile + + LatestReloadResult NginxReloadResult +} + +// NginxReloadResult describes the result of an NGINX reload. +type NginxReloadResult struct { + // Error is the error that occurred during the reload. + Error error } // ProtectedPorts are the ports that may not be configured by a listener with a descriptive name of each port. diff --git a/internal/mode/static/status/prepare_requests.go b/internal/mode/static/status/prepare_requests.go index 8bcf48cb9f..f678054aa2 100644 --- a/internal/mode/static/status/prepare_requests.go +++ b/internal/mode/static/status/prepare_requests.go @@ -19,18 +19,12 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/graph" ) -// NginxReloadResult describes the result of an NGINX reload. 
-type NginxReloadResult struct { - // Error is the error that occurred during the reload. - Error error -} - // PrepareRouteRequests prepares status UpdateRequests for the given Routes. func PrepareRouteRequests( l4routes map[graph.L4RouteKey]*graph.L4Route, routes map[graph.RouteKey]*graph.L7Route, transitionTime metav1.Time, - nginxReloadRes NginxReloadResult, + nginxReloadRes graph.NginxReloadResult, gatewayCtlrName string, ) []frameworkStatus.UpdateRequest { reqs := make([]frameworkStatus.UpdateRequest, 0, len(routes)) @@ -107,7 +101,7 @@ func prepareRouteStatus( gatewayCtlrName string, parentRefs []graph.ParentRef, conds []conditions.Condition, - nginxReloadRes NginxReloadResult, + nginxReloadRes graph.NginxReloadResult, transitionTime metav1.Time, srcGeneration int64, ) v1.RouteStatus { @@ -214,7 +208,7 @@ func PrepareGatewayRequests( ignoredGateways map[types.NamespacedName]*v1.Gateway, transitionTime metav1.Time, gwAddresses []v1.GatewayStatusAddress, - nginxReloadRes NginxReloadResult, + nginxReloadRes graph.NginxReloadResult, ) []frameworkStatus.UpdateRequest { reqs := make([]frameworkStatus.UpdateRequest, 0, 1+len(ignoredGateways)) @@ -240,7 +234,7 @@ func prepareGatewayRequest( gateway *graph.Gateway, transitionTime metav1.Time, gwAddresses []v1.GatewayStatusAddress, - nginxReloadRes NginxReloadResult, + nginxReloadRes graph.NginxReloadResult, ) frameworkStatus.UpdateRequest { if !gateway.Valid { conds := conditions.ConvertConditions( diff --git a/internal/mode/static/status/prepare_requests_test.go b/internal/mode/static/status/prepare_requests_test.go index 77261d1efb..03b9cd69bc 100644 --- a/internal/mode/static/status/prepare_requests_test.go +++ b/internal/mode/static/status/prepare_requests_test.go @@ -275,7 +275,7 @@ func TestBuildHTTPRouteStatuses(t *testing.T) { map[graph.L4RouteKey]*graph.L4Route{}, routes, transitionTime, - NginxReloadResult{}, + graph.NginxReloadResult{}, gatewayCtlrName, ) @@ -354,7 +354,7 @@ func TestBuildGRPCRouteStatuses(t *testing.T) { map[graph.L4RouteKey]*graph.L4Route{}, routes, transitionTime, - NginxReloadResult{}, + graph.NginxReloadResult{}, gatewayCtlrName, ) @@ -431,7 +431,7 @@ func TestBuildTLSRouteStatuses(t *testing.T) { routes, map[graph.RouteKey]*graph.L7Route{}, transitionTime, - NginxReloadResult{}, + graph.NginxReloadResult{}, gatewayCtlrName, ) @@ -535,7 +535,7 @@ func TestBuildRouteStatusesNginxErr(t *testing.T) { map[graph.L4RouteKey]*graph.L4Route{}, routes, transitionTime, - NginxReloadResult{Error: errors.New("test error")}, + graph.NginxReloadResult{Error: errors.New("test error")}, gatewayCtlrName, ) @@ -741,7 +741,7 @@ func TestBuildGatewayStatuses(t *testing.T) { routeKey := graph.RouteKey{NamespacedName: types.NamespacedName{Namespace: "test", Name: "hr-1"}} tests := []struct { - nginxReloadRes NginxReloadResult + nginxReloadRes graph.NginxReloadResult gateway *graph.Gateway ignoredGateways map[types.NamespacedName]*v1.Gateway expected map[types.NamespacedName]v1.GatewayStatus @@ -1133,7 +1133,7 @@ func TestBuildGatewayStatuses(t *testing.T) { }, }, }, - nginxReloadRes: NginxReloadResult{Error: errors.New("test error")}, + nginxReloadRes: graph.NginxReloadResult{Error: errors.New("test error")}, }, } diff --git a/internal/mode/static/status/queue.go b/internal/mode/static/status/queue.go new file mode 100644 index 0000000000..5f31bbec6d --- /dev/null +++ b/internal/mode/static/status/queue.go @@ -0,0 +1,66 @@ +package status + +import ( + "context" + "sync" + + "k8s.io/apimachinery/pkg/types" +) + +// QueueObject is 
the object to be passed to the queue for status updates.
+type QueueObject struct {
+	Error      error
+	Deployment types.NamespacedName
+}
+
+// Queue represents an unbounded FIFO queue of status updates.
+type Queue struct {
+	notifyCh chan struct{}
+	items    []*QueueObject
+
+	lock sync.Mutex
+}
+
+// NewQueue returns a new Queue object.
+func NewQueue() *Queue {
+	return &Queue{
+		items:    []*QueueObject{},
+		notifyCh: make(chan struct{}, 1),
+	}
+}
+
+// Enqueue adds an item to the queue and notifies any blocked readers.
+func (q *Queue) Enqueue(item *QueueObject) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	q.items = append(q.items, item)
+
+	select {
+	case q.notifyCh <- struct{}{}:
+	default:
+	}
+}
+
+// Dequeue removes and returns the front item from the queue.
+// It blocks while the queue is empty and returns nil if the context is canceled.
+func (q *Queue) Dequeue(ctx context.Context) *QueueObject {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	for len(q.items) == 0 {
+		q.lock.Unlock()
+		select {
+		case <-ctx.Done():
+			q.lock.Lock()
+			return nil
+		case <-q.notifyCh:
+			q.lock.Lock()
+		}
+	}
+
+	front := q.items[0]
+	q.items = q.items[1:]
+
+	return front
+}
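
Illustrative sketch, not part of the patch: how the new status.Queue added above is meant to be drained by a single status-writer goroutine. The runStatusUpdater function and the writeStatus callback are hypothetical names; the actual wiring presumably happens in the handler and manager changes earlier in this patch.

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/types"

	"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/status"
)

// runStatusUpdater is a hypothetical consumer loop for status.Queue.
// It blocks on Dequeue until the agent command service enqueues a
// QueueObject, then hands the aggregated error to a status writer.
func runStatusUpdater(
	ctx context.Context,
	q *status.Queue,
	writeStatus func(deployment types.NamespacedName, configErr error), // assumed helper
) {
	for {
		obj := q.Dequeue(ctx)
		if obj == nil {
			// Dequeue returns nil only when ctx is canceled.
			return
		}
		// obj.Error is the joined per-Pod configuration error for the
		// nginx Deployment; nil means the last apply succeeded.
		writeStatus(obj.Deployment, obj.Error)
	}
}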
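
Illustrative sketch, not part of the patch: minimal use of the new messenger package from a Subscribe handler, mirroring what waitForInitialConfigApply in command.go does above. handleSubscription is a hypothetical, simplified stand-in for the real subscription loop.

package example

import (
	"context"
	"fmt"

	pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"

	"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/messenger"
)

// handleSubscription sends one request over the wrapped stream and waits for
// either a stream error or a command response from the agent.
func handleSubscription(
	ctx context.Context,
	in pb.CommandService_SubscribeServer,
	req *pb.ManagementPlaneRequest,
) error {
	msgr := messenger.New(in)
	go msgr.Run(ctx) // starts the send and recv loops for the stream

	if err := msgr.Send(ctx, req); err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-msgr.Errors():
		// stream-level failure, e.g. io.EOF when the agent disconnects
		return err
	case msg := <-msgr.Messages():
		res := msg.GetCommandResponse()
		if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK {
			return fmt.Errorf("bad response from agent: msg: %s; error: %s", res.GetMessage(), res.GetError())
		}
		return nil
	}
}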
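
Illustrative sketch, not part of the patch: the locking convention described in the deployment.go comments, where the caller holds Deployment.Lock for the whole ConfigApply transaction and then calls the unlocked methods. applyFiles is a hypothetical caller; the real transaction code lives in the agent package.

package example

import (
	"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent"
	"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast"
)

// applyFiles holds Deployment.Lock for the full update, so the unlocked
// methods (SetFiles, SetLatestConfigError, GetConfigurationStatus) are
// safe to call without their own locking.
func applyFiles(deployment *agent.Deployment, files []agent.File) (broadcast.NginxAgentMessage, error) {
	deployment.Lock.Lock()
	defer deployment.Lock.Unlock()

	msg := deployment.SetFiles(files) // builds file overviews and the broadcast message
	deployment.SetLatestConfigError(nil) // clear any previous config error for this update (illustrative)

	// joined per-Pod error across the Deployment; nil if every Pod applied cleanly
	return msg, deployment.GetConfigurationStatus()
}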