-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfirehose_batcher_test.go
140 lines (124 loc) · 3.86 KB
/
firehose_batcher_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package firehosebatcher
import (
"bytes"
"testing"
"time"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/stretchr/testify/assert"
)
// dataToBatch is a test fixture helper: it wraps each payload in msgs in a
// firehose.Record and adds the records, in order, to a freshly allocated
// Batch. It panics if Add rejects a record, because the helper is called
// while building the test table and has no *testing.T to report through.
func dataToBatch(msgs [][]byte) *Batch {
	batch := new(Batch)
	for i := range msgs {
		record := &firehose.Record{Data: msgs[i]}
		if err := batch.Add(record); err != nil {
			panic(err)
		}
	}
	return batch
}
// TestBatching compares the batches produced by startBatching with batches
// built directly from the same records via dataToBatch.
//
// Each table case feeds test.data into a fresh batcher through AddRaw while
// startBatching runs in the background, then reads batches from
// fb.batchSendBuffer and checks their Length, Size and contents against
// test.expectedBatches, in order. Every receive is bounded by a multiple of
// test.sendTimeout so a missing batch fails the case instead of hanging.
func TestBatching(t *testing.T) {
	tests := []struct {
		name            string          // subtest name passed to t.Run
		desc            string          // human-readable explanation, logged at subtest start
		data            [][]byte        // raw payloads fed to AddRaw, in order
		sendTimeout     time.Duration   // send timeout handed to New
		expectedBatches []*Batch        // batches expected on batchSendBuffer, in order
	}{
		{
			name:        "Short batch, should timeout",
			desc:        `Send a small amount of data in, expect a small amount of data out. The batch sending should be triggered by the send timeout.`,
			sendTimeout: time.Millisecond * 5,
			data: [][]byte{
				[]byte("abcd"),
				[]byte("efgh"),
			},
			expectedBatches: []*Batch{
				dataToBatch([][]byte{
					[]byte("abcd"),
					[]byte("efgh"),
				}),
			},
		},
		{
			name:        "Oversized batch (length)",
			desc:        `This tests the case in which we get too many individual records. This should send two batches; the first with len(records == BATCH_ITEM_LIMIT, and a second with records == 1`,
			sendTimeout: time.Millisecond * 5,
			// Splitting on an empty separator yields one single-byte slice
			// per input byte, i.e. BATCH_ITEM_LIMIT+1 one-byte records.
			data: bytes.Split(
				bytes.Repeat([]byte{0}, BATCH_ITEM_LIMIT+1),
				[]byte{},
			),
			expectedBatches: []*Batch{
				dataToBatch(
					bytes.Split(
						bytes.Repeat([]byte{0}, BATCH_ITEM_LIMIT),
						[]byte{},
					),
				),
				dataToBatch([][]byte{{0}}),
			},
		},
		{
			name:        "Oversized batch (size)",
			desc:        `This tests the case in which we get too much data and need to split into multiple batches because the data volume is too large. The first record should have sum(len(record) for record in batch) == BATCH_SIZE_LIMIT, and a second with sum(len(record) for record in batch) == 1`,
			sendTimeout: time.Millisecond * 5,
			data: [][]byte{
				bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8), // 1
				bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8), // 2
				bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8), // 3
				bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8), // 4
				bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8), // 5
				bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8), // 6
				bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8), // 7
				bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8), // 8
				{0},
			},
			expectedBatches: []*Batch{
				dataToBatch([][]byte{
					bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8),
					bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8),
					bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8),
					bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8),
					bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8),
					bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8),
					bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8),
					bytes.Repeat([]byte{0}, BATCH_SIZE_LIMIT/8),
				}),
				dataToBatch([][]byte{{0}}),
			},
		},
	}

	for _, test := range tests {
		test := test // per-iteration copy for the closures below (needed before Go 1.22)
		t.Run(test.name, func(t *testing.T) {
			// Surface the case description in verbose / failure output.
			t.Log(test.desc)

			assert := assert.New(t)

			fb, err := New(nil, "n/a", test.sendTimeout)
			if err != nil {
				// Fail only this subtest rather than crashing the whole test binary.
				t.Fatalf("New: %v", err)
			}

			// NOTE(review): nothing visible here stops this goroutine once the
			// subtest ends — confirm whether the batcher exposes a shutdown hook.
			go fb.startBatching()

			// Feed records from a separate goroutine so this goroutine is free
			// to drain batchSendBuffer. producerDone lets the subtest wait for
			// the feeder, so it never reports through t after the subtest has
			// returned.
			producerDone := make(chan struct{})
			go func() {
				defer close(producerDone)
				for _, msg := range test.data {
					assert.NoError(fb.AddRaw(msg))
				}
			}()

			for _, expectedBatch := range test.expectedBatches {
				select {
				case batch := <-fb.batchSendBuffer:
					assert.Equal(expectedBatch.Length(), batch.Length(), "batch length")
					assert.Equal(expectedBatch.Size(), batch.Size(), "batch size")
					assert.Equal(expectedBatch.contents, batch.contents, "batch contents")
				case <-time.After(test.sendTimeout * 5):
					t.Error("5 * sendTime exceeded")
				}
			}

			// Make sure we don't end up with an extra batch because we messed up a test
			select {
			case <-fb.batchSendBuffer:
				t.Error("Unexpected batch! Recieved an unexpected batch after the end of the test.")
			case <-time.After(test.sendTimeout):
				// noop, this is what should happen
			}

			// Wait (bounded) for the feeder goroutine before returning.
			select {
			case <-producerDone:
			case <-time.After(test.sendTimeout * 5):
				t.Error("producer goroutine did not finish adding records")
			}
		})
	}
}