diff --git a/actor/engine.go b/actor/engine.go index 4670a3a..8163f78 100644 --- a/actor/engine.go +++ b/actor/engine.go @@ -157,6 +157,7 @@ func (sr SendRepeater) start() { case <-ticker.C: sr.engine.SendWithSender(sr.target, sr.msg, sr.self) case <-sr.cancelch: + ticker.Stop() return } } diff --git a/actor/engine_test.go b/actor/engine_test.go index 0fc3d16..5d27a8c 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -135,10 +135,8 @@ func TestSendMsgRaceCon(t *testing.T) { for i := 0; i < 100; i++ { wg.Add(1) - go func() { - e.Send(pid, []byte("f")) - wg.Done() - }() + e.Send(pid, []byte("f")) + wg.Done() } wg.Wait() } @@ -231,8 +229,7 @@ func TestRequestResponse(t *testing.T) { // 56 ns/op func BenchmarkSendMessageLocal(b *testing.B) { e := NewEngine() - p := NewTestProducer(nil, func(_ *testing.T, _ *Context) {}) - pid := e.Spawn(p, "bench", WithInboxSize(1024*8)) + pid := e.SpawnFunc(func(_ *Context) {}, "bench", WithInboxSize(128)) b.ResetTimer() b.Run("send_message_local", func(b *testing.B) { diff --git a/actor/inbox.go b/actor/inbox.go index 43b0d25..19f581c 100644 --- a/actor/inbox.go +++ b/actor/inbox.go @@ -2,12 +2,36 @@ package actor import ( "runtime" + "sync/atomic" - "github.com/anthdm/hollywood/ggq" - "github.com/anthdm/hollywood/log" + "github.com/anthdm/hollywood/ringbuffer" ) -var LOCK_OS_THREAD = true +const defaultThroughput = 300 + +const ( + idle int32 = iota + running +) + +type Scheduler interface { + Schedule(fn func()) + Throughput() int +} + +type goscheduler int + +func (goscheduler) Schedule(fn func()) { + go fn() +} + +func (sched goscheduler) Throughput() int { + return int(sched) +} + +func NewScheduler(throughput int) Scheduler { + return goscheduler(throughput) +} type Inboxer interface { Send(Envelope) @@ -16,43 +40,56 @@ type Inboxer interface { } type Inbox struct { - ggq *ggq.GGQ[Envelope] - proc Processer + rb *ringbuffer.RingBuffer[Envelope] + proc Processer + scheduler Scheduler + procStatus int32 } func NewInbox(size int) *Inbox { - in := &Inbox{} - in.ggq = ggq.New[Envelope](uint32(size), in) - return in + return &Inbox{ + rb: ringbuffer.New[Envelope](int64(size)), + scheduler: NewScheduler(defaultThroughput), + } } -func (in *Inbox) Consume(msgs []Envelope) { - in.proc.Invoke(msgs) +func (in *Inbox) Send(msg Envelope) { + in.rb.Push(msg) + in.schedule() } -func (in *Inbox) Start(proc Processer) { - in.proc = proc - var lockOSThread bool - // prevent race condition here be reassigning before go routine. - if LOCK_OS_THREAD { - lockOSThread = true +func (in *Inbox) schedule() { + if atomic.CompareAndSwapInt32(&in.procStatus, idle, running) { + in.scheduler.Schedule(in.process) } - go func() { - if lockOSThread { - runtime.LockOSThread() +} + +func (in *Inbox) process() { + in.run() + atomic.StoreInt32(&in.procStatus, idle) +} + +func (in *Inbox) run() { + i, t := 0, in.scheduler.Throughput() + for { + if i > t { + i = 0 + runtime.Gosched() } - in.ggq.ReadN() - }() - log.Tracew("[INBOX] started", log.M{"pid": proc.PID()}) + i++ + + if msg, ok := in.rb.Pop(); ok { + in.proc.Invoke([]Envelope{msg}) + } else { + return + } + } } -func (in *Inbox) Stop() error { - in.ggq.Close() - log.Tracew("[INBOX] closed", log.M{"pid": in.proc.PID()}) - return nil +func (in *Inbox) Start(proc Processer) { + in.proc = proc } -func (in *Inbox) Send(msg Envelope) { - in.ggq.Awake() - in.ggq.Write(msg) +func (in *Inbox) Stop() error { + return nil } diff --git a/examples/helloworld/main.go b/examples/helloworld/main.go index 858cd68..36ab06c 100644 --- a/examples/helloworld/main.go +++ b/examples/helloworld/main.go @@ -2,7 +2,7 @@ package main import ( "fmt" - "time" + "sync" "github.com/anthdm/hollywood/actor" ) @@ -20,17 +20,21 @@ func newFoo() actor.Receiver { func (f *foo) Receive(ctx *actor.Context) { switch msg := ctx.Message().(type) { case actor.Started: - fmt.Println("foo started") + fmt.Println("actor started") + case actor.Stopped: + fmt.Println("actor stopped") case *message: - fmt.Println("foo has received", msg.data) + fmt.Println("actor has received", msg.data) } } func main() { engine := actor.NewEngine() - pid := engine.Spawn(newFoo, "foo") - for i := 0; i < 99; i++ { + pid := engine.Spawn(newFoo, "my_actor") + for i := 0; i < 100; i++ { engine.Send(pid, &message{data: "hello world!"}) } - time.Sleep(time.Second * 1) + wg := sync.WaitGroup{} + engine.Poison(pid, &wg) + wg.Wait() } diff --git a/examples/ttt/main.go b/examples/ttt/main.go new file mode 100644 index 0000000..9c9a1f4 --- /dev/null +++ b/examples/ttt/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "fmt" + "sync" + "time" + + "github.com/anthdm/hollywood/actor" +) + +func main() { + e := actor.NewEngine() + pid := e.SpawnFunc(func(c *actor.Context) { + switch msg := c.Message().(type) { + case actor.Started: + fmt.Println("started") + case actor.Stopped: + fmt.Println("stopped") + default: + _ = msg + } + }, "foobarbas") + + wg := sync.WaitGroup{} + e.Poison(pid, &wg) + wg.Wait() + time.Sleep(time.Second * 2) +} diff --git a/ggq/ggq.go b/ggq/ggq.go deleted file mode 100644 index 75e0148..0000000 --- a/ggq/ggq.go +++ /dev/null @@ -1,171 +0,0 @@ -package ggq - -import ( - "runtime" - "sync" - "sync/atomic" - "unsafe" - - "github.com/anthdm/hollywood/log" -) - -type Consumer[T any] interface { - Consume([]T) -} - -const cacheLinePadding = 64 - -const ( - slotEmpty = iota - slotBusy - slotCommitted -) - -const ( - stateRunning = iota - stateClosed -) - -type slot[T any] struct { - item T - atomic.Uint32 -} - -type GGQ[T any] struct { - _ [cacheLinePadding]byte - written atomic.Uint32 - _ [cacheLinePadding - unsafe.Sizeof(atomic.Uint32{})]byte - read atomic.Uint32 - _ [cacheLinePadding - unsafe.Sizeof(atomic.Uint32{})]byte - state atomic.Uint32 - _ [cacheLinePadding - unsafe.Sizeof(atomic.Uint32{})]byte - isIdling atomic.Bool - _ [cacheLinePadding - unsafe.Sizeof(atomic.Bool{})]byte - buffer []slot[T] - _ [cacheLinePadding]byte - mask uint32 - consumer Consumer[T] - itemBuffer []T - cond *sync.Cond -} - -func New[T any](size uint32, consumer Consumer[T]) *GGQ[T] { - if !isPOW2(size) { - log.Fatalw("the size of the queue need to be a number that is the power of 2", log.M{}) - } - return &GGQ[T]{ - buffer: make([]slot[T], size), - mask: size - 1, - consumer: consumer, - itemBuffer: make([]T, size+1), - cond: sync.NewCond(nil), - } -} - -func (q *GGQ[T]) Write(val T) { - slot := &q.buffer[q.written.Add(1)&q.mask] - for !slot.CompareAndSwap(slotEmpty, slotBusy) { - switch slot.Load() { - case slotBusy, slotCommitted: - runtime.Gosched() - case slotEmpty: - continue - } - } - slot.item = val - slot.Store(slotCommitted) -} - -func (q *GGQ[T]) ReadN() (T, bool) { - var lower, upper uint32 - current := q.read.Load() - for { - lower = current + 1 - upper = q.written.Load() - if lower <= upper { - q.Consume(lower, upper) - q.read.Store(upper) - current = upper - runtime.Gosched() - } else if upper := q.written.Load(); lower <= upper { - runtime.Gosched() - } else if !q.state.CompareAndSwap(stateClosed, stateRunning) { - var mu sync.Mutex - q.cond.L = &mu - q.isIdling.Store(true) - mu.Lock() - q.cond.Wait() - mu.Unlock() - q.isIdling.Store(false) - } else { - break - } - } - var t T - return t, true -} - -// Awake the queue if its in the idle state. -func (q *GGQ[T]) Awake() { - if q.isIdling.Load() { - q.cond.Signal() - } -} - -func (q *GGQ[T]) IsIdle() bool { - return q.isIdling.Load() -} - -func (q *GGQ[T]) Consume(lower, upper uint32) { - consumed := 0 - for ; lower <= upper; lower++ { - slot := &q.buffer[lower&q.mask] - for !slot.CompareAndSwap(slotCommitted, slotBusy) { - switch slot.Load() { - case slotBusy: - runtime.Gosched() - case slotCommitted: - continue - } - } - q.itemBuffer[consumed] = slot.item - slot.Store(slotEmpty) - consumed++ - } - q.consumer.Consume(q.itemBuffer[:consumed]) -} - -// ReadN gives way better performance, due to batching messages with -// lock os thread. -func (q *GGQ[T]) Read() (T, bool) { - slot := &q.buffer[q.read.Add(1)&q.mask] - for !slot.CompareAndSwap(slotCommitted, slotBusy) { - switch slot.Load() { - case slotBusy: - runtime.Gosched() - case slotEmpty: - if q.state.CompareAndSwap(stateClosed, stateRunning) { - var t T - return t, true - } - runtime.Gosched() - case slotCommitted: - continue - } - } - item := slot.item - slot.Store(slotEmpty) - return item, false -} - -func (q *GGQ[T]) Close() { - q.state.Store(stateClosed) - q.cond.Signal() -} - -func isPOW2(n uint32) bool { - if n <= 0 { - return false - } - return (n & (n - 1)) == 0 -} diff --git a/ggq/ggq_test.go b/ggq/ggq_test.go deleted file mode 100644 index 4e8f6b8..0000000 --- a/ggq/ggq_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package ggq - -import ( - "fmt" - "testing" -) - -type consumer[T any] struct{} - -func (c *consumer[T]) Consume(t []T) { - fmt.Println(t) -} - -func TestSingleMessageNotConsuming(t *testing.T) { - q := New[int](1024, &consumer[int]{}) - - q.cond.Signal() - for i := 0; i < 10; i++ { - q.Write(i) - } - q.Close() - - q.ReadN() -} - -// 35 ns/op 0 B/op 0 allocs/op -func BenchmarkReadN(b *testing.B) { - q := New[int](1024*12, &consumer[int]{}) - go func() { - for i := 0; i < b.N; i++ { - q.Write(i) - } - q.Close() - }() - - q.ReadN() -} - -func BenchmarkRead(b *testing.B) { - q := New[int](1024*12, &consumer[int]{}) - - go func() { - for i := 0; i < b.N; i++ { - q.Write(i) - } - q.Close() - }() - - for { - _, closed := q.Read() - if closed { - break - } - } -} diff --git a/remote/remote_test.go b/remote/remote_test.go index cbd42c5..68fbaeb 100644 --- a/remote/remote_test.go +++ b/remote/remote_test.go @@ -64,14 +64,6 @@ func TestWithSender(t *testing.T) { } func TestRequestResponse(t *testing.T) { - // NOTE: Its important when running to remote on the same binary - // which in the real world will never happen, we need to unlock the OS thread - // or we will have context deadlines exceeds on request responses. - // Hence, for testing this we need to set it to 0 - actor.LOCK_OS_THREAD = false - defer func() { - actor.LOCK_OS_THREAD = true - }() var ( a = makeRemoteEngine("127.0.0.1:4001") b = makeRemoteEngine("127.0.0.1:5001") diff --git a/ringbuffer/ringbuffer.go b/ringbuffer/ringbuffer.go new file mode 100644 index 0000000..0bd7522 --- /dev/null +++ b/ringbuffer/ringbuffer.go @@ -0,0 +1,93 @@ +package ringbuffer + +import ( + "sync" + "sync/atomic" +) + +type buffer[T any] struct { + items []T + head, tail, mod int64 +} + +type RingBuffer[T any] struct { + len int64 + content *buffer[T] + mu sync.Mutex +} + +func New[T any](size int64) *RingBuffer[T] { + return &RingBuffer[T]{ + content: &buffer[T]{ + items: make([]T, size), + head: 0, + tail: 0, + mod: size, + }, + len: 0, + } +} + +func (rb *RingBuffer[T]) Push(item T) { + rb.mu.Lock() + rb.content.tail = (rb.content.tail + 1) % rb.content.mod + if rb.content.tail == rb.content.head { + size := rb.content.mod * 2 + newBuff := make([]T, size) + for i := int64(0); i < rb.content.mod; i++ { + idx := (rb.content.tail + i) % rb.content.mod + newBuff[i] = rb.content.items[idx] + } + content := &buffer[T]{ + items: newBuff, + head: 0, + tail: rb.content.mod, + mod: size, + } + rb.content = content + } + atomic.AddInt64(&rb.len, 1) + rb.content.items[rb.content.tail] = item + rb.mu.Unlock() +} + +func (rb *RingBuffer[T]) Len() int64 { + return atomic.LoadInt64(&rb.len) +} + +func (rb *RingBuffer[T]) Pop() (T, bool) { + if rb.Len() == 0 { + var t T + return t, false + } + rb.mu.Lock() + rb.content.head = (rb.content.head + 1) % rb.content.mod + item := rb.content.items[rb.content.head] + var t T + rb.content.items[rb.content.head] = t + atomic.AddInt64(&rb.len, -1) + rb.mu.Unlock() + return item, true +} + +func (rb *RingBuffer[T]) PopN(n int64) ([]T, bool) { + rb.mu.Lock() + content := rb.content + + if n >= rb.len { + n = rb.len + } + atomic.AddInt64(&rb.len, -n) + + items := make([]T, n) + for i := int64(0); i < n; i++ { + pos := (content.head + 1 + i) % content.mod + items[i] = content.items[pos] + var t T + content.items[pos] = t + } + content.head = (content.head + n) % content.mod + + rb.mu.Unlock() + return items, true +} diff --git a/ringbuffer/ringbuffer_test.go b/ringbuffer/ringbuffer_test.go new file mode 100644 index 0000000..c352732 --- /dev/null +++ b/ringbuffer/ringbuffer_test.go @@ -0,0 +1,39 @@ +package ringbuffer + +import ( + "testing" +) + +type Item struct { + i int +} + +func TestPushPop(t *testing.T) { + rb := New[Item](1024) + for i := 0; i < 5000; i++ { + rb.Push(Item{i}) + item, ok := rb.Pop() + if ok { + if item.i != i { + t.Fatal("invalid item popped") + } + } + } +} + +func TestPushPopN(t *testing.T) { + rb := New[Item](1024) + n := 5000 + for i := 0; i < n; i++ { + rb.Push(Item{i}) + } + items, ok := rb.PopN(int64(n)) + if !ok { + t.Fatal("expected to pop many items") + } + for i := 0; i < n; i++ { + if items[i].i != i { + t.Fatal("invalid item popped") + } + } +}