Cache RAM capacity (/proc/meminfo) readings and CAS when the cache is expired (#63)

* feat: use cap() instead of len() to compute the spooling threshold

* feat: if the buffer is above s.maxInMemorySize, don't return it to the pool; just discard it (see the sketch after this list)

* fix: consider both the len and the cap of the buffer when deciding to spool

* feat: add a CAS cache that stores a cached reading of meminfo to avoid hammering the file with reads

* fix: TestThresholdCrossing, as buffer growth happens in powers of 2 and the buffer cap() check on write was making the test fail

* feat: also discard the buffer pointer if its cap exceeded the memory limit when closing
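
A minimal sketch of the cap()-based checks described above, not the actual spooled.go code: the type, field, and helper names (memBuffer, shouldSpool, release) are assumptions standing in for the buffer handling in spooledTempFile's Write and Close paths.

package sketch

import (
	"bytes"
	"sync"
)

// memBuffer is a hypothetical, simplified stand-in for the buffer-related
// fields of spooledTempFile.
type memBuffer struct {
	buf             *bytes.Buffer
	maxInMemorySize int
}

// pool mirrors the package's spooledPool of reusable buffers.
var pool = sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}

// shouldSpool reports whether an upcoming write of n bytes should go to disk:
// both the bytes already buffered (Len) and the memory actually reserved by
// the buffer (Cap) are checked against the in-memory limit.
func (m *memBuffer) shouldSpool(n int) bool {
	return m.buf.Len()+n > m.maxInMemorySize || m.buf.Cap() > m.maxInMemorySize
}

// release returns the buffer to the pool unless its capacity grew past the
// in-memory limit, in which case the pointer is dropped so the pool never
// retains oversized buffers.
func (m *memBuffer) release() {
	if m.buf.Cap() > m.maxInMemorySize {
		m.buf = nil
		return
	}
	m.buf.Reset()
	pool.Put(m.buf)
}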
equals215 authored Jan 20, 2025
1 parent 64c28f2 commit 97a7c50
Showing 1 changed file with 96 additions and 40 deletions.
136 changes: 96 additions & 40 deletions pkg/spooledtempfile/spooled.go
@@ -10,6 +10,8 @@ import (
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// MaxInMemorySize is the max number of bytes (currently 1MB)
@@ -20,6 +22,18 @@ const MaxInMemorySize = 1024 * 1024
// we'll force spooling to disk. For example, 0.5 = 50%.
const DefaultMaxRAMUsageFraction = 0.50

// memoryCheckInterval defines how often we check system memory usage.
const memoryCheckInterval = 500 * time.Millisecond

// memoryUsageData is a cached reading of system memory usage.
type memoryUsageData struct {
	lastChecked  time.Time
	lastFraction float64
}

// memoryUsageCache is an atomic pointer to the current memoryUsageData.
var memoryUsageCache atomic.Pointer[memoryUsageData]

var spooledPool = sync.Pool{
	New: func() interface{} {
		return bytes.NewBuffer(nil)
@@ -266,52 +280,94 @@ func (s *spooledTempFile) isSystemMemoryUsageHigh() bool {
// how much memory is used vs total. Returns fraction = used / total
// This is a Linux-specific implementation.
// This function is defined as a variable so it can be overridden in tests.
// Now includes lock-free CAS caching to avoid hammering /proc/meminfo on every call.
var getSystemMemoryUsedFraction = func() (float64, error) {
	for {
		// Atomically load the current cached reading.
		oldPtr := memoryUsageCache.Load()
		if oldPtr != nil {
			data := *oldPtr
			// If it's still fresh, just return it.
			if time.Since(data.lastChecked) < memoryCheckInterval {
				return data.lastFraction, nil
			}
		}

		// Data is nil or stale -> we attempt to refresh.
		// But first, double-check whether someone else already updated it
		// between the time we loaded oldPtr and now.
		againPtr := memoryUsageCache.Load()
		if againPtr != oldPtr {
			// Another goroutine already updated it => just loop again
			// so we can use the fresh data. (No need to read /proc/meminfo.)
			continue
		}

		// We're the winner and need to refresh the data.
		f, err := os.Open("/proc/meminfo")
		if err != nil {
			// If we cannot open /proc/meminfo, return an error
			// or fallback if you prefer.
			return 0, fmt.Errorf("failed to open /proc/meminfo: %v", err)
		}
		defer f.Close()

		// We look for MemTotal, MemAvailable (or MemFree if MemAvailable is missing).
		var memTotal, memAvailable, memFree, buffers, cached uint64
		scanner := bufio.NewScanner(f)
		for scanner.Scan() {
			line := scanner.Text()
			fields := strings.Fields(line)
			if len(fields) < 2 {
				continue
			}
			key := strings.TrimRight(fields[0], ":")
			value, _ := strconv.ParseUint(fields[1], 10, 64)
			// value is typically in kB
			switch key {
			case "MemTotal":
				memTotal = value
			case "MemAvailable":
				memAvailable = value
			case "MemFree":
				memFree = value
			case "Buffers":
				buffers = value
			case "Cached":
				cached = value
			}
		}
		if err := scanner.Err(); err != nil {
			return 0, fmt.Errorf("scanner error reading /proc/meminfo: %v", err)
		}

		if memTotal == 0 {
			return 0, fmt.Errorf("could not find MemTotal in /proc/meminfo")
		}

		var used uint64
		if memAvailable > 0 {
			// Linux 3.14+ exposes MemAvailable, which is the better measure.
			used = memTotal - memAvailable
		} else {
			// Otherwise, approximate "available" as free + buffers + cached.
			approxAvailable := memFree + buffers + cached
			used = memTotal - approxAvailable
		}

		fraction := float64(used) / float64(memTotal)

		newData := &memoryUsageData{
			lastChecked:  time.Now(),
			lastFraction: fraction,
		}

		// CAS to store the new data (only if oldPtr is still the current value).
		swapped := memoryUsageCache.CompareAndSwap(oldPtr, newData)
		if swapped {
			// We successfully updated => return the fresh fraction.
			return fraction, nil
		}
		// If the swap fails, another goroutine beat us to it,
		// so we just loop around, load their data, and return that.
	}
}
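
A brief usage sketch, not part of the commit, of how the cached reader behaves for a caller in the same package; demoCachedMemoryReading is a hypothetical name, and fmt and time are already imported by spooled.go.

// demoCachedMemoryReading is a hypothetical in-package caller showing the
// effect of the 500ms cache; it is not part of spooled.go.
func demoCachedMemoryReading() {
	// First call reads /proc/meminfo and CAS-publishes a memoryUsageData snapshot.
	first, err := getSystemMemoryUsedFraction()
	if err != nil {
		fmt.Println("meminfo read failed:", err)
		return
	}

	// A second call within memoryCheckInterval is served from the atomic
	// cache and never touches /proc/meminfo.
	second, _ := getSystemMemoryUsedFraction()
	fmt.Println(first == second) // true while the cached reading is fresh

	// Once the interval has elapsed, the next caller re-reads the file and
	// CAS-publishes a fresh snapshot; any goroutine that loses the CAS race
	// simply loops and returns the winner's value.
	time.Sleep(memoryCheckInterval)
	if _, err := getSystemMemoryUsedFraction(); err != nil {
		fmt.Println("refresh failed:", err)
	}
}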
