Skip to content

Commit

Permalink
Add some context
Browse files Browse the repository at this point in the history
  • Loading branch information
sjberman committed Jan 16, 2025
1 parent 0c70f9b commit da36f73
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 4 deletions.
2 changes: 0 additions & 2 deletions internal/mode/static/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ type eventHandlerImpl struct {
// objectFilters contains all created objectFilters, with the key being a filterKey
objectFilters map[filterKey]objectFilter

// latestReloadResult status.NginxReloadResult

cfg eventHandlerConfig
lock sync.Mutex

Expand Down
13 changes: 11 additions & 2 deletions internal/mode/static/nginx/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,16 @@ func NewNginxUpdater(
}

// UpdateConfig sends the nginx configuration to the agent.
// Returns whether configuration was applied or not.
// Returns whether the configuration was sent to any agents.
//
// The flow of events is as follows:
// - Set the configuration files on the deployment.
// - Broadcast the message containing file metadata to all pods (subscriptions) for the deployment.
// - Agent receives a ConfigApplyRequest with the list of file metadata.
// - Agent calls GetFile for each file in the list, which we send back to the agent.
// - Agent updates nginx, and responds with a DataPlaneResponse.
// - Subscriber responds back to the broadcaster to inform that the transaction is complete.
// - If any errors occurred, they are set on the deployment for the handler to use in the status update.
func (n *NginxUpdaterImpl) UpdateConfig(
deployment *Deployment,
files []File,
Expand All @@ -87,7 +96,7 @@ func (n *NginxUpdaterImpl) UpdateConfig(

// UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API.
// Only applicable when using NGINX Plus.
// Returns whether configuration was applied or not.
// Returns whether the configuration was sent to any agents.
func (n *NginxUpdaterImpl) UpdateUpstreamServers(
deployment *Deployment,
conf dataplane.Configuration,
Expand Down
5 changes: 5 additions & 0 deletions internal/mode/static/nginx/agent/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ type Broadcaster interface {
CancelSubscription(string)
}

// SubscriberChannels are the channels sent to the subscriber to listen and respond on.
// The ID is used for map lookup to delete a subscriber when it's gone.
type SubscriberChannels struct {
ListenCh <-chan NginxAgentMessage
ResponseCh chan<- struct{}
ID string
}

// storedChannels are the same channels used in the SubscriberChannels, but reverse direction.
// These are used to store the channels for the broadcaster to send and listen on,
// and can be looked up in the map using the same ID.
type storedChannels struct {
listenCh chan<- NginxAgentMessage
responseCh <-chan struct{}
Expand Down
9 changes: 9 additions & 0 deletions internal/mode/static/nginx/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func (cs *commandService) CreateConnection(
}

// Subscribe is a decoupled communication mechanism between the data plane agent and control plane.
// The series of events are as follows:
// - Wait for the agent to register its nginx instance with the control plane.
// - Grab the most recent deployment configuration for itself, and attempt to apply it.
// - Subscribe to any future updates from the NginxUpdater and start a loop to listen for those updates.
// If any connection or unrecoverable errors occur, return and agent should re-establish a subscription.
// If errors occur with applying the config, log and put those errors into the status queue to be written
// to the Gateway status.
func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error {
ctx := in.Context()

Expand Down Expand Up @@ -238,6 +245,8 @@ func (cs *commandService) setInitialConfig(
return connErr
}

// TODO(sberman): without a delay, sometimes the following API request will fail because it doesn't
// think the upstreams exist yet. Have to figure this one out.
time.Sleep(1 * time.Second)

var upstreamErr error
Expand Down
3 changes: 3 additions & 0 deletions internal/mode/static/nginx/agent/grpc/messenger/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
)

// Messenger is a wrapper around a gRPC stream with the nginx agent.
type Messenger struct {
incoming chan *pb.ManagementPlaneRequest
outgoing chan *pb.DataPlaneResponse
errorCh chan error
server pb.CommandService_SubscribeServer
}

// New returns a new Messenger instance.
func New(server pb.CommandService_SubscribeServer) *Messenger {
return &Messenger{
incoming: make(chan *pb.ManagementPlaneRequest),
Expand All @@ -23,6 +25,7 @@ func New(server pb.CommandService_SubscribeServer) *Messenger {
}
}

// Run starts the Messenger to listen for any Send() or Recv() events over the stream.
func (m *Messenger) Run(ctx context.Context) {
go m.handleRecv(ctx)
m.handleSend(ctx)
Expand Down

0 comments on commit da36f73

Please sign in to comment.