Skip to content

Commit

Permalink
Chore/add ttl to add and remove rooms operation (#491)
Browse files Browse the repository at this point in the history
* Add add_rooms and remove_rooms operation TTL
  • Loading branch information
arthur29 authored Jul 12, 2022
1 parent c674949 commit f053f0a
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 60 deletions.
2 changes: 1 addition & 1 deletion cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var migrateCmd = &cobra.Command{

m, err := migrate.New(
config.GetString("migration.path"),
service.GetSchedulerStoragePostgresUrl(config))
service.GetSchedulerStoragePostgresURL(config))
if err != nil {
zap.L().With(zap.Error(err)).Fatal("failed to create migration")
}
Expand Down
89 changes: 55 additions & 34 deletions internal/service/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
"github.com/topfreegames/maestro/internal/core/services/scheduler_manager"

"github.com/topfreegames/maestro/internal/core/entities/autoscaling"
"github.com/topfreegames/maestro/internal/core/operations/add_rooms"
"github.com/topfreegames/maestro/internal/core/operations/healthcontroller"
"github.com/topfreegames/maestro/internal/core/operations/remove_rooms"

operationadapters "github.com/topfreegames/maestro/internal/adapters/operation"

Expand Down Expand Up @@ -62,120 +64,134 @@ import (
// configurations paths for the adapters
const (
// Kubernetes runtime
runtimeKubernetesMasterUrlPath = "adapters.runtime.kubernetes.masterUrl"
runtimeKubernetesMasterURLPath = "adapters.runtime.kubernetes.masterUrl"
runtimeKubernetesKubeconfigPath = "adapters.runtime.kubernetes.kubeconfig"
runtimeKubernetesInCluster = "adapters.runtime.kubernetes.inCluster"
runtimeKubernetesInClusterPath = "adapters.runtime.kubernetes.inCluster"
// Redis operation storage
operationStorageRedisUrlPath = "adapters.operationStorage.redis.url"
operationLeaseStorageRedisUrlPath = "adapters.operationLeaseStorage.redis.url"
operationStorageRedisURLPath = "adapters.operationStorage.redis.url"
operationLeaseStorageRedisURLPath = "adapters.operationLeaseStorage.redis.url"
// Redis room storage
roomStorageRedisUrlPath = "adapters.roomStorage.redis.url"
roomStorageRedisURLPath = "adapters.roomStorage.redis.url"
// Redis scheduler cache
schedulerCacheRedisUrlPath = "adapters.schedulerCache.redis.url"
schedulerCacheRedisURLPath = "adapters.schedulerCache.redis.url"
// Redis instance storage
instanceStorageRedisUrlPath = "adapters.instanceStorage.redis.url"
instanceStorageRedisURLPath = "adapters.instanceStorage.redis.url"
instanceStorageRedisScanSizePath = "adapters.instanceStorage.redis.scanSize"
// Redis operation flow
operationFlowRedisUrlPath = "adapters.operationFlow.redis.url"
operationFlowRedisURLPath = "adapters.operationFlow.redis.url"
// Redis configs
redisPoolSize = "adapters.redis.poolSize"
redisPoolSizePath = "adapters.redis.poolSize"
// Random port allocator
portAllocatorRandomRangePath = "adapters.portAllocator.random.range"
// Postgres scheduler storage
schedulerStoragePostgresUrlPath = "adapters.schedulerStorage.postgres.url"
schedulerStoragePostgresURLPath = "adapters.schedulerStorage.postgres.url"

// Health Controller operation TTL
healthControllerOperationTTL = "workers.redis.operationsTtl"
// operation TTL
operationsTTLPath = "workers.redis.operationsTtl"
)

// NewSchedulerManager instantiates a new scheduler manager.
func NewSchedulerManager(schedulerStorage ports.SchedulerStorage, schedulerCache ports.SchedulerCache, operationManager ports.OperationManager, roomStorage ports.RoomStorage) ports.SchedulerManager {
return scheduler_manager.NewSchedulerManager(schedulerStorage, schedulerCache, operationManager, roomStorage)
}

// NewOperationManager instantiates a new operation manager
func NewOperationManager(flow ports.OperationFlow, storage ports.OperationStorage, operationDefinitionConstructors map[string]operations.DefinitionConstructor, leaseStorage ports.OperationLeaseStorage, config operation_manager.OperationManagerConfig, schedulerStorage ports.SchedulerStorage) ports.OperationManager {
return operation_manager.New(flow, storage, operationDefinitionConstructors, leaseStorage, config, schedulerStorage)
}

// NewRoomManager instantiates a room manager.
func NewRoomManager(clock ports.Clock, portAllocator ports.PortAllocator, roomStorage ports.RoomStorage, instanceStorage ports.GameRoomInstanceStorage, runtime ports.Runtime, eventsService ports.EventsService, config room_manager.RoomManagerConfig) ports.RoomManager {
return room_manager.New(clock, portAllocator, roomStorage, instanceStorage, runtime, eventsService, config)
}

// NewEventsForwarder instantiates GRPC as events forwarder.
func NewEventsForwarder(c config.Config) (ports.EventsForwarder, error) {
forwarderGrpc := eventsadapters.NewForwarderClient()
return eventsadapters.NewEventsForwarder(forwarderGrpc), nil
}

// NewRuntimeKubernetes instantiates kubernetes as runtime.
func NewRuntimeKubernetes(c config.Config) (ports.Runtime, error) {
var masterUrl string
var masterURL string
var kubeConfigPath string

inCluster := c.GetBool(runtimeKubernetesInCluster)
inCluster := c.GetBool(runtimeKubernetesInClusterPath)
if !inCluster {
masterUrl = c.GetString(runtimeKubernetesMasterUrlPath)
masterURL = c.GetString(runtimeKubernetesMasterURLPath)
kubeConfigPath = c.GetString(runtimeKubernetesKubeconfigPath)
}

clientSet, err := createKubernetesClient(masterUrl, kubeConfigPath)
clientSet, err := createKubernetesClient(masterURL, kubeConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize Kubernetes runtime: %w", err)
}

return kubernetesRuntime.New(clientSet), nil
}

// NewOperationStorageRedis instantiates redis as operation storage.
func NewOperationStorageRedis(clock ports.Clock, c config.Config) (ports.OperationStorage, error) {
client, err := createRedisClient(c, c.GetString(operationStorageRedisUrlPath))
client, err := createRedisClient(c, c.GetString(operationStorageRedisURLPath))
if err != nil {
return nil, fmt.Errorf("failed to initialize Redis operation storage: %w", err)
}

operationsTTlMap := map[operationadapters.Definition]time.Duration{
healthcontroller.OperationName: c.GetDuration(healthControllerOperationTTL),
operationsTTLPathMap := map[operationadapters.Definition]time.Duration{
healthcontroller.OperationName: c.GetDuration(operationsTTLPath),
add_rooms.OperationName: c.GetDuration(operationsTTLPath),
remove_rooms.OperationName: c.GetDuration(operationsTTLPath),
}

return operationadapters.NewRedisOperationStorage(client, clock, operationsTTlMap), nil
return operationadapters.NewRedisOperationStorage(client, clock, operationsTTLPathMap), nil
}

// NewOperationLeaseStorageRedis instantiates redis as operation lease storage.
func NewOperationLeaseStorageRedis(clock ports.Clock, c config.Config) (ports.OperationLeaseStorage, error) {
client, err := createRedisClient(c, c.GetString(operationLeaseStorageRedisUrlPath))
client, err := createRedisClient(c, c.GetString(operationLeaseStorageRedisURLPath))
if err != nil {
return nil, fmt.Errorf("failed to initialize Redis operation lease storage: %w", err)
}

return operationadapters.NewRedisOperationLeaseStorage(client, clock), nil
}

// NewRoomStorageRedis instantiates redis as room storage.
func NewRoomStorageRedis(c config.Config) (ports.RoomStorage, error) {
client, err := createRedisClient(c, c.GetString(roomStorageRedisUrlPath))
client, err := createRedisClient(c, c.GetString(roomStorageRedisURLPath))
if err != nil {
return nil, fmt.Errorf("failed to initialize Redis room storage: %w", err)
}

return roomStorageRedis.NewRedisStateStorage(client), nil
}

// NewGameRoomInstanceStorageRedis instantiates redis as instance storage.
func NewGameRoomInstanceStorageRedis(c config.Config) (ports.GameRoomInstanceStorage, error) {
client, err := createRedisClient(c, c.GetString(instanceStorageRedisUrlPath))
client, err := createRedisClient(c, c.GetString(instanceStorageRedisURLPath))
if err != nil {
return nil, fmt.Errorf("failed to initialize Redis instance storage: %w", err)
}

return instanceStorageRedis.NewRedisInstanceStorage(client, c.GetInt(instanceStorageRedisScanSizePath)), nil
}

// NewSchedulerCacheRedis instantiates redis as scheduler cache.
func NewSchedulerCacheRedis(c config.Config) (ports.SchedulerCache, error) {
client, err := createRedisClient(c, c.GetString(schedulerCacheRedisUrlPath))
client, err := createRedisClient(c, c.GetString(schedulerCacheRedisURLPath))
if err != nil {
return nil, fmt.Errorf("failed to initialize Redis scheduler cache: %w", err)
}

return scheduleradapters.NewRedisSchedulerCache(client), nil
}

// NewClockTime instantiates a new clock.
func NewClockTime() ports.Clock {
return clockTime.NewClock()
}

// NewPortAllocatorRandom instantiates a new port allocator.
func NewPortAllocatorRandom(c config.Config) (ports.PortAllocator, error) {
portRange, err := entities.ParsePortRange(c.GetString(portAllocatorRandomRangePath))
if err != nil {
Expand All @@ -185,40 +201,45 @@ func NewPortAllocatorRandom(c config.Config) (ports.PortAllocator, error) {
return portAllocatorRandom.NewRandomPortAllocator(portRange), nil
}

func GetSchedulerStoragePostgresUrl(c config.Config) string {
return c.GetString(schedulerStoragePostgresUrlPath)
}

// NewSchedulerStoragePg instanteates a postgres connection as scheduler storage.
func NewSchedulerStoragePg(c config.Config) (ports.SchedulerStorage, error) {
opts, err := connectToPostgres(GetSchedulerStoragePostgresUrl(c))
opts, err := connectToPostgres(GetSchedulerStoragePostgresURL(c))
if err != nil {
return nil, fmt.Errorf("failed to initialize postgres scheduler storage: %w", err)
}

return scheduleradapters.NewSchedulerStorage(opts), nil
}

// GetSchedulerStoragePostgresURL get scheduler storage postgres URL.
func GetSchedulerStoragePostgresURL(c config.Config) string {
return c.GetString(schedulerStoragePostgresURLPath)
}

func createRedisClient(c config.Config, url string) (*redis.Client, error) {
opts, err := redis.ParseURL(url)
if err != nil {
return nil, fmt.Errorf("invalid redis URL: %w", err)
}
opts.PoolSize = c.GetInt(redisPoolSize)
opts.PoolSize = c.GetInt(redisPoolSizePath)
return redis.NewClient(opts), nil
}

// NewPolicyMap instantiates a new policy to be used by autoscaler expecting a room storage as parameter.
func NewPolicyMap(roomStorage ports.RoomStorage) autoscaler.PolicyMap {
return autoscaler.PolicyMap{
autoscaling.RoomOccupancy: roomoccupancy.NewPolicy(roomStorage),
}
}

// NewAutoscaler instantiates a new autoscaler expecting a Policy Map as parameter.
func NewAutoscaler(policies autoscaler.PolicyMap) autoscalerports.Autoscaler {
return autoscaler.NewAutoscaler(policies)
}

// NewOperationFlowRedis instantiates a new operation flow using redis as backend.
func NewOperationFlowRedis(c config.Config) (ports.OperationFlow, error) {
client, err := createRedisClient(c, c.GetString(operationFlowRedisUrlPath))
client, err := createRedisClient(c, c.GetString(operationFlowRedisURLPath))
if err != nil {
return nil, fmt.Errorf("failed to initialize Redis operation storage: %w", err)
}
Expand All @@ -235,10 +256,10 @@ func connectToPostgres(url string) (*pg.Options, error) {
return opts, nil
}

func createKubernetesClient(masterUrl, kubeconfigPath string) (kubernetes.Interface, error) {
// NOTE: if neither masterUrl or kubeconfigPath are not passed, this will
func createKubernetesClient(masterURL, kubeconfigPath string) (kubernetes.Interface, error) {
// NOTE: if neither masterURL or kubeconfigPath are not passed, this will
// fallback to in cluster config.
kubeconfig, err := clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
kubeconfig, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed to construct kubernetes config: %w", err)
}
Expand Down
Loading

0 comments on commit f053f0a

Please sign in to comment.