diff --git a/src/plugins/dataplane_status.go b/src/plugins/dataplane_status.go index 1ac74eb4c5..99f1ffac72 100644 --- a/src/plugins/dataplane_status.go +++ b/src/plugins/dataplane_status.go @@ -4,7 +4,6 @@ * This source code is licensed under the Apache License, Version 2.0 license found in the * LICENSE file in the root directory of this source tree. */ - package plugins import ( @@ -15,13 +14,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/payloads" + log "github.com/sirupsen/logrus" ) type DataPlaneStatus struct { @@ -43,7 +41,6 @@ type DataPlaneStatus struct { nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus softwareDetailsMutex sync.RWMutex structMu sync.RWMutex - hostInfoMu sync.Mutex processes []*core.Process } @@ -77,23 +74,17 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) { log.Info("DataPlaneStatus initializing") - - dps.structMu.Lock() dps.messagePipeline = pipeline dps.ctx = dps.messagePipeline.Context() - dps.structMu.Unlock() - dps.healthGoRoutine(pipeline) } func (dps *DataPlaneStatus) Close() { log.Info("DataPlaneStatus is wrapping up") dps.nginxConfigActivityStatuses = nil - dps.softwareDetailsMutex.Lock() dps.softwareDetails = nil dps.softwareDetailsMutex.Unlock() - dps.healthTicker.Stop() dps.sendStatus <- true } @@ -108,7 +99,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) // If the agent config on disk changed update DataPlaneStatus with relevant config info dps.syncAgentConfigChange() - case msg.Exact(core.DataplaneSoftwareDetailsUpdated): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -117,7 +107,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { dps.softwareDetails[data.GetPluginName()] = data.GetDataplaneSoftwareDetails() dps.softwareDetailsMutex.Unlock() } - case msg.Exact(core.NginxConfigValidationPending): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -126,7 +115,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } - case msg.Exact(core.NginxConfigApplyFailed) || msg.Exact(core.NginxConfigApplySucceeded): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -194,22 +182,18 @@ func (dps *DataPlaneStatus) healthGoRoutine(pipeline core.MessagePipeInterface) } func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneStatus { - dps.structMu.RLock() forceDetails = forceDetails || time.Now().UTC().Add(-dps.reportInterval).After(dps.lastSendDetails) agentActivityStatuses := []*proto.AgentActivityStatus{} for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses { agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus) } - dps.softwareDetailsMutex.Lock() defer dps.softwareDetailsMutex.Unlock() - dataplaneSoftwareDetails := []*proto.DataplaneSoftwareDetails{} for _, softwareDetail := range dps.softwareDetails { dataplaneSoftwareDetails = append(dataplaneSoftwareDetails, softwareDetail) } - dataplaneStatus := &proto.DataplaneStatus{ Host: dps.hostInfo(forceDetails), Details: dps.detailsForProcess(dps.processes, forceDetails), @@ -217,30 +201,26 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS DataplaneSoftwareDetails: dataplaneSoftwareDetails, AgentActivityStatus: agentActivityStatuses, } - dps.structMu.RUnlock() return dataplaneStatus } func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send - dps.structMu.RLock() - defer dps.structMu.RUnlock() + dps.structMu.Lock() + defer dps.structMu.Unlock() hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil } - dps.hostInfoMu.Lock() dps.envHostInfo = hostInfo log.Tracef("hostInfo: %v", hostInfo) - dps.hostInfoMu.Unlock() return hostInfo } func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bool) (details []*proto.NginxDetails) { log.Tracef("detailsForProcess processes: %v", processes) - nowUTC := time.Now().UTC() // this sets send if we are forcing details, or it has been 24 hours since the last send for _, p := range processes { @@ -249,12 +229,10 @@ func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bo } details = append(details, dps.binary.GetNginxDetailsFromProcess(p)) // spec says process CreateTime is unix UTC in MS - dps.structMu.RLock() if time.UnixMilli(p.CreateTime).After(dps.lastSendDetails) { // set send because this process has started since the last send send = true } - dps.structMu.RUnlock() } if !send { @@ -272,7 +250,6 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails := make(map[string]*proto.NginxHealth) instanceProcessCount := make(map[string]int) log.Tracef("healthForProcess processes: %v", processes) - for _, p := range processes { instanceID := dps.binary.GetNginxIDForProcess(p) log.Tracef("Process: %v instanceID %s", p, instanceID) @@ -294,10 +271,8 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails[instanceID].NginxStatus = proto.NginxHealth_DEGRADED } } - for instanceID, health := range heathDetails { log.Tracef("instanceID: %s health: %s", instanceID, health) - if instanceProcessCount[instanceID] <= 1 { reason := "does not have enough children" if heathDetails[instanceID].NginxStatus == proto.NginxHealth_DEGRADED { @@ -318,13 +293,11 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() { return } log.Debugf("DataPlaneStatus is updating to a new config - %v", conf) - pollInt := conf.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { pollInt = defaultMinInterval log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, conf.Dataplane.Status.PollInterval) } - if conf.DisplayName == "" { conf.DisplayName = dps.env.GetHostname() log.Infof("setting displayName to %s", conf.DisplayName) @@ -332,8 +305,10 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() { // Update DataPlaneStatus with relevant config info dps.structMu.Lock() + dps.interval = pollInt dps.tags = &conf.Tags dps.configDirs = conf.ConfigDirs + dps.structMu.Unlock() } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go index 1ac74eb4c5..99f1ffac72 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go @@ -4,7 +4,6 @@ * This source code is licensed under the Apache License, Version 2.0 license found in the * LICENSE file in the root directory of this source tree. */ - package plugins import ( @@ -15,13 +14,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/payloads" + log "github.com/sirupsen/logrus" ) type DataPlaneStatus struct { @@ -43,7 +41,6 @@ type DataPlaneStatus struct { nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus softwareDetailsMutex sync.RWMutex structMu sync.RWMutex - hostInfoMu sync.Mutex processes []*core.Process } @@ -77,23 +74,17 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) { log.Info("DataPlaneStatus initializing") - - dps.structMu.Lock() dps.messagePipeline = pipeline dps.ctx = dps.messagePipeline.Context() - dps.structMu.Unlock() - dps.healthGoRoutine(pipeline) } func (dps *DataPlaneStatus) Close() { log.Info("DataPlaneStatus is wrapping up") dps.nginxConfigActivityStatuses = nil - dps.softwareDetailsMutex.Lock() dps.softwareDetails = nil dps.softwareDetailsMutex.Unlock() - dps.healthTicker.Stop() dps.sendStatus <- true } @@ -108,7 +99,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) // If the agent config on disk changed update DataPlaneStatus with relevant config info dps.syncAgentConfigChange() - case msg.Exact(core.DataplaneSoftwareDetailsUpdated): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -117,7 +107,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { dps.softwareDetails[data.GetPluginName()] = data.GetDataplaneSoftwareDetails() dps.softwareDetailsMutex.Unlock() } - case msg.Exact(core.NginxConfigValidationPending): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -126,7 +115,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } - case msg.Exact(core.NginxConfigApplyFailed) || msg.Exact(core.NginxConfigApplySucceeded): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -194,22 +182,18 @@ func (dps *DataPlaneStatus) healthGoRoutine(pipeline core.MessagePipeInterface) } func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneStatus { - dps.structMu.RLock() forceDetails = forceDetails || time.Now().UTC().Add(-dps.reportInterval).After(dps.lastSendDetails) agentActivityStatuses := []*proto.AgentActivityStatus{} for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses { agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus) } - dps.softwareDetailsMutex.Lock() defer dps.softwareDetailsMutex.Unlock() - dataplaneSoftwareDetails := []*proto.DataplaneSoftwareDetails{} for _, softwareDetail := range dps.softwareDetails { dataplaneSoftwareDetails = append(dataplaneSoftwareDetails, softwareDetail) } - dataplaneStatus := &proto.DataplaneStatus{ Host: dps.hostInfo(forceDetails), Details: dps.detailsForProcess(dps.processes, forceDetails), @@ -217,30 +201,26 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS DataplaneSoftwareDetails: dataplaneSoftwareDetails, AgentActivityStatus: agentActivityStatuses, } - dps.structMu.RUnlock() return dataplaneStatus } func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send - dps.structMu.RLock() - defer dps.structMu.RUnlock() + dps.structMu.Lock() + defer dps.structMu.Unlock() hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil } - dps.hostInfoMu.Lock() dps.envHostInfo = hostInfo log.Tracef("hostInfo: %v", hostInfo) - dps.hostInfoMu.Unlock() return hostInfo } func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bool) (details []*proto.NginxDetails) { log.Tracef("detailsForProcess processes: %v", processes) - nowUTC := time.Now().UTC() // this sets send if we are forcing details, or it has been 24 hours since the last send for _, p := range processes { @@ -249,12 +229,10 @@ func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bo } details = append(details, dps.binary.GetNginxDetailsFromProcess(p)) // spec says process CreateTime is unix UTC in MS - dps.structMu.RLock() if time.UnixMilli(p.CreateTime).After(dps.lastSendDetails) { // set send because this process has started since the last send send = true } - dps.structMu.RUnlock() } if !send { @@ -272,7 +250,6 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails := make(map[string]*proto.NginxHealth) instanceProcessCount := make(map[string]int) log.Tracef("healthForProcess processes: %v", processes) - for _, p := range processes { instanceID := dps.binary.GetNginxIDForProcess(p) log.Tracef("Process: %v instanceID %s", p, instanceID) @@ -294,10 +271,8 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails[instanceID].NginxStatus = proto.NginxHealth_DEGRADED } } - for instanceID, health := range heathDetails { log.Tracef("instanceID: %s health: %s", instanceID, health) - if instanceProcessCount[instanceID] <= 1 { reason := "does not have enough children" if heathDetails[instanceID].NginxStatus == proto.NginxHealth_DEGRADED { @@ -318,13 +293,11 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() { return } log.Debugf("DataPlaneStatus is updating to a new config - %v", conf) - pollInt := conf.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { pollInt = defaultMinInterval log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, conf.Dataplane.Status.PollInterval) } - if conf.DisplayName == "" { conf.DisplayName = dps.env.GetHostname() log.Infof("setting displayName to %s", conf.DisplayName) @@ -332,8 +305,10 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() { // Update DataPlaneStatus with relevant config info dps.structMu.Lock() + dps.interval = pollInt dps.tags = &conf.Tags dps.configDirs = conf.ConfigDirs + dps.structMu.Unlock() }