Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
dhurley committed Jun 11, 2024
1 parent 6ae8d8f commit 7b8fcc5
Showing 1 changed file with 30 additions and 19 deletions.
49 changes: 30 additions & 19 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,35 +68,46 @@ func (cp *CommandPlugin) Info() *bus.Info {
}
}

// nolint: revive, cyclop
func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
switch msg.Topic {
case bus.ResourceUpdateTopic:
if resource, ok := msg.Data.(*mpi.Resource); ok {
err := cp.commandService.UpdateDataPlaneStatus(ctx, resource)
if err != nil {
slog.ErrorContext(ctx, "Unable to update data plane status", "error", err)
}
}
cp.processResourceUpdate(ctx, msg)
case bus.InstanceHealthTopic:
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
if err != nil {
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err)
}
}
cp.processInstanceHealth(ctx, msg)
case bus.DataPlaneResponseTopic:
if response, ok := msg.Data.(*mpi.DataPlaneResponse); ok {
err := cp.commandService.SendDataPlaneResponse(ctx, response)
if err != nil {
slog.ErrorContext(ctx, "Unable to send data plane response", "error", err)
}
}
cp.processDataPlaneResponse(ctx, msg)
default:
slog.DebugContext(ctx, "Command plugin unknown topic", "topic", msg.Topic)
}
}

func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Message) {
if resource, ok := msg.Data.(*mpi.Resource); ok {
err := cp.commandService.UpdateDataPlaneStatus(ctx, resource)
if err != nil {
slog.ErrorContext(ctx, "Unable to update data plane status", "error", err)
}
}
}

func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Message) {
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
if err != nil {
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err)
}
}
}

func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.Message) {
if response, ok := msg.Data.(*mpi.DataPlaneResponse); ok {
err := cp.commandService.SendDataPlaneResponse(ctx, response)
if err != nil {
slog.ErrorContext(ctx, "Unable to send data plane response", "error", err)
}
}
}

func (cp *CommandPlugin) Subscriptions() []string {
return []string{
bus.ResourceUpdateTopic,
Expand Down

0 comments on commit 7b8fcc5

Please sign in to comment.