Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): fix mutex use and CloseSend before close #11432

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
for {
select {
case <-rr.ctx.Done():
rr.mu.Lock()
rr.done = true
rr.mu.Unlock()
return
case <-rr.receiverRetry:
return
Expand Down Expand Up @@ -1373,8 +1375,6 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
}

func getActiveRange(r *gRPCBidiReader) []rangeSpec {
r.mu.Lock()
defer r.mu.Unlock()
var activeRange []rangeSpec
Comment on lines -1376 to 1378
Copy link
Contributor

@BrennaEpp BrennaEpp Jan 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the guarantee here? That if getActiveRange is running, no other goroutine will be running as well that may access the map? I don't think we can guarantee that since Wait() might be running, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can trigger this situation running go test -v -short -run="TestRetryConformance/grpc-8-\[return-broken-stream\]-storage.objects.download-3" -race

for k, v := range r.mp {
activeRange = append(activeRange, rangeSpec{
Expand Down Expand Up @@ -1468,6 +1468,10 @@ func (mr *gRPCBidiReader) wait() {

// Close will notify stream manager goroutine that the reader has been closed, if it's still running.
func (mr *gRPCBidiReader) close() error {
// Before release of resource we close the client->server connection.
if err := mr.stream.CloseSend(); err != nil {
return err
}
Comment on lines +1472 to +1474
Copy link
Contributor

@BrennaEpp BrennaEpp Jan 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also drain the stream after the CloseSend (as we do in drainInboundStream(), receiving from stream until we get a non-nil error) to make sure its resources are released? See grpc/grpc-go@365770f

if mr.cancel != nil {
mr.cancel()
}
Expand Down
Loading