From d0dd96b1f0e80aac47fcf2575c6ab80ab00c795c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 27 Aug 2024 21:21:09 +0200 Subject: [PATCH] lmrpc: Don't use the janky done channel to know when data transfer is done --- market/lmrpc/lmrpc.go | 23 ++++++----------------- market/lmrpc/piecerefs.go | 28 +++++++++++++++++----------- 2 files changed, 23 insertions(+), 28 deletions(-) diff --git a/market/lmrpc/lmrpc.go b/market/lmrpc/lmrpc.go index 40e3b739d..1b534704f 100644 --- a/market/lmrpc/lmrpc.go +++ b/market/lmrpc/lmrpc.go @@ -277,7 +277,6 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.Chain, maddr address.Address ), int64(pi.size)) n, err := io.Copy(w, pieceData) - close(pi.done) took := time.Since(start) mbps := float64(n) / (1024 * 1024) / took.Seconds() @@ -314,8 +313,6 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.Chain, maddr address.Address type pieceInfo struct { data storiface.Data size abi.UnpaddedPieceSize - - done chan struct{} } type PieceIngester interface { @@ -343,8 +340,6 @@ func sectorAddPieceToAnyOperation(maddr address.Address, rootUrl url.URL, conf * pi := pieceInfo{ data: pieceData, size: pieceSize, - - done: make(chan struct{}), } pieceUUID := uuid.New() @@ -363,24 +358,14 @@ func sectorAddPieceToAnyOperation(maddr address.Address, rootUrl url.URL, conf * dataUrl.RawQuery = "piece_id=" + pieceUUID.String() // add piece entry - refID, pieceWasCreated, cleanup, err := prt.addPieceEntry(ctx, db, conf, deal, pieceSize, dataUrl, ssize) + refID, cleanup, err := prt.addPieceEntry(ctx, db, conf, deal, pieceSize, dataUrl, ssize) if err != nil { return lapi.SectorOffset{}, err } defer cleanup() - // wait for piece to be parked - if pieceWasCreated { - <-pi.done - } else { - // If the piece was not created, we need to close the done channel - close(pi.done) - - closeDataReader(pieceData) // todo move down, after the piece is parked, and remove pi.done? - } - { - // piece park is either done or currently happening from another AP call + // piece park is either done or currently happening // now we need to make sure that the piece is definitely parked successfully // - in case of errors we return, and boost should be able to retry the call @@ -438,6 +423,10 @@ func sectorAddPieceToAnyOperation(maddr address.Address, rootUrl url.URL, conf * } } + // piece is parked, ensure the data reader is closed + closeDataReader(pieceData) + + // prepare pieceref url. TreeD / UpdateEncode etc. are aware of the "pieceref" scheme, and will use piecepark to get the data pieceIDUrl := url.URL{ Scheme: "pieceref", Opaque: fmt.Sprintf("%d", refID), diff --git a/market/lmrpc/piecerefs.go b/market/lmrpc/piecerefs.go index 5766a81df..9f489993c 100644 --- a/market/lmrpc/piecerefs.go +++ b/market/lmrpc/piecerefs.go @@ -3,15 +3,19 @@ package lmrpc import ( "context" "errors" - "github.com/filecoin-project/curio/deps/config" - "github.com/filecoin-project/curio/harmony/harmonydb" - "github.com/filecoin-project/go-state-types/abi" - lpiece "github.com/filecoin-project/lotus/storage/pipeline/piece" - "github.com/yugabyte/pgx/v5" - "golang.org/x/xerrors" "net/url" "strconv" "time" + + "github.com/yugabyte/pgx/v5" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/curio/deps/config" + "github.com/filecoin-project/curio/harmony/harmonydb" + + lpiece "github.com/filecoin-project/lotus/storage/pipeline/piece" ) type refTracker struct { @@ -127,7 +131,7 @@ func (rt *refTracker) init() error { return nil } -func (rt *refTracker) addPieceEntry(ctx context.Context, db *harmonydb.DB, conf *config.CurioConfig, deal lpiece.PieceDealInfo, pieceSize abi.UnpaddedPieceSize, dataUrl url.URL, ssize abi.SectorSize) (int64, bool, func(), error) { +func (rt *refTracker) addPieceEntry(ctx context.Context, db *harmonydb.DB, conf *config.CurioConfig, deal lpiece.PieceDealInfo, pieceSize abi.UnpaddedPieceSize, dataUrl url.URL, ssize abi.SectorSize) (int64, func(), error) { var refID int64 var pieceWasCreated bool @@ -179,7 +183,7 @@ func (rt *refTracker) addPieceEntry(ctx context.Context, db *harmonydb.DB, conf return true, nil // This will commit the transaction }, harmonydb.OptionRetry()) if err != nil { - return refID, false, nil, xerrors.Errorf("inserting parked piece: %w", err) + return refID, nil, xerrors.Errorf("inserting parked piece: %w", err) } if !comm { if backpressureWait { @@ -187,18 +191,20 @@ func (rt *refTracker) addPieceEntry(ctx context.Context, db *harmonydb.DB, conf select { case <-time.After(backpressureWaitTime): case <-ctx.Done(): - return refID, false, nil, xerrors.Errorf("context done while waiting for backpressure: %w", ctx.Err()) + return refID, nil, xerrors.Errorf("context done while waiting for backpressure: %w", ctx.Err()) } continue } - return refID, false, nil, xerrors.Errorf("piece tx didn't commit") + return refID, nil, xerrors.Errorf("piece tx didn't commit") } break } - return refID, pieceWasCreated, func() { + log.Infow("added piece entry", "ref_id", refID, "piece_cid", deal.PieceCID().String(), "data_url", dataUrl.String(), "piece_was_created", pieceWasCreated) + + return refID, func() { _, err := db.BeginTransaction(context.Background(), func(tx *harmonydb.Tx) (commit bool, err error) { refUrl := "pieceref:" + strconv.FormatInt(refID, 10)