Dynamic PDB to Game Room Namespaces #607

Merged
merged 6 commits into from
Mar 14, 2024
13 changes: 11 additions & 2 deletions cmd/runtimewatcher/wire.go
@@ -31,6 +31,7 @@ import (
"github.com/topfreegames/maestro/internal/core/services/events"
"github.com/topfreegames/maestro/internal/core/services/workers"
"github.com/topfreegames/maestro/internal/core/worker"
workerconfigs "github.com/topfreegames/maestro/internal/core/worker/config"
"github.com/topfreegames/maestro/internal/core/worker/runtimewatcher"
"github.com/topfreegames/maestro/internal/service"
)
@@ -42,16 +43,24 @@ func provideRuntimeWatcherBuilder() *worker.WorkerBuilder {
}
}

func provideRuntimeWatcherConfig(c config.Config) *workerconfigs.RuntimeWatcherConfig {
return &workerconfigs.RuntimeWatcherConfig{
DisruptionWorkerIntervalSeconds: c.GetDuration("runtimeWatcher.disruptionWorker.intervalSeconds"),
DisruptionSafetyPercentage: c.GetFloat64("runtimeWatcher.disruptionWorker.safetyPercentage"),
}
}

var WorkerOptionsSet = wire.NewSet(
service.NewRuntimeKubernetes,
service.NewRoomStorageRedis,
RoomManagerSet,
wire.Struct(new(worker.WorkerOptions), "RoomManager", "Runtime"))
provideRuntimeWatcherConfig,
wire.Struct(new(worker.WorkerOptions), "Runtime", "RoomStorage", "RoomManager", "RuntimeWatcherConfig"))

var RoomManagerSet = wire.NewSet(
service.NewSchedulerStoragePg,
service.NewClockTime,
service.NewPortAllocatorRandom,
service.NewRoomStorageRedis,
service.NewGameRoomInstanceStorageRedis,
service.NewSchedulerCacheRedis,
service.NewRoomManagerConfig,
28 changes: 20 additions & 8 deletions cmd/runtimewatcher/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions config/config.yaml
@@ -59,6 +59,10 @@ workers:
reporter:
metrics:
intervalMillis: 10000
runtimeWatcher:
  disruptionWorker:
    intervalSeconds: 5
    safetyPercentage: 0.05

services:
roomManager:
3 changes: 2 additions & 1 deletion docs/reference/Architecture.md
@@ -46,7 +46,8 @@ You could find all operations at [Operations section](Operations.md#available-op

> Note: In Maestro a worker is a collection of routines that executes a flow related to one and only one **Scheduler** each.

Runtime Watcher Worker listens to runtime events related to the **Scheduler** and reflects the changes in **Maestro**. Currently, it listens for Game Rooms creation, deletion, and update.
Runtime Watcher Worker listens to runtime events related to the **Scheduler** and reflects the changes in **Maestro**. Currently, it mitigates disruptions by looking at the current
number of occupied rooms, and it listens for Game Room creation, deletion, and update events.

![Runtime Watcher Worker IMAGE](../images/Architecture-Runtime-Watcher-Worker.jpg)

27 changes: 25 additions & 2 deletions docs/reference/Kubernetes.md
@@ -39,8 +39,31 @@ flowchart BT
```

### Runtime watcher
The runtime watcher component maintains a worker process for each scheduler that keeps watching and processing _change
events_ in pods resources. For doing that, it uses a [pods informer](https://pkg.go.dev/k8s.io/client-go/informers),
The runtime watcher component spawns two types of workers: one responsible for mitigating disruptions and another for processing _change events_ in pod resources.

#### Disruption Worker

This worker consists of a single goroutine with a ticker. Each time it runs, it
checks the current number of occupied rooms and tries to mitigate disruptions. For k8s,
this mitigation consists of applying a PDB to the scheduler's namespace with
`minAvailable` equal to the number of occupied rooms plus a safety percentage.
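
As a rough illustration of the arithmetic described above, the sketch below computes `minAvailable` from the occupied room count and the safety percentage. The function name `minAvailable` and the constant `defaultSafetyPercentage` are hypothetical and exist only for this example; the clamping to 0.05 and the integer truncation follow what the `MitigateDisruption` code in this PR appears to do.

```go
package main

import "fmt"

// defaultSafetyPercentage is a hypothetical name; 0.05 matches the default
// used in this PR.
const defaultSafetyPercentage = 0.05

// minAvailable returns the occupied room count increased by the safety
// percentage. The float-to-int conversion truncates toward zero.
func minAvailable(occupiedRooms int, safetyPercentage float64) int32 {
	// Values below the default are replaced by the default.
	if safetyPercentage < defaultSafetyPercentage {
		safetyPercentage = defaultSafetyPercentage
	}
	return int32(float64(occupiedRooms) * (1.0 + safetyPercentage))
}

func main() {
	for _, rooms := range []int{0, 10, 100} {
		fmt.Printf("occupied=%d minAvailable=%d\n", rooms, minAvailable(rooms, 0.05))
	}
}
```

Because of the truncation, 10 occupied rooms at 5% gives 10.5, which becomes 10, so small schedulers get no extra margin under this formula. Rounding up would be one alternative, though that is a design choice outside what the PR does.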

The interval at which this worker runs and the safety percentage can both be configured
in `config/config.yaml`:

```yaml
runtimeWatcher:
  disruptionWorker:
    intervalSeconds: 5
    safetyPercentage: 0.05
```

#### Pod Change Events Worker

For this type of worker, the runtime watcher spawns multiple goroutines, maintaining a worker
process for each scheduler that keeps watching and processing _change events_ in pod
resources. To do that, it uses a
[pods informer](https://pkg.go.dev/k8s.io/client-go/informers),
binding handlers for **add**, **update** and **delete** events for all pods managed by it.

This component is not responsible for updating/creating/deleting
8 changes: 7 additions & 1 deletion internal/adapters/runtime/kubernetes/kubernetes.go
@@ -23,16 +23,22 @@
package kubernetes

import (
"github.com/topfreegames/maestro/internal/core/logs"
"github.com/topfreegames/maestro/internal/core/ports"
"go.uber.org/zap"
kube "k8s.io/client-go/kubernetes"
)

var _ ports.Runtime = (*kubernetes)(nil)

type kubernetes struct {
clientSet kube.Interface
logger *zap.Logger
}

func New(clientSet kube.Interface) *kubernetes {
return &kubernetes{clientSet}
return &kubernetes{
clientSet: clientSet,
logger: zap.L().With(zap.String(logs.LogFieldRuntime, "kubernetes")),
}
}
164 changes: 163 additions & 1 deletion internal/adapters/runtime/kubernetes/scheduler.go
@@ -24,14 +24,113 @@ package kubernetes

import (
"context"
"strconv"

"github.com/topfreegames/maestro/internal/core/entities"
"github.com/topfreegames/maestro/internal/core/ports/errors"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
v1Policy "k8s.io/api/policy/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

const (
DefaultDisruptionSafetyPercentage float64 = 0.05
MajorKubeVersionPDB int = 1
MinorKubeVersionPDB int = 21
)

func (k *kubernetes) isPDBSupported() bool {
// Check based on the kube version of the clientSet if PDBs are supported (1.21+)
version, err := k.clientSet.Discovery().ServerVersion()
if err != nil {
k.logger.Warn("Could not get kube API version, can not check for PDB support", zap.Error(err))
return false
}
major, err := strconv.Atoi(version.Major)
if err != nil {
k.logger.Warn(
"Could not convert major kube API version to int, can not check for PDB support",
zap.String("majorKubeAPIVersion", version.Major),
)
return false
}
if major < MajorKubeVersionPDB {
k.logger.Warn(
"Can not create PDB for this kube API version",
zap.Int("majorKubeAPIVersion", major),
zap.Int("majorPDBVersionRequired", MajorKubeVersionPDB),
)
return false
}
minor, err := strconv.Atoi(version.Minor)
if err != nil {
k.logger.Warn(
"Could not convert minor kube API version to int, can not check for PDB support",
zap.String("minorKubeAPIVersion", version.Minor),
)
return false
}
if minor < MinorKubeVersionPDB {
k.logger.Warn(
"Can not create PDB for this kube API version",
zap.Int("minorKubeAPIVersion", minor),
zap.Int("minorPDBVersionRequired", MinorKubeVersionPDB),
)
return false
}
return true
}
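
Two edge cases in the version check above may be worth a note. Some clusters report a minor version with a trailing `+` (for example `27+`), which `strconv.Atoi` rejects. Also, comparing major and minor independently would reject a hypothetical `2.0` server. The sketch below shows one way to handle both. `leadingInt` and `versionAtLeast` are hypothetical helpers written for this example and are not part of the PR.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// leadingInt parses the leading decimal digits of s, so "27+" yields 27.
// It returns an error when s does not start with a digit.
func leadingInt(s string) (int, error) {
	end := strings.IndexFunc(s, func(r rune) bool { return r < '0' || r > '9' })
	if end == -1 {
		end = len(s)
	}
	return strconv.Atoi(s[:end])
}

// versionAtLeast reports whether major.minor is at least wantMajor.wantMinor.
// The minor version only matters when the major versions are equal.
func versionAtLeast(major, minor string, wantMajor, wantMinor int) (bool, error) {
	maj, err := leadingInt(major)
	if err != nil {
		return false, fmt.Errorf("parsing major version %q: %w", major, err)
	}
	mnr, err := leadingInt(minor)
	if err != nil {
		return false, fmt.Errorf("parsing minor version %q: %w", minor, err)
	}
	if maj != wantMajor {
		return maj > wantMajor, nil
	}
	return mnr >= wantMinor, nil
}

func main() {
	ok, err := versionAtLeast("1", "27+", 1, 21)
	fmt.Println(ok, err)
}
```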

func (k *kubernetes) createPDBFromScheduler(ctx context.Context, scheduler *entities.Scheduler) (*v1Policy.PodDisruptionBudget, error) {
if scheduler == nil {
return nil, errors.NewErrInvalidArgument("scheduler pointer can not be nil")
}
pdbSpec := &v1Policy.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: scheduler.Name,
},
Spec: v1Policy.PodDisruptionBudgetSpec{
MinAvailable: &intstr.IntOrString{
Type: intstr.Int,
IntVal: int32(0),
},
},
}

if scheduler.Autoscaling != nil && scheduler.Autoscaling.Enabled {
pdbSpec.Spec.MinAvailable = &intstr.IntOrString{
Type: intstr.Int,
IntVal: int32(scheduler.Autoscaling.Min),
}
}

pdb, err := k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Create(ctx, pdbSpec, metav1.CreateOptions{})
if err != nil && !kerrors.IsAlreadyExists(err) {
k.logger.Warn("error creating pdb", zap.String("scheduler", scheduler.Name), zap.Error(err))
return nil, err
}

return pdb, nil
}

func (k *kubernetes) deletePDBFromScheduler(ctx context.Context, scheduler *entities.Scheduler) error {
if scheduler == nil {
return errors.NewErrInvalidArgument("scheduler pointer can not be nil")
}
if !k.isPDBSupported() {
return errors.NewErrUnexpected("PDBs are not supported for this kube API version")
}
err := k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Delete(ctx, scheduler.Name, metav1.DeleteOptions{})
if err != nil && !kerrors.IsNotFound(err) {
k.logger.Warn("error deleting pdb", zap.String("scheduler", scheduler.Name), zap.Error(err))
return err
}
return nil
}

func (k *kubernetes) CreateScheduler(ctx context.Context, scheduler *entities.Scheduler) error {
namespace := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
@@ -48,11 +147,20 @@ func (k *kubernetes) CreateScheduler(ctx context.Context, scheduler *entities.Sc
return errors.NewErrUnexpected("error creating scheduler: %s", err)
}

_, err = k.createPDBFromScheduler(ctx, scheduler)
if err != nil {
k.logger.Warn("PDB Creation during scheduler creation failed", zap.String("scheduler", scheduler.Name), zap.Error(err))
}

return nil
}

func (k *kubernetes) DeleteScheduler(ctx context.Context, scheduler *entities.Scheduler) error {
err := k.clientSet.CoreV1().Namespaces().Delete(ctx, scheduler.Name, metav1.DeleteOptions{})
err := k.deletePDBFromScheduler(ctx, scheduler)
if err != nil {
k.logger.Warn("PDB Deletion during scheduler deletion failed", zap.String("scheduler", scheduler.Name), zap.Error(err))
}
err = k.clientSet.CoreV1().Namespaces().Delete(ctx, scheduler.Name, metav1.DeleteOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
return errors.NewErrNotFound("scheduler '%s' not found", scheduler.Name)
@@ -63,3 +171,57 @@ func (k *kubernetes) DeleteScheduler(ctx context.Context, scheduler *entities.Sc

return nil
}

func (k *kubernetes) MitigateDisruption(
ctx context.Context,
scheduler *entities.Scheduler,
roomAmount int,
safetyPercentage float64,
) error {
if scheduler == nil {
return errors.NewErrInvalidArgument("empty pointer received for scheduler, can not mitigate disruptions")
}

incSafetyPercentage := 1.0
if safetyPercentage < DefaultDisruptionSafetyPercentage {
k.logger.Warn(
"invalid safety percentage, using default percentage",
zap.Float64("safetyPercentage", safetyPercentage),
zap.Float64("DefaultDisruptionSafetyPercentage", DefaultDisruptionSafetyPercentage),
)
safetyPercentage = DefaultDisruptionSafetyPercentage
}
incSafetyPercentage += safetyPercentage

// For Kubernetes, mitigating disruptions means updating the current PDB's
// minAvailable to the number of occupied rooms plus the safety percentage.
pdb, err := k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{})
if err != nil && !kerrors.IsNotFound(err) {
// Non-recoverable errors
return errors.NewErrUnexpected("non recoverable error when getting PDB for scheduler '%s': %s", scheduler.Name, err)
}

if pdb == nil || kerrors.IsNotFound(err) {
pdb, err = k.createPDBFromScheduler(ctx, scheduler)
if err != nil {
return errors.NewErrUnexpected("error creating PDB for scheduler '%s': %s", scheduler.Name, err)
}
}

currentPdbMinAvailable := pdb.Spec.MinAvailable.IntVal
if currentPdbMinAvailable == int32(float64(roomAmount)*incSafetyPercentage) {
return nil
}

pdb.Spec.MinAvailable = &intstr.IntOrString{
Type: intstr.Int,
IntVal: int32(float64(roomAmount) * incSafetyPercentage),
}

_, err = k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Update(ctx, pdb, metav1.UpdateOptions{})
if err != nil {
return errors.NewErrUnexpected("error updating PDB to mitigate disruptions for scheduler '%s': %s", scheduler.Name, err)
}

return nil
}
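
The get, create-if-missing, then update-if-different flow of `MitigateDisruption` can be illustrated without a cluster. In the sketch below, `budgetStore`, `memStore`, `errNotFound` and `reconcile` are all hypothetical stand-ins for the PDB client calls. The sketch only mirrors the control flow and is not the adapter's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound plays the role of a Kubernetes "not found" API error.
var errNotFound = errors.New("budget not found")

// budgetStore is a hypothetical stand-in for the PDB Get/Create/Update calls.
type budgetStore interface {
	Get(name string) (int32, error)
	Create(name string, minAvailable int32) error
	Update(name string, minAvailable int32) error
}

// memStore is an in-memory budgetStore that counts update calls.
type memStore struct {
	budgets map[string]int32
	updates int
}

func (m *memStore) Get(name string) (int32, error) {
	v, ok := m.budgets[name]
	if !ok {
		return 0, errNotFound
	}
	return v, nil
}

func (m *memStore) Create(name string, v int32) error {
	m.budgets[name] = v
	return nil
}

func (m *memStore) Update(name string, v int32) error {
	m.budgets[name] = v
	m.updates++
	return nil
}

// reconcile makes sure a budget exists and holds the desired value. It skips
// the write when the stored value already matches.
func reconcile(s budgetStore, name string, desired int32) error {
	current, err := s.Get(name)
	switch {
	case errors.Is(err, errNotFound):
		// A missing budget is recoverable: create it with a zero value.
		if err := s.Create(name, 0); err != nil {
			return fmt.Errorf("creating budget for %q: %w", name, err)
		}
		current = 0
	case err != nil:
		return fmt.Errorf("getting budget for %q: %w", name, err)
	}
	if current == desired {
		return nil
	}
	return s.Update(name, desired)
}

func main() {
	store := &memStore{budgets: map[string]int32{}}
	fmt.Println(reconcile(store, "my-scheduler", 10), store.budgets["my-scheduler"])
}
```

Skipping the update when the value is unchanged keeps a worker that ticks every few seconds from issuing a write on every run.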