Skip to content

Commit

Permalink
No delay when fetching blobs with known block and no attempt to recov…
Browse files Browse the repository at this point in the history
…er blobs for unknown block (#8927)
  • Loading branch information
StefanBratanov authored Dec 28, 2024
1 parent 7551460 commit e0a1e8d
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 401 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@

### Additions and Improvements
- Optimized blobs validation pipeline
- Remove delay when fetching blobs from the local EL on block arrival

### Bug Fixes
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarSchema;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.BeaconBlockBody;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodyDeneb;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodySchemaDeneb;
import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment;
import tech.pegasys.teku.spec.datastructures.type.SszKZGProof;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
Expand Down Expand Up @@ -266,6 +270,32 @@ public BlobSidecar constructBlobSidecar(
index, blob, commitment, proof, signedBeaconBlock.asHeader(), kzgCommitmentInclusionProof);
}

public BlobSidecar constructBlobSidecarFromBlobAndProof(
final BlobIdentifier blobIdentifier,
final BlobAndProof blobAndProof,
final BeaconBlockBodyDeneb beaconBlockBodyDeneb,
final SignedBeaconBlockHeader signedBeaconBlockHeader) {

final SszKZGCommitment sszKZGCommitment =
beaconBlockBodyDeneb.getBlobKzgCommitments().get(blobIdentifier.getIndex().intValue());

final BlobSidecar blobSidecar =
blobSidecarSchema.create(
blobIdentifier.getIndex(),
blobAndProof.blob(),
sszKZGCommitment,
new SszKZGProof(blobAndProof.proof()),
signedBeaconBlockHeader,
computeKzgCommitmentInclusionProof(blobIdentifier.getIndex(), beaconBlockBodyDeneb));

blobSidecar.markSignatureAsValidated();
blobSidecar.markKzgCommitmentInclusionProofAsValidated();
// assume kzg validation done by local EL
blobSidecar.markKzgAsValidated();

return blobSidecar;
}

