From 3f75f916087382ec8b102cc960f8e56c0876f200 Mon Sep 17 00:00:00 2001 From: Dmitriy Matrenichev Date: Fri, 7 Jun 2024 09:04:05 +0300 Subject: [PATCH] fix: change Transport.Address field to Transport.Address method With new gRPC (both gateway and modules) it uses `grpc.NewClient` call to create clients. It no longer support custom addresses without a `passthrough:` prefix. Previous fix didn't account for that in some places, so this one changes the structure of `Transport` to always return address in proper form for external users. Signed-off-by: Dmitriy Matrenichev --- .../api/omni/management/management.pb.gw.go | 6 +++--- client/api/omni/oidc/oidc.pb.gw.go | 6 +++--- client/api/omni/resources/resources.pb.gw.go | 6 +++--- client/api/talos/machine/machine.pb.gw.go | 16 +++++++------- go.mod | 3 +-- go.sum | 2 -- internal/backend/grpc/grpc.go | 4 ++-- internal/backend/grpc/router/router.go | 7 +------ internal/backend/server.go | 4 ++-- internal/memconn/memconn.go | 21 ++++++++++++++----- 10 files changed, 39 insertions(+), 36 deletions(-) diff --git a/client/api/omni/management/management.pb.gw.go b/client/api/omni/management/management.pb.gw.go index 043934ea..30d708c3 100644 --- a/client/api/omni/management/management.pb.gw.go +++ b/client/api/omni/management/management.pb.gw.go @@ -638,21 +638,21 @@ func RegisterManagementServiceHandlerServer(ctx context.Context, mux *runtime.Se // RegisterManagementServiceHandlerFromEndpoint is same as RegisterManagementServiceHandler but // automatically dials to "endpoint" and closes the connection when "ctx" gets done. func RegisterManagementServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { - conn, err := grpc.DialContext(ctx, endpoint, opts...) + conn, err := grpc.NewClient(endpoint, opts...) if err != nil { return err } defer func() { if err != nil { if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) } return } go func() { <-ctx.Done() if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) } }() }() diff --git a/client/api/omni/oidc/oidc.pb.gw.go b/client/api/omni/oidc/oidc.pb.gw.go index 399145aa..2e1e6b57 100644 --- a/client/api/omni/oidc/oidc.pb.gw.go +++ b/client/api/omni/oidc/oidc.pb.gw.go @@ -94,21 +94,21 @@ func RegisterOIDCServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux // RegisterOIDCServiceHandlerFromEndpoint is same as RegisterOIDCServiceHandler but // automatically dials to "endpoint" and closes the connection when "ctx" gets done. func RegisterOIDCServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { - conn, err := grpc.DialContext(ctx, endpoint, opts...) + conn, err := grpc.NewClient(endpoint, opts...) if err != nil { return err } defer func() { if err != nil { if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) } return } go func() { <-ctx.Done() if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) } }() }() diff --git a/client/api/omni/resources/resources.pb.gw.go b/client/api/omni/resources/resources.pb.gw.go index cfe02726..08845330 100644 --- a/client/api/omni/resources/resources.pb.gw.go +++ b/client/api/omni/resources/resources.pb.gw.go @@ -377,21 +377,21 @@ func RegisterResourceServiceHandlerServer(ctx context.Context, mux *runtime.Serv // RegisterResourceServiceHandlerFromEndpoint is same as RegisterResourceServiceHandler but // automatically dials to "endpoint" and closes the connection when "ctx" gets done. func RegisterResourceServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { - conn, err := grpc.DialContext(ctx, endpoint, opts...) + conn, err := grpc.NewClient(endpoint, opts...) if err != nil { return err } defer func() { if err != nil { if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) } return } go func() { <-ctx.Done() if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) } }() }() diff --git a/client/api/talos/machine/machine.pb.gw.go b/client/api/talos/machine/machine.pb.gw.go index 63133f6d..5d9acea3 100644 --- a/client/api/talos/machine/machine.pb.gw.go +++ b/client/api/talos/machine/machine.pb.gw.go @@ -335,7 +335,7 @@ func request_MachineService_EtcdRecover_0(ctx context.Context, marshaler runtime var metadata runtime.ServerMetadata stream, err := client.EtcdRecover(ctx) if err != nil { - grpclog.Infof("Failed to start streaming: %v", err) + grpclog.Errorf("Failed to start streaming: %v", err) return nil, metadata, err } dec := marshaler.NewDecoder(req.Body) @@ -346,25 +346,25 @@ func request_MachineService_EtcdRecover_0(ctx context.Context, marshaler runtime break } if err != nil { - grpclog.Infof("Failed to decode request: %v", err) + grpclog.Errorf("Failed to decode request: %v", err) return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) } if err = stream.Send(&protoReq); err != nil { if err == io.EOF { break } - grpclog.Infof("Failed to send request: %v", err) + grpclog.Errorf("Failed to send request: %v", err) return nil, metadata, err } } if err := stream.CloseSend(); err != nil { - grpclog.Infof("Failed to terminate client stream: %v", err) + grpclog.Errorf("Failed to terminate client stream: %v", err) return nil, metadata, err } header, err := stream.Header() if err != nil { - grpclog.Infof("Failed to get header from client: %v", err) + grpclog.Errorf("Failed to get header from client: %v", err) return nil, metadata, err } metadata.HeaderMD = header @@ -2343,21 +2343,21 @@ func RegisterMachineServiceHandlerServer(ctx context.Context, mux *runtime.Serve // RegisterMachineServiceHandlerFromEndpoint is same as RegisterMachineServiceHandler but // automatically dials to "endpoint" and closes the connection when "ctx" gets done. func RegisterMachineServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { - conn, err := grpc.DialContext(ctx, endpoint, opts...) + conn, err := grpc.NewClient(endpoint, opts...) if err != nil { return err } defer func() { if err != nil { if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) } return } go func() { <-ctx.Done() if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) } }() }() diff --git a/go.mod b/go.mod index 09f11881..39ff6d53 100644 --- a/go.mod +++ b/go.mod @@ -54,9 +54,9 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/mattn/go-shellwords v1.0.12 github.com/prometheus/client_golang v1.19.1 - github.com/rs/xid v1.5.0 github.com/siderolabs/crypto v0.4.4 github.com/siderolabs/discovery-api v0.1.4 + github.com/siderolabs/discovery-client v0.1.9 github.com/siderolabs/discovery-service v1.0.3-0.20240530092511-74bca2da5cc8 github.com/siderolabs/gen v0.5.0 github.com/siderolabs/go-api-signature v0.3.2 @@ -209,7 +209,6 @@ require ( github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect github.com/sergi/go-diff v1.3.1 // indirect github.com/shabbyrobe/gocovmerge v0.0.0-20190829150210-3e036491d500 // indirect - github.com/siderolabs/discovery-client v0.1.9 // indirect github.com/siderolabs/go-blockdevice v0.4.7 // indirect github.com/siderolabs/go-kubeconfig v0.1.0 // indirect github.com/siderolabs/net v0.4.0 // indirect diff --git a/go.sum b/go.sum index 7e328acd..fed1abae 100644 --- a/go.sum +++ b/go.sum @@ -379,8 +379,6 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo= github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= -github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= -github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/russellhaering/goxmldsig v1.4.0 h1:8UcDh/xGyQiyrW+Fq5t8f+l2DLB1+zlhYzkPUJ7Qhys= github.com/russellhaering/goxmldsig v1.4.0/go.mod h1:gM4MDENBQf7M+V824SGfyIUVFWydB7n0KkEubVJl+Tw= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= diff --git a/internal/backend/grpc/grpc.go b/internal/backend/grpc/grpc.go index 75e8a718..4dda3b27 100644 --- a/internal/backend/grpc/grpc.go +++ b/internal/backend/grpc/grpc.go @@ -106,13 +106,13 @@ func New(ctx context.Context, mux *http.ServeMux, servers []ServiceServer, trans } for _, srv := range servers { - err := srv.gateway(ctx, runtimeMux, transport.Address, opts) + err := srv.gateway(ctx, runtimeMux, transport.Address(), opts) if err != nil { return nil, fmt.Errorf("error registering gateway: %w", err) } } - if err := machine.RegisterMachineServiceHandlerFromEndpoint(ctx, runtimeMux, transport.Address, opts); err != nil { + if err := machine.RegisterMachineServiceHandlerFromEndpoint(ctx, runtimeMux, transport.Address(), opts); err != nil { return nil, fmt.Errorf("error registering gateway: %w", err) } diff --git a/internal/backend/grpc/router/router.go b/internal/backend/grpc/router/router.go index ee6fde25..afcc0926 100644 --- a/internal/backend/grpc/router/router.go +++ b/internal/backend/grpc/router/router.go @@ -75,12 +75,7 @@ func NewRouter( authEnabled bool, verifier grpc.UnaryServerInterceptor, ) (*Router, error) { - address := transport.Address - if address == "grpc-conn" { - address = "passthrough:whatever" - } - - omniConn, err := grpc.NewClient(address, + omniConn, err := grpc.NewClient(transport.Address(), grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return transport.Dial() }), diff --git a/internal/backend/server.go b/internal/backend/server.go index 6cbd894f..d176f9df 100644 --- a/internal/backend/server.go +++ b/internal/backend/server.go @@ -218,14 +218,14 @@ func (s *Server) Run(ctx context.Context) error { return err } - gatewayTransport := &memconn.Transport{Address: "gateway-conn"} + gatewayTransport := memconn.NewTransport("gateway-conn") grpcServer, err := grpcomni.New(ctx, mux, serviceServers, gatewayTransport, s.logger, serverOptions...) if err != nil { return err } - grpcTransport := &memconn.Transport{Address: "grpc-conn"} + grpcTransport := memconn.NewTransport("grpc-conn") rtr, err := router.NewRouter( grpcTransport, diff --git a/internal/memconn/memconn.go b/internal/memconn/memconn.go index 9c7084cb..49aa4eb5 100644 --- a/internal/memconn/memconn.go +++ b/internal/memconn/memconn.go @@ -13,25 +13,36 @@ import ( "github.com/akutz/memconn" ) +// NewTransport creates a new transport. +func NewTransport(address string) *Transport { + return &Transport{address: address} +} + // Transport is transport for in-memory connection. type Transport struct { - Address string + address string } // Listener creates new listener. func (l *Transport) Listener() (net.Listener, error) { - if l.Address == "" { + if l.address == "" { return nil, errors.New("address is not set") } - return memconn.Listen("memu", l.Address) + return memconn.Listen("memu", l.address) } // Dial creates a new connection. func (l *Transport) Dial() (net.Conn, error) { - if l.Address == "" { + if l.address == "" { return nil, errors.New("address is not set") } - return memconn.Dial("memu", l.Address) + return memconn.Dial("memu", l.address) +} + +// Address returns the address. Since this is a memory-based connection, the address is always "passthrough:" + address, +// because the address is not a real network address and gRPC tries to resolve it otherwise. +func (l *Transport) Address() string { + return "passthrough:" + l.address }