Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream/match: allow a single connection to maintain its own match lookup for multi-connection #1613

Open
wants to merge 108 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
108 commits
Select commit Hold shift + click to select a range
64c24ab
gateio: Add multi asset websocket support WIP.
Jul 14, 2024
f509399
meow
Jul 14, 2024
feed04e
Add tests and shenanigans
Jul 15, 2024
31a26c0
integrate flushing and for enabling/disabling pairs from rpc shenanigans
Jul 15, 2024
e1f2f7a
some changes
Jul 15, 2024
76524cc
linter: fixes strikes again.
Jul 15, 2024
640e82e
Change name ConnectionAssociation -> ConnectionCandidate for better c…
Jul 15, 2024
3a0440d
Add subscription tests (state functional)
Jul 16, 2024
c7d2b62
glorious:nits + proxy handling
Jul 16, 2024
fc281ee
Spelling
Jul 16, 2024
eaa44ba
linter: fixerino
Jul 16, 2024
a8debf9
instead of nil, dont do nil.
Jul 16, 2024
16b0e22
clean up nils
Jul 16, 2024
32252b2
cya nils
Jul 16, 2024
7c5d9c3
don't need to set URL or check if its running
Jul 17, 2024
2f93b64
stream match update
Jul 17, 2024
dd94e4e
update tests
Jul 17, 2024
ca597f3
linter: fix
Jul 17, 2024
2e0f0ae
glorious: nits + handle context cancellations
Jul 18, 2024
2d2f872
stop ping handler routine leak
Jul 19, 2024
39191c8
* Fix bug where reader routine on error that is not a disconnection e…
Jul 19, 2024
f1c3895
Allow rollback on connect on any error across all connections
Jul 19, 2024
09bff6c
fix shadow jutsu
Jul 19, 2024
e66c9be
glorious/gk: nitters - adds in ws mock server
Jul 22, 2024
03de669
linter: fix
Jul 22, 2024
eddb58b
Merge branch 'master' into gateio_ws
Jul 23, 2024
8159b05
fix deadlock on connection as the previous channel had no reader and …
Jul 24, 2024
4f0b42f
Merge branch 'master' into gateio_ws
Jul 24, 2024
2af934e
Merge branch 'master' into stream_match
Jul 24, 2024
0e3bb31
glorious: whooops
Jul 24, 2024
f98c3aa
gk: nits
Jul 24, 2024
d89a46a
Leak issue and edge case
Jul 25, 2024
8281b88
Websocket: Add SendMessageReturnResponses
gbjk Apr 8, 2024
a38a1d1
whooooooopsie
Jul 25, 2024
77ef366
gk: nitssssss
Jul 26, 2024
6341ccf
Update exchanges/stream/stream_match.go
shazbert Jul 26, 2024
c0d1d43
Update exchanges/stream/stream_match_test.go
shazbert Jul 26, 2024
9e57e65
linter: appease the linter gods
Jul 26, 2024
51b9f17
gk: nits
Jul 26, 2024
6376a12
gk: drain brain
Jul 26, 2024
d275e39
Merge branch 'master' into stream_match
Jul 31, 2024
ee8a35c
glorious: nits
Aug 1, 2024
431c047
glorious: nits
Aug 15, 2024
44a3461
glorious: nits
Aug 15, 2024
5a0ed87
Merge branch 'gateio_ws' into match_to_conns
Aug 15, 2024
e1782dc
start to decouple match from a global reference to a connection
Aug 15, 2024
d5bbd10
Merge branch 'master' into gateio_ws
Aug 19, 2024
d2f7c2e
Merge branch 'master' into gateio_ws
Aug 23, 2024
16d88cf
Merge branch 'master' into gateio_ws
Aug 24, 2024
c0013ed
Merge branch 'gateio_ws' into match_to_conns
Aug 24, 2024
0402bc7
Update exchanges/stream/websocket.go
shazbert Aug 30, 2024
0dfda95
glorious: nits
Aug 30, 2024
c58834e
add tests
Aug 30, 2024
d29893b
linter: fix
Aug 30, 2024
79e0eeb
Merge branch 'master' into gateio_ws
Aug 30, 2024
4f7bcd9
Merge branch 'master' into gateio_ws
Sep 2, 2024
45ff199
After merge
Sep 2, 2024
06acaac
Add error connection info
Sep 2, 2024
da424cb
Merge branch 'gateio_ws' into match_to_conns
Sep 5, 2024
af986da
Fix edge case where it does not reconnect made by an already closed c…
Sep 9, 2024
1171278
stream coverage
Sep 12, 2024
4be6214
glorious: nits
Sep 12, 2024
a068ddb
Merge branch 'master' into gateio_ws
Sep 12, 2024
81cba36
glorious: nits removed asset error handling in stream package
Sep 12, 2024
bdc6954
linter: fix
Sep 13, 2024
3fe44ca
rm block
Sep 13, 2024
818584f
Add basic readme
Sep 13, 2024
289ac71
Merge branch 'master' into gateio_ws
Sep 13, 2024
dee32a4
Merge branch 'master' into gateio_ws
Sep 13, 2024
7063311
Merge branch 'master' into gateio_ws
Sep 16, 2024
0709ba3
fix asset enabled flush cycle for multi connection
Sep 16, 2024
b877b38
spella: fix
Sep 16, 2024
f7d1ec8
linter: fix
Sep 18, 2024
818432d
Merge branch 'master' into gateio_ws
Sep 18, 2024
bdc7afb
Add glorious suggestions, fix some race thing
Sep 19, 2024
3d6541e
reinstate name before any routine gets spawned
Sep 19, 2024
33c4128
stop on error in mock tests
Sep 19, 2024
ea25763
glorious: nits
Sep 19, 2024
ae92cd2
glorious: nits found in CI build
Sep 24, 2024
c49a75c
Add test for drain, bumped wait times as there seems to be something …
Sep 24, 2024
5551d54
mutex across shutdown and connect for protection
Sep 24, 2024
99c3c7b
lint: fix
Sep 24, 2024
f56cb78
test time withoffset, reinstate stop
Sep 24, 2024
b4bc7e2
fix whoops
Sep 24, 2024
ce4a5ce
const trafficCheckInterval; rm testmain
Sep 24, 2024
a745992
y
Sep 24, 2024
086fcc3
fix lint
Sep 24, 2024
a64e842
bump time check window
Sep 24, 2024
9e53313
stream: fix intermittant test failures while testing routines and rem…
Sep 25, 2024
10230b0
spells
Sep 25, 2024
4e02c4a
cant do what I did
Sep 25, 2024
7be80a0
protect race due to routine.
Sep 25, 2024
b51cf2f
update testURL
Sep 25, 2024
ff7ae03
use mock websocket connection instead of test URL's
Sep 25, 2024
1040153
linter: fix
Sep 25, 2024
f529bd2
remove url because its throwing errors on CI builds
Sep 25, 2024
5d0a7f7
connections drop all the time, don't need to worry about not being ab…
Sep 25, 2024
56cb431
remove another superfluous url thats not really set up for this
Sep 25, 2024
ca4999e
spawn overwatch routine when there is no errors, inline checker inste…
Sep 25, 2024
4240a0a
linter: fixerino uperino
Sep 26, 2024
8d6febc
glorious: panix
Sep 30, 2024
3df82ef
Merge branch 'master' into gateio_ws
Oct 1, 2024
7ff07d1
Merge branch 'master' into gateio_ws
Oct 1, 2024
3a24640
linter: things
Oct 1, 2024
1eef208
whoops
Oct 1, 2024
261b577
Merge branch 'master' into gateio_ws
Oct 1, 2024
9b8471d
Merge branch 'gateio_ws' into match_to_conns
Oct 2, 2024
66157c3
Merge branch 'master' into match_to_conns
Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 48 additions & 48 deletions exchanges/gateio/gateio_test.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions exchanges/gateio/gateio_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ func (g *Gateio) generateWsSignature(secret, event, channel string, t int64) (st
}

