Skip to content

Commit

Permalink
Refactor message BLS aggregation service for usage with tasks (#282)
Browse files Browse the repository at this point in the history
* feat: Adapt MessageBlsAggregationService for tasks

* fix: Remove now unnecessary maps in Aggregator

* test: Update tests based on task aggregation service changes

* fix: Observe received message on signature processing instead of when reaching quorum

* fix: Properly handle possibly nil messages in blsagg service response

* fix: Do not wait for full stake on message expiration

* feat: Add json tags to aggregator config

* feat: Set TASK_AGGREGATION_QUORUM_THRESHOLD to 66

* feat: Validate quorum percentages on message initialization

* feat: Type service response message with internal message type

* fix: Clean up task record when sending aggregated response

* fix: Add missing message key to blsagg responses and be explicit on zero-ish values
  • Loading branch information
Hyodar authored Aug 13, 2024
1 parent 7d744cf commit b74fb68
Show file tree
Hide file tree
Showing 14 changed files with 354 additions and 329 deletions.
191 changes: 74 additions & 117 deletions aggregator/aggregator.go

Large diffs are not rendered by default.

51 changes: 23 additions & 28 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

"github.com/Layr-Labs/eigensdk-go/crypto/bls"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
blsaggservmock "github.com/Layr-Labs/eigensdk-go/services/mocks/blsagg"
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/NethermindEth/near-sffl/aggregator/blsagg"
dbmocks "github.com/NethermindEth/near-sffl/aggregator/database/mocks"
aggmocks "github.com/NethermindEth/near-sffl/aggregator/mocks"
"github.com/NethermindEth/near-sffl/aggregator/types"
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestSendNewTask(t *testing.T) {
assert.Nil(t, err)

var TASK_INDEX = uint32(0)
var BLOCK_NUMBER = uint32(100)
var BLOCK_NUMBER = uint64(100)
var FROM_TIMESTAMP = uint64(30_000)
var TO_TIMESTAMP = uint64(40_000)

Expand All @@ -76,15 +76,23 @@ func TestSendNewTask(t *testing.T) {
TO_TIMESTAMP-uint64(types.MESSAGE_SUBMISSION_TIMEOUT.Seconds())-uint64(types.MESSAGE_BLS_AGGREGATION_TIMEOUT.Seconds()),
types.TASK_QUORUM_THRESHOLD,
coretypes.QUORUM_NUMBERS,
).Return(aggmocks.MockSendNewCheckpointTask(BLOCK_NUMBER, TASK_INDEX, FROM_TIMESTAMP, TO_TIMESTAMP))
).Return(aggmocks.MockSendNewCheckpointTask(uint32(BLOCK_NUMBER), TASK_INDEX, FROM_TIMESTAMP, TO_TIMESTAMP))
mockAvsReaderer.EXPECT().GetLastCheckpointToTimestamp(context.Background()).Return(FROM_TIMESTAMP-1, nil)

// 100 blocks, each takes 12 seconds. We hardcode for now since aggregator also hardcodes this value
taskTimeToExpiry := 100 * 12 * time.Second
taskTimeToExpiry := (100-15)*12*time.Second - 1*time.Minute
taskAggregationTimeout := 1 * time.Minute
// make sure that initializeNewTask was called on the blsAggService
// maybe there's a better way to do this? There's a saying "don't mock 3rd party code"
// see https://hynek.me/articles/what-to-mock-in-5-mins/
mockTaskBlsAggService.EXPECT().InitializeNewTask(TASK_INDEX, BLOCK_NUMBER, coretypes.QUORUM_NUMBERS, []eigentypes.QuorumThresholdPercentage{types.TASK_AGGREGATION_QUORUM_THRESHOLD}, taskTimeToExpiry)
mockTaskBlsAggService.EXPECT().InitializeMessageIfNotExists(
messages.CheckpointTaskResponse{ReferenceTaskIndex: TASK_INDEX}.Key(),
coretypes.QUORUM_NUMBERS,
[]eigentypes.QuorumThresholdPercentage{types.TASK_AGGREGATION_QUORUM_THRESHOLD},
taskTimeToExpiry,
taskAggregationTimeout,
BLOCK_NUMBER,
)

aggregator.sendNewCheckpointTask()
}
Expand All @@ -100,23 +108,18 @@ func TestHandleStateRootUpdateAggregationReachedQuorum(t *testing.T) {
msgDigest, err := msg.Digest()
assert.Nil(t, err)

blsAggServiceResp := types.MessageBlsAggregationServiceResponse{
blsAggServiceResp := blsagg.MessageBlsAggregationServiceResponse{
MessageBlsAggregation: messages.MessageBlsAggregation{
MessageDigest: msgDigest,
},
Message: msg,
Finished: true,
}

aggregator.stateRootUpdates[msgDigest] = msg

mockMsgDb.EXPECT().StoreStateRootUpdate(msg)
mockMsgDb.EXPECT().StoreStateRootUpdateAggregation(msg, blsAggServiceResp.MessageBlsAggregation)

assert.Contains(t, aggregator.stateRootUpdates, msgDigest)

aggregator.handleStateRootUpdateReachedQuorum(blsAggServiceResp)

assert.NotContains(t, aggregator.stateRootUpdates, msgDigest)
}

func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) {
Expand All @@ -130,32 +133,27 @@ func TestHandleOperatorSetUpdateAggregationReachedQuorum(t *testing.T) {
msgDigest, err := msg.Digest()
assert.Nil(t, err)

blsAggServiceResp := types.MessageBlsAggregationServiceResponse{
blsAggServiceResp := blsagg.MessageBlsAggregationServiceResponse{
MessageBlsAggregation: messages.MessageBlsAggregation{
MessageDigest: msgDigest,
NonSignersPubkeysG1: make([]*bls.G1Point, 0),
SignersApkG2: bls.NewZeroG2Point(),
SignersAggSigG1: bls.NewZeroSignature(),
},
Message: msg,
Finished: true,
}

aggregator.operatorSetUpdates[msgDigest] = msg

mockMsgDb.EXPECT().StoreOperatorSetUpdate(msg)
mockMsgDb.EXPECT().StoreOperatorSetUpdateAggregation(msg, blsAggServiceResp.MessageBlsAggregation)

signatureInfo := blsAggServiceResp.ExtractBindingRollup()
mockRollupBroadcaster.EXPECT().BroadcastOperatorSetUpdate(context.Background(), msg, signatureInfo)

assert.Contains(t, aggregator.operatorSetUpdates, msgDigest)

aggregator.handleOperatorSetUpdateReachedQuorum(context.Background(), blsAggServiceResp)

assert.NotContains(t, aggregator.operatorSetUpdates, msgDigest)
}

func TestExpiredStateRootUpdateMessage(t *testing.T) {
func TestTimeoutStateRootUpdateMessage(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

Expand All @@ -172,10 +170,10 @@ func TestExpiredStateRootUpdateMessage(t *testing.T) {
},
})

assert.Equal(t, MessageExpiredError, err)
assert.Equal(t, MessageTimeoutError, err)
}

func TestExpiredOperatorSetUpdate(t *testing.T) {
func TestTimeoutOperatorSetUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

Expand All @@ -192,16 +190,16 @@ func TestExpiredOperatorSetUpdate(t *testing.T) {
},
})

