Skip to content

Commit

Permalink
Store only most recent blocks in cache (#7531)
Browse files Browse the repository at this point in the history
* Store only most recent blocks in cache

* Make join safe in test

* Remove direct access from store values with abstract class and per-field package-private methods
  • Loading branch information
zilm13 authored Oct 9, 2023
1 parent d77470a commit ef391e9
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Consensys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.store;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.BlockAndCheckpoints;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary;
import tech.pegasys.teku.spec.datastructures.execution.SlotAndExecutionPayloadSummary;
import tech.pegasys.teku.spec.datastructures.forkchoice.VoteTracker;

/** Store extension dedicated to keep unsafe updates package-private */
public abstract class CacheableStore implements UpdatableStore {

abstract void cacheTimeMillis(UInt64 timeMillis);

abstract void cacheGenesisTime(UInt64 genesisTime);

abstract void cacheProposerBoostRoot(Optional<Bytes32> proposerBoostRoot);

abstract void cacheBlocks(Collection<BlockAndCheckpoints> blockAndCheckpoints);

abstract void cacheStates(Map<Bytes32, StateAndBlockSummary> stateAndBlockSummaries);

abstract void cacheFinalizedOptimisticTransitionPayload(
Optional<SlotAndExecutionPayloadSummary> finalizedOptimisticTransitionPayload);

abstract void cleanupCheckpointStates(Predicate<SlotAndBlockRoot> removalCondition);

abstract void setHighestVotedValidatorIndex(UInt64 highestVotedValidatorIndex);

abstract void setVote(int index, VoteTracker voteTracker);
}
123 changes: 87 additions & 36 deletions storage/src/main/java/tech/pegasys/teku/storage/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -31,6 +32,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -52,6 +54,7 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.BlockAndCheckpoints;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
Expand All @@ -71,7 +74,7 @@
import tech.pegasys.teku.storage.protoarray.ForkChoiceStrategy;
import tech.pegasys.teku.storage.protoarray.ProtoArray;

class Store implements UpdatableStore {
class Store extends CacheableStore {
private static final Logger LOG = LogManager.getLogger();
public static final int VOTE_TRACKER_SPARE_CAPACITY = 1000;

Expand All @@ -87,28 +90,28 @@ class Store implements UpdatableStore {

private Optional<SettableGauge> epochStatesCountGauge = Optional.empty();

final Optional<Map<Bytes32, StateAndBlockSummary>> maybeEpochStates;
private final Optional<Map<Bytes32, StateAndBlockSummary>> maybeEpochStates;

private final Spec spec;
private final StateAndBlockSummaryProvider stateProvider;
private final BlockProvider blockProvider;
private final BlobSidecarsProvider blobSidecarsProvider;
private final EarliestBlobSidecarSlotProvider earliestBlobSidecarSlotProvider;
final ForkChoiceStrategy forkChoiceStrategy;
private final ForkChoiceStrategy forkChoiceStrategy;

private final Optional<Checkpoint> initialCheckpoint;
UInt64 timeMillis;
UInt64 genesisTime;
AnchorPoint finalizedAnchor;
Checkpoint justifiedCheckpoint;
Checkpoint bestJustifiedCheckpoint;
Optional<SlotAndExecutionPayloadSummary> finalizedOptimisticTransitionPayload;
Optional<Bytes32> proposerBoostRoot = Optional.empty();
final CachingTaskQueue<Bytes32, StateAndBlockSummary> states;
final Map<Bytes32, SignedBeaconBlock> blocks;
final CachingTaskQueue<SlotAndBlockRoot, BeaconState> checkpointStates;
VoteTracker[] votes;
UInt64 highestVotedValidatorIndex;
private final CachingTaskQueue<Bytes32, StateAndBlockSummary> states;
private final Map<Bytes32, SignedBeaconBlock> blocks;
private final CachingTaskQueue<SlotAndBlockRoot, BeaconState> checkpointStates;
private UInt64 timeMillis;
private UInt64 genesisTime;
private AnchorPoint finalizedAnchor;
private Checkpoint justifiedCheckpoint;
private Checkpoint bestJustifiedCheckpoint;
private Optional<SlotAndExecutionPayloadSummary> finalizedOptimisticTransitionPayload;
private Optional<Bytes32> proposerBoostRoot = Optional.empty();
private VoteTracker[] votes;
private UInt64 highestVotedValidatorIndex;

private Store(
final MetricsSystem metricsSystem,
Expand Down Expand Up @@ -495,13 +498,7 @@ public SafeFuture<Optional<SignedBeaconBlock>> retrieveSignedBlock(final Bytes32
}

// Retrieve and cache block
return blockProvider
.getBlock(blockRoot)
.thenApply(
block -> {
block.ifPresent(this::putBlock);
return block;
});
return blockProvider.getBlock(blockRoot);
}

@Override
Expand Down Expand Up @@ -581,6 +578,74 @@ public SafeFuture<Optional<UInt64>> retrieveEarliestBlobSidecarSlot() {
return earliestBlobSidecarSlotProvider.getEarliestBlobSidecarSlot();
}

/** Non-synchronized, no lock, unsafe if Store is not locked externally */
@Override
void cacheBlocks(final Collection<BlockAndCheckpoints> blockAndCheckpoints) {
blockAndCheckpoints.stream()
.sorted(Comparator.comparing(BlockAndCheckpoints::getSlot))
.map(BlockAndCheckpoints::getBlock)
.forEach(block -> blocks.put(block.getRoot(), block));
blockCountGauge.ifPresent(gauge -> gauge.set(blocks.size()));
}

/** Non-synchronized, no lock, unsafe if Store is not locked externally */
@Override
void cacheTimeMillis(final UInt64 timeMillis) {
if (timeMillis.isGreaterThanOrEqualTo(this.timeMillis)) {
this.timeMillis = timeMillis;
}
}

/** Non-synchronized, no lock, unsafe if Store is not locked externally */
@Override
void cacheGenesisTime(final UInt64 genesisTime) {
this.genesisTime = genesisTime;
}

/** Non-synchronized, no lock, unsafe if Store is not locked externally */
@Override
void cacheProposerBoostRoot(final Optional<Bytes32> proposerBoostRoot) {
this.proposerBoostRoot = proposerBoostRoot;
}

/** Non-synchronized, no lock, unsafe if Store is not locked externally */
@Override
void cacheStates(final Map<Bytes32, StateAndBlockSummary> stateAndBlockSummaries) {
states.cacheAll(stateAndBlockSummaries);
}

/** Non-synchronized, no lock, unsafe if Store is not locked externally */
@Override
void cacheFinalizedOptimisticTransitionPayload(
final Optional<SlotAndExecutionPayloadSummary> finalizedOptimisticTransitionPayload) {
this.finalizedOptimisticTransitionPayload = finalizedOptimisticTransitionPayload;
}

/** Non-synchronized, no lock, unsafe if Store is not locked externally */
@Override
void cleanupCheckpointStates(final Predicate<SlotAndBlockRoot> removalCondition) {
checkpointStates.removeIf(removalCondition);
}

/** Non-synchronized, no lock, unsafe if Store is not locked externally */
@Override
void setHighestVotedValidatorIndex(final UInt64 highestVotedValidatorIndex) {
this.highestVotedValidatorIndex = highestVotedValidatorIndex;

// Expand votes array if needed
if (highestVotedValidatorIndex.isGreaterThanOrEqualTo(votes.length)) {
this.votes =
Arrays.copyOf(
votes, highestVotedValidatorIndex.plus(VOTE_TRACKER_SPARE_CAPACITY).intValue());
}
}

/** Non-synchronized, no lock, unsafe if Store is not locked externally */
@Override
void setVote(final int index, final VoteTracker voteTracker) {
votes[index] = voteTracker;
}

UInt64 getHighestVotedValidatorIndex() {
readVotesLock.lock();
try {
Expand Down Expand Up @@ -617,7 +682,6 @@ private SafeFuture<Optional<SignedBlockAndState>> getAndCacheBlockAndState(
signedBeaconBlock ->
SafeFuture.completedFuture(Optional.of(signedBeaconBlock)))
.orElseGet(() -> blockProvider.getBlock(blockRoot))
.thenPeek(block -> block.ifPresent(this::putBlock))
.thenApply(
block -> block.map(b -> new SignedBlockAndState(b, res.get().getState())));
});
Expand Down Expand Up @@ -815,19 +879,6 @@ private boolean isSlotAtNthEpochBoundary(
.orElse(false);
}

private void putBlock(final SignedBeaconBlock block) {
final Lock writeLock = lock.writeLock();
writeLock.lock();
try {
if (containsBlock(block.getRoot())) {
blocks.put(block.getRoot(), block);
blockCountGauge.ifPresent(gauge -> gauge.set(blocks.size()));
}
} finally {
writeLock.unlock();
}
}

@VisibleForTesting
Optional<Map<Bytes32, StateAndBlockSummary>> getEpochStates() {
return maybeEpochStates;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,9 @@ public Collection<Bytes32> getOrderedBlockRoots() {
lock.readLock().lock();
try {
final NavigableMap<UInt64, Bytes32> blockRootsBySlot = new TreeMap<>();
store.forkChoiceStrategy.processAllInOrder(
(root, slot, parent) -> blockRootsBySlot.put(slot, root));
store
.getForkChoiceStrategy()
.processAllInOrder((root, slot, parent) -> blockRootsBySlot.put(slot, root));
this.blockData
.values()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,15 @@ public StorageUpdate createStorageUpdate() {

public void applyToStore(final Store store, final UpdateResult updateResult) {
// Add new data
tx.timeMillis
.filter(t -> t.isGreaterThan(store.getTimeMillis()))
.ifPresent(value -> store.timeMillis = value);
tx.genesisTime.ifPresent(value -> store.genesisTime = value);
tx.timeMillis.ifPresent(store::cacheTimeMillis);
tx.genesisTime.ifPresent(store::cacheGenesisTime);
tx.justifiedCheckpoint.ifPresent(store::updateJustifiedCheckpoint);
tx.bestJustifiedCheckpoint.ifPresent(store::updateBestJustifiedCheckpoint);
hotBlocks.forEach((root, value) -> store.blocks.put(root, value.getBlock()));
store.states.cacheAll(Maps.transformValues(hotBlockAndStates, this::blockAndStateAsSummary));
store.cacheBlocks(hotBlocks.values());
store.cacheStates(Maps.transformValues(hotBlockAndStates, this::blockAndStateAsSummary));
if (optimisticTransitionBlockRootSet) {
store.finalizedOptimisticTransitionPayload =
updateResult.getFinalizedOptimisticTransitionPayload();
store.cacheFinalizedOptimisticTransitionPayload(
updateResult.getFinalizedOptimisticTransitionPayload());
}

// Update finalized data
Expand All @@ -123,18 +121,20 @@ public void applyToStore(final Store store, final UpdateResult updateResult) {
// Prune blocks and states
prunedHotBlockRoots.keySet().forEach(store::removeStateAndBlock);

store.checkpointStates.removeIf(
store.cleanupCheckpointStates(
slotAndBlockRoot -> prunedHotBlockRoots.containsKey(slotAndBlockRoot.getBlockRoot()));

if (tx.proposerBoostRootSet) {
store.proposerBoostRoot = tx.proposerBoostRoot;
store.cacheProposerBoostRoot(tx.proposerBoostRoot);
}

store.forkChoiceStrategy.applyUpdate(
hotBlocks.values(),
tx.pulledUpBlockCheckpoints,
prunedHotBlockRoots,
store.getFinalizedCheckpoint());
store
.getForkChoiceStrategy()
.applyUpdate(
hotBlocks.values(),
tx.pulledUpBlockCheckpoints,
prunedHotBlockRoots,
store.getFinalizedCheckpoint());
}

private StateAndBlockSummary blockAndStateAsSummary(final SignedBlockAndState blockAndState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ private StoreTransactionUpdates buildFinalizedUpdates(final Checkpoint finalized
// Transition block was finalized by this transaction
optimisticTransitionBlockRootSet = true;
optimisticTransitionBlockRoot =
baseStore.forkChoiceStrategy.getOptimisticallySyncedTransitionBlockRoot(
latestFinalized.getRoot());
baseStore
.getForkChoiceStrategy()
.getOptimisticallySyncedTransitionBlockRoot(latestFinalized.getRoot());
} else {
optimisticTransitionBlockRootSet = false;
optimisticTransitionBlockRoot = Optional.empty();
Expand Down Expand Up @@ -161,7 +162,7 @@ private Map<Bytes32, BeaconState> getHotStatesToPersist() {
private Optional<UInt64> blockSlot(final Bytes32 root) {
return Optional.ofNullable(hotBlockAndStates.get(root))
.map(SignedBlockAndState::getSlot)
.or(() -> baseStore.forkChoiceStrategy.blockSlot(root));
.or(() -> baseStore.getForkChoiceStrategy().blockSlot(root));
}

private Map<Bytes32, Bytes32> collectFinalizedRoots(final Bytes32 newlyFinalizedBlockRoot) {
Expand All @@ -176,10 +177,12 @@ private Map<Bytes32, Bytes32> collectFinalizedRoots(final Bytes32 newlyFinalized
}

// Add existing hot blocks that are now finalized
if (baseStore.forkChoiceStrategy.contains(finalizedChainHeadRoot)) {
baseStore.forkChoiceStrategy.processHashesInChain(
finalizedChainHeadRoot,
(blockRoot, slot, parentRoot) -> childToParent.put(blockRoot, parentRoot));
if (baseStore.getForkChoiceStrategy().contains(finalizedChainHeadRoot)) {
baseStore
.getForkChoiceStrategy()
.processHashesInChain(
finalizedChainHeadRoot,
(blockRoot, slot, parentRoot) -> childToParent.put(blockRoot, parentRoot));
}
return childToParent;
}
Expand Down Expand Up @@ -214,12 +217,14 @@ private boolean shouldPrune(

private void calculatePrunedHotBlockRoots() {
final BeaconBlockSummary finalizedBlock = tx.getLatestFinalized().getBlockSummary();
baseStore.forkChoiceStrategy.processAllInOrder(
(blockRoot, slot, parentRoot) -> {
if (shouldPrune(finalizedBlock, blockRoot, slot, parentRoot)) {
prunedHotBlockRoots.put(blockRoot, slot);
}
});
baseStore
.getForkChoiceStrategy()
.processAllInOrder(
(blockRoot, slot, parentRoot) -> {
if (shouldPrune(finalizedBlock, blockRoot, slot, parentRoot)) {
prunedHotBlockRoots.put(blockRoot, slot);
}
});

tx.blockData.values().stream()
// Iterate new blocks in slot order to guarantee we see parents first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package tech.pegasys.teku.storage.store;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -96,17 +95,8 @@ public Bytes32 applyForkChoiceScoreChanges(
public void commit() {
// Votes are applied to the store immediately since the changes to the in-memory ProtoArray
// can't be rolled back.

store.highestVotedValidatorIndex = getHighestVotedValidatorIndex();

if (store.highestVotedValidatorIndex.intValue() >= store.votes.length) {
store.votes =
Arrays.copyOf(
store.votes,
store.highestVotedValidatorIndex.intValue() + Store.VOTE_TRACKER_SPARE_CAPACITY);
}

votes.forEach((key, value) -> store.votes[key.intValue()] = value);
store.setHighestVotedValidatorIndex(getHighestVotedValidatorIndex());
votes.forEach((key, value) -> store.setVote(key.intValue(), value));

voteUpdateChannel.onVotesUpdated(votes);
}
Expand Down
Loading

0 comments on commit ef391e9

Please sign in to comment.