From 276965d54575322d729bef2e3072761875629b31 Mon Sep 17 00:00:00 2001
From: Pedro Soares
Date: Tue, 5 Mar 2024 12:54:25 -0300
Subject: [PATCH] feat(watcher): add goroutine to check for disruptions

The new goroutine runs on a ticker whose interval is defined by a config called
DisruptionWorkerIntervalSeconds. On each tick it gets the number of occupied
rooms and calls the runtime to handle disruption mitigation.

Also added a RuntimeWatcherConfig to the runtime watcher worker options.
---
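Illustrative sketch (not part of the diff): how the new RuntimeWatcherConfig is
assembled into worker.WorkerOptions when not going through wire. Type and field
names come from this patch; buildWorkerOptions, its parameters, and the
safetyPercentage value of 0.05 are hypothetical, and the snippet assumes
maestro's internal ports, worker, and worker/config packages are imported.

func buildWorkerOptions(rt ports.Runtime, rs ports.RoomStorage, rm ports.RoomManager) *worker.WorkerOptions {
	return &worker.WorkerOptions{
		Runtime:     rt,
		RoomStorage: rs,
		RoomManager: rm,
		RuntimeWatcherConfig: &config.RuntimeWatcherConfig{
			// spawnDisruptionWatcher multiplies this time.Duration field by
			// time.Second, so a bare 5 here results in a 5-second tick.
			DisruptionWorkerIntervalSeconds: 5,
			DisruptionSafetyPercentage:      0.05, // assumed example value
		},
	}
}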
 cmd/runtimewatcher/wire.go                    | 13 +++-
 cmd/runtimewatcher/wire_gen.go                | 28 ++++++---
 config/config.yaml                            |  3 +
 .../worker/config/runtime_watcher_config.go   | 30 +++++++++
 .../runtimewatcher/runtime_watcher_worker.go  | 62 +++++++++++++++++++
 .../runtime_watcher_worker_test.go            | 33 ++++++----
 internal/core/worker/worker.go                |  1 +
 7 files changed, 149 insertions(+), 21 deletions(-)
 create mode 100644 internal/core/worker/config/runtime_watcher_config.go

diff --git a/cmd/runtimewatcher/wire.go b/cmd/runtimewatcher/wire.go
index f7639ff5e..a6272fb67 100644
--- a/cmd/runtimewatcher/wire.go
+++ b/cmd/runtimewatcher/wire.go
@@ -31,6 +31,7 @@ import (
 	"github.com/topfreegames/maestro/internal/core/services/events"
 	"github.com/topfreegames/maestro/internal/core/services/workers"
 	"github.com/topfreegames/maestro/internal/core/worker"
+	workerconfigs "github.com/topfreegames/maestro/internal/core/worker/config"
 	"github.com/topfreegames/maestro/internal/core/worker/runtimewatcher"
 	"github.com/topfreegames/maestro/internal/service"
 )
@@ -42,16 +43,24 @@ func provideRuntimeWatcherBuilder() *worker.WorkerBuilder {
 	}
 }
 
+func provideRuntimeWatcherConfig(c config.Config) *workerconfigs.RuntimeWatcherConfig {
+	return &workerconfigs.RuntimeWatcherConfig{
+		DisruptionWorkerIntervalSeconds: c.GetDuration("runtimeWatcher.disruptionWorker.intervalSeconds"),
+		DisruptionSafetyPercentage:      c.GetFloat64("runtimeWatcher.disruptionWorker.safetyPercentage"),
+	}
+}
+
 var WorkerOptionsSet = wire.NewSet(
 	service.NewRuntimeKubernetes,
+	service.NewRoomStorageRedis,
 	RoomManagerSet,
-	wire.Struct(new(worker.WorkerOptions), "RoomManager", "Runtime"))
+	provideRuntimeWatcherConfig,
+	wire.Struct(new(worker.WorkerOptions), "Runtime", "RoomStorage", "RoomManager", "RuntimeWatcherConfig"))
 
 var RoomManagerSet = wire.NewSet(
 	service.NewSchedulerStoragePg,
 	service.NewClockTime,
 	service.NewPortAllocatorRandom,
-	service.NewRoomStorageRedis,
 	service.NewGameRoomInstanceStorageRedis,
 	service.NewSchedulerCacheRedis,
 	service.NewRoomManagerConfig,
diff --git a/cmd/runtimewatcher/wire_gen.go b/cmd/runtimewatcher/wire_gen.go
index 2bfbc894c..fab8618bb 100644
--- a/cmd/runtimewatcher/wire_gen.go
+++ b/cmd/runtimewatcher/wire_gen.go
@@ -12,6 +12,7 @@ import (
 	"github.com/topfreegames/maestro/internal/core/services/events"
 	"github.com/topfreegames/maestro/internal/core/services/workers"
 	"github.com/topfreegames/maestro/internal/core/worker"
+	config2 "github.com/topfreegames/maestro/internal/core/worker/config"
 	"github.com/topfreegames/maestro/internal/core/worker/runtimewatcher"
 	"github.com/topfreegames/maestro/internal/service"
 )
@@ -24,8 +25,7 @@ func initializeRuntimeWatcher(c config.Config) (*workers.WorkersManager, error)
 	if err != nil {
 		return nil, err
 	}
-	clock := service.NewClockTime()
-	portAllocator, err := service.NewPortAllocatorRandom(c)
+	runtime, err := service.NewRuntimeKubernetes(c)
 	if err != nil {
 		return nil, err
 	}
@@ -33,11 +33,12 @@ func initializeRuntimeWatcher(c config.Config) (*workers.WorkersManager, error)
 	if err != nil {
 		return nil, err
 	}
-	gameRoomInstanceStorage, err := service.NewGameRoomInstanceStorageRedis(c)
+	clock := service.NewClockTime()
+	portAllocator, err := service.NewPortAllocatorRandom(c)
 	if err != nil {
 		return nil, err
 	}
-	runtime, err := service.NewRuntimeKubernetes(c)
+	gameRoomInstanceStorage, err := service.NewGameRoomInstanceStorageRedis(c)
 	if err != nil {
 		return nil, err
 	}
@@ -59,9 +60,12 @@ func initializeRuntimeWatcher(c config.Config) (*workers.WorkersManager, error)
 		return nil, err
 	}
 	roomManager := service.NewRoomManager(clock, portAllocator, roomStorage, gameRoomInstanceStorage, runtime, eventsService, roomManagerConfig)
+	runtimeWatcherConfig := provideRuntimeWatcherConfig(c)
 	workerOptions := &worker.WorkerOptions{
-		RoomManager: roomManager,
-		Runtime:     runtime,
+		Runtime:              runtime,
+		RoomStorage:          roomStorage,
+		RoomManager:          roomManager,
+		RuntimeWatcherConfig: runtimeWatcherConfig,
 	}
 	workersManager := workers.NewWorkersManager(workerBuilder, c, schedulerStorage, workerOptions)
 	return workersManager, nil
@@ -76,6 +80,14 @@ func provideRuntimeWatcherBuilder() *worker.WorkerBuilder {
 	}
 }
 
-var WorkerOptionsSet = wire.NewSet(service.NewRuntimeKubernetes, RoomManagerSet, wire.Struct(new(worker.WorkerOptions), "RoomManager", "Runtime"))
+func provideRuntimeWatcherConfig(c config.Config) *config2.RuntimeWatcherConfig {
+	return &config2.RuntimeWatcherConfig{
+		DisruptionWorkerIntervalSeconds: c.GetDuration("runtimeWatcher.disruptionWorker.intervalSeconds"),
+		DisruptionSafetyPercentage:      c.GetFloat64("runtimeWatcher.disruptionWorker.safetyPercentage"),
+	}
+}
+
+var WorkerOptionsSet = wire.NewSet(service.NewRuntimeKubernetes, service.NewRoomStorageRedis, RoomManagerSet,
+	provideRuntimeWatcherConfig, wire.Struct(new(worker.WorkerOptions), "Runtime", "RoomStorage", "RoomManager", "RuntimeWatcherConfig"))
 
-var RoomManagerSet = wire.NewSet(service.NewSchedulerStoragePg, service.NewClockTime, service.NewPortAllocatorRandom, service.NewRoomStorageRedis, service.NewGameRoomInstanceStorageRedis, service.NewSchedulerCacheRedis, service.NewRoomManagerConfig, service.NewRoomManager, service.NewEventsForwarder, events.NewEventsForwarderService, service.NewEventsForwarderServiceConfig)
+var RoomManagerSet = wire.NewSet(service.NewSchedulerStoragePg, service.NewClockTime, service.NewPortAllocatorRandom, service.NewGameRoomInstanceStorageRedis, service.NewSchedulerCacheRedis, service.NewRoomManagerConfig, service.NewRoomManager, service.NewEventsForwarder, events.NewEventsForwarderService, service.NewEventsForwarderServiceConfig)
diff --git a/config/config.yaml b/config/config.yaml
index 58bb1679f..5a46ae400 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -59,6 +59,9 @@ workers:
   reporter:
     metrics:
       intervalMillis: 10000
+runtimeWatcher:
+  disruptionWorker:
+    intervalSeconds: 5
 
 services:
   roomManager:
diff --git a/internal/core/worker/config/runtime_watcher_config.go b/internal/core/worker/config/runtime_watcher_config.go
new file mode 100644
index 000000000..a86c287d5
--- /dev/null
+++ b/internal/core/worker/config/runtime_watcher_config.go
@@ -0,0 +1,30 @@
+// MIT License
+//
+// Copyright (c) 2021 TFG Co
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package config
+
+import "time"
+
+type RuntimeWatcherConfig struct {
+	DisruptionWorkerIntervalSeconds time.Duration
+	DisruptionSafetyPercentage      float64
+}
diff --git a/internal/core/worker/runtimewatcher/runtime_watcher_worker.go b/internal/core/worker/runtimewatcher/runtime_watcher_worker.go
index 5c6d4ce60..95c903d40 100644
--- a/internal/core/worker/runtimewatcher/runtime_watcher_worker.go
+++ b/internal/core/worker/runtimewatcher/runtime_watcher_worker.go
@@ -26,8 +26,10 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"time"
 
 	"github.com/topfreegames/maestro/internal/core/logs"
+	"github.com/topfreegames/maestro/internal/core/worker/config"
 
 	"github.com/topfreegames/maestro/internal/core/entities"
 	"github.com/topfreegames/maestro/internal/core/entities/game_room"
@@ -46,12 +48,14 @@ const WorkerName = "runtime_watcher"
 
 type runtimeWatcherWorker struct {
 	scheduler   *entities.Scheduler
 	roomManager ports.RoomManager
+	roomStorage ports.RoomStorage
 	// TODO(gabrielcorado): should we access the port directly? do we need to
 	// provide the same `Watcher` interface but on the RoomManager?
 	runtime         ports.Runtime
 	logger          *zap.Logger
 	ctx             context.Context
 	cancelFunc      context.CancelFunc
+	config          *config.RuntimeWatcherConfig
 	workerWaitGroup *sync.WaitGroup
 }
@@ -59,9 +63,11 @@ func NewRuntimeWatcherWorker(scheduler *entities.Scheduler, opts *worker.WorkerO
 	return &runtimeWatcherWorker{
 		scheduler:       scheduler,
 		roomManager:     opts.RoomManager,
+		roomStorage:     opts.RoomStorage,
 		runtime:         opts.Runtime,
 		logger:          zap.L().With(zap.String(logs.LogFieldServiceName, WorkerName), zap.String(logs.LogFieldSchedulerName, scheduler.Name)),
 		workerWaitGroup: &sync.WaitGroup{},
+		config:          opts.RuntimeWatcherConfig,
 	}
 }
 
@@ -94,10 +100,66 @@ func (w *runtimeWatcherWorker) spawnUpdateRoomWatchers(resultChan chan game_room
 	}
 }
 
+func (w *runtimeWatcherWorker) mitigateDisruptions() error {
+	occupiedRoomsAmount, err := w.roomStorage.GetRoomCountByStatus(w.ctx, w.scheduler.Name, game_room.GameStatusOccupied)
+	if err != nil {
+		w.logger.Error(
+			"failed to get occupied rooms for scheduler",
+			zap.String("scheduler", w.scheduler.Name),
+			zap.Error(err),
+		)
+		return err
+	}
+	err = w.runtime.MitigateDisruption(w.ctx, w.scheduler, occupiedRoomsAmount, w.config.DisruptionSafetyPercentage)
+	if err != nil {
+		w.logger.Error(
+			"failed to mitigate disruption",
+			zap.String("scheduler", w.scheduler.Name),
+			zap.Error(err),
+		)
+		return err
+	}
+	w.logger.Debug(
+		"mitigated disruption for occupied rooms",
+		zap.String("scheduler", w.scheduler.Name),
+		zap.Int("occupiedRooms", occupiedRoomsAmount),
+	)
+
+	return nil
+}
+
+func (w *runtimeWatcherWorker) spawnDisruptionWatcher() {
+	w.workerWaitGroup.Add(1)
+
+	go func() {
+		defer w.workerWaitGroup.Done()
+		ticker := time.NewTicker(time.Second * w.config.DisruptionWorkerIntervalSeconds)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				err := w.mitigateDisruptions()
+				if err != nil {
+					w.logger.Warn(
+						"Mitigate Disruption watcher run failed",
+						zap.String("scheduler", w.scheduler.Name),
+						zap.Error(err),
+					)
+				}
+			case <-w.ctx.Done():
+				return
+			}
+		}
+
+	}()
+}
+
 func (w *runtimeWatcherWorker) spawnWatchers(
 	resultChan chan game_room.InstanceEvent,
 ) {
 	w.spawnUpdateRoomWatchers(resultChan)
+	w.spawnDisruptionWatcher()
 }
 
 func (w *runtimeWatcherWorker) Start(ctx context.Context) error {
diff --git a/internal/core/worker/runtimewatcher/runtime_watcher_worker_test.go b/internal/core/worker/runtimewatcher/runtime_watcher_worker_test.go
index 8eef6e4de..0c0434405 100644
--- a/internal/core/worker/runtimewatcher/runtime_watcher_worker_test.go
+++ b/internal/core/worker/runtimewatcher/runtime_watcher_worker_test.go
@@ -37,19 +37,26 @@ import (
 	"github.com/topfreegames/maestro/internal/core/entities"
 	"github.com/topfreegames/maestro/internal/core/entities/game_room"
 	porterrors "github.com/topfreegames/maestro/internal/core/ports/errors"
+	"github.com/topfreegames/maestro/internal/core/ports/mock"
 	mockports "github.com/topfreegames/maestro/internal/core/ports/mock"
 	"github.com/topfreegames/maestro/internal/core/worker"
+	"github.com/topfreegames/maestro/internal/core/worker/config"
 )
 
-func workerOptions(t *testing.T) (*gomock.Controller, *mockports.MockRuntime, *mockports.MockRoomManager, *worker.WorkerOptions) {
+func workerOptions(t *testing.T) (*gomock.Controller, *mockports.MockRuntime, *mockports.MockRoomManager, *mockports.MockRoomStorage, *worker.WorkerOptions) {
 	mockCtrl := gomock.NewController(t)
 
 	runtime := mockports.NewMockRuntime(mockCtrl)
 	roomManager := mockports.NewMockRoomManager(mockCtrl)
+	roomStorage := mock.NewMockRoomStorage(mockCtrl)
 
-	return mockCtrl, runtime, roomManager, &worker.WorkerOptions{
+	return mockCtrl, runtime, roomManager, roomStorage, &worker.WorkerOptions{
 		Runtime:     runtime,
 		RoomManager: roomManager,
+		RoomStorage: roomStorage,
+		RuntimeWatcherConfig: &config.RuntimeWatcherConfig{
+			DisruptionWorkerIntervalSeconds: 1,
+		},
 	}
 }
 
@@ -61,7 +68,7 @@ func TestRuntimeWatcher_Start(t *testing.T) {
 
 	for _, event := range events {
 		t.Run(fmt.Sprintf("when %s happens, updates instance", event.String()), func(t *testing.T) {
-			mockCtrl, runtime, roomManager, workerOptions := workerOptions(t)
+			mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t)
 
 			scheduler := &entities.Scheduler{Name: "test"}
 			watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
@@ -102,7 +109,7 @@ func TestRuntimeWatcher_Start(t *testing.T) {
 		})
 
 		t.Run(fmt.Sprintf("when %s happens, and update instance fails, does nothing", event.String()), func(t *testing.T) {
-			mockCtrl, runtime, roomManager, workerOptions := workerOptions(t)
+			mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t)
 
 			scheduler := &entities.Scheduler{Name: "test"}
 			watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
@@ -144,7 +151,7 @@ func TestRuntimeWatcher_Start(t *testing.T) {
 	}
 
 	t.Run("fails to start watcher", func(t *testing.T) {
-		_, runtime, _, workerOptions := workerOptions(t)
+		_, runtime, _, _, workerOptions := workerOptions(t)
 
 		scheduler := &entities.Scheduler{Name: "test"}
 		watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
@@ -167,7 +174,7 @@ func TestRuntimeWatcher_Start(t *testing.T) {
 	})
 
 	t.Run("clean room state on delete event", func(t *testing.T) {
-		mockCtrl, runtime, roomManager, workerOptions := workerOptions(t)
+		mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t)
 
 		scheduler := &entities.Scheduler{Name: "test"}
 		watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
@@ -206,7 +213,7 @@ func TestRuntimeWatcher_Start(t *testing.T) {
 	})
 
 	t.Run("when clean room state fails, does nothing", func(t *testing.T) {
-		mockCtrl, runtime, roomManager, workerOptions := workerOptions(t)
+		mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t)
 
 		scheduler := &entities.Scheduler{Name: "test"}
 		watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
@@ -245,25 +252,29 @@ func TestRuntimeWatcher_Start(t *testing.T) {
 	})
 
 	t.Run("when resultChan is closed, worker stops without error", func(t *testing.T) {
-		mockCtrl, runtime, _, workerOptions := workerOptions(t)
+		mockCtrl, runtime, _, roomStorage, workerOptions := workerOptions(t)
 
 		scheduler := &entities.Scheduler{Name: "test"}
 		watcher := NewRuntimeWatcherWorker(scheduler, workerOptions)
 
 		runtimeWatcher := mockports.NewMockRuntimeWatcher(mockCtrl)
 		runtime.EXPECT().WatchGameRoomInstances(gomock.Any(), scheduler).Return(runtimeWatcher, nil)
-
 		resultChan := make(chan game_room.InstanceEvent)
 		runtimeWatcher.EXPECT().ResultChan().Return(resultChan)
 		runtimeWatcher.EXPECT().Stop()
 
+		runtime.EXPECT().MitigateDisruption(gomock.Any(), gomock.Any(), 0, 0.0).Return(nil).MinTimes(0)
+		roomStorage.EXPECT().GetRoomCountByStatus(gomock.Any(), gomock.Any(), gomock.Any()).Return(0, nil).MinTimes(0)
+
+		ctx, cancelFunc := context.WithCancel(context.Background())
+
 		go func() {
 			time.Sleep(time.Millisecond * 100)
 			close(resultChan)
+			cancelFunc()
 		}()
 
-		err := watcher.Start(context.Background())
+		err := watcher.Start(ctx)
 		require.NoError(t, err)
 	})
-
 }
diff --git a/internal/core/worker/worker.go b/internal/core/worker/worker.go
index 3adf293e2..3f2dd9f48 100644
--- a/internal/core/worker/worker.go
+++ b/internal/core/worker/worker.go
@@ -55,6 +55,7 @@ type WorkerOptions struct {
 	RoomStorage           ports.RoomStorage
 	InstanceStorage       ports.GameRoomInstanceStorage
 	MetricsReporterConfig *config.MetricsReporterConfig
+	RuntimeWatcherConfig  *config.RuntimeWatcherConfig
 }
 
 // Configuration holds all worker configuration parameters.
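
Standalone sketch (not part of the diff) of the pattern spawnDisruptionWatcher
introduces: a ticker-driven goroutine that fetches an occupied-room count and
feeds it to a mitigation call until its context is cancelled. It uses only the
standard library; runDisruptionLoop, countOccupied, and mitigate are
illustrative stand-ins for the worker method, RoomStorage.GetRoomCountByStatus,
and Runtime.MitigateDisruption, not maestro APIs.

package main

import (
	"context"
	"fmt"
	"time"
)

// runDisruptionLoop ticks every interval, asks for the current occupied-room
// count, and hands it to the mitigation callback. It returns when ctx is done.
func runDisruptionLoop(
	ctx context.Context,
	interval time.Duration,
	countOccupied func(context.Context) (int, error),
	mitigate func(context.Context, int) error,
) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			occupied, err := countOccupied(ctx)
			if err != nil {
				fmt.Println("failed to count occupied rooms:", err)
				continue
			}
			if err := mitigate(ctx, occupied); err != nil {
				fmt.Println("failed to mitigate disruption:", err)
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	// Stop the loop after a few ticks; the real worker cancels via its own ctx.
	ctx, cancel := context.WithTimeout(context.Background(), 3500*time.Millisecond)
	defer cancel()

	countOccupied := func(context.Context) (int, error) { return 42, nil }
	mitigate := func(_ context.Context, occupied int) error {
		fmt.Println("mitigating disruption for", occupied, "occupied rooms")
		return nil
	}

	runDisruptionLoop(ctx, time.Second, countOccupied, mitigate)
}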