From 8f56921c847f32ab0257617a88f8016dc75e7cb4 Mon Sep 17 00:00:00 2001 From: Pier-Olivier Thibault Date: Sun, 4 Nov 2018 13:20:05 -0500 Subject: [PATCH 1/4] Refactor and test the agent with documentation Some JSON value would assume to have some kind of values and it would not process the story. Now, the data gets parsed properly and only the required fields are unmarshalled. Also adding test to make sure all JSON values can be tested as well as testing different scenario. Added some initial documentation to get the ball started. Stuff --- README.md | 26 ++++++++++ integrations/integrations.go | 4 +- integrations/scalyr/event.go | 15 +++++- integrations/scalyr/event_test.go | 73 +++++++++++++++++++++++++++- integrations/scalyr/instance.go | 4 +- integrations/scalyr/instance_test.go | 3 ++ integrations/scalyr/payload.go | 2 +- integrations/scalyr/payload_test.go | 71 +++++++++++++++++++++++++++ main.go | 4 +- stories/queue.go | 6 +-- stories/queue_test.go | 57 +++++++++++++++++++++- stories/story.go | 4 +- stories/story_test.go | 43 +++++++++++++++- 13 files changed, 295 insertions(+), 17 deletions(-) create mode 100644 README.md create mode 100644 integrations/scalyr/payload_test.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..f7a8870 --- /dev/null +++ b/README.md @@ -0,0 +1,26 @@ +# Stories + +Stories is a lightweight agent for StoryTeller logs. + +## Configuration + +Currently, the agent has a hardcoded dependency to Scalyr as a provider. This means that the agent will look for the `SCALYR_WRITE_TOKEN` Environment variable to be set. Other than that, `stories` can be configured with the following flags at startup. + +### `--buffer=int` +_default: 1000_ +The buffer size is how many stories that the agent can store before it sends it in a batch to the provider. + +### `--interval=int` +_default: 1_ +The interval, in seconds, before the agent batch the stories that it has buffered. + + +### `--socket=string` +_default: /tmp/stories.sock_ +The path where the unix socket will be created. This is used by libraries to send over events. + +#### Batching against an interval + +The way the agent works is that there is a runloop set to the `interval` set at runtime that will send in batch any stories that were collected during the interval. + +If, during the interval, the buffer reach its limit, the agent will trigger a batch send to the provider. If all default settings are set and you send twice the size of the buffer over the interval period, 2 batch will be sent to the provider. diff --git a/integrations/integrations.go b/integrations/integrations.go index 047de4d..14fc01c 100644 --- a/integrations/integrations.go +++ b/integrations/integrations.go @@ -2,8 +2,8 @@ package integrations import ( "errors" - "github.com/pothibo/stories/integrations/scalyr" - "github.com/pothibo/stories/stories" + "github.com/convertkit/stories/integrations/scalyr" + "github.com/convertkit/stories/stories" "net/http" ) diff --git a/integrations/scalyr/event.go b/integrations/scalyr/event.go index 330a2c7..342d1d7 100644 --- a/integrations/scalyr/event.go +++ b/integrations/scalyr/event.go @@ -2,7 +2,7 @@ package scalyr import ( "encoding/json" - "github.com/pothibo/stories/stories" + "github.com/convertkit/stories/stories" ) type Event stories.Story @@ -11,7 +11,18 @@ func (e *Event) MarshalJSON() ([]byte, error) { data := make(map[string]interface{}) data["ts"] = e.Timestamp data["sev"] = e.Severity - data["attrs"] = e.Data + + attributes := e.Data + + message, err := json.Marshal(e.Message) + + if err != nil { + return nil, err + } + + attributes["message"] = message + + data["attrs"] = attributes return json.Marshal(data) } diff --git a/integrations/scalyr/event_test.go b/integrations/scalyr/event_test.go index 07a3ad9..580ec9a 100644 --- a/integrations/scalyr/event_test.go +++ b/integrations/scalyr/event_test.go @@ -1,9 +1,80 @@ package scalyr import ( + "bytes" + "encoding/json" + "github.com/convertkit/stories/stories" "testing" ) -func TestEventJSONIsAcceptable(t *testing.T) { +func story(t *testing.T) *stories.Story { + story, err := stories.NewStory([]byte(`{ + "severity": 4, + "timestamp": "1541354132811", + "message": "Hello world!", + "data": { + "foo": { + "bar": "Something", + "yolo": true + }, + "object_id": 1234, + "boolean": true, + "content": "Stuff" + } + }`)) + if err != nil { + t.Fail() + } + + return story +} + +func payloadForEventTest(story *stories.Story, t *testing.T) map[string]interface{} { + event := Event(*story) + content, err := json.Marshal(&event) + + if err != nil { + t.Fail() + } + + var payload map[string]interface{} + + err = json.Unmarshal(content, &payload) + + if err != nil { + t.Error(err) + t.Fail() + } + + return payload +} + +func TestTimestampInJSON(t *testing.T) { + story := story(t) + payload := payloadForEventTest(story, t) + + if bytes.Compare([]byte(payload["ts"].(string)), []byte("1541354132811")) != 0 { + t.Fail() + } +} + +func TestSevInJSON(t *testing.T) { + story := story(t) + payload := payloadForEventTest(story, t) + + if payload["sev"].(float64) != float64(4) { + t.Fail() + } +} + +func TestMessageInAttributesJSON(t *testing.T) { + story := story(t) + payload := payloadForEventTest(story, t) + + attributes := payload["attrs"].(map[string]interface{}) + + if bytes.Compare([]byte(attributes["message"].(string)), []byte("Hello world!")) != 0 { + t.Fail() + } } diff --git a/integrations/scalyr/instance.go b/integrations/scalyr/instance.go index 451d2e4..d69865e 100644 --- a/integrations/scalyr/instance.go +++ b/integrations/scalyr/instance.go @@ -4,8 +4,8 @@ import ( "bytes" "encoding/json" "errors" + "github.com/convertkit/stories/stories" "github.com/google/uuid" - "github.com/pothibo/stories/stories" "log" "net/http" "os" @@ -34,7 +34,7 @@ func (i *Instance) Configure() error { i.SessionInfo = make(map[string]string) i.SessionInfo["logfile"] = "stories" - i.SessionInfo["serverHost"] = "test" + i.SessionInfo["serverHost"] = "production" i.Url = "https://www.scalyr.com/addEvents" diff --git a/integrations/scalyr/instance_test.go b/integrations/scalyr/instance_test.go index 73d4195..45be9b4 100644 --- a/integrations/scalyr/instance_test.go +++ b/integrations/scalyr/instance_test.go @@ -1,11 +1,14 @@ package scalyr import ( + "os" "strings" "testing" ) func TestConfigureInstanceGenerateASessionUUID(t *testing.T) { + os.Setenv("SCALYR_WRITE_TOKEN", "test") + instance := &Instance{} err := instance.Configure() diff --git a/integrations/scalyr/payload.go b/integrations/scalyr/payload.go index 78f3cd9..9f6c0c0 100644 --- a/integrations/scalyr/payload.go +++ b/integrations/scalyr/payload.go @@ -2,7 +2,7 @@ package scalyr import ( "encoding/json" - "github.com/pothibo/stories/stories" + "github.com/convertkit/stories/stories" ) type Payload struct { diff --git a/integrations/scalyr/payload_test.go b/integrations/scalyr/payload_test.go new file mode 100644 index 0000000..c37dbe1 --- /dev/null +++ b/integrations/scalyr/payload_test.go @@ -0,0 +1,71 @@ +package scalyr + +import ( + "encoding/json" + "github.com/convertkit/stories/stories" + "os" + "testing" +) + +func instanceForInstanceTest(t *testing.T) *Instance { + os.Setenv("SCALYR_WRITE_TOKEN", "test") + + instance := &Instance{} + err := instance.Configure() + + if err != nil { + t.Error(err) + t.Fail() + } + + return instance +} + +func payloadForInstanceTest(t *testing.T) *Payload { + story, err := stories.NewStory([]byte(`{ + "severity": 4, + "timestamp": "1541354132811", + "message": "Hello world!", + "data": { + "foo": { + "bar": "Something", + "yolo": true + }, + "object_id": 1234, + "boolean": true, + "content": "Stuff" + } + }`)) + + if err != nil { + t.Fail() + } + + stories := []*stories.Story{story} + return NewPayload(instanceForInstanceTest(t), stories) +} + +func TestTokenPresentInJSON(t *testing.T) { + payload := payloadForInstanceTest(t) + body, err := json.Marshal(&payload) + + if err != nil { + t.Error(err) + } + + var data map[string]interface{} + + err = json.Unmarshal(body, &data) + + if err != nil { + t.Error(err) + } + + if data["token"] == nil { + t.Fail() + } + + if data["token"] != payload.Token { + t.Fail() + } +} diff --git a/main.go b/main.go index f442e25..4cad7ac 100644 --- a/main.go +++ b/main.go @@ -2,8 +2,8 @@ package main import ( "flag" - "github.com/pothibo/stories/integrations" - "github.com/pothibo/stories/stories" + "github.com/convertkit/stories/integrations" + "github.com/convertkit/stories/stories" "log" "net" "os" diff --git a/stories/queue.go b/stories/queue.go index 5e67fa4..37d66da 100644 --- a/stories/queue.go +++ b/stories/queue.go @@ -59,11 +59,11 @@ func (q *Queue) Collect() []*Story { return stories } -func (q *Queue) Size() int { +func (q *Queue) Capacity() int { return cap(q.channel) } -func (q *Queue) InQueue() int { +func (q *Queue) Size() int { return len(q.channel) } @@ -72,7 +72,7 @@ func (q *Queue) IsEmpty() bool { } func (q *Queue) IsFull() bool { - return q.InQueue() == q.Size() + return q.Size() >= q.Capacity() } func read(c net.Conn) ([]byte, int, error) { diff --git a/stories/queue_test.go b/stories/queue_test.go index 6000f16..fff2bbb 100644 --- a/stories/queue_test.go +++ b/stories/queue_test.go @@ -2,22 +2,77 @@ package stories import ( "testing" + "net" ) func TestQueueInitializedWithSize(t *testing.T) { size := 100 queue := NewQueueOfSize(size) - if queue.Size() != size { + if queue.Capacity() != size { t.Fail() } } func TestCollectingWillClearTheQueue(t *testing.T) { + size := 10 + queue := NewQueueOfSize(size) + + for i := 0; i < size; i++ { + server, client := net.Pipe() + go func() { + server.Write(storyJSON()) + server.Close() + }() + + err := queue.Add(client) + + if err != nil { + t.Fatal(err) + } + } + + if queue.IsEmpty() { + t.Fatal("Queue should not have been empty") + } + + if len(queue.Collect()) != size { + t.Fatalf("Wanted a queue of %d. Got %d", queue.Capacity(), size) + } + + if queue.IsEmpty() != true { + t.Fatalf("Queue should have been empty, has a size of %d", queue.Capacity()) + } } func TestQueueIsFullAtMaxSize(t *testing.T) { + size := 10 + queue := NewQueueOfSize(size) + + for i := 0; i < size; i++ { + server, client := net.Pipe() + go func() { + server.Write(storyJSON()) + server.Close() + }() + + err := queue.Add(client) + + if err != nil { + t.Fatal(err) + } + } + + if queue.IsFull() != true { + t.Fatal("Queue should have been full") + } } func TestEmptyQueue(t *testing.T) { + size := 10 + queue := NewQueueOfSize(size) + + if queue.IsEmpty() != true { + t.Fatal("Queue should have been empty") + } } diff --git a/stories/story.go b/stories/story.go index a29d26a..754ef06 100644 --- a/stories/story.go +++ b/stories/story.go @@ -8,7 +8,7 @@ type Story struct { Severity int Message string Timestamp string - Data map[string]string + Data map[string]json.RawMessage } func NewStory(bytes []byte) (*Story, error) { @@ -20,7 +20,7 @@ func NewStory(bytes []byte) (*Story, error) { } if story.Data == nil { - story.Data = make(map[string]string) + story.Data = make(map[string]json.RawMessage) } if story.Severity == 0 { diff --git a/stories/story_test.go b/stories/story_test.go index 8c447ad..1208bec 100644 --- a/stories/story_test.go +++ b/stories/story_test.go @@ -5,6 +5,23 @@ import ( "testing" ) +func storyJSON() []byte { + return []byte(`{ + "severity": 4, + "data": { + "foo": { + "bar": "Something", + "ints": 1234, + "array": ["1234", 9832], + "yolo": true + }, + "object_id": 1234, + "boolean": true, + "some": "value" + } + }`) +} + func TestNewStoryWithInvalidJSON(t *testing.T) { story, err := NewStory([]byte("Invalid JSON")) @@ -18,7 +35,7 @@ func TestNewStoryWithInvalidJSON(t *testing.T) { } func TestNewStoryWithValidJSON(t *testing.T) { - story, err := NewStory([]byte("{\"foo\": \"bar\"}")) + story, err := NewStory([]byte(`{"foo": "bar"}`)) if story == nil { t.Error(err) @@ -82,3 +99,27 @@ func TestNewStoryDataIsNeverNil(t *testing.T) { t.Fail() } } + +func TestNewStoryWithCompleteData(t *testing.T) { + event, err := NewStory(storyJSON()) + + json, err := json.Marshal(event) + + if err != nil { + t.Error(err) + } + + story, err := NewStory(json) + + if err != nil { + t.Fail() + } + + if story.Severity != 4 { + t.Fail() + } + + if story.Data["foo"] == nil { + t.Fail() + } +} From 7d86217cf27d6101a8442362540820a88f670306 Mon Sep 17 00:00:00 2001 From: Pier-Olivier Thibault Date: Mon, 5 Nov 2018 21:36:10 -0500 Subject: [PATCH 2/4] Trying to fix CircleCI --- .circleci/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index ee26648..171ec65 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -15,6 +15,7 @@ jobs: keys: - v1-pkg-cache + - run: go get github.com/google/uuid - run: name: Run unit tests environment: From 6c2e553bc8fd7ea6c3389299d06acf3812f7eb7c Mon Sep 17 00:00:00 2001 From: = Date: Tue, 6 Nov 2018 10:42:15 -0500 Subject: [PATCH 3/4] Better comparison operator with strings --- integrations/scalyr/event_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/integrations/scalyr/event_test.go b/integrations/scalyr/event_test.go index 580ec9a..dd46b9b 100644 --- a/integrations/scalyr/event_test.go +++ b/integrations/scalyr/event_test.go @@ -1,7 +1,6 @@ package scalyr import ( - "bytes" "encoding/json" "github.com/convertkit/stories/stories" "testing" @@ -54,7 +53,7 @@ func TestTimestampInJSON(t *testing.T) { story := story(t) payload := payloadForEventTest(story, t) - if bytes.Compare([]byte(payload["ts"].(string)), []byte("1541354132811")) != 0 { + if payload["ts"] != string("1541354132811") { t.Fail() } } @@ -74,7 +73,7 @@ func TestMessageInAttributesJSON(t *testing.T) { attributes := payload["attrs"].(map[string]interface{}) - if bytes.Compare([]byte(attributes["message"].(string)), []byte("Hello world!")) != 0 { + if attributes["message"] != string("Hello world!") { t.Fail() } } From 954ad428734e53391e809285d59dded1c5a16201 Mon Sep 17 00:00:00 2001 From: thejohnny Date: Wed, 7 Nov 2018 07:58:57 -0500 Subject: [PATCH 4/4] Update integrations/scalyr/instance.go Co-Authored-By: pothibo --- integrations/scalyr/instance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/scalyr/instance.go b/integrations/scalyr/instance.go index d69865e..508d171 100644 --- a/integrations/scalyr/instance.go +++ b/integrations/scalyr/instance.go @@ -34,7 +34,7 @@ func (i *Instance) Configure() error { i.SessionInfo = make(map[string]string) i.SessionInfo["logfile"] = "stories" - i.SessionInfo["serverHost"] = "production" + i.SessionInfo["serverHost"] = os.Getenv("RACK_ENV") i.Url = "https://www.scalyr.com/addEvents"