Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move atomic sync #723

Open
wants to merge 12 commits into
base: seperate-atomic-pkg-base
Choose a base branch
from
71 changes: 71 additions & 0 deletions plugin/evm/atomic/atomic_sync_extender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// (c) 2021-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
// TODO: move to separate package
package atomic

import (
"context"
"fmt"

syncclient "github.com/ava-labs/coreth/sync/client"
qdm12 marked this conversation as resolved.
Show resolved Hide resolved

"github.com/ava-labs/coreth/plugin/evm/message"
"github.com/ava-labs/coreth/plugin/evm/sync"
"github.com/ethereum/go-ethereum/log"
)

var _ sync.Extender = (*AtomicSyncExtender)(nil)

type AtomicSyncExtender struct {
backend AtomicBackend
stateSyncRequestSize uint16
}

func NewAtomicSyncExtender(backend AtomicBackend, stateSyncRequestSize uint16) *AtomicSyncExtender {
return &AtomicSyncExtender{
backend: backend,
stateSyncRequestSize: stateSyncRequestSize,
}
}

func (a *AtomicSyncExtender) Sync(ctx context.Context, client syncclient.Client, syncSummary message.Syncable) error {
atomicSyncSummary, ok := syncSummary.(*AtomicBlockSyncSummary)
if !ok {
return fmt.Errorf("expected *AtomicBlockSyncSummary, got %T", syncSummary)
}
log.Info("atomic tx: sync starting", "root", atomicSyncSummary)
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
atomicSyncer, err := a.backend.Syncer(client, atomicSyncSummary.AtomicRoot, atomicSyncSummary.BlockNumber, a.stateSyncRequestSize)
if err != nil {
return fmt.Errorf("failed to create atomic syncer: %w", err)
}
if err := atomicSyncer.Start(ctx); err != nil {
return fmt.Errorf("failed to start atomic syncer: %w", err)
}
err = <-atomicSyncer.Done()
log.Info("atomic tx: sync finished", "root", atomicSyncSummary.AtomicRoot, "err", err)
return err
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}

func (a *AtomicSyncExtender) OnFinishBeforeCommit(lastAcceptedHeight uint64, syncSummary message.Syncable) error {
// Mark the previously last accepted block for the shared memory cursor, so that we will execute shared
// memory operations from the previously last accepted block when ApplyToSharedMemory
// is called.
if err := a.backend.MarkApplyToSharedMemoryCursor(lastAcceptedHeight); err != nil {
return fmt.Errorf("failed to mark apply to shared memory cursor before commit: %w", err)
}
a.backend.SetLastAccepted(syncSummary.GetBlockHash())
return nil
}

func (a *AtomicSyncExtender) OnFinishAfterCommit(summaryHeight uint64) error {
// the chain state is already restored, and, from this point on,
// the block synced to is the accepted block. The last operation
// is updating shared memory with the atomic trie.
// ApplyToSharedMemory does this, and, even if the VM is stopped
// (gracefully or ungracefully), since MarkApplyToSharedMemoryCursor
// is called, VM will resume ApplyToSharedMemory on Initialize.
if err := a.backend.ApplyToSharedMemory(summaryHeight); err != nil {
return fmt.Errorf("failed to apply atomic trie to shared memory after commit: %w", err)
}
return nil
}
51 changes: 51 additions & 0 deletions plugin/evm/atomic/atomic_sync_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// (c) 2021-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package atomic

import (
"fmt"

"github.com/ava-labs/avalanchego/snow/engine/snowman/block"

"github.com/ava-labs/coreth/core"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't import core in the atomic package this will couple it to libevm bindings again.
IMO we can have a pacakge like atomic/sync.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"github.com/ava-labs/coreth/plugin/evm/sync"
"github.com/ethereum/go-ethereum/common"
)

var _ sync.SummaryProvider = &AtomicSyncProvider{}

type AtomicSyncProvider struct {
chain *core.BlockChain
atomicTrie AtomicTrie
}