public boolean verifyBlobKzgCommitmentInclusionProof(final BlobSidecar blobSidecar) {
if (blobSidecar.isKzgCommitmentInclusionProofValidated()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ public BlobSidecarManagerImpl(
invalidBlobSidecarRoots,
(tracker) ->
new ForkChoiceBlobSidecarsAvailabilityChecker(spec, recentChainData, tracker, kzg),
// we don't care to set maxBlobsPerBlock since it isn't used with this immediate validation
// flow
(block) -> new BlockBlobSidecarsTracker(block.getSlotAndBlockRoot(), UInt64.ZERO));
(block) -> new BlockBlobSidecarsTracker(block.getSlotAndBlockRoot()));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@

public class BlockBlobSidecarsTracker {
private static final Logger LOG = LogManager.getLogger();

private static final UInt64 CREATION_TIMING_IDX = UInt64.MAX_VALUE;
private static final UInt64 BLOCK_ARRIVAL_TIMING_IDX = CREATION_TIMING_IDX.decrement();
private static final UInt64 RPC_FETCH_TIMING_IDX = BLOCK_ARRIVAL_TIMING_IDX.decrement();
private static final UInt64 LOCAL_EL_FETCH_TIMING_IDX = RPC_FETCH_TIMING_IDX.decrement();
private static final UInt64 RPC_BLOCK_FETCH_TIMING_IDX = BLOCK_ARRIVAL_TIMING_IDX.decrement();
private static final UInt64 RPC_BLOBS_FETCH_TIMING_IDX = RPC_BLOCK_FETCH_TIMING_IDX.decrement();
private static final UInt64 LOCAL_EL_BLOBS_FETCH_TIMING_IDX =
RPC_BLOBS_FETCH_TIMING_IDX.decrement();

private final SlotAndBlockRoot slotAndBlockRoot;
private final UInt64 maxBlobsPerBlock;

private final AtomicReference<Optional<SignedBeaconBlock>> block =
new AtomicReference<>(Optional.empty());
Expand All @@ -56,8 +58,9 @@ public class BlockBlobSidecarsTracker {
private final NavigableMap<UInt64, BlobSidecar> blobSidecars = new ConcurrentSkipListMap<>();
private final SafeFuture<Void> blobSidecarsComplete = new SafeFuture<>();

private volatile boolean rpcFetchTriggered = false;
private volatile boolean localElFetchTriggered = false;
private volatile boolean localElBlobsFetchTriggered = false;
private volatile boolean rpcBlockFetchTriggered = false;
private volatile boolean rpcBlobsFetchTriggered = false;

private final Optional<Map<UInt64, Long>> maybeDebugTimings;

Expand All @@ -69,12 +72,9 @@ public class BlockBlobSidecarsTracker {
* tracker instance will be used, so no synchronization is required
*
* @param slotAndBlockRoot slot and block root to create tracker for
* @param maxBlobsPerBlock max number of blobs per block for the slot
*/
public BlockBlobSidecarsTracker(
final SlotAndBlockRoot slotAndBlockRoot, final UInt64 maxBlobsPerBlock) {
public BlockBlobSidecarsTracker(final SlotAndBlockRoot slotAndBlockRoot) {
this.slotAndBlockRoot = slotAndBlockRoot;
this.maxBlobsPerBlock = maxBlobsPerBlock;
if (LOG.isDebugEnabled()) {
// don't need a concurrent hashmap since we'll interact with it from synchronized BlobSidecar
// pool methods
Expand Down Expand Up @@ -112,31 +112,13 @@ public Optional<BlobSidecar> getBlobSidecar(final UInt64 index) {

public Stream<BlobIdentifier> getMissingBlobSidecars() {
final Optional<Integer> blockCommitmentsCount = getBlockKzgCommitmentsCount();
if (blockCommitmentsCount.isPresent()) {
return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get()))
.filter(blobIndex -> !blobSidecars.containsKey(blobIndex))
.map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex));
}
checkState(blockCommitmentsCount.isPresent(), "Block must be known to call this method");

if (blobSidecars.isEmpty()) {
return Stream.of();
}

// We may return maxBlobsPerBlock because we don't know the block
return UInt64.range(UInt64.ZERO, maxBlobsPerBlock)
return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get()))
.filter(blobIndex -> !blobSidecars.containsKey(blobIndex))
.map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex));
}

public Stream<BlobIdentifier> getUnusedBlobSidecarsForBlock() {
final Optional<Integer> blockCommitmentsCount = getBlockKzgCommitmentsCount();
checkState(blockCommitmentsCount.isPresent(), "Block must me known to call this method");

final UInt64 firstUnusedIndex = UInt64.valueOf(blockCommitmentsCount.get());
return UInt64.range(firstUnusedIndex, maxBlobsPerBlock)
.map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex));
}

