Skip to content

Commit

Permalink
few changes
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Jan 10, 2025
1 parent 526cb25 commit 233e7f2
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,8 @@ public SyncSource getOrCreateSyncSource(final Eth2Peer peer, final Spec spec) {
final Optional<Integer> maybeMaxBlobSidecarsPerMinute =
spec.getMaxBlobsPerBlockForHighestMilestone()
.map(
maxBlobsPerBlock -> {
final int maxBlobSidecarsPerMinuteUpperBound =
((this.maxBlocksPerMinute - batchSize) * maxBlobsPerBlock) - 1;
// The default configured value for requesting is less than what we'd accept to
// avoid requesting a very large number of blobs in a short amount of time, so
// choose the minimum of the two
return Math.min(maxBlobSidecarsPerMinute, maxBlobSidecarsPerMinuteUpperBound);
});
maxBlobsPerBlock ->
this.maxBlobSidecarsPerMinute - (batchSize * maxBlobsPerBlock) - 1);
return syncSourcesByPeer.computeIfAbsent(
peer,
source ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ public Eth2P2PNetwork build() {
eth2RpcOutstandingPingThreshold,
eth2StatusUpdateInterval,
timeProvider,
config.getPeerRateLimit(),
config.getPeerBlocksRateLimit(),
config.getPeerBlobSidecarsRateLimit(),
config.getPeerRequestLimit(),
spec,
kzg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@

public class P2PConfig {

public static final int DEFAULT_PEER_RATE_LIMIT = 500;
public static final int DEFAULT_PEER_BLOCKS_RATE_LIMIT = 500;
public static final int DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT = 1500;

public static final int DEFAULT_PEER_REQUEST_LIMIT = 100;

public static final boolean DEFAULT_PEER_ALL_TOPIC_FILTER_ENABLED = true;
Expand All @@ -54,7 +56,8 @@ public class P2PConfig {
private final GossipEncoding gossipEncoding;
private final int targetSubnetSubscriberCount;
private final boolean subscribeAllSubnetsEnabled;
private final int peerRateLimit;
private final int peerBlocksRateLimit;
private final int peerBlobSidecarsRateLimit;
private final int peerRequestLimit;
private final int batchVerifyMaxThreads;
private final int batchVerifyQueueCapacity;
Expand All @@ -71,7 +74,8 @@ private P2PConfig(
final GossipEncoding gossipEncoding,
final int targetSubnetSubscriberCount,
final boolean subscribeAllSubnetsEnabled,
final int peerRateLimit,
final int peerBlocksRateLimit,
final int peerBlobSidecarsRateLimit,
final int peerRequestLimit,
final int batchVerifyMaxThreads,
final int batchVerifyQueueCapacity,
Expand All @@ -86,7 +90,8 @@ private P2PConfig(
this.gossipEncoding = gossipEncoding;
this.targetSubnetSubscriberCount = targetSubnetSubscriberCount;
this.subscribeAllSubnetsEnabled = subscribeAllSubnetsEnabled;
this.peerRateLimit = peerRateLimit;
this.peerBlocksRateLimit = peerBlocksRateLimit;
this.peerBlobSidecarsRateLimit = peerBlobSidecarsRateLimit;
this.peerRequestLimit = peerRequestLimit;
this.batchVerifyMaxThreads = batchVerifyMaxThreads;
this.batchVerifyQueueCapacity = batchVerifyQueueCapacity;
Expand Down Expand Up @@ -129,8 +134,12 @@ public boolean isSubscribeAllSubnetsEnabled() {
return subscribeAllSubnetsEnabled;
}

public int getPeerRateLimit() {
return peerRateLimit;
public int getPeerBlocksRateLimit() {
return peerBlocksRateLimit;
}

public int getPeerBlobSidecarsRateLimit() {
return peerBlobSidecarsRateLimit;
}

public int getPeerRequestLimit() {
Expand Down Expand Up @@ -174,7 +183,8 @@ public static class Builder {
private final GossipEncoding gossipEncoding = GossipEncoding.SSZ_SNAPPY;
private Integer targetSubnetSubscriberCount = DEFAULT_P2P_TARGET_SUBNET_SUBSCRIBER_COUNT;
private Boolean subscribeAllSubnetsEnabled = DEFAULT_SUBSCRIBE_ALL_SUBNETS_ENABLED;
private Integer peerRateLimit = DEFAULT_PEER_RATE_LIMIT;
private Integer peerBlocksRateLimit = DEFAULT_PEER_BLOCKS_RATE_LIMIT;
private Integer peerBlobSidecarsRateLimit = DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT;
private Integer peerRequestLimit = DEFAULT_PEER_REQUEST_LIMIT;
private int batchVerifyMaxThreads = DEFAULT_BATCH_VERIFY_MAX_THREADS;
private OptionalInt batchVerifyQueueCapacity = OptionalInt.empty();
Expand Down Expand Up @@ -225,7 +235,8 @@ public P2PConfig build() {
gossipEncoding,
targetSubnetSubscriberCount,
subscribeAllSubnetsEnabled,
peerRateLimit,
peerBlocksRateLimit,
peerBlobSidecarsRateLimit,
peerRequestLimit,
batchVerifyMaxThreads,
batchVerifyQueueCapacity.orElse(DEFAULT_BATCH_VERIFY_QUEUE_CAPACITY),
Expand Down Expand Up @@ -277,13 +288,23 @@ public Builder subscribeAllSubnetsEnabled(final Boolean subscribeAllSubnetsEnabl
return this;
}

public Builder peerRateLimit(final Integer peerRateLimit) {
checkNotNull(peerRateLimit);
if (peerRateLimit < 0) {
public Builder peerBlocksRateLimit(final Integer peerBlocksRateLimit) {
checkNotNull(peerBlocksRateLimit);
if (peerBlocksRateLimit < 0) {
throw new InvalidConfigurationException(
String.format("Invalid peerBlocksRateLimit: %d", peerBlocksRateLimit));
}
this.peerBlocksRateLimit = peerBlocksRateLimit;
return this;
}

public Builder peerBlobSidecarsRateLimit(final Integer peerBlobSidecarsRateLimit) {
checkNotNull(peerBlobSidecarsRateLimit);
if (peerBlobSidecarsRateLimit < 0) {
throw new InvalidConfigurationException(
String.format("Invalid peerRateLimit: %d", peerRateLimit));
String.format("Invalid peerBlobSidecarsRateLimit: %d", peerBlobSidecarsRateLimit));
}
this.peerRateLimit = peerRateLimit;
this.peerBlobSidecarsRateLimit = peerBlobSidecarsRateLimit;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class Eth2PeerFactory {
private final CombinedChainDataClient chainDataClient;
private final TimeProvider timeProvider;
private final Optional<Checkpoint> requiredCheckpoint;
private final int peerRateLimit;
private final int peerBlocksRateLimit;
private final int peerBlobSidecarsRateLimit;
private final int peerRequestLimit;
private final KZG kzg;
private final DiscoveryNodeIdExtractor discoveryNodeIdExtractor;
Expand All @@ -48,7 +49,8 @@ public Eth2PeerFactory(
final MetadataMessagesFactory metadataMessagesFactory,
final TimeProvider timeProvider,
final Optional<Checkpoint> requiredCheckpoint,
final int peerRateLimit,
final int peerBlocksRateLimit,
final int peerBlobSidecarsRateLimit,
final int peerRequestLimit,
final KZG kzg,
final DiscoveryNodeIdExtractor discoveryNodeIdExtractor) {
Expand All @@ -59,7 +61,8 @@ public Eth2PeerFactory(
this.statusMessageFactory = statusMessageFactory;
this.metadataMessagesFactory = metadataMessagesFactory;
this.requiredCheckpoint = requiredCheckpoint;
this.peerRateLimit = peerRateLimit;
this.peerBlocksRateLimit = peerBlocksRateLimit;
this.peerBlobSidecarsRateLimit = peerBlobSidecarsRateLimit;
this.peerRequestLimit = peerRequestLimit;
this.kzg = kzg;
this.discoveryNodeIdExtractor = discoveryNodeIdExtractor;
Expand All @@ -74,11 +77,8 @@ public Eth2Peer create(final Peer peer, final BeaconChainMethods rpcMethods) {
statusMessageFactory,
metadataMessagesFactory,
PeerChainValidator.create(spec, metricsSystem, chainDataClient, requiredCheckpoint),
RateTracker.create(peerRateLimit, TIME_OUT, timeProvider),
RateTracker.create(
peerRateLimit * spec.getMaxBlobsPerBlockForHighestMilestone().orElse(1),
TIME_OUT,
timeProvider),
RateTracker.create(peerBlocksRateLimit, TIME_OUT, timeProvider),
RateTracker.create(peerBlobSidecarsRateLimit, TIME_OUT, timeProvider),
RateTracker.create(peerRequestLimit, TIME_OUT, timeProvider),
kzg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public static Eth2PeerManager create(
final int eth2RpcOutstandingPingThreshold,
final Duration eth2StatusUpdateInterval,
final TimeProvider timeProvider,
final int peerRateLimit,
final int peerBlocksRateLimit,
final int peerBlobSidecarsRateLimit,
final int peerRequestLimit,
final Spec spec,
final KZG kzg,
Expand All @@ -140,7 +141,8 @@ public static Eth2PeerManager create(
metadataMessagesFactory,
timeProvider,
requiredCheckpoint,
peerRateLimit,
peerBlocksRateLimit,
peerBlobSidecarsRateLimit,
peerRequestLimit,
kzg,
discoveryNodeIdExtractor),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) {
eth2RpcOutstandingPingThreshold,
eth2StatusUpdateInterval,
timeProvider,
500,
50,
P2PConfig.DEFAULT_PEER_BLOCKS_RATE_LIMIT,
P2PConfig.DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT,
P2PConfig.DEFAULT_PEER_REQUEST_LIMIT,
spec,
KZG.NOOP,
(__) -> Optional.empty());
Expand Down
19 changes: 14 additions & 5 deletions teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,14 +294,22 @@ The network interface(s) on which the node listens for P2P communication.
private boolean gossipScoringEnabled = P2PConfig.DEFAULT_GOSSIP_SCORING_ENABLED;

@Option(
names = {"--Xpeer-rate-limit"},
names = {"--Xpeer-blocks-rate-limit"},
paramLabel = "<NUMBER>",
description =
"The number of requested blocks/blobs per peer to allow per minute before disconnecting the peer.\n"
+ "NOTE: the actual size for the allowed blobs per peer per minute will be `maxBlobsPerBlock` times the value of this parameter.",
"The number of requested blocks per peer to allow per minute before disconnecting the peer.",
arity = "1",
hidden = true)
private Integer peerRateLimit = P2PConfig.DEFAULT_PEER_RATE_LIMIT;
private Integer peerBlocksRateLimit = P2PConfig.DEFAULT_PEER_BLOCKS_RATE_LIMIT;

@Option(
names = {"--Xpeer-blob-sidecars-rate-limit"},
paramLabel = "<NUMBER>",
description =
"The number of requested blobs per peer to allow per minute before disconnecting the peer.",
arity = "1",
hidden = true)
private Integer peerBlobSidecarsRateLimit = P2PConfig.DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT;

@Option(
names = {"--Xp2p-gossip-blobs-after-block-enabled"},
Expand Down Expand Up @@ -430,7 +438,8 @@ public void configure(final TekuConfiguration.Builder builder) {
.batchVerifyStrictThreadLimitEnabled(batchVerifyStrictThreadLimitEnabled)
.targetSubnetSubscriberCount(p2pTargetSubnetSubscriberCount)
.isGossipScoringEnabled(gossipScoringEnabled)
.peerRateLimit(peerRateLimit)
.peerBlocksRateLimit(peerBlocksRateLimit)
.peerBlobSidecarsRateLimit(peerBlobSidecarsRateLimit)
.allTopicsFilterEnabled(allTopicsFilterEnabled)
.peerRequestLimit(peerRequestLimit)
.floodPublishMaxMessageSizeThreshold(floodPublishMaxMessageSizeThreshold)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,10 @@ private String[] createCliArgs() {
"127.0.0.1",
"--Xrest-api-max-url-length",
"65535",
"--Xpeer-rate-limit",
"--Xpeer-blocks-rate-limit",
"500",
"--Xpeer-blob-sidecars-rate-limit",
"1500",
"--Xpeer-request-limit",
"50"
};
Expand All @@ -582,7 +584,7 @@ private TekuConfiguration.Builder expectedDefaultConfigurationBuilder() {
.dataStorageMode(MINIMAL))
.metrics(b -> b.metricsCategories(DEFAULT_METRICS_CATEGORIES))
.restApi(b -> b.eth1DepositContractAddress(networkConfig.getEth1DepositContractAddress()))
.p2p(p -> p.peerRateLimit(500).peerRequestLimit(50))
.p2p(p -> p.peerBlocksRateLimit(500).peerBlobSidecarsRateLimit(1500).peerRequestLimit(100))
.discovery(
d ->
d.isDiscoveryEnabled(true)
Expand Down Expand Up @@ -623,7 +625,12 @@ private TekuConfiguration.Builder expectedConfigurationBuilder() {
.dataStorageCreateDbVersion(DatabaseVersion.DEFAULT_VERSION)
.maxKnownNodeCacheSize(100_000))
.data(b -> b.dataBasePath(dataPath))
.p2p(b -> b.targetSubnetSubscriberCount(2).peerRateLimit(500).peerRequestLimit(50))
.p2p(
b ->
b.targetSubnetSubscriberCount(2)
.peerBlocksRateLimit(500)
.peerBlobSidecarsRateLimit(1500)
.peerRequestLimit(100))
.discovery(
d ->
d.isDiscoveryEnabled(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,19 @@ public void usingNetworkFromUrl() {
}

@Test
public void setPeerRateLimit() {
public void setPeerBlocksRateLimit() {
TekuConfiguration tekuConfiguration =
getTekuConfigurationFromArguments("--Xpeer-rate-limit", "10");
getTekuConfigurationFromArguments("--Xpeer-blocks-rate-limit", "10");
final P2PConfig config = tekuConfiguration.beaconChain().p2pConfig();
assertThat(config.getPeerRateLimit()).isEqualTo(10);
assertThat(config.getPeerBlocksRateLimit()).isEqualTo(10);
}

@Test
public void setPeerBlobSidecarsRateLimit() {
TekuConfiguration tekuConfiguration =
getTekuConfigurationFromArguments("--Xpeer-blob-sidecars-rate-limit", "10");
final P2PConfig config = tekuConfiguration.beaconChain().p2pConfig();
assertThat(config.getPeerBlobSidecarsRateLimit()).isEqualTo(10);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public void shouldReadFromConfigurationFile() {

final P2PConfig p2pConfig = tekuConfig.p2p();
assertThat(p2pConfig.getTargetSubnetSubscriberCount()).isEqualTo(5);
assertThat(p2pConfig.getPeerRateLimit()).isEqualTo(100);
assertThat(p2pConfig.getPeerBlocksRateLimit()).isEqualTo(100);
assertThat(p2pConfig.getPeerBlobSidecarsRateLimit()).isEqualTo(300);
assertThat(p2pConfig.getPeerRequestLimit()).isEqualTo(101);

final DiscoveryConfig discoConfig = tekuConfig.discovery();
Expand Down
3 changes: 2 additions & 1 deletion teku/src/test/resources/P2POptions_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ p2p-peer-lower-bound: 70
p2p-peer-upper-bound: 85
Xp2p-target-subnet-subscriber-count: 5
Xp2p-minimum-randomly-selected-peer-count: 1
Xpeer-rate-limit: 100
Xpeer-blocks-rate-limit: 100
Xpeer-blob-sidecars-rate-limit: 300
Xpeer-request-limit: 101
Xp2p-historical-sync-batch-size: 102
Xp2p-sync-batch-size: 103
Expand Down

0 comments on commit 233e7f2

Please sign in to comment.