func NewAtomicProvider(chain *core.BlockChain, atomicTrie AtomicTrie) *AtomicSyncProvider {
return &AtomicSyncProvider{chain: chain, atomicTrie: atomicTrie}
}

// StateSummaryAtHeight returns the block state summary at [height] if valid and available.
func (a *AtomicSyncProvider) StateSummaryAtHeight(height uint64) (block.StateSummary, error) {
atomicRoot, err := a.atomicTrie.Root(height)
if err != nil {
return nil, fmt.Errorf("failed to retrieve atomic trie root for height (%d): %w", height, err)
}

if atomicRoot == (common.Hash{}) {
return nil, fmt.Errorf("atomic trie root not found for height (%d)", height)
}

blk := a.chain.GetBlockByNumber(height)
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
if blk == nil {
return nil, fmt.Errorf("block not found for height (%d)", height)
}

if !a.chain.HasState(blk.Root()) {
return nil, fmt.Errorf("block root does not exist for height (%d), root (%s)", height, blk.Root())
}

summary, err := NewAtomicSyncSummary(blk.Hash(), height, blk.Root(), atomicRoot)
if err != nil {
return nil, fmt.Errorf("failed to construct syncable block at height %d: %w", height, err)
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}
return summary, nil
}
119 changes: 119 additions & 0 deletions plugin/evm/atomic/syncable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// (c) 2021-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package atomic

import (
"context"
"fmt"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/coreth/plugin/evm/message"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"

"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
)

var (
_ message.Syncable = (*AtomicBlockSyncSummary)(nil)
_ message.SyncableParser = (*AtomicSyncSummaryParser)(nil)
)

// AtomicBlockSyncSummary provides the information necessary to sync a node starting
// at the given block.
type AtomicBlockSyncSummary struct {
BlockNumber uint64 `serialize:"true"`
BlockHash common.Hash `serialize:"true"`
BlockRoot common.Hash `serialize:"true"`
AtomicRoot common.Hash `serialize:"true"`

summaryID ids.ID
bytes []byte
acceptImpl message.AcceptImplFn
}

func init() {
message.SyncSummaryType = &AtomicBlockSyncSummary{}
}

type AtomicSyncSummaryParser struct{}

func NewAtomicSyncSummaryParser() *AtomicSyncSummaryParser {
return &AtomicSyncSummaryParser{}
}

func (a *AtomicSyncSummaryParser) ParseFromBytes(summaryBytes []byte, acceptImpl message.AcceptImplFn) (message.Syncable, error) {
summary := AtomicBlockSyncSummary{}
if codecVersion, err := Codec.Unmarshal(summaryBytes, &summary); err != nil {
return nil, fmt.Errorf("failed to parse syncable summary: %w", err)
} else if codecVersion != message.Version {
return nil, fmt.Errorf("failed to parse syncable summary due to unexpected codec version (got %d, expected %d)", codecVersion, message.Version)
}

summary.bytes = summaryBytes
summaryID, err := ids.ToID(crypto.Keccak256(summaryBytes))
if err != nil {
return nil, fmt.Errorf("failed to compute summary ID: %w", err)
}
summary.summaryID = summaryID
summary.acceptImpl = acceptImpl
return &summary, nil
}

func NewAtomicSyncSummary(blockHash common.Hash, blockNumber uint64, blockRoot common.Hash, atomicRoot common.Hash) (*AtomicBlockSyncSummary, error) {
summary := AtomicBlockSyncSummary{
BlockNumber: blockNumber,
BlockHash: blockHash,
BlockRoot: blockRoot,
AtomicRoot: atomicRoot,
}
bytes, err := Codec.Marshal(message.Version, &summary)
if err != nil {
return nil, fmt.Errorf("failed to marshal syncable summary: %w", err)
}

summary.bytes = bytes
summaryID, err := ids.ToID(crypto.Keccak256(bytes))
if err != nil {
return nil, fmt.Errorf("failed to compute summary ID: %w", err)
}
summary.summaryID = summaryID

return &summary, nil
}

