diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index 9793a2ffd..cd98f6f74 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -3,6 +3,7 @@ package tasks import ( "context" + "github.com/filecoin-project/curio/tasks/pdp" "sort" "strings" "sync" @@ -242,6 +243,11 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task } } + if cfg.Subsystems.EnablePDP { + pdpNotifTask := pdp.NewPDPNotifyTask(db) + activeTasks = append(activeTasks, pdpNotifTask) + } + indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg) ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg) activeTasks = append(activeTasks, indexingTask, ipniTask) @@ -361,7 +367,7 @@ func addSealingTasks( moveStorageTask := seal.NewMoveStorageTask(sp, slr, db, cfg.Subsystems.MoveStorageMaxTasks) moveStorageSnapTask := snap.NewMoveStorageTask(slr, db, cfg.Subsystems.MoveStorageMaxTasks) - storePieceTask, err := piece2.NewStorePieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.MoveStorageMaxTasks) + storePieceTask, err := piece2.NewStorePieceTask(db, must.One(slrLazy.Val()), stor, cfg.Subsystems.MoveStorageMaxTasks) if err != nil { return nil, err } diff --git a/deps/config/types.go b/deps/config/types.go index 6ead76367..6df781ac5 100644 --- a/deps/config/types.go +++ b/deps/config/types.go @@ -304,6 +304,11 @@ type CurioSubsystemsConfig struct { // EnableDealMarket enabled the deal market on the node. This would also enable libp2p on the node, if configured. EnableDealMarket bool + // Enable handling for PDP (proof-of-data possession) deals / proving on this node. + // PDP deals allow the node to directly store and prove unsealed data with "PDP Services" like Storacha. + // This feature is BETA and should only be enabled on nodes which are part of a PDP network. + EnablePDP bool + // EnableCommP enables the commP task on te node. CommP is calculated before sending PublishDealMessage for a Mk12 deal // Must have EnableDealMarket = True EnableCommP bool diff --git a/lib/dealdata/dealdata.go b/lib/dealdata/dealdata.go index 0f73c49d8..6b3e8ecc1 100644 --- a/lib/dealdata/dealdata.go +++ b/lib/dealdata/dealdata.go @@ -218,7 +218,7 @@ func getDealMetadata(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, s reader, _ := padreader.New(pr, uint64(*p.DataRawSize)) pieceReaders = append(pieceReaders, reader) } else { - reader, _ := padreader.New(NewUrlReader(dataUrl, hdrs, *p.DataRawSize), uint64(*p.DataRawSize)) + reader, _ := padreader.New(NewUrlReader(nil, dataUrl, hdrs, *p.DataRawSize), uint64(*p.DataRawSize)) pieceReaders = append(pieceReaders, reader) } diff --git a/lib/dealdata/urlpiecereader.go b/lib/dealdata/urlpiecereader.go index 4a07f0f7f..0e2dc2fa9 100644 --- a/lib/dealdata/urlpiecereader.go +++ b/lib/dealdata/urlpiecereader.go @@ -19,18 +19,20 @@ type UrlPieceReader struct { Headers http.Header RawSize int64 // the exact number of bytes read, if we read more or less that's an error - RemoteEndpointReader paths.Remote // Only used for .ReadRemote which issues http requests for internal /remote endpoints + RemoteEndpointReader *paths.Remote // Only used for .ReadRemote which issues http requests for internal /remote endpoints readSoFar int64 closed bool active io.ReadCloser // auto-closed on EOF } -func NewUrlReader(p string, h http.Header, rs int64) *UrlPieceReader { +func NewUrlReader(rmt *paths.Remote, p string, h http.Header, rs int64) *UrlPieceReader { return &UrlPieceReader{ Url: p, RawSize: rs, Headers: h, + + RemoteEndpointReader: rmt, } } @@ -51,6 +53,10 @@ func (u *UrlPieceReader) initiateRequest() error { } if goUrl.Scheme == CustoreScheme { + if u.RemoteEndpointReader == nil { + return xerrors.New("RemoteEndpoint is nil") + } + goUrl.Scheme = "http" u.active, err = u.RemoteEndpointReader.ReadRemote(context.Background(), goUrl.String(), 0, 0) if err != nil { @@ -138,6 +144,9 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) { func (u *UrlPieceReader) Close() error { if !u.closed { u.closed = true + if u.active == nil { + return nil + } return u.active.Close() } diff --git a/lib/paths/local_stash.go b/lib/paths/local_stash.go index a2cf747d1..05164b1ef 100644 --- a/lib/paths/local_stash.go +++ b/lib/paths/local_stash.go @@ -129,12 +129,12 @@ func (st *Local) ServeAndRemove(ctx context.Context, id uuid.UUID) (io.ReadClose continue } - st.localLk.RUnlock() - stashDir := filepath.Join(p.local, StashDirName) stashFilePath := filepath.Join(stashDir, fileName) f, err := os.Open(stashFilePath) if err == nil { + st.localLk.RUnlock() + // Wrap the file in a custom ReadCloser return &stashFileReadCloser{ File: f, diff --git a/tasks/pdp/notify_task.go b/tasks/pdp/notify_task.go index 5ce7f91d1..af89d1ade 100644 --- a/tasks/pdp/notify_task.go +++ b/tasks/pdp/notify_task.go @@ -20,6 +20,10 @@ type PDPNotifyTask struct { db *harmonydb.DB } +func NewPDPNotifyTask(db *harmonydb.DB) *PDPNotifyTask { + return &PDPNotifyTask{db: db} +} + func (t *PDPNotifyTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { ctx := context.Background() diff --git a/tasks/piece/task_park_piece.go b/tasks/piece/task_park_piece.go index fd5322f8f..4958cfcd6 100644 --- a/tasks/piece/task_park_piece.go +++ b/tasks/piece/task_park_piece.go @@ -27,8 +27,9 @@ var PieceParkPollInterval = time.Second * 15 // ParkPieceTask gets a piece from some origin, and parks it in storage // Pieces are always f00, piece ID is mapped to pieceCID in the DB type ParkPieceTask struct { - db *harmonydb.DB - sc *ffi2.SealCalls + db *harmonydb.DB + sc *ffi2.SealCalls + remote *paths.Remote TF promise.Promise[harmonytask.AddTaskFunc] @@ -38,17 +39,18 @@ type ParkPieceTask struct { } func NewParkPieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, max int) (*ParkPieceTask, error) { - return newPieceTask(db, sc, max, false) + return newPieceTask(db, sc, nil, max, false) } -func NewStorePieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, max int) (*ParkPieceTask, error) { - return newPieceTask(db, sc, max, true) +func NewStorePieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, remote *paths.Remote, max int) (*ParkPieceTask, error) { + return newPieceTask(db, sc, remote, max, true) } -func newPieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, max int, longTerm bool) (*ParkPieceTask, error) { +func newPieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, remote *paths.Remote, max int, longTerm bool) (*ParkPieceTask, error) { pt := &ParkPieceTask{ db: db, sc: sc, + remote: remote, max: max, longTerm: longTerm, } @@ -165,7 +167,7 @@ func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (d if err != nil { return false, xerrors.Errorf("unmarshaling reference data headers: %w", err) } - upr := dealdata.NewUrlReader(refData[i].DataURL, hdrs, pieceData.PieceRawSize) + upr := dealdata.NewUrlReader(p.remote, refData[i].DataURL, hdrs, pieceData.PieceRawSize) defer func() { _ = upr.Close()