// WsHandleSpotData handles spot data
func (g *Gateio) WsHandleSpotData(_ context.Context, respRaw []byte) error {
func (g *Gateio) WsHandleSpotData(_ context.Context, conn stream.Connection, respRaw []byte) error {
var push WsResponse
err := json.Unmarshal(respRaw, &push)
if err != nil {
return err
}

if push.Event == subscribeEvent || push.Event == unsubscribeEvent {
if !g.Websocket.Match.IncomingWithData(push.ID, respRaw) {
if !conn.RouteIncomingWebsocketData(push.ID, respRaw) {
return fmt.Errorf("couldn't match subscription message with ID: %d", push.ID)
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions exchanges/gateio/gateio_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ func (g *Gateio) Setup(exch *config.Exchange) error {
RateLimit: request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit),
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
Handler: func(ctx context.Context, incoming []byte) error {
return g.WsHandleFuturesData(ctx, incoming, asset.Futures)
Handler: func(ctx context.Context, conn stream.Connection, incoming []byte) error {
return g.WsHandleFuturesData(ctx, conn, incoming, asset.Futures)
},
Subscriber: g.FuturesSubscribe,
Unsubscriber: g.FuturesUnsubscribe,
Expand All @@ -248,8 +248,8 @@ func (g *Gateio) Setup(exch *config.Exchange) error {
RateLimit: request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit),
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
Handler: func(ctx context.Context, incoming []byte) error {
return g.WsHandleFuturesData(ctx, incoming, asset.Futures)
Handler: func(ctx context.Context, conn stream.Connection, incoming []byte) error {
return g.WsHandleFuturesData(ctx, conn, incoming, asset.Futures)
},
Subscriber: g.FuturesSubscribe,
Unsubscriber: g.FuturesUnsubscribe,
Expand All @@ -268,8 +268,8 @@ func (g *Gateio) Setup(exch *config.Exchange) error {
RateLimit: request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit),
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
Handler: func(ctx context.Context, incoming []byte) error {
return g.WsHandleFuturesData(ctx, incoming, asset.DeliveryFutures)
Handler: func(ctx context.Context, conn stream.Connection, incoming []byte) error {
return g.WsHandleFuturesData(ctx, conn, incoming, asset.DeliveryFutures)
},
Subscriber: g.DeliveryFuturesSubscribe,
Unsubscriber: g.DeliveryFuturesUnsubscribe,
Expand Down
4 changes: 2 additions & 2 deletions exchanges/gateio/gateio_ws_futures.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ func (g *Gateio) FuturesUnsubscribe(ctx context.Context, conn stream.Connection,
}

// WsHandleFuturesData handles futures websocket data
func (g *Gateio) WsHandleFuturesData(_ context.Context, respRaw []byte, a asset.Item) error {
func (g *Gateio) WsHandleFuturesData(_ context.Context, conn stream.Connection, respRaw []byte, a asset.Item) error {
var push WsResponse
err := json.Unmarshal(respRaw, &push)
if err != nil {
return err
}

if push.Event == subscribeEvent || push.Event == unsubscribeEvent {
if !g.Websocket.Match.IncomingWithData(push.ID, respRaw) {
if !conn.RouteIncomingWebsocketData(push.ID, respRaw) {
return fmt.Errorf("couldn't match subscription message with ID: %d", push.ID)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions exchanges/gateio/gateio_ws_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,15 @@ func (g *Gateio) OptionsUnsubscribe(ctx context.Context, conn stream.Connection,
}

// WsHandleOptionsData handles options websocket data
func (g *Gateio) WsHandleOptionsData(_ context.Context, respRaw []byte) error {
func (g *Gateio) WsHandleOptionsData(_ context.Context, conn stream.Connection, respRaw []byte) error {
var push WsResponse
err := json.Unmarshal(respRaw, &push)
if err != nil {
return err
}

if push.Event == subscribeEvent || push.Event == unsubscribeEvent {
if !g.Websocket.Match.IncomingWithData(push.ID, respRaw) {
if !conn.RouteIncomingWebsocketData(push.ID, respRaw) {
return fmt.Errorf("couldn't match subscription message with ID: %d", push.ID)
}
return nil
Expand Down
6 changes: 5 additions & 1 deletion exchanges/stream/stream_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type Connection interface {
SetProxy(string)
GetURL() string
Shutdown() error

// RouteIncomingWebsocketData routes incoming websocket data to the correct handler.
// Returns true if a handler was found and data was passed to it.
RouteIncomingWebsocketData(signature any, incoming []byte) (matched bool)
}

// Response defines generalised data from the stream connection
Expand Down Expand Up @@ -71,7 +75,7 @@ type ConnectionSetup struct {
// Handler defines the function that will be called when a message is
// received from the exchange's websocket server. This function should
// handle the incoming message and pass it to the appropriate data handler.
Handler func(ctx context.Context, incoming []byte) error
Handler func(ctx context.Context, conn Connection, incoming []byte) error
// BespokeGenerateMessageID is a function that returns a unique message ID.
// This is useful for when an exchange connection requires a unique or
// structured message ID for each message sent.
Expand Down
15 changes: 11 additions & 4 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ func (w *Websocket) getConnectionFromSetup(c *ConnectionSetup) *WebsocketConnect
if c.URL != "" {
connectionURL = c.URL
}
match := w.Match
if w.useMultiConnectionManagement {
// If we are using multi connection management, we can decouple
// the match from the global match and have a match per connection.
match = NewMatch()
}

return &WebsocketConnection{
ExchangeName: w.exchangeName,
URL: connectionURL,
Expand All @@ -298,7 +305,7 @@ func (w *Websocket) getConnectionFromSetup(c *ConnectionSetup) *WebsocketConnect
readMessageErrors: w.ReadMessageErrors,
shutdown: w.ShutdownC,
Wg: &w.Wg,
Match: w.Match,
Match: match,
RateLimit: c.RateLimit,
Reporter: c.ConnectionLevelReporter,
bespokeGenerateMessageID: c.BespokeGenerateMessageID,
Expand Down Expand Up @@ -1064,7 +1071,7 @@ func (w *Websocket) checkSubscriptions(conn Connection, subs subscription.List)
if s.State() == subscription.ResubscribingState {
continue
}
if found := w.subscriptions.Get(s); found != nil {
if found := subscriptionStore.Get(s); found != nil {
return fmt.Errorf("%w: %s", subscription.ErrDuplicate, s)
}
}
Expand All @@ -1073,14 +1080,14 @@ func (w *Websocket) checkSubscriptions(conn Connection, subs subscription.List)
}

// Reader reads and handles data from a specific connection
func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ctx context.Context, message []byte) error) {
func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ctx context.Context, conn Connection, message []byte) error) {
defer w.Wg.Done()
for {
resp := conn.ReadMessage()
if resp.Raw == nil {
return // Connection has been closed
}
if err := handler(ctx, resp.Raw); err != nil {
if err := handler(ctx, conn, resp.Raw); err != nil {
w.DataHandler <- fmt.Errorf("connection URL:[%v] error: %w", conn.GetURL(), err)
}
}
Expand Down
6 changes: 6 additions & 0 deletions exchanges/stream/websocket_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,9 @@ func removeURLQueryString(url string) string {
}
return url
}

// RouteIncomingWebsocketData routes incoming websocket data to the correct handler.
// Returns true if a handler was found and data was passed to it.
func (w *WebsocketConnection) RouteIncomingWebsocketData(signature any, incoming []byte) (matched bool) {
return w.Match.IncomingWithData(signature, incoming)
}
10 changes: 5 additions & 5 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestConnectionMessageErrors(t *testing.T) {
err = ws.Connect()
require.ErrorIs(t, err, errWebsocketDataHandlerUnset)

ws.connectionManager[0].Setup.Handler = func(context.Context, []byte) error {
ws.connectionManager[0].Setup.Handler = func(context.Context, Connection, []byte) error {
return errDastardlyReason
}
err = ws.Connect()
Expand All @@ -266,7 +266,7 @@ func TestConnectionMessageErrors(t *testing.T) {
err = ws.Connect()
require.ErrorIs(t, err, errDastardlyReason)

ws.connectionManager[0].Setup.Handler = func(context.Context, []byte) error {
ws.connectionManager[0].Setup.Handler = func(context.Context, Connection, []byte) error {
return errDastardlyReason
}
err = ws.Connect()
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestSubscribeUnsubscribe(t *testing.T) {
Unsubscriber: func(ctx context.Context, c Connection, s subscription.List) error {
return currySimpleUnsubConn(multi)(ctx, c, s)
},
Handler: func(context.Context, []byte) error { return nil },
Handler: func(context.Context, Connection, []byte) error { return nil },
}
require.NoError(t, multi.SetupNewConnection(amazingCandidate))

Expand Down Expand Up @@ -1130,7 +1130,7 @@ func TestFlushChannels(t *testing.T) {
Unsubscriber: func(ctx context.Context, c Connection, s subscription.List) error {
return currySimpleUnsubConn(w)(ctx, c, s)
},
Handler: func(context.Context, []byte) error { return nil },
Handler: func(context.Context, Connection, []byte) error { return nil },
}
require.NoError(t, w.SetupNewConnection(amazingCandidate))
require.NoError(t, w.FlushChannels(), "FlushChannels must not error")
Expand Down Expand Up @@ -1228,7 +1228,7 @@ func TestSetupNewConnection(t *testing.T) {
err = multi.SetupNewConnection(connSetup)
require.ErrorIs(t, err, errWebsocketDataHandlerUnset)

connSetup.Handler = func(context.Context, []byte) error { return nil }
connSetup.Handler = func(context.Context, Connection, []byte) error { return nil }
err = multi.SetupNewConnection(connSetup)
require.NoError(t, err)

Expand Down
Loading