func (a *AtomicBlockSyncSummary) GetBlockNumber() uint64 {
return a.BlockNumber
}

func (a *AtomicBlockSyncSummary) GetBlockHash() common.Hash {
return a.BlockHash
}

func (a *AtomicBlockSyncSummary) GetBlockRoot() common.Hash {
return a.BlockRoot
}

func (a *AtomicBlockSyncSummary) Bytes() []byte {
return a.bytes
}

func (a *AtomicBlockSyncSummary) Height() uint64 {
return a.BlockNumber
}

func (a *AtomicBlockSyncSummary) ID() ids.ID {
return a.summaryID
}

func (a *AtomicBlockSyncSummary) String() string {
return fmt.Sprintf("AtomicBlockSyncSummary(BlockHash=%s, BlockNumber=%d, BlockRoot=%s, AtomicRoot=%s)", a.BlockHash, a.BlockNumber, a.BlockRoot, a.AtomicRoot)
}

func (a *AtomicBlockSyncSummary) Accept(context.Context) (block.StateSyncMode, error) {
if a.acceptImpl == nil {
return block.StateSyncSkipped, fmt.Errorf("accept implementation not specified for summary: %s", a)
}
return a.acceptImpl(a)
}
8 changes: 6 additions & 2 deletions plugin/evm/message/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ const (
maxMessageSize = 2*units.MiB - 64*units.KiB // Subtract 64 KiB from p2p network cap to leave room for encoding overhead from AvalancheGo
)

var Codec codec.Manager
var (
Codec codec.Manager
// TODO: Remove this once we have a better way to register types (i.e use a different codec version or use build flags)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can move init to a function like (Initialize) and call in vm.Initialize like Codec.Initialize(SyncSummaryType), but then tests should be doing the same thing everytime. But it might be the only way once we merge subnet-evm since two VMs should be using two different codecs since subnet-evm do not have atomic types (and thus codec has different order of type IDs).

SyncSummaryType interface{} = BlockSyncSummary{}
)

