Skip to content

Commit

Permalink
feat(watcher): add goroutine to check for disruptions
Browse files Browse the repository at this point in the history
It will be triggered based on a ticker, defined by a config
called DisruptionWatcherIntervalSeconds. It will get the
number of occupied rooms and call the runtime func to handle
disruption mitigation. Also, added a config to runtimeWatcher.
  • Loading branch information
hspedro committed Mar 13, 2024
1 parent 111e7cf commit cfe4ec6
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 21 deletions.
13 changes: 11 additions & 2 deletions cmd/runtimewatcher/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/topfreegames/maestro/internal/core/services/events"
"github.com/topfreegames/maestro/internal/core/services/workers"
"github.com/topfreegames/maestro/internal/core/worker"
workerconfigs "github.com/topfreegames/maestro/internal/core/worker/config"
"github.com/topfreegames/maestro/internal/core/worker/runtimewatcher"
"github.com/topfreegames/maestro/internal/service"
)
Expand All @@ -42,16 +43,24 @@ func provideRuntimeWatcherBuilder() *worker.WorkerBuilder {
}
}

func provideRuntimeWatcherConfig(c config.Config) *workerconfigs.RuntimeWatcherConfig {
return &workerconfigs.RuntimeWatcherConfig{
DisruptionWorkerIntervalSeconds: c.GetDuration("runtimeWatcher.disruptionWorker.intervalSeconds"),
DisruptionSafetyPercentage: c.GetFloat64("runtimeWatcher.disruptionWorker.safetyPercentage"),
}
}

var WorkerOptionsSet = wire.NewSet(
service.NewRuntimeKubernetes,
service.NewRoomStorageRedis,
RoomManagerSet,
wire.Struct(new(worker.WorkerOptions), "RoomManager", "Runtime"))
provideRuntimeWatcherConfig,
wire.Struct(new(worker.WorkerOptions), "Runtime", "RoomStorage", "RoomManager", "RuntimeWatcherConfig"))

var RoomManagerSet = wire.NewSet(
service.NewSchedulerStoragePg,
service.NewClockTime,
service.NewPortAllocatorRandom,
service.NewRoomStorageRedis,
service.NewGameRoomInstanceStorageRedis,
service.NewSchedulerCacheRedis,
service.NewRoomManagerConfig,
Expand Down
28 changes: 20 additions & 8 deletions cmd/runtimewatcher/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ workers:
reporter:
metrics:
intervalMillis: 10000
runtimeWatcher:
disruptionWatcher:
intervalSeconds: 5

services:
roomManager:
Expand Down
30 changes: 30 additions & 0 deletions internal/core/worker/config/runtime_watcher_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// MIT License
//
// Copyright (c) 2021 TFG Co
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package config

import "time"

type RuntimeWatcherConfig struct {
DisruptionWorkerIntervalSeconds time.Duration
DisruptionSafetyPercentage float64
}
62 changes: 62 additions & 0 deletions internal/core/worker/runtimewatcher/runtime_watcher_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/topfreegames/maestro/internal/core/logs"
"github.com/topfreegames/maestro/internal/core/worker/config"

"github.com/topfreegames/maestro/internal/core/entities"
"github.com/topfreegames/maestro/internal/core/entities/game_room"
Expand All @@ -46,22 +48,26 @@ const WorkerName = "runtime_watcher"
type runtimeWatcherWorker struct {
scheduler *entities.Scheduler
roomManager ports.RoomManager
roomStorage ports.RoomStorage
// TODO(gabrielcorado): should we access the port directly? do we need to
// provide the same `Watcher` interface but on the RoomManager?
runtime ports.Runtime
logger *zap.Logger
ctx context.Context
cancelFunc context.CancelFunc
config *config.RuntimeWatcherConfig
workerWaitGroup *sync.WaitGroup
}

func NewRuntimeWatcherWorker(scheduler *entities.Scheduler, opts *worker.WorkerOptions) worker.Worker {
return &runtimeWatcherWorker{
scheduler: scheduler,
roomManager: opts.RoomManager,
roomStorage: opts.RoomStorage,
runtime: opts.Runtime,
logger: zap.L().With(zap.String(logs.LogFieldServiceName, WorkerName), zap.String(logs.LogFieldSchedulerName, scheduler.Name)),
workerWaitGroup: &sync.WaitGroup{},
config: opts.RuntimeWatcherConfig,
}
}

Expand Down Expand Up @@ -94,10 +100,66 @@ func (w *runtimeWatcherWorker) spawnUpdateRoomWatchers(resultChan chan game_room
}
}

