-
Notifications
You must be signed in to change notification settings - Fork 641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFC: Reuse PolymarketWebSocketClient
during non-initial subscription
#2238
Labels
Comments
Something along the lines of this. From 125e4e8ab8cee63784ecf4ba989225f4e1f36c00 Mon Sep 17 00:00:00 2001
From: Ryan Tam <[email protected]>
Date: Thu, 23 Jan 2025 20:13:42 +0000
Subject: [PATCH] feat: Gate and group polymarket subscription by configurable
value
This is to avoid overwhelming polymarket by establishing many WS at
once, since polymarket will throw 429 at us.
---
nautilus_trader/adapters/polymarket/config.py | 6 +-
nautilus_trader/adapters/polymarket/data.py | 80 +++++++++----------
2 files changed, 40 insertions(+), 46 deletions(-)
diff --git a/nautilus_trader/adapters/polymarket/config.py b/nautilus_trader/adapters/polymarket/config.py
index 249625b3a..adfd02453 100644
--- a/nautilus_trader/adapters/polymarket/config.py
+++ b/nautilus_trader/adapters/polymarket/config.py
@@ -50,8 +50,8 @@ class PolymarketDataClientConfig(LiveDataClientConfig, frozen=True):
The HTTP client custom endpoint override.
base_url_ws : str, optional
The WebSocket client custom endpoint override.
- ws_connection_delay_secs : PositiveInt, default 5
- The delay (seconds) prior to main websocket connection to allow initial subscriptions to arrive.
+ ws_connection_delay_secs : PositiveFloat, default 0.1
+ The delay (seconds) prior to websocket connection to allow subscriptions to arrive.
update_instruments_interval_mins: PositiveInt or None, default 60
The interval (minutes) between updating Polymarket instruments.
compute_effective_deltas : bool, default False
@@ -69,7 +69,7 @@ class PolymarketDataClientConfig(LiveDataClientConfig, frozen=True):
passphrase: str | None = None
base_url_http: str | None = None
base_url_ws: str | None = None
- ws_connection_delay_secs: PositiveInt = 5
+ ws_connection_delay_secs: PositiveFloat = 0.1
update_instruments_interval_mins: PositiveInt | None = 60
compute_effective_deltas: bool = False
diff --git a/nautilus_trader/adapters/polymarket/data.py b/nautilus_trader/adapters/polymarket/data.py
index 15060a3b6..48f89c76d 100644
--- a/nautilus_trader/adapters/polymarket/data.py
+++ b/nautilus_trader/adapters/polymarket/data.py
@@ -107,15 +107,15 @@ class PolymarketDataClient(LiveMarketDataClient):
# WebSocket API
self._ws_base_url = self._config.base_url_ws
- self._ws_client: PolymarketWebSocketClient = self._create_websocket_client()
- self._ws_clients: dict[InstrumentId, PolymarketWebSocketClient] = {}
+ self._ws_clients: list[PolymarketWebSocketClient] = []
+ self._ws_client_pending_connection: PolymarketWebSocketClient | None = None
+
self._decoder_market_msg = msgspec.json.Decoder(MARKET_WS_MESSAGE)
# Tasks
self._update_instruments_interval_mins: int | None = config.update_instruments_interval_mins
self._update_instruments_task: asyncio.Task | None = None
- self._main_ws_connect_task: asyncio.Task | None = None
- self._main_ws_delay = True
+ self._delayed_ws_client_connection_task: asyncio.Task | None = None
# Hot caches
self._last_quotes: dict[InstrumentId, QuoteTick] = {}
@@ -131,34 +131,27 @@ class PolymarketDataClient(LiveMarketDataClient):
self._update_instruments(self._update_instruments_interval_mins),
)
- self._main_ws_connect_task = self.create_task(self._connect_main_ws_after_delay())
-
async def _disconnect(self) -> None:
if self._update_instruments_task:
self._log.debug("Canceling task 'update_instruments'")
self._update_instruments_task.cancel()
self._update_instruments_task = None
- if self._main_ws_connect_task:
- self._log.debug("Canceling task 'connect_main_ws_after_delay'")
- self._main_ws_connect_task.cancel()
- self._main_ws_connect_task = None
+ if self._delayed_ws_client_connection_task:
+ self._log.debug("Canceling task 'delayed_ws_client_connection'")
+ self._delayed_ws_client_connection_task.cancel()
+ self._delayed_ws_client_connection_task = None
# Shutdown websockets
tasks: set[Coroutine[Any, Any, None]] = set()
- if self._ws_client.is_connected():
- tasks.add(self._ws_client.disconnect())
-
- for ws_client in self._ws_clients.values():
+ for ws_client in self._ws_clients:
if ws_client.is_connected():
tasks.add(ws_client.disconnect())
if tasks:
await asyncio.gather(*tasks)
- self._main_ws_delay = True
-
def _create_websocket_client(self) -> PolymarketWebSocketClient:
self._log.info("Creating new PolymarketWebSocketClient", LogColor.MAGENTA)
return PolymarketWebSocketClient(
@@ -170,18 +163,6 @@ class PolymarketDataClient(LiveMarketDataClient):
loop=self._loop,
)
- async def _connect_main_ws_after_delay(self) -> None:
- delay_secs = self._config.ws_connection_delay_secs
- self._log.info(
- f"Awaiting initial websocket connection delay ({delay_secs}s)...",
- LogColor.BLUE,
- )
- await asyncio.sleep(delay_secs)
- if self._ws_client.asset_subscriptions():
- await self._ws_client.connect()
-
- self._main_ws_delay = False
-
def _send_all_instruments_to_data_engine(self) -> None:
for instrument in self._instrument_provider.get_all().values():
self._handle_data(instrument)
@@ -201,23 +182,36 @@ class PolymarketDataClient(LiveMarketDataClient):
except asyncio.CancelledError:
self._log.debug("Canceled task 'update_instruments'")
+ async def _delayed_ws_client_connection(
+ self,
+ ws_client: PolymarketWebSocketClient,
+ sleep_secs: int,
+ ) -> None:
+ try:
+ await asyncio.sleep(sleep_secs)
+ await ws_client.connect()
+ finally:
+ self._ws_client_pending_connection = None
+ self._delayed_ws_client_connection_task = None
+
async def _subscribe_asset_book(self, instrument_id):
- token_id = get_polymarket_token_id(instrument_id)
+ create_connect_task = False
+ if self._ws_client_pending_connection is None:
+ self._ws_client_pending_connection = self._create_websocket_client()
+ create_connect_task = True
- if not self._ws_client.is_connected():
- ws_client = self._ws_client
- if token_id in ws_client.asset_subscriptions():
- return # Already subscribed
- ws_client.subscribe_book(asset=token_id)
- if not self._main_ws_delay:
- await ws_client.connect()
- else:
- ws_client = self._create_websocket_client()
- if token_id in ws_client.asset_subscriptions():
- return # Already subscribed
- self._ws_clients[instrument_id] = ws_client
- ws_client.subscribe_book(asset=token_id)
- await ws_client.connect()
+ token_id = get_polymarket_token_id(instrument_id)
+ self._ws_client_pending_connection.subscribe_book(token_id)
+
+ if create_connect_task:
+ self._delayed_ws_client_connection_task = self.create_task(
+ self._delayed_ws_client_connection(
+ self._ws_client_pending_connection,
+ self._config.ws_connection_delay_secs,
+ ),
+ log_msg="Delayed start PolymarketWebSocketClient connection",
+ success_msg="Finished delaying start of PolymarketWebSocketClient connection",
+ )
async def _subscribe_order_book_deltas(
self,
--
2.43.0
|
(P.S. there is no RFC template so I used the feature request template) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Background
I use nautlius to record polymarket data, I subscribe to new instruments' order book as I become aware of them - which could be during a reload of instruments (i.e. not during init).
If the new batch of instruments I want to subscribe to is massive, then the number of new
PolymarketWebSocketClient
creation becomes massive as well, and it's quite easy to hit a rate limit error.Relevant code
nautilus_trader/nautilus_trader/adapters/polymarket/data.py
Lines 204 to 220 in fafcc30
Notice how in the event where
self._ws_client
is already established and connected, we then opt for always creating a new client and subscribe over another WS.Proposal
Change the meaning of
ws_connection_delay_secs
- make client always wait for this many seconds prior to connecting to WS.The text was updated successfully, but these errors were encountered: