Skip to content

Commit

Permalink
Add nginx plus api actions (#955)
Browse files Browse the repository at this point in the history
  • Loading branch information
aphralG authored Jan 20, 2025
1 parent 2b427d0 commit 0f0c0e2
Show file tree
Hide file tree
Showing 14 changed files with 2,293 additions and 406 deletions.
743 changes: 477 additions & 266 deletions api/grpc/mpi/v1/command.pb.go

Large diffs are not rendered by default.

462 changes: 462 additions & 0 deletions api/grpc/mpi/v1/command.pb.validate.go

Large diffs are not rendered by default.

23 changes: 21 additions & 2 deletions api/grpc/mpi/v1/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,11 @@ message APIActionRequest {
message NGINXPlusAction {
// types of actions possible with NGINX Plus API
oneof action {
UpdateHTTPUpstreamServers update_http_upstream_servers = 2;
GetHTTPUpstreamServers get_http_upstream_servers = 3;
UpdateHTTPUpstreamServers update_http_upstream_servers = 1;
GetHTTPUpstreamServers get_http_upstream_servers = 2;
UpdateStreamServers update_stream_servers = 3;
GetUpstreams get_upstreams = 4;
GetStreamUpstreams get_stream_upstreams = 5;
}
}

Expand All @@ -223,6 +226,22 @@ message GetHTTPUpstreamServers {
string http_upstream_name = 1;
}

// Update Upstream Stream Servers for an instance
message UpdateStreamServers {
// the name of the upstream stream
string upstream_stream_name = 1;
// a list of upstream stream servers
repeated google.protobuf.Struct servers = 2;
}

// Get Upstreams for an instance
message GetUpstreams {
}

// Get Stream Upstream Servers for an instance
message GetStreamUpstreams {
}

// Request an update on a particular command
message CommandStatusRequest {}

Expand Down
42 changes: 42 additions & 0 deletions docs/proto/protos.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
- [DataPlaneResponse](#mpi-v1-DataPlaneResponse)
- [FileServer](#mpi-v1-FileServer)
- [GetHTTPUpstreamServers](#mpi-v1-GetHTTPUpstreamServers)
- [GetStreamUpstreams](#mpi-v1-GetStreamUpstreams)
- [GetUpstreams](#mpi-v1-GetUpstreams)
- [HealthRequest](#mpi-v1-HealthRequest)
- [HostInfo](#mpi-v1-HostInfo)
- [Instance](#mpi-v1-Instance)
Expand All @@ -74,6 +76,7 @@
- [UpdateDataPlaneStatusRequest](#mpi-v1-UpdateDataPlaneStatusRequest)
- [UpdateDataPlaneStatusResponse](#mpi-v1-UpdateDataPlaneStatusResponse)
- [UpdateHTTPUpstreamServers](#mpi-v1-UpdateHTTPUpstreamServers)
- [UpdateStreamServers](#mpi-v1-UpdateStreamServers)

- [InstanceHealth.InstanceHealthStatus](#mpi-v1-InstanceHealth-InstanceHealthStatus)
- [InstanceMeta.InstanceType](#mpi-v1-InstanceMeta-InstanceType)
Expand Down Expand Up @@ -798,6 +801,26 @@ Get HTTP Upstream Servers for an instance



<a name="mpi-v1-GetStreamUpstreams"></a>

### GetStreamUpstreams
Get Stream Upstream Servers for an instance






<a name="mpi-v1-GetUpstreams"></a>

### GetUpstreams
Get Upstreams for an instance






<a name="mpi-v1-HealthRequest"></a>

### HealthRequest
Expand Down Expand Up @@ -978,6 +1001,9 @@ Perform an action using the NGINX Plus API on an instance
| ----- | ---- | ----- | ----------- |
| update_http_upstream_servers | [UpdateHTTPUpstreamServers](#mpi-v1-UpdateHTTPUpstreamServers) | | |
| get_http_upstream_servers | [GetHTTPUpstreamServers](#mpi-v1-GetHTTPUpstreamServers) | | |
| update_stream_servers | [UpdateStreamServers](#mpi-v1-UpdateStreamServers) | | |
| get_upstreams | [GetUpstreams](#mpi-v1-GetUpstreams) | | |
| get_stream_upstreams | [GetStreamUpstreams](#mpi-v1-GetStreamUpstreams) | | |



Expand Down Expand Up @@ -1137,6 +1163,22 @@ Update HTTP Upstream Servers for an instance




<a name="mpi-v1-UpdateStreamServers"></a>

### UpdateStreamServers
Update Upstream Stream Servers for an instance


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| upstream_stream_name | [string](#string) | | the name of the upstream stream |
| servers | [google.protobuf.Struct](#google-protobuf-Struct) | repeated | a list of upstream stream servers |








Expand Down
21 changes: 21 additions & 0 deletions internal/datasource/proto/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"log/slog"
"time"

mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/google/uuid"
agentUuid "github.com/nginx/agent/v3/pkg/uuid"
)
Expand All @@ -29,3 +32,21 @@ func GenerateMessageID() string {

return uuidv7.String()
}

func CreateDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
message, instanceID, err string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
MessageId: GenerateMessageID(),
CorrelationId: correlationID,
Timestamp: timestamppb.Now(),
},
CommandResponse: &mpi.CommandResponse{
Status: status,
Message: message,
Error: err,
},
InstanceId: instanceID,
}
}
149 changes: 149 additions & 0 deletions internal/resource/nginx_plus_actions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) F5, Inc.
//
// This source code is licensed under the Apache License, Version 2.0 license found in the
// LICENSE file in the root directory of this source tree.

package resource

import (
"context"
"encoding/json"
"log/slog"

mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"github.com/nginx/agent/v3/internal/datasource/proto"
"github.com/nginx/agent/v3/internal/logger"
)

const emptyResponse = "{}"

type APIAction struct {
ResourceService resourceServiceInterface
}

func (a *APIAction) HandleUpdateStreamServersRequest(ctx context.Context, action *mpi.NGINXPlusAction,
instance *mpi.Instance,
) *mpi.DataPlaneResponse {
correlationID := logger.GetCorrelationID(ctx)
instanceID := instance.GetInstanceMeta().GetInstanceId()

add, update, del, err := a.ResourceService.UpdateStreamServers(ctx, instance,
action.GetUpdateStreamServers().GetUpstreamStreamName(), action.GetUpdateStreamServers().GetServers())
if err != nil {
slog.ErrorContext(ctx, "Unable to update stream servers of upstream", "request",
action.GetUpdateHttpUpstreamServers(), "error", err)

return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"", instanceID, err.Error())
}

slog.DebugContext(ctx, "Successfully updated stream upstream servers", "http_upstream_name",
action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), "add", len(add), "update", len(update),
"delete", len(del))

return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
"Successfully updated stream upstream servers", instanceID, "")
}

func (a *APIAction) HandleGetStreamUpstreamsRequest(ctx context.Context,
instance *mpi.Instance,
) *mpi.DataPlaneResponse {
correlationID := logger.GetCorrelationID(ctx)
instanceID := instance.GetInstanceMeta().GetInstanceId()
streamUpstreamsResponse := emptyResponse

streamUpstreams, err := a.ResourceService.GetStreamUpstreams(ctx, instance)
if err != nil {
slog.ErrorContext(ctx, "Unable to get stream upstreams", "error", err)
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"", instanceID, err.Error())
}

if streamUpstreams != nil {
streamUpstreamsJSON, jsonErr := json.Marshal(streamUpstreams)
if jsonErr != nil {
slog.ErrorContext(ctx, "Unable to marshal stream upstreams", "err", err)
}
streamUpstreamsResponse = string(streamUpstreamsJSON)
}

return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
streamUpstreamsResponse, instanceID, "")
}

func (a *APIAction) HandleGetUpstreamsRequest(ctx context.Context, instance *mpi.Instance) *mpi.DataPlaneResponse {
correlationID := logger.GetCorrelationID(ctx)
instanceID := instance.GetInstanceMeta().GetInstanceId()
upstreamsResponse := emptyResponse

upstreams, err := a.ResourceService.GetUpstreams(ctx, instance)
if err != nil {
slog.InfoContext(ctx, "Unable to get upstreams", "error", err)

return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"", instanceID, err.Error())
}

if upstreams != nil {
upstreamsJSON, jsonErr := json.Marshal(upstreams)
if jsonErr != nil {
slog.ErrorContext(ctx, "Unable to marshal upstreams", "err", err)
}
upstreamsResponse = string(upstreamsJSON)
}

return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
upstreamsResponse, instanceID, "")
}

func (a *APIAction) HandleUpdateHTTPUpstreamsRequest(ctx context.Context, action *mpi.NGINXPlusAction,
instance *mpi.Instance,
) *mpi.DataPlaneResponse {
correlationID := logger.GetCorrelationID(ctx)
instanceID := instance.GetInstanceMeta().GetInstanceId()

add, update, del, err := a.ResourceService.UpdateHTTPUpstreamServers(ctx, instance,
action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(),
action.GetUpdateHttpUpstreamServers().GetServers())
if err != nil {
slog.ErrorContext(ctx, "Unable to update HTTP servers of upstream", "request",
action.GetUpdateHttpUpstreamServers(), "error", err)

return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"", instanceID, err.Error())
}

slog.DebugContext(ctx, "Successfully updated http upstream servers", "http_upstream_name",
action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), "add", len(add), "update", len(update),
"delete", len(del))

return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
"Successfully updated HTTP Upstreams", instanceID, "")
}

func (a *APIAction) HandleGetHTTPUpstreamsServersRequest(ctx context.Context, action *mpi.NGINXPlusAction,
instance *mpi.Instance,
) *mpi.DataPlaneResponse {
correlationID := logger.GetCorrelationID(ctx)
instanceID := instance.GetInstanceMeta().GetInstanceId()
upstreamsResponse := emptyResponse

upstreams, err := a.ResourceService.GetHTTPUpstreamServers(ctx, instance,
action.GetGetHttpUpstreamServers().GetHttpUpstreamName())
if err != nil {
slog.ErrorContext(ctx, "Unable to get HTTP servers of upstream", "error", err)
return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"", instanceID, err.Error())
}

if upstreams != nil {
upstreamsJSON, jsonErr := json.Marshal(upstreams)
if jsonErr != nil {
slog.ErrorContext(ctx, "Unable to marshal http upstreams", "err", err)
}
upstreamsResponse = string(upstreamsJSON)
}

return proto.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
upstreamsResponse, instanceID, "")
}
Loading

0 comments on commit 0f0c0e2

Please sign in to comment.