func (w *runtimeWatcherWorker) mitigateDisruptions() error {
occupiedRoomsAmount, err := w.roomStorage.GetRoomCountByStatus(w.ctx, w.scheduler.Name, game_room.GameStatusOccupied)
if err != nil {
w.logger.Error(
"failed to get occupied rooms for scheduler",
zap.String("scheduler", w.scheduler.Name),
zap.Error(err),
)
return err
}
err = w.runtime.MitigateDisruption(w.ctx, w.scheduler, occupiedRoomsAmount, w.config.DisruptionSafetyPercentage)
if err != nil {
w.logger.Error(
"failed to mitigate disruption",
zap.String("scheduler", w.scheduler.Name),
zap.Error(err),
)
return err
}
w.logger.Debug(
"mitigated disruption for occupied rooms",
zap.String("scheduler", w.scheduler.Name),
zap.Int("occupiedRooms", occupiedRoomsAmount),
)

return nil
}

func (w *runtimeWatcherWorker) spawnDisruptionWatcher() {
w.workerWaitGroup.Add(1)

go func() {
defer w.workerWaitGroup.Done()
ticker := time.NewTicker(time.Second * w.config.DisruptionWorkerIntervalSeconds)
defer ticker.Stop()

for {
select {
case <-ticker.C:
err := w.mitigateDisruptions()
if err != nil {
w.logger.Warn(
"Mitigate Disruption watcher run failed",
zap.String("scheduler", w.scheduler.Name),
zap.Error(err),
)
}
case <-w.ctx.Done():
return
}
}

}()
}

func (w *runtimeWatcherWorker) spawnWatchers(
resultChan chan game_room.InstanceEvent,
) {
w.spawnUpdateRoomWatchers(resultChan)
w.spawnDisruptionWatcher()
}

