Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix // Fix race condition in Pop & PopN operation of ring buffer #177

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions ringbuffer/ringbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func (rb *RingBuffer[T]) Len() int64 {
}

func (rb *RingBuffer[T]) Pop() (T, bool) {
if rb.Len() == 0 {
rb.mu.Lock()
if rb.len == 0 {
Copy link
Contributor

@tprifti tprifti Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We check the ring buffer length twice here: the first time using rb.Len(), and the second time using rb.len.

// Pop removes a single item from the ring buffer and returns it.
//
// The mutex is taken before the emptiness check so that the check and the
// removal happen as one step. When the buffer is empty, Pop returns the zero
// value of T and false. Otherwise it advances head by one slot (wrapping at
// mod), returns the item stored there together with true, and resets that
// slot to the zero value of T.
func (rb *RingBuffer[T]) Pop() (T, bool) {
	var zero T

	rb.mu.Lock()
	if rb.Len() == 0 {
		rb.mu.Unlock()
		return zero, false
	}

	// The slot right after the current head holds the item to hand out.
	next := (rb.content.head + 1) % rb.content.mod
	rb.content.head = next
	popped := rb.content.items[next]
	// Overwrite the vacated slot so the buffer no longer holds the value.
	rb.content.items[next] = zero
	// NOTE(review): the length is updated atomically even though the mutex is
	// held, presumably because Len() reads it without locking — confirm
	// against Len's implementation.
	atomic.AddInt64(&rb.len, -1)
	rb.mu.Unlock()

	return popped, true
}

Copy link
Author

@mapogolions mapogolions Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's called the double-checked locking pattern. There are usually two options we can use:

  1. Lock immediately:
rb.mu.Lock()
if rb.len == 0 {
	rb.mu.Unlock()
	var t T
	return t, false
}
  2. Use double-checked locking:
if rb.Len() == 0 { // As far as I understand, we read atomically to prevent tearing read of int64.
	var t T
	return t, false
}
rb.mu.Lock()
if rb.len == 0 {
	rb.mu.Unlock()
	var t T
	return t, false
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I would prefer the first option, since it's a bit more readable and we do not need to check the rb length twice.

Good job detecting the issue!

rb.mu.Unlock()
var t T
return t, false
}
rb.mu.Lock()
rb.content.head = (rb.content.head + 1) % rb.content.mod
item := rb.content.items[rb.content.head]
var t T
Expand All @@ -71,10 +72,11 @@ func (rb *RingBuffer[T]) Pop() (T, bool) {
}

func (rb *RingBuffer[T]) PopN(n int64) ([]T, bool) {
if rb.Len() == 0 {
rb.mu.Lock()
if rb.len == 0 {
Copy link
Contributor

@tprifti tprifti Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing here:

// PopN removes up to n items from the ring buffer and returns them in slot
// order, starting with the slot that follows head.
//
// It returns (nil, false) when n is negative or when the buffer is empty.
// If n is at least the current length, every stored item is returned. A
// request for zero items on a non-empty buffer yields an empty slice and true.
// Each vacated slot is reset to the zero value of T.
func (rb *RingBuffer[T]) PopN(n int64) ([]T, bool) {
	// A negative count would slip past the clamp below, grow rb.len through
	// the atomic add, and then make the allocation panic while the mutex is
	// held. Reject it before touching any state.
	if n < 0 {
		return nil, false
	}

	// Lock before the emptiness check so the check and the removal are one
	// step; defer guarantees the mutex is released on every exit path.
	rb.mu.Lock()
	defer rb.mu.Unlock()

	if rb.Len() == 0 {
		return nil, false
	}

	// NOTE(review): the head update below goes through this local, so it only
	// sticks if rb.content is a pointer — confirm against the struct definition.
	content := rb.content
	if n >= rb.len {
		n = rb.len
	}
	// NOTE(review): the length is updated atomically even though the mutex is
	// held, presumably because Len() reads it without locking — confirm.
	atomic.AddInt64(&rb.len, -n)

	items := make([]T, n)
	var zero T
	for i := int64(0); i < n; i++ {
		pos := (content.head + 1 + i) % content.mod
		items[i] = content.items[pos]
		// Overwrite the vacated slot so the buffer no longer holds the value.
		content.items[pos] = zero
	}
	content.head = (content.head + n) % content.mod

	return items, true
}

rb.mu.Unlock()
return nil, false
}
rb.mu.Lock()
content := rb.content

if n >= rb.len {
Expand Down
56 changes: 56 additions & 0 deletions ringbuffer/ringbuffer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ringbuffer

import (
"sync"
"sync/atomic"
"testing"
)

Expand Down Expand Up @@ -37,3 +39,57 @@ func TestPushPopN(t *testing.T) {
}
}
}

// TestPopThreadSafety checks that Pop and PopN behave correctly when two
// goroutines race to remove the only item in a buffer.
//
// Each subtest repeatedly builds a fresh buffer with New[int](4), pushes one
// item, and lets two goroutines pop concurrently:
//   - the Pop subtest fails if the reported length ends up at -1, meaning the
//     single item was removed twice;
//   - the PopN subtest fails if more than one PopN(1) call reports success.
func TestPopThreadSafety(t *testing.T) {
	// Number of fresh-buffer attempts per subtest. A high count raises the
	// likelihood of reproducing the race condition.
	const attempts = 100_000
	// Number of goroutines competing for the single pushed item.
	const poppers = 2

	t.Run("Pop should be thread-safe", func(t *testing.T) {
		for attempt := 0; attempt < attempts; attempt++ {
			rb := New[int](4)
			rb.Push(1)

			var wg sync.WaitGroup
			wg.Add(poppers)
			for g := 0; g < poppers; g++ {
				go func() {
					defer wg.Done()
					rb.Pop()
				}()
			}
			wg.Wait()

			if rb.Len() == -1 {
				t.Fatal("item popped twice")
			}
		}
	})

	t.Run("PopN should be thread-safe", func(t *testing.T) {
		for attempt := 0; attempt < attempts; attempt++ {
			rb := New[int](4)
			rb.Push(1)

			// Counts how many of the concurrent PopN calls reported success.
			var successes atomic.Int32
			var wg sync.WaitGroup
			wg.Add(poppers)
			for g := 0; g < poppers; g++ {
				go func() {
					defer wg.Done()
					if _, ok := rb.PopN(1); ok {
						successes.Add(1)
					}
				}()
			}
			wg.Wait()

			if successes.Load() > 1 {
				t.Fatal("false positive item removal")
			}
		}
	})
}