func init() {
Codec = codec.NewManager(maxMessageSize)
Expand All @@ -26,7 +30,7 @@ func init() {
c.SkipRegistrations(2)
errs.Add(
// Types for state sync frontier consensus
c.RegisterType(SyncSummary{}),
c.RegisterType(SyncSummaryType),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not directly BlockSyncSummary{} though?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlockSyncSummary has changed (removed the AtomicRoot). This is a (probably janky) workaround in order to be compatible but let another pkg register it's own type.


// state sync types
c.RegisterType(BlockRequest{}),
Expand Down
75 changes: 52 additions & 23 deletions plugin/evm/message/syncable.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,78 +14,107 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
)

var _ block.StateSummary = &SyncSummary{}
var _ Syncable = (*BlockSyncSummary)(nil)

// SyncSummary provides the information necessary to sync a node starting
type Syncable interface {
block.StateSummary
GetBlockNumber() uint64
GetBlockHash() common.Hash
GetBlockRoot() common.Hash
}

type SyncableParser interface {
ParseFromBytes(summaryBytes []byte, acceptImpl AcceptImplFn) (Syncable, error)
}

type AcceptImplFn func(Syncable) (block.StateSyncMode, error)

// BlockSyncSummary provides the information necessary to sync a node starting
// at the given block.
type SyncSummary struct {
type BlockSyncSummary struct {
Comment on lines +32 to +34
Copy link
Collaborator

@qdm12 qdm12 Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did we need to rename SyncSummary to BlockSyncSummary? 🤔
It seems like unneeded changes, unless there is a reason?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to distinguish between AtomicBlockSyncSummary

BlockNumber uint64 `serialize:"true"`
BlockHash common.Hash `serialize:"true"`
BlockRoot common.Hash `serialize:"true"`
AtomicRoot common.Hash `serialize:"true"`

summaryID ids.ID
bytes []byte
acceptImpl func(SyncSummary) (block.StateSyncMode, error)
acceptImpl AcceptImplFn
}

type BlockSyncSummaryParser struct{}
qdm12 marked this conversation as resolved.
Show resolved Hide resolved

func NewBlockSyncSummaryParser() *BlockSyncSummaryParser {
return &BlockSyncSummaryParser{}
}

func NewSyncSummaryFromBytes(summaryBytes []byte, acceptImpl func(SyncSummary) (block.StateSyncMode, error)) (SyncSummary, error) {
summary := SyncSummary{}
func (b *BlockSyncSummaryParser) ParseFromBytes(summaryBytes []byte, acceptImpl AcceptImplFn) (Syncable, error) {
summary := BlockSyncSummary{}
if codecVersion, err := Codec.Unmarshal(summaryBytes, &summary); err != nil {
return SyncSummary{}, err
return nil, fmt.Errorf("failed to parse syncable summary: %w", err)
} else if codecVersion != Version {
return SyncSummary{}, fmt.Errorf("failed to parse syncable summary due to unexpected codec version (%d != %d)", codecVersion, Version)
return nil, fmt.Errorf("failed to parse syncable summary due to unexpected codec version (%d != %d)", codecVersion, Version)
}

summary.bytes = summaryBytes
summaryID, err := ids.ToID(crypto.Keccak256(summaryBytes))
if err != nil {
return SyncSummary{}, err
return nil, fmt.Errorf("failed to compute summary ID: %w", err)
}
summary.summaryID = summaryID
summary.acceptImpl = acceptImpl
return summary, nil
return &summary, nil
}

func NewSyncSummary(blockHash common.Hash, blockNumber uint64, blockRoot common.Hash, atomicRoot common.Hash) (SyncSummary, error) {
summary := SyncSummary{
func NewBlockSyncSummary(blockHash common.Hash, blockNumber uint64, blockRoot common.Hash) (*BlockSyncSummary, error) {
summary := BlockSyncSummary{
BlockNumber: blockNumber,
BlockHash: blockHash,
BlockRoot: blockRoot,
AtomicRoot: atomicRoot,
}
bytes, err := Codec.Marshal(Version, &summary)
if err != nil {
return SyncSummary{}, err
return nil, fmt.Errorf("failed to marshal syncable summary: %w", err)
}

summary.bytes = bytes
summaryID, err := ids.ToID(crypto.Keccak256(bytes))
if err != nil {
return SyncSummary{}, err
return nil, fmt.Errorf("failed to compute summary ID: %w", err)
}
summary.summaryID = summaryID

return summary, nil
return &summary, nil
}

func (s *BlockSyncSummary) GetBlockNumber() uint64 {
return s.BlockNumber
}

func (s *BlockSyncSummary) GetBlockHash() common.Hash {
return s.BlockHash
}

func (s *BlockSyncSummary) GetBlockRoot() common.Hash {
return s.BlockRoot
}

func (s SyncSummary) Bytes() []byte {
func (s *BlockSyncSummary) Bytes() []byte {
return s.bytes
}

func (s SyncSummary) Height() uint64 {
func (s *BlockSyncSummary) Height() uint64 {
return s.BlockNumber
}

func (s SyncSummary) ID() ids.ID {
func (s *BlockSyncSummary) ID() ids.ID {
return s.summaryID
}

func (s SyncSummary) String() string {
return fmt.Sprintf("SyncSummary(BlockHash=%s, BlockNumber=%d, BlockRoot=%s, AtomicRoot=%s)", s.BlockHash, s.BlockNumber, s.BlockRoot, s.AtomicRoot)
func (s *BlockSyncSummary) String() string {
return fmt.Sprintf("BlockSyncSummary(BlockHash=%s, BlockNumber=%d, BlockRoot=%s)", s.BlockHash, s.BlockNumber, s.BlockRoot)
}

func (s SyncSummary) Accept(context.Context) (block.StateSyncMode, error) {
func (s *BlockSyncSummary) Accept(context.Context) (block.StateSyncMode, error) {
if s.acceptImpl == nil {
return block.StateSyncSkipped, fmt.Errorf("accept implementation not specified for summary: %s", s)
}
Expand Down
Loading
Loading