Skip to content

Commit

Permalink
Race conditions fixes (#810)
Browse files Browse the repository at this point in the history
Added race condition fixes to the Agent v2 codebase
  • Loading branch information
oliveromahony authored Sep 11, 2024
1 parent 9b5a0a0 commit de63c35
Show file tree
Hide file tree
Showing 44 changed files with 295 additions and 228 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ jobs:
name: official-oss-integration-test-logs-${{ matrix.container.image }}-${{ matrix.container.version }}
path: /tmp/integration-test-logs/
retention-days: 3

official-plus-image-integration-tests:
name: Integration Tests - Official Plus Images
needs: build-unsigned-snapshot
Expand Down
2 changes: 1 addition & 1 deletion nginx-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ config_dirs: "/etc/nginx:/usr/local/etc/nginx:/usr/share/nginx/modules:/etc/nms"
# host: 127.0.0.1
#
# Set this value to a secure port number to prevent information leaks.
# port: 8038
# port: 8038
2 changes: 2 additions & 0 deletions sdk/client/metric_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func (r *metricReporter) createClient() error {
}

// Close takes r.mu and, while holding it, closes the underlying connection
// via closeConnection. Whatever error closeConnection reports is returned to
// the caller unchanged.
func (r *metricReporter) Close() (err error) {
	r.mu.Lock()
	// Release the mutex only after the connection teardown has finished.
	defer r.mu.Unlock()

	err = r.closeConnection()
	return err
}

Expand Down
5 changes: 5 additions & 0 deletions sdk/config_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/nginx/agent/sdk/v2/backoff"
Expand All @@ -43,6 +44,8 @@ const (
httpClientTimeout = 1 * time.Second
)

// readLock is a package-level mutex taken by GetNginxConfigWithIgnoreDirectives
// before it calls crossplane.Parse and released just before that function
// returns the assembled config.
// NOTE(review): in the visible part of GetNginxConfigWithIgnoreDirectives the
// "error assemble payload" return path exits without calling Unlock, which
// would leave this mutex held and block later callers — confirm and consider
// `defer readLock.Unlock()` right after Lock.
var readLock = sync.Mutex{}

// DirectoryMap wraps a map of *proto.Directory values indexed by string key.
type DirectoryMap struct {
// paths maps a string key to its directory entry.
// presumably the key is the directory path — TODO confirm against the code that populates it.
paths map[string]*proto.Directory
}
Expand Down Expand Up @@ -113,6 +116,7 @@ func GetNginxConfigWithIgnoreDirectives(
allowedDirectories map[string]struct{},
ignoreDirectives []string,
) (*proto.NginxConfig, error) {
readLock.Lock()
payload, err := crossplane.Parse(confFile,
&crossplane.ParseOptions{
IgnoreDirectives: ignoreDirectives,
Expand Down Expand Up @@ -142,6 +146,7 @@ func GetNginxConfigWithIgnoreDirectives(
if err != nil {
return nil, fmt.Errorf("error assemble payload from %s, error: %s", confFile, err)
}
readLock.Unlock()

return nginxConfig, nil
}
Expand Down
6 changes: 5 additions & 1 deletion src/core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
"time"

agent_config "github.com/nginx/agent/sdk/v2/agent/config"
Expand Down Expand Up @@ -50,6 +51,7 @@ const (
// Package-level configuration state.
var (
// Viper is the viper instance used by this package; it is created with
// agent_config.KeyDelimiter as its key delimiter.
Viper = viper.NewWithOptions(viper.KeyDelimiter(agent_config.KeyDelimiter))
// MigratedEnv starts out false.
// presumably set once environment variables have been migrated — TODO confirm where it is written.
MigratedEnv = false
// cfgMu is locked by UpdateAgentConfig for the whole of that call.
cfgMu = sync.Mutex{}
)

func SetVersion(version, commit string) {
Expand Down Expand Up @@ -196,7 +198,7 @@ func RegisterRunner(r func(cmd *cobra.Command, args []string)) {
}

func GetConfig(clientId string) (*Config, error) {
extensions := []string{}
var extensions []string

for _, extension := range Viper.GetStringSlice(agent_config.ExtensionsKey) {
if agent_config.IsKnownExtension(extension) {
Expand Down Expand Up @@ -247,6 +249,8 @@ func GetConfig(clientId string) (*Config, error) {
// overwritten or not.
func UpdateAgentConfig(systemId string, updateTags []string, updateFeatures []string) (bool, error) {
// Get current config on disk
cfgMu.Lock()
defer cfgMu.Unlock()
config, err := GetConfig(systemId)
if err != nil {
log.Errorf("Failed to register config: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions src/core/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestGetConfig(t *testing.T) {

assert.Equal(t, []string{}, config.Tags)
assert.Equal(t, Defaults.Features, config.Features)
assert.Equal(t, []string{}, config.Extensions)
assert.Equal(t, []string(nil), config.Extensions)
})

t.Run("test override defaults with flags", func(t *testing.T) {
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestGetConfig(t *testing.T) {
assert.Equal(t, Defaults.AgentMetrics.Mode, config.AgentMetrics.Mode)
assert.Equal(t, 10*time.Minute, config.AgentMetrics.Backoff.MaxInterval)
assert.Equal(t, Defaults.Features, config.Features)
assert.Equal(t, []string{}, config.Extensions)
assert.Equal(t, []string(nil), config.Extensions)
})

t.Run("test override config values with ENV variables", func(t *testing.T) {
Expand Down
8 changes: 6 additions & 2 deletions src/core/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"regexp"
"runtime"
"strings"
"sync"
"syscall"

"github.com/google/uuid"
Expand All @@ -43,7 +44,6 @@ import (
//go:generate mv fake_environment_fixed.go fake_environment_test.go
type Environment interface {
NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo
// NewHostInfoWithContext(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo
GetHostname() (hostname string)
GetSystemUUID() (hostId string)
ReadDirectory(dir string, ext string) ([]string, error)
Expand All @@ -70,6 +70,7 @@ type EnvironmentType struct {
host *proto.HostInfo
virtualizationFunc func(ctx context.Context) (string, string, error)
isContainerFunc func() bool
hostMu sync.Mutex
}

type Process struct {
Expand Down Expand Up @@ -106,6 +107,7 @@ const (
IsContainerKey = "isContainer"
GetContainerIDKey = "GetContainerID"
GetSystemUUIDKey = "GetSystemUUIDKey"
ReleaseInfoFile = "/etc/os-release"
)

var (
Expand All @@ -125,6 +127,8 @@ func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, con

func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo {
defer ctx.Done()
env.hostMu.Lock()
defer env.hostMu.Unlock()
// temp cache measure
if env.host == nil || clearCache {
hostInformation, err := host.InfoWithContext(ctx)
Expand Down Expand Up @@ -154,7 +158,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer
Partitons: disks,
Network: env.networks(),
Processor: env.processors(hostInformation.KernelArch),
Release: releaseInfo("/etc/os-release"),
Release: releaseInfo(ReleaseInfoFile),
Tags: *tags,
AgentAccessibleDirs: configDirs,
}
Expand Down
3 changes: 0 additions & 3 deletions src/core/metrics/sources/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/metrics"
log "github.com/sirupsen/logrus"
)

const MOUNT_POINT = "mount_point"
Expand Down Expand Up @@ -49,8 +48,6 @@ func (c *Disk) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrapper
"in_use": float64(usage.UsedPercentage),
})

log.Debugf("disk metrics collected: %v", len(simpleMetrics))

select {
case <-ctx.Done():
return
Expand Down
5 changes: 3 additions & 2 deletions src/core/metrics/sources/net_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"fmt"
"sync"

log "github.com/sirupsen/logrus"

"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
"github.com/nginx/agent/v2/src/core/metrics"
"github.com/shirou/gopsutil/v3/net"
log "github.com/sirupsen/logrus"
)

const NETWORK_INTERFACE = "network_interface"
Expand Down Expand Up @@ -82,7 +83,6 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap
}

simpleMetrics := nio.convertSamplesToSimpleMetrics(v)
log.Debugf("net IO stats count: %d", len(simpleMetrics))

select {
case <-ctx.Done():
Expand All @@ -109,6 +109,7 @@ func (nio *NetIO) Collect(ctx context.Context, m chan<- *metrics.StatsEntityWrap
simpleMetrics := nio.convertSamplesToSimpleMetrics(totalStats)
m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{}, simpleMetrics, proto.MetricsReport_SYSTEM)

log.Debugf("net IO stats: %v", currentNetIOStats)
nio.netIOStats = currentNetIOStats
}

Expand Down
5 changes: 2 additions & 3 deletions src/core/metrics/sources/nginx_access_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ func (c *NginxAccessLog) Stop() {
fn()
delete(c.logs, f)
}
log.Debugf("Stopping NginxAccessLog source for nginx id: %v", c.baseDimensions.NginxId)
}

func (c *NginxAccessLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) {
Expand Down Expand Up @@ -329,11 +328,11 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string
mu.Unlock()

case <-tick.C:
mu.Lock()

c.baseDimensions.NginxType = c.nginxType
c.baseDimensions.PublishedAPI = logFile

mu.Lock()

if len(requestLengths) > 0 {
httpCounters["request.length"] = getAverageMetricValue(requestLengths)
}
Expand Down
6 changes: 3 additions & 3 deletions src/core/metrics/sources/nginx_error_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func (c *NginxErrorLog) Stop() {

func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *metrics.NginxCollectorConfig) {
c.mu.Lock()
defer c.mu.Unlock()

c.baseDimensions = dimensions

Expand All @@ -129,6 +128,7 @@ func (c *NginxErrorLog) Update(dimensions *metrics.CommonDim, collectorConf *met
// add, remove or update existing log trailers
c.syncLogs()
}
c.mu.Unlock()
}

func (c *NginxErrorLog) recreateLogs() {
Expand Down Expand Up @@ -178,7 +178,7 @@ func (c *NginxErrorLog) stopTailer(logFile string, cancelFunction context.Cancel
delete(c.logFormats, logFile)
}

func (c *NginxErrorLog) collectLogStats(ctx context.Context, m chan<- *metrics.StatsEntityWrapper) {
func (c *NginxErrorLog) collectLogStats(_ context.Context, m chan<- *metrics.StatsEntityWrapper) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -227,10 +227,10 @@ func (c *NginxErrorLog) logStats(ctx context.Context, logFile string) {
mu.Unlock()

case <-tick.C:
mu.Lock()
c.baseDimensions.NginxType = c.nginxType
c.baseDimensions.PublishedAPI = logFile

mu.Lock()
simpleMetrics := c.convertSamplesToSimpleMetrics(counters)
log.Tracef("Error log metrics collected: %v", simpleMetrics)

Expand Down
Loading

0 comments on commit de63c35

Please sign in to comment.