Skip to content

Commit

Permalink
fix server change event (#643)
Browse files Browse the repository at this point in the history
  • Loading branch information
binbin0325 authored Aug 7, 2023
1 parent bb9e2ce commit d4949e1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
12 changes: 6 additions & 6 deletions common/remote/rpc/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func NewGrpcClient(ctx context.Context, clientName string, nacosServer *nacos_se
name: clientName,
labels: make(map[string]string, 8),
rpcClientStatus: INITIALIZED,
eventChan: make(chan ConnectionEvent),
reconnectionChan: make(chan ReconnectContext),
eventChan: make(chan ConnectionEvent, 1),
reconnectionChan: make(chan ReconnectContext, 1),
nacosServer: nacosServer,
mux: new(sync.Mutex),
},
Expand Down Expand Up @@ -161,7 +161,7 @@ func (c *GrpcClient) sendConnectionSetupRequest(grpcConn *GrpcConnection) error
csr.ClientAbilities = c.clientAbilities
err := grpcConn.biStreamSend(convertRequest(csr))
if err != nil {
logger.Warnf("Send ConnectionSetupRequest error:%+v", err)
logger.Warnf("send connectionSetupRequest error:%v", err)
}
time.Sleep(100 * time.Millisecond)
return err
Expand All @@ -188,16 +188,16 @@ func (c *GrpcClient) bindBiRequestStream(streamClient nacos_grpc_service.BiReque
abandon := grpcConn.getAbandon()
if c.IsRunning() && !abandon {
if err == io.EOF {
logger.Infof("%s Request stream onCompleted, switch server", grpcConn.getConnectionId())
logger.Infof("connectionId %s request stream onCompleted, switch server", grpcConn.getConnectionId())
} else {
logger.Errorf("%s Request stream error, switch server, error=%+v", grpcConn.getConnectionId(), err)
logger.Errorf("connectionId %s request stream error, switch server, error=%v", grpcConn.getConnectionId(), err)
}
if atomic.CompareAndSwapInt32((*int32)(&c.rpcClientStatus), int32(RUNNING), int32(UNHEALTHY)) {
c.switchServerAsync(ServerInfo{}, false)
return
}
} else {
logger.Infof("%s received error event, isRunning:%v, isAbandon=%v, error=%+v", grpcConn.getConnectionId(), running, abandon, err)
logger.Infof("connectionId %s received error event, isRunning:%v, isAbandon=%v, error=%v", grpcConn.getConnectionId(), running, abandon, err)
return
}
} else {
Expand Down
16 changes: 7 additions & 9 deletions common/remote/rpc/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (r *RpcClient) Start() {
startUpRetryTimes--
serverInfo, err := r.nextRpcServer()
if err != nil {
logger.Errorf("[RpcClient.nextRpcServer],err:%+v", err)
logger.Errorf("[RpcClient.nextRpcServer],err:%v", err)
break
}
logger.Infof("[RpcClient.Start] %s try to connect to server on start up, server: %+v", r.name, serverInfo)
Expand All @@ -235,16 +235,14 @@ func (r *RpcClient) Start() {
currentConnection.getServerInfo(), currentConnection.getConnectionId())
r.currentConnection = currentConnection
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.asyncNotifyConnectionChange(CONNECTED)
r.notifyConnectionChange(CONNECTED)
} else {
r.switchServerAsync(ServerInfo{}, false)
}
}

func (r *RpcClient) asyncNotifyConnectionChange(eventType ConnectionStatus) {
go func() {
r.eventChan <- ConnectionEvent{eventType: eventType}
}()
func (r *RpcClient) notifyConnectionChange(eventType ConnectionStatus) {
r.eventChan <- ConnectionEvent{eventType: eventType}
}

func (r *RpcClient) notifyServerSrvChange() {
Expand Down Expand Up @@ -345,7 +343,7 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
}
r.currentConnection = connectionNew
atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING))
r.asyncNotifyConnectionChange(CONNECTED)
r.notifyConnectionChange(CONNECTED)
return
}
if r.isShutdown() {
Expand All @@ -371,7 +369,7 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) {
func (r *RpcClient) closeConnection() {
if r.currentConnection != nil {
r.currentConnection.close()
r.asyncNotifyConnectionChange(DISCONNECTED)
r.notifyConnectionChange(DISCONNECTED)
}
}

Expand All @@ -381,7 +379,7 @@ func (r *RpcClient) notifyConnectionEvent(event ConnectionEvent) {
if len(listeners) == 0 {
return
}
logger.Infof("%s notify %s event to listeners.", r.name, event.toString())
logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), r.currentConnection.getConnectionId())
for _, v := range listeners {
if event.isConnected() {
v.OnConnected()
Expand Down

0 comments on commit d4949e1

Please sign in to comment.