From 66f75dc6807297a68fa578d5a01c299bb7e10cc3 Mon Sep 17 00:00:00 2001 From: spencerugbo <102359791+spencerugbo@users.noreply.github.com> Date: Tue, 10 Sep 2024 09:39:52 +0100 Subject: [PATCH 1/4] Add automatic changelog generation in release workflow (#784) --- .github/workflows/release-branch.yml | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/.github/workflows/release-branch.yml b/.github/workflows/release-branch.yml index 78fcb34ba6..d1b6e9aae1 100644 --- a/.github/workflows/release-branch.yml +++ b/.github/workflows/release-branch.yml @@ -10,11 +10,6 @@ on: required: true type: boolean default: false - createPullRequest: - description: 'Create pull request back into main' - required: true - type: boolean - default: false uploadJWT: description: 'Temporary JWT to publish packages to up-ap.nginx.com' required: true @@ -125,12 +120,27 @@ jobs: run: | sudo apt-get update sudo apt-get install -y gpgv1 monkeysphere - go install github.com/goreleaser/nfpm/v2/cmd/nfpm@${{ env.NFPM_VERSION }} - - name: Tag release + go install github.com/goreleaser/nfpm/v2/cmd/nfpm@v2.35.3 + - name: Generate Changelog + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: | + git clone https://github.com/nginx/agent-changelog.git + cd ./agent-changelog/source + pip install -r requirements.txt + python agent.py + - name: Push Changelog + run: | + mv agent-changelog/source/changelog.md ./site/content/ + git config --global user.name 'github-actions' git config --global user.email '41898282+github-actions[bot]@users.noreply.github.com' + git add ./site/content/changelog.md + git commit -m "Add generated changelog" + git push origin HEAD:${{ github.ref_name }} + - name: Tag release + run: | git tag -a "v${{env.VERSION}}" -m "CI Autogenerated" git tag -a "sdk/v${{env.VERSION}}" -m "CI Autogenerated" - name: Push Tags @@ -175,8 +185,6 @@ jobs: az logout if: always() - name: Upload Release Assets - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # clobber overwrites existing assets of the same name run: | gh release upload --clobber v${{env.VERSION}} \ @@ -202,7 +210,7 @@ jobs: })) console.log(`Release published: ${release.data.html_url}`) - name: Create Pull Request - if: ${{ inputs.publishPackages == true && inputs.createPullRequest == true }} + if: ${{ inputs.publishPackages == true }} uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1 with: script: | From 9403d2a9d77787909e6364b3acf35b596d29f6fe Mon Sep 17 00:00:00 2001 From: Christopher van de Sande Date: Tue, 10 Sep 2024 10:40:44 +0100 Subject: [PATCH 2/4] Add checks for expired package signing key --- Makefile.packaging | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/Makefile.packaging b/Makefile.packaging index 84976dd54d..ab5b7b7f46 100644 --- a/Makefile.packaging +++ b/Makefile.packaging @@ -182,11 +182,18 @@ package: gpg-key $(PACKAGES_DIR) $(GITHUB_PACKAGES_DIR) $(AZURE_PACKAGES_DIR) ## gpg-key: ## Generate GPG public key $$(gpg --import $(NFPM_SIGNING_KEY_FILE)); \ keyid=$$(gpg --list-keys NGINX | egrep -A1 "^pub" | egrep -v "^pub" | tr -d '[:space:]'); \ + if [ -z "$$keyid" ]; then echo "Error: GPG key not found."; exit 1; fi; \ + # Check if the key is expired \ + # Look for the 'e' (expired) flag in the 'pub' or 'uid' lines \ + if gpg --list-keys --with-colons "$$keyid" | grep -E '^pub:e:|^uid:e:'; then \ + echo "Error: GPG key has expired."; \ + exit 1; \ + fi; \ expiry=1y; \ $$(gpg --quick-set-expire $$keyid $$expiry '*'); \ - # we need to convert the private gpg key to rsa pem format for pkg signing \ + # Convert the private GPG key to RSA PEM format for pkg signing \ $$(gpg --export-secret-key $$keyid | openpgp2ssh $$keyid > .key.rsa); \ - $$(gpg --output $(GPG_PUBLIC_KEY) --armor --export) + $$(gpg --output $(GPG_PUBLIC_KEY) --armor --export $$keyid) release: ## Publish tarball to the UPLOAD_URL echo "Publishing nginx-agent packages to ${UPLOAD_URL}"; \ From de63c35385ea89f3206659a7bd8e7ec4e15dcf97 Mon Sep 17 00:00:00 2001 From: oliveromahony Date: Wed, 11 Sep 2024 10:36:28 +0100 Subject: [PATCH 3/4] Race conditions fixes (#810) Added race conditions fixes to Agent v2 codebase --- .github/workflows/ci.yml | 2 +- nginx-agent.conf | 2 +- sdk/client/metric_reporter.go | 2 + sdk/config_helpers.go | 5 ++ src/core/config/config.go | 6 +- src/core/config/config_test.go | 4 +- src/core/environment.go | 8 +- src/core/metrics/sources/disk.go | 3 - src/core/metrics/sources/net_io.go | 5 +- src/core/metrics/sources/nginx_access_log.go | 5 +- src/core/metrics/sources/nginx_error_log.go | 6 +- src/core/nginx.go | 84 +++++++++---------- src/core/pipe.go | 5 ++ src/plugins/commander.go | 2 - src/plugins/config_reader.go | 27 ++++-- src/plugins/dataplane_status.go | 30 +++---- src/plugins/metrics.go | 4 + src/plugins/metrics_throlling.go | 1 - .../agent/sdk/v2/client/metric_reporter.go | 2 + .../nginx/agent/sdk/v2/config_helpers.go | 5 ++ .../nginx/agent/v2/src/core/config/config.go | 6 +- .../nginx/agent/v2/src/core/environment.go | 8 +- .../nginx/agent/v2/src/core/nginx.go | 84 +++++++++---------- .../nginx/agent/v2/src/core/pipe.go | 5 ++ .../nginx/agent/v2/test/utils/nginx.go | 4 +- .../agent/sdk/v2/client/metric_reporter.go | 2 + .../nginx/agent/sdk/v2/config_helpers.go | 5 ++ .../nginx/agent/v2/src/core/config/config.go | 6 +- .../nginx/agent/v2/src/core/environment.go | 8 +- .../agent/v2/src/core/metrics/sources/disk.go | 3 - .../v2/src/core/metrics/sources/net_io.go | 5 +- .../core/metrics/sources/nginx_access_log.go | 5 +- .../core/metrics/sources/nginx_error_log.go | 6 +- .../nginx/agent/v2/src/core/nginx.go | 84 +++++++++---------- .../nginx/agent/v2/src/core/pipe.go | 5 ++ .../nginx/agent/v2/src/plugins/commander.go | 2 - .../agent/v2/src/plugins/config_reader.go | 27 ++++-- .../agent/v2/src/plugins/dataplane_status.go | 30 +++---- .../nginx/agent/v2/src/plugins/metrics.go | 4 + .../agent/v2/src/plugins/metrics_throlling.go | 1 - .../nginx/agent/v2/test/utils/nginx.go | 4 +- test/utils/nginx.go | 4 +- .../agent/sdk/v2/client/metric_reporter.go | 2 + .../nginx/agent/sdk/v2/config_helpers.go | 5 ++ 44 files changed, 295 insertions(+), 228 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9455dfe519..6252ac4a25 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -238,7 +238,7 @@ jobs: name: official-oss-integration-test-logs-${{ matrix.container.image }}-${{ matrix.container.version }} path: /tmp/integration-test-logs/ retention-days: 3 - + official-plus-image-integration-tests: name: Integration Tests - Official Plus Images needs: build-unsigned-snapshot diff --git a/nginx-agent.conf b/nginx-agent.conf index 5a714bb5c7..792665e3c8 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -46,4 +46,4 @@ config_dirs: "/etc/nginx:/usr/local/etc/nginx:/usr/share/nginx/modules:/etc/nms" # host: 127.0.0.1 # # Set this value to a secure port number to prevent information leaks. - # port: 8038 \ No newline at end of file + # port: 8038 diff --git a/sdk/client/metric_reporter.go b/sdk/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/sdk/client/metric_reporter.go +++ b/sdk/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/sdk/config_helpers.go b/sdk/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/sdk/config_helpers.go +++ b/sdk/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil } diff --git a/src/core/config/config.go b/src/core/config/config.go index e4aca09014..b8ed3aa10a 100644 --- a/src/core/config/config.go +++ b/src/core/config/config.go @@ -19,6 +19,7 @@ import ( "reflect" "sort" "strings" + "sync" "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" @@ -50,6 +51,7 @@ const ( var ( Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) MigratedEnv = false + cfgMu = sync.Mutex{} ) func SetVersion(version, commit string) { @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) { } func GetConfig(clientId string) (*Config, error) { - extensions := []string{} + var extensions []string for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) { if agent_config.IsKnownExtension(extension) { @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) { // overwritten or not. func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) { // Get current config on disk + cfgMu.Lock() + defer cfgMu.Unlock() config, err := GetConfig(systemId) if err != nil { log.Errorf("Failed to register config: %v", err) diff --git a/src/core/config/config_test.go b/src/core/config/config_test.go index 9d5767cfe8..7ffcea761e 100644 --- a/src/core/config/config_test.go +++ b/src/core/config/config_test.go @@ -163,7 +163,7 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, []string{}, config.Tags) assert.Equal(t, Defaults.Features, config.Features) - assert.Equal(t, []string{}, config.Extensions) + assert.Equal(t, []string(nil), config.Extensions) }) t.Run("test override defaults with flags", func(t *testing.T) { @@ -274,7 +274,7 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, Defaults.AgentMetrics.Mode, config.AgentMetrics.Mode) assert.Equal(t, 10*time.Minute, config.AgentMetrics.Backoff.MaxInterval) assert.Equal(t, Defaults.Features, config.Features) - assert.Equal(t, []string{}, config.Extensions) + assert.Equal(t, []string(nil), config.Extensions) }) t.Run("test override config values with ENV variables", func(t *testing.T) { diff --git a/src/core/environment.go b/src/core/environment.go index 5bb0355aa3..3e1b719760 100644 --- a/src/core/environment.go +++ b/src/core/environment.go @@ -20,6 +20,7 @@ import ( "regexp" "runtime" "strings" + "sync" "syscall" "github.com/google/uuid" @@ -43,7 +44,6 @@ import ( //go:generate mv fake_environment_fixed.go fake_environment_test.go type Environment interface { NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo - // NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo GetHostname() (hostname string) GetSystemUUID() (hostId string) ReadDirectory(dir string, ext string) ([]string, error) @@ -70,6 +70,7 @@ type EnvironmentType struct { host *proto.HostInfo virtualizationFunc func(ctx context.Context) (string, string, error) isContainerFunc func() bool + hostMu sync.Mutex } type Process struct { @@ -106,6 +107,7 @@ const ( IsContainerKey = "isContainer" GetContainerIDKey = "GetContainerID" GetSystemUUIDKey = "GetSystemUUIDKey" + ReleaseInfoFile = "/etc/os-release" ) var ( @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { defer ctx.Done() + env.hostMu.Lock() + defer env.hostMu.Unlock() // temp cache measure if env.host == nil || clearCache { hostInformation, err := host.InfoWithContext(ctx) @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer Partitons: disks, Network: env.networks(), Processor: env.processors(hostInformation.KernelArch), - Release: releaseInfo("/etc/os-release"), + Release: releaseInfo(ReleaseInfoFile), Tags: *tags, AgentAccessibleDirs: configDirs, } diff --git a/src/core/metrics/sources/disk.go b/src/core/metrics/sources/disk.go index 98a3847a2d..3bf7eafb5c 100644 --- a/src/core/metrics/sources/disk.go +++ b/src/core/metrics/sources/disk.go @@ -14,7 +14,6 @@ import ( "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" - log "github.com/sirupsen/logrus" ) const MOUNT_POINT = "mount_point" @@ -49,8 +48,6 @@ func (c *Disk) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrapper "in_use": float64(usage.UsedPercentage), }) - log.Debugf("disk metrics collected: %v", len(simpleMetrics)) - select { case <-ctx.Done(): return diff --git a/src/core/metrics/sources/net_io.go b/src/core/metrics/sources/net_io.go index e3ef91cd83..13fcc2f24b 100644 --- a/src/core/metrics/sources/net_io.go +++ b/src/core/metrics/sources/net_io.go @@ -12,11 +12,12 @@ import ( "fmt" "sync" + log "github.com/sirupsen/logrus" + "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" "github.com/shirou/gopsutil/v3/net" - log "github.com/sirupsen/logrus" ) const NETWORK_INTERFACE = "network_interface" @@ -82,7 +83,6 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap } simpleMetrics := nio.convertSamplesToSimpleMetrics(v) - log.Debugf("net IO stats count: %d", len(simpleMetrics)) select { case <-ctx.Done(): @@ -109,6 +109,7 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap simpleMetrics := nio.convertSamplesToSimpleMetrics(totalStats) m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{}, simpleMetrics, proto.MetricsReport_SYSTEM) + log.Debugf("net IO stats: %v", currentNetIOStats) nio.netIOStats = currentNetIOStats } diff --git a/src/core/metrics/sources/nginx_access_log.go b/src/core/metrics/sources/nginx_access_log.go index 46c50b5830..b52a758428 100644 --- a/src/core/metrics/sources/nginx_access_log.go +++ b/src/core/metrics/sources/nginx_access_log.go @@ -182,7 +182,6 @@ func (c *NginxAccessLog) Stop() { fn() delete(c.logs, f) } - log.Debugf("Stopping NginxAccessLog source for nginx id: %v", c.baseDimensions.NginxId) } func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { @@ -329,11 +328,11 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string mu.Unlock() case <-tick.C: + mu.Lock() + c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() - if len(requestLengths) > 0 { httpCounters["request.length"] = getAverageMetricValue(requestLengths) } diff --git a/src/core/metrics/sources/nginx_error_log.go b/src/core/metrics/sources/nginx_error_log.go index 06368eb93a..52eb63ca4d 100644 --- a/src/core/metrics/sources/nginx_error_log.go +++ b/src/core/metrics/sources/nginx_error_log.go @@ -116,7 +116,6 @@ func (c *NginxErrorLog) Stop() { func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.mu.Lock() - defer c.mu.Unlock() c.baseDimensions = dimensions @@ -129,6 +128,7 @@ func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *met // add, remove or update existing log trailers c.syncLogs() } + c.mu.Unlock() } func (c *NginxErrorLog) recreateLogs() { @@ -178,7 +178,7 @@ func (c *NginxErrorLog) stopTailer(logFile string, cancelFunction context.Cancel delete(c.logFormats, logFile) } -func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { +func (c *NginxErrorLog) collectLogStats(_ context.Context, m chan<- *metrics.StatsEntityWrapper) { c.mu.Lock() defer c.mu.Unlock() @@ -227,10 +227,10 @@ func (c *NginxErrorLog) logStats(ctx context.Context, logFile string) { mu.Unlock() case <-tick.C: + mu.Lock() c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() simpleMetrics := c.convertSamplesToSimpleMetrics(counters) log.Tracef("Error log metrics collected: %v", simpleMetrics) diff --git a/src/core/nginx.go b/src/core/nginx.go index 577abc6c05..334df03610 100644 --- a/src/core/nginx.go +++ b/src/core/nginx.go @@ -37,10 +37,8 @@ const ( ) var ( - logMutex sync.Mutex - unpackMutex sync.Mutex - re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) - plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) + re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) + plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) ) type NginxBinary interface { @@ -55,26 +53,28 @@ type NginxBinary interface { UpdateNginxDetailsFromProcesses(nginxProcesses []*Process) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApply, error) ReadConfig(path, nginxId, systemId string) (*proto.NginxConfig, error) - UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool + UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string GetAccessLogs() map[string]string GetErrorLogs() map[string]string GetChildProcesses() map[string][]*proto.NginxDetails } type NginxBinaryType struct { - detailsMapMutex sync.Mutex - workersMapMutex sync.Mutex - statusUrlMutex sync.RWMutex - env Environment - config *config.Config - nginxDetailsMap map[string]*proto.NginxDetails - nginxWorkersMap map[string][]*proto.NginxDetails - nginxInfoMap map[string]*nginxInfo - accessLogs map[string]string - errorLogs map[string]string - statusUrls map[string]string - accessLogsUpdated bool - errorLogsUpdated bool + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex + logMutex sync.Mutex + logWriteMutex sync.Mutex + unpackMutex sync.Mutex + mapMutex sync.Mutex + statusUrlMutex sync.RWMutex + env Environment + config *config.Config + nginxDetailsMap map[string]*proto.NginxDetails + nginxWorkersMap map[string][]*proto.NginxDetails + nginxInfoMap map[string]*nginxInfo + accessLogs map[string]string + errorLogs map[string]string + statusUrls map[string]string } type nginxInfo struct { @@ -408,8 +408,8 @@ func (n *NginxBinaryType) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApp return nil, fmt.Errorf("config directory %s not allowed", filepath.Dir(details.ConfPath)) } - unpackMutex.Lock() - defer unpackMutex.Unlock() + n.unpackMutex.Lock() + defer n.unpackMutex.Unlock() log.Info("Updating NGINX config") var configApply *sdk.ConfigApply @@ -661,29 +661,22 @@ func (n *NginxBinaryType) ReadConfig(confFile, nginxId, systemId string) (*proto return nil, err } - // get access logs list for analysis - accessLogs := AccessLogs(configPayload) - // get error logs list for analysis - errorLogs := ErrorLogs(configPayload) + n.logWriteMutex.Lock() + defer n.logWriteMutex.Unlock() + n.accessLogs = n.UpdateLogs(n.GetAccessLogs(), AccessLogs(configPayload)) - logMutex.Lock() - defer logMutex.Unlock() - - n.accessLogsUpdated = n.UpdateLogs(n.accessLogs, accessLogs) - n.errorLogsUpdated = n.UpdateLogs(n.errorLogs, errorLogs) + n.errorLogs = n.UpdateLogs(n.GetErrorLogs(), ErrorLogs(configPayload)) return configPayload, nil } func (n *NginxBinaryType) GetAccessLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() return n.accessLogs } func (n *NginxBinaryType) GetErrorLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() + n.logMutex.Lock() + defer n.logMutex.Unlock() return n.errorLogs } @@ -802,6 +795,9 @@ func parseConfigureArguments(line string) (result map[string]interface{}, flags } func (n *NginxBinaryType) getNginxInfoFrom(ngxExe string) *nginxInfo { + n.mapMutex.Lock() + defer n.mapMutex.Unlock() + if ngxExe == "" { return &nginxInfo{} } @@ -905,27 +901,29 @@ func (n *NginxBinaryType) parseModulePath(dir string) ([]string, error) { return result, nil } -func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool { - logUpdated := false +func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string { + log.Debug("UpdateLogs") + + copiedLogs := make(map[string]string, len(existingLogs)) + // Copy each key-value pair from the original map to the new map + for logFile, logFormat := range existingLogs { + copiedLogs[logFile] = logFormat + } for logFile, logFormat := range newLogs { if !(strings.HasPrefix(logFile, "syslog:") || n.SkipLog(logFile)) { - if _, found := existingLogs[logFile]; !found || existingLogs[logFile] != logFormat { - logUpdated = true - } - existingLogs[logFile] = logFormat + copiedLogs[logFile] = logFormat } } // delete old logs - for logFile := range existingLogs { + for logFile := range copiedLogs { if _, found := newLogs[logFile]; !found { - delete(existingLogs, logFile) - logUpdated = true + delete(copiedLogs, logFile) } } - return logUpdated + return copiedLogs } func parseNginxVersion(line string) (version, plusVersion string) { diff --git a/src/core/pipe.go b/src/core/pipe.go index 47311e79dc..481da7f36e 100644 --- a/src/core/pipe.go +++ b/src/core/pipe.go @@ -37,6 +37,7 @@ type MessagePipe struct { extensionPlugins []ExtensionPlugin ctx context.Context mu sync.RWMutex + regMu sync.Mutex bus message_bus.MessageBus } @@ -72,7 +73,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } @@ -82,7 +85,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.extensionPlugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } diff --git a/src/plugins/commander.go b/src/plugins/commander.go index be11792834..582796de8d 100644 --- a/src/plugins/commander.go +++ b/src/plugins/commander.go @@ -102,8 +102,6 @@ func (c *Commander) agentBackoff(agentConfig *proto.AgentConfig) { func (c *Commander) agentRegistered(cmd *proto.Command) { switch commandData := cmd.GetData().(type) { case *proto.Command_AgentConnectResponse: - log.Infof("config command %v", commandData) - if agtCfg := commandData.AgentConnectResponse.AgentConfig; agtCfg != nil && agtCfg.Configs != nil && len(agtCfg.Configs.Configs) > 0 { for _, config := range agtCfg.Configs.Configs { diff --git a/src/plugins/config_reader.go b/src/plugins/config_reader.go index eb9606eac9..978f01b7f6 100644 --- a/src/plugins/config_reader.go +++ b/src/plugins/config_reader.go @@ -27,6 +27,7 @@ type ConfigReader struct { messagePipeline core.MessagePipeInterface config *config.Config mu sync.RWMutex + detailsMu sync.RWMutex } func NewConfigReader(config *config.Config) *ConfigReader { @@ -85,7 +86,10 @@ func (r *ConfigReader) Subscriptions() []string { } func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) { - if payloadAgentConfig != nil && payloadAgentConfig.Details != nil { + r.mu.Lock() + defer r.mu.Unlock() + + if payloadAgentConfig.Details != nil { onDiskAgentConfig, err := config.GetConfig(r.config.ClientID) if err != nil { log.Errorf("Failed to update Agent config - %v", err) @@ -96,21 +100,29 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags := false if payloadAgentConfig.Details.Features != nil { - + r.detailsMu.Lock() for index, feature := range payloadAgentConfig.Details.Features { payloadAgentConfig.Details.Features[index] = strings.Replace(feature, "features_", "", 1) } sort.Strings(onDiskAgentConfig.Features) sort.Strings(payloadAgentConfig.Details.Features) + r.detailsMu.Unlock() + + r.detailsMu.RLock() synchronizeFeatures = !reflect.DeepEqual(payloadAgentConfig.Details.Features, onDiskAgentConfig.Features) + r.detailsMu.RUnlock() } else { + r.detailsMu.Lock() payloadAgentConfig.Details.Features = onDiskAgentConfig.Features + r.detailsMu.Unlock() } if payloadAgentConfig.Details.Tags == nil { + r.detailsMu.Lock() payloadAgentConfig.Details.Tags = []string{} + r.detailsMu.Unlock() } sort.Strings(onDiskAgentConfig.Tags) @@ -118,7 +130,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags = !reflect.DeepEqual(payloadAgentConfig.Details.Tags, onDiskAgentConfig.Tags) if synchronizeFeatures || synchronizeTags { - configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, payloadAgentConfig.Details.Tags, payloadAgentConfig.Details.Features) + tags := payloadAgentConfig.Details.Tags + features := payloadAgentConfig.Details.Features + configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, tags, features) if err != nil { log.Errorf("Failed updating Agent config - %v", err) } @@ -142,25 +156,22 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) } r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig)) - } } func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) { if r.config != nil { + r.detailsMu.RLock() for _, feature := range r.config.Features { if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync { - r.mu.Lock() r.deRegisterPlugin(feature) - r.mu.Unlock() } } + r.detailsMu.RUnlock() } if agtCfg.Details != nil { - r.mu.Lock() r.messagePipeline.Process(core.NewMessage(core.EnableFeature, agtCfg.Details.Features)) - r.mu.Unlock() } } diff --git a/src/plugins/dataplane_status.go b/src/plugins/dataplane_status.go index 6b6014a5ff..99f1ffac72 100644 --- a/src/plugins/dataplane_status.go +++ b/src/plugins/dataplane_status.go @@ -4,7 +4,6 @@ * This source code is licensed under the Apache License, Version 2.0 license found in the * LICENSE file in the root directory of this source tree. */ - package plugins import ( @@ -15,13 +14,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/payloads" + log "github.com/sirupsen/logrus" ) type DataPlaneStatus struct { @@ -42,6 +40,7 @@ type DataPlaneStatus struct { softwareDetails map[string]*proto.DataplaneSoftwareDetails nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus softwareDetailsMutex sync.RWMutex + structMu sync.RWMutex processes []*core.Process } @@ -67,7 +66,6 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core tags: &config.Tags, configDirs: config.ConfigDirs, reportInterval: config.Dataplane.Status.ReportInterval, - softwareDetailsMutex: sync.RWMutex{}, nginxConfigActivityStatuses: make(map[string]*proto.AgentActivityStatus), softwareDetails: make(map[string]*proto.DataplaneSoftwareDetails), processes: processes, @@ -84,11 +82,9 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) { func (dps *DataPlaneStatus) Close() { log.Info("DataPlaneStatus is wrapping up") dps.nginxConfigActivityStatuses = nil - dps.softwareDetailsMutex.Lock() dps.softwareDetails = nil dps.softwareDetailsMutex.Unlock() - dps.healthTicker.Stop() dps.sendStatus <- true } @@ -103,7 +99,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) // If the agent config on disk changed update DataPlaneStatus with relevant config info dps.syncAgentConfigChange() - case msg.Exact(core.DataplaneSoftwareDetailsUpdated): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -112,7 +107,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { dps.softwareDetails[data.GetPluginName()] = data.GetDataplaneSoftwareDetails() dps.softwareDetailsMutex.Unlock() } - case msg.Exact(core.NginxConfigValidationPending): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -121,7 +115,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } - case msg.Exact(core.NginxConfigApplyFailed) || msg.Exact(core.NginxConfigApplySucceeded): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -132,7 +125,9 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } case msg.Exact(core.NginxDetailProcUpdate): + dps.structMu.Lock() dps.processes = msg.Data().([]*core.Process) + dps.structMu.Unlock() } } @@ -193,15 +188,12 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses { agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus) } - dps.softwareDetailsMutex.Lock() defer dps.softwareDetailsMutex.Unlock() - dataplaneSoftwareDetails := []*proto.DataplaneSoftwareDetails{} for _, softwareDetail := range dps.softwareDetails { dataplaneSoftwareDetails = append(dataplaneSoftwareDetails, softwareDetail) } - dataplaneStatus := &proto.DataplaneStatus{ Host: dps.hostInfo(forceDetails), Details: dps.detailsForProcess(dps.processes, forceDetails), @@ -214,6 +206,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send + dps.structMu.Lock() + defer dps.structMu.Unlock() hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil @@ -227,7 +221,6 @@ func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bool) (details []*proto.NginxDetails) { log.Tracef("detailsForProcess processes: %v", processes) - nowUTC := time.Now().UTC() // this sets send if we are forcing details, or it has been 24 hours since the last send for _, p := range processes { @@ -246,7 +239,9 @@ func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bo return nil } + dps.structMu.Lock() dps.lastSendDetails = nowUTC + dps.structMu.Unlock() return details } @@ -255,7 +250,6 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails := make(map[string]*proto.NginxHealth) instanceProcessCount := make(map[string]int) log.Tracef("healthForProcess processes: %v", processes) - for _, p := range processes { instanceID := dps.binary.GetNginxIDForProcess(p) log.Tracef("Process: %v instanceID %s", p, instanceID) @@ -277,10 +271,8 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails[instanceID].NginxStatus = proto.NginxHealth_DEGRADED } } - for instanceID, health := range heathDetails { log.Tracef("instanceID: %s health: %s", instanceID, health) - if instanceProcessCount[instanceID] <= 1 { reason := "does not have enough children" if heathDetails[instanceID].NginxStatus == proto.NginxHealth_DEGRADED { @@ -301,20 +293,22 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() { return } log.Debugf("DataPlaneStatus is updating to a new config - %v", conf) - pollInt := conf.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { pollInt = defaultMinInterval log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, conf.Dataplane.Status.PollInterval) } - if conf.DisplayName == "" { conf.DisplayName = dps.env.GetHostname() log.Infof("setting displayName to %s", conf.DisplayName) } // Update DataPlaneStatus with relevant config info + dps.structMu.Lock() + dps.interval = pollInt dps.tags = &conf.Tags dps.configDirs = conf.ConfigDirs + + dps.structMu.Unlock() } diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 55238474c4..97b0de01f1 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -71,7 +71,9 @@ func (m *Metrics) Init(pipeline core.MessagePipeInterface) { } func (m *Metrics) Close() { + m.collectorsMutex.Lock() m.collectors = nil + m.collectorsMutex.Unlock() log.Info("Metrics is wrapping up") } @@ -336,6 +338,8 @@ func createCollectorConfigsMap(config *config.Config, binary core.NginxBinary, p } func (m *Metrics) updateCollectorsConfig() { + m.collectorsMutex.Lock() + defer m.collectorsMutex.Unlock() log.Trace("Updating collector config") for _, collector := range m.collectors { if nginxCollector, ok := collector.(*collectors.NginxCollector); ok { diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 249ff15ef2..91983146b2 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -106,7 +106,6 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } collection := metrics.SaveCollections(*r.metricsCollections[report.Type], report) r.metricsCollections[report.Type] = &collection - log.Debugf("MetricsThrottle: Metrics collection saved [Type: %d]", report.Type) } } r.mu.Unlock() diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go index e4aca09014..b8ed3aa10a 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -19,6 +19,7 @@ import ( "reflect" "sort" "strings" + "sync" "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" @@ -50,6 +51,7 @@ const ( var ( Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) MigratedEnv = false + cfgMu = sync.Mutex{} ) func SetVersion(version, commit string) { @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) { } func GetConfig(clientId string) (*Config, error) { - extensions := []string{} + var extensions []string for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) { if agent_config.IsKnownExtension(extension) { @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) { // overwritten or not. func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) { // Get current config on disk + cfgMu.Lock() + defer cfgMu.Unlock() config, err := GetConfig(systemId) if err != nil { log.Errorf("Failed to register config: %v", err) diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go index 5bb0355aa3..3e1b719760 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go @@ -20,6 +20,7 @@ import ( "regexp" "runtime" "strings" + "sync" "syscall" "github.com/google/uuid" @@ -43,7 +44,6 @@ import ( //go:generate mv fake_environment_fixed.go fake_environment_test.go type Environment interface { NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo - // NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo GetHostname() (hostname string) GetSystemUUID() (hostId string) ReadDirectory(dir string, ext string) ([]string, error) @@ -70,6 +70,7 @@ type EnvironmentType struct { host *proto.HostInfo virtualizationFunc func(ctx context.Context) (string, string, error) isContainerFunc func() bool + hostMu sync.Mutex } type Process struct { @@ -106,6 +107,7 @@ const ( IsContainerKey = "isContainer" GetContainerIDKey = "GetContainerID" GetSystemUUIDKey = "GetSystemUUIDKey" + ReleaseInfoFile = "/etc/os-release" ) var ( @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { defer ctx.Done() + env.hostMu.Lock() + defer env.hostMu.Unlock() // temp cache measure if env.host == nil || clearCache { hostInformation, err := host.InfoWithContext(ctx) @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer Partitons: disks, Network: env.networks(), Processor: env.processors(hostInformation.KernelArch), - Release: releaseInfo("/etc/os-release"), + Release: releaseInfo(ReleaseInfoFile), Tags: *tags, AgentAccessibleDirs: configDirs, } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 577abc6c05..334df03610 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -37,10 +37,8 @@ const ( ) var ( - logMutex sync.Mutex - unpackMutex sync.Mutex - re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) - plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) + re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) + plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) ) type NginxBinary interface { @@ -55,26 +53,28 @@ type NginxBinary interface { UpdateNginxDetailsFromProcesses(nginxProcesses []*Process) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApply, error) ReadConfig(path, nginxId, systemId string) (*proto.NginxConfig, error) - UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool + UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string GetAccessLogs() map[string]string GetErrorLogs() map[string]string GetChildProcesses() map[string][]*proto.NginxDetails } type NginxBinaryType struct { - detailsMapMutex sync.Mutex - workersMapMutex sync.Mutex - statusUrlMutex sync.RWMutex - env Environment - config *config.Config - nginxDetailsMap map[string]*proto.NginxDetails - nginxWorkersMap map[string][]*proto.NginxDetails - nginxInfoMap map[string]*nginxInfo - accessLogs map[string]string - errorLogs map[string]string - statusUrls map[string]string - accessLogsUpdated bool - errorLogsUpdated bool + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex + logMutex sync.Mutex + logWriteMutex sync.Mutex + unpackMutex sync.Mutex + mapMutex sync.Mutex + statusUrlMutex sync.RWMutex + env Environment + config *config.Config + nginxDetailsMap map[string]*proto.NginxDetails + nginxWorkersMap map[string][]*proto.NginxDetails + nginxInfoMap map[string]*nginxInfo + accessLogs map[string]string + errorLogs map[string]string + statusUrls map[string]string } type nginxInfo struct { @@ -408,8 +408,8 @@ func (n *NginxBinaryType) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApp return nil, fmt.Errorf("config directory %s not allowed", filepath.Dir(details.ConfPath)) } - unpackMutex.Lock() - defer unpackMutex.Unlock() + n.unpackMutex.Lock() + defer n.unpackMutex.Unlock() log.Info("Updating NGINX config") var configApply *sdk.ConfigApply @@ -661,29 +661,22 @@ func (n *NginxBinaryType) ReadConfig(confFile, nginxId, systemId string) (*proto return nil, err } - // get access logs list for analysis - accessLogs := AccessLogs(configPayload) - // get error logs list for analysis - errorLogs := ErrorLogs(configPayload) + n.logWriteMutex.Lock() + defer n.logWriteMutex.Unlock() + n.accessLogs = n.UpdateLogs(n.GetAccessLogs(), AccessLogs(configPayload)) - logMutex.Lock() - defer logMutex.Unlock() - - n.accessLogsUpdated = n.UpdateLogs(n.accessLogs, accessLogs) - n.errorLogsUpdated = n.UpdateLogs(n.errorLogs, errorLogs) + n.errorLogs = n.UpdateLogs(n.GetErrorLogs(), ErrorLogs(configPayload)) return configPayload, nil } func (n *NginxBinaryType) GetAccessLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() return n.accessLogs } func (n *NginxBinaryType) GetErrorLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() + n.logMutex.Lock() + defer n.logMutex.Unlock() return n.errorLogs } @@ -802,6 +795,9 @@ func parseConfigureArguments(line string) (result map[string]interface{}, flags } func (n *NginxBinaryType) getNginxInfoFrom(ngxExe string) *nginxInfo { + n.mapMutex.Lock() + defer n.mapMutex.Unlock() + if ngxExe == "" { return &nginxInfo{} } @@ -905,27 +901,29 @@ func (n *NginxBinaryType) parseModulePath(dir string) ([]string, error) { return result, nil } -func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool { - logUpdated := false +func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string { + log.Debug("UpdateLogs") + + copiedLogs := make(map[string]string, len(existingLogs)) + // Copy each key-value pair from the original map to the new map + for logFile, logFormat := range existingLogs { + copiedLogs[logFile] = logFormat + } for logFile, logFormat := range newLogs { if !(strings.HasPrefix(logFile, "syslog:") || n.SkipLog(logFile)) { - if _, found := existingLogs[logFile]; !found || existingLogs[logFile] != logFormat { - logUpdated = true - } - existingLogs[logFile] = logFormat + copiedLogs[logFile] = logFormat } } // delete old logs - for logFile := range existingLogs { + for logFile := range copiedLogs { if _, found := newLogs[logFile]; !found { - delete(existingLogs, logFile) - logUpdated = true + delete(copiedLogs, logFile) } } - return logUpdated + return copiedLogs } func parseNginxVersion(line string) (version, plusVersion string) { diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go index 47311e79dc..481da7f36e 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -37,6 +37,7 @@ type MessagePipe struct { extensionPlugins []ExtensionPlugin ctx context.Context mu sync.RWMutex + regMu sync.Mutex bus message_bus.MessageBus } @@ -72,7 +73,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } @@ -82,7 +85,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.extensionPlugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go index dfb498adf5..73a64e226d 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/nginx.go @@ -133,9 +133,9 @@ func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) return args.Get(0).(*proto.NginxDetails) } -func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) bool { +func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) map[string]string { args := m.Called(existing, newLogs) - return args.Bool(0) + return args.Get(0).(map[string]string) } func (m *MockNginxBinary) GetAccessLogs() map[string]string { diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go index e4aca09014..b8ed3aa10a 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -19,6 +19,7 @@ import ( "reflect" "sort" "strings" + "sync" "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" @@ -50,6 +51,7 @@ const ( var ( Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter)) MigratedEnv = false + cfgMu = sync.Mutex{} ) func SetVersion(version, commit string) { @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) { } func GetConfig(clientId string) (*Config, error) { - extensions := []string{} + var extensions []string for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) { if agent_config.IsKnownExtension(extension) { @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) { // overwritten or not. func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) { // Get current config on disk + cfgMu.Lock() + defer cfgMu.Unlock() config, err := GetConfig(systemId) if err != nil { log.Errorf("Failed to register config: %v", err) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go index 5bb0355aa3..3e1b719760 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go @@ -20,6 +20,7 @@ import ( "regexp" "runtime" "strings" + "sync" "syscall" "github.com/google/uuid" @@ -43,7 +44,6 @@ import ( //go:generate mv fake_environment_fixed.go fake_environment_test.go type Environment interface { NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo - // NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo GetHostname() (hostname string) GetSystemUUID() (hostId string) ReadDirectory(dir string, ext string) ([]string, error) @@ -70,6 +70,7 @@ type EnvironmentType struct { host *proto.HostInfo virtualizationFunc func(ctx context.Context) (string, string, error) isContainerFunc func() bool + hostMu sync.Mutex } type Process struct { @@ -106,6 +107,7 @@ const ( IsContainerKey = "isContainer" GetContainerIDKey = "GetContainerID" GetSystemUUIDKey = "GetSystemUUIDKey" + ReleaseInfoFile = "/etc/os-release" ) var ( @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { defer ctx.Done() + env.hostMu.Lock() + defer env.hostMu.Unlock() // temp cache measure if env.host == nil || clearCache { hostInformation, err := host.InfoWithContext(ctx) @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer Partitons: disks, Network: env.networks(), Processor: env.processors(hostInformation.KernelArch), - Release: releaseInfo("/etc/os-release"), + Release: releaseInfo(ReleaseInfoFile), Tags: *tags, AgentAccessibleDirs: configDirs, } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go index 98a3847a2d..3bf7eafb5c 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go @@ -14,7 +14,6 @@ import ( "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" - log "github.com/sirupsen/logrus" ) const MOUNT_POINT = "mount_point" @@ -49,8 +48,6 @@ func (c *Disk) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrapper "in_use": float64(usage.UsedPercentage), }) - log.Debugf("disk metrics collected: %v", len(simpleMetrics)) - select { case <-ctx.Done(): return diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go index e3ef91cd83..13fcc2f24b 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/net_io.go @@ -12,11 +12,12 @@ import ( "fmt" "sync" + log "github.com/sirupsen/logrus" + "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" "github.com/shirou/gopsutil/v3/net" - log "github.com/sirupsen/logrus" ) const NETWORK_INTERFACE = "network_interface" @@ -82,7 +83,6 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap } simpleMetrics := nio.convertSamplesToSimpleMetrics(v) - log.Debugf("net IO stats count: %d", len(simpleMetrics)) select { case <-ctx.Done(): @@ -109,6 +109,7 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap simpleMetrics := nio.convertSamplesToSimpleMetrics(totalStats) m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{}, simpleMetrics, proto.MetricsReport_SYSTEM) + log.Debugf("net IO stats: %v", currentNetIOStats) nio.netIOStats = currentNetIOStats } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go index 46c50b5830..b52a758428 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go @@ -182,7 +182,6 @@ func (c *NginxAccessLog) Stop() { fn() delete(c.logs, f) } - log.Debugf("Stopping NginxAccessLog source for nginx id: %v", c.baseDimensions.NginxId) } func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { @@ -329,11 +328,11 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string mu.Unlock() case <-tick.C: + mu.Lock() + c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() - if len(requestLengths) > 0 { httpCounters["request.length"] = getAverageMetricValue(requestLengths) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go index 06368eb93a..52eb63ca4d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_error_log.go @@ -116,7 +116,6 @@ func (c *NginxErrorLog) Stop() { func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) { c.mu.Lock() - defer c.mu.Unlock() c.baseDimensions = dimensions @@ -129,6 +128,7 @@ func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *met // add, remove or update existing log trailers c.syncLogs() } + c.mu.Unlock() } func (c *NginxErrorLog) recreateLogs() { @@ -178,7 +178,7 @@ func (c *NginxErrorLog) stopTailer(logFile string, cancelFunction context.Cancel delete(c.logFormats, logFile) } -func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) { +func (c *NginxErrorLog) collectLogStats(_ context.Context, m chan<- *metrics.StatsEntityWrapper) { c.mu.Lock() defer c.mu.Unlock() @@ -227,10 +227,10 @@ func (c *NginxErrorLog) logStats(ctx context.Context, logFile string) { mu.Unlock() case <-tick.C: + mu.Lock() c.baseDimensions.NginxType = c.nginxType c.baseDimensions.PublishedAPI = logFile - mu.Lock() simpleMetrics := c.convertSamplesToSimpleMetrics(counters) log.Tracef("Error log metrics collected: %v", simpleMetrics) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 577abc6c05..334df03610 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -37,10 +37,8 @@ const ( ) var ( - logMutex sync.Mutex - unpackMutex sync.Mutex - re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) - plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) + re = regexp.MustCompile(`(?P\S+)/(?P\S+)`) + plusre = regexp.MustCompile(`(?P\S+)/(?P\S+).\((?P\S+plus\S+)\)`) ) type NginxBinary interface { @@ -55,26 +53,28 @@ type NginxBinary interface { UpdateNginxDetailsFromProcesses(nginxProcesses []*Process) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApply, error) ReadConfig(path, nginxId, systemId string) (*proto.NginxConfig, error) - UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool + UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string GetAccessLogs() map[string]string GetErrorLogs() map[string]string GetChildProcesses() map[string][]*proto.NginxDetails } type NginxBinaryType struct { - detailsMapMutex sync.Mutex - workersMapMutex sync.Mutex - statusUrlMutex sync.RWMutex - env Environment - config *config.Config - nginxDetailsMap map[string]*proto.NginxDetails - nginxWorkersMap map[string][]*proto.NginxDetails - nginxInfoMap map[string]*nginxInfo - accessLogs map[string]string - errorLogs map[string]string - statusUrls map[string]string - accessLogsUpdated bool - errorLogsUpdated bool + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex + logMutex sync.Mutex + logWriteMutex sync.Mutex + unpackMutex sync.Mutex + mapMutex sync.Mutex + statusUrlMutex sync.RWMutex + env Environment + config *config.Config + nginxDetailsMap map[string]*proto.NginxDetails + nginxWorkersMap map[string][]*proto.NginxDetails + nginxInfoMap map[string]*nginxInfo + accessLogs map[string]string + errorLogs map[string]string + statusUrls map[string]string } type nginxInfo struct { @@ -408,8 +408,8 @@ func (n *NginxBinaryType) WriteConfig(config *proto.NginxConfig) (*sdk.ConfigApp return nil, fmt.Errorf("config directory %s not allowed", filepath.Dir(details.ConfPath)) } - unpackMutex.Lock() - defer unpackMutex.Unlock() + n.unpackMutex.Lock() + defer n.unpackMutex.Unlock() log.Info("Updating NGINX config") var configApply *sdk.ConfigApply @@ -661,29 +661,22 @@ func (n *NginxBinaryType) ReadConfig(confFile, nginxId, systemId string) (*proto return nil, err } - // get access logs list for analysis - accessLogs := AccessLogs(configPayload) - // get error logs list for analysis - errorLogs := ErrorLogs(configPayload) + n.logWriteMutex.Lock() + defer n.logWriteMutex.Unlock() + n.accessLogs = n.UpdateLogs(n.GetAccessLogs(), AccessLogs(configPayload)) - logMutex.Lock() - defer logMutex.Unlock() - - n.accessLogsUpdated = n.UpdateLogs(n.accessLogs, accessLogs) - n.errorLogsUpdated = n.UpdateLogs(n.errorLogs, errorLogs) + n.errorLogs = n.UpdateLogs(n.GetErrorLogs(), ErrorLogs(configPayload)) return configPayload, nil } func (n *NginxBinaryType) GetAccessLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() return n.accessLogs } func (n *NginxBinaryType) GetErrorLogs() map[string]string { - logMutex.Lock() - defer logMutex.Unlock() + n.logMutex.Lock() + defer n.logMutex.Unlock() return n.errorLogs } @@ -802,6 +795,9 @@ func parseConfigureArguments(line string) (result map[string]interface{}, flags } func (n *NginxBinaryType) getNginxInfoFrom(ngxExe string) *nginxInfo { + n.mapMutex.Lock() + defer n.mapMutex.Unlock() + if ngxExe == "" { return &nginxInfo{} } @@ -905,27 +901,29 @@ func (n *NginxBinaryType) parseModulePath(dir string) ([]string, error) { return result, nil } -func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) bool { - logUpdated := false +func (n *NginxBinaryType) UpdateLogs(existingLogs map[string]string, newLogs map[string]string) map[string]string { + log.Debug("UpdateLogs") + + copiedLogs := make(map[string]string, len(existingLogs)) + // Copy each key-value pair from the original map to the new map + for logFile, logFormat := range existingLogs { + copiedLogs[logFile] = logFormat + } for logFile, logFormat := range newLogs { if !(strings.HasPrefix(logFile, "syslog:") || n.SkipLog(logFile)) { - if _, found := existingLogs[logFile]; !found || existingLogs[logFile] != logFormat { - logUpdated = true - } - existingLogs[logFile] = logFormat + copiedLogs[logFile] = logFormat } } // delete old logs - for logFile := range existingLogs { + for logFile := range copiedLogs { if _, found := newLogs[logFile]; !found { - delete(existingLogs, logFile) - logUpdated = true + delete(copiedLogs, logFile) } } - return logUpdated + return copiedLogs } func parseNginxVersion(line string) (version, plusVersion string) { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go index 47311e79dc..481da7f36e 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -37,6 +37,7 @@ type MessagePipe struct { extensionPlugins []ExtensionPlugin ctx context.Context mu sync.RWMutex + regMu sync.Mutex bus message_bus.MessageBus } @@ -72,7 +73,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } @@ -82,7 +85,9 @@ func (p *MessagePipe) Register(size int, plugins []Plugin, extensionPlugins []Ex for _, plugin := range p.extensionPlugins { for _, subscription := range plugin.Subscriptions() { + p.regMu.Lock() err := p.bus.Subscribe(subscription, plugin.Process) + p.regMu.Unlock() if err != nil { return err } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go index be11792834..582796de8d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go @@ -102,8 +102,6 @@ func (c *Commander) agentBackoff(agentConfig *proto.AgentConfig) { func (c *Commander) agentRegistered(cmd *proto.Command) { switch commandData := cmd.GetData().(type) { case *proto.Command_AgentConnectResponse: - log.Infof("config command %v", commandData) - if agtCfg := commandData.AgentConnectResponse.AgentConfig; agtCfg != nil && agtCfg.Configs != nil && len(agtCfg.Configs.Configs) > 0 { for _, config := range agtCfg.Configs.Configs { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go index eb9606eac9..978f01b7f6 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/config_reader.go @@ -27,6 +27,7 @@ type ConfigReader struct { messagePipeline core.MessagePipeInterface config *config.Config mu sync.RWMutex + detailsMu sync.RWMutex } func NewConfigReader(config *config.Config) *ConfigReader { @@ -85,7 +86,10 @@ func (r *ConfigReader) Subscriptions() []string { } func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) { - if payloadAgentConfig != nil && payloadAgentConfig.Details != nil { + r.mu.Lock() + defer r.mu.Unlock() + + if payloadAgentConfig.Details != nil { onDiskAgentConfig, err := config.GetConfig(r.config.ClientID) if err != nil { log.Errorf("Failed to update Agent config - %v", err) @@ -96,21 +100,29 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags := false if payloadAgentConfig.Details.Features != nil { - + r.detailsMu.Lock() for index, feature := range payloadAgentConfig.Details.Features { payloadAgentConfig.Details.Features[index] = strings.Replace(feature, "features_", "", 1) } sort.Strings(onDiskAgentConfig.Features) sort.Strings(payloadAgentConfig.Details.Features) + r.detailsMu.Unlock() + + r.detailsMu.RLock() synchronizeFeatures = !reflect.DeepEqual(payloadAgentConfig.Details.Features, onDiskAgentConfig.Features) + r.detailsMu.RUnlock() } else { + r.detailsMu.Lock() payloadAgentConfig.Details.Features = onDiskAgentConfig.Features + r.detailsMu.Unlock() } if payloadAgentConfig.Details.Tags == nil { + r.detailsMu.Lock() payloadAgentConfig.Details.Tags = []string{} + r.detailsMu.Unlock() } sort.Strings(onDiskAgentConfig.Tags) @@ -118,7 +130,9 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) synchronizeTags = !reflect.DeepEqual(payloadAgentConfig.Details.Tags, onDiskAgentConfig.Tags) if synchronizeFeatures || synchronizeTags { - configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, payloadAgentConfig.Details.Tags, payloadAgentConfig.Details.Features) + tags := payloadAgentConfig.Details.Tags + features := payloadAgentConfig.Details.Features + configUpdated, err := config.UpdateAgentConfig(r.config.ClientID, tags, features) if err != nil { log.Errorf("Failed updating Agent config - %v", err) } @@ -142,25 +156,22 @@ func (r *ConfigReader) updateAgentConfig(payloadAgentConfig *proto.AgentConfig) } r.messagePipeline.Process(core.NewMessage(core.AgentConfigChanged, payloadAgentConfig)) - } } func (r *ConfigReader) synchronizeFeatures(agtCfg *proto.AgentConfig) { if r.config != nil { + r.detailsMu.RLock() for _, feature := range r.config.Features { if feature != agent_config.FeatureRegistration && feature != agent_config.FeatureNginxConfigAsync { - r.mu.Lock() r.deRegisterPlugin(feature) - r.mu.Unlock() } } + r.detailsMu.RUnlock() } if agtCfg.Details != nil { - r.mu.Lock() r.messagePipeline.Process(core.NewMessage(core.EnableFeature, agtCfg.Details.Features)) - r.mu.Unlock() } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go index 6b6014a5ff..99f1ffac72 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go @@ -4,7 +4,6 @@ * This source code is licensed under the Apache License, Version 2.0 license found in the * LICENSE file in the root directory of this source tree. */ - package plugins import ( @@ -15,13 +14,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" - log "github.com/sirupsen/logrus" - agent_config "github.com/nginx/agent/sdk/v2/agent/config" "github.com/nginx/agent/sdk/v2/proto" "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/config" "github.com/nginx/agent/v2/src/core/payloads" + log "github.com/sirupsen/logrus" ) type DataPlaneStatus struct { @@ -42,6 +40,7 @@ type DataPlaneStatus struct { softwareDetails map[string]*proto.DataplaneSoftwareDetails nginxConfigActivityStatuses map[string]*proto.AgentActivityStatus softwareDetailsMutex sync.RWMutex + structMu sync.RWMutex processes []*core.Process } @@ -67,7 +66,6 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core tags: &config.Tags, configDirs: config.ConfigDirs, reportInterval: config.Dataplane.Status.ReportInterval, - softwareDetailsMutex: sync.RWMutex{}, nginxConfigActivityStatuses: make(map[string]*proto.AgentActivityStatus), softwareDetails: make(map[string]*proto.DataplaneSoftwareDetails), processes: processes, @@ -84,11 +82,9 @@ func (dps *DataPlaneStatus) Init(pipeline core.MessagePipeInterface) { func (dps *DataPlaneStatus) Close() { log.Info("DataPlaneStatus is wrapping up") dps.nginxConfigActivityStatuses = nil - dps.softwareDetailsMutex.Lock() dps.softwareDetails = nil dps.softwareDetailsMutex.Unlock() - dps.healthTicker.Stop() dps.sendStatus <- true } @@ -103,7 +99,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) // If the agent config on disk changed update DataPlaneStatus with relevant config info dps.syncAgentConfigChange() - case msg.Exact(core.DataplaneSoftwareDetailsUpdated): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -112,7 +107,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { dps.softwareDetails[data.GetPluginName()] = data.GetDataplaneSoftwareDetails() dps.softwareDetailsMutex.Unlock() } - case msg.Exact(core.NginxConfigValidationPending): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -121,7 +115,6 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { default: log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } - case msg.Exact(core.NginxConfigApplyFailed) || msg.Exact(core.NginxConfigApplySucceeded): log.Tracef("DataplaneStatus: %T message from topic %s received", msg.Data(), msg.Topic()) switch data := msg.Data().(type) { @@ -132,7 +125,9 @@ func (dps *DataPlaneStatus) Process(msg *core.Message) { log.Errorf("Expected the type %T but got %T", &proto.AgentActivityStatus{}, data) } case msg.Exact(core.NginxDetailProcUpdate): + dps.structMu.Lock() dps.processes = msg.Data().([]*core.Process) + dps.structMu.Unlock() } } @@ -193,15 +188,12 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS for _, nginxConfigActivityStatus := range dps.nginxConfigActivityStatuses { agentActivityStatuses = append(agentActivityStatuses, nginxConfigActivityStatus) } - dps.softwareDetailsMutex.Lock() defer dps.softwareDetailsMutex.Unlock() - dataplaneSoftwareDetails := []*proto.DataplaneSoftwareDetails{} for _, softwareDetail := range dps.softwareDetails { dataplaneSoftwareDetails = append(dataplaneSoftwareDetails, softwareDetail) } - dataplaneStatus := &proto.DataplaneStatus{ Host: dps.hostInfo(forceDetails), Details: dps.detailsForProcess(dps.processes, forceDetails), @@ -214,6 +206,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send + dps.structMu.Lock() + defer dps.structMu.Unlock() hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil @@ -227,7 +221,6 @@ func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bool) (details []*proto.NginxDetails) { log.Tracef("detailsForProcess processes: %v", processes) - nowUTC := time.Now().UTC() // this sets send if we are forcing details, or it has been 24 hours since the last send for _, p := range processes { @@ -246,7 +239,9 @@ func (dps *DataPlaneStatus) detailsForProcess(processes []*core.Process, send bo return nil } + dps.structMu.Lock() dps.lastSendDetails = nowUTC + dps.structMu.Unlock() return details } @@ -255,7 +250,6 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails := make(map[string]*proto.NginxHealth) instanceProcessCount := make(map[string]int) log.Tracef("healthForProcess processes: %v", processes) - for _, p := range processes { instanceID := dps.binary.GetNginxIDForProcess(p) log.Tracef("Process: %v instanceID %s", p, instanceID) @@ -277,10 +271,8 @@ func (dps *DataPlaneStatus) healthForProcess(processes []*core.Process) (healths heathDetails[instanceID].NginxStatus = proto.NginxHealth_DEGRADED } } - for instanceID, health := range heathDetails { log.Tracef("instanceID: %s health: %s", instanceID, health) - if instanceProcessCount[instanceID] <= 1 { reason := "does not have enough children" if heathDetails[instanceID].NginxStatus == proto.NginxHealth_DEGRADED { @@ -301,20 +293,22 @@ func (dps *DataPlaneStatus) syncAgentConfigChange() { return } log.Debugf("DataPlaneStatus is updating to a new config - %v", conf) - pollInt := conf.Dataplane.Status.PollInterval if pollInt < defaultMinInterval { pollInt = defaultMinInterval log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, conf.Dataplane.Status.PollInterval) } - if conf.DisplayName == "" { conf.DisplayName = dps.env.GetHostname() log.Infof("setting displayName to %s", conf.DisplayName) } // Update DataPlaneStatus with relevant config info + dps.structMu.Lock() + dps.interval = pollInt dps.tags = &conf.Tags dps.configDirs = conf.ConfigDirs + + dps.structMu.Unlock() } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 55238474c4..97b0de01f1 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -71,7 +71,9 @@ func (m *Metrics) Init(pipeline core.MessagePipeInterface) { } func (m *Metrics) Close() { + m.collectorsMutex.Lock() m.collectors = nil + m.collectorsMutex.Unlock() log.Info("Metrics is wrapping up") } @@ -336,6 +338,8 @@ func createCollectorConfigsMap(config *config.Config, binary core.NginxBinary, p } func (m *Metrics) updateCollectorsConfig() { + m.collectorsMutex.Lock() + defer m.collectorsMutex.Unlock() log.Trace("Updating collector config") for _, collector := range m.collectors { if nginxCollector, ok := collector.(*collectors.NginxCollector); ok { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 249ff15ef2..91983146b2 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -106,7 +106,6 @@ func (r *MetricsThrottle) Process(msg *core.Message) { } collection := metrics.SaveCollections(*r.metricsCollections[report.Type], report) r.metricsCollections[report.Type] = &collection - log.Debugf("MetricsThrottle: Metrics collection saved [Type: %d]", report.Type) } } r.mu.Unlock() diff --git a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go index dfb498adf5..73a64e226d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/nginx.go @@ -133,9 +133,9 @@ func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) return args.Get(0).(*proto.NginxDetails) } -func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) bool { +func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) map[string]string { args := m.Called(existing, newLogs) - return args.Bool(0) + return args.Get(0).(map[string]string) } func (m *MockNginxBinary) GetAccessLogs() map[string]string { diff --git a/test/utils/nginx.go b/test/utils/nginx.go index dfb498adf5..73a64e226d 100644 --- a/test/utils/nginx.go +++ b/test/utils/nginx.go @@ -133,9 +133,9 @@ func (m *MockNginxBinary) GetNginxDetailsFromProcess(nginxProcess *core.Process) return args.Get(0).(*proto.NginxDetails) } -func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) bool { +func (m *MockNginxBinary) UpdateLogs(existing map[string]string, newLogs map[string]string) map[string]string { args := m.Called(existing, newLogs) - return args.Bool(0) + return args.Get(0).(map[string]string) } func (m *MockNginxBinary) GetAccessLogs() map[string]string { diff --git a/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 0bedeedd8a..6eaa73ca61 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error { } func (r *metricReporter) Close() (err error) { + r.mu.Lock() + defer r.mu.Unlock() return r.closeConnection() } diff --git a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 0b6f32e8d8..ed2595be87 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/nginx/agent/sdk/v2/backoff" @@ -43,6 +44,8 @@ const ( httpClientTimeout = 1 * time.Second ) +var readLock = sync.Mutex{} + type DirectoryMap struct { paths map[string]*proto.Directory } @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives( allowedDirectories map[string]struct{}, ignoreDirectives []string, ) (*proto.NginxConfig, error) { + readLock.Lock() payload, err := crossplane.Parse(confFile, &crossplane.ParseOptions{ IgnoreDirectives: ignoreDirectives, @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives( if err != nil { return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err) } + readLock.Unlock() return nginxConfig, nil } From acb0ca47a9a15574825bb794ad6c1d69d4ac19ad Mon Sep 17 00:00:00 2001 From: Luca Comellini Date: Wed, 11 Sep 2024 07:33:15 -0700 Subject: [PATCH 4/4] Add CLA bot workflow (#828) --- .github/workflows/f5-cla.yml | 51 ++++++++++++++++++++++++++++++++++++ CONTRIBUTING.md | 9 +++++++ 2 files changed, 60 insertions(+) create mode 100644 .github/workflows/f5-cla.yml diff --git a/.github/workflows/f5-cla.yml b/.github/workflows/f5-cla.yml new file mode 100644 index 0000000000..de0dbc8a55 --- /dev/null +++ b/.github/workflows/f5-cla.yml @@ -0,0 +1,51 @@ +name: F5 CLA + +on: + issue_comment: + types: + - created + pull_request_target: + types: + - opened + - synchronize + - reopened + +concurrency: + group: ${{ github.ref_name }}-cla + +permissions: + contents: read + +jobs: + f5-cla: + name: F5 CLA + runs-on: ubuntu-22.04 + permissions: + actions: write + contents: read + pull-requests: write + statuses: write + steps: + - name: Run F5 Contributor License Agreement (CLA) assistant + if: (github.event.comment.body == 'recheck' || github.event.comment.body == 'I have hereby read the F5 CLA and agree to its terms') || github.event_name == 'pull_request_target' + uses: contributor-assistant/github-action@f41946747f85d28e9a738f4f38dbcc74b69c7e0e # v2.5.1 + with: + # Any pull request targeting the following branch will trigger a CLA check. + branch: "main" + # Path to the CLA document. + path-to-document: "https://github.com/f5/.github/blob/main/CLA/cla-markdown.md" + # Custom CLA messages. + custom-notsigned-prcomment: "🎉 Thank you for your contribution! It appears you have not yet signed the F5 Contributor License Agreement (CLA), which is required for your changes to be incorporated into an F5 Open Source Software (OSS) project. Please kindly read the [F5 CLA](https://github.com/f5/.github/blob/main/CLA/cla-markdown.md) and reply on a new comment with the following text to agree:" + custom-pr-sign-comment: "I have hereby read the F5 CLA and agree to its terms" + custom-allsigned-prcomment: "✅ All required contributors have signed the F5 CLA for this PR. Thank you!" + # Remote repository storing CLA signatures. + remote-organization-name: "f5" + remote-repository-name: "f5-cla-data" + path-to-signatures: "signatures/beta/signatures.json" + # Comma separated list of usernames for maintainers or any other individuals who should not be prompted for a CLA. + allowlist: bot* + # Do not lock PRs after a merge. + lock-pullrequest-aftermerge: false + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + PERSONAL_ACCESS_TOKEN: ${{ secrets.F5_CLA_TOKEN }} diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d8ec3cb01e..bc6bf43094 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -35,6 +35,15 @@ To suggest a feature or enhancement, please create an issue on GitHub with the l Note: if you'd like to implement a new feature, please consider creating a feature request issue first to start a discussion about the feature. +### F5 Contributor License Agreement (CLA) + +F5 requires all external contributors to agree to the terms of the F5 CLA (available [here](https://github.com/f5/.github/blob/main/CLA/cla-markdown.md)) +before any of their changes can be incorporated into an F5 Open Source repository. + +If you have not yet agreed to the F5 CLA terms and submit a PR to this repository, a bot will prompt you to view and +agree to the F5 CLA. You will have to agree to the F5 CLA terms through a comment in the PR before any of your changes +can be merged. Your agreement signature will be safely stored by F5 and no longer be required in future PRs. + ## Code Guidelines