assert.Equal(t, MessageExpiredError, err)
assert.Equal(t, MessageTimeoutError, err)
}

func createMockAggregator(
mockCtrl *gomock.Controller, operatorPubkeyDict map[eigentypes.OperatorId]types.OperatorInfo,
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *blsaggservmock.MockBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockOperatorRegistrationsService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) {
) (*Aggregator, *chainiomocks.MockAvsReaderer, *chainiomocks.MockAvsWriterer, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockMessageBlsAggregationService, *aggmocks.MockOperatorRegistrationsService, *dbmocks.MockDatabaser, *aggmocks.MockRollupBroadcasterer, *safeclientmocks.MockSafeClient, error) {
logger := sdklogging.NewNoopLogger()
mockAvsWriter := chainiomocks.NewMockAvsWriterer(mockCtrl)
mockAvsReader := chainiomocks.NewMockAvsReaderer(mockCtrl)
mockTaskBlsAggregationService := blsaggservmock.NewMockBlsAggregationService(mockCtrl)
mockTaskBlsAggregationService := aggmocks.NewMockMessageBlsAggregationService(mockCtrl)
mockStateRootUpdateBlsAggregationService := aggmocks.NewMockMessageBlsAggregationService(mockCtrl)
mockOperatorSetUpdateBlsAggregationService := aggmocks.NewMockMessageBlsAggregationService(mockCtrl)
mockMsgDb := dbmocks.NewMockDatabaser(mockCtrl)
Expand All @@ -219,9 +217,6 @@ func createMockAggregator(
operatorRegistrationsService: mockOperatorRegistrationsService,
msgDb: mockMsgDb,
tasks: make(map[coretypes.TaskIndex]taskmanager.CheckpointTask),
taskResponses: make(map[coretypes.TaskIndex]map[eigentypes.TaskResponseDigest]messages.CheckpointTaskResponse),
stateRootUpdates: make(map[coretypes.MessageDigest]messages.StateRootUpdateMessage),
operatorSetUpdates: make(map[coretypes.MessageDigest]messages.OperatorSetUpdateMessage),
rollupBroadcaster: mockRollupBroadcaster,
httpClient: mockClient,
wsClient: mockClient,
Expand Down
Loading

0 comments on commit b74fb68

Please sign in to comment.