func (w *runtimeWatcherWorker) Start(ctx context.Context) error {
Expand Down
33 changes: 22 additions & 11 deletions internal/core/worker/runtimewatcher/runtime_watcher_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,26 @@ import (
"github.com/topfreegames/maestro/internal/core/entities"
"github.com/topfreegames/maestro/internal/core/entities/game_room"
porterrors "github.com/topfreegames/maestro/internal/core/ports/errors"
"github.com/topfreegames/maestro/internal/core/ports/mock"
mockports "github.com/topfreegames/maestro/internal/core/ports/mock"
"github.com/topfreegames/maestro/internal/core/worker"
"github.com/topfreegames/maestro/internal/core/worker/config"
)

func workerOptions(t *testing.T) (*gomock.Controller, *mockports.MockRuntime, *mockports.MockRoomManager, *worker.WorkerOptions) {
func workerOptions(t *testing.T) (*gomock.Controller, *mockports.MockRuntime, *mockports.MockRoomManager, *mockports.MockRoomStorage, *worker.WorkerOptions) {
mockCtrl := gomock.NewController(t)

runtime := mockports.NewMockRuntime(mockCtrl)
roomManager := mockports.NewMockRoomManager(mockCtrl)
roomStorage := mock.NewMockRoomStorage(mockCtrl)

return mockCtrl, runtime, roomManager, &worker.WorkerOptions{
return mockCtrl, runtime, roomManager, roomStorage, &worker.WorkerOptions{
Runtime: runtime,
RoomManager: roomManager,
RoomStorage: roomStorage,
RuntimeWatcherConfig: &config.RuntimeWatcherConfig{
DisruptionWorkerIntervalSeconds: 1,
},
}
}

Expand All @@ -61,7 +68,7 @@ func TestRuntimeWatcher_Start(t *testing.T) {

for _, event := range events {
t.Run(fmt.Sprintf("when %s happens, updates instance", event.String()), func(t *testing.T) {
mockCtrl, runtime, roomManager, workerOptions := workerOptions(t)
mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t)

scheduler := &entities.Scheduler{Name: "test"}
watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
Expand Down Expand Up @@ -102,7 +109,7 @@ func TestRuntimeWatcher_Start(t *testing.T) {
})

t.Run(fmt.Sprintf("when %s happens, and update instance fails, does nothing", event.String()), func(t *testing.T) {
mockCtrl, runtime, roomManager, workerOptions := workerOptions(t)
mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t)

scheduler := &entities.Scheduler{Name: "test"}
watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
Expand Down Expand Up @@ -144,7 +151,7 @@ func TestRuntimeWatcher_Start(t *testing.T) {
}

t.Run("fails to start watcher", func(t *testing.T) {
_, runtime, _, workerOptions := workerOptions(t)
_, runtime, _, _, workerOptions := workerOptions(t)

scheduler := &entities.Scheduler{Name: "test"}
watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
Expand All @@ -167,7 +174,7 @@ func TestRuntimeWatcher_Start(t *testing.T) {
})

t.Run("clean room state on delete event", func(t *testing.T) {
mockCtrl, runtime, roomManager, workerOptions := workerOptions(t)
mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t)

scheduler := &entities.Scheduler{Name: "test"}
watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
Expand Down Expand Up @@ -206,7 +213,7 @@ func TestRuntimeWatcher_Start(t *testing.T) {
})

t.Run("when clean room state fails, does nothing", func(t *testing.T) {
mockCtrl, runtime, roomManager, workerOptions := workerOptions(t)
mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t)

scheduler := &entities.Scheduler{Name: "test"}
watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
Expand Down Expand Up @@ -245,25 +252,29 @@ func TestRuntimeWatcher_Start(t *testing.T) {
})

t.Run("when resultChan is closed, worker stops without error", func(t *testing.T) {
mockCtrl, runtime, _, workerOptions := workerOptions(t)
mockCtrl, runtime, _, roomStorage, workerOptions := workerOptions(t)

scheduler := &entities.Scheduler{Name: "test"}
watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)

runtimeWatcher := mockports.NewMockRuntimeWatcher(mockCtrl)
runtime.EXPECT().WatchGameRoomInstances(gomock.Any(), scheduler).Return(runtimeWatcher, nil)

resultChan := make(chan game_room.InstanceEvent)
runtimeWatcher.EXPECT().ResultChan().Return(resultChan)
runtimeWatcher.EXPECT().Stop()

runtime.EXPECT().MitigateDisruption(gomock.Any(), gomock.Any(), 0, 0.0).Return(nil).MinTimes(0)
roomStorage.EXPECT().GetRoomCountByStatus(gomock.Any(), gomock.Any(), gomock.Any()).Return(0, nil).MinTimes(0)

ctx, cancelFunc := context.WithCancel(context.Background())

go func() {
time.Sleep(time.Millisecond * 100)
close(resultChan)
cancelFunc()
}()

err := watcher.Start(context.Background())
err := watcher.Start(ctx)
require.NoError(t, err)
})

}
1 change: 1 addition & 0 deletions internal/core/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type WorkerOptions struct {
RoomStorage ports.RoomStorage
InstanceStorage ports.GameRoomInstanceStorage
MetricsReporterConfig *config.MetricsReporterConfig
RuntimeWatcherConfig *config.RuntimeWatcherConfig
}

// Configuration holds all worker configuration parameters.
Expand Down

0 comments on commit cfe4ec6

Please sign in to comment.