Skip to content

Commit

Permalink
refactor(healthcontroller): perform rolling update
Browse files Browse the repository at this point in the history
It is now responsibility of the healthcontroller to perform a rolling
update. When there was a switch on the current scheduler active version
we need to rollout pods with the new version. Health controller either
does autoscaling or rolling update.

On rolling update, the following will happen:
1. Are there rooms with a previous scheduler version? If so, start
   update
2. Compute how many rooms we can surge (above desired from autoscaling)
3. Enqueue priority operation to create this rooms
4. Check how many Ready rooms we have above desired from autoscaling
5. Enqueue operation to delete those rooms - they are a buffer, should
   never offend readyTarget
  • Loading branch information
hspedro committed May 22, 2024
1 parent ed26eba commit 1ca2dd7
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 11 deletions.
147 changes: 141 additions & 6 deletions internal/core/operations/healthcontroller/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Config struct {
type Executor struct {
autoscaler ports.Autoscaler
roomStorage ports.RoomStorage
roomManager ports.RoomManager
instanceStorage ports.GameRoomInstanceStorage
schedulerStorage ports.SchedulerStorage
operationManager ports.OperationManager
Expand All @@ -60,10 +61,20 @@ type Executor struct {
var _ operations.Executor = (*Executor)(nil)

// NewExecutor creates a new instance of Executor.
func NewExecutor(roomStorage ports.RoomStorage, instanceStorage ports.GameRoomInstanceStorage, schedulerStorage ports.SchedulerStorage, operationManager ports.OperationManager, autoscaler ports.Autoscaler, config Config) *Executor {
func NewExecutor(
roomStorage ports.RoomStorage,
roomManager ports.RoomManager,
instanceStorage ports.GameRoomInstanceStorage,
schedulerStorage ports.SchedulerStorage,
operationManager ports.OperationManager,
autoscaler ports.Autoscaler,
config Config,
) *Executor {
return &Executor{
autoscaler: autoscaler,
autoscaler: autoscaler,
// TODO: replace roomStorage operations with roomManager
roomStorage: roomStorage,
roomManager: roomManager,
instanceStorage: instanceStorage,
schedulerStorage: schedulerStorage,
operationManager: operationManager,
Expand Down Expand Up @@ -97,7 +108,7 @@ func (ex *Executor) Execute(ctx context.Context, op *operation.Operation, defini

if len(expiredRooms) > 0 {
logger.Sugar().Infof("found %v expired rooms to be deleted", len(expiredRooms))
err = ex.enqueueRemoveExpiredRooms(ctx, op, logger, expiredRooms)
err = ex.enqueueRemoveRooms(ctx, op, logger, expiredRooms)
if err != nil {
logger.Error("could not enqueue operation to delete expired rooms", zap.Error(err))
}
Expand All @@ -111,6 +122,11 @@ func (ex *Executor) Execute(ctx context.Context, op *operation.Operation, defini
}
reportDesiredNumberOfRooms(scheduler.Game, scheduler.Name, desiredNumberOfRooms)

// Check if the system is in a rollingUpdate by listing rooms that are not the current scheduler version
roomsPreviousSchedulerVersion, isRollingUpdate := ex.checkRollingUpdate(ctx, logger, scheduler, availableRooms)
if isRollingUpdate {
return ex.performRollingUpdate(ctx, op, def, logger, scheduler, desiredNumberOfRooms, availableRooms, roomsPreviousSchedulerVersion)
}
err = ex.ensureDesiredAmountOfInstances(ctx, op, def, scheduler, logger, len(availableRooms), desiredNumberOfRooms)
if err != nil {
logger.Error("cannot ensure desired amount of instances", zap.Error(err))
Expand Down Expand Up @@ -276,15 +292,15 @@ func (ex *Executor) isRoomStatus(room *game_room.GameRoom, status game_room.Game
return room.Status == status
}

func (ex *Executor) enqueueRemoveExpiredRooms(ctx context.Context, op *operation.Operation, logger *zap.Logger, expiredRoomsIDs []string) error {
func (ex *Executor) enqueueRemoveRooms(ctx context.Context, op *operation.Operation, logger *zap.Logger, roomsIDs []string) error {
removeOperation, err := ex.operationManager.CreatePriorityOperation(ctx, op.SchedulerName, &remove.Definition{
RoomsIDs: expiredRoomsIDs,
RoomsIDs: roomsIDs,
})
if err != nil {
return err
}

msgToAppend := fmt.Sprintf("created operation (id: %s) to remove %v expired rooms.", removeOperation.ID, len(expiredRoomsIDs))
msgToAppend := fmt.Sprintf("created operation (id: %s) to remove %v rooms.", removeOperation.ID, len(roomsIDs))
logger.Info(msgToAppend)
ex.operationManager.AppendOperationEventToExecutionHistory(ctx, op, msgToAppend)

Expand Down Expand Up @@ -366,3 +382,122 @@ func (ex *Executor) canPerformDownscale(ctx context.Context, scheduler *entities

return can && !waitingCooldown, "ok"
}

func (ex *Executor) checkRollingUpdate(
ctx context.Context,
logger *zap.Logger,
scheduler *entities.Scheduler,
availableRoomsIDs []string,
) ([]string, bool) {
logger.Debug("checking if system is in the middle of a rolling update of scheduler", zap.String("scheduler.Version", scheduler.Spec.Version))
var roomsWithPreviousSchedulerVersion []string
for _, roomID := range availableRoomsIDs {
room, err := ex.roomStorage.GetRoom(ctx, scheduler.Name, roomID)
if err == nil && room.Version != scheduler.Spec.Version {
roomsWithPreviousSchedulerVersion = append(roomsWithPreviousSchedulerVersion, roomID)
}
}
logger.Debug(
"rooms that did not match current scheduler versions",
zap.String("scheduler.Version", scheduler.Spec.Version),
zap.Int("rooms", len(roomsWithPreviousSchedulerVersion)),
)
return roomsWithPreviousSchedulerVersion, len(roomsWithPreviousSchedulerVersion) != 0
}

func (ex *Executor) performRollingUpdate(
ctx context.Context,
op *operation.Operation,
def *Definition,
logger *zap.Logger,
scheduler *entities.Scheduler,
desiredNumberOfRooms int,
availableRoomsIDs []string,
roomsWithPreviousSchedulerVersion []string,
) error {
logger.Info("performing rolling update", zap.String("scheduler.Version", scheduler.Spec.Version))
maxSurgeAmount, err := ex.roomManager.SchedulerMaxSurge(ctx, scheduler)
if err != nil {
logger.Error("failed to perform rolling update while getting max surge amount of rooms", zap.Error(err))
return err
}
maxRoomsToSurge := desiredNumberOfRooms + maxSurgeAmount - len(availableRoomsIDs)
if len(roomsWithPreviousSchedulerVersion) < maxRoomsToSurge {
maxRoomsToSurge = len(roomsWithPreviousSchedulerVersion)
}
if maxRoomsToSurge <= 0 {
maxRoomsToSurge = 1
}
logger.Info(
"upscaling new rooms",
zap.Int("desired", desiredNumberOfRooms),
zap.Int("maxSurgeAmount", maxSurgeAmount),
zap.Int("available", len(availableRoomsIDs)),
zap.Int("oldRooms", len(roomsWithPreviousSchedulerVersion)),
zap.Int("maxRoomsToSurge", maxRoomsToSurge),
)
addOp, err := ex.operationManager.CreatePriorityOperation(ctx, op.SchedulerName, &add.Definition{
Amount: int32(maxRoomsToSurge),
})
if err != nil {
logger.Error("failed to enqueue add operation for rolling update", zap.Error(err))
return err
}
msgToAppend := fmt.Sprintf("created operation (id: %s) to surge %v rooms.", addOp.ID, maxSurgeAmount)
ex.operationManager.AppendOperationEventToExecutionHistory(ctx, op, msgToAppend)
ex.setTookAction(def, true)

roomsMarkedForDeletion, err := ex.markPreviousSchedulerRoomsForDeletion(
ctx,
logger,
scheduler,
roomsWithPreviousSchedulerVersion,
desiredNumberOfRooms,
)
if err != nil {
logger.Error("could not delete rooms with previous scheduler version", zap.Error(err))
return err
}
if len(roomsMarkedForDeletion) <= 0 {
return nil
}
removeOp, err := ex.operationManager.CreateOperation(ctx, op.SchedulerName, &remove.Definition{
RoomsIDs: roomsMarkedForDeletion,
})
if err != nil {
logger.Error("failed to enqueue remove operation for rolling update", zap.Error(err))
return err
}
msgToAppend = fmt.Sprintf("created operation (id: %s) to remove rooms with previous scheduler version.", removeOp.ID)
ex.operationManager.AppendOperationEventToExecutionHistory(ctx, op, msgToAppend)
ex.setTookAction(def, true)

return nil
}

func (ex *Executor) markPreviousSchedulerRoomsForDeletion(
ctx context.Context,
logger *zap.Logger,
scheduler *entities.Scheduler,
roomsWithPreviousSchedulerVersion []string,
desiredNumberOfRooms int,
) ([]string, error) {
logger.Debug(
"ensuring only rooms with current scheduler version are running",
zap.String("scheduler.Name", scheduler.Name),
zap.String("scheduler.Version", scheduler.Spec.Version),
)
readyRoomIDs, err := ex.roomStorage.GetRoomIDsByStatus(ctx, scheduler.Name, game_room.GameStatusReady)
if err != nil {
return []string{}, fmt.Errorf("failed to list scheduler rooms on ready status: %w", err)
}
bufferRoomsToBeRemoved := len(readyRoomIDs) - desiredNumberOfRooms
if bufferRoomsToBeRemoved < 0 {
logger.Sugar().Debugf("can not delete old rooms without offending autoscale: %d", bufferRoomsToBeRemoved)
return []string{}, nil
}
if bufferRoomsToBeRemoved > len(roomsWithPreviousSchedulerVersion) {
return roomsWithPreviousSchedulerVersion, nil
}
return roomsWithPreviousSchedulerVersion[:bufferRoomsToBeRemoved], nil
}
Loading

0 comments on commit 1ca2dd7

Please sign in to comment.