From b8cad2487e2a2aa529f9abadebc024f244d45996 Mon Sep 17 00:00:00 2001 From: Hitenjain14 <57557631+Hitenjain14@users.noreply.github.com> Date: Mon, 9 Oct 2023 16:25:30 +0530 Subject: [PATCH] List optimization (#1240) * list optimization * list optimization * fix list hash * add to wg * fix list test * check consensus * fix listDir test * use once for mock calls --- zboxcore/sdk/allocation_test.go | 13 +++++-- zboxcore/sdk/listworker.go | 62 ++++++++++++++++++++++++++------- zboxcore/sdk/listworker_test.go | 13 ++++--- zboxcore/zboxutil/http.go | 5 ++- 4 files changed, 73 insertions(+), 20 deletions(-) diff --git a/zboxcore/sdk/allocation_test.go b/zboxcore/sdk/allocation_test.go index 5570a5f3a..ccf7ab5c3 100644 --- a/zboxcore/sdk/allocation_test.go +++ b/zboxcore/sdk/allocation_test.go @@ -73,6 +73,15 @@ func setupMockHttpResponse( testCaseName string, a *Allocation, httpMethod string, statusCode int, body []byte) { + for i := 0; i < numBlobbers; i++ { + mockClient.On("Do", mock.MatchedBy(func(req *http.Request) bool { + return req.Method == httpMethod && strings.Contains(req.URL.String(), "list=true") + })).Return(&http.Response{ + StatusCode: statusCode, + Body: io.NopCloser(bytes.NewReader(body)), + }, nil).Once() + } + for i := 0; i < numBlobbers; i++ { url := funcName + testCaseName + mockBlobberUrl + strconv.Itoa(i) mockClient.On("Do", mock.MatchedBy(func(req *http.Request) bool { @@ -80,7 +89,7 @@ func setupMockHttpResponse( strings.Contains(req.URL.String(), url) })).Return(&http.Response{ StatusCode: statusCode, - Body: ioutil.NopCloser(bytes.NewReader(body)), + Body: io.NopCloser(bytes.NewReader(body)), }, nil).Once() } } @@ -122,7 +131,7 @@ func setupMockWriteLockRequest(a *Allocation, mockClient *mocks.HttpClient) { Status: WMLockStatusOK, } respBuf, _ := json.Marshal(resp) - return ioutil.NopCloser(bytes.NewReader(respBuf)) + return io.NopCloser(bytes.NewReader(respBuf)) }(), }, nil) } diff --git a/zboxcore/sdk/listworker.go b/zboxcore/sdk/listworker.go index daeec0bcd..f4167044c 100644 --- a/zboxcore/sdk/listworker.go +++ b/zboxcore/sdk/listworker.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/rand" "net/http" "strings" "sync" @@ -31,6 +32,7 @@ type ListRequest struct { ctx context.Context wg *sync.WaitGroup forRepair bool + listOnly bool Consensus } @@ -91,7 +93,10 @@ func (req *ListRequest) getListInfoFromBlobber(blobber *blockchain.StorageNode, } //formWriter.Close() - httpreq, err := zboxutil.NewListRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.remotefilepath, req.remotefilepathhash, string(authTokenBytes)) + if req.forRepair { + req.listOnly = true + } + httpreq, err := zboxutil.NewListRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.remotefilepath, req.remotefilepathhash, string(authTokenBytes), req.listOnly) if err != nil { l.Logger.Error("List info request error: ", err.Error()) return @@ -129,7 +134,7 @@ func (req *ListRequest) getListInfoFromBlobber(blobber *blockchain.StorageNode, }) } -func (req *ListRequest) getlistFromBlobbers() []*listResponse { +func (req *ListRequest) getlistFromBlobbers() ([]*listResponse, error) { numList := len(req.blobbers) req.wg = &sync.WaitGroup{} req.wg.Add(numList) @@ -142,24 +147,59 @@ func (req *ListRequest) getlistFromBlobbers() []*listResponse { for i := 0; i < numList; i++ { listInfos[i] = <-rspCh } - return listInfos + if req.listOnly { + return listInfos, nil + } + consensusMap := make(map[string][]*blockchain.StorageNode) + var consensusHash string + for i := 0; i < numList; i++ { + if listInfos[i].err != nil || listInfos[i].ref == nil { + continue + } + hash := listInfos[i].ref.FileMetaHash + consensusMap[hash] = append(consensusMap[hash], req.blobbers[listInfos[i].blobberIdx]) + if len(consensusMap[hash]) >= req.consensusThresh { + consensusHash = hash + } + } + var err error + req.listOnly = true + listLen := len(consensusMap[consensusHash]) + if listLen < req.consensusThresh { + return listInfos, listInfos[0].err + } + for i := 0; i < listLen; i++ { + var rnd = rand.New(rand.NewSource(time.Now().UnixNano())) + num := rnd.Intn(listLen) + randomBlobber := consensusMap[consensusHash][num] + req.wg.Add(1) + go req.getListInfoFromBlobber(randomBlobber, 0, rspCh) + req.wg.Wait() + listInfos[0] = <-rspCh + if listInfos[0].err == nil { + return listInfos, nil + } + err = listInfos[0].err + } + return listInfos, err } func (req *ListRequest) GetListFromBlobbers() (*ListResult, error) { - lR := req.getlistFromBlobbers() + lR, err := req.getlistFromBlobbers() + if err != nil { + return nil, err + } result := &ListResult{ deleteMask: zboxutil.NewUint128(1).Lsh(uint64(len(req.blobbers))).Sub64(1), } selected := make(map[string]*ListResult) childResultMap := make(map[string]*ListResult) - var err error - var errNum int - req.consensus = 0 + if !req.forRepair { + req.consensusThresh = 1 + } for i := 0; i < len(lR); i++ { ti := lR[i] if ti.err != nil { - err = ti.err - errNum++ result.deleteMask = result.deleteMask.And(zboxutil.NewUint128(1).Lsh(uint64(ti.blobberIdx)).Not()) continue } @@ -197,10 +237,6 @@ func (req *ListRequest) GetListFromBlobbers() (*ListResult, error) { } } - if errNum >= req.consensusThresh && !req.forRepair { - return nil, err - } - return result, nil } diff --git a/zboxcore/sdk/listworker_test.go b/zboxcore/sdk/listworker_test.go index 4efbacefd..85890520c 100644 --- a/zboxcore/sdk/listworker_test.go +++ b/zboxcore/sdk/listworker_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "io" "io/ioutil" "net/http" @@ -233,9 +234,11 @@ func TestListRequest_GetListFromBlobbers(t *testing.T) { jsonFR, err := json.Marshal(&fileref.ListResult{ AllocationRoot: mockAllocationRoot, Meta: map[string]interface{}{ - "type": mockType, + "type": mockType, + "file_meta_hash": "mock file meta hash", }, }) + fmt.Println("returned", string(jsonFR)) require.NoError(t, err) return ioutil.NopCloser(bytes.NewReader([]byte(jsonFR))) }(), @@ -291,6 +294,7 @@ func TestListRequest_GetListFromBlobbers(t *testing.T) { fullconsensus: tt.fullconsensus, RWMutex: &sync.RWMutex{}, }, + listOnly: true, } for i := 0; i < tt.numBlobbers; i++ { req.blobbers = append(req.blobbers, &blockchain.StorageNode{ @@ -299,9 +303,10 @@ func TestListRequest_GetListFromBlobbers(t *testing.T) { } got, _ := req.GetListFromBlobbers() expectedResult := &ListResult{ - Type: mockType, - Size: 0, - deleteMask: zboxutil.NewUint128(1).Lsh(uint64(len(req.blobbers))).Sub64(1), + Type: mockType, + Size: 0, + deleteMask: zboxutil.NewUint128(1).Lsh(uint64(len(req.blobbers))).Sub64(1), + FileMetaHash: "mock file meta hash", } if !tt.wantErr { require.EqualValues(expectedResult, got) diff --git a/zboxcore/zboxutil/http.go b/zboxcore/zboxutil/http.go index bdd86fd49..92a786716 100644 --- a/zboxcore/zboxutil/http.go +++ b/zboxcore/zboxutil/http.go @@ -448,7 +448,7 @@ func NewFileStatsRequest(baseUrl string, allocationID string, allocationTx strin return req, nil } -func NewListRequest(baseUrl, allocationID string, allocationTx string, path, pathHash string, auth_token string) (*http.Request, error) { +func NewListRequest(baseUrl, allocationID string, allocationTx string, path, pathHash string, auth_token string, list bool) (*http.Request, error) { nurl, err := joinUrl(baseUrl, LIST_ENDPOINT, allocationTx) if err != nil { return nil, err @@ -457,6 +457,9 @@ func NewListRequest(baseUrl, allocationID string, allocationTx string, path, pat params.Add("path", path) params.Add("path_hash", pathHash) params.Add("auth_token", auth_token) + if list { + params.Add("list", "true") + } nurl.RawQuery = params.Encode() // Escape Query Parameters req, err := http.NewRequest(http.MethodGet, nurl.String(), nil) if err != nil {