logstore.go

package main

import (
	"bufio"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"os"
	"path"
	"sort"
	"strconv"
	"sync"
	"time"
)

const DEFAULT_DATABASE_DIR = "./store"
const MAX_LOG_FILE_BYTES = 1024 * 1024 * 512
const SHARDS = 128 // Tune this per your workload
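
// LogStoreOptions configures a LogStore. maxLogFileBytes caps the size of a
// single log file; once a write would exceed it, the store rolls over to a
// new file.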
type LogStoreOptions struct {
	maxLogFileBytes int
}
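
// LogStore is an append-only, log-structured key/value store. Keys live in a
// sharded in-memory map; values stay on disk in the log files and are read
// back on demand.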
type LogStore struct {
	logDir    string
	keys      *ConcurrentMap[Item]
	logFile   *os.File
	logFileMu sync.Mutex
	opts      *LogStoreOptions
}
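
// Item locates a single value on disk: the log file that contains it, its
// expiry time in unix milliseconds, and the byte offset and length of the
// value within that file.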
type Item struct {
	file      string
	expire    int
	valuePos  int
	valueSize int
}

// StreamGet streams a value from the log store.
// Goroutine safe due to map shards with locks.
func (logStore *LogStore) StreamGet(key string, w io.Writer) (bool, error) {
	access := logStore.keys.AccessShard(key)
	defer access.Unlock()

	item, found := logStore.keys.Get(key)
	if !found {
		return false, nil
	} else if int(time.Now().UnixMilli()) >= item.expire {
		// Clean up expired items
		logStore.keys.Delete(key)
		return false, nil
	}

	f, err := os.Open(item.file)
	if err != nil {
		return false, fmt.Errorf("couldn't open log file %s: %s", item.file, err)
	}
	defer f.Close()

	_, err = f.Seek(int64(item.valuePos), 0)
	if err != nil {
		return false, fmt.Errorf("couldn't seek in %s: %s", item.file, err)
	}

	_, err = io.CopyN(w, f, int64(item.valueSize))
	if err != nil {
		// The key _was_ found, so report success alongside the copy error
		return true, err
	}
	return true, nil
}
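
// Each record that Set appends to a log file, and that parseLogFile reads
// back, has the layout:
//
//	<expire>,<keyLen>,<valueLen>,<key>,<value>,
//
// where expire is a unix millisecond timestamp and expire, keyLen and
// valueLen are ASCII decimal integers. Every field, including the value,
// is followed by a comma.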

// Set sets a value. Setting `expire` to 0 is effectively a delete operation.
// Goroutine safe due to map shards with locks, and a log file lock.
func (logStore *LogStore) Set(key string, expire int, value []byte) error {
	access := logStore.keys.AccessShard(key)
	defer access.Unlock()
	logStore.logFileMu.Lock()
	defer logStore.logFileMu.Unlock()

	fi, err := logStore.logFile.Stat()
	if err != nil {
		return fmt.Errorf("couldn't stat log file %s: %s", logStore.logFile.Name(), err)
	}
	end := int(fi.Size())

	line := []byte(fmt.Sprintf("%d,%d,%d,%s,", expire, len(key), len(value), key))
	lineLength := len(line) + len(value) + 1 // + 1 for the trailing comma

	// Roll the log file if this record would push it past the size limit
	if end+lineLength >= logStore.opts.maxLogFileBytes {
		err = logStore.nextLogFile()
		if err != nil {
			return err
		}
		// New log files are empty
		end = 0
	}

	data := append(append(line, value...), ","...)
	_, err = logStore.logFile.Write(data)
	if err != nil {
		return fmt.Errorf("couldn't write to %s: %s", logStore.logFile.Name(), err)
	}

	item := Item{
		logStore.logFile.Name(),
		expire,
		end + len(line), // The value starts right after the metadata and key
		len(value),
	}

	// To support deletes, instead of adding expired items
	// to the in-memory key dictionary, remove the old key
	if int(time.Now().UnixMilli()) >= expire {
		logStore.keys.Delete(key)
		return nil
	}
	logStore.keys.Set(key, item)
	return nil
}

// nextLogFile rolls the LogStore's log file over to a newly created file
func (logStore *LogStore) nextLogFile() error {
	// The deferred Close captures the current (old) log file before it is replaced
	defer logStore.logFile.Close()
	logFile, err := createLogFile(logStore.logDir)
	if err != nil {
		return err
	}
	logStore.logFile = logFile
	return nil
}

// parseLogFile opens a log file and parses the items. It returns a key dictionary
// without any expired items
func parseLogFile(path string) (map[string]Item, error) {
	const COMMA byte = ','

	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("couldn't open log file %s: %s", path, err)
	}
	defer f.Close()

	keys := make(map[string]Item)
	r := bufio.NewReader(f)
	cur := 0 // Byte offset of the reader within the file
	for {
		_expire, err := r.ReadBytes(COMMA)
		cur += len(_expire)
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("couldn't parse expire: %s", err)
		}
		expire, err := strconv.Atoi(string(_expire[:len(_expire)-1]))
		if err != nil {
			return nil, fmt.Errorf("couldn't parse expire: %s", err)
		}

		_keySize, err := r.ReadBytes(COMMA)
		cur += len(_keySize)
		if err != nil {
			return nil, fmt.Errorf("couldn't read keySize: %s", err)
		}
		keySize, err := strconv.Atoi(string(_keySize[:len(_keySize)-1]))
		if err != nil {
			return nil, fmt.Errorf("couldn't parse keySize: %s", err)
		}

		_valueSize, err := r.ReadBytes(COMMA)
		cur += len(_valueSize)
		if err != nil {
			return nil, fmt.Errorf("couldn't read valueSize: %s", err)
		}
		valueSize, err := strconv.Atoi(string(_valueSize[:len(_valueSize)-1]))
		if err != nil {
			return nil, fmt.Errorf("couldn't parse valueSize: %s", err)
		}

		key := make([]byte, keySize+1) // Read key (+ 1 for the comma after it)
		n, err := io.ReadFull(r, key)  // bufio.Reader.Read may return a short read, so use io.ReadFull
		cur += n
		if err != nil {
			return nil, fmt.Errorf("couldn't parse key: %s", err)
		}
		key = key[:len(key)-1]

		valueOffset := cur                // The value can be found at the current cursor
		n, err = r.Discard(valueSize + 1) // Skip the value (+ 1 for the trailing comma)
		cur += n
		if err != nil {
			return nil, fmt.Errorf("during key (%s) couldn't skip value: %s", key, err)
		}

		if int(time.Now().UnixMilli()) < expire {
			keys[string(key)] = Item{
				path,
				expire,
				valueOffset,
				valueSize,
			}
		} else {
			// Don't load expired items into memory; clean up items that have been overwritten
			delete(keys, string(key))
		}
	}
	return keys, nil
}

// CreateLogStore creates a new log store and loads existing log files from disk
func CreateLogStore(logDir string, opts *LogStoreOptions) (*LogStore, error) {
	if opts == nil {
		opts = &LogStoreOptions{maxLogFileBytes: MAX_LOG_FILE_BYTES}
	}
	err := os.MkdirAll(logDir, os.ModePerm)
	if err != nil {
		return nil, fmt.Errorf("couldn't create directory %s: %s", logDir, err)
	}

	// Load log files from disk one-by-one in lexical order (file names
	// start with a timestamp). No need to do this concurrently yet
	logFiles, err := ioutil.ReadDir(logDir)
	if err != nil {
		return nil, fmt.Errorf("couldn't read directory %s: %s", logDir, err)
	}
	sort.Slice(logFiles, func(i, j int) bool {
		return logFiles[i].Name() < logFiles[j].Name()
	})

	concurrentMap := NewConcurrentMap[Item](SHARDS)
	for _, fileInfo := range logFiles {
		keys, err := parseLogFile(path.Join(logDir, fileInfo.Name()))
		if err != nil {
			return nil, fmt.Errorf("couldn't parse log file %s: %s", path.Join(logDir, fileInfo.Name()), err)
		}
		concurrentMap.MSet(keys)
	}

	var logFile *os.File
	if len(logFiles) > 0 {
		latest := logFiles[len(logFiles)-1]
		latestPath := path.Join(logDir, latest.Name())
		fi, err := os.Stat(latestPath)
		if err != nil {
			return nil, fmt.Errorf("couldn't stat log file %s: %s", latestPath, err)
		}
		if fi.Size() >= int64(opts.maxLogFileBytes) {
			// If the latest log file on disk is at capacity, create a new one
			logFile, err = createLogFile(logDir)
			if err != nil {
				return nil, err
			}
		} else {
			// Otherwise, open the latest log file because there's still room
			logFile, err = os.OpenFile(latestPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
			if err != nil {
				return nil, err
			}
		}
	} else {
		// If this is a new directory, create the first log file
		logFile, err = createLogFile(logDir)
		if err != nil {
			return nil, err
		}
	}

	return &LogStore{
		logDir,
		concurrentMap,
		logFile,
		sync.Mutex{},
		opts,
	}, nil
}

// createLogFile generates a log file on disk and returns a file pointer
func createLogFile(logDir string) (*os.File, error) {
	id := fmt.Sprintf("%d-%s", time.Now().UnixMilli(), rndFileString(16))
	logFile, err := os.Create(path.Join(logDir, id))
	if err != nil {
		return nil, fmt.Errorf("couldn't create log file %s: %s", path.Join(logDir, id), err)
	}
	return logFile, nil
}

// rndFileString returns a random string appropriate for a filename
func rndFileString(length int) []byte {
	const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
	b := make([]byte, length)
	for i := range b {
		b[i] = letterBytes[rand.Intn(len(letterBytes))]
	}
	return b
}
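
// demoLogStore is a hypothetical usage sketch, not part of the original file:
// it creates a store in DEFAULT_DATABASE_DIR, writes one key with a one-hour
// TTL, and streams it back to stdout. The function name and the "greeting"
// key are made up for illustration.
func demoLogStore() error {
	store, err := CreateLogStore(DEFAULT_DATABASE_DIR, nil)
	if err != nil {
		return err
	}
	// Expiry times are unix millisecond timestamps
	expire := int(time.Now().Add(time.Hour).UnixMilli())
	if err := store.Set("greeting", expire, []byte("hello")); err != nil {
		return err
	}
	// StreamGet reports whether the key was found and copies the value to the writer
	found, err := store.StreamGet("greeting", os.Stdout)
	if err != nil {
		return err
	}
	if !found {
		return fmt.Errorf("key not found")
	}
	return nil
}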