From da36f738f76cbe4d4424499c0c2c7d5563f8049b Mon Sep 17 00:00:00 2001 From: Saylor Berman Date: Thu, 16 Jan 2025 16:50:06 -0700 Subject: [PATCH] Add some context --- internal/mode/static/handler.go | 2 -- internal/mode/static/nginx/agent/agent.go | 13 +++++++++++-- .../mode/static/nginx/agent/broadcast/broadcast.go | 5 +++++ internal/mode/static/nginx/agent/command.go | 9 +++++++++ .../static/nginx/agent/grpc/messenger/messenger.go | 3 +++ 5 files changed, 28 insertions(+), 4 deletions(-) diff --git a/internal/mode/static/handler.go b/internal/mode/static/handler.go index 9e92ba76f5..4bfbd8e480 100644 --- a/internal/mode/static/handler.go +++ b/internal/mode/static/handler.go @@ -110,8 +110,6 @@ type eventHandlerImpl struct { // objectFilters contains all created objectFilters, with the key being a filterKey objectFilters map[filterKey]objectFilter - // latestReloadResult status.NginxReloadResult - cfg eventHandlerConfig lock sync.Mutex diff --git a/internal/mode/static/nginx/agent/agent.go b/internal/mode/static/nginx/agent/agent.go index 73a7c42c2e..84a20e46a8 100644 --- a/internal/mode/static/nginx/agent/agent.go +++ b/internal/mode/static/nginx/agent/agent.go @@ -64,7 +64,16 @@ func NewNginxUpdater( } // UpdateConfig sends the nginx configuration to the agent. -// Returns whether configuration was applied or not. +// Returns whether the configuration was sent to any agents. +// +// The flow of events is as follows: +// - Set the configuration files on the deployment. +// - Broadcast the message containing file metadata to all pods (subscriptions) for the deployment. +// - Agent receives a ConfigApplyRequest with the list of file metadata. +// - Agent calls GetFile for each file in the list, which we send back to the agent. +// - Agent updates nginx, and responds with a DataPlaneResponse. +// - Subscriber responds back to the broadcaster to inform that the transaction is complete. +// - If any errors occurred, they are set on the deployment for the handler to use in the status update. func (n *NginxUpdaterImpl) UpdateConfig( deployment *Deployment, files []File, @@ -87,7 +96,7 @@ func (n *NginxUpdaterImpl) UpdateConfig( // UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API. // Only applicable when using NGINX Plus. -// Returns whether configuration was applied or not. +// Returns whether the configuration was sent to any agents. func (n *NginxUpdaterImpl) UpdateUpstreamServers( deployment *Deployment, conf dataplane.Configuration, diff --git a/internal/mode/static/nginx/agent/broadcast/broadcast.go b/internal/mode/static/nginx/agent/broadcast/broadcast.go index 780f843238..2543399555 100644 --- a/internal/mode/static/nginx/agent/broadcast/broadcast.go +++ b/internal/mode/static/nginx/agent/broadcast/broadcast.go @@ -19,12 +19,17 @@ type Broadcaster interface { CancelSubscription(string) } +// SubscriberChannels are the channels sent to the subscriber to listen and respond on. +// The ID is used for map lookup to delete a subscriber when it's gone. type SubscriberChannels struct { ListenCh <-chan NginxAgentMessage ResponseCh chan<- struct{} ID string } +// storedChannels are the same channels used in the SubscriberChannels, but reverse direction. +// These are used to store the channels for the broadcaster to send and listen on, +// and can be looked up in the map using the same ID. type storedChannels struct { listenCh chan<- NginxAgentMessage responseCh <-chan struct{} diff --git a/internal/mode/static/nginx/agent/command.go b/internal/mode/static/nginx/agent/command.go index 328bf0266e..c3a3e7d93f 100644 --- a/internal/mode/static/nginx/agent/command.go +++ b/internal/mode/static/nginx/agent/command.go @@ -105,6 +105,13 @@ func (cs *commandService) CreateConnection( } // Subscribe is a decoupled communication mechanism between the data plane agent and control plane. +// The series of events are as follows: +// - Wait for the agent to register its nginx instance with the control plane. +// - Grab the most recent deployment configuration for itself, and attempt to apply it. +// - Subscribe to any future updates from the NginxUpdater and start a loop to listen for those updates. +// If any connection or unrecoverable errors occur, return and agent should re-establish a subscription. +// If errors occur with applying the config, log and put those errors into the status queue to be written +// to the Gateway status. func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error { ctx := in.Context() @@ -238,6 +245,8 @@ func (cs *commandService) setInitialConfig( return connErr } + // TODO(sberman): without a delay, sometimes the following API request will fail because it doesn't + // think the upstreams exist yet. Have to figure this one out. time.Sleep(1 * time.Second) var upstreamErr error diff --git a/internal/mode/static/nginx/agent/grpc/messenger/messenger.go b/internal/mode/static/nginx/agent/grpc/messenger/messenger.go index 1388a3d0b0..35ccbf22d9 100644 --- a/internal/mode/static/nginx/agent/grpc/messenger/messenger.go +++ b/internal/mode/static/nginx/agent/grpc/messenger/messenger.go @@ -7,6 +7,7 @@ import ( pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" ) +// Messenger is a wrapper around a gRPC stream with the nginx agent. type Messenger struct { incoming chan *pb.ManagementPlaneRequest outgoing chan *pb.DataPlaneResponse @@ -14,6 +15,7 @@ type Messenger struct { server pb.CommandService_SubscribeServer } +// New returns a new Messenger instance. func New(server pb.CommandService_SubscribeServer) *Messenger { return &Messenger{ incoming: make(chan *pb.ManagementPlaneRequest), @@ -23,6 +25,7 @@ func New(server pb.CommandService_SubscribeServer) *Messenger { } } +// Run starts the Messenger to listen for any Send() or Recv() events over the stream. func (m *Messenger) Run(ctx context.Context) { go m.handleRecv(ctx) m.handleSend(ctx)