Skip to content

Commit

Permalink
fix(bonsai-core): work around indexer race condition by managing webs…
Browse files Browse the repository at this point in the history
…ocket value lifecycles more clearly (#1420)
  • Loading branch information
tyleroooo authored Jan 9, 2025
1 parent dfd2686 commit 31ad327
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 76 deletions.
20 changes: 16 additions & 4 deletions src/abacus-ts/lib/resourceCacheManager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { logAbacusTsError } from '../logs';

type CacheEntry<T> = {
resource: T;
count: number;
Expand All @@ -14,7 +16,7 @@ export class ResourceCacheManager<T, U> {
constructor(
private options: {
constructor: (key: U) => T;
destroyer: (resource: NoInfer<T>) => void;
destroyer: (resource: NoInfer<T>, key: NoInfer<U>) => void;
keySerializer: (key: NoInfer<U>) => string;
destroyDelayMs?: number;
}
Expand Down Expand Up @@ -42,7 +44,14 @@ export class ResourceCacheManager<T, U> {
markDone(key: U): void {
const serializedKey = this.options.keySerializer(key);
const entry = this.cache[serializedKey];
if (!entry) return;
if (!entry) {
logAbacusTsError('ResourceCacheManager', 'tried to mark done unknown key', key);
return;
}
if (entry.count <= 0) {
logAbacusTsError('ResourceCacheManager', 'tried to mark done key with no subscribers', key);
entry.count = 1;
}

entry.count -= 1;

Expand All @@ -55,9 +64,12 @@ export class ResourceCacheManager<T, U> {
const delay = this.options.destroyDelayMs ?? 1000;
entry.destroyTimeout = setTimeout(() => {
const latestVal = this.cache[serializedKey];
if (!latestVal) return;
if (!latestVal) {
logAbacusTsError('ResourceCacheManager', 'could not find resource to destroy', key);
return;
}

this.options.destroyer(latestVal.resource);
this.options.destroyer(latestVal.resource, key);
delete this.cache[serializedKey];
}, delay);
}
Expand Down
48 changes: 48 additions & 0 deletions src/abacus-ts/websocket/lib/indexerValueManagerHelpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { ResourceCacheManager } from '@/abacus-ts/lib/resourceCacheManager';
import stableStringify from 'fast-json-stable-stringify';

import { IndexerWebsocket } from './indexerWebsocket';
import { IndexerWebsocketManager } from './indexerWebsocketManager';
import { WebsocketDerivedValue } from './websocketDerivedValue';

// this is set to just above the websocket subscribe timeout because of race conditions in the indexer backend
const DESTROY_DELAY_MS = 21000;

type WebsocketValueCreator<Args, ReturnType> = (
websocket: IndexerWebsocket,
args: Args
) => WebsocketDerivedValue<ReturnType>;

export function makeWsValueManager<Args, ReturnType>(
creator: WebsocketValueCreator<Args, ReturnType>
): ResourceCacheManager<WebsocketDerivedValue<ReturnType>, Args & { wsUrl: string }> {
return new ResourceCacheManager({
constructor: (allArgs: Args & { wsUrl: string }) =>
creator(IndexerWebsocketManager.use(allArgs.wsUrl), allArgs),

destroyer: (instance, { wsUrl }) => {
instance.teardown();
IndexerWebsocketManager.markDone(wsUrl);
},

// take care - extra properties on the key will cause divergent behavior
// (cache misses, unexpected new object creation, marking incorrect objects as done, etc)
// only ever pass the exact key type for correct behavior
keySerializer: (allArgs) => stableStringify(allArgs),

destroyDelayMs: DESTROY_DELAY_MS,
});
}

export function subscribeToWsValue<Args, ReturnType>(
manager: ResourceCacheManager<WebsocketDerivedValue<ReturnType>, Args & { wsUrl: string }>,
args: NoInfer<Args> & { wsUrl: string },
handleChange: (val: NoInfer<ReturnType>) => void
): () => void {
const value = manager.use(args);
const unsub = value.subscribe(handleChange);
return () => {
unsub();
manager.markDone(args);
};
}
2 changes: 2 additions & 0 deletions src/abacus-ts/websocket/lib/reconnectingWebsocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ class WebSocketConnection {
1001,
// normal but no code
1005,
// supposedly abnormal tcp failure but super super common
1006,
]);
if (!allowedCodes.has(close.code)) {
logAbacusTsError('WebSocketConnection', `socket ${this.id} closed abnormally`, {
Expand Down
7 changes: 1 addition & 6 deletions src/abacus-ts/websocket/lib/websocketDerivedValue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,10 @@ export class WebsocketDerivedValue<T> {
handleBaseData: (data: any, value: T, fullMessage: any) => T;
handleUpdates: (updates: any[], value: T, fullMessage: any) => T;
},
value: T,
changeHandler: ((val: T) => void) | undefined
value: T
) {
this.value = value;

if (changeHandler) {
this.subscribe(changeHandler);
}

this.unsubFromWs = websocket.addChannelSubscription({
channel: sub.channel,
id: sub.id,
Expand Down
24 changes: 11 additions & 13 deletions src/abacus-ts/websocket/markets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ import { setAllMarketsRaw } from '@/state/raw';

import { createStoreEffect } from '../lib/createStoreEffect';
import { Loadable, loadableLoaded, loadablePending } from '../lib/loadable';
import { logAbacusTsError } from '../logs';
import { MarketsData } from '../rawTypes';
import { selectWebsocketUrl } from '../socketSelectors';
import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { IndexerWebsocketManager } from './lib/indexerWebsocketManager';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';

function marketsWebsocketValue(
websocket: IndexerWebsocket,
onChange: (val: Loadable<MarketsData>) => void
) {
function marketsWebsocketValueCreator(websocket: IndexerWebsocket) {
return new WebsocketDerivedValue<Loadable<MarketsData>>(
websocket,
{
Expand All @@ -34,8 +32,7 @@ function marketsWebsocketValue(
const updates = isWsMarketUpdateResponses(baseUpdates);
let startingValue = value.data;
if (startingValue == null) {
// eslint-disable-next-line no-console
console.log('MarketsTracker found unexpectedly null base data in update');
logAbacusTsError('MarketsTracker', 'found unexpectedly null base data in update');
return value;
}
startingValue = { ...startingValue };
Expand Down Expand Up @@ -65,24 +62,25 @@ function marketsWebsocketValue(
return loadableLoaded(startingValue);
},
},
loadablePending(),
onChange
loadablePending()
);
}

const MarketsValueManager = makeWsValueManager(marketsWebsocketValueCreator);

export function setUpMarkets(store: RootStore) {
const throttledSetMarkets = throttle((val: Loadable<MarketsData>) => {
store.dispatch(setAllMarketsRaw(val));
}, 2 * timeUnits.second);

return createStoreEffect(store, selectWebsocketUrl, (wsUrl) => {
const thisTracker = marketsWebsocketValue(IndexerWebsocketManager.use(wsUrl), (val) =>
const unsub = subscribeToWsValue(MarketsValueManager, { wsUrl }, (val) =>
throttledSetMarkets(val)
);

return () => {
thisTracker.teardown();
IndexerWebsocketManager.markDone(wsUrl);
unsub();
throttledSetMarkets.cancel();
store.dispatch(setAllMarketsRaw(loadablePending()));
};
});
}
25 changes: 12 additions & 13 deletions src/abacus-ts/websocket/orderbook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ import { isTruthy } from '@/lib/isTruthy';

import { createStoreEffect } from '../lib/createStoreEffect';
import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable';
import { logAbacusTsError } from '../logs';
import { OrderbookData } from '../rawTypes';
import { selectWebsocketUrl } from '../socketSelectors';
import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { IndexerWebsocketManager } from './lib/indexerWebsocketManager';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';

function orderbookWebsocketValue(
function orderbookWebsocketValueCreator(
websocket: IndexerWebsocket,
marketId: string,
onChange: (val: Loadable<OrderbookData>) => void
{ marketId }: { marketId: string }
) {
return new WebsocketDerivedValue<Loadable<OrderbookData>>(
websocket,
Expand All @@ -45,8 +45,7 @@ function orderbookWebsocketValue(
const updates = isWsOrderbookUpdateResponses(baseUpdates);
let startingValue = value.data;
if (startingValue == null) {
// eslint-disable-next-line no-console
console.log('MarketsTracker found unexpectedly null base data in update');
logAbacusTsError('OrderbookTracker', 'found unexpectedly null base data in update');
return value;
}
startingValue = { asks: { ...startingValue.asks }, bids: { ...startingValue.bids } };
Expand All @@ -61,11 +60,12 @@ function orderbookWebsocketValue(
return loadableLoaded(startingValue);
},
},
loadablePending(),
onChange
loadablePending()
);
}

const OrderbookValueManager = makeWsValueManager(orderbookWebsocketValueCreator);

const selectMarketAndWsInfo = createAppSelector(
[selectWebsocketUrl, getCurrentMarketIdIfTradeable],
(wsUrl, currentMarketId) => ({ wsUrl, currentMarketId })
Expand All @@ -80,15 +80,14 @@ export function setUpOrderbook(store: RootStore) {
store.dispatch(setOrderbookRaw({ marketId: currentMarketId, data }));
}, timeUnits.second / 2);

const thisTracker = orderbookWebsocketValue(
IndexerWebsocketManager.use(wsUrl),
currentMarketId,
const unsub = subscribeToWsValue(
OrderbookValueManager,
{ wsUrl, marketId: currentMarketId },
(data) => throttledSetOrderbook(data)
);

return () => {
thisTracker.teardown();
IndexerWebsocketManager.markDone(wsUrl);
unsub();
throttledSetOrderbook.cancel();
store.dispatch(setOrderbookRaw({ marketId: currentMarketId, data: loadableIdle() }));
};
Expand Down
29 changes: 16 additions & 13 deletions src/abacus-ts/websocket/parentSubaccount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import { createStoreEffect } from '../lib/createStoreEffect';
import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable';
import { ChildSubaccountData, ParentSubaccountData } from '../rawTypes';
import { selectParentSubaccountInfo, selectWebsocketUrl } from '../socketSelectors';
import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { IndexerWebsocketManager } from './lib/indexerWebsocketManager';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';

function isValidSubaccount(childSubaccount: IndexerSubaccountResponseObject) {
Expand Down Expand Up @@ -64,11 +64,14 @@ function freshChildSubaccount({
};
}

function accountWebsocketValue(
interface AccountValueArgsBase {
address: string;
parentSubaccountNumber: string;
}

function accountWebsocketValueCreator(
websocket: IndexerWebsocket,
address: string,
parentSubaccountNumber: string,
onChange: (val: Loadable<ParentSubaccountData>) => void
{ address, parentSubaccountNumber }: AccountValueArgsBase
) {
return new WebsocketDerivedValue<Loadable<ParentSubaccountData>>(
websocket,
Expand Down Expand Up @@ -214,11 +217,12 @@ function accountWebsocketValue(
return { ...value, data: resultData };
},
},
loadablePending(),
onChange
loadablePending()
);
}

const AccountValueManager = makeWsValueManager(accountWebsocketValueCreator);

const selectParentSubaccount = createAppSelector(
[selectWebsocketUrl, selectParentSubaccountInfo],
(wsUrl, { wallet, subaccount }) => ({ wsUrl, wallet, subaccount })
Expand All @@ -229,16 +233,15 @@ export function setUpParentSubaccount(store: RootStore) {
if (!isTruthy(wallet) || subaccount == null) {
return undefined;
}
const thisTracker = accountWebsocketValue(
IndexerWebsocketManager.use(wsUrl),
wallet,
subaccount.toString(),

const unsub = subscribeToWsValue(
AccountValueManager,
{ wsUrl, address: wallet, parentSubaccountNumber: subaccount.toString() },
(val) => store.dispatch(setParentSubaccountRaw(val))
);

return () => {
thisTracker.teardown();
IndexerWebsocketManager.markDone(wsUrl);
unsub();
store.dispatch(setParentSubaccountRaw(loadableIdle()));
};
});
Expand Down
Loading

0 comments on commit 31ad327

Please sign in to comment.