Skip to content

Commit

Permalink
List optimization (#1240)
Browse files Browse the repository at this point in the history
* list optimization

* list optimization

* fix list hash

* add to wg

* fix list test

* check consensus

* fix listDir test

* use once for mock calls
  • Loading branch information
Hitenjain14 authored Oct 9, 2023
1 parent 5990f59 commit b8cad24
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 20 deletions.
13 changes: 11 additions & 2 deletions zboxcore/sdk/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,23 @@ func setupMockHttpResponse(
testCaseName string, a *Allocation, httpMethod string,
statusCode int, body []byte) {

for i := 0; i < numBlobbers; i++ {
mockClient.On("Do", mock.MatchedBy(func(req *http.Request) bool {
return req.Method == httpMethod && strings.Contains(req.URL.String(), "list=true")
})).Return(&http.Response{
StatusCode: statusCode,
Body: io.NopCloser(bytes.NewReader(body)),
}, nil).Once()
}

for i := 0; i < numBlobbers; i++ {
url := funcName + testCaseName + mockBlobberUrl + strconv.Itoa(i)
mockClient.On("Do", mock.MatchedBy(func(req *http.Request) bool {
return req.Method == httpMethod &&
strings.Contains(req.URL.String(), url)
})).Return(&http.Response{
StatusCode: statusCode,
Body: ioutil.NopCloser(bytes.NewReader(body)),
Body: io.NopCloser(bytes.NewReader(body)),
}, nil).Once()
}
}
Expand Down Expand Up @@ -122,7 +131,7 @@ func setupMockWriteLockRequest(a *Allocation, mockClient *mocks.HttpClient) {
Status: WMLockStatusOK,
}
respBuf, _ := json.Marshal(resp)
return ioutil.NopCloser(bytes.NewReader(respBuf))
return io.NopCloser(bytes.NewReader(respBuf))
}(),
}, nil)
}
Expand Down
62 changes: 49 additions & 13 deletions zboxcore/sdk/listworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"strings"
"sync"
Expand All @@ -31,6 +32,7 @@ type ListRequest struct {
ctx context.Context
wg *sync.WaitGroup
forRepair bool
listOnly bool
Consensus
}

Expand Down Expand Up @@ -91,7 +93,10 @@ func (req *ListRequest) getListInfoFromBlobber(blobber *blockchain.StorageNode,
}

//formWriter.Close()
httpreq, err := zboxutil.NewListRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.remotefilepath, req.remotefilepathhash, string(authTokenBytes))
if req.forRepair {
req.listOnly = true
}
httpreq, err := zboxutil.NewListRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.remotefilepath, req.remotefilepathhash, string(authTokenBytes), req.listOnly)
if err != nil {
l.Logger.Error("List info request error: ", err.Error())
return
Expand Down Expand Up @@ -129,7 +134,7 @@ func (req *ListRequest) getListInfoFromBlobber(blobber *blockchain.StorageNode,
})
}

func (req *ListRequest) getlistFromBlobbers() []*listResponse {
func (req *ListRequest) getlistFromBlobbers() ([]*listResponse, error) {
numList := len(req.blobbers)
req.wg = &sync.WaitGroup{}
req.wg.Add(numList)
Expand All @@ -142,24 +147,59 @@ func (req *ListRequest) getlistFromBlobbers() []*listResponse {
for i := 0; i < numList; i++ {
listInfos[i] = <-rspCh
}
return listInfos
if req.listOnly {
return listInfos, nil
}
consensusMap := make(map[string][]*blockchain.StorageNode)
var consensusHash string
for i := 0; i < numList; i++ {
if listInfos[i].err != nil || listInfos[i].ref == nil {
continue
}
hash := listInfos[i].ref.FileMetaHash
consensusMap[hash] = append(consensusMap[hash], req.blobbers[listInfos[i].blobberIdx])
if len(consensusMap[hash]) >= req.consensusThresh {
consensusHash = hash
}
}
var err error
req.listOnly = true
listLen := len(consensusMap[consensusHash])
if listLen < req.consensusThresh {
return listInfos, listInfos[0].err
}
for i := 0; i < listLen; i++ {
var rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
num := rnd.Intn(listLen)
randomBlobber := consensusMap[consensusHash][num]
req.wg.Add(1)
go req.getListInfoFromBlobber(randomBlobber, 0, rspCh)
req.wg.Wait()
listInfos[0] = <-rspCh
if listInfos[0].err == nil {
return listInfos, nil
}
err = listInfos[0].err
}
return listInfos, err
}

func (req *ListRequest) GetListFromBlobbers() (*ListResult, error) {
lR := req.getlistFromBlobbers()
lR, err := req.getlistFromBlobbers()
if err != nil {
return nil, err
}
result := &ListResult{
deleteMask: zboxutil.NewUint128(1).Lsh(uint64(len(req.blobbers))).Sub64(1),
}
selected := make(map[string]*ListResult)
childResultMap := make(map[string]*ListResult)
var err error
var errNum int
req.consensus = 0
if !req.forRepair {
req.consensusThresh = 1
}
for i := 0; i < len(lR); i++ {
ti := lR[i]
if ti.err != nil {
err = ti.err
errNum++
result.deleteMask = result.deleteMask.And(zboxutil.NewUint128(1).Lsh(uint64(ti.blobberIdx)).Not())
continue
}
Expand Down Expand Up @@ -197,10 +237,6 @@ func (req *ListRequest) GetListFromBlobbers() (*ListResult, error) {
}
}

if errNum >= req.consensusThresh && !req.forRepair {
return nil, err
}

return result, nil
}

Expand Down
13 changes: 9 additions & 4 deletions zboxcore/sdk/listworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -233,9 +234,11 @@ func TestListRequest_GetListFromBlobbers(t *testing.T) {
jsonFR, err := json.Marshal(&fileref.ListResult{
AllocationRoot: mockAllocationRoot,
Meta: map[string]interface{}{
"type": mockType,
"type": mockType,
"file_meta_hash": "mock file meta hash",
},
})
fmt.Println("returned", string(jsonFR))
require.NoError(t, err)
return ioutil.NopCloser(bytes.NewReader([]byte(jsonFR)))
}(),
Expand Down Expand Up @@ -291,6 +294,7 @@ func TestListRequest_GetListFromBlobbers(t *testing.T) {
fullconsensus: tt.fullconsensus,
RWMutex: &sync.RWMutex{},
},
listOnly: true,
}
for i := 0; i < tt.numBlobbers; i++ {
req.blobbers = append(req.blobbers, &blockchain.StorageNode{
Expand All @@ -299,9 +303,10 @@ func TestListRequest_GetListFromBlobbers(t *testing.T) {
}
got, _ := req.GetListFromBlobbers()
expectedResult := &ListResult{
Type: mockType,
Size: 0,
deleteMask: zboxutil.NewUint128(1).Lsh(uint64(len(req.blobbers))).Sub64(1),
Type: mockType,
Size: 0,
deleteMask: zboxutil.NewUint128(1).Lsh(uint64(len(req.blobbers))).Sub64(1),
FileMetaHash: "mock file meta hash",
}
if !tt.wantErr {
require.EqualValues(expectedResult, got)
Expand Down
5 changes: 4 additions & 1 deletion zboxcore/zboxutil/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func NewFileStatsRequest(baseUrl string, allocationID string, allocationTx strin
return req, nil
}

func NewListRequest(baseUrl, allocationID string, allocationTx string, path, pathHash string, auth_token string) (*http.Request, error) {
func NewListRequest(baseUrl, allocationID string, allocationTx string, path, pathHash string, auth_token string, list bool) (*http.Request, error) {
nurl, err := joinUrl(baseUrl, LIST_ENDPOINT, allocationTx)
if err != nil {
return nil, err
Expand All @@ -457,6 +457,9 @@ func NewListRequest(baseUrl, allocationID string, allocationTx string, path, pat
params.Add("path", path)
params.Add("path_hash", pathHash)
params.Add("auth_token", auth_token)
if list {
params.Add("list", "true")
}
nurl.RawQuery = params.Encode() // Escape Query Parameters
req, err := http.NewRequest(http.MethodGet, nurl.String(), nil)
if err != nil {
Expand Down

0 comments on commit b8cad24

Please sign in to comment.