Skip to content

Commit

Permalink
feat(eth): check events are indexed within in requested range (#12728)
Browse files Browse the repository at this point in the history
Initial work, not yet complete.
  • Loading branch information
akaladarshi authored and rvagg committed Jan 6, 2025
1 parent 9dcd3c7 commit 4c32069
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ The Lotus v1.31.0 release introduces the new `ChainIndexer` subsystem, enhancing
- Make the ordering of event output for `eth_` APIs and `GetActorEventsRaw` consistent, sorting ascending on: epoch, message index, event index and original event entry order. ([filecoin-project/lotus#12623](https://github.com/filecoin-project/lotus/pull/12623))
- Return a consistent error when encountering null rounds in ETH RPC method calls. ([filecoin-project/lotus#12655](https://github.com/filecoin-project/lotus/pull/12655))
- Correct erroneous sector QAP-calculation upon sector extension in lotus-miner cli. ([filecoin-project/lotus#12720](https://github.com/filecoin-project/lotus/pull/12720))
- Return error if logs or events within range are not indexed. ([filecoin-project/lotus#12728](https://github.com/filecoin-project/lotus/pull/12728))


## 📝 Changelog

Expand Down
1 change: 0 additions & 1 deletion chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.getMsgCidFromEthHashStmt: "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ? LIMIT 1",
&ps.insertEthTxHashStmt: "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP",
&ps.insertTipsetMessageStmt: "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0",
&ps.hasTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",
Expand Down
128 changes: 90 additions & 38 deletions chain/index/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (
"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"

"github.com/filecoin-project/lotus/chain/types"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
)

var ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
var (
ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
ErrRangeInFuture = fmt.Errorf("range end is in the future")
)

const maxLookBackForWait = 120 // one hour of tipsets

Expand Down Expand Up @@ -236,48 +238,98 @@ func loadExecutedMessages(ctx context.Context, cs ChainStore, recomputeTipSetSta
return ems, nil
}

// checkTipsetIndexedStatus verifies if a specific tipset is indexed based on the EventFilter.
// It returns nil if the tipset is indexed, ErrNotFound if it's not indexed or not specified,
func (si *SqliteIndexer) checkTipsetIndexedStatus(ctx context.Context, f *EventFilter) error {
var tipsetKeyCid []byte
var err error

// Determine the tipset to check based on the filter
switch {
case f.TipsetCid != cid.Undef:
tipsetKeyCid = f.TipsetCid.Bytes()
case f.MinHeight >= 0 && f.MinHeight == f.MaxHeight:
tipsetKeyCid, err = si.getTipsetKeyCidByHeight(ctx, f.MinHeight)
if err != nil {
if err == ErrNotFound {
// this means that this is a null round and there exist no events for this epoch
return nil
}
// checkRangeIndexedStatus verifies if a range of heights is indexed.
// It checks for the existence of non-null rounds at the range boundaries.
func (si *SqliteIndexer) checkRangeIndexedStatus(ctx context.Context, f *EventFilter) error {
minHeight := f.MinHeight
maxHeight := f.MaxHeight

return xerrors.Errorf("failed to get tipset key cid by height: %w", err)
// Find the first non-null round in the range
startCid, err := si.findFirstNonNullRound(ctx, &minHeight, maxHeight)
if err != nil {
return xerrors.Errorf("failed to find first non-null round: %w", err)
}

// If all rounds are null, consider the range valid
if startCid == nil {
return nil
}

// Find the last non-null round in the range
endCid, err := si.findLastNonNullRound(ctx, &maxHeight, minHeight)
if err != nil {
if errors.Is(err, ErrRangeInFuture) {
return xerrors.Errorf("range end is in the future: %w", err)
}
default:
// This function distinguishes between two scenarios:
// 1. Missing events: The requested tipset is not present in the Index (an error condition).
// 2. Valid case: The tipset exists but contains no events (a normal situation).
// Currently, this distinction is only made for the common use case where a user requests events for a single tipset.
// TODO: Implement this functionality for a range of tipsets. This is expensive and not a common use case so it's deferred for now.
return xerrors.Errorf("failed to find last non-null round: %w", err)
}

// If all rounds are null, consider the range valid
if endCid == nil {
return nil
}

// If we couldn't determine a specific tipset, return ErrNotFound
if tipsetKeyCid == nil {
return ErrNotFound
// Check indexing for start and end tipsets
if err := si.checkTipsetByKeyCid(ctx, startCid, minHeight); err != nil {
return err
}

// Check if the determined tipset is indexed
if exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid); err != nil {
return xerrors.Errorf("failed to check if tipset is indexed: %w", err)
} else if exists {
return nil // Tipset is indexed
if err := si.checkTipsetByKeyCid(ctx, endCid, maxHeight); err != nil {
return err
}

return nil
}

// checkTipsetByKeyCid checks if a tipset identified by its key CID is indexed.
func (si *SqliteIndexer) checkTipsetByKeyCid(ctx context.Context, tipsetKeyCid []byte, height abi.ChainEpoch) error {
exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid)
if err != nil {
return xerrors.Errorf("failed to check if tipset at height %d is indexed: %w", height, err)
}

if exists {
return nil // null round
}

return ErrNotFound // tipset is not indexed
}

// findFirstNonNullRound finds the first non-null round starting from minHeight up to maxHeight
func (si *SqliteIndexer) findFirstNonNullRound(ctx context.Context, minHeight *abi.ChainEpoch, maxHeight abi.ChainEpoch) ([]byte, error) {
for height := *minHeight; height <= maxHeight; height++ {
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err == nil {
*minHeight = height // Update the minHeight to the found height
return cid, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
}

return nil, nil
}

// findLastNonNullRound finds the last non-null round starting from maxHeight down to minHeight
func (si *SqliteIndexer) findLastNonNullRound(ctx context.Context, maxHeight *abi.ChainEpoch, minHeight abi.ChainEpoch) ([]byte, error) {
head := si.cs.GetHeaviestTipSet()
if head == nil || *maxHeight > head.Height() {
return nil, ErrRangeInFuture
}

for height := *maxHeight; height >= minHeight; height-- {
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err == nil {
*maxHeight = height // Update the maxHeight to the found height
return cid, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
}

return ErrNotFound // Tipset is not indexed
return nil, nil
}

// getTipsetKeyCidByHeight retrieves the tipset key CID for a given height.
Expand Down Expand Up @@ -460,7 +512,7 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
// if the height is old enough, we'll assume the index is caught up to it and not bother
// waiting for it to be indexed
if height <= maxLookBackHeight {
return nil, si.checkTipsetIndexedStatus(ctx, f)
return nil, si.checkRangeIndexedStatus(ctx, f)
}
}

Expand All @@ -474,7 +526,7 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
}

if len(ces) == 0 {
return nil, si.checkTipsetIndexedStatus(ctx, f)
return nil, si.checkRangeIndexedStatus(ctx, f)
}
}

Expand Down

0 comments on commit 4c32069

Please sign in to comment.