Skip to content

Commit

Permalink
fixed test
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveromahony committed Sep 3, 2024
1 parent b11289a commit e3986d9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 60 deletions.
35 changes: 5 additions & 30 deletions src/plugins/dataplane_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
* This source code is licensed under the Apache License, Version 2.0 license found in the
* LICENSE file in the root directory of this source tree.
*/

package plugins

import (
Expand All @@ -15,13 +14,12 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"

agent_config "github.com/nginx/agent/sdk/v2/agent/config"
"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/config"
"github.com/nginx/agent/v2/src/core/payloads"
log "github.com/sirupsen/logrus"
)

type DataPlaneStatus struct {
Expand All @@ -43,7 +41,6 @@ type DataPlaneStatus struct {
nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus
softwareDetailsMutex sync.RWMutex
structMu sync.RWMutex
hostInfoMu sync.Mutex
processes []*core.Process
}

Expand Down Expand Up @@ -77,23 +74,17 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core

func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) {
log.Info("DataPlaneStatus initializing")

dps.structMu.Lock()
dps.messagePipeline = pipeline
dps.ctx = dps.messagePipeline.Context()
dps.structMu.Unlock()

dps.healthGoRoutine(pipeline)
}

func (dps *DataPlaneStatus) Close() {
log.Info("DataPlaneStatus is wrapping up")
dps.nginxConfigActivityStatuses = nil

dps.softwareDetailsMutex.Lock()
dps.softwareDetails = nil
dps.softwareDetailsMutex.Unlock()

dps.healthTicker.Stop()
dps.sendStatus <- true
}
Expand All @@ -108,7 +99,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) {
log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic())
// If the agent config on disk changed update DataPlaneStatus with relevant config info
dps.syncAgentConfigChange()

case msg.Exact(core.DataplaneSoftwareDetailsUpdated):
log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic())
switch data := msg.Data().(type) {
Expand All @@ -117,7 +107,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) {
dps.softwareDetails[data.GetPluginName()] = data.GetDataplaneSoftwareDetails()
dps.softwareDetailsMutex.Unlock()
}

case msg.Exact(core.NginxConfigValidationPending):
log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic())
switch data := msg.Data().(type) {
Expand All @@ -126,7 +115,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) {
default:
log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data)
}

case msg.Exact(core.NginxConfigApplyFailed) || msg.Exact(core.NginxConfigApplySucceeded):
log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic())
switch data := msg.Data().(type) {
Expand Down Expand Up @@ -194,53 +182,45 @@ func (dps *DataPlaneStatus) healthGoRoutine(pipeline core.MessagePipeInterface)
}

func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneStatus {
dps.structMu.RLock()
forceDetails = forceDetails || time.Now().UTC().Add(-dps.reportInterval).After(dps.lastSendDetails)

agentActivityStatuses := []*proto.AgentActivityStatus{}
for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses {
agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus)
}

dps.softwareDetailsMutex.Lock()
defer dps.softwareDetailsMutex.Unlock()

dataplaneSoftwareDetails := []*proto.DataplaneSoftwareDetails{}
for _, softwareDetail := range dps.softwareDetails {
dataplaneSoftwareDetails = append(dataplaneSoftwareDetails, softwareDetail)
}

dataplaneStatus := &proto.DataplaneStatus{
Host: dps.hostInfo(forceDetails),
Details: dps.detailsForProcess(dps.processes, forceDetails),
Healths: dps.healthForProcess(dps.processes),
DataplaneSoftwareDetails: dataplaneSoftwareDetails,
AgentActivityStatus: agentActivityStatuses,
}
dps.structMu.RUnlock()
return dataplaneStatus
}

func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) {
// this sets send if we are forcing details, or it has been 24 hours since the last send
dps.structMu.RLock()
defer dps.structMu.RUnlock()
dps.structMu.Lock()
defer dps.structMu.Unlock()
hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send)
if !send && cmp.Equal(dps.envHostInfo, hostInfo) {
return nil
}

dps.hostInfoMu.Lock()
dps.envHostInfo = hostInfo
log.Tracef("hostInfo: %v", hostInfo)
dps.hostInfoMu.Unlock()

return hostInfo
}

func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bool) (details []*proto.NginxDetails) {
log.Tracef("detailsForProcess processes: %v", processes)

nowUTC := time.Now().UTC()
// this sets send if we are forcing details, or it has been 24 hours since the last send
for _, p := range processes {
Expand All @@ -249,12 +229,10 @@ func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bo
}
details = append(details, dps.binary.GetNginxDetailsFromProcess(p))
// spec says process CreateTime is unix UTC in MS
dps.structMu.RLock()
if time.UnixMilli(p.CreateTime).After(dps.lastSendDetails) {
// set send because this process has started since the last send
send = true
}
dps.structMu.RUnlock()
}

if !send {
Expand All @@ -272,7 +250,6 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths
heathDetails := make(map[string]*proto.NginxHealth)
instanceProcessCount := make(map[string]int)
log.Tracef("healthForProcess processes: %v", processes)

for _, p := range processes {
instanceID := dps.binary.GetNginxIDForProcess(p)
log.Tracef("Process: %v instanceID %s", p, instanceID)
Expand All @@ -294,10 +271,8 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths
heathDetails[instanceID].NginxStatus = proto.NginxHealth_DEGRADED
}
}

for instanceID, health := range heathDetails {
log.Tracef("instanceID: %s health: %s", instanceID, health)

if instanceProcessCount[instanceID] <= 1 {
reason := "does not have enough children"
if heathDetails[instanceID].NginxStatus == proto.NginxHealth_DEGRADED {
Expand All @@ -318,22 +293,22 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() {
return
}
log.Debugf("DataPlaneStatus is updating to a new config - %v", conf)

pollInt := conf.Dataplane.Status.PollInterval
if pollInt < defaultMinInterval {
pollInt = defaultMinInterval
log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, conf.Dataplane.Status.PollInterval)
}

if conf.DisplayName == "" {
conf.DisplayName = dps.env.GetHostname()
log.Infof("setting displayName to %s", conf.DisplayName)
}

// Update DataPlaneStatus with relevant config info
dps.structMu.Lock()

dps.interval = pollInt
dps.tags = &conf.Tags
dps.configDirs = conf.ConfigDirs

dps.structMu.Unlock()
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit e3986d9

Please sign in to comment.