public boolean add(final BlobSidecar blobSidecar) {
checkArgument(
blobSidecar.getBlockRoot().equals(slotAndBlockRoot.getBlockRoot()),
Expand Down Expand Up @@ -247,24 +229,35 @@ public boolean isComplete() {
return blobSidecarsComplete.isDone();
}

public boolean isRpcFetchTriggered() {
return rpcFetchTriggered;
public boolean isLocalElBlobsFetchTriggered() {
return localElBlobsFetchTriggered;
}

public void setRpcFetchTriggered() {
this.rpcFetchTriggered = true;
public void setLocalElBlobsFetchTriggered() {
this.localElBlobsFetchTriggered = true;
maybeDebugTimings.ifPresent(
debugTimings -> debugTimings.put(RPC_FETCH_TIMING_IDX, System.currentTimeMillis()));
debugTimings ->
debugTimings.put(LOCAL_EL_BLOBS_FETCH_TIMING_IDX, System.currentTimeMillis()));
}

public boolean isLocalElFetchTriggered() {
return localElFetchTriggered;
public boolean isRpcBlockFetchTriggered() {
return rpcBlockFetchTriggered;
}

public void setLocalElFetchTriggered() {
this.localElFetchTriggered = true;
public void setRpcBlockFetchTriggered() {
this.rpcBlockFetchTriggered = true;
maybeDebugTimings.ifPresent(
debugTimings -> debugTimings.put(LOCAL_EL_FETCH_TIMING_IDX, System.currentTimeMillis()));
debugTimings -> debugTimings.put(RPC_BLOCK_FETCH_TIMING_IDX, System.currentTimeMillis()));
}

public boolean isRpcBlobsFetchTriggered() {
return rpcBlobsFetchTriggered;
}

public void setRpcBlobsFetchTriggered() {
this.rpcBlobsFetchTriggered = true;
maybeDebugTimings.ifPresent(
debugTimings -> debugTimings.put(RPC_BLOBS_FETCH_TIMING_IDX, System.currentTimeMillis()));
}

private boolean areBlobsComplete() {
Expand Down Expand Up @@ -315,22 +308,31 @@ private void printDebugTimings(final Map<UInt64, Long> debugTimings) {
.append(debugTimings.getOrDefault(BLOCK_ARRIVAL_TIMING_IDX, 0L) - creationTime)
.append("ms - ");

if (debugTimings.containsKey(LOCAL_EL_FETCH_TIMING_IDX)) {
if (debugTimings.containsKey(LOCAL_EL_BLOBS_FETCH_TIMING_IDX)) {
timingsReport
.append("Local EL blobs fetch delay ")
.append(debugTimings.get(LOCAL_EL_BLOBS_FETCH_TIMING_IDX) - creationTime)
.append("ms - ");
} else {
timingsReport.append("Local EL blobs fetch wasn't required - ");
}

if (debugTimings.containsKey(RPC_BLOCK_FETCH_TIMING_IDX)) {
timingsReport
.append("Local EL fetch delay ")
.append(debugTimings.get(LOCAL_EL_FETCH_TIMING_IDX) - creationTime)
.append("RPC block fetch delay ")
.append(debugTimings.get(RPC_BLOCK_FETCH_TIMING_IDX) - creationTime)
.append("ms - ");
} else {
timingsReport.append("Local EL fetch wasn't required - ");
timingsReport.append("RPC block fetch wasn't required - ");
}

if (debugTimings.containsKey(RPC_FETCH_TIMING_IDX)) {
if (debugTimings.containsKey(RPC_BLOBS_FETCH_TIMING_IDX)) {
timingsReport
.append("RPC fetch delay ")
.append(debugTimings.get(RPC_FETCH_TIMING_IDX) - creationTime)
.append("RPC blobs fetch delay ")
.append(debugTimings.get(RPC_BLOBS_FETCH_TIMING_IDX) - creationTime)
.append("ms");
} else {
timingsReport.append("RPC fetch wasn't required");
timingsReport.append("RPC blobs fetch wasn't required");
}

LOG.debug(timingsReport.toString());
Expand All @@ -342,8 +344,9 @@ public String toString() {
.add("slotAndBlockRoot", slotAndBlockRoot)
.add("isBlockPresent", block.get().isPresent())
.add("isComplete", isComplete())
.add("rpcFetchTriggered", rpcFetchTriggered)
.add("localElFetchTriggered", localElFetchTriggered)
.add("localElBlobsFetchTriggered", localElBlobsFetchTriggered)
.add("rpcBlockFetchTriggered", rpcBlockFetchTriggered)
.add("rpcBlobsFetchTriggered", rpcBlobsFetchTriggered)
.add("blockImportOnCompletionEnabled", blockImportOnCompletionEnabled.get())
.add(
"blobSidecars",
Expand Down
Loading

0 comments on commit e0a1e8d

Please sign in to comment.