Skip to content

Commit

Permalink
Merge pull request #101 from topfreegames/feature/eventforwarder-opti…
Browse files Browse the repository at this point in the history
…onal-result-forward

Add a flag to scheduler forwarders configuration to not return its response
  • Loading branch information
gabrielcorado authored Aug 27, 2020
2 parents 3158ebe + 23681d4 commit a90a274
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 42 deletions.
30 changes: 2 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,34 +354,8 @@ Maestro's auto scaling policies are based on the number of rooms that are in rea

In order to properly set their statuses, game rooms must call the maestro-api HTTP routes described in the [Maestro API docs](http://maestro.readthedocs.io/en/latest/api.html).

## Event Forwarders:
## Event Forwarders

Event forwarders are pluggable components that forwards events like: RoomReady, RoomTerminated, RoomTerminating, etc... to other services.

A event forwarder is a go native plugin that should be compiled and put into bin folder, it should contain a method ```func NewForwarder(config *viper.Viper) (eventforwarder.EventForwarder)``` that returns a configured instance of a struct that implements "eventforwarder.EventForwarder".

An example is provided in the plugins/grpc folder, compile it with:
```
go build -o bin/grpc.so -buildmode=plugin plugins/grpc/forwarder.go
```

Then to turn it on, include a config like that in the active config file:
```
forwarders:
grpc:
matchmaking:
address: "10.0.23.57:10000"
local:
address: "localhost:10000"
```

In this example, maestro will look for a plugin "grpc.so" in the bin folder and create 2 forwarders from it, matchmaking and local one, each using a different address.
Then, every time a room is changing states, all forwarders will be called with infos about the change.

There's also a route: ```/scheduler/{schedulerName}/rooms/{roomName}/playerevent``` that can be called like that, for example:
```
curl -X POST -d '{"timestamp":12424124234, "event":"playerJoin", "metadata":{"playerId":"sime"}}' localhost:8080/scheduler/some/rooms/r1/playerevent
```
It will forward the playerEvent "playerJoin" with the provided metadata and roomId to all the configured forwarders.

For the provided plugin, the valid values for event field are: ['playerJoin','playerLeft']
[More information about it can e found here.](docs/forwarders.md)
49 changes: 49 additions & 0 deletions docs/forwarders.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
Event Forwarders
========

Event forwarders are pluggable components that forwards events like: RoomReady, RoomTerminated, RoomTerminating, etc... to other services.

A event forwarder is a go native plugin that should be compiled and put into bin folder, it should contain a method ```func NewForwarder(config *viper.Viper) (eventforwarder.EventForwarder)``` that returns a configured instance of a struct that implements "eventforwarder.EventForwarder".

An example is provided in the plugins/grpc folder, compile it with:
```
go build -o bin/grpc.so -buildmode=plugin plugins/grpc/forwarder.go
```

## Configuration
Then to turn it on, include a config like that in the active config file:
```
forwarders:
grpc:
matchmaking:
address: "10.0.23.57:10000"
local:
address: "localhost:10000"
```

In this example, maestro will look for a plugin "grpc.so" in the bin folder and create 2 forwarders from it, matchmaking and local one, each using a different address.
Then, every time a room is changing states, all forwarders will be called with infos about the change.

### Testing
There's also a route: ```/scheduler/{schedulerName}/rooms/{roomName}/playerevent``` that can be called like that, for example:
```
curl -X POST -d '{"timestamp":12424124234, "event":"playerJoin", "metadata":{"playerId":"sime"}}' localhost:8080/scheduler/some/rooms/r1/playerevent
```
It will forward the playerEvent "playerJoin" with the provided metadata and roomId to all the configured forwarders.

For the provided plugin, the valid values for event field are: ['playerJoin','playerLeft']

## Responses
All forwarders responses are put in consideration even if they're not returned
to the clients. This means that if a forwarder fails the API notifying the event
also fails. This behaviour is deprecated and in future versions the forwarder
failure/success won't impact any of the notifiers. To achieve this in the
current version you can add the `forwardMessage (default: true)` configuration
to your scheduler like in this example:
```
forwarders:
grpc:
matchamaking:
enabled: true
forwardResponse: false
```
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Contents:
cli
testing
autoscaling
forwarders

Indices and tables
==================
Expand Down
54 changes: 42 additions & 12 deletions eventforwarder/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type Response struct {
// Forwarder is the struct that defines a forwarder with its EventForwarder as configured
// in maestro and the metadata configured for each scheduler
type Forwarder struct {
Func EventForwarder
Metadata map[string]interface{}
Func EventForwarder
ReturnResponse bool
Metadata map[string]interface{}
}

func getEnabledForwarders(
Expand All @@ -40,7 +41,7 @@ func getEnabledForwarders(
if fwd.Enabled {
enabledForwarders = append(
enabledForwarders,
&Forwarder{configuredFwdInfo.Forwarder, fwd.Metadata},
&Forwarder{configuredFwdInfo.Forwarder, fwd.ForwardResponse, fwd.Metadata},
)
}
}
Expand All @@ -66,6 +67,7 @@ func ForwardRoomEvent(
addrGetter models.AddrGetter,
) (res *Response, err error) {
var eventWasForwarded bool
var warning error
startTime := time.Now()
defer func() {
reportRPCStatus(
Expand All @@ -76,6 +78,7 @@ func ForwardRoomEvent(
logger,
err,
time.Now().Sub(startTime),
warning,
)
}()

Expand Down Expand Up @@ -137,7 +140,8 @@ func ForwardRoomEvent(
}
eventWasForwarded = true

return ForwardEventToForwarders(ctx, enabledForwarders, status, infos, l)
res, warning, err = ForwardEventToForwarders(ctx, enabledForwarders, status, infos, l)
return res, err
}
}
}
Expand All @@ -160,6 +164,7 @@ func ForwardPlayerEvent(
logger logrus.FieldLogger,
) (resp *Response, err error) {
var eventWasForwarded bool
var warning error
startTime := time.Now()
defer func() {
reportRPCStatus(
Expand All @@ -170,6 +175,7 @@ func ForwardPlayerEvent(
logger,
err,
time.Now().Sub(startTime),
warning,
)
}()

Expand All @@ -196,22 +202,28 @@ func ForwardPlayerEvent(
}).Debug("got enabled forwarders")
if len(enabledForwarders) > 0 {
eventWasForwarded = true
return ForwardEventToForwarders(ctx, enabledForwarders, event, metadata, l)
resp, warning, err = ForwardEventToForwarders(ctx, enabledForwarders, event, metadata, l)
return resp, err
}
}
}
l.Debug("no forwarders configured and enabled")
return nil, nil
}

// ForwardEventToForwarders forwards
// ForwardEventToForwarders forwards the event, in case wheere `ReturnMessage`
// the forwarder result won't be used. It returns the response which consists in
// the last forwarder response, an error representing a warning (this happen
// when a forwarder that has `ReturnMessage = false` returns an error) and an
// error representing a failure on a `Forward` call.
// TODO: check if a failure on one forwarder should affect all the forwarders
func ForwardEventToForwarders(
ctx context.Context,
forwarders []*Forwarder,
event string,
infos map[string]interface{},
logger logrus.FieldLogger,
) (*Response, error) {
) (resp *Response, warning error, err error) {
logger.WithFields(logrus.Fields{
"forwarders": len(forwarders),
"infos": infos,
Expand All @@ -221,8 +233,18 @@ func ForwardEventToForwarders(
respMessage := []string{}
for _, f := range forwarders {
code, message, err := f.Func.Forward(ctx, event, infos, f.Metadata)
if !f.ReturnResponse {
if err != nil {
// if there is any error on a forward that does not return the
// response we set it as a "warning"
return nil, err, nil
}

continue
}

if err != nil {
return nil, err
return nil, nil, err
}
// return the highest code received
// if a forwarder returns 200 and another 500 we should return 500 to indicate the error
Expand All @@ -234,15 +256,17 @@ func ForwardEventToForwarders(
respMessage = append(respMessage, message)
}
}
resp := &Response{

resp = &Response{
Code: respCode,
Message: strings.Join(respMessage, ";"),
}
return resp, nil
return resp, nil, nil
}

// reportRPCStatus sends to StatsD success true if err is null
// and false otherwise
// reportRPCStatus sends RPC status to StatsD. In case where
// `eventForwarderErr` or `eventForwarderWarning` are not nil we send it as
// `failure`, otherwise it is sent as `success`.
func reportRPCStatus(
eventWasForwarded bool,
schedulerName, forwardRoute string,
Expand All @@ -251,6 +275,7 @@ func reportRPCStatus(
logger logrus.FieldLogger,
eventForwarderErr error,
responseTime time.Duration,
eventForwarderWarning error,
) {
if !reporters.HasReporters() {
return
Expand Down Expand Up @@ -286,6 +311,11 @@ func reportRPCStatus(
status[reportersConstants.TagReason] = eventForwarderErr.Error()
}

if eventForwarderWarning != nil {
status[reportersConstants.TagStatus] = "failed"
status[reportersConstants.TagReason] = eventForwarderWarning.Error()
}

reporterErr := reporters.Report(reportersConstants.EventRPCStatus, status)
if reporterErr != nil {
logger.
Expand Down
140 changes: 140 additions & 0 deletions eventforwarder/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,146 @@ var _ = Describe("Forward", func() {
Expect(err.Error()).To(Equal(errMsg))
})

It("should forward room event but not return its response", func() {
yaml := `name: scheduler
game: game
forwarders:
mockplugin:
mockfwd:
enabled: true
forwardResponse: false
`
mt.MockLoadScheduler(schedulerName, mockDB).
Do(func(scheduler *models.Scheduler, _ string, _ string) {
*scheduler = *models.NewScheduler(schedulerName, gameName, yaml)
})
_, err := cache.LoadScheduler(mockDB, schedulerName, false)
Expect(err).NotTo(HaveOccurred())

ctx := context.Background()
mockEventForwarder.EXPECT().Forward(
ctx,
models.StatusReady,
map[string]interface{}{
"host": nodeAddress,
"port": hostPort,
"roomId": roomName,
"game": gameName,
"metadata": map[string]interface{}{
"ipv6Label": ipv6Label,
"ports": fmt.Sprintf(`[{"name":"port","port":%d,"protocol":""}]`, hostPort),
},
},
metadata,
).Return(int32(200), "success", nil)

mockReporter.EXPECT().Report(reportersConstants.EventRPCStatus, map[string]interface{}{
reportersConstants.TagGame: gameName,
reportersConstants.TagScheduler: schedulerName,
reportersConstants.TagHostname: Hostname(),
reportersConstants.TagRoute: RouteRoomEvent,
reportersConstants.TagStatus: "success",
})

mockReporter.EXPECT().Report(reportersConstants.EventNodeIpv6Status, map[string]interface{}{
reportersConstants.TagNodeHost: nodeAddress,
reportersConstants.TagStatus: "success",
})

mockReporter.EXPECT().Report(reportersConstants.EventRPCDuration, gomock.Any())

response, err := ForwardRoomEvent(
ctx,
mockForwarders,
mockRedisClient,
mockDB,
clientset,
mmr,
room,
models.StatusReady,
"",
nil,
cache,
logger,
roomAddrGetter,
)

Expect(err).NotTo(HaveOccurred())
Expect(response.Code).To(Equal(0))
Expect(response.Message).To(Equal(""))
})

It("should return as success but report error if forward fails", func() {
yaml := `name: scheduler
game: game
forwarders:
mockplugin:
mockfwd:
enabled: true
forwardResponse: false
`
mt.MockLoadScheduler(schedulerName, mockDB).
Do(func(scheduler *models.Scheduler, _ string, _ string) {
*scheduler = *models.NewScheduler(schedulerName, gameName, yaml)
})
_, err := cache.LoadScheduler(mockDB, schedulerName, false)
Expect(err).NotTo(HaveOccurred())

errMsg := "event forward failed"
ctx := context.Background()
noIpv6roomAddrGetter := models.NewRoomAddressesFromHostPort(logger, "", false, 0)
mockEventForwarder.EXPECT().Forward(
ctx,
models.StatusReady,
map[string]interface{}{
"host": nodeAddress,
"port": hostPort,
"roomId": roomName,
"game": gameName,
"metadata": map[string]interface{}{
"ipv6Label": "",
"ports": fmt.Sprintf(`[{"name":"port","port":%d,"protocol":""}]`, hostPort),
},
},
metadata,
).Return(int32(0), "", errors.New(errMsg))

mockReporter.EXPECT().Report(reportersConstants.EventRPCStatus, map[string]interface{}{
reportersConstants.TagGame: gameName,
reportersConstants.TagScheduler: schedulerName,
reportersConstants.TagHostname: Hostname(),
reportersConstants.TagRoute: RouteRoomEvent,
reportersConstants.TagStatus: "failed",
reportersConstants.TagReason: errMsg,
})

mockReporter.EXPECT().Report(reportersConstants.EventNodeIpv6Status, map[string]interface{}{
reportersConstants.TagNodeHost: nodeAddress,
reportersConstants.TagStatus: "failed",
})

mockReporter.EXPECT().Report(reportersConstants.EventRPCDuration, gomock.Any())

response, err := ForwardRoomEvent(
ctx,
mockForwarders,
mockRedisClient,
mockDB,
clientset,
mmr,
room,
models.StatusReady,
"",
nil,
cache,
logger,
noIpv6roomAddrGetter,
)

Expect(err).NotTo(HaveOccurred())
Expect(response).To(BeNil())
})

It("should not report if scheduler has no forwarders", func() {
yaml := `name: scheduler
game: game
Expand Down
Loading

0 comments on commit a90a274

Please sign in to comment.