Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: force subscription store count check as stop gap for wrapper side implementation #1717

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions exchanges/bybit/bybit_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/types"
)

Expand All @@ -33,10 +34,11 @@ type Authenticate struct {

// SubscriptionArgument represents a subscription arguments.
type SubscriptionArgument struct {
auth bool `json:"-"`
RequestID string `json:"req_id"`
Operation string `json:"op"`
Arguments []string `json:"args"`
auth bool `json:"-"`
RequestID string `json:"req_id"`
Operation string `json:"op"`
Arguments []string `json:"args"`
associatedSubs subscription.List `json:"-"` // Used to store associated subscriptions
}

// Fee holds fee information
Expand Down
40 changes: 29 additions & 11 deletions exchanges/bybit/bybit_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,28 +167,30 @@ func (by *Bybit) handleSubscriptions(operation string, subs subscription.List) (
if err != nil {
return
}
chans := []string{}
authChans := []string{}
var chans subscription.List
var authChans subscription.List
for _, s := range subs {
if s.Authenticated {
authChans = append(authChans, s.QualifiedChannel)
authChans = append(authChans, s)
} else {
chans = append(chans, s.QualifiedChannel)
chans = append(chans, s)
}
}
for _, b := range common.Batch(chans, 10) {
args = append(args, SubscriptionArgument{
Operation: operation,
RequestID: strconv.FormatInt(by.Websocket.Conn.GenerateMessageID(false), 10),
Arguments: b,
Operation: operation,
RequestID: strconv.FormatInt(by.Websocket.Conn.GenerateMessageID(false), 10),
Arguments: b.QualifiedChannels(),
associatedSubs: b,
})
}
if len(authChans) != 0 {
args = append(args, SubscriptionArgument{
auth: true,
Operation: operation,
RequestID: strconv.FormatInt(by.Websocket.Conn.GenerateMessageID(false), 10),
Arguments: authChans,
auth: true,
Operation: operation,
RequestID: strconv.FormatInt(by.Websocket.Conn.GenerateMessageID(false), 10),
Arguments: authChans.QualifiedChannels(),
associatedSubs: authChans,
})
}
return
Expand Down Expand Up @@ -225,6 +227,22 @@ func (by *Bybit) handleSpotSubscription(operation string, channelsToSubscribe su
if !resp.Success {
return fmt.Errorf("%s with request ID %s msg: %s", resp.Operation, resp.RequestID, resp.RetMsg)
}

var conn stream.Connection
if payloads[a].auth {
conn = by.Websocket.AuthConn
} else {
conn = by.Websocket.Conn
}

if operation == "unsubscribe" {
err = by.Websocket.RemoveSubscriptions(conn, payloads[a].associatedSubs...)
} else {
err = by.Websocket.AddSubscriptions(conn, payloads[a].associatedSubs...)
}
if err != nil {
return err
}
}
return nil
}
Expand Down
9 changes: 8 additions & 1 deletion exchanges/deribit/deribit_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,14 @@ func (d *Deribit) handleSubscription(method string, subs subscription.List) erro
err = common.AppendError(err, errors.New(s.String()))
}
}
return err
if err != nil {
return err
}

if method == "unsubscribe" {
return d.Websocket.RemoveSubscriptions(d.Websocket.Conn, subs...)
}
return d.Websocket.AddSubscriptions(d.Websocket.Conn, subs...)
}

