Skip to content

Commit

Permalink
Use new ReconciliationAnnounceEntries/-TerminatePayload format
Browse files Browse the repository at this point in the history
  • Loading branch information
xash committed Nov 13, 2024
1 parent 2c6df3e commit 94eed96
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/wgps/reconciliation/announcer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type AnnouncementPack<
// Then send a ReconciliationAnnounceEntries
announcement: {
range: Range3d<SubspaceId>;
count: number;
isEmpty: boolean;
wantResponse: boolean;
senderHandle: bigint;
receiverHandle: bigint;
Expand Down Expand Up @@ -176,7 +176,7 @@ export class Announcer<
this.announcementPackQueue.push({
staticTokenBinds,
announcement: {
count: entries.length,
isEmpty: entries.length === 0,
range: announcement.range,
receiverHandle: announcement.receiverHandle,
senderHandle: announcement.senderHandle,
Expand Down
18 changes: 8 additions & 10 deletions src/wgps/reconciliation/reconcile_msg_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
MsgReconciliationAnnounceEntries,
MsgReconciliationSendEntry,
MsgReconciliationSendFingerprint,
MsgReconciliationTerminatePayload,
ReconciliationPrivy,
} from "../types.ts";

Expand Down Expand Up @@ -37,7 +38,7 @@ export class ReconcileMsgTracker<

private announcedRange: Range3d<SubspaceId>;
private announcedNamespace: NamespaceId;
private announcedEntriesRemaining = 0n;
private announcedEntriesExpecting = false;

private handleToNamespaceId: (aoiHandle: bigint) => NamespaceId;

Expand Down Expand Up @@ -76,7 +77,7 @@ export class ReconcileMsgTracker<

this.announcedRange = msg.range;
this.announcedNamespace = this.handleToNamespaceId(msg.receiverHandle);
this.announcedEntriesRemaining = msg.count;
this.announcedEntriesExpecting = !msg.isEmpty;
}

onSendEntry(
Expand All @@ -90,25 +91,22 @@ export class ReconcileMsgTracker<
this.prevEntry = msg.entry.entry;
this.prevToken = msg.staticTokenHandle;

this.announcedEntriesRemaining -= 1n;

this.isAwaitingTermination = true;
}

onTerminatePayload() {
onTerminatePayload(msg: MsgReconciliationTerminatePayload) {
this.isAwaitingTermination = false;
if (msg.isFinal) {
this.announcedEntriesExpecting = false;
}
}

isExpectingPayloadOrTermination() {
return this.isAwaitingTermination;
}

isExpectingReconciliationSendEntry() {
if (this.announcedEntriesRemaining > 0n) {
return true;
}

return false;
return this.announcedEntriesExpecting;
}

getPrivy(): ReconciliationPrivy<NamespaceId, SubspaceId, PayloadDigest> {
Expand Down
6 changes: 3 additions & 3 deletions src/wgps/reconciliation/reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export class Reconciler<

private announceQueue = new FIFO<{
range: Range3d<SubspaceId>;
count: number;
isEmpty: boolean;
wantResponse: boolean;
covers: bigint | typeof COVERS_NONE;
}>();
Expand Down Expand Up @@ -166,14 +166,14 @@ export class Reconciler<
if (this.fingerprintScheme.isEqual(fingerprint, fingerprintOursFinal)) {
this.announceQueue.push({
range,
count: 0,
isEmpty: true,
wantResponse: false,
covers: BigInt(yourRangeCounter),
});
} else if (size <= SEND_ENTRIES_THRESHOLD) {
this.announceQueue.push({
range,
count: size,
isEmpty: size === 0,
wantResponse: true,
covers: BigInt(yourRangeCounter),
});
Expand Down
19 changes: 12 additions & 7 deletions src/wgps/wgps_messenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ export class WgpsMessenger<
private currentlyReceivingEntries: {
namespace: NamespaceId;
range: Range3d<SubspaceId>;
remaining: bigint;
expecting: boolean;
} | undefined;

private reconciliationPayloadIngester: PayloadIngester<
Expand Down Expand Up @@ -1020,7 +1020,7 @@ export class WgpsMessenger<
// Then announce the entries.
this.encoder.encode({
kind: MsgKind.ReconciliationAnnounceEntries,
count: BigInt(pack.announcement.count),
isEmpty: pack.announcement.isEmpty,
range: pack.announcement.range,
wantResponse: pack.announcement.wantResponse,
willSort: true,
Expand All @@ -1030,6 +1030,8 @@ export class WgpsMessenger<
});

// Then send the entries.
const finalEntry = pack.entries.at(-1);

for (const entry of pack.entries) {
this.encoder.encode({
kind: MsgKind.ReconciliationSendEntry,
Expand All @@ -1042,6 +1044,7 @@ export class WgpsMessenger<

this.encoder.encode({
kind: MsgKind.ReconciliationTerminatePayload,
isFinal: entry === finalEntry,
});
}
});
Expand Down Expand Up @@ -1333,7 +1336,7 @@ export class WgpsMessenger<
case MsgKind.ReconciliationAnnounceEntries: {
if (
this.currentlyReceivingEntries &&
this.currentlyReceivingEntries.remaining > 0n
this.currentlyReceivingEntries.expecting
) {
throw new WgpsMessageValidationError(
"Never received the entries we were promised...",
Expand All @@ -1346,7 +1349,7 @@ export class WgpsMessenger<

// Set the currently receiving namespace and range and expected count.
this.currentlyReceivingEntries = {
remaining: message.count,
expecting: !message.isEmpty,
namespace: reconciler.store.namespace,
range: message.range,
};
Expand Down Expand Up @@ -1387,7 +1390,7 @@ export class WgpsMessenger<
);
}

if (this.currentlyReceivingEntries.remaining <= 0n) {
if (!this.currentlyReceivingEntries.expecting) {
throw new WgpsMessageValidationError(
"Received entry for an announcement when we are not expecting any more!",
);
Expand Down Expand Up @@ -1418,8 +1421,6 @@ export class WgpsMessenger<
);
}

this.currentlyReceivingEntries.remaining -= 1n;

this.reconciliationPayloadIngester.target(
message.entry.entry,
message.entry.available === message.entry.entry.payloadLength,
Expand All @@ -1434,6 +1435,10 @@ export class WgpsMessenger<
break;
}
case MsgKind.ReconciliationTerminatePayload: {
if (message.isFinal && this.currentlyReceivingEntries) {
this.currentlyReceivingEntries.expecting = false;
}

const entryToRequestPayloadFor = this.reconciliationPayloadIngester
.terminate();

Expand Down

0 comments on commit 94eed96

Please sign in to comment.