diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9455dfe519..6252ac4a25 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -238,7 +238,7 @@ jobs: name: official-oss-integration-test-logs-${{ matrix.container.image }}-${{ matrix.container.version }} path: /tmp/integration-test-logs/ retention-days: 3 - + official-plus-image-integration-tests: name: Integration Tests - Official Plus Images needs: build-unsigned-snapshot diff --git a/nginx-agent.conf b/nginx-agent.conf index 5a714bb5c7..792665e3c8 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -46,4 +46,4 @@ config_dirs: "/etc/nginx:/usr/local/etc/nginx:/usr/share/nginx/modules:/etc/nms" # host: 127.0.0.1 # # Set this value to a secure port number to prevent information leaks. - # port: 8038 \ No newline at end of file + # port: 8038 diff --git a/sdk/client/metric_reporter.go b/sdk/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/sdk/client/metric_reporter.go +++ b/sdk/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/sdk/config_helpers.go b/sdk/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/sdk/config_helpers.go +++ b/sdk/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil } diff --git a/src/core/config/config.go b/src/core/config/config.go index e4aca09014..b8ed3aa10a 100644 --- a/src/core/config/config.go +++ b/src/core/config/config.go @@ -19,6 +19,7 @@ import ( "reflect" "sort" "strings" + "sync" "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" @@ -50,6 +51,7 @@ const ( var ( Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) MigratedEnv = false + cfgMu = sync.Mutex{} ) func SetVersion(version, commit string) { @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) { } func GetConfig(clientId string) (*Config, error) { - extensions := []string{} + var extensions []string for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) { if agent_config.IsKnownExtension(extension) { @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) { // overwritten or not. func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) { // Get current config on disk + cfgMu.Lock() + defer cfgMu.Unlock() config, err := GetConfig(systemId) if err != nil { log.Errorf("Failed to register config: %v", err) diff --git a/src/core/config/config_test.go b/src/core/config/config_test.go index 9d5767cfe8..7ffcea761e 100644 --- a/src/core/config/config_test.go +++ b/src/core/config/config_test.go @@ -163,7 +163,7 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, []string{}, config.Tags) assert.Equal(t, Defaults.Features, config.Features) - assert.Equal(t, []string{}, config.Extensions) + assert.Equal(t, []string(nil), config.Extensions) }) t.Run("test override defaults with flags", func(t *testing.T) { @@ -274,7 +274,7 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, Defaults.AgentMetrics.Mode, config.AgentMetrics.Mode) assert.Equal(t, 10*time.Minute, config.AgentMetrics.Backoff.MaxInterval) assert.Equal(t, Defaults.Features, config.Features) - assert.Equal(t, []string{}, config.Extensions) + assert.Equal(t, []string(nil), config.Extensions) }) t.Run("test override config values with ENV variables", func(t *testing.T) { diff --git a/src/core/environment.go b/src/core/environment.go index 5bb0355aa3..3e1b719760 100644 --- a/src/core/environment.go +++ b/src/core/environment.go @@ -20,6 +20,7 @@ import ( "regexp" "runtime" "strings" + "sync" "syscall" "github.com/google/uuid" @@ -43,7 +44,6 @@ import ( //go:generate mv fake_environment_fixed.go fake_environment_test.go type Environment interface { NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo - // NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo GetHostname() (hostname string) GetSystemUUID() (hostId string) ReadDirectory(dir string, ext string) ([]string, error) @@ -70,6 +70,7 @@ type EnvironmentType struct { host *proto.HostInfo virtualizationFunc func(ctx context.Context) (string, string, error) isContainerFunc func() bool + hostMu sync.Mutex } type Process struct { @@ -106,6 +107,7 @@ const ( IsContainerKey = "isContainer" GetContainerIDKey = "GetContainerID" GetSystemUUIDKey = "GetSystemUUIDKey" + ReleaseInfoFile = "/etc/os-release" ) var ( @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { defer ctx.Done() + env.hostMu.Lock() + defer env.hostMu.Unlock() // temp cache measure if env.host == nil || clearCache { hostInformation, err := host.InfoWithContext(ctx) @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer Partitons: disks, Network: env.networks(), Processor: env.processors(hostInformation.KernelArch), - Release: releaseInfo("/etc/os-release"), + Release: releaseInfo(ReleaseInfoFile), Tags: *tags, AgentAccessibleDirs: configDirs, } diff --git a/src/core/metrics/sources/disk.go b/src/core/metrics/sources/disk.go index 98a3847a2d..3bf7eafb5c 100644 --- a/src/core/metrics/sources/disk.go +++ b/src/core/metrics/sources/disk.go @@ -14,7 +14,6 @@ import ( "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" - log "github.com/sirupsen/logrus" ) const MOUNT_POINT = "mount_point" @@ -49,8 +48,6 @@ func (c *Disk) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrapper "in_use": float64(usage.UsedPercentage), }) - log.Debugf("disk metrics collected: %v", len(simpleMetrics)) - select { case <-ctx.Done(): return diff --git a/src/core/metrics/sources/net_io.go b/src/core/metrics/sources/net_io.go index e3ef91cd83..13fcc2f24b 100644 --- a/src/core/metrics/sources/net_io.go +++ b/src/core/metrics/sources/net_io.go @@ -12,11 +12,12 @@ import ( "fmt" "sync" + log "github.com/sirupsen/logrus" + "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" "github.com/shirou/gopsutil/v3/net" - log "github.com/sirupsen/logrus" ) const NETWORK_INTERFACE = "network_interface" @@ -82,7 +83,6 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap } simpleMetrics := nio.convertSamplesToSimpleMetrics(v) - log.Debugf("net IO stats count: %d", len(simpleMetrics)) select { case <-ctx.Done(): @@ -109,6 +109,7 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap simpleMetrics := nio.convertSamplesToSimpleMetrics(totalStats) m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{}, simpleMetrics, proto.MetricsReport_SYSTEM) + log.Debugf("net IO stats: %v", currentNetIOStats) nio.netIOStats = currentNetIOStats } diff --git a/src/core/metrics/sources/nginx_access_log.go b/src/core/metrics/sources/nginx_access_log.go index 46c50b5830..b52a758428 100644 --- a/src/core/metrics/sources/nginx_access_log.go +++ b/src/core/metrics/sources/nginx_access_log.go @@ -182,7 +182,6 @@ func (c *NginxAccessLog) Stop() { fn() delete(c.logs, f) } - log.Debugf("Stopping NginxAccessLog source for nginx id: %v", c.baseDimensions.NginxId) } func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { @@ -329,11 +328,11 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string mu.Unlock() case <-tick.C: + mu.Lock() + c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() - if len(requestLengths) > 0 { httpCounters["request.length"] = getAverageMetricValue(requestLengths) } diff --git a/src/core/metrics/sources/nginx_error_log.go b/src/core/metrics/sources/nginx_error_log.go index 06368eb93a..52eb63ca4d 100644 --- a/src/core/metrics/sources/nginx_error_log.go +++ b/src/core/metrics/sources/nginx_error_log.go @@ -116,7 +116,6 @@ func (c *NginxErrorLog) Stop() { func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.mu.Lock() - defer c.mu.Unlock() c.baseDimensions = dimensions @@ -129,6 +128,7 @@ func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *met // add, remove or update existing log trailers c.syncLogs() } + c.mu.Unlock() } func (c *NginxErrorLog) recreateLogs() { @@ -178,7 +178,7 @@ func (c *NginxErrorLog) stopTailer(logFile string, cancelFunction context.Cancel delete(c.logFormats, logFile) } -func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { +func (c *NginxErrorLog) collectLogStats(_ context.Context, m chan<- *metrics.StatsEntityWrapper) { c.mu.Lock() defer c.mu.Unlock() @@ -227,10 +227,10 @@ func (c *NginxErrorLog) logStats(ctx context.Context, logFile string) { mu.Unlock() case <-tick.C: + mu.Lock() c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() simpleMetrics := c.convertSamplesToSimpleMetrics(counters) log.Tracef("Error log metrics collected: %v", simpleMetrics) diff --git a/src/core/nginx.go b/src/core/nginx.go index 577abc6c05..334df03610 100644 --- a/src/core/nginx.go +++ b/src/core/nginx.go @@ -37,10 +37,8 @@ const ( ) var ( - logMutex sync.Mutex - unpackMutex sync.Mutex - re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) - plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) + re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) + plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) ) type NginxBinary interface { @@ -55,26 +53,28 @@ type NginxBinary interface { UpdateNginxDetailsFromProcesses(nginxProcesses []*Process) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApply, error) ReadConfig(path, nginxId, systemId string) (*proto.NginxConfig, error) - UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool + UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string GetAccessLogs() map[string]string GetErrorLogs() map[string]string GetChildProcesses() map[string][]*proto.NginxDetails } type NginxBinaryType struct { - detailsMapMutex sync.Mutex - workersMapMutex sync.Mutex - statusUrlMutex sync.RWMutex - env Environment - config *config.Config - nginxDetailsMap map[string]*proto.NginxDetails - nginxWorkersMap map[string][]*proto.NginxDetails - nginxInfoMap map[string]*nginxInfo - accessLogs map[string]string - errorLogs map[string]string - statusUrls map[string]string - accessLogsUpdated bool - errorLogsUpdated bool + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex + logMutex sync.Mutex + logWriteMutex sync.Mutex + unpackMutex sync.Mutex + mapMutex sync.Mutex + statusUrlMutex sync.RWMutex + env Environment + config *config.Config + nginxDetailsMap map[string]*proto.NginxDetails + nginxWorkersMap map[string][]*proto.NginxDetails + nginxInfoMap map[string]*nginxInfo + accessLogs map[string]string + errorLogs map[string]string + statusUrls map[string]string } type nginxInfo struct { @@ -408,8 +408,8 @@ func (n *NginxBinaryType) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApp return nil, fmt.Errorf("config directory %s not allowed", filepath.Dir(details.ConfPath)) } - unpackMutex.Lock() - defer unpackMutex.Unlock() + n.unpackMutex.Lock() + defer n.unpackMutex.Unlock() log.Info("Updating NGINX config") var configApply *sdk.ConfigApply @@ -661,29 +661,22 @@ func (n *NginxBinaryType) ReadConfig(confFile, nginxId, systemId string) (*proto return nil, err } - // get access logs list for analysis - accessLogs := AccessLogs(configPayload) - // get error logs list for analysis - errorLogs := ErrorLogs(configPayload) + n.logWriteMutex.Lock() + defer n.logWriteMutex.Unlock() + n.accessLogs = n.UpdateLogs(n.GetAccessLogs(), AccessLogs(configPayload)) - logMutex.Lock() - defer logMutex.Unlock() - - n.accessLogsUpdated = n.UpdateLogs(n.accessLogs, accessLogs) - n.errorLogsUpdated = n.UpdateLogs(n.errorLogs, errorLogs) + n.errorLogs = n.UpdateLogs(n.GetErrorLogs(), ErrorLogs(configPayload)) return configPayload, nil } func (n *NginxBinaryType) GetAccessLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() return n.accessLogs } func (n *NginxBinaryType) GetErrorLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() + n.logMutex.Lock() + defer n.logMutex.Unlock() return n.errorLogs } @@ -802,6 +795,9 @@ func parseConfigureArguments(line string) (result map[string]interface{}, flags } func (n *NginxBinaryType) getNginxInfoFrom(ngxExe string) *nginxInfo { + n.mapMutex.Lock() + defer n.mapMutex.Unlock() + if ngxExe == "" { return &nginxInfo{} } @@ -905,27 +901,29 @@ func (n *NginxBinaryType) parseModulePath(dir string) ([]string, error) { return result, nil } -func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool { - logUpdated := false +func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string { + log.Debug("UpdateLogs") + + copiedLogs := make(map[string]string, len(existingLogs)) + // Copy each key-value pair from the original map to the new map + for logFile, logFormat := range existingLogs { + copiedLogs[logFile] = logFormat + } for logFile, logFormat := range newLogs { if !(strings.HasPrefix(logFile, "syslog:") || n.SkipLog(logFile)) { - if _, found := existingLogs[logFile]; !found || existingLogs[logFile] != logFormat { - logUpdated = true - } - existingLogs[logFile] = logFormat + copiedLogs[logFile] = logFormat } } // delete old logs - for logFile := range existingLogs { + for logFile := range copiedLogs { if _, found := newLogs[logFile]; !found { - delete(existingLogs, logFile) - logUpdated = true + delete(copiedLogs, logFile) } } - return logUpdated + return copiedLogs } func parseNginxVersion(line string) (version, plusVersion string) { diff --git a/src/core/pipe.go b/src/core/pipe.go index 47311e79dc..481da7f36e 100644 --- a/src/core/pipe.go +++ b/src/core/pipe.go @@ -37,6 +37,7 @@ type MessagePipe struct { extensionPlugins []ExtensionPlugin ctx context.Context mu sync.RWMutex + regMu sync.Mutex bus message_bus.MessageBus } @@ -72,7 +73,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } @@ -82,7 +85,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.extensionPlugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } diff --git a/src/plugins/commander.go b/src/plugins/commander.go index be11792834..582796de8d 100644 --- a/src/plugins/commander.go +++ b/src/plugins/commander.go @@ -102,8 +102,6 @@ func (c *Commander) agentBackoff(agentConfig *proto.AgentConfig) { func (c *Commander) agentRegistered(cmd *proto.Command) { switch commandData := cmd.GetData().(type) { case *proto.Command_AgentConnectResponse: - log.Infof("config command %v", commandData) - if agtCfg := commandData.AgentConnectResponse.AgentConfig; agtCfg != nil && agtCfg.Configs != nil && len(agtCfg.Configs.Configs) > 0 { for _, config := range agtCfg.Configs.Configs { diff --git a/src/plugins/config_reader.go b/src/plugins/config_reader.go index eb9606eac9..978f01b7f6 100644 --- a/src/plugins/config_reader.go +++ b/src/plugins/config_reader.go @@ -27,6 +27,7 @@ type ConfigReader struct { messagePipeline core.MessagePipeInterface config *config.Config mu sync.RWMutex + detailsMu sync.RWMutex } func NewConfigReader(config *config.Config) *ConfigReader { @@ -85,7 +86,10 @@ func (r *ConfigReader) Subscriptions() []string { } func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) { - if payloadAgentConfig != nil && payloadAgentConfig.Details != nil { + r.mu.Lock() + defer r.mu.Unlock() + + if payloadAgentConfig.Details != nil { onDiskAgentConfig, err := config.GetConfig(r.config.ClientID) if err != nil { log.Errorf("Failed to update Agent config - %v", err) @@ -96,21 +100,29 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags := false if payloadAgentConfig.Details.Features != nil { - + r.detailsMu.Lock() for index, feature := range payloadAgentConfig.Details.Features { payloadAgentConfig.Details.Features[index] = strings.Replace(feature, "features_", "", 1) } sort.Strings(onDiskAgentConfig.Features) sort.Strings(payloadAgentConfig.Details.Features) + r.detailsMu.Unlock() + + r.detailsMu.RLock() synchronizeFeatures = !reflect.DeepEqual(payloadAgentConfig.Details.Features, onDiskAgentConfig.Features) + r.detailsMu.RUnlock() } else { + r.detailsMu.Lock() payloadAgentConfig.Details.Features = onDiskAgentConfig.Features + r.detailsMu.Unlock() } if payloadAgentConfig.Details.Tags == nil { + r.detailsMu.Lock() payloadAgentConfig.Details.Tags = []string{} + r.detailsMu.Unlock() } sort.Strings(onDiskAgentConfig.Tags) @@ -118,7 +130,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags = !reflect.DeepEqual(payloadAgentConfig.Details.Tags, onDiskAgentConfig.Tags) if synchronizeFeatures || synchronizeTags { - configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, payloadAgentConfig.Details.Tags, payloadAgentConfig.Details.Features) + tags := payloadAgentConfig.Details.Tags + features := payloadAgentConfig.Details.Features + configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, tags, features) if err != nil { log.Errorf("Failed updating Agent config - %v", err) } @@ -142,25 +156,22 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) } r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig)) - } } func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) { if r.config != nil { + r.detailsMu.RLock() for _, feature := range r.config.Features { if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync { - r.mu.Lock() r.deRegisterPlugin(feature) - r.mu.Unlock() } } + r.detailsMu.RUnlock() } if agtCfg.Details != nil { - r.mu.Lock() r.messagePipeline.Process(core.NewMessage(core.EnableFeature, agtCfg.Details.Features)) - r.mu.Unlock() } } diff --git a/src/plugins/dataplane_status.go b/src/plugins/dataplane_status.go index 6b6014a5ff..99f1ffac72 100644 --- a/src/plugins/dataplane_status.go +++ b/src/plugins/dataplane_status.go @@ -4,7 +4,6 @@ * This source code is licensed under the Apache License, Version 2.0 license found in the * LICENSE file in the root directory of this source tree. */ - package plugins import ( @@ -15,13 +14,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/payloads" + log "github.com/sirupsen/logrus" ) type DataPlaneStatus struct { @@ -42,6 +40,7 @@ type DataPlaneStatus struct { softwareDetails map[string]*proto.DataplaneSoftwareDetails nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus softwareDetailsMutex sync.RWMutex + structMu sync.RWMutex processes []*core.Process } @@ -67,7 +66,6 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core tags: &config.Tags, configDirs: config.ConfigDirs, reportInterval: config.Dataplane.Status.ReportInterval, - softwareDetailsMutex: sync.RWMutex{}, nginxConfigActivityStatuses: make(map[string]*proto.AgentActivityStatus), softwareDetails: make(map[string]*proto.DataplaneSoftwareDetails), processes: processes, @@ -84,11 +82,9 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) { func (dps *DataPlaneStatus) Close() { log.Info("DataPlaneStatus is wrapping up") dps.nginxConfigActivityStatuses = nil - dps.softwareDetailsMutex.Lock() dps.softwareDetails = nil dps.softwareDetailsMutex.Unlock() - dps.healthTicker.Stop() dps.sendStatus <- true } @@ -103,7 +99,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) // If the agent config on disk changed update DataPlaneStatus with relevant config info dps.syncAgentConfigChange() - case msg.Exact(core.DataplaneSoftwareDetailsUpdated): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -112,7 +107,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { dps.softwareDetails[data.GetPluginName()] = data.GetDataplaneSoftwareDetails() dps.softwareDetailsMutex.Unlock() } - case msg.Exact(core.NginxConfigValidationPending): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -121,7 +115,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } - case msg.Exact(core.NginxConfigApplyFailed) || msg.Exact(core.NginxConfigApplySucceeded): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -132,7 +125,9 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } case msg.Exact(core.NginxDetailProcUpdate): + dps.structMu.Lock() dps.processes = msg.Data().([]*core.Process) + dps.structMu.Unlock() } } @@ -193,15 +188,12 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses { agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus) } - dps.softwareDetailsMutex.Lock() defer dps.softwareDetailsMutex.Unlock() - dataplaneSoftwareDetails := []*proto.DataplaneSoftwareDetails{} for _, softwareDetail := range dps.softwareDetails { dataplaneSoftwareDetails = append(dataplaneSoftwareDetails, softwareDetail) } - dataplaneStatus := &proto.DataplaneStatus{ Host: dps.hostInfo(forceDetails), Details: dps.detailsForProcess(dps.processes, forceDetails), @@ -214,6 +206,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send + dps.structMu.Lock() + defer dps.structMu.Unlock() hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil @@ -227,7 +221,6 @@ func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bool) (details []*proto.NginxDetails) { log.Tracef("detailsForProcess processes: %v", processes) - nowUTC := time.Now().UTC() // this sets send if we are forcing details, or it has been 24 hours since the last send for _, p := range processes { @@ -246,7 +239,9 @@ func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bo return nil } + dps.structMu.Lock() dps.lastSendDetails = nowUTC + dps.structMu.Unlock() return details } @@ -255,7 +250,6 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails := make(map[string]*proto.NginxHealth) instanceProcessCount := make(map[string]int) log.Tracef("healthForProcess processes: %v", processes) - for _, p := range processes { instanceID := dps.binary.GetNginxIDForProcess(p) log.Tracef("Process: %v instanceID %s", p, instanceID) @@ -277,10 +271,8 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails[instanceID].NginxStatus = proto.NginxHealth_DEGRADED } } - for instanceID, health := range heathDetails { log.Tracef("instanceID: %s health: %s", instanceID, health) - if instanceProcessCount[instanceID] <= 1 { reason := "does not have enough children" if heathDetails[instanceID].NginxStatus == proto.NginxHealth_DEGRADED { @@ -301,20 +293,22 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() { return } log.Debugf("DataPlaneStatus is updating to a new config - %v", conf) - pollInt := conf.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { pollInt = defaultMinInterval log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, conf.Dataplane.Status.PollInterval) } - if conf.DisplayName == "" { conf.DisplayName = dps.env.GetHostname() log.Infof("setting displayName to %s", conf.DisplayName) } // Update DataPlaneStatus with relevant config info + dps.structMu.Lock() + dps.interval = pollInt dps.tags = &conf.Tags dps.configDirs = conf.ConfigDirs + + dps.structMu.Unlock() } diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 55238474c4..97b0de01f1 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -71,7 +71,9 @@ func (m *Metrics) Init(pipeline core.MessagePipeInterface) { } func (m *Metrics) Close() { + m.collectorsMutex.Lock() m.collectors = nil + m.collectorsMutex.Unlock() log.Info("Metrics is wrapping up") } @@ -336,6 +338,8 @@ func createCollectorConfigsMap(config *config.Config, binary core.NginxBinary, p } func (m *Metrics) updateCollectorsConfig() { + m.collectorsMutex.Lock() + defer m.collectorsMutex.Unlock() log.Trace("Updating collector config") for _, collector := range m.collectors { if nginxCollector, ok := collector.(*collectors.NginxCollector); ok { diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 249ff15ef2..91983146b2 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -106,7 +106,6 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } collection := metrics.SaveCollections(*r.metricsCollections[report.Type], report) r.metricsCollections[report.Type] = &collection - log.Debugf("MetricsThrottle: Metrics collection saved [Type: %d]", report.Type) } } r.mu.Unlock() diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go index e4aca09014..b8ed3aa10a 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -19,6 +19,7 @@ import ( "reflect" "sort" "strings" + "sync" "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" @@ -50,6 +51,7 @@ const ( var ( Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) MigratedEnv = false + cfgMu = sync.Mutex{} ) func SetVersion(version, commit string) { @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) { } func GetConfig(clientId string) (*Config, error) { - extensions := []string{} + var extensions []string for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) { if agent_config.IsKnownExtension(extension) { @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) { // overwritten or not. func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) { // Get current config on disk + cfgMu.Lock() + defer cfgMu.Unlock() config, err := GetConfig(systemId) if err != nil { log.Errorf("Failed to register config: %v", err) diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go index 5bb0355aa3..3e1b719760 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go @@ -20,6 +20,7 @@ import ( "regexp" "runtime" "strings" + "sync" "syscall" "github.com/google/uuid" @@ -43,7 +44,6 @@ import ( //go:generate mv fake_environment_fixed.go fake_environment_test.go type Environment interface { NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo - // NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo GetHostname() (hostname string) GetSystemUUID() (hostId string) ReadDirectory(dir string, ext string) ([]string, error) @@ -70,6 +70,7 @@ type EnvironmentType struct { host *proto.HostInfo virtualizationFunc func(ctx context.Context) (string, string, error) isContainerFunc func() bool + hostMu sync.Mutex } type Process struct { @@ -106,6 +107,7 @@ const ( IsContainerKey = "isContainer" GetContainerIDKey = "GetContainerID" GetSystemUUIDKey = "GetSystemUUIDKey" + ReleaseInfoFile = "/etc/os-release" ) var ( @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { defer ctx.Done() + env.hostMu.Lock() + defer env.hostMu.Unlock() // temp cache measure if env.host == nil || clearCache { hostInformation, err := host.InfoWithContext(ctx) @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer Partitons: disks, Network: env.networks(), Processor: env.processors(hostInformation.KernelArch), - Release: releaseInfo("/etc/os-release"), + Release: releaseInfo(ReleaseInfoFile), Tags: *tags, AgentAccessibleDirs: configDirs, } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 577abc6c05..334df03610 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -37,10 +37,8 @@ const ( ) var ( - logMutex sync.Mutex - unpackMutex sync.Mutex - re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) - plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) + re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) + plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) ) type NginxBinary interface { @@ -55,26 +53,28 @@ type NginxBinary interface { UpdateNginxDetailsFromProcesses(nginxProcesses []*Process) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApply, error) ReadConfig(path, nginxId, systemId string) (*proto.NginxConfig, error) - UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool + UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string GetAccessLogs() map[string]string GetErrorLogs() map[string]string GetChildProcesses() map[string][]*proto.NginxDetails } type NginxBinaryType struct { - detailsMapMutex sync.Mutex - workersMapMutex sync.Mutex - statusUrlMutex sync.RWMutex - env Environment - config *config.Config - nginxDetailsMap map[string]*proto.NginxDetails - nginxWorkersMap map[string][]*proto.NginxDetails - nginxInfoMap map[string]*nginxInfo - accessLogs map[string]string - errorLogs map[string]string - statusUrls map[string]string - accessLogsUpdated bool - errorLogsUpdated bool + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex + logMutex sync.Mutex + logWriteMutex sync.Mutex + unpackMutex sync.Mutex + mapMutex sync.Mutex + statusUrlMutex sync.RWMutex + env Environment + config *config.Config + nginxDetailsMap map[string]*proto.NginxDetails + nginxWorkersMap map[string][]*proto.NginxDetails + nginxInfoMap map[string]*nginxInfo + accessLogs map[string]string + errorLogs map[string]string + statusUrls map[string]string } type nginxInfo struct { @@ -408,8 +408,8 @@ func (n *NginxBinaryType) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApp return nil, fmt.Errorf("config directory %s not allowed", filepath.Dir(details.ConfPath)) } - unpackMutex.Lock() - defer unpackMutex.Unlock() + n.unpackMutex.Lock() + defer n.unpackMutex.Unlock() log.Info("Updating NGINX config") var configApply *sdk.ConfigApply @@ -661,29 +661,22 @@ func (n *NginxBinaryType) ReadConfig(confFile, nginxId, systemId string) (*proto return nil, err } - // get access logs list for analysis - accessLogs := AccessLogs(configPayload) - // get error logs list for analysis - errorLogs := ErrorLogs(configPayload) + n.logWriteMutex.Lock() + defer n.logWriteMutex.Unlock() + n.accessLogs = n.UpdateLogs(n.GetAccessLogs(), AccessLogs(configPayload)) - logMutex.Lock() - defer logMutex.Unlock() - - n.accessLogsUpdated = n.UpdateLogs(n.accessLogs, accessLogs) - n.errorLogsUpdated = n.UpdateLogs(n.errorLogs, errorLogs) + n.errorLogs = n.UpdateLogs(n.GetErrorLogs(), ErrorLogs(configPayload)) return configPayload, nil } func (n *NginxBinaryType) GetAccessLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() return n.accessLogs } func (n *NginxBinaryType) GetErrorLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() + n.logMutex.Lock() + defer n.logMutex.Unlock() return n.errorLogs } @@ -802,6 +795,9 @@ func parseConfigureArguments(line string) (result map[string]interface{}, flags } func (n *NginxBinaryType) getNginxInfoFrom(ngxExe string) *nginxInfo { + n.mapMutex.Lock() + defer n.mapMutex.Unlock() + if ngxExe == "" { return &nginxInfo{} } @@ -905,27 +901,29 @@ func (n *NginxBinaryType) parseModulePath(dir string) ([]string, error) { return result, nil } -func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool { - logUpdated := false +func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string { + log.Debug("UpdateLogs") + + copiedLogs := make(map[string]string, len(existingLogs)) + // Copy each key-value pair from the original map to the new map + for logFile, logFormat := range existingLogs { + copiedLogs[logFile] = logFormat + } for logFile, logFormat := range newLogs { if !(strings.HasPrefix(logFile, "syslog:") || n.SkipLog(logFile)) { - if _, found := existingLogs[logFile]; !found || existingLogs[logFile] != logFormat { - logUpdated = true - } - existingLogs[logFile] = logFormat + copiedLogs[logFile] = logFormat } } // delete old logs - for logFile := range existingLogs { + for logFile := range copiedLogs { if _, found := newLogs[logFile]; !found { - delete(existingLogs, logFile) - logUpdated = true + delete(copiedLogs, logFile) } } - return logUpdated + return copiedLogs } func parseNginxVersion(line string) (version, plusVersion string) { diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go index 47311e79dc..481da7f36e 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -37,6 +37,7 @@ type MessagePipe struct { extensionPlugins []ExtensionPlugin ctx context.Context mu sync.RWMutex + regMu sync.Mutex bus message_bus.MessageBus } @@ -72,7 +73,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } @@ -82,7 +85,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.extensionPlugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go index dfb498adf5..73a64e226d 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go @@ -133,9 +133,9 @@ func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) return args.Get(0).(*proto.NginxDetails) } -func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) bool { +func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) map[string]string { args := m.Called(existing, newLogs) - return args.Bool(0) + return args.Get(0).(map[string]string) } func (m *MockNginxBinary) GetAccessLogs() map[string]string { diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go index e4aca09014..b8ed3aa10a 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -19,6 +19,7 @@ import ( "reflect" "sort" "strings" + "sync" "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" @@ -50,6 +51,7 @@ const ( var ( Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) MigratedEnv = false + cfgMu = sync.Mutex{} ) func SetVersion(version, commit string) { @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) { } func GetConfig(clientId string) (*Config, error) { - extensions := []string{} + var extensions []string for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) { if agent_config.IsKnownExtension(extension) { @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) { // overwritten or not. func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) { // Get current config on disk + cfgMu.Lock() + defer cfgMu.Unlock() config, err := GetConfig(systemId) if err != nil { log.Errorf("Failed to register config: %v", err) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go index 5bb0355aa3..3e1b719760 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go @@ -20,6 +20,7 @@ import ( "regexp" "runtime" "strings" + "sync" "syscall" "github.com/google/uuid" @@ -43,7 +44,6 @@ import ( //go:generate mv fake_environment_fixed.go fake_environment_test.go type Environment interface { NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo - // NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo GetHostname() (hostname string) GetSystemUUID() (hostId string) ReadDirectory(dir string, ext string) ([]string, error) @@ -70,6 +70,7 @@ type EnvironmentType struct { host *proto.HostInfo virtualizationFunc func(ctx context.Context) (string, string, error) isContainerFunc func() bool + hostMu sync.Mutex } type Process struct { @@ -106,6 +107,7 @@ const ( IsContainerKey = "isContainer" GetContainerIDKey = "GetContainerID" GetSystemUUIDKey = "GetSystemUUIDKey" + ReleaseInfoFile = "/etc/os-release" ) var ( @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { defer ctx.Done() + env.hostMu.Lock() + defer env.hostMu.Unlock() // temp cache measure if env.host == nil || clearCache { hostInformation, err := host.InfoWithContext(ctx) @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer Partitons: disks, Network: env.networks(), Processor: env.processors(hostInformation.KernelArch), - Release: releaseInfo("/etc/os-release"), + Release: releaseInfo(ReleaseInfoFile), Tags: *tags, AgentAccessibleDirs: configDirs, } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go index 98a3847a2d..3bf7eafb5c 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go @@ -14,7 +14,6 @@ import ( "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" - log "github.com/sirupsen/logrus" ) const MOUNT_POINT = "mount_point" @@ -49,8 +48,6 @@ func (c *Disk) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrapper "in_use": float64(usage.UsedPercentage), }) - log.Debugf("disk metrics collected: %v", len(simpleMetrics)) - select { case <-ctx.Done(): return diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go index e3ef91cd83..13fcc2f24b 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go @@ -12,11 +12,12 @@ import ( "fmt" "sync" + log "github.com/sirupsen/logrus" + "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" "github.com/shirou/gopsutil/v3/net" - log "github.com/sirupsen/logrus" ) const NETWORK_INTERFACE = "network_interface" @@ -82,7 +83,6 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap } simpleMetrics := nio.convertSamplesToSimpleMetrics(v) - log.Debugf("net IO stats count: %d", len(simpleMetrics)) select { case <-ctx.Done(): @@ -109,6 +109,7 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap simpleMetrics := nio.convertSamplesToSimpleMetrics(totalStats) m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{}, simpleMetrics, proto.MetricsReport_SYSTEM) + log.Debugf("net IO stats: %v", currentNetIOStats) nio.netIOStats = currentNetIOStats } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go index 46c50b5830..b52a758428 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go @@ -182,7 +182,6 @@ func (c *NginxAccessLog) Stop() { fn() delete(c.logs, f) } - log.Debugf("Stopping NginxAccessLog source for nginx id: %v", c.baseDimensions.NginxId) } func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { @@ -329,11 +328,11 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string mu.Unlock() case <-tick.C: + mu.Lock() + c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() - if len(requestLengths) > 0 { httpCounters["request.length"] = getAverageMetricValue(requestLengths) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go index 06368eb93a..52eb63ca4d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go @@ -116,7 +116,6 @@ func (c *NginxErrorLog) Stop() { func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.mu.Lock() - defer c.mu.Unlock() c.baseDimensions = dimensions @@ -129,6 +128,7 @@ func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *met // add, remove or update existing log trailers c.syncLogs() } + c.mu.Unlock() } func (c *NginxErrorLog) recreateLogs() { @@ -178,7 +178,7 @@ func (c *NginxErrorLog) stopTailer(logFile string, cancelFunction context.Cancel delete(c.logFormats, logFile) } -func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { +func (c *NginxErrorLog) collectLogStats(_ context.Context, m chan<- *metrics.StatsEntityWrapper) { c.mu.Lock() defer c.mu.Unlock() @@ -227,10 +227,10 @@ func (c *NginxErrorLog) logStats(ctx context.Context, logFile string) { mu.Unlock() case <-tick.C: + mu.Lock() c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() simpleMetrics := c.convertSamplesToSimpleMetrics(counters) log.Tracef("Error log metrics collected: %v", simpleMetrics) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 577abc6c05..334df03610 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -37,10 +37,8 @@ const ( ) var ( - logMutex sync.Mutex - unpackMutex sync.Mutex - re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) - plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) + re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) + plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) ) type NginxBinary interface { @@ -55,26 +53,28 @@ type NginxBinary interface { UpdateNginxDetailsFromProcesses(nginxProcesses []*Process) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApply, error) ReadConfig(path, nginxId, systemId string) (*proto.NginxConfig, error) - UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool + UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string GetAccessLogs() map[string]string GetErrorLogs() map[string]string GetChildProcesses() map[string][]*proto.NginxDetails } type NginxBinaryType struct { - detailsMapMutex sync.Mutex - workersMapMutex sync.Mutex - statusUrlMutex sync.RWMutex - env Environment - config *config.Config - nginxDetailsMap map[string]*proto.NginxDetails - nginxWorkersMap map[string][]*proto.NginxDetails - nginxInfoMap map[string]*nginxInfo - accessLogs map[string]string - errorLogs map[string]string - statusUrls map[string]string - accessLogsUpdated bool - errorLogsUpdated bool + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex + logMutex sync.Mutex + logWriteMutex sync.Mutex + unpackMutex sync.Mutex + mapMutex sync.Mutex + statusUrlMutex sync.RWMutex + env Environment + config *config.Config + nginxDetailsMap map[string]*proto.NginxDetails + nginxWorkersMap map[string][]*proto.NginxDetails + nginxInfoMap map[string]*nginxInfo + accessLogs map[string]string + errorLogs map[string]string + statusUrls map[string]string } type nginxInfo struct { @@ -408,8 +408,8 @@ func (n *NginxBinaryType) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApp return nil, fmt.Errorf("config directory %s not allowed", filepath.Dir(details.ConfPath)) } - unpackMutex.Lock() - defer unpackMutex.Unlock() + n.unpackMutex.Lock() + defer n.unpackMutex.Unlock() log.Info("Updating NGINX config") var configApply *sdk.ConfigApply @@ -661,29 +661,22 @@ func (n *NginxBinaryType) ReadConfig(confFile, nginxId, systemId string) (*proto return nil, err } - // get access logs list for analysis - accessLogs := AccessLogs(configPayload) - // get error logs list for analysis - errorLogs := ErrorLogs(configPayload) + n.logWriteMutex.Lock() + defer n.logWriteMutex.Unlock() + n.accessLogs = n.UpdateLogs(n.GetAccessLogs(), AccessLogs(configPayload)) - logMutex.Lock() - defer logMutex.Unlock() - - n.accessLogsUpdated = n.UpdateLogs(n.accessLogs, accessLogs) - n.errorLogsUpdated = n.UpdateLogs(n.errorLogs, errorLogs) + n.errorLogs = n.UpdateLogs(n.GetErrorLogs(), ErrorLogs(configPayload)) return configPayload, nil } func (n *NginxBinaryType) GetAccessLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() return n.accessLogs } func (n *NginxBinaryType) GetErrorLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() + n.logMutex.Lock() + defer n.logMutex.Unlock() return n.errorLogs } @@ -802,6 +795,9 @@ func parseConfigureArguments(line string) (result map[string]interface{}, flags } func (n *NginxBinaryType) getNginxInfoFrom(ngxExe string) *nginxInfo { + n.mapMutex.Lock() + defer n.mapMutex.Unlock() + if ngxExe == "" { return &nginxInfo{} } @@ -905,27 +901,29 @@ func (n *NginxBinaryType) parseModulePath(dir string) ([]string, error) { return result, nil } -func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool { - logUpdated := false +func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string { + log.Debug("UpdateLogs") + + copiedLogs := make(map[string]string, len(existingLogs)) + // Copy each key-value pair from the original map to the new map + for logFile, logFormat := range existingLogs { + copiedLogs[logFile] = logFormat + } for logFile, logFormat := range newLogs { if !(strings.HasPrefix(logFile, "syslog:") || n.SkipLog(logFile)) { - if _, found := existingLogs[logFile]; !found || existingLogs[logFile] != logFormat { - logUpdated = true - } - existingLogs[logFile] = logFormat + copiedLogs[logFile] = logFormat } } // delete old logs - for logFile := range existingLogs { + for logFile := range copiedLogs { if _, found := newLogs[logFile]; !found { - delete(existingLogs, logFile) - logUpdated = true + delete(copiedLogs, logFile) } } - return logUpdated + return copiedLogs } func parseNginxVersion(line string) (version, plusVersion string) { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go index 47311e79dc..481da7f36e 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -37,6 +37,7 @@ type MessagePipe struct { extensionPlugins []ExtensionPlugin ctx context.Context mu sync.RWMutex + regMu sync.Mutex bus message_bus.MessageBus } @@ -72,7 +73,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } @@ -82,7 +85,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.extensionPlugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go index be11792834..582796de8d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go @@ -102,8 +102,6 @@ func (c *Commander) agentBackoff(agentConfig *proto.AgentConfig) { func (c *Commander) agentRegistered(cmd *proto.Command) { switch commandData := cmd.GetData().(type) { case *proto.Command_AgentConnectResponse: - log.Infof("config command %v", commandData) - if agtCfg := commandData.AgentConnectResponse.AgentConfig; agtCfg != nil && agtCfg.Configs != nil && len(agtCfg.Configs.Configs) > 0 { for _, config := range agtCfg.Configs.Configs { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go index eb9606eac9..978f01b7f6 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go @@ -27,6 +27,7 @@ type ConfigReader struct { messagePipeline core.MessagePipeInterface config *config.Config mu sync.RWMutex + detailsMu sync.RWMutex } func NewConfigReader(config *config.Config) *ConfigReader { @@ -85,7 +86,10 @@ func (r *ConfigReader) Subscriptions() []string { } func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) { - if payloadAgentConfig != nil && payloadAgentConfig.Details != nil { + r.mu.Lock() + defer r.mu.Unlock() + + if payloadAgentConfig.Details != nil { onDiskAgentConfig, err := config.GetConfig(r.config.ClientID) if err != nil { log.Errorf("Failed to update Agent config - %v", err) @@ -96,21 +100,29 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags := false if payloadAgentConfig.Details.Features != nil { - + r.detailsMu.Lock() for index, feature := range payloadAgentConfig.Details.Features { payloadAgentConfig.Details.Features[index] = strings.Replace(feature, "features_", "", 1) } sort.Strings(onDiskAgentConfig.Features) sort.Strings(payloadAgentConfig.Details.Features) + r.detailsMu.Unlock() + + r.detailsMu.RLock() synchronizeFeatures = !reflect.DeepEqual(payloadAgentConfig.Details.Features, onDiskAgentConfig.Features) + r.detailsMu.RUnlock() } else { + r.detailsMu.Lock() payloadAgentConfig.Details.Features = onDiskAgentConfig.Features + r.detailsMu.Unlock() } if payloadAgentConfig.Details.Tags == nil { + r.detailsMu.Lock() payloadAgentConfig.Details.Tags = []string{} + r.detailsMu.Unlock() } sort.Strings(onDiskAgentConfig.Tags) @@ -118,7 +130,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags = !reflect.DeepEqual(payloadAgentConfig.Details.Tags, onDiskAgentConfig.Tags) if synchronizeFeatures || synchronizeTags { - configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, payloadAgentConfig.Details.Tags, payloadAgentConfig.Details.Features) + tags := payloadAgentConfig.Details.Tags + features := payloadAgentConfig.Details.Features + configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, tags, features) if err != nil { log.Errorf("Failed updating Agent config - %v", err) } @@ -142,25 +156,22 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) } r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig)) - } } func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) { if r.config != nil { + r.detailsMu.RLock() for _, feature := range r.config.Features { if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync { - r.mu.Lock() r.deRegisterPlugin(feature) - r.mu.Unlock() } } + r.detailsMu.RUnlock() } if agtCfg.Details != nil { - r.mu.Lock() r.messagePipeline.Process(core.NewMessage(core.EnableFeature, agtCfg.Details.Features)) - r.mu.Unlock() } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go index 6b6014a5ff..99f1ffac72 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go @@ -4,7 +4,6 @@ * This source code is licensed under the Apache License, Version 2.0 license found in the * LICENSE file in the root directory of this source tree. */ - package plugins import ( @@ -15,13 +14,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/payloads" + log "github.com/sirupsen/logrus" ) type DataPlaneStatus struct { @@ -42,6 +40,7 @@ type DataPlaneStatus struct { softwareDetails map[string]*proto.DataplaneSoftwareDetails nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus softwareDetailsMutex sync.RWMutex + structMu sync.RWMutex processes []*core.Process } @@ -67,7 +66,6 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core tags: &config.Tags, configDirs: config.ConfigDirs, reportInterval: config.Dataplane.Status.ReportInterval, - softwareDetailsMutex: sync.RWMutex{}, nginxConfigActivityStatuses: make(map[string]*proto.AgentActivityStatus), softwareDetails: make(map[string]*proto.DataplaneSoftwareDetails), processes: processes, @@ -84,11 +82,9 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) { func (dps *DataPlaneStatus) Close() { log.Info("DataPlaneStatus is wrapping up") dps.nginxConfigActivityStatuses = nil - dps.softwareDetailsMutex.Lock() dps.softwareDetails = nil dps.softwareDetailsMutex.Unlock() - dps.healthTicker.Stop() dps.sendStatus <- true } @@ -103,7 +99,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) // If the agent config on disk changed update DataPlaneStatus with relevant config info dps.syncAgentConfigChange() - case msg.Exact(core.DataplaneSoftwareDetailsUpdated): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -112,7 +107,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { dps.softwareDetails[data.GetPluginName()] = data.GetDataplaneSoftwareDetails() dps.softwareDetailsMutex.Unlock() } - case msg.Exact(core.NginxConfigValidationPending): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -121,7 +115,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } - case msg.Exact(core.NginxConfigApplyFailed) || msg.Exact(core.NginxConfigApplySucceeded): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -132,7 +125,9 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } case msg.Exact(core.NginxDetailProcUpdate): + dps.structMu.Lock() dps.processes = msg.Data().([]*core.Process) + dps.structMu.Unlock() } } @@ -193,15 +188,12 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses { agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus) } - dps.softwareDetailsMutex.Lock() defer dps.softwareDetailsMutex.Unlock() - dataplaneSoftwareDetails := []*proto.DataplaneSoftwareDetails{} for _, softwareDetail := range dps.softwareDetails { dataplaneSoftwareDetails = append(dataplaneSoftwareDetails, softwareDetail) } - dataplaneStatus := &proto.DataplaneStatus{ Host: dps.hostInfo(forceDetails), Details: dps.detailsForProcess(dps.processes, forceDetails), @@ -214,6 +206,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send + dps.structMu.Lock() + defer dps.structMu.Unlock() hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil @@ -227,7 +221,6 @@ func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bool) (details []*proto.NginxDetails) { log.Tracef("detailsForProcess processes: %v", processes) - nowUTC := time.Now().UTC() // this sets send if we are forcing details, or it has been 24 hours since the last send for _, p := range processes { @@ -246,7 +239,9 @@ func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bo return nil } + dps.structMu.Lock() dps.lastSendDetails = nowUTC + dps.structMu.Unlock() return details } @@ -255,7 +250,6 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails := make(map[string]*proto.NginxHealth) instanceProcessCount := make(map[string]int) log.Tracef("healthForProcess processes: %v", processes) - for _, p := range processes { instanceID := dps.binary.GetNginxIDForProcess(p) log.Tracef("Process: %v instanceID %s", p, instanceID) @@ -277,10 +271,8 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails[instanceID].NginxStatus = proto.NginxHealth_DEGRADED } } - for instanceID, health := range heathDetails { log.Tracef("instanceID: %s health: %s", instanceID, health) - if instanceProcessCount[instanceID] <= 1 { reason := "does not have enough children" if heathDetails[instanceID].NginxStatus == proto.NginxHealth_DEGRADED { @@ -301,20 +293,22 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() { return } log.Debugf("DataPlaneStatus is updating to a new config - %v", conf) - pollInt := conf.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { pollInt = defaultMinInterval log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, conf.Dataplane.Status.PollInterval) } - if conf.DisplayName == "" { conf.DisplayName = dps.env.GetHostname() log.Infof("setting displayName to %s", conf.DisplayName) } // Update DataPlaneStatus with relevant config info + dps.structMu.Lock() + dps.interval = pollInt dps.tags = &conf.Tags dps.configDirs = conf.ConfigDirs + + dps.structMu.Unlock() } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 55238474c4..97b0de01f1 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -71,7 +71,9 @@ func (m *Metrics) Init(pipeline core.MessagePipeInterface) { } func (m *Metrics) Close() { + m.collectorsMutex.Lock() m.collectors = nil + m.collectorsMutex.Unlock() log.Info("Metrics is wrapping up") } @@ -336,6 +338,8 @@ func createCollectorConfigsMap(config *config.Config, binary core.NginxBinary, p } func (m *Metrics) updateCollectorsConfig() { + m.collectorsMutex.Lock() + defer m.collectorsMutex.Unlock() log.Trace("Updating collector config") for _, collector := range m.collectors { if nginxCollector, ok := collector.(*collectors.NginxCollector); ok { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 249ff15ef2..91983146b2 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -106,7 +106,6 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } collection := metrics.SaveCollections(*r.metricsCollections[report.Type], report) r.metricsCollections[report.Type] = &collection - log.Debugf("MetricsThrottle: Metrics collection saved [Type: %d]", report.Type) } } r.mu.Unlock() diff --git a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go index dfb498adf5..73a64e226d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go @@ -133,9 +133,9 @@ func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) return args.Get(0).(*proto.NginxDetails) } -func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) bool { +func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) map[string]string { args := m.Called(existing, newLogs) - return args.Bool(0) + return args.Get(0).(map[string]string) } func (m *MockNginxBinary) GetAccessLogs() map[string]string { diff --git a/test/utils/nginx.go b/test/utils/nginx.go index dfb498adf5..73a64e226d 100644 --- a/test/utils/nginx.go +++ b/test/utils/nginx.go @@ -133,9 +133,9 @@ func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) return args.Get(0).(*proto.NginxDetails) } -func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) bool { +func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) map[string]string { args := m.Called(existing, newLogs) - return args.Bool(0) + return args.Get(0).(map[string]string) } func (m *MockNginxBinary) GetAccessLogs() map[string]string { diff --git a/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil }