From c2cb9573a746e7cb1c6102fd811dd59c9460b48d Mon Sep 17 00:00:00 2001 From: gabrielcorado Date: Tue, 11 Aug 2020 14:35:25 -0300 Subject: [PATCH 1/3] refactor(eventforwarder): add a flag to not return forwarder response This also prevents the API calls that forward events to fail due to a Forwarder failure. --- eventforwarder/forward.go | 52 +++++++++--- eventforwarder/forward_test.go | 140 +++++++++++++++++++++++++++++++++ models/config_yaml.go | 23 ++++++ models/config_yaml_test.go | 38 +++++++++ models/scheduler.go | 5 +- 5 files changed, 245 insertions(+), 13 deletions(-) diff --git a/eventforwarder/forward.go b/eventforwarder/forward.go index 9dd806d87..c89e159a8 100644 --- a/eventforwarder/forward.go +++ b/eventforwarder/forward.go @@ -25,8 +25,9 @@ type Response struct { // Forwarder is the struct that defines a forwarder with its EventForwarder as configured // in maestro and the metadata configured for each scheduler type Forwarder struct { - Func EventForwarder - Metadata map[string]interface{} + Func EventForwarder + ReturnResponse bool + Metadata map[string]interface{} } func getEnabledForwarders( @@ -40,7 +41,7 @@ func getEnabledForwarders( if fwd.Enabled { enabledForwarders = append( enabledForwarders, - &Forwarder{configuredFwdInfo.Forwarder, fwd.Metadata}, + &Forwarder{configuredFwdInfo.Forwarder, fwd.ForwardResponse, fwd.Metadata}, ) } } @@ -66,6 +67,7 @@ func ForwardRoomEvent( addrGetter models.AddrGetter, ) (res *Response, err error) { var eventWasForwarded bool + var warning error startTime := time.Now() defer func() { reportRPCStatus( @@ -76,6 +78,7 @@ func ForwardRoomEvent( logger, err, time.Now().Sub(startTime), + warning, ) }() @@ -137,7 +140,8 @@ func ForwardRoomEvent( } eventWasForwarded = true - return ForwardEventToForwarders(ctx, enabledForwarders, status, infos, l) + res, warning, err = ForwardEventToForwarders(ctx, enabledForwarders, status, infos, l) + return res, err } } } @@ -160,6 +164,7 @@ func ForwardPlayerEvent( logger logrus.FieldLogger, ) (resp *Response, err error) { var eventWasForwarded bool + var warning error startTime := time.Now() defer func() { reportRPCStatus( @@ -170,6 +175,7 @@ func ForwardPlayerEvent( logger, err, time.Now().Sub(startTime), + warning, ) }() @@ -196,7 +202,8 @@ func ForwardPlayerEvent( }).Debug("got enabled forwarders") if len(enabledForwarders) > 0 { eventWasForwarded = true - return ForwardEventToForwarders(ctx, enabledForwarders, event, metadata, l) + resp, warning, err = ForwardEventToForwarders(ctx, enabledForwarders, event, metadata, l) + return resp, err } } } @@ -204,14 +211,19 @@ func ForwardPlayerEvent( return nil, nil } -// ForwardEventToForwarders forwards +// ForwardEventToForwarders forwards the event, in case wheere `ReturnMessage` +// the forwarder result won't be used. It returns the response which consists in +// the last forwarder response, an error representing a warning (this happen +// when a forwarder that has `ReturnMessage = false` returns an error) and an +// error representing a failure on a `Forward` call. +// TODO: check if a failure on one forwarder should affect all the forwarders func ForwardEventToForwarders( ctx context.Context, forwarders []*Forwarder, event string, infos map[string]interface{}, logger logrus.FieldLogger, -) (*Response, error) { +) (*Response, error, error) { logger.WithFields(logrus.Fields{ "forwarders": len(forwarders), "infos": infos, @@ -221,8 +233,18 @@ func ForwardEventToForwarders( respMessage := []string{} for _, f := range forwarders { code, message, err := f.Func.Forward(ctx, event, infos, f.Metadata) + if !f.ReturnResponse { + if err != nil { + // if there is any error on a forward that does not return the + // response we set it as a "warning" + return nil, err, nil + } + + continue + } + if err != nil { - return nil, err + return nil, nil, err } // return the highest code received // if a forwarder returns 200 and another 500 we should return 500 to indicate the error @@ -234,15 +256,17 @@ func ForwardEventToForwarders( respMessage = append(respMessage, message) } } + resp := &Response{ Code: respCode, Message: strings.Join(respMessage, ";"), } - return resp, nil + return resp, nil, nil } -// reportRPCStatus sends to StatsD success true if err is null -// and false otherwise +// reportRPCStatus sends RPC status to StatsD. In case where +// `eventForwarderErr` or `eventForwarderWarning` are not nil we send it as +// `failure`, otherwise it is sent as `success`. func reportRPCStatus( eventWasForwarded bool, schedulerName, forwardRoute string, @@ -251,6 +275,7 @@ func reportRPCStatus( logger logrus.FieldLogger, eventForwarderErr error, responseTime time.Duration, + eventForwarderWarning error, ) { if !reporters.HasReporters() { return @@ -286,6 +311,11 @@ func reportRPCStatus( status[reportersConstants.TagReason] = eventForwarderErr.Error() } + if eventForwarderWarning != nil { + status[reportersConstants.TagStatus] = "failed" + status[reportersConstants.TagReason] = eventForwarderWarning.Error() + } + reporterErr := reporters.Report(reportersConstants.EventRPCStatus, status) if reporterErr != nil { logger. diff --git a/eventforwarder/forward_test.go b/eventforwarder/forward_test.go index 9c90e7e92..634ca0e8e 100644 --- a/eventforwarder/forward_test.go +++ b/eventforwarder/forward_test.go @@ -258,6 +258,146 @@ var _ = Describe("Forward", func() { Expect(err.Error()).To(Equal(errMsg)) }) + It("should forward room event but not return its response", func() { + yaml := `name: scheduler +game: game +forwarders: + mockplugin: + mockfwd: + enabled: true + forwardResponse: false +` + mt.MockLoadScheduler(schedulerName, mockDB). + Do(func(scheduler *models.Scheduler, _ string, _ string) { + *scheduler = *models.NewScheduler(schedulerName, gameName, yaml) + }) + _, err := cache.LoadScheduler(mockDB, schedulerName, false) + Expect(err).NotTo(HaveOccurred()) + + ctx := context.Background() + mockEventForwarder.EXPECT().Forward( + ctx, + models.StatusReady, + map[string]interface{}{ + "host": nodeAddress, + "port": hostPort, + "roomId": roomName, + "game": gameName, + "metadata": map[string]interface{}{ + "ipv6Label": ipv6Label, + "ports": fmt.Sprintf(`[{"name":"port","port":%d,"protocol":""}]`, hostPort), + }, + }, + metadata, + ).Return(int32(200), "success", nil) + + mockReporter.EXPECT().Report(reportersConstants.EventRPCStatus, map[string]interface{}{ + reportersConstants.TagGame: gameName, + reportersConstants.TagScheduler: schedulerName, + reportersConstants.TagHostname: Hostname(), + reportersConstants.TagRoute: RouteRoomEvent, + reportersConstants.TagStatus: "success", + }) + + mockReporter.EXPECT().Report(reportersConstants.EventNodeIpv6Status, map[string]interface{}{ + reportersConstants.TagNodeHost: nodeAddress, + reportersConstants.TagStatus: "success", + }) + + mockReporter.EXPECT().Report(reportersConstants.EventRPCDuration, gomock.Any()) + + response, err := ForwardRoomEvent( + ctx, + mockForwarders, + mockRedisClient, + mockDB, + clientset, + mmr, + room, + models.StatusReady, + "", + nil, + cache, + logger, + roomAddrGetter, + ) + + Expect(err).NotTo(HaveOccurred()) + Expect(response.Code).To(Equal(0)) + Expect(response.Message).To(Equal("")) + }) + + It("should return as success but report error if forward fails", func() { + yaml := `name: scheduler +game: game +forwarders: + mockplugin: + mockfwd: + enabled: true + forwardResponse: false +` + mt.MockLoadScheduler(schedulerName, mockDB). + Do(func(scheduler *models.Scheduler, _ string, _ string) { + *scheduler = *models.NewScheduler(schedulerName, gameName, yaml) + }) + _, err := cache.LoadScheduler(mockDB, schedulerName, false) + Expect(err).NotTo(HaveOccurred()) + + errMsg := "event forward failed" + ctx := context.Background() + noIpv6roomAddrGetter := models.NewRoomAddressesFromHostPort(logger, "", false, 0) + mockEventForwarder.EXPECT().Forward( + ctx, + models.StatusReady, + map[string]interface{}{ + "host": nodeAddress, + "port": hostPort, + "roomId": roomName, + "game": gameName, + "metadata": map[string]interface{}{ + "ipv6Label": "", + "ports": fmt.Sprintf(`[{"name":"port","port":%d,"protocol":""}]`, hostPort), + }, + }, + metadata, + ).Return(int32(0), "", errors.New(errMsg)) + + mockReporter.EXPECT().Report(reportersConstants.EventRPCStatus, map[string]interface{}{ + reportersConstants.TagGame: gameName, + reportersConstants.TagScheduler: schedulerName, + reportersConstants.TagHostname: Hostname(), + reportersConstants.TagRoute: RouteRoomEvent, + reportersConstants.TagStatus: "failed", + reportersConstants.TagReason: errMsg, + }) + + mockReporter.EXPECT().Report(reportersConstants.EventNodeIpv6Status, map[string]interface{}{ + reportersConstants.TagNodeHost: nodeAddress, + reportersConstants.TagStatus: "failed", + }) + + mockReporter.EXPECT().Report(reportersConstants.EventRPCDuration, gomock.Any()) + + response, err := ForwardRoomEvent( + ctx, + mockForwarders, + mockRedisClient, + mockDB, + clientset, + mmr, + room, + models.StatusReady, + "", + nil, + cache, + logger, + noIpv6roomAddrGetter, + ) + + Expect(err).NotTo(HaveOccurred()) + Expect(response).To(BeNil()) + }) + It("should not report if scheduler has no forwarders", func() { yaml := `name: scheduler game: game diff --git a/models/config_yaml.go b/models/config_yaml.go index 921e0ac04..79810e842 100644 --- a/models/config_yaml.go +++ b/models/config_yaml.go @@ -9,6 +9,10 @@ import ( yaml "gopkg.in/yaml.v2" ) +// DefaultForwarderForwardResponse is the default value set to `Forwarder` +// `ForwardResponse` property. +const DefaultForwarderForwardResponse = true + // ConfigYAMLv1 is the ConfigYAML before refactor to n containers per pod type ConfigYAMLv1 struct { Name string `yaml:"name"` @@ -297,3 +301,22 @@ func (c *ConfigYAML) HasPorts() bool { } return false } + +// UnmarshalYAML implements a custom way to unmarshal the `Forward` type. This +// is done in order to set default values to the type. +// TODO: make a pattern to set default values for the scheduler config, instead +// of having this or the `EnsureDefaultValues`. +func (f *Forwarder) UnmarshalYAML(unmarshal func(interface{}) error) error { + // We define a "temporary" type in order to avoid creating a recursion on + // `Forward` unmarshal. + type rawForwarder Forwarder + raw := rawForwarder{ForwardResponse: DefaultForwarderForwardResponse} + err := unmarshal(&raw) + if err != nil { + return err + } + + *f = Forwarder(raw) + + return nil +} diff --git a/models/config_yaml_test.go b/models/config_yaml_test.go index c2efbcedc..ac130a458 100644 --- a/models/config_yaml_test.go +++ b/models/config_yaml_test.go @@ -7,6 +7,44 @@ import ( ) var _ = Describe("ConfigYaml", func() { + Describe("NewCofigYaml", func() { + It("should define default value for forwarder", func() { + configYaml, _ := NewConfigYAML(`name: scheduler-name +game: game +image: nginx:alpine +ports: +- containerPort: 8080 + protocol: TCP + name: tcp +limits: + cpu: 100m + memory: 100Mi +requests: + cpu: 50m + memory: 50Mi +cmd: ["/bin/bash", "-c", "./start.sh"] +env: +- name: ENV_1 + value: VALUE_1 +containers: [] +forwarders: + grpc: + sampleGRPC: + enabled: true + metadata: {} + anotherGRPC: + enabled: true + metadata: {} +`) + + for _, pluginType := range configYaml.Forwarders { + for _, plugin := range pluginType { + Expect(plugin.ForwardResponse).To(Equal(DefaultForwarderForwardResponse)) + } + } + }) + }) + Describe("ToYaml", func() { It("should return yaml for version v1", func() { configYaml, _ := NewConfigYAML(`name: scheduler-name diff --git a/models/scheduler.go b/models/scheduler.go index 84244fd38..26c093ff0 100644 --- a/models/scheduler.go +++ b/models/scheduler.go @@ -117,8 +117,9 @@ type AutoScaling struct { // Forwarder has the configuration for the event forwarders type Forwarder struct { - Enabled bool `yaml:"enabled" json:"enabled"` - Metadata map[string]interface{} `yaml:"metadata" json:"metadata"` + Enabled bool `yaml:"enabled" json:"enabled"` + ForwardResponse bool `yaml:"forwardResponse" json:"forwardResponse"` + Metadata map[string]interface{} `yaml:"metadata" json:"metadata"` } // Container represents a container inside a pod From 214a89772ad4ff69ac89f943fabce7110b77554a Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Tue, 11 Aug 2020 18:32:54 -0300 Subject: [PATCH 2/3] chore(eventforwarder): name return values This will make it easier to know which `error` is for warning and which is for error handling. --- eventforwarder/forward.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eventforwarder/forward.go b/eventforwarder/forward.go index c89e159a8..ae3bf7472 100644 --- a/eventforwarder/forward.go +++ b/eventforwarder/forward.go @@ -223,7 +223,7 @@ func ForwardEventToForwarders( event string, infos map[string]interface{}, logger logrus.FieldLogger, -) (*Response, error, error) { +) (resp *Response, warning error, err error) { logger.WithFields(logrus.Fields{ "forwarders": len(forwarders), "infos": infos, @@ -257,7 +257,7 @@ func ForwardEventToForwarders( } } - resp := &Response{ + resp = &Response{ Code: respCode, Message: strings.Join(respMessage, ";"), } From 23681d4dbe545bdc71af18fc76a0cd565d42db8c Mon Sep 17 00:00:00 2001 From: gabrielcorado Date: Tue, 18 Aug 2020 13:40:53 -0300 Subject: [PATCH 3/3] chore(docs): add documentation about forwarders responses --- README.md | 30 ++-------------------------- docs/forwarders.md | 49 ++++++++++++++++++++++++++++++++++++++++++++++ docs/index.rst | 1 + 3 files changed, 52 insertions(+), 28 deletions(-) create mode 100644 docs/forwarders.md diff --git a/README.md b/README.md index 74d82266f..a124d5d6d 100644 --- a/README.md +++ b/README.md @@ -354,34 +354,8 @@ Maestro's auto scaling policies are based on the number of rooms that are in rea In order to properly set their statuses, game rooms must call the maestro-api HTTP routes described in the [Maestro API docs](http://maestro.readthedocs.io/en/latest/api.html). -## Event Forwarders: +## Event Forwarders Event forwarders are pluggable components that forwards events like: RoomReady, RoomTerminated, RoomTerminating, etc... to other services. -A event forwarder is a go native plugin that should be compiled and put into bin folder, it should contain a method ```func NewForwarder(config *viper.Viper) (eventforwarder.EventForwarder)``` that returns a configured instance of a struct that implements "eventforwarder.EventForwarder". - -An example is provided in the plugins/grpc folder, compile it with: -``` -go build -o bin/grpc.so -buildmode=plugin plugins/grpc/forwarder.go -``` - -Then to turn it on, include a config like that in the active config file: -``` -forwarders: - grpc: - matchmaking: - address: "10.0.23.57:10000" - local: - address: "localhost:10000" -``` - -In this example, maestro will look for a plugin "grpc.so" in the bin folder and create 2 forwarders from it, matchmaking and local one, each using a different address. -Then, every time a room is changing states, all forwarders will be called with infos about the change. - -There's also a route: ```/scheduler/{schedulerName}/rooms/{roomName}/playerevent``` that can be called like that, for example: -``` -curl -X POST -d '{"timestamp":12424124234, "event":"playerJoin", "metadata":{"playerId":"sime"}}' localhost:8080/scheduler/some/rooms/r1/playerevent -``` -It will forward the playerEvent "playerJoin" with the provided metadata and roomId to all the configured forwarders. - -For the provided plugin, the valid values for event field are: ['playerJoin','playerLeft'] +[More information about it can e found here.](docs/forwarders.md) diff --git a/docs/forwarders.md b/docs/forwarders.md new file mode 100644 index 000000000..1a309751d --- /dev/null +++ b/docs/forwarders.md @@ -0,0 +1,49 @@ +Event Forwarders +======== + +Event forwarders are pluggable components that forwards events like: RoomReady, RoomTerminated, RoomTerminating, etc... to other services. + +A event forwarder is a go native plugin that should be compiled and put into bin folder, it should contain a method ```func NewForwarder(config *viper.Viper) (eventforwarder.EventForwarder)``` that returns a configured instance of a struct that implements "eventforwarder.EventForwarder". + +An example is provided in the plugins/grpc folder, compile it with: +``` +go build -o bin/grpc.so -buildmode=plugin plugins/grpc/forwarder.go +``` + +## Configuration +Then to turn it on, include a config like that in the active config file: +``` +forwarders: + grpc: + matchmaking: + address: "10.0.23.57:10000" + local: + address: "localhost:10000" +``` + +In this example, maestro will look for a plugin "grpc.so" in the bin folder and create 2 forwarders from it, matchmaking and local one, each using a different address. +Then, every time a room is changing states, all forwarders will be called with infos about the change. + +### Testing +There's also a route: ```/scheduler/{schedulerName}/rooms/{roomName}/playerevent``` that can be called like that, for example: +``` +curl -X POST -d '{"timestamp":12424124234, "event":"playerJoin", "metadata":{"playerId":"sime"}}' localhost:8080/scheduler/some/rooms/r1/playerevent +``` +It will forward the playerEvent "playerJoin" with the provided metadata and roomId to all the configured forwarders. + +For the provided plugin, the valid values for event field are: ['playerJoin','playerLeft'] + +## Responses +All forwarders responses are put in consideration even if they're not returned +to the clients. This means that if a forwarder fails the API notifying the event +also fails. This behaviour is deprecated and in future versions the forwarder +failure/success won't impact any of the notifiers. To achieve this in the +current version you can add the `forwardMessage (default: true)` configuration +to your scheduler like in this example: +``` +forwarders: + grpc: + matchamaking: + enabled: true + forwardResponse: false +``` diff --git a/docs/index.rst b/docs/index.rst index 83898039e..8b86af94d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -19,6 +19,7 @@ Contents: cli testing autoscaling + forwarders Indices and tables ==================