-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathdemux.go
73 lines (65 loc) · 1.7 KB
/
demux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package tradier
import (
"github.com/pkg/errors"
)
// StreamDemuxer routes the events of a market events stream to per-type
// callbacks. Each event is decoded according to its Type and handed to the
// matching callback; event types whose callback is nil are skipped without
// being decoded.
type StreamDemuxer struct {
	// Quotes receives every successfully decoded "quote" event.
	Quotes func(*QuoteEvent)
	// Trades receives every successfully decoded "trade" event.
	Trades func(*TradeEvent)
	// Summaries receives every successfully decoded "summary" event.
	Summaries func(*SummaryEvent)
	// TimeSales receives every successfully decoded "timesale" event.
	TimeSales func(*TimeSaleEvent)
	// Errors receives decode failures, wrapped with the raw message that
	// could not be decoded.
	Errors func(error)
}
// Handle dispatches a single stream event to the handler matching its Type
// ("quote", "trade", "timesale" or "summary"). Events of any other type are
// ignored, as is a nil event.
func (sd *StreamDemuxer) Handle(event *StreamEvent) {
	// Guard against a nil event so a stray nil from a producer does not
	// bring down the consuming goroutine with a nil-pointer dereference.
	if event == nil {
		return
	}
	switch event.Type {
	case "quote":
		sd.handleQuote(event)
	case "trade":
		sd.handleTrade(event)
	case "timesale":
		sd.handleTimeSale(event)
	case "summary":
		sd.handleSummary(event)
	}
}
// HandleChan passes every event received from events to Handle, in the order
// received. It blocks until the channel is closed and then returns.
func (sd *StreamDemuxer) HandleChan(events <-chan *StreamEvent) {
	for {
		ev, open := <-events
		if !open {
			// Channel closed and drained: nothing more to dispatch.
			return
		}
		sd.Handle(ev)
	}
}
// handleQuote decodes m with DecodeQuote and passes the result to sd.Quotes.
// It does nothing when no Quotes callback is set. A decode failure is wrapped
// together with the raw message and reported to sd.Errors; when no Errors
// callback is set the failure is dropped instead of panicking on a nil
// function call.
func (sd *StreamDemuxer) handleQuote(m *StreamEvent) {
	if sd.Quotes == nil {
		return
	}
	q, err := DecodeQuote(m)
	if err != nil {
		// Errors is optional, just like the per-type callbacks.
		if sd.Errors != nil {
			sd.Errors(errors.Wrapf(err, "error decoding quote: %v", string(m.Message)))
		}
		return
	}
	sd.Quotes(q)
}
// handleTrade decodes m with DecodeTrade and passes the result to sd.Trades.
// It does nothing when no Trades callback is set. A decode failure is wrapped
// together with the raw message and reported to sd.Errors; when no Errors
// callback is set the failure is dropped instead of panicking on a nil
// function call.
func (sd *StreamDemuxer) handleTrade(m *StreamEvent) {
	if sd.Trades == nil {
		return
	}
	t, err := DecodeTrade(m)
	if err != nil {
		// Errors is optional, just like the per-type callbacks.
		if sd.Errors != nil {
			sd.Errors(errors.Wrapf(err, "error decoding trade: %v", string(m.Message)))
		}
		return
	}
	sd.Trades(t)
}
// handleSummary decodes m with DecodeSummary and passes the result to
// sd.Summaries. It does nothing when no Summaries callback is set. A decode
// failure is wrapped together with the raw message and reported to sd.Errors;
// when no Errors callback is set the failure is dropped instead of panicking
// on a nil function call.
func (sd *StreamDemuxer) handleSummary(m *StreamEvent) {
	if sd.Summaries == nil {
		return
	}
	s, err := DecodeSummary(m)
	if err != nil {
		// Errors is optional, just like the per-type callbacks.
		if sd.Errors != nil {
			sd.Errors(errors.Wrapf(err, "error decoding summary: %v", string(m.Message)))
		}
		return
	}
	sd.Summaries(s)
}
// handleTimeSale decodes m with DecodeTimeSale and passes the result to
// sd.TimeSales. It does nothing when no TimeSales callback is set. A decode
// failure is wrapped together with the raw message and reported to sd.Errors;
// when no Errors callback is set the failure is dropped instead of panicking
// on a nil function call.
func (sd *StreamDemuxer) handleTimeSale(m *StreamEvent) {
	if sd.TimeSales == nil {
		return
	}
	ts, err := DecodeTimeSale(m)
	if err != nil {
		// Errors is optional, just like the per-type callbacks.
		if sd.Errors != nil {
			sd.Errors(errors.Wrapf(err, "error decoding time sale: %v", string(m.Message)))
		}
		return
	}
	sd.TimeSales(ts)
}