diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index bbdb0fccedc..1fce18dd2aa 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -63,6 +64,7 @@ import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackerFactory; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.statetransition.validation.BlobSidecarGossipValidator; import tech.pegasys.teku.storage.client.RecentChainData; public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHistoricalSlot @@ -98,6 +100,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis private final AsyncRunner asyncRunner; private final RecentChainData recentChainData; private final ExecutionLayerChannel executionLayer; + private final Supplier gossipValidatorSupplier; private final Consumer blobSidecarGossipPublisher; private final int maxTrackers; @@ -129,6 +132,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis final AsyncRunner asyncRunner, final RecentChainData recentChainData, final ExecutionLayerChannel executionLayer, + final Supplier gossipValidatorSupplier, final Consumer blobSidecarGossipPublisher, final UInt64 historicalSlotTolerance, final UInt64 futureSlotTolerance, @@ -140,6 +144,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis this.asyncRunner = asyncRunner; this.recentChainData = recentChainData; this.executionLayer = executionLayer; + this.gossipValidatorSupplier = gossipValidatorSupplier; this.blobSidecarGossipPublisher = blobSidecarGossipPublisher; this.maxTrackers = maxTrackers; this.sizeGauge = sizeGauge; @@ -159,6 +164,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis final AsyncRunner asyncRunner, final RecentChainData recentChainData, final ExecutionLayerChannel executionLayer, + final Supplier gossipValidatorSupplier, final Consumer blobSidecarGossipPublisher, final UInt64 historicalSlotTolerance, final UInt64 futureSlotTolerance, @@ -171,6 +177,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis this.asyncRunner = asyncRunner; this.recentChainData = recentChainData; this.executionLayer = executionLayer; + this.gossipValidatorSupplier = gossipValidatorSupplier; this.blobSidecarGossipPublisher = blobSidecarGossipPublisher; this.maxTrackers = maxTrackers; this.sizeGauge = sizeGauge; @@ -224,8 +231,8 @@ public synchronized void onNewBlobSidecar( sizeGauge.set(++totalBlobSidecars, GAUGE_BLOB_SIDECARS_LABEL); countBlobSidecar(remoteOrigin); newBlobSidecarSubscribers.deliver(NewBlobSidecarSubscriber::onNewBlobSidecar, blobSidecar); - if (remoteOrigin.equals(LOCAL_EL)) { - blobSidecarGossipPublisher.accept(blobSidecar); + if (remoteOrigin.equals(LOCAL_EL) && slotAndBlockRoot.getSlot().equals(getCurrentSlot())) { + publishRecoveredBlobSidecar(blobSidecar); } } else { countDuplicateBlobSidecar(remoteOrigin); @@ -236,6 +243,12 @@ public synchronized void onNewBlobSidecar( } } + private void publishRecoveredBlobSidecar(final BlobSidecar blobSidecar) { + LOG.debug("Publishing recovered blob sidecar {}", blobSidecar::toLogString); + gossipValidatorSupplier.get().markForEquivocation(blobSidecar); + blobSidecarGossipPublisher.accept(blobSidecar); + } + private void countBlobSidecar(final RemoteOrigin origin) { switch (origin) { case RPC -> poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_RPC_SUBTYPE).inc(); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/PoolFactory.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/PoolFactory.java index 236bfba5155..b97236c4c39 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/PoolFactory.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/PoolFactory.java @@ -16,6 +16,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Collections; import java.util.function.Consumer; +import java.util.function.Supplier; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.Counter; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; @@ -31,6 +32,7 @@ import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackerFactory; import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.statetransition.validation.BlobSidecarGossipValidator; import tech.pegasys.teku.storage.client.RecentChainData; public class PoolFactory { @@ -117,6 +119,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers( final AsyncRunner asyncRunner, final RecentChainData recentChainData, final ExecutionLayerChannel executionLayer, + final Supplier gossipValidatorSupplier, final Consumer blobSidecarGossipPublisher) { return createPoolForBlockBlobSidecarsTrackers( blockImportChannel, @@ -125,6 +128,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers( asyncRunner, recentChainData, executionLayer, + gossipValidatorSupplier, blobSidecarGossipPublisher, DEFAULT_HISTORICAL_SLOT_TOLERANCE, FutureItems.DEFAULT_FUTURE_SLOT_TOLERANCE, @@ -138,6 +142,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers( final AsyncRunner asyncRunner, final RecentChainData recentChainData, final ExecutionLayerChannel executionLayer, + final Supplier gossipValidatorSupplier, final Consumer blobSidecarGossipPublisher, final UInt64 historicalBlockTolerance, final UInt64 futureBlockTolerance, @@ -151,6 +156,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers( asyncRunner, recentChainData, executionLayer, + gossipValidatorSupplier, blobSidecarGossipPublisher, historicalBlockTolerance, futureBlockTolerance, @@ -165,6 +171,7 @@ BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers( final AsyncRunner asyncRunner, final RecentChainData recentChainData, final ExecutionLayerChannel executionLayer, + final Supplier gossipValidatorSupplier, final Consumer blobSidecarGossipPublisher, final UInt64 historicalBlockTolerance, final UInt64 futureBlockTolerance, @@ -179,6 +186,7 @@ BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers( asyncRunner, recentChainData, executionLayer, + gossipValidatorSupplier, blobSidecarGossipPublisher, historicalBlockTolerance, futureBlockTolerance, diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlobSidecarGossipValidator.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlobSidecarGossipValidator.java index e64df5a44f9..685dc1bfc9a 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlobSidecarGossipValidator.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlobSidecarGossipValidator.java @@ -262,11 +262,7 @@ public SafeFuture validate(final BlobSidecar blobSidec * [IGNORE] The sidecar is the first sidecar for the tuple (block_header.slot, block_header.proposer_index, blob_sidecar.index) * with valid header signature, sidecar inclusion proof, and kzg proof. */ - if (!receivedValidBlobSidecarInfoSet.add( - new SlotProposerIndexAndBlobIndex( - blockHeader.getSlot(), - blockHeader.getProposerIndex(), - blobSidecar.getIndex()))) { + if (!markForEquivocation(blockHeader, blobSidecar.getIndex())) { return ignore( "BlobSidecar is not the first valid for its slot and index. It will be dropped."); } @@ -277,6 +273,17 @@ public SafeFuture validate(final BlobSidecar blobSidec }); } + private boolean markForEquivocation(final BeaconBlockHeader blockHeader, final UInt64 index) { + return receivedValidBlobSidecarInfoSet.add( + new SlotProposerIndexAndBlobIndex( + blockHeader.getSlot(), blockHeader.getProposerIndex(), index)); + } + + public boolean markForEquivocation(final BlobSidecar blobSidecar) { + return markForEquivocation( + blobSidecar.getSignedBeaconBlockHeader().getMessage(), blobSidecar.getIndex()); + } + private SafeFuture validateBlobSidecarWithKnownValidHeader( final BlobSidecar blobSidecar, final BeaconBlockHeader blockHeader) { @@ -310,9 +317,7 @@ private SafeFuture validateBlobSidecarWithKnownValidHe * [IGNORE] The sidecar is the first sidecar for the tuple (block_header.slot, block_header.proposer_index, blob_sidecar.index) * with valid header signature, sidecar inclusion proof, and kzg proof. */ - if (!receivedValidBlobSidecarInfoSet.add( - new SlotProposerIndexAndBlobIndex( - blockHeader.getSlot(), blockHeader.getProposerIndex(), blobSidecar.getIndex()))) { + if (!markForEquivocation(blockHeader, blobSidecar.getIndex())) { return SafeFuture.completedFuture( ignore("BlobSidecar is not the first valid for its slot and index. It will be dropped.")); } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index 908bcc962c6..c5dadf551c2 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -65,6 +65,7 @@ import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTracker; import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.statetransition.validation.BlobSidecarGossipValidator; import tech.pegasys.teku.storage.client.RecentChainData; public class BlockBlobSidecarsTrackersPoolImplTest { @@ -82,6 +83,9 @@ public class BlockBlobSidecarsTrackersPoolImplTest { @SuppressWarnings("unchecked") private final Consumer blobSidecarPublisher = mock(Consumer.class); + private final BlobSidecarGossipValidator blobSidecarGossipValidator = + mock(BlobSidecarGossipValidator.class); + private final BlockImportChannel blockImportChannel = mock(BlockImportChannel.class); private final int maxItems = 15; private final BlockBlobSidecarsTrackersPoolImpl blockBlobSidecarsTrackersPool = @@ -93,6 +97,7 @@ public class BlockBlobSidecarsTrackersPoolImplTest { asyncRunner, recentChainData, executionLayer, + () -> blobSidecarGossipValidator, blobSidecarPublisher, historicalTolerance, futureTolerance, @@ -203,7 +208,7 @@ public void onNewBlobSidecar_shouldIgnoreDuplicates() { } @Test - public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalEL() { + public void onNewBlobSidecar_shouldMarkForEquivocationAndPublishWhenOriginIsLocalEL() { final BlobSidecar blobSidecar1 = dataStructureUtil .createRandomBlobSidecarBuilder() @@ -220,6 +225,8 @@ public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalEL() { .signedBeaconBlockHeader(dataStructureUtil.randomSignedBeaconBlockHeader(currentSlot)) .build(); + when(blobSidecarGossipValidator.markForEquivocation(blobSidecar1)).thenReturn(true); + blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar1, RemoteOrigin.LOCAL_EL); blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar2, RemoteOrigin.GOSSIP); blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar3, RemoteOrigin.RPC); @@ -227,9 +234,49 @@ public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalEL() { assertBlobSidecarsCount(3); assertBlobSidecarsTrackersCount(3); + verify(blobSidecarGossipValidator).markForEquivocation(blobSidecar1); verify(blobSidecarPublisher, times(1)).accept(blobSidecar1); } + @Test + public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalELAndEquivocating() { + final BlobSidecar blobSidecar1 = + dataStructureUtil + .createRandomBlobSidecarBuilder() + .signedBeaconBlockHeader(dataStructureUtil.randomSignedBeaconBlockHeader(currentSlot)) + .build(); + + when(blobSidecarGossipValidator.markForEquivocation(blobSidecar1)).thenReturn(false); + + blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar1, RemoteOrigin.LOCAL_EL); + + assertBlobSidecarsCount(1); + assertBlobSidecarsTrackersCount(1); + + verify(blobSidecarGossipValidator).markForEquivocation(blobSidecar1); + verify(blobSidecarPublisher, times(1)).accept(blobSidecar1); + } + + @Test + public void onNewBlobSidecar_shouldNotPublishWhenOriginIsLocalELIsNotCurrentSlot() { + final BlobSidecar blobSidecar1 = + dataStructureUtil + .createRandomBlobSidecarBuilder() + .signedBeaconBlockHeader(dataStructureUtil.randomSignedBeaconBlockHeader(currentSlot)) + .build(); + + when(blobSidecarGossipValidator.markForEquivocation(blobSidecar1)).thenReturn(false); + blockBlobSidecarsTrackersPool.onSlot(currentSlot.plus(1)); + + blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar1, RemoteOrigin.LOCAL_EL); + + assertBlobSidecarsCount(1); + assertBlobSidecarsTrackersCount(1); + + verify(blobSidecarGossipValidator, never()).markForEquivocation(blobSidecar1); + verify(blobSidecarPublisher, never()).accept(blobSidecar1); + } + @Test public void onNewBlock_shouldIgnorePreDenebBlocks() { final Spec spec = TestSpecFactory.createMainnetCapella(); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlobSidecarGossipValidatorTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlobSidecarGossipValidatorTest.java index 38f9ddc81aa..88986b5ec53 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlobSidecarGossipValidatorTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlobSidecarGossipValidatorTest.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.statetransition.validation; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.clearInvocations; @@ -277,6 +278,16 @@ void shouldTrackValidInfoSet() { .isCompletedWithValueMatching(InternalValidationResult::isIgnore); } + @TestTemplate + void shouldMarkForEquivocation() { + assertThat(blobSidecarValidator.markForEquivocation(blobSidecar)).isTrue(); + + assertThat(blobSidecarValidator.markForEquivocation(blobSidecar)).isFalse(); + + SafeFutureAssert.assertThatSafeFuture(blobSidecarValidator.validate(blobSidecar)) + .isCompletedWithValueMatching(InternalValidationResult::isIgnore); + } + @TestTemplate void shouldIgnoreImmediatelyWhenBlobFromValidInfoSet() { SafeFutureAssert.assertThatSafeFuture(blobSidecarValidator.validate(blobSidecar)) diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index b76b7cda2c1..f26cbb1fdd1 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -276,6 +276,7 @@ public class BeaconChainController extends Service implements BeaconChainControl protected volatile GossipValidationHelper gossipValidationHelper; protected volatile KZG kzg; protected volatile BlobSidecarManager blobSidecarManager; + protected volatile BlobSidecarGossipValidator blobSidecarValidator; protected volatile Optional terminalPowBlockMonitor = Optional.empty(); protected volatile ProposersDataManager proposersDataManager; protected volatile KeyValueStore keyValueStore; @@ -568,7 +569,7 @@ protected void initBlobSidecarManager() { LimitedMap.createSynchronizedLRU(500); final MiscHelpersDeneb miscHelpers = MiscHelpersDeneb.required(spec.forMilestone(SpecMilestone.DENEB).miscHelpers()); - final BlobSidecarGossipValidator blobSidecarValidator = + blobSidecarValidator = BlobSidecarGossipValidator.create( spec, invalidBlockRoots, gossipValidationHelper, miscHelpers, kzg); final BlobSidecarManagerImpl blobSidecarManagerImpl = @@ -626,6 +627,7 @@ protected void initBlockBlobSidecarsTrackersPool() { beaconAsyncRunner, recentChainData, executionLayer, + () -> blobSidecarValidator, blobSidecarGossipChannel::publishBlobSidecar); eventChannels.subscribe(FinalizedCheckpointChannel.class, pool); blockBlobSidecarsTrackersPool = pool;