-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtaylor.go
240 lines (216 loc) · 8.35 KB
/
taylor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
package main
import (
	"context"
	"flag"
	"fmt"
	"log"
	"log/syslog"
	"os"
	"os/signal"
	"strings"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/nxadm/tail"
	"github.com/tidwall/gjson"
)
// Package-level state shared by the goroutines of the program.
var (
	// logger is declared for package-wide use; it is nil until assigned.
	logger *log.Logger

	// dbchan carries ready-made VALUES tuples from parseline to dbwriter.
	// It buffers up to 100 entries.
	dbchan = make(chan string, 100)

	// linecount is the number of lines received from the log reader.
	linecount int64
	// lastlines is the value linecount had at the previous periodic report.
	lastlines int64
	// errorcount is the number of answers rejected as invalid by parseline.
	errorcount int64
	// timeparsecount is the number of lines whose timestamp failed to parse.
	timeparsecount int64
)
// main wires the pipeline together: it redirects log output to syslog, parses
// the command-line flags, starts the statistics goroutines, the database
// writer and the log file reader, and then hands every incoming line to
// parseline. It never returns.
//
// Flags:
//
//	-filename  log file to follow (default "/var/log/syslog")
//	-skip      comma-separated list of DNS record types to skip
//	           (default "TXT,DNSKEY")
func main() {
	// Send log output to syslog when possible; otherwise keep the standard
	// logger's current output and report why syslog is not used.
	logwriter, err := syslog.New(syslog.LOG_NOTICE, "taylor")
	if err == nil {
		log.SetOutput(logwriter)
	} else {
		log.Printf("syslog unavailable: %v\n", err)
	}
	// Suppress timestamp - included by syslog
	log.SetFlags(log.Flags() &^ (log.Ldate | log.Ltime))

	// Config
	configFile := flag.String("filename", "/var/log/syslog", "logfilename")
	configSkip := flag.String("skip", "TXT,DNSKEY", "List for DNS types to skip")
	flag.Parse()
	log.Printf("Starting taylor..\n")

	// Channel for raw log lines coming from the file reader.
	logchannel := make(chan string, 100)

	// Read commandline params
	filename := *configFile
	skipfields := strings.Split(*configSkip, ",")

	// Print stats to syslog on `kill -USR1`.
	// signal.Notify never blocks when delivering, so the channel must be
	// buffered or a signal arriving while the goroutine is busy is dropped.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGUSR1)
	go func() {
		for range sigs {
			log.Printf("Signal caught, linecount: %v, errorcount: %v, timeparseerror: %v\n",
				atomic.LoadInt64(&linecount),
				atomic.LoadInt64(&errorcount),
				atomic.LoadInt64(&timeparsecount))
		}
	}()

	// Print stats every 10 minutes to syslog.
	go func() {
		for {
			time.Sleep(600 * time.Second)
			// Read the counter once so the reported delta and the stored
			// baseline refer to the same value.
			current := atomic.LoadInt64(&linecount)
			log.Printf("Lines last 10min: %v, errorcount: %v\n", Abs(current-lastlines), atomic.LoadInt64(&errorcount))
			lastlines = current
		}
	}()

	// Start dbwriter
	log.Printf("Started dbwriter\n")
	go dbwriter(dbchan)

	// Start reader loop
	log.Printf("Started reader of logile: %v\n", filename)
	go filereader(filename, logchannel)

	// One reusable timer reports the line count after 60s without input.
	// Calling time.After inside the loop would allocate a new timer for
	// every single log line.
	const idleReport = 60 * time.Second
	idle := time.NewTimer(idleReport)

	// Wait for new lines on channel
	for {
		select {
		// New line from logchannel (reader loop)
		case newline := <-logchannel:
			atomic.AddInt64(&linecount, 1)
			// Parse line async
			go parseline(newline, skipfields)
			// Restart the idle period; drain a tick that may already be
			// pending so the next receive does not fire immediately.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(idleReport)
		// Print linecount in log if no new lines for 60s
		case <-idle.C:
			log.Printf("linecount: %v\n", atomic.LoadInt64(&linecount))
			idle.Reset(idleReport)
		}
	}
}
// filereader follows filename in "tail -f" fashion and forwards each line it
// reads to outchan. It panics if the file cannot be opened. Should the tail's
// line channel ever be closed, the file is opened again and reading resumes.
func filereader(filename string, outchan chan string) {
	// The tail package logs through this syslog-backed logger.
	tailLogger := getSysLogger()

	for {
		// Follow the file and re-open it when it is replaced.
		follower, err := tail.TailFile(filename, tail.Config{
			ReOpen: true,
			Follow: true,
			Logger: tailLogger,
		})
		if err != nil {
			log.Printf("Error opening file: %v\n", filename)
			panic(err)
		}

		// Blocks until the Lines channel is closed.
		for entry := range follower.Lines {
			outchan <- entry.Text
		}

		// The line channel closed; go round the loop and open the file again.
		log.Println("taylor t.Lines closed. Trying reopen")
	}
}
// sqlStringEscaper escapes backslashes and single quotes so that a value can
// be embedded in a single-quoted SQL string literal without terminating it.
// A strings.Replacer is safe for concurrent use.
var sqlStringEscaper = strings.NewReplacer(`\`, `\\`, `'`, `\'`)

// parseline parses one JSON log line and, for a line whose dns.type is
// "answer", sends one VALUES tuple per entry of dns.answers to dbchan.
//
// skipfields lists the rrtype values that must not be stored.
//
// Lines whose timestamp cannot be parsed increment timeparsecount. Answers
// with an empty rrname, rrtype or rdata, or with a comma in rrname or rdata,
// increment errorcount. The counters are updated atomically because parseline
// is started in its own goroutine for every line.
func parseline(newline string, skipfields []string) {
	// Only parse dns.type: answer lines
	if gjson.Get(newline, "dns.type").String() != "answer" {
		return
	}

	// Get all answers; nothing to do without any.
	dnsarr := gjson.Get(newline, "dns.answers").Array()
	if len(dnsarr) == 0 {
		return
	}

	// Parse the timestamp using the layout of the suricata logs.
	timestring := gjson.Get(newline, "timestamp").String()
	thetime, err := time.Parse("2006-01-02T15:04:05.99999-0700", timestring)
	if err != nil {
		// Error parsing timestring, count it and give up on this line.
		atomic.AddInt64(&timeparsecount, 1)
		return
	}
	timestamp := thetime.Unix()

	// Iterate all answers
	for _, e := range dnsarr {
		element := e.String()

		rrtype := gjson.Get(element, "rrtype").String()
		if stringInSlice(rrtype, skipfields) {
			// NOTE(review): returning here also drops every remaining answer
			// of this line, not only the skipped one - confirm whether
			// `continue` is what is intended.
			return
		}

		rrname := gjson.Get(element, "rrname").String()
		rdata := gjson.Get(element, "rdata").String()

		// Reject empty fields and commas in rrname/rdata.
		if len(rrname) == 0 || len(rrtype) == 0 || len(rdata) == 0 || strings.Contains(rrname, ",") || strings.Contains(rdata, ",") {
			atomic.AddInt64(&errorcount, 1)
			return
		}

		// The values come straight from logged DNS traffic and end up inside
		// quoted SQL literals, so they must be escaped before being
		// interpolated.
		dboutline := fmt.Sprintf("'%s','%s','%s',%d,%d,1",
			sqlStringEscaper.Replace(rrname),
			sqlStringEscaper.Replace(rdata),
			sqlStringEscaper.Replace(rrtype),
			timestamp, timestamp)

		// Send line to dbwriter
		dbchan <- dboutline
	}
}
// stringInSlice reports whether str is equal to one of the elements of list.
// The comparison is exact (case-sensitive); an empty or nil list never matches.
func stringInSlice(str string, list []string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == str {
			return true
		}
	}
	return false
}
// getSysLogger returns a *log.Logger that writes to syslog with priority
// LOG_INFO. The process is terminated via log.Fatal if the logger cannot be
// created.
func getSysLogger() *log.Logger {
	// Standard flags with date and time removed.
	const flags = log.LstdFlags &^ (log.Ldate | log.Ltime)

	sysLogger, err := syslog.NewLogger(syslog.LOG_INFO, flags)
	if err != nil {
		log.Fatal(err)
	}
	return sysLogger
}
// Abs returns the absolute value of x.
//
// The smallest int64 has no positive counterpart; for that input the negation
// wraps around and the value is returned unchanged.
func Abs(x int64) int64 {
	if x >= 0 {
		return x
	}
	return -x
}
// dbwriter receives ready-made VALUES tuples from dbline and issues one
// asynchronous INSERT into padde.log per tuple. It runs until dbline is
// closed and terminates the process via log.Fatal if the ClickHouse client
// cannot be created.
//
// Change db params if you use external db
// TODO: move db params to arguments
func dbwriter(dbline chan string) {
	ctx := context.Background()

	// Create the ClickHouse client.
	conn, err := clickhouse.Open(&clickhouse.Options{
		// Dbserver
		Addr: []string{"127.0.0.1:9000"},
		// Dbauth
		Auth: clickhouse.Auth{
			Database: "padde",
			Username: "default",
			Password: "",
		},
		//Debug: true,
		DialTimeout:     time.Second,
		MaxOpenConns:    10,
		MaxIdleConns:    5,
		ConnMaxLifetime: time.Hour,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Ranging over the channel blocks while it is empty and ends when it is
	// closed, instead of spinning on zero values.
	for newline := range dbline {
		// Build INSERT string
		query := "INSERT INTO padde.log VALUES(" + newline + ")"
		// Async db insert; false is passed as the driver's wait argument.
		if err := conn.AsyncInsert(ctx, query, false); err != nil {
			// Printf, not Println: the message contains a format verb.
			log.Printf("Error on DB insert %v\n", err)
		}
	}
}