func getValidatedCurrencyCode(pair currency.Pair) string {
Expand Down
59 changes: 40 additions & 19 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var (
errReadMessageErrorsNil = errors.New("read message errors is nil")
errWebsocketSubscriptionsGeneratorUnset = errors.New("websocket subscriptions generator function needs to be set")
errSubscriptionsExceedsLimit = errors.New("subscriptions exceeds limit")
errSubscriptionsNotAdded = errors.New("subscriptions not added")
errSubscriptionsNotRemoved = errors.New("subscriptions not removed")
errInvalidMaxSubscriptions = errors.New("max subscriptions cannot be less than 0")
errSameProxyAddress = errors.New("cannot set proxy address to the same address")
errNoConnectFunc = errors.New("websocket connect func not set")
Expand Down Expand Up @@ -372,6 +374,10 @@ func (w *Websocket) connect() error {
if err := w.SubscribeToChannels(nil, subs); err != nil {
return err
}

if w.subscriptions.Len() != len(subs) {
return fmt.Errorf("%s %w expecting %d subscribed", w.exchangeName, errSubscriptionsNotAdded, len(subs))
}
}
return nil
}
Expand Down Expand Up @@ -455,6 +461,11 @@ func (w *Websocket) connect() error {
break
}

if len(subs) != 0 && w.connectionManager[i].Subscriptions.Len() != len(subs) {
multiConnectFatalError = fmt.Errorf("%v %w expecting %d subscribed %v", w.exchangeName, errSubscriptionsNotAdded, len(subs), subs)
break
}

if w.verbose {
log.Debugf(log.WebsocketMgr, "%s websocket: [conn:%d] [URL:%s] connected. [Subscribed: %d]",
w.exchangeName,
Expand Down Expand Up @@ -625,14 +636,7 @@ func (w *Websocket) FlushChannels() error {
if err != nil {
return err
}
subs, unsubs := w.GetChannelDifference(nil, newSubs)
if err := w.UnsubscribeChannels(nil, unsubs); err != nil {
return err
}
if len(subs) == 0 {
return nil
}
return w.SubscribeToChannels(nil, subs)
return w.updateChannelSubscriptions(nil, w.subscriptions, newSubs)
}

for x := range w.connectionManager {
Expand All @@ -658,17 +662,9 @@ func (w *Websocket) FlushChannels() error {
w.connectionManager[x].Connection = conn
}

subs, unsubs := w.GetChannelDifference(w.connectionManager[x].Connection, newSubs)

if len(unsubs) != 0 {
if err := w.UnsubscribeChannels(w.connectionManager[x].Connection, unsubs); err != nil {
return err
}
}
if len(subs) != 0 {
if err := w.SubscribeToChannels(w.connectionManager[x].Connection, subs); err != nil {
return err
}
err = w.updateChannelSubscriptions(w.connectionManager[x].Connection, w.connectionManager[x].Subscriptions, newSubs)
if err != nil {
return err
}

// If there are no subscriptions to subscribe to, close the connection as it is no longer needed.
Expand All @@ -683,6 +679,31 @@ func (w *Websocket) FlushChannels() error {
return nil
}

// updateChannelSubscriptions subscribes or unsubscribes from channels and checks that the correct number of channels
// have been subscribed to or unsubscribed from.
func (w *Websocket) updateChannelSubscriptions(c Connection, store *subscription.Store, incoming subscription.List) error {
subs, unsubs := w.GetChannelDifference(c, incoming)
if len(unsubs) != 0 {
prevState := store.Len()
if err := w.UnsubscribeChannels(c, unsubs); err != nil {
return err
}
if diff := prevState - store.Len(); diff != len(unsubs) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't feel right ...

  1. Shouldn't Unsubscribe be erroring if it wasn't successful ?
  2. Shouldn't the state of all subs in unsubs be changing ?
    That said, it's not too bad either.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't Unsubscribe be erroring if it wasn't successful ?

I think it should, I just added it as a catch all in the event we forgot to remove it from the store when it was successful. Then it should complain. should 😬

Shouldn't the state of all subs in unsubs be changing

Now you are making me think this is all completely wrong 😆. This specifically didn't catch any issues. Can you suggest a better way as a back up check? Cause I am drooling at my screen trying to figure it out 🤤.

return fmt.Errorf("%v %w expected %d unsubscribed", w.exchangeName, errSubscriptionsNotRemoved, len(unsubs))
}
}
if len(subs) != 0 {
prevState := store.Len()
if err := w.SubscribeToChannels(c, subs); err != nil {
return err
}
if diff := store.Len() - prevState; diff != len(subs) {
return fmt.Errorf("%v %w expected %d subscribed", w.exchangeName, errSubscriptionsNotAdded, len(subs))
}
}
return nil
}

func (w *Websocket) setState(s uint32) {
w.state.Store(s)
}
Expand Down
Loading
Loading