From 3dc0cc3089a43e5ee0832298f98fe31b3aa0bdd4 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Fri, 30 Jun 2023 14:02:11 +0800 Subject: [PATCH] *: put gRPC unknown error into the header. (#5310) (#6724) close tikv/pd#5161, ref tikv/pd#5309, ref tikv/pd#5310, ref pingcap/tiflash#7408 put the unknown error into the header instead of directly returning a gRPC error. Signed-off-by: husharp Co-authored-by: Hu# Co-authored-by: Ti Chi Robot --- client/base_client.go | 4 + client/client.go | 69 +++++------ server/api/label_test.go | 9 +- server/api/member.go | 3 + server/grpc_service.go | 122 +++++++++++++++----- server/tso/allocator_manager.go | 4 + server/tso/global_allocator.go | 6 + tests/client/client_test.go | 3 +- tests/cluster.go | 14 ++- tests/compatibility/version_upgrade_test.go | 5 +- tests/server/api/api_test.go | 6 +- tests/server/cluster/cluster_test.go | 64 ++++++---- tests/server/id/id_test.go | 1 + tools/pd-heartbeat-bench/main.go | 13 ++- tools/pd-simulator/simulator/client.go | 11 +- 15 files changed, 227 insertions(+), 107 deletions(-) mode change 100644 => 100755 server/grpc_service.go mode change 100644 => 100755 server/tso/global_allocator.go diff --git a/client/base_client.go b/client/base_client.go index a6b4064af6c..6700daddfb2 100644 --- a/client/base_client.go +++ b/client/base_client.go @@ -309,6 +309,10 @@ func (c *baseClient) getMembers(ctx context.Context, url string, timeout time.Du attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String()) return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause() } + if members.GetHeader().GetError() != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", members.GetHeader().GetError().String(), cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause() + } return members, nil } diff --git a/client/client.go b/client/client.go index 55c289ef313..a14c6bdc293 100644 --- a/client/client.go +++ b/client/client.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" "go.uber.org/zap" @@ -583,10 +584,8 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) resp, err := c.getClient().GetMembers(ctx, req) cancel() - if err != nil { - cmdFailDurationGetAllMembers.Observe(time.Since(start).Seconds()) - c.ScheduleCheckLeader() - return nil, errors.WithStack(err) + if err = c.respForErr(cmdFailDurationGetAllMembers, start, err, resp.GetHeader()); err != nil { + return nil, err } return resp.GetMembers(), nil } @@ -1368,10 +1367,8 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt resp, err := c.getClient().GetRegion(ctx, req) cancel() - if err != nil { - cmdFailDurationGetRegion.Observe(time.Since(start).Seconds()) - c.ScheduleCheckLeader() - return nil, errors.WithStack(err) + if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil { + return nil, err } return handleRegionResponse(resp), nil } @@ -1400,7 +1397,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs Header: c.requestHeader(), RegionKey: key, }) - if err != nil { + if err != nil || resp.GetHeader().GetError() != nil { log.Error("[pd] can't get region info", zap.String("member-URL", url), errs.ZapError(err)) continue } @@ -1440,10 +1437,8 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio resp, err := c.getClient().GetPrevRegion(ctx, req) cancel() - if err != nil { - cmdFailDurationGetPrevRegion.Observe(time.Since(start).Seconds()) - c.ScheduleCheckLeader() - return nil, errors.WithStack(err) + if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil { + return nil, err } return handleRegionResponse(resp), nil } @@ -1470,10 +1465,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get resp, err := c.getClient().GetRegionByID(ctx, req) cancel() - if err != nil { - cmdFailedDurationGetRegionByID.Observe(time.Since(start).Seconds()) - c.ScheduleCheckLeader() - return nil, errors.WithStack(err) + if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil { + return nil, err } return handleRegionResponse(resp), nil } @@ -1501,10 +1494,8 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) scanCtx = grpcutil.BuildForwardContext(scanCtx, c.GetLeaderAddr()) resp, err := c.getClient().ScanRegions(scanCtx, req) - if err != nil { - cmdFailedDurationScanRegions.Observe(time.Since(start).Seconds()) - c.ScheduleCheckLeader() - return nil, errors.WithStack(err) + if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil { + return nil, err } return handleRegionsResponse(resp), nil @@ -1555,10 +1546,8 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e resp, err := c.getClient().GetStore(ctx, req) cancel() - if err != nil { - cmdFailedDurationGetStore.Observe(time.Since(start).Seconds()) - c.ScheduleCheckLeader() - return nil, errors.WithStack(err) + if err = c.respForErr(cmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil { + return nil, err } return handleStoreResponse(resp) } @@ -1597,10 +1586,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m resp, err := c.getClient().GetAllStores(ctx, req) cancel() - if err != nil { - cmdFailedDurationGetAllStores.Observe(time.Since(start).Seconds()) - c.ScheduleCheckLeader() - return nil, errors.WithStack(err) + if err = c.respForErr(cmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil { + return nil, err } return resp.GetStores(), nil } @@ -1622,10 +1609,8 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6 resp, err := c.getClient().UpdateGCSafePoint(ctx, req) cancel() - if err != nil { - cmdFailedDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) - c.ScheduleCheckLeader() - return 0, errors.WithStack(err) + if err = c.respForErr(cmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil { + return 0, err } return resp.GetNewSafePoint(), nil } @@ -1654,10 +1639,8 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, resp, err := c.getClient().UpdateServiceGCSafePoint(ctx, req) cancel() - if err != nil { - cmdFailedDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) - c.ScheduleCheckLeader() - return 0, errors.WithStack(err) + if err = c.respForErr(cmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil { + return 0, err } return resp.GetMinSafePoint(), nil } @@ -1896,3 +1879,15 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem }() return globalConfigWatcherCh, err } + +func (c *client) respForErr(observer prometheus.Observer, start time.Time, err error, header *pdpb.ResponseHeader) error { + if err != nil || header.GetError() != nil { + observer.Observe(time.Since(start).Seconds()) + if err != nil { + c.ScheduleCheckLeader() + return errors.WithStack(err) + } + return errors.WithStack(errors.New(header.GetError().String())) + } + return nil +} diff --git a/server/api/label_test.go b/server/api/label_test.go index 12933d17e64..9acddae8436 100644 --- a/server/api/label_test.go +++ b/server/api/label_test.go @@ -17,7 +17,6 @@ package api import ( "context" "fmt" - "strings" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -268,7 +267,7 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { } for _, t := range cases { - _, err := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{ + resp, err := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{ Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()}, Store: &metapb.Store{ Id: t.store.Id, @@ -281,14 +280,14 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { if t.valid { c.Assert(err, IsNil) } else { - c.Assert(strings.Contains(err.Error(), t.expectError), IsTrue) + c.Assert(resp.GetHeader().GetError(), NotNil) } } // enable placement rules. Report no error any more. c.Assert(tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/config", s.urlPrefix), []byte(`{"enable-placement-rules":"true"}`), tu.StatusOK(c)), IsNil) for _, t := range cases { - _, err := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{ + resp, err := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{ Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()}, Store: &metapb.Store{ Id: t.store.Id, @@ -301,7 +300,7 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { if t.valid { c.Assert(err, IsNil) } else { - c.Assert(strings.Contains(err.Error(), t.expectError), IsTrue) + c.Assert(resp.GetHeader().GetError(), NotNil) } } } diff --git a/server/api/member.go b/server/api/member.go index eaf743c0493..e459983f3b9 100644 --- a/server/api/member.go +++ b/server/api/member.go @@ -67,6 +67,9 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) { if err != nil { return nil, errors.WithStack(err) } + if members.GetHeader().GetError() != nil { + return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String())) + } dclocationDistribution, err := svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd() if err != nil { return nil, errors.WithStack(err) diff --git a/server/grpc_service.go b/server/grpc_service.go old mode 100644 new mode 100755 index 3295cbc0707..0047c43beea --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -92,13 +92,22 @@ func (s *GrpcServer) unaryMiddleware(ctx context.Context, header *pdpb.RequestHe return nil, nil } +func (s *GrpcServer) wrapErrorToHeader(errorType pdpb.ErrorType, message string) *pdpb.ResponseHeader { + return s.errorHeader(&pdpb.Error{ + Type: errorType, + Message: message, + }) +} + // GetMembers implements gRPC PDServer. func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) { // Here we purposely do not check the cluster ID because the client does not know the correct cluster ID // at startup and needs to get the cluster ID with the first request (i.e. GetMembers). members, err := s.Server.GetMembers() if err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.GetMembersResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil } var etcdLeader, pdLeader *pdpb.Member @@ -113,7 +122,9 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb tsoAllocatorManager := s.GetTSOAllocatorManager() tsoAllocatorLeaders, err := tsoAllocatorManager.GetLocalAllocatorLeaders() if err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.GetMembersResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil } leader := s.member.GetLeader() @@ -399,7 +410,9 @@ func (s *GrpcServer) Bootstrap(ctx context.Context, request *pdpb.BootstrapReque res, err := s.bootstrapCluster(request) if err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.BootstrapResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil } res.Header = s.header() @@ -438,7 +451,9 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest) // We can use an allocator for all types ID allocation. id, err := s.idAllocator.Alloc() if err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.AllocIDResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil } return &pdpb.AllocIDResponse{ @@ -466,7 +481,10 @@ func (s *GrpcServer) GetStore(ctx context.Context, request *pdpb.GetStoreRequest storeID := request.GetStoreId() store := rc.GetStore(storeID) if store == nil { - return nil, status.Errorf(codes.Unknown, "invalid store ID %d, not found", storeID) + return &pdpb.GetStoreResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + fmt.Sprintf("invalid store ID %d, not found", storeID)), + }, nil } return &pdpb.GetStoreResponse{ Header: s.header(), @@ -515,11 +533,16 @@ func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest // NOTE: can be removed when placement rules feature is enabled by default. if !s.GetConfig().Replication.EnablePlacementRules && core.IsStoreContainLabel(store, core.EngineKey, core.EngineTiFlash) { - return nil, status.Errorf(codes.FailedPrecondition, "placement rules is disabled") + return &pdpb.PutStoreResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + "placement rules is disabled"), + }, nil } if err := rc.PutStore(store); err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.PutStoreResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil } log.Info("put store ok", zap.Stringer("store", store)) @@ -592,7 +615,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear storeID := request.GetStats().GetStoreId() store := rc.GetStore(storeID) if store == nil { - return nil, errors.Errorf("store %v not found", storeID) + return &pdpb.StoreHeartbeatResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + fmt.Sprintf("store %v not found", storeID)), + }, nil } // Bypass stats handling if the store report for unsafe recover is not empty. @@ -603,7 +629,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear err := rc.HandleStoreHeartbeat(request.GetStats()) if err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.StoreHeartbeatResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + err.Error()), + }, nil } s.handleDamagedStore(request.GetStats()) @@ -1074,7 +1103,10 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest return &pdpb.AskSplitResponse{Header: s.notBootstrappedHeader()}, nil } if request.GetRegion() == nil { - return nil, errors.New("missing region for split") + return &pdpb.AskSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_REGION_NOT_FOUND, + "missing region for split"), + }, nil } req := &pdpb.AskSplitRequest{ Region: request.Region, @@ -1082,10 +1114,7 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest split, err := rc.HandleAskSplit(req) if err != nil { return &pdpb.AskSplitResponse{ - Header: s.errorHeader(&pdpb.Error{ - Type: pdpb.ErrorType_UNKNOWN, - Message: err.Error(), - }), + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), }, nil } @@ -1116,7 +1145,10 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp return &pdpb.AskBatchSplitResponse{Header: s.incompatibleVersion("batch_split")}, nil } if request.GetRegion() == nil { - return nil, errors.New("missing region for split") + return &pdpb.AskBatchSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_REGION_NOT_FOUND, + "missing region for split"), + }, nil } req := &pdpb.AskBatchSplitRequest{ Region: request.Region, @@ -1125,10 +1157,7 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp split, err := rc.HandleAskBatchSplit(req) if err != nil { return &pdpb.AskBatchSplitResponse{ - Header: s.errorHeader(&pdpb.Error{ - Type: pdpb.ErrorType_UNKNOWN, - Message: err.Error(), - }), + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), }, nil } @@ -1155,7 +1184,9 @@ func (s *GrpcServer) ReportSplit(ctx context.Context, request *pdpb.ReportSplitR } _, err := rc.HandleReportSplit(request) if err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.ReportSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil } return &pdpb.ReportSplitResponse{ @@ -1181,7 +1212,10 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB _, err := rc.HandleBatchReportSplit(request) if err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.ReportBatchSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + err.Error()), + }, nil } return &pdpb.ReportBatchSplitResponse{ @@ -1227,7 +1261,10 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus } conf := request.GetCluster() if err := rc.PutMetaCluster(conf); err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.PutClusterConfigResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + err.Error()), + }, nil } log.Info("put cluster config ok", zap.Reflect("config", conf)) @@ -1269,7 +1306,10 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg if region == nil { if request.GetRegion() == nil { //nolint - return nil, errors.Errorf("region %d not found", request.GetRegionId()) + return &pdpb.ScatterRegionResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_REGION_NOT_FOUND, + "region %d not found"), + }, nil } region = core.NewRegionInfo(request.GetRegion(), request.GetLeader()) } @@ -1531,12 +1571,17 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest tsoAllocatorManager := s.GetTSOAllocatorManager() // There is no dc-location found in this server, return err. if tsoAllocatorManager.GetClusterDCLocationsNumber() == 0 { - return nil, status.Errorf(codes.Unknown, "empty cluster dc-location found, checker may not work properly") + return &pdpb.SyncMaxTSResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + "empty cluster dc-location found, checker may not work properly"), + }, nil } // Get all Local TSO Allocator leaders allocatorLeaders, err := tsoAllocatorManager.GetHoldingLocalAllocatorLeaders() if err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.SyncMaxTSResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil } if !request.GetSkipCheck() { var maxLocalTS *pdpb.Timestamp @@ -1549,7 +1594,9 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest } currentLocalTSO, err := allocator.GetCurrentTSO() if err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.SyncMaxTSResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil } if tsoutil.CompareTimestamp(currentLocalTSO, maxLocalTS) > 0 { maxLocalTS = currentLocalTSO @@ -1566,10 +1613,16 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest }) if maxLocalTS == nil { - return nil, status.Errorf(codes.Unknown, "local tso allocator leaders have changed during the sync, should retry") + return &pdpb.SyncMaxTSResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + "local tso allocator leaders have changed during the sync, should retry"), + }, nil } if request.GetMaxTs() == nil { - return nil, status.Errorf(codes.Unknown, "empty maxTS in the request, should retry") + return &pdpb.SyncMaxTSResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + "empty maxTS in the request, should retry"), + }, nil } // Found a bigger or equal maxLocalTS, return it directly. cmpResult := tsoutil.CompareTimestamp(maxLocalTS, request.GetMaxTs()) @@ -1595,7 +1648,9 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest continue } if err := allocator.WriteTSO(request.GetMaxTs()); err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.SyncMaxTSResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil } syncedDCs = append(syncedDCs, allocator.GetDCLocation()) } @@ -1687,7 +1742,10 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL info, ok := am.GetDCLocationInfo(request.GetDcLocation()) if !ok { am.ClusterDCLocationChecker() - return nil, status.Errorf(codes.Unknown, "dc-location %s is not found", request.GetDcLocation()) + return &pdpb.GetDCLocationInfoResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, + fmt.Sprintf("dc-location %s is not found", request.GetDcLocation())), + }, nil } resp := &pdpb.GetDCLocationInfoResponse{ Header: s.header(), @@ -1702,7 +1760,9 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL // when it becomes the Local TSO Allocator leader. // Please take a look at https://github.com/tikv/pd/issues/3260 for more details. if resp.MaxTs, err = am.GetMaxLocalTSO(ctx); err != nil { - return nil, status.Errorf(codes.Unknown, err.Error()) + return &pdpb.GetDCLocationInfoResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil } return resp, nil } diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index 9746f41b35a..728efe577d1 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -17,6 +17,7 @@ package tso import ( "context" "fmt" + "github.com/pingcap/errors" "math" "path" "strconv" @@ -1078,6 +1079,9 @@ func (am *AllocatorManager) getDCLocationInfoFromLeader(ctx context.Context, dcL if err != nil { return false, &pdpb.GetDCLocationInfoResponse{}, err } + if resp.GetHeader().GetError() != nil { + return false, &pdpb.GetDCLocationInfoResponse{}, errors.Errorf("get the dc-location info from leader failed: %s", resp.GetHeader().GetError().String()) + } return resp.GetSuffix() != 0, resp, nil } diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go old mode 100644 new mode 100755 index 6c69c0a0cc7..6aab320abd6 --- a/server/tso/global_allocator.go +++ b/server/tso/global_allocator.go @@ -17,6 +17,7 @@ package tso import ( "context" "fmt" + "github.com/pingcap/errors" "sync" "sync/atomic" "time" @@ -349,6 +350,11 @@ func (gta *GlobalTSOAllocator) SyncMaxTS( log.Error("sync max ts rpc failed, got an error", zap.String("local-allocator-leader-url", leaderConn.Target()), errs.ZapError(err)) return } + if syncMaxTSResp.rpcRes.GetHeader().GetError() != nil { + log.Error("sync max ts rpc failed, got an error", zap.String("local-allocator-leader-url", leaderConn.Target()), + errs.ZapError(errors.Errorf("%s", syncMaxTSResp.rpcRes.GetHeader().GetError().String()))) + return + } }(ctx, leaderConn, respCh) } wg.Wait() diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 4c35869910b..9e6d05d5b0d 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -766,8 +766,9 @@ func bootstrapServer(c *C, header *pdpb.RequestHeader, client pdpb.PDClient) { Store: stores[0], Region: region, } - _, err := client.Bootstrap(context.Background(), req) + resp, err := client.Bootstrap(context.Background(), req) c.Assert(err, IsNil) + c.Assert(pdpb.ErrorType_OK, Equals, resp.GetHeader().GetError().GetType()) } func (s *testClientSuite) TestNormalTSO(c *C) { diff --git a/tests/cluster.go b/tests/cluster.go index 2061668f393..631820c9c28 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -260,9 +260,9 @@ func (s *TestServer) GetEtcdLeader() (string, error) { s.RLock() defer s.RUnlock() req := &pdpb.GetMembersRequest{Header: &pdpb.RequestHeader{ClusterId: s.server.ClusterID()}} - members, err := s.grpcServer.GetMembers(context.TODO(), req) - if err != nil { - return "", errors.WithStack(err) + members, _ := s.grpcServer.GetMembers(context.TODO(), req) + if members.Header.GetError() != nil { + return "", errors.WithStack(errors.New(members.Header.GetError().String())) } return members.GetEtcdLeader().GetName(), nil } @@ -276,6 +276,9 @@ func (s *TestServer) GetEtcdLeaderID() (uint64, error) { if err != nil { return 0, errors.WithStack(err) } + if members.GetHeader().GetError() != nil { + return 0, errors.WithStack(errors.New(members.GetHeader().GetError().String())) + } return members.GetEtcdLeader().GetMemberId(), nil } @@ -365,10 +368,13 @@ func (s *TestServer) BootstrapCluster() error { Store: &metapb.Store{Id: 1, Address: "mock://1", LastHeartbeat: time.Now().UnixNano()}, Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}}, } - _, err := s.grpcServer.Bootstrap(context.Background(), bootstrapReq) + resp, err := s.grpcServer.Bootstrap(context.Background(), bootstrapReq) if err != nil { return err } + if resp.GetHeader().GetError() != nil { + return errors.New(resp.GetHeader().GetError().String()) + } return nil } diff --git a/tests/compatibility/version_upgrade_test.go b/tests/compatibility/version_upgrade_test.go index 03e34697084..961a74fff19 100644 --- a/tests/compatibility/version_upgrade_test.go +++ b/tests/compatibility/version_upgrade_test.go @@ -93,8 +93,9 @@ func (s *compatibilityTestSuite) TestStoreRegister(c *C) { Version: "1.0.1", }, } - _, err = svr.PutStore(context.Background(), putStoreRequest) - c.Assert(err, NotNil) + resp, err := svr.PutStore(context.Background(), putStoreRequest) + c.Assert(err, IsNil) + c.Assert(len(resp.GetHeader().GetError().String()) > 0, IsTrue) } func (s *compatibilityTestSuite) TestRollingUpgrade(c *C) { diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 68af7235316..b947d7f0f04 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -524,8 +524,9 @@ func (s *testProgressSuite) TestRemovingProgress(c *C) { Store: &metapb.Store{Id: 1, Address: "127.0.0.1:0"}, Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}}, } - _, err = grpcPDClient.Bootstrap(context.Background(), req) + resp, err := grpcPDClient.Bootstrap(context.Background(), req) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) stores := []*metapb.Store{ { Id: 1, @@ -640,8 +641,9 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) { Store: &metapb.Store{Id: 1, Address: "127.0.0.1:0"}, Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}}, } - _, err = grpcPDClient.Bootstrap(context.Background(), req) + resp, err := grpcPDClient.Bootstrap(context.Background(), req) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) stores := []*metapb.Store{ { Id: 1, diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index bbb10c525aa..bbb6da37391 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -175,8 +175,9 @@ func (s *clusterTestSuite) TestDamagedRegion(c *C) { // To put stores. svr := &server.GrpcServer{Server: leaderServer.GetServer()} for _, store := range stores { - _, err = svr.PutStore(context.Background(), store) + resp, err := svr.PutStore(context.Background(), store) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) } // To validate remove peer op be added. @@ -252,50 +253,57 @@ func testPutStore(c *C, clusterID uint64, rc *cluster.RaftCluster, grpcPDClient c.Assert(updatedStore, DeepEquals, store) // Update store again. - _, err = putStore(grpcPDClient, clusterID, store) + resp, err := putStore(grpcPDClient, clusterID, store) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) rc.GetAllocator().Alloc() id, err := rc.GetAllocator().Alloc() c.Assert(err, IsNil) // Put new store with a duplicated address when old store is up will fail. - _, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id))) - c.Assert(err, NotNil) + resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id))) + c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), NotNil) id, err = rc.GetAllocator().Alloc() c.Assert(err, IsNil) // Put new store with a duplicated address when old store is offline will fail. resetStoreState(c, rc, store.GetId(), metapb.StoreState_Offline) - _, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id))) - c.Assert(err, NotNil) + resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id))) + c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), NotNil) id, err = rc.GetAllocator().Alloc() c.Assert(err, IsNil) // Put new store with a duplicated address when old store is tombstone is OK. resetStoreState(c, rc, store.GetId(), metapb.StoreState_Tombstone) rc.GetStore(store.GetId()) - _, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id))) + resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id))) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) id, err = rc.GetAllocator().Alloc() c.Assert(err, IsNil) deployPath := getTestDeployPath(id) // Put a new store. - _, err = putStore(grpcPDClient, clusterID, newMetaStore(id, testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, deployPath)) + resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, deployPath)) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) s := rc.GetStore(id).GetMeta() c.Assert(s.DeployPath, Equals, deployPath) deployPath = fmt.Sprintf("move/test/store%d", id) - _, err = putStore(grpcPDClient, clusterID, newMetaStore(id, testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, deployPath)) + resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, deployPath)) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) s = rc.GetStore(id).GetMeta() c.Assert(s.DeployPath, Equals, deployPath) // Put an existed store with duplicated address with other old stores. resetStoreState(c, rc, store.GetId(), metapb.StoreState_Up) - _, err = putStore(grpcPDClient, clusterID, newMetaStore(store.GetId(), testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, getTestDeployPath(store.GetId()))) - c.Assert(err, NotNil) + resp, err = putStore(grpcPDClient, clusterID, newMetaStore(store.GetId(), testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, getTestDeployPath(store.GetId()))) + c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), NotNil) } func getTestDeployPath(storeID uint64) string { @@ -489,6 +497,7 @@ func (s *clusterTestSuite) TestGetPDMembers(c *C) { req := &pdpb.GetMembersRequest{Header: testutil.NewRequestHeader(clusterID)} resp, err := grpcPDClient.GetMembers(context.Background(), req) c.Assert(err, IsNil) + c.Assert(pdpb.ErrorType_OK, Equals, resp.GetHeader().GetError().GetType()) // A more strict test can be found at api/member_test.go c.Assert(resp.GetMembers(), Not(HasLen), 0) } @@ -535,8 +544,9 @@ func (s *clusterTestSuite) TestStoreVersionChange(c *C) { wg.Add(1) go func() { defer wg.Done() - _, err = putStore(grpcPDClient, clusterID, store) + resp, err := putStore(grpcPDClient, clusterID, store) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) }() time.Sleep(100 * time.Millisecond) svr.SetClusterVersion("1.0.0") @@ -571,8 +581,9 @@ func (s *clusterTestSuite) TestConcurrentHandleRegion(c *C) { c.Assert(err, IsNil) store := newMetaStore(storeID, addr, "2.1.0", metapb.StoreState_Up, getTestDeployPath(storeID)) stores = append(stores, store) - _, err = putStore(grpcPDClient, clusterID, store) + resp, err := putStore(grpcPDClient, clusterID, store) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) } ctx, cancel := context.WithCancel(context.Background()) @@ -589,8 +600,9 @@ func (s *clusterTestSuite) TestConcurrentHandleRegion(c *C) { }, } grpcServer := &server.GrpcServer{Server: leaderServer.GetServer()} - _, err := grpcServer.StoreHeartbeat(context.TODO(), req) + resp, err := grpcServer.StoreHeartbeat(context.TODO(), req) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) stream, err := grpcPDClient.RegionHeartbeat(ctx) c.Assert(err, IsNil) peerID, err := id.Alloc() @@ -846,15 +858,17 @@ func (s *clusterTestSuite) TestTiFlashWithPlacementRules(c *C) { } // cannot put TiFlash node without placement rules - _, err = putStore(grpcPDClient, clusterID, tiflashStore) - c.Assert(err, NotNil) + resp, err := putStore(grpcPDClient, clusterID, tiflashStore) + c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), NotNil) rep := leaderServer.GetConfig().Replication rep.EnablePlacementRules = true svr := leaderServer.GetServer() err = svr.SetReplicationConfig(rep) c.Assert(err, IsNil) - _, err = putStore(grpcPDClient, clusterID, tiflashStore) + resp, err = putStore(grpcPDClient, clusterID, tiflashStore) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) // test TiFlash store limit expect := map[uint64]config.StoreLimitConfig{11: {AddPeer: 30, RemovePeer: 30}} c.Assert(svr.GetScheduleConfig().StoreLimit, DeepEquals, expect) @@ -921,8 +935,9 @@ func newBootstrapRequest(clusterID uint64) *pdpb.BootstrapRequest { // helper function to check and bootstrap. func bootstrapCluster(c *C, clusterID uint64, grpcPDClient pdpb.PDClient) { req := newBootstrapRequest(clusterID) - _, err := grpcPDClient.Bootstrap(context.Background(), req) + resp, err := grpcPDClient.Bootstrap(context.Background(), req) c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) } func putStore(grpcPDClient pdpb.PDClient, clusterID uint64, store *metapb.Store) (*pdpb.PutStoreResponse, error) { @@ -941,6 +956,7 @@ func getStore(c *C, clusterID uint64, grpcPDClient pdpb.PDClient, storeID uint64 } resp, err := grpcPDClient.GetStore(context.Background(), req) c.Assert(err, IsNil) + c.Assert(pdpb.ErrorType_OK, Equals, resp.GetHeader().GetError().GetType()) c.Assert(resp.GetStore().GetId(), Equals, storeID) return resp.GetStore() @@ -1002,8 +1018,9 @@ func (s *clusterTestSuite) TestOfflineStoreLimit(c *C) { storeID, err := id.Alloc() c.Assert(err, IsNil) store := newMetaStore(storeID, addr, "4.0.0", metapb.StoreState_Up, getTestDeployPath(storeID)) - _, err = putStore(grpcPDClient, clusterID, store) + resp, err := putStore(grpcPDClient, clusterID, store) c.Assert(err, IsNil) + c.Assert(pdpb.ErrorType_OK, Equals, resp.GetHeader().GetError().GetType()) } for i := uint64(1); i <= 2; i++ { r := &metapb.Region{ @@ -1085,8 +1102,9 @@ func (s *clusterTestSuite) TestUpgradeStoreLimit(c *C) { c.Assert(rc, NotNil) rc.SetStorage(storage.NewStorageWithMemoryBackend()) store := newMetaStore(1, "127.0.1.1:0", "4.0.0", metapb.StoreState_Up, "test/store1") - _, err = putStore(grpcPDClient, clusterID, store) + resp, err := putStore(grpcPDClient, clusterID, store) c.Assert(err, IsNil) + c.Assert(pdpb.ErrorType_OK, Equals, resp.GetHeader().GetError().GetType()) r := &metapb.Region{ Id: 1, RegionEpoch: &metapb.RegionEpoch{ @@ -1150,8 +1168,9 @@ func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) { peerID, err := id.Alloc() c.Assert(err, IsNil) store := newMetaStore(storeID, addr, "3.0.0", metapb.StoreState_Up, getTestDeployPath(storeID)) - _, err = putStore(grpcPDClient, clusterID, store) + resp, err := putStore(grpcPDClient, clusterID, store) c.Assert(err, IsNil) + c.Assert(pdpb.ErrorType_OK, Equals, resp.GetHeader().GetError().GetType()) peers = append(peers, &metapb.Peer{ Id: peerID, StoreId: storeID, @@ -1269,8 +1288,9 @@ func (s *clusterTestSuite) TestMinResolvedTS(c *C) { if isTiflash { store.Labels = []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}} } - _, err = putStore(grpcPDClient, clusterID, store) + resp, err := putStore(grpcPDClient, clusterID, store) c.Assert(err, IsNil) + c.Assert(pdpb.ErrorType_OK, Equals, resp.GetHeader().GetError().GetType()) req := &pdpb.ReportMinResolvedTsRequest{ Header: testutil.NewRequestHeader(clusterID), StoreId: storeID, diff --git a/tests/server/id/id_test.go b/tests/server/id/id_test.go index 1fb3563d039..a420921cdb8 100644 --- a/tests/server/id/id_test.go +++ b/tests/server/id/id_test.go @@ -114,6 +114,7 @@ func (s *testAllocIDSuite) TestCommand(c *C) { resp, err := grpcPDClient.AllocID(context.Background(), req) c.Assert(err, IsNil) c.Assert(resp.GetId(), Greater, last) + c.Assert(pdpb.ErrorType_OK, Equals, resp.GetHeader().GetError().GetType()) last = resp.GetId() } } diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go index 18d6746e983..72a58fdd3da 100644 --- a/tools/pd-heartbeat-bench/main.go +++ b/tools/pd-heartbeat-bench/main.go @@ -61,6 +61,9 @@ func initClusterID(cli pdpb.PDClient) { if err != nil { log.Fatal(err) } + if res.GetHeader().GetError() != nil { + log.Fatal(res.GetHeader().GetError()) + } clusterID = res.GetHeader().GetClusterId() log.Println("ClusterID:", clusterID) } @@ -95,10 +98,13 @@ func bootstrap(cli pdpb.PDClient) { Store: store, Region: region, } - _, err = cli.Bootstrap(context.TODO(), req) + resp, err := cli.Bootstrap(context.TODO(), req) if err != nil { log.Fatal(err) } + if resp.GetHeader().GetError() != nil { + log.Fatalf("bootstrap failed: %s", resp.GetHeader().GetError().String()) + } log.Println("bootstrapped") } @@ -108,10 +114,13 @@ func putStores(cli pdpb.PDClient) { Id: i, Address: fmt.Sprintf("localhost:%d", i), } - _, err := cli.PutStore(context.TODO(), &pdpb.PutStoreRequest{Header: header(), Store: store}) + resp, err := cli.PutStore(context.TODO(), &pdpb.PutStoreRequest{Header: header(), Store: store}) if err != nil { log.Fatal(err) } + if resp.GetHeader().GetError() != nil { + log.Fatalf("put store failed: %s", resp.GetHeader().GetError().String()) + } } } diff --git a/tools/pd-simulator/simulator/client.go b/tools/pd-simulator/simulator/client.go index 3cbad1c6fee..74c35389011 100644 --- a/tools/pd-simulator/simulator/client.go +++ b/tools/pd-simulator/simulator/client.go @@ -117,6 +117,9 @@ func (c *client) getMembers(ctx context.Context) (*pdpb.GetMembersResponse, erro if err != nil { return nil, errors.WithStack(err) } + if members.GetHeader().GetError() != nil { + return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String())) + } return members, nil } @@ -243,6 +246,9 @@ func (c *client) AllocID(ctx context.Context) (uint64, error) { if err != nil { return 0, err } + if resp.GetHeader().GetError() != nil { + return 0, errors.Errorf("alloc id failed: %s", resp.GetHeader().GetError().String()) + } return resp.GetId(), nil } @@ -261,7 +267,7 @@ func (c *client) Bootstrap(ctx context.Context, store *metapb.Store, region *met if err != nil { return err } - _, err = c.pdClient().Bootstrap(ctx, &pdpb.BootstrapRequest{ + res, err := c.pdClient().Bootstrap(ctx, &pdpb.BootstrapRequest{ Header: c.requestHeader(), Store: store, Region: region, @@ -269,6 +275,9 @@ func (c *client) Bootstrap(ctx context.Context, store *metapb.Store, region *met if err != nil { return err } + if res.GetHeader().GetError() != nil { + return errors.Errorf("bootstrap failed: %s", resp.GetHeader().GetError().String()) + } return nil }