-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathldb_rotating_reader.go
168 lines (150 loc) · 5.17 KB
/
ldb_rotating_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package ctlstore
import (
"context"
"errors"
"fmt"
"github.com/segmentio/ctlstore/pkg/errs"
"github.com/segmentio/ctlstore/pkg/globalstats"
"github.com/segmentio/ctlstore/pkg/ldb"
"github.com/segmentio/events/v2"
"github.com/segmentio/stats/v4"
"path"
"strconv"
"sync/atomic"
"time"
)
// LDBRotatingReader reads data from multiple LDBs on a rotating schedule.
// The main benefit is relieving read pressure on a particular ldb file when it becomes inactive,
// allowing sqlite maintenance.
type LDBRotatingReader struct {
// active is the index into dbs of the reader currently serving reads.
// It is read and written with sync/atomic.
active int32
// dbs holds one LDBReader per configured ldb path.
dbs []*LDBReader
// schedule has one entry per minute of the hour (60 entries); each entry is
// the index into dbs of the reader that should be active during that minute.
schedule []int8
// now returns the current time; setActive defaults it to time.Now when nil.
// NOTE(review): presumably overridable so tests can control the clock — confirm.
now func() time.Time
// tickerInterval is how often rotate checks the schedule; rotate defaults it
// to 1 minute when zero.
tickerInterval time.Duration
}
// RotationPeriod is the number of minutes each reader stays active before
// rotating to the next one.
type RotationPeriod int
const (
// Every30 rotates on the 30 minute marks of each hour.
Every30 RotationPeriod = 30
// Every20 rotates on the 20 minute marks of each hour.
Every20 RotationPeriod = 20
// Every15 rotates on the 15 minute marks of each hour.
Every15 RotationPeriod = 15
// Every10 rotates on the 10 minute marks of each hour.
Every10 RotationPeriod = 10
// Every6 rotates on the 6 minute marks of each hour.
Every6 RotationPeriod = 6
// defaultPath is the path of the first ldb. For simpler migration, the first
// ldb retains the original name.
// NOTE(review): plain string concatenation — assumes DefaultCtlstorePath ends
// with a path separator; confirm.
defaultPath = DefaultCtlstorePath + ldb.DefaultLDBFilename
// ldbFormat is the fmt pattern for the paths of the additional ldbs; %d
// receives the ldb's number.
ldbFormat = DefaultCtlstorePath + "ldb-%d.db"
)
// defaultPaths returns the default ldb paths for count ldbs. The first entry
// is always defaultPath; the remaining entries are built from ldbFormat and
// numbered 2 through count.
func defaultPaths(count int) []string {
	result := []string{defaultPath}
	// The first ldb keeps its original name, so numbering starts at 2.
	for n := 2; n <= count; n++ {
		result = append(result, fmt.Sprintf(ldbFormat, n))
	}
	return result
}
// RotatingReader creates a reader that rotates which ldb it reads from every
// minutesPerRotation minutes, using ldbsCount ldbs at the default paths
// returned by defaultPaths. It delegates to CustomerRotatingReader.
func RotatingReader(ctx context.Context, minutesPerRotation RotationPeriod, ldbsCount int) (RowRetriever, error) {
	paths := defaultPaths(ldbsCount)
	return CustomerRotatingReader(ctx, minutesPerRotation, paths...)
}
// CustomerRotatingReader creates a reader that rotates which ldb it reads from
// every minutesPerRotation minutes, using the ldbs at the given paths.
//
// Once the readers are open it selects the reader scheduled for the current
// minute and starts a background goroutine that keeps rotating until ctx is
// done. Any error from building the reader is returned unchanged.
func CustomerRotatingReader(ctx context.Context, minutesPerRotation RotationPeriod, ldbPaths ...string) (RowRetriever, error) {
	reader, err := rotatingReader(minutesPerRotation, ldbPaths...)
	if err != nil {
		return nil, err
	}

	// Pick the initial reader before the rotation goroutine starts.
	reader.setActive()
	go reader.rotate(ctx)

	return reader, nil
}
// rotatingReader validates the rotation configuration, opens an LDBReader for
// every path and builds the minute-of-hour schedule. It does not select an
// active reader or start rotating; callers do that.
//
// It returns an error when fewer than two paths are given, when
// minutesPerRotation is not one of the supported periods, when there are more
// ldbs than rotations in an hour, or when any ldb fails to open. Readers that
// were already opened are closed before an open error is returned.
func rotatingReader(minutesPerRotation RotationPeriod, ldbPaths ...string) (*LDBRotatingReader, error) {
	if len(ldbPaths) < 2 {
		return nil, errors.New("RotatingReader requires more than 1 ldb")
	}
	if !isValid(minutesPerRotation) {
		return nil, fmt.Errorf("invalid rotation period: %v", minutesPerRotation)
	}
	// isValid guarantees a non-zero period, so the division is safe.
	period := int(minutesPerRotation)
	if len(ldbPaths) > 60/period {
		return nil, errors.New("cannot have more ldbs than rotations per hour")
	}

	r := &LDBRotatingReader{dbs: make([]*LDBReader, 0, len(ldbPaths))}
	for _, p := range ldbPaths {
		events.Log("Opening ldb %s for reading", p)
		reader, err := newLDBReader(p)
		if err != nil {
			// Best-effort cleanup so a partial failure does not leak the
			// readers that were opened before this one.
			for _, opened := range r.dbs {
				if cerr := opened.Close(); cerr != nil {
					events.Log("failed to close LDBReader for %s after open failure: %{error}v", opened.path, cerr)
				}
			}
			return nil, err
		}
		r.dbs = append(r.dbs, reader)
	}

	// One slot per minute of the hour: each block of `period` minutes maps to
	// the next reader, wrapping around when there are fewer readers than blocks.
	r.schedule = make([]int8, 60)
	for minute := range r.schedule {
		r.schedule[minute] = int8((minute / period) % len(ldbPaths))
	}
	return r, nil
}
// setActive looks up the reader scheduled for the current minute of the hour
// and atomically stores its index as the active reader. When no clock has been
// set, it installs time.Now as the clock first.
func (r *LDBRotatingReader) setActive() {
	clock := r.now
	if clock == nil {
		clock = time.Now
		r.now = clock
	}
	minute := clock().Minute()
	scheduled := r.schedule[minute]
	atomic.StoreInt32(&r.active, int32(scheduled))
}
// GetRowsByKeyPrefix forwards the call, with all arguments unchanged, to the
// LDBReader that is active at the moment of the call.
func (r *LDBRotatingReader) GetRowsByKeyPrefix(ctx context.Context, familyName string, tableName string, key ...interface{}) (*Rows, error) {
	idx := atomic.LoadInt32(&r.active)
	current := r.dbs[idx]
	return current.GetRowsByKeyPrefix(ctx, familyName, tableName, key...)
}
// GetRowByKey forwards the call, with all arguments unchanged, to the
// LDBReader that is active at the moment of the call.
func (r *LDBRotatingReader) GetRowByKey(ctx context.Context, out interface{}, familyName string, tableName string, key ...interface{}) (found bool, err error) {
	idx := atomic.LoadInt32(&r.active)
	current := r.dbs[idx]
	return current.GetRowByKey(ctx, out, familyName, tableName, key...)
}
// rotate runs until ctx is done. On every tick (every minute unless
// tickerInterval has been set) it checks whether the schedule selects a
// different reader for the current minute. If so, it makes that reader active,
// then closes and reopens the previously active reader.
//
// If closing or reopening the previous reader fails, the error is logged and
// counted and rotate returns, so no further rotations happen for this reader.
func (r *LDBRotatingReader) rotate(ctx context.Context) {
	if r.tickerInterval == 0 {
		r.tickerInterval = 1 * time.Minute
	}
	ticker := time.NewTicker(r.tickerInterval)
	// Release the ticker on every exit path (context cancellation or error).
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			next := r.schedule[r.now().Minute()]
			last := atomic.LoadInt32(&r.active)
			if int32(next) == last {
				// Still inside the same rotation window.
				continue
			}

			// Switch reads to the next reader before touching the old one.
			atomic.StoreInt32(&r.active, int32(next))
			stats.Incr("rotating_reader.rotate")
			globalstats.Set("rotating_reader.active", next)

			// Close and reopen the reader that just became inactive.
			err := r.dbs[last].Close()
			if err != nil {
				events.Log("failed to close LDBReader for %s on rotation: %{error}v", r.dbs[last].path, err)
				errs.Incr("rotating_reader.closing_ldbreader", stats.T("id", strconv.Itoa(int(last))))
				return
			}
			reader, err := newLDBReader(r.dbs[last].path)
			if err != nil {
				events.Log("failed to open LDBReader for %s on rotation: %{error}v", r.dbs[last].path, err)
				errs.Incr("rotating_reader.opening_ldbreader",
					stats.T("id", strconv.Itoa(int(last))),
					stats.T("path", path.Base(r.dbs[last].path)))
				return
			}
			r.dbs[last] = reader
		}
	}
}
// isValid reports whether rf is one of the supported rotation periods:
// Every6, Every10, Every15, Every20 or Every30.
func isValid(rf RotationPeriod) bool {
	supported := [...]RotationPeriod{Every6, Every10, Every15, Every20, Every30}
	for _, period := range supported {
		if rf == period {
			return true
		}
	}
	return false
}