diff --git a/cmd/runtimewatcher/wire.go b/cmd/runtimewatcher/wire.go index f7639ff5e..a6272fb67 100644 --- a/cmd/runtimewatcher/wire.go +++ b/cmd/runtimewatcher/wire.go @@ -31,6 +31,7 @@ import ( "github.com/topfreegames/maestro/internal/core/services/events" "github.com/topfreegames/maestro/internal/core/services/workers" "github.com/topfreegames/maestro/internal/core/worker" + workerconfigs "github.com/topfreegames/maestro/internal/core/worker/config" "github.com/topfreegames/maestro/internal/core/worker/runtimewatcher" "github.com/topfreegames/maestro/internal/service" ) @@ -42,16 +43,24 @@ func provideRuntimeWatcherBuilder() *worker.WorkerBuilder { } } +func provideRuntimeWatcherConfig(c config.Config) *workerconfigs.RuntimeWatcherConfig { + return &workerconfigs.RuntimeWatcherConfig{ + DisruptionWorkerIntervalSeconds: c.GetDuration("runtimeWatcher.disruptionWorker.intervalSeconds"), + DisruptionSafetyPercentage: c.GetFloat64("runtimeWatcher.disruptionWorker.safetyPercentage"), + } +} + var WorkerOptionsSet = wire.NewSet( service.NewRuntimeKubernetes, + service.NewRoomStorageRedis, RoomManagerSet, - wire.Struct(new(worker.WorkerOptions), "RoomManager", "Runtime")) + provideRuntimeWatcherConfig, + wire.Struct(new(worker.WorkerOptions), "Runtime", "RoomStorage", "RoomManager", "RuntimeWatcherConfig")) var RoomManagerSet = wire.NewSet( service.NewSchedulerStoragePg, service.NewClockTime, service.NewPortAllocatorRandom, - service.NewRoomStorageRedis, service.NewGameRoomInstanceStorageRedis, service.NewSchedulerCacheRedis, service.NewRoomManagerConfig, diff --git a/cmd/runtimewatcher/wire_gen.go b/cmd/runtimewatcher/wire_gen.go index 2bfbc894c..fab8618bb 100644 --- a/cmd/runtimewatcher/wire_gen.go +++ b/cmd/runtimewatcher/wire_gen.go @@ -12,6 +12,7 @@ import ( "github.com/topfreegames/maestro/internal/core/services/events" "github.com/topfreegames/maestro/internal/core/services/workers" "github.com/topfreegames/maestro/internal/core/worker" + config2 "github.com/topfreegames/maestro/internal/core/worker/config" "github.com/topfreegames/maestro/internal/core/worker/runtimewatcher" "github.com/topfreegames/maestro/internal/service" ) @@ -24,8 +25,7 @@ func initializeRuntimeWatcher(c config.Config) (*workers.WorkersManager, error) if err != nil { return nil, err } - clock := service.NewClockTime() - portAllocator, err := service.NewPortAllocatorRandom(c) + runtime, err := service.NewRuntimeKubernetes(c) if err != nil { return nil, err } @@ -33,11 +33,12 @@ func initializeRuntimeWatcher(c config.Config) (*workers.WorkersManager, error) if err != nil { return nil, err } - gameRoomInstanceStorage, err := service.NewGameRoomInstanceStorageRedis(c) + clock := service.NewClockTime() + portAllocator, err := service.NewPortAllocatorRandom(c) if err != nil { return nil, err } - runtime, err := service.NewRuntimeKubernetes(c) + gameRoomInstanceStorage, err := service.NewGameRoomInstanceStorageRedis(c) if err != nil { return nil, err } @@ -59,9 +60,12 @@ func initializeRuntimeWatcher(c config.Config) (*workers.WorkersManager, error) return nil, err } roomManager := service.NewRoomManager(clock, portAllocator, roomStorage, gameRoomInstanceStorage, runtime, eventsService, roomManagerConfig) + runtimeWatcherConfig := provideRuntimeWatcherConfig(c) workerOptions := &worker.WorkerOptions{ - RoomManager: roomManager, - Runtime: runtime, + Runtime: runtime, + RoomStorage: roomStorage, + RoomManager: roomManager, + RuntimeWatcherConfig: runtimeWatcherConfig, } workersManager := 
workers.NewWorkersManager(workerBuilder, c, schedulerStorage, workerOptions) return workersManager, nil @@ -76,6 +80,14 @@ func provideRuntimeWatcherBuilder() *worker.WorkerBuilder { } } -var WorkerOptionsSet = wire.NewSet(service.NewRuntimeKubernetes, RoomManagerSet, wire.Struct(new(worker.WorkerOptions), "RoomManager", "Runtime")) +func provideRuntimeWatcherConfig(c config.Config) *config2.RuntimeWatcherConfig { + return &config2.RuntimeWatcherConfig{ + DisruptionWorkerIntervalSeconds: c.GetDuration("runtimeWatcher.disruptionWorker.intervalSeconds"), + DisruptionSafetyPercentage: c.GetFloat64("runtimeWatcher.disruptionWorker.safetyPercentage"), + } +} + +var WorkerOptionsSet = wire.NewSet(service.NewRuntimeKubernetes, service.NewRoomStorageRedis, RoomManagerSet, + provideRuntimeWatcherConfig, wire.Struct(new(worker.WorkerOptions), "Runtime", "RoomStorage", "RoomManager", "RuntimeWatcherConfig")) -var RoomManagerSet = wire.NewSet(service.NewSchedulerStoragePg, service.NewClockTime, service.NewPortAllocatorRandom, service.NewRoomStorageRedis, service.NewGameRoomInstanceStorageRedis, service.NewSchedulerCacheRedis, service.NewRoomManagerConfig, service.NewRoomManager, service.NewEventsForwarder, events.NewEventsForwarderService, service.NewEventsForwarderServiceConfig) +var RoomManagerSet = wire.NewSet(service.NewSchedulerStoragePg, service.NewClockTime, service.NewPortAllocatorRandom, service.NewGameRoomInstanceStorageRedis, service.NewSchedulerCacheRedis, service.NewRoomManagerConfig, service.NewRoomManager, service.NewEventsForwarder, events.NewEventsForwarderService, service.NewEventsForwarderServiceConfig) diff --git a/config/config.yaml b/config/config.yaml index 58bb1679f..39a57a4ed 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -59,6 +59,10 @@ workers: reporter: metrics: intervalMillis: 10000 +runtimeWatcher: + disruptionWorker: + intervalSeconds: 5 + safetyPercentage: 0.05 services: roomManager: diff --git a/docs/reference/Architecture.md b/docs/reference/Architecture.md index 299682623..7082dcbfa 100644 --- a/docs/reference/Architecture.md +++ b/docs/reference/Architecture.md @@ -46,7 +46,8 @@ You could find all operations at [Operations section](Operations.md#available-op > Note: In Maestro a worker is a collection of routines that executes a flow related to one and only one **Scheduler** each. -Runtime Watcher Worker listens to runtime events related to the **Scheduler** and reflects the changes in **Maestro**. Currently, it listens for Game Rooms creation, deletion, and update. +Runtime Watcher Worker listens to runtime events related to the **Scheduler** and reflects the changes in **Maestro**. Currently, it mitigates disruptions by looking at the current +number of occupied rooms, and it listens for Game Rooms creation, deletion, and update. ![Runtime Watcher Worker IMAGE](../images/Architecture-Runtime-Watcher-Worker.jpg) diff --git a/docs/reference/Kubernetes.md b/docs/reference/Kubernetes.md index 032a8150c..0fd7caaf4 100644 --- a/docs/reference/Kubernetes.md +++ b/docs/reference/Kubernetes.md @@ -39,8 +39,31 @@ flowchart BT ``` ### Runtime watcher -The runtime watcher component maintains a worker process for each scheduler that keeps watching and processing _change -events_ in pods resources. For doing that, it uses a [pods informer](https://pkg.go.dev/k8s.io/client-go/informers), +The runtime watcher component spawns two types of workers: one that is responsible for mitigating disruptions and another for processing _change events_ in pods resources. 
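+
+As a rough illustration of the mitigation described in the next subsection, the PDB's
+`minAvailable` is derived from the number of occupied rooms and the configured safety
+percentage. The sketch below is illustrative only (`desiredMinAvailable` is not a real
+Maestro function); it mirrors the calculation done by the Kubernetes runtime adapter:
+
+```go
+// Illustrative sketch: how the disruption worker's safety margin is applied.
+// occupiedRooms comes from room storage; safetyPercentage comes from the
+// runtimeWatcher.disruptionWorker.safetyPercentage setting (0.05 by default).
+func desiredMinAvailable(occupiedRooms int, safetyPercentage float64) int32 {
+	return int32(float64(occupiedRooms) * (1.0 + safetyPercentage))
+}
+
+// desiredMinAvailable(100, 0.05) == 105
+```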
+ +#### Disruption Worker + +This worker consists of a single goroutine with a ticker. Each time it runs, it will +check the number of occupied rooms at that time and try to mitigate disruptions. For k8s, +this mitigation consists of applying a PDB to the scheduler's namespace, with +`minAvailable` equal to the number of occupied rooms plus a safety percentage. + +One can configure the interval at which this worker runs and the safety percentage +in `config/config.yaml`: + +```yaml +runtimeWatcher: + disruptionWorker: + intervalSeconds: 5 + safetyPercentage: 0.05 +``` + +#### Pod Change Events Worker + +For this type of worker, runtime watcher spawns multiple goroutines, maintaining a worker +process for each scheduler that keeps watching and processing _change events_ in pods +resources. For doing that, it uses a +[pods informer](https://pkg.go.dev/k8s.io/client-go/informers), binding handlers for **add**, **update** and **delete** events for all pods managed by it. This component is not responsible for updating/creating/deleting diff --git a/internal/adapters/runtime/kubernetes/kubernetes.go b/internal/adapters/runtime/kubernetes/kubernetes.go index 7cdc6d743..519f3428e 100644 --- a/internal/adapters/runtime/kubernetes/kubernetes.go +++ b/internal/adapters/runtime/kubernetes/kubernetes.go @@ -23,7 +23,9 @@ package kubernetes import ( + "github.com/topfreegames/maestro/internal/core/logs" "github.com/topfreegames/maestro/internal/core/ports" + "go.uber.org/zap" kube "k8s.io/client-go/kubernetes" ) @@ -31,8 +33,12 @@ var _ ports.Runtime = (*kubernetes)(nil) type kubernetes struct { clientSet kube.Interface + logger *zap.Logger } func New(clientSet kube.Interface) *kubernetes { - return &kubernetes{clientSet} + return &kubernetes{ + clientSet: clientSet, + logger: zap.L().With(zap.String(logs.LogFieldRuntime, "kubernetes")), + } } diff --git a/internal/adapters/runtime/kubernetes/scheduler.go b/internal/adapters/runtime/kubernetes/scheduler.go index 677df5583..0716afe2d 100644 --- a/internal/adapters/runtime/kubernetes/scheduler.go +++ b/internal/adapters/runtime/kubernetes/scheduler.go @@ -24,14 +24,113 @@ package kubernetes import ( "context" + "strconv" "github.com/topfreegames/maestro/internal/core/entities" "github.com/topfreegames/maestro/internal/core/ports/errors" + "go.uber.org/zap" v1 "k8s.io/api/core/v1" + v1Policy "k8s.io/api/policy/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" ) +const ( + DefaultDisruptionSafetyPercentage float64 = 0.05 + MajorKubeVersionPDB int = 1 + MinorKubeVersionPDB int = 21 +) + +func (k *kubernetes) isPDBSupported() bool { + // Check based on the kube version of the clientSet if PDBs are supported (1.21+) + version, err := k.clientSet.Discovery().ServerVersion() + if err != nil { + k.logger.Warn("Could not get kube API version, can not check for PDB support", zap.Error(err)) + return false + } + major, err := strconv.Atoi(version.Major) + if err != nil { + k.logger.Warn( + "Could not convert major kube API version to int, can not check for PDB support", + zap.String("majorKubeAPIVersion", version.Major), + ) + return false + } + if major < MajorKubeVersionPDB { + k.logger.Warn( + "Can not create PDB for this kube API version", + zap.Int("majorKubeAPIVersion", major), + zap.Int("majorPDBVersionRequired", MajorKubeVersionPDB), + ) + return false + } + minor, err := strconv.Atoi(version.Minor) + if err != nil { + k.logger.Warn( + "Could not convert minor kube 
API version to int, can not check for PDB support", + zap.String("minorKubeAPIVersion", version.Minor), + ) + return false + } + if minor < MinorKubeVersionPDB { + k.logger.Warn( + "Can not create PDB for this kube API version", + zap.Int("minorKubeAPIVersion", minor), + zap.Int("minorPDBVersionRequired", MinorKubeVersionPDB), + ) + return false + } + return true +} + +func (k *kubernetes) createPDBFromScheduler(ctx context.Context, scheduler *entities.Scheduler) (*v1Policy.PodDisruptionBudget, error) { + if scheduler == nil { + return nil, errors.NewErrInvalidArgument("scheduler pointer can not be nil") + } + pdbSpec := &v1Policy.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: scheduler.Name, + }, + Spec: v1Policy.PodDisruptionBudgetSpec{ + MinAvailable: &intstr.IntOrString{ + Type: intstr.Int, + IntVal: int32(0), + }, + }, + } + + if scheduler.Autoscaling != nil && scheduler.Autoscaling.Enabled { + pdbSpec.Spec.MinAvailable = &intstr.IntOrString{ + Type: intstr.Int, + IntVal: int32(scheduler.Autoscaling.Min), + } + } + + pdb, err := k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Create(ctx, pdbSpec, metav1.CreateOptions{}) + if err != nil && !kerrors.IsAlreadyExists(err) { + k.logger.Warn("error creating pdb", zap.String("scheduler", scheduler.Name), zap.Error(err)) + return nil, err + } + + return pdb, nil +} + +func (k *kubernetes) deletePDBFromScheduler(ctx context.Context, scheduler *entities.Scheduler) error { + if scheduler == nil { + return errors.NewErrInvalidArgument("scheduler pointer can not be nil") + } + if !k.isPDBSupported() { + return errors.NewErrUnexpected("PDBs are not supported for this kube API version") + } + err := k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Delete(ctx, scheduler.Name, metav1.DeleteOptions{}) + if err != nil && !kerrors.IsNotFound(err) { + k.logger.Warn("error deleting pdb", zap.String("scheduler", scheduler.Name), zap.Error(err)) + return err + } + return nil +} + func (k *kubernetes) CreateScheduler(ctx context.Context, scheduler *entities.Scheduler) error { namespace := &v1.Namespace{ ObjectMeta: metav1.ObjectMeta{ @@ -48,11 +147,20 @@ func (k *kubernetes) CreateScheduler(ctx context.Context, scheduler *entities.Sc return errors.NewErrUnexpected("error creating scheduler: %s", err) } + _, err = k.createPDBFromScheduler(ctx, scheduler) + if err != nil { + k.logger.Warn("PDB Creation during scheduler creation failed", zap.String("scheduler", scheduler.Name), zap.Error(err)) + } + return nil } func (k *kubernetes) DeleteScheduler(ctx context.Context, scheduler *entities.Scheduler) error { - err := k.clientSet.CoreV1().Namespaces().Delete(ctx, scheduler.Name, metav1.DeleteOptions{}) + err := k.deletePDBFromScheduler(ctx, scheduler) + if err != nil { + k.logger.Warn("PDB Deletion during scheduler deletion failed", zap.String("scheduler", scheduler.Name), zap.Error(err)) + } + err = k.clientSet.CoreV1().Namespaces().Delete(ctx, scheduler.Name, metav1.DeleteOptions{}) if err != nil { if kerrors.IsNotFound(err) { return errors.NewErrNotFound("scheduler '%s' not found", scheduler.Name) @@ -63,3 +171,57 @@ func (k *kubernetes) DeleteScheduler(ctx context.Context, scheduler *entities.Sc return nil } + +func (k *kubernetes) MitigateDisruption( + ctx context.Context, + scheduler *entities.Scheduler, + roomAmount int, + safetyPercentage float64, +) error { + if scheduler == nil { + return errors.NewErrInvalidArgument("empty pointer received for scheduler, can not mitigate disruptions") + } + + incSafetyPercentage := 
1.0 + if safetyPercentage < DefaultDisruptionSafetyPercentage { + k.logger.Warn( + "invalid safety percentage, using default percentage", + zap.Float64("safetyPercentage", safetyPercentage), + zap.Float64("DefaultDisruptionSafetyPercentage", DefaultDisruptionSafetyPercentage), + ) + safetyPercentage = DefaultDisruptionSafetyPercentage + } + incSafetyPercentage += safetyPercentage + + // For kubernetes mitigating disruptions means updating the current PDB + // minAvailable to the number of occupied rooms if above a threshold + pdb, err := k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{}) + if err != nil && !kerrors.IsNotFound(err) { + // Non-recoverable errors + return errors.NewErrUnexpected("non recoverable error when getting PDB for scheduler '%s': %s", scheduler.Name, err) + } + + if pdb == nil || kerrors.IsNotFound(err) { + pdb, err = k.createPDBFromScheduler(ctx, scheduler) + if err != nil { + return errors.NewErrUnexpected("error creating PDB for scheduler '%s': %s", scheduler.Name, err) + } + } + + currentPdbMinAvailable := pdb.Spec.MinAvailable.IntVal + if currentPdbMinAvailable == int32(float64(roomAmount)*incSafetyPercentage) { + return nil + } + + pdb.Spec.MinAvailable = &intstr.IntOrString{ + Type: intstr.Int, + IntVal: int32(float64(roomAmount) * incSafetyPercentage), + } + + _, err = k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Update(ctx, pdb, metav1.UpdateOptions{}) + if err != nil { + return errors.NewErrUnexpected("error updating PDB to mitigate disruptions for scheduler '%s': %s", scheduler.Name, err) + } + + return nil +} diff --git a/internal/adapters/runtime/kubernetes/scheduler_test.go b/internal/adapters/runtime/kubernetes/scheduler_test.go index b2314b4f3..85996a37c 100644 --- a/internal/adapters/runtime/kubernetes/scheduler_test.go +++ b/internal/adapters/runtime/kubernetes/scheduler_test.go @@ -28,12 +28,15 @@ package kubernetes import ( "context" "testing" + "time" "github.com/stretchr/testify/require" "github.com/topfreegames/maestro/internal/core/entities" + "github.com/topfreegames/maestro/internal/core/entities/autoscaling" "github.com/topfreegames/maestro/internal/core/ports/errors" "github.com/topfreegames/maestro/test" v1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -87,3 +90,241 @@ func TestSchedulerDeletion(t *testing.T) { require.ErrorIs(t, err, errors.ErrNotFound) }) } + +func TestPDBCreationAndDeletion(t *testing.T) { + ctx := context.Background() + client := test.GetKubernetesClientSet(t, kubernetesContainer) + kubernetesRuntime := New(client) + + t.Run("create pdb from scheduler without autoscaling", func(t *testing.T) { + if !kubernetesRuntime.isPDBSupported() { + t.Log("Kubernetes version does not support PDB, skipping") + t.SkipNow() + } + + scheduler := &entities.Scheduler{Name: "scheduler-pdb-test-no-autoscaling"} + err := kubernetesRuntime.CreateScheduler(ctx, scheduler) + if err != nil { + require.ErrorIs(t, errors.ErrAlreadyExists, err) + } + + defer func() { + err := kubernetesRuntime.DeleteScheduler(ctx, scheduler) + if err != nil { + require.ErrorIs(t, errors.ErrNotFound, err) + } + }() + + pdb, err := client.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{}) + require.NoError(t, err) + require.NotNil(t, pdb) + require.Equal(t, pdb.Name, scheduler.Name) + require.Equal(t, pdb.Spec.MinAvailable.IntVal, int32(0)) + }) + + t.Run("create pdb from scheduler 
with autoscaling", func(t *testing.T) { + if !kubernetesRuntime.isPDBSupported() { + t.Log("Kubernetes version does not support PDB, skipping") + t.SkipNow() + } + + scheduler := &entities.Scheduler{ + Name: "scheduler-pdb-test-with-autoscaling", + Autoscaling: &autoscaling.Autoscaling{ + Enabled: true, + Min: 2, + Max: 3, + Policy: autoscaling.Policy{ + Type: autoscaling.RoomOccupancy, + Parameters: autoscaling.PolicyParameters{ + RoomOccupancy: &autoscaling.RoomOccupancyParams{ + ReadyTarget: 0.1, + }, + }, + }, + }, + } + err := kubernetesRuntime.CreateScheduler(ctx, scheduler) + if err != nil { + require.ErrorIs(t, errors.ErrAlreadyExists, err) + } + + defer func() { + err := kubernetesRuntime.DeleteScheduler(ctx, scheduler) + if err != nil { + require.ErrorIs(t, errors.ErrNotFound, err) + } + }() + + pdb, err := client.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{}) + require.NoError(t, err) + require.NotNil(t, pdb) + require.Equal(t, pdb.Name, scheduler.Name) + require.Equal(t, pdb.Spec.MinAvailable.IntVal, int32(2)) + }) + + t.Run("delete pdb on scheduler deletion", func(t *testing.T) { + if !kubernetesRuntime.isPDBSupported() { + t.Log("Kubernetes version does not support PDB, skipping") + t.SkipNow() + } + + scheduler := &entities.Scheduler{Name: "scheduler-pdb-test-delete"} + err := kubernetesRuntime.CreateScheduler(ctx, scheduler) + if err != nil { + require.ErrorIs(t, errors.ErrAlreadyExists, err) + } + + pdb, err := client.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{}) + require.NoError(t, err) + require.NotNil(t, pdb) + require.Equal(t, pdb.Name, scheduler.Name) + require.Equal(t, pdb.Spec.MinAvailable.IntVal, int32(0)) + + err = kubernetesRuntime.DeleteScheduler(ctx, scheduler) + if err != nil { + require.ErrorIs(t, errors.ErrNotFound, err) + } + + _, err = client.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{}) + require.True(t, kerrors.IsNotFound(err)) + }) +} + +func TestMitigateDisruption(t *testing.T) { + ctx := context.Background() + client := test.GetKubernetesClientSet(t, kubernetesContainer) + kubernetesRuntime := New(client) + + t.Run("should not mitigate disruption if scheduler is nil", func(t *testing.T) { + err := kubernetesRuntime.MitigateDisruption(ctx, nil, 0, 0.0) + require.ErrorIs(t, errors.ErrInvalidArgument, err) + }) + + t.Run("should create PDB on mitigatation if not created before", func(t *testing.T) { + if !kubernetesRuntime.isPDBSupported() { + t.Log("Kubernetes version does not support PDB, skipping") + t.SkipNow() + } + + scheduler := &entities.Scheduler{Name: "scheduler-pdb-mitigation-create"} + namespace := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: scheduler.Name, + }, + } + + _, err := client.CoreV1().Namespaces().Create(ctx, namespace, metav1.CreateOptions{}) + require.NoError(t, err) + + err = kubernetesRuntime.MitigateDisruption(ctx, scheduler, 0, 0.0) + require.NoError(t, err) + + pdb, err := client.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{}) + require.NoError(t, err) + require.NotNil(t, pdb) + require.Equal(t, pdb.Name, scheduler.Name) + require.Equal(t, pdb.Spec.MinAvailable.IntVal, int32(0)) + }) + + t.Run("should update PDB on mitigatation if not equal to current minAvailable", func(t *testing.T) { + if !kubernetesRuntime.isPDBSupported() { + t.Log("Kubernetes version does not support PDB, skipping") + t.SkipNow() + } + + scheduler := 
&entities.Scheduler{ + Name: "scheduler-pdb-mitigation-update", + Autoscaling: &autoscaling.Autoscaling{ + Enabled: true, + Min: 100, + Max: 200, + Policy: autoscaling.Policy{ + Type: autoscaling.RoomOccupancy, + Parameters: autoscaling.PolicyParameters{ + RoomOccupancy: &autoscaling.RoomOccupancyParams{ + ReadyTarget: 0.1, + }, + }, + }, + }, + } + err := kubernetesRuntime.CreateScheduler(ctx, scheduler) + if err != nil { + require.ErrorIs(t, errors.ErrAlreadyExists, err) + } + + defer func() { + err := kubernetesRuntime.DeleteScheduler(ctx, scheduler) + if err != nil { + require.ErrorIs(t, errors.ErrNotFound, err) + } + }() + + pdb, err := client.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{}) + require.NoError(t, err) + require.NotNil(t, pdb) + require.Equal(t, pdb.Name, scheduler.Name) + require.Equal(t, pdb.Spec.MinAvailable.IntVal, int32(scheduler.Autoscaling.Min)) + + err = kubernetesRuntime.MitigateDisruption(ctx, scheduler, scheduler.Autoscaling.Min, 0.0) + require.NoError(t, err) + + pdb, err = client.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{}) + require.NoError(t, err) + require.NotNil(t, pdb) + require.Equal(t, pdb.Name, scheduler.Name) + + incSafetyPercentage := 1.0 + DefaultDisruptionSafetyPercentage + newRoomAmount := int32(float64(scheduler.Autoscaling.Min) * incSafetyPercentage) + require.Equal(t, pdb.Spec.MinAvailable.IntVal, newRoomAmount) + }) + + t.Run("should default safety percentage if invalid value", func(t *testing.T) { + if !kubernetesRuntime.isPDBSupported() { + t.Log("Kubernetes version does not support PDB, skipping") + t.SkipNow() + } + + scheduler := &entities.Scheduler{ + Name: "scheduler-pdb-mitigation-no-update", + Autoscaling: &autoscaling.Autoscaling{ + Enabled: true, + Min: 100, + Max: 200, + Policy: autoscaling.Policy{ + Type: autoscaling.RoomOccupancy, + Parameters: autoscaling.PolicyParameters{ + RoomOccupancy: &autoscaling.RoomOccupancyParams{ + ReadyTarget: 0.1, + }, + }, + }, + }, + } + err := kubernetesRuntime.CreateScheduler(ctx, scheduler) + if err != nil { + require.ErrorIs(t, errors.ErrAlreadyExists, err) + } + + defer func() { + err := kubernetesRuntime.DeleteScheduler(ctx, scheduler) + if err != nil { + require.ErrorIs(t, errors.ErrNotFound, err) + } + }() + + time.Sleep(time.Millisecond * 100) + newValue := 100 + err = kubernetesRuntime.MitigateDisruption(ctx, scheduler, newValue, 0.0) + require.NoError(t, err) + + pdb, err := client.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{}) + require.NoError(t, err) + require.NotNil(t, pdb) + require.Equal(t, pdb.Name, scheduler.Name) + + incSafetyPercentage := 1.0 + DefaultDisruptionSafetyPercentage + require.Equal(t, int32(float64(newValue)*incSafetyPercentage), pdb.Spec.MinAvailable.IntVal) + }) +} diff --git a/internal/core/logs/logs.go b/internal/core/logs/logs.go index 10853acd6..7413abfba 100644 --- a/internal/core/logs/logs.go +++ b/internal/core/logs/logs.go @@ -34,4 +34,5 @@ const ( LogFieldExecutorName = "executor_name" LogFieldHandlerName = "handler_name" LogFieldOperationPhase = "operation_phase" + LogFieldRuntime = "runtime" ) diff --git a/internal/core/ports/mock/runtime_mock.go b/internal/core/ports/mock/runtime_mock.go index db456670b..afb0ebd0a 100644 --- a/internal/core/ports/mock/runtime_mock.go +++ b/internal/core/ports/mock/runtime_mock.go @@ -109,6 +109,20 @@ func (mr *MockRuntimeMockRecorder) DeleteScheduler(ctx, scheduler interface{}) 
* return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteScheduler", reflect.TypeOf((*MockRuntime)(nil).DeleteScheduler), ctx, scheduler) } +// MitigateDisruption mocks base method. +func (m *MockRuntime) MitigateDisruption(ctx context.Context, scheduler *entities.Scheduler, roomAmount int, safetyPercentage float64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MitigateDisruption", ctx, scheduler, roomAmount, safetyPercentage) + ret0, _ := ret[0].(error) + return ret0 +} + +// MitigateDisruption indicates an expected call of MitigateDisruption. +func (mr *MockRuntimeMockRecorder) MitigateDisruption(ctx, scheduler, roomAmount, safetyPercentage interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MitigateDisruption", reflect.TypeOf((*MockRuntime)(nil).MitigateDisruption), ctx, scheduler, roomAmount, safetyPercentage) +} + // WatchGameRoomInstances mocks base method. func (m *MockRuntime) WatchGameRoomInstances(ctx context.Context, scheduler *entities.Scheduler) (ports.RuntimeWatcher, error) { m.ctrl.T.Helper() diff --git a/internal/core/ports/runtime.go b/internal/core/ports/runtime.go index 985d7cb5c..82f110416 100644 --- a/internal/core/ports/runtime.go +++ b/internal/core/ports/runtime.go @@ -45,6 +45,8 @@ type Runtime interface { WatchGameRoomInstances(ctx context.Context, scheduler *entities.Scheduler) (RuntimeWatcher, error) // Create a name to the room. CreateGameRoomName(ctx context.Context, scheduler entities.Scheduler) (string, error) + // Apply changes to runtime to mitigate disruptions looking at current number of rooms + MitigateDisruption(ctx context.Context, scheduler *entities.Scheduler, roomAmount int, safetyPercentage float64) error } // RuntimeWatcher defines a process of watcher, it will have a chan with the diff --git a/internal/core/worker/config/runtime_watcher_config.go b/internal/core/worker/config/runtime_watcher_config.go new file mode 100644 index 000000000..a86c287d5 --- /dev/null +++ b/internal/core/worker/config/runtime_watcher_config.go @@ -0,0 +1,30 @@ +// MIT License +// +// Copyright (c) 2021 TFG Co +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package config + +import "time" + +type RuntimeWatcherConfig struct { + DisruptionWorkerIntervalSeconds time.Duration + DisruptionSafetyPercentage float64 +} diff --git a/internal/core/worker/runtimewatcher/runtime_watcher_worker.go b/internal/core/worker/runtimewatcher/runtime_watcher_worker.go index f5110f1ca..95c903d40 100644 --- a/internal/core/worker/runtimewatcher/runtime_watcher_worker.go +++ b/internal/core/worker/runtimewatcher/runtime_watcher_worker.go @@ -26,8 +26,10 @@ import ( "context" "fmt" "sync" + "time" "github.com/topfreegames/maestro/internal/core/logs" + "github.com/topfreegames/maestro/internal/core/worker/config" "github.com/topfreegames/maestro/internal/core/entities" "github.com/topfreegames/maestro/internal/core/entities/game_room" @@ -46,50 +48,44 @@ const WorkerName = "runtime_watcher" type runtimeWatcherWorker struct { scheduler *entities.Scheduler roomManager ports.RoomManager + roomStorage ports.RoomStorage // TODO(gabrielcorado): should we access the port directly? do we need to // provide the same `Watcher` interface but on the RoomManager? - runtime ports.Runtime - logger *zap.Logger - ctx context.Context - cancelFunc context.CancelFunc + runtime ports.Runtime + logger *zap.Logger + ctx context.Context + cancelFunc context.CancelFunc + config *config.RuntimeWatcherConfig + workerWaitGroup *sync.WaitGroup } func NewRuntimeWatcherWorker(scheduler *entities.Scheduler, opts *worker.WorkerOptions) worker.Worker { return &runtimeWatcherWorker{ - scheduler: scheduler, - roomManager: opts.RoomManager, - runtime: opts.Runtime, - logger: zap.L().With(zap.String(logs.LogFieldServiceName, WorkerName), zap.String(logs.LogFieldSchedulerName, scheduler.Name)), + scheduler: scheduler, + roomManager: opts.RoomManager, + roomStorage: opts.RoomStorage, + runtime: opts.Runtime, + logger: zap.L().With(zap.String(logs.LogFieldServiceName, WorkerName), zap.String(logs.LogFieldSchedulerName, scheduler.Name)), + workerWaitGroup: &sync.WaitGroup{}, + config: opts.RuntimeWatcherConfig, } } -func (w *runtimeWatcherWorker) Start(ctx context.Context) error { - watcher, err := w.runtime.WatchGameRoomInstances(ctx, w.scheduler) - if err != nil { - return fmt.Errorf("failed to start watcher: %w", err) - } - - w.ctx, w.cancelFunc = context.WithCancel(ctx) - defer w.cancelFunc() - - resultChan := watcher.ResultChan() - - workerWaitGroup := &sync.WaitGroup{} - +func (w *runtimeWatcherWorker) spawnUpdateRoomWatchers(resultChan chan game_room.InstanceEvent) { for i := 0; i < 200; i++ { - workerWaitGroup.Add(1) + w.workerWaitGroup.Add(1) go func(goroutineNumber int) { - defer workerWaitGroup.Done() + defer w.workerWaitGroup.Done() goroutineLogger := w.logger.With(zap.Int("goroutine", goroutineNumber)) goroutineLogger.Info("Starting event processing goroutine") for { select { case event, ok := <-resultChan: if !ok { - w.logger.Warn("resultChan closed, finishing worker goroutine", zap.Error(err)) + w.logger.Warn("resultChan closed, finishing worker goroutine") return } - err = w.processEvent(w.ctx, event) + err := w.processEvent(w.ctx, event) if err != nil { w.logger.Warn("failed to process event", zap.Error(err)) reportEventProcessingStatus(event, false) @@ -102,8 +98,82 @@ func (w *runtimeWatcherWorker) Start(ctx context.Context) error { } }(i) } +} + +func (w *runtimeWatcherWorker) mitigateDisruptions() error { + occupiedRoomsAmount, err := w.roomStorage.GetRoomCountByStatus(w.ctx, w.scheduler.Name, game_room.GameStatusOccupied) + if err != nil { + w.logger.Error( + "failed to get occupied 
rooms for scheduler", + zap.String("scheduler", w.scheduler.Name), + zap.Error(err), + ) + return err + } + err = w.runtime.MitigateDisruption(w.ctx, w.scheduler, occupiedRoomsAmount, w.config.DisruptionSafetyPercentage) + if err != nil { + w.logger.Error( + "failed to mitigate disruption", + zap.String("scheduler", w.scheduler.Name), + zap.Error(err), + ) + return err + } + w.logger.Debug( + "mitigated disruption for occupied rooms", + zap.String("scheduler", w.scheduler.Name), + zap.Int("occupiedRooms", occupiedRoomsAmount), + ) + + return nil +} + +func (w *runtimeWatcherWorker) spawnDisruptionWatcher() { + w.workerWaitGroup.Add(1) + + go func() { + defer w.workerWaitGroup.Done() + ticker := time.NewTicker(time.Second * w.config.DisruptionWorkerIntervalSeconds) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := w.mitigateDisruptions() + if err != nil { + w.logger.Warn( + "Mitigate Disruption watcher run failed", + zap.String("scheduler", w.scheduler.Name), + zap.Error(err), + ) + } + case <-w.ctx.Done(): + return + } + } + + }() +} + +func (w *runtimeWatcherWorker) spawnWatchers( + resultChan chan game_room.InstanceEvent, +) { + w.spawnUpdateRoomWatchers(resultChan) + w.spawnDisruptionWatcher() +} + +func (w *runtimeWatcherWorker) Start(ctx context.Context) error { + watcher, err := w.runtime.WatchGameRoomInstances(ctx, w.scheduler) + if err != nil { + return fmt.Errorf("failed to start watcher: %w", err) + } + + w.ctx, w.cancelFunc = context.WithCancel(ctx) + defer w.cancelFunc() + + w.spawnWatchers(watcher.ResultChan()) - workerWaitGroup.Wait() + w.workerWaitGroup.Wait() watcher.Stop() return nil } diff --git a/internal/core/worker/runtimewatcher/runtime_watcher_worker_test.go b/internal/core/worker/runtimewatcher/runtime_watcher_worker_test.go index 8eef6e4de..0c0434405 100644 --- a/internal/core/worker/runtimewatcher/runtime_watcher_worker_test.go +++ b/internal/core/worker/runtimewatcher/runtime_watcher_worker_test.go @@ -37,19 +37,26 @@ import ( "github.com/topfreegames/maestro/internal/core/entities" "github.com/topfreegames/maestro/internal/core/entities/game_room" porterrors "github.com/topfreegames/maestro/internal/core/ports/errors" + "github.com/topfreegames/maestro/internal/core/ports/mock" mockports "github.com/topfreegames/maestro/internal/core/ports/mock" "github.com/topfreegames/maestro/internal/core/worker" + "github.com/topfreegames/maestro/internal/core/worker/config" ) -func workerOptions(t *testing.T) (*gomock.Controller, *mockports.MockRuntime, *mockports.MockRoomManager, *worker.WorkerOptions) { +func workerOptions(t *testing.T) (*gomock.Controller, *mockports.MockRuntime, *mockports.MockRoomManager, *mockports.MockRoomStorage, *worker.WorkerOptions) { mockCtrl := gomock.NewController(t) runtime := mockports.NewMockRuntime(mockCtrl) roomManager := mockports.NewMockRoomManager(mockCtrl) + roomStorage := mock.NewMockRoomStorage(mockCtrl) - return mockCtrl, runtime, roomManager, &worker.WorkerOptions{ + return mockCtrl, runtime, roomManager, roomStorage, &worker.WorkerOptions{ Runtime: runtime, RoomManager: roomManager, + RoomStorage: roomStorage, + RuntimeWatcherConfig: &config.RuntimeWatcherConfig{ + DisruptionWorkerIntervalSeconds: 1, + }, } } @@ -61,7 +68,7 @@ func TestRuntimeWatcher_Start(t *testing.T) { for _, event := range events { t.Run(fmt.Sprintf("when %s happens, updates instance", event.String()), func(t *testing.T) { - mockCtrl, runtime, roomManager, workerOptions := workerOptions(t) + mockCtrl, runtime, roomManager, _, 
workerOptions := workerOptions(t) scheduler := &entities.Scheduler{Name: "test"} watcher := NewRuntimeWatcherWorker(scheduler, workerOptions) @@ -102,7 +109,7 @@ func TestRuntimeWatcher_Start(t *testing.T) { }) t.Run(fmt.Sprintf("when %s happens, and update instance fails, does nothing", event.String()), func(t *testing.T) { - mockCtrl, runtime, roomManager, workerOptions := workerOptions(t) + mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t) scheduler := &entities.Scheduler{Name: "test"} watcher := NewRuntimeWatcherWorker(scheduler, workerOptions) @@ -144,7 +151,7 @@ func TestRuntimeWatcher_Start(t *testing.T) { } t.Run("fails to start watcher", func(t *testing.T) { - _, runtime, _, workerOptions := workerOptions(t) + _, runtime, _, _, workerOptions := workerOptions(t) scheduler := &entities.Scheduler{Name: "test"} watcher := NewRuntimeWatcherWorker(scheduler, workerOptions) @@ -167,7 +174,7 @@ func TestRuntimeWatcher_Start(t *testing.T) { }) t.Run("clean room state on delete event", func(t *testing.T) { - mockCtrl, runtime, roomManager, workerOptions := workerOptions(t) + mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t) scheduler := &entities.Scheduler{Name: "test"} watcher := NewRuntimeWatcherWorker(scheduler, workerOptions) @@ -206,7 +213,7 @@ func TestRuntimeWatcher_Start(t *testing.T) { }) t.Run("when clean room state fails, does nothing", func(t *testing.T) { - mockCtrl, runtime, roomManager, workerOptions := workerOptions(t) + mockCtrl, runtime, roomManager, _, workerOptions := workerOptions(t) scheduler := &entities.Scheduler{Name: "test"} watcher := NewRuntimeWatcherWorker(scheduler, workerOptions) @@ -245,25 +252,29 @@ func TestRuntimeWatcher_Start(t *testing.T) { }) t.Run("when resultChan is closed, worker stops without error", func(t *testing.T) { - mockCtrl, runtime, _, workerOptions := workerOptions(t) + mockCtrl, runtime, _, roomStorage, workerOptions := workerOptions(t) scheduler := &entities.Scheduler{Name: "test"} watcher := NewRuntimeWatcherWorker(scheduler, workerOptions) runtimeWatcher := mockports.NewMockRuntimeWatcher(mockCtrl) runtime.EXPECT().WatchGameRoomInstances(gomock.Any(), scheduler).Return(runtimeWatcher, nil) - resultChan := make(chan game_room.InstanceEvent) runtimeWatcher.EXPECT().ResultChan().Return(resultChan) runtimeWatcher.EXPECT().Stop() + runtime.EXPECT().MitigateDisruption(gomock.Any(), gomock.Any(), 0, 0.0).Return(nil).MinTimes(0) + roomStorage.EXPECT().GetRoomCountByStatus(gomock.Any(), gomock.Any(), gomock.Any()).Return(0, nil).MinTimes(0) + + ctx, cancelFunc := context.WithCancel(context.Background()) + go func() { time.Sleep(time.Millisecond * 100) close(resultChan) + cancelFunc() }() - err := watcher.Start(context.Background()) + err := watcher.Start(ctx) require.NoError(t, err) }) - } diff --git a/internal/core/worker/worker.go b/internal/core/worker/worker.go index 3adf293e2..3f2dd9f48 100644 --- a/internal/core/worker/worker.go +++ b/internal/core/worker/worker.go @@ -55,6 +55,7 @@ type WorkerOptions struct { RoomStorage ports.RoomStorage InstanceStorage ports.GameRoomInstanceStorage MetricsReporterConfig *config.MetricsReporterConfig + RuntimeWatcherConfig *config.RuntimeWatcherConfig } // Configuration holds all worker configuration parameters.