Skip to content

Commit

Permalink
feat(healthcontroller): ensure rooms in cur scheduler version
Browse files Browse the repository at this point in the history
There might be cases in which rooms are running with multiple
scheduler versions, e.g. while a scheduler version switch is in
progress. We want to ensure that only rooms for the current version
are running and delete the others, respecting the down surge limit.
  • Loading branch information
hspedro committed Apr 19, 2024
1 parent 3f32282 commit 53db26a
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 12 deletions.
75 changes: 68 additions & 7 deletions internal/core/operations/healthcontroller/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Config struct {
type Executor struct {
autoscaler ports.Autoscaler
roomStorage ports.RoomStorage
roomManager ports.RoomManager
instanceStorage ports.GameRoomInstanceStorage
schedulerStorage ports.SchedulerStorage
operationManager ports.OperationManager
Expand All @@ -61,10 +62,20 @@ type Executor struct {
var _ operations.Executor = (*Executor)(nil)

// NewExecutor creates a new instance of Executor.
func NewExecutor(roomStorage ports.RoomStorage, instanceStorage ports.GameRoomInstanceStorage, schedulerStorage ports.SchedulerStorage, operationManager ports.OperationManager, autoscaler ports.Autoscaler, config Config) *Executor {
func NewExecutor(
roomStorage ports.RoomStorage,
roomManager ports.RoomManager,
instanceStorage ports.GameRoomInstanceStorage,
schedulerStorage ports.SchedulerStorage,
operationManager ports.OperationManager,
autoscaler ports.Autoscaler,
config Config,
) *Executor {
return &Executor{
autoscaler: autoscaler,
autoscaler: autoscaler,
// TODO: replace roomStorage operations with roomManager
roomStorage: roomStorage,
roomManager: roomManager,
instanceStorage: instanceStorage,
schedulerStorage: schedulerStorage,
operationManager: operationManager,
Expand Down Expand Up @@ -98,21 +109,37 @@ func (ex *Executor) Execute(ctx context.Context, op *operation.Operation, defini

if len(expiredRooms) > 0 {
logger.Sugar().Infof("found %v expired rooms to be deleted", len(expiredRooms))
err = ex.enqueueRemoveExpiredRooms(ctx, op, logger, expiredRooms)
err = ex.enqueueRemoveRooms(ctx, op, logger, expiredRooms)
if err != nil {
logger.Error("could not enqueue operation to delete expired rooms", zap.Error(err))
}
ex.setTookAction(def, true)
}

// Downscale based on if there are rooms with scheduler other than the current version
roomsMarkedForDeletion, err := ex.ensureCurrentSchedulerVersion(ctx, logger, scheduler, availableRooms)
if err != nil {
logger.Error("cannot ensure only rooms with current scheduler are running", zap.Error(err))
return err
}
if len(roomsMarkedForDeletion) > 0 {
logger.Sugar().Infof("found %v rooms not matching cur scheduler to be deleted", len(roomsMarkedForDeletion))
err = ex.enqueueRemoveRooms(ctx, op, logger, roomsMarkedForDeletion)
if err != nil {
logger.Error("could not enqueue operation to delete not-matcihng cur scheduler rooms", zap.Error(err))
}
ex.setTookAction(def, true)
}
availableRoomsAmount := len(availableRooms) - len(roomsMarkedForDeletion)

desiredNumberOfRooms, err := ex.getDesiredNumberOfRooms(ctx, logger, scheduler)
if err != nil {
logger.Error("error getting the desired number of rooms", zap.Error(err))
return err
}
reportDesiredNumberOfRooms(scheduler.Game, scheduler.Name, desiredNumberOfRooms)

err = ex.ensureDesiredAmountOfInstances(ctx, op, def, scheduler, logger, len(availableRooms), desiredNumberOfRooms)
err = ex.ensureDesiredAmountOfInstances(ctx, op, def, scheduler, logger, availableRoomsAmount, desiredNumberOfRooms)
if err != nil {
logger.Error("cannot ensure desired amount of instances", zap.Error(err))
return err
Expand Down Expand Up @@ -277,15 +304,15 @@ func (ex *Executor) isRoomStatus(room *game_room.GameRoom, status game_room.Game
return room.Status == status
}

func (ex *Executor) enqueueRemoveExpiredRooms(ctx context.Context, op *operation.Operation, logger *zap.Logger, expiredRoomsIDs []string) error {
func (ex *Executor) enqueueRemoveRooms(ctx context.Context, op *operation.Operation, logger *zap.Logger, roomsIDs []string) error {
removeOperation, err := ex.operationManager.CreatePriorityOperation(ctx, op.SchedulerName, &remove.Definition{
RoomsIDs: expiredRoomsIDs,
RoomsIDs: roomsIDs,
})
if err != nil {
return err
}

msgToAppend := fmt.Sprintf("created operation (id: %s) to remove %v expired rooms.", removeOperation.ID, len(expiredRoomsIDs))
msgToAppend := fmt.Sprintf("created operation (id: %s) to remove %v rooms.", removeOperation.ID, len(roomsIDs))
logger.Info(msgToAppend)
ex.operationManager.AppendOperationEventToExecutionHistory(ctx, op, msgToAppend)

Expand Down Expand Up @@ -367,3 +394,37 @@ func (ex *Executor) canPerformDownscale(ctx context.Context, scheduler *entities

return can && !waitingCooldown, "ok"
}

// ensureCurrentSchedulerVersion returns the IDs of the available rooms whose
// version differs from the scheduler's current spec version, so that the
// caller can enqueue their removal.
//
// The number of rooms returned per call is capped by the down surge amount
// reported by the room manager; a non-positive down surge marks no rooms.
// Rooms that cannot be loaded from the room storage are logged and skipped.
// An error is returned only when the down surge amount cannot be computed, in
// which case the returned slice is nil.
func (ex *Executor) ensureCurrentSchedulerVersion(
	ctx context.Context,
	logger *zap.Logger,
	scheduler *entities.Scheduler,
	availableRoomIDs []string,
) ([]string, error) {
	logger.Debug(
		"ensuring only rooms with current scheduler version are running",
		zap.String("scheduler.Name", scheduler.Name),
		zap.String("scheduler.Version", scheduler.Spec.Version),
	)
	downSurgeAmount, err := ex.roomManager.SchedulerDownSurge(ctx, scheduler)
	if err != nil {
		logger.Error("could not compute down surge amount, not ensuring scheduler version", zap.Error(err))
		return nil, err
	}

	var roomsMarkedForDeletion []string
	for _, roomID := range availableRoomIDs {
		// Check the cap before each lookup with >= rather than == so that a
		// zero or negative down surge can never be overshot and end up
		// marking every outdated room.
		if len(roomsMarkedForDeletion) >= downSurgeAmount {
			break
		}
		room, err := ex.roomStorage.GetRoom(ctx, scheduler.Name, roomID)
		if err != nil {
			// Best effort: a room we cannot read is left alone in this run,
			// but the failure is recorded instead of being silently dropped.
			logger.Warn(
				"could not get room to check its scheduler version, skipping",
				zap.String("room.ID", roomID),
				zap.Error(err),
			)
			continue
		}
		if room.Version != scheduler.Spec.Version {
			roomsMarkedForDeletion = append(roomsMarkedForDeletion, roomID)
		}
	}
	logger.Debug("marked rooms for deletion", zap.Strings("rooms", roomsMarkedForDeletion))
	return roomsMarkedForDeletion, nil
}
Loading

0 comments on commit 53db26a

Please sign in to comment.