-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbatch.go
86 lines (67 loc) · 2.11 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package firehosebatcher
import (
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
var (
ErrBatchSizeOverflow = errors.New("batch size overflow")
ErrBatchLengthOverflow = errors.New("batch length overflow")
)
// Batch is really just a wrapper around a slice of `firehose.Record`s that tracks the size and length to make sure we don't create a batch that can't be sent to Firehose.
type Batch struct {
size int
contents []*firehose.Record
fillTimer *prometheus.Timer
}
// NewBatch construct a batch with an intializing record
func NewBatch(r *firehose.Record) *Batch {
b := &Batch{
fillTimer: prometheus.NewTimer(BatchFillLatency),
}
b.Add(r)
BatchesCreated.Inc()
return b
}
// Add attempts to add a record to the batch. If adding the record would cause either the batch's total size or total length to exceed AWS API limits this will return an appropriate error.
func (b *Batch) Add(r *firehose.Record) error {
if b.contents == nil {
b.contents = make([]*firehose.Record, 0, BATCH_ITEM_LIMIT)
}
rSize := len(r.Data)
if b.size+rSize > BATCH_SIZE_LIMIT {
return ErrBatchSizeOverflow
}
if b.Length()+1 > BATCH_ITEM_LIMIT {
return ErrBatchLengthOverflow
}
b.contents = append(b.contents, r)
b.size += rSize
BytesBatched.Add(float64(rSize))
return nil
}
// Size return the number bytes stored in this batch
func (b *Batch) Size() int {
return b.size
}
// Length return the number of records in the batch
func (b *Batch) Length() int {
return len(b.contents)
}
// Send calls firehose.PutRecordBatch with the given batch. It does not current handle or retry on any sort of failure. This can cause unrecoverable message drops.
func (b *Batch) Send(client *firehose.Firehose, streamName string) error {
prbo, err := client.PutRecordBatch(&firehose.PutRecordBatchInput{
DeliveryStreamName: &streamName,
Records: b.contents,
})
if err != nil {
return err
}
if *prbo.FailedPutCount > 0 {
return errors.Errorf(
"failed to send the full batch (%d failed), retries not currently handled",
*prbo.FailedPutCount,
)
}
return nil
}