diff --git a/.travis.yml b/.travis.yml index aa059ca2..94d3cb3e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,10 +3,12 @@ language: go go_import_path: github.com/colinmarc/hdfs go: 1.x env: +- PLATFORM=hdp2 - PLATFORM=cdh5 - PLATFORM=cdh6 -- PLATFORM=cdh6 KERBEROS=true -- PLATFORM=hdp2 +- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=authentication +- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=integrity +- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy before_install: - export GO111MODULE=on # Travis installs into $GOPATH/src, which disables module support by default. install: diff --git a/internal/rpc/challenge.go b/internal/rpc/challenge.go new file mode 100644 index 00000000..5cf3d483 --- /dev/null +++ b/internal/rpc/challenge.go @@ -0,0 +1,66 @@ +package rpc + +import ( + "fmt" + "regexp" + "strings" + + hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common" +) + +const ( + // qopAuthenication is how the namenode refers to authentication mode, which + // only establishes mutual authentication without encryption (the default). + qopAuthentication = "auth" + // qopIntegrity is how the namenode refers to integrity mode, which, in + // in addition to authentication, verifies the signature of RPC messages. + qopIntegrity = "auth-int" + // qopPrivacy is how the namenode refers to privacy mode, which, in addition + // to authentication and integrity, provides full end-to-end encryption for + // RPC messages. + qopPrivacy = "auth-conf" +) + +var challengeRegexp = regexp.MustCompile(",?([a-zA-Z0-9]+)=(\"([^\"]+)\"|([^,]+)),?") + +type tokenChallenge struct { + realm string + nonce string + qop string + charset string + cipher []string + algorithm string +} + +// parseChallenge returns a tokenChallenge parsed from a challenge response from +// the namenode. +func parseChallenge(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error) { + tokenChallenge := tokenChallenge{} + + matched := challengeRegexp.FindAllSubmatch(auth.Challenge, -1) + if matched == nil { + return nil, fmt.Errorf("invalid token challenge: %s", auth.Challenge) + } + + for _, m := range matched { + key := string(m[1]) + val := string(m[3]) + switch key { + case "realm": + tokenChallenge.realm = val + case "nonce": + tokenChallenge.nonce = val + case "qop": + tokenChallenge.qop = val + case "charset": + tokenChallenge.charset = val + case "cipher": + tokenChallenge.cipher = strings.Split(val, ",") + case "algorithm": + tokenChallenge.algorithm = val + default: + } + } + + return &tokenChallenge, nil +} diff --git a/internal/rpc/kerberos.go b/internal/rpc/kerberos.go index 6147067c..2244fa15 100644 --- a/internal/rpc/kerberos.go +++ b/internal/rpc/kerberos.go @@ -21,24 +21,31 @@ var ( ) func (c *NamenodeConnection) doKerberosHandshake() error { - // All SASL requests/responses use this sequence number. - c.currentRequestID = saslRpcCallId - // Start negotiation, and get the list of supported mechanisms in reply. - c.writeSaslRequest(&hadoop.RpcSaslProto{State: hadoop.RpcSaslProto_NEGOTIATE.Enum()}) + err := c.writeSaslRequest(&hadoop.RpcSaslProto{ + State: hadoop.RpcSaslProto_NEGOTIATE.Enum(), + }) + if err != nil { + return err + } + resp, err := c.readSaslResponse(hadoop.RpcSaslProto_NEGOTIATE) if err != nil { return err } - var mechanism *hadoop.RpcSaslProto_SaslAuth + var krbAuth, tokenAuth *hadoop.RpcSaslProto_SaslAuth for _, m := range resp.GetAuths() { - if *m.Method == "KERBEROS" { - mechanism = m + switch *m.Method { + case "KERBEROS": + krbAuth = m + case "TOKEN": + tokenAuth = m + default: } } - if mechanism == nil { + if krbAuth == nil { return errKerberosNotSupported } @@ -48,12 +55,34 @@ func (c *NamenodeConnection) doKerberosHandshake() error { return err } + if tokenAuth != nil { + challenge, err := parseChallenge(tokenAuth) + if err != nil { + return err + } + + switch challenge.qop { + case qopPrivacy, qopIntegrity: + // Switch to SASL RPC handler + c.transport = &saslTransport{ + basicTransport: basicTransport{ + clientID: c.ClientID, + }, + sessionKey: sessionKey, + privacy: challenge.qop == qopPrivacy, + } + case qopAuthentication: + // No special transport is required. + default: + return errors.New("unexpected QOP in challenge") + } + } + err = c.writeSaslRequest(&hadoop.RpcSaslProto{ State: hadoop.RpcSaslProto_INITIATE.Enum(), Token: token.MechTokenBytes, - Auths: []*hadoop.RpcSaslProto_SaslAuth{mechanism}, + Auths: []*hadoop.RpcSaslProto_SaslAuth{krbAuth}, }) - if err != nil { return err } @@ -92,7 +121,6 @@ func (c *NamenodeConnection) doKerberosHandshake() error { State: hadoop.RpcSaslProto_RESPONSE.Enum(), Token: signedBytes, }) - if err != nil { return err } @@ -103,7 +131,8 @@ func (c *NamenodeConnection) doKerberosHandshake() error { } func (c *NamenodeConnection) writeSaslRequest(req *hadoop.RpcSaslProto) error { - packet, err := makeRPCPacket(newRPCRequestHeader(saslRpcCallId, c.ClientID), req) + rrh := newRPCRequestHeader(saslRpcCallId, c.ClientID) + packet, err := makeRPCPacket(rrh, req) if err != nil { return err } @@ -113,10 +142,20 @@ func (c *NamenodeConnection) writeSaslRequest(req *hadoop.RpcSaslProto) error { } func (c *NamenodeConnection) readSaslResponse(expectedState hadoop.RpcSaslProto_SaslState) (*hadoop.RpcSaslProto, error) { + rrh := &hadoop.RpcResponseHeaderProto{} resp := &hadoop.RpcSaslProto{} - err := c.readResponse("sasl", resp) + err := readRPCPacket(c.conn, rrh, resp) if err != nil { return nil, err + } else if int32(rrh.GetCallId()) != saslRpcCallId { + return nil, errors.New("unexpected sequence number") + } else if rrh.GetStatus() != hadoop.RpcResponseHeaderProto_SUCCESS { + return nil, &NamenodeError{ + method: "sasl", + message: rrh.GetErrorMsg(), + code: int(rrh.GetErrorDetail()), + exception: rrh.GetExceptionClassName(), + } } else if resp.GetState() != expectedState { return nil, fmt.Errorf("unexpected SASL state: %s", resp.GetState().String()) } diff --git a/internal/rpc/namenode.go b/internal/rpc/namenode.go index 35af0893..c36e1cde 100644 --- a/internal/rpc/namenode.go +++ b/internal/rpc/namenode.go @@ -42,10 +42,11 @@ type NamenodeConnection struct { kerberosServicePrincipleName string kerberosRealm string - dialFunc func(ctx context.Context, network, addr string) (net.Conn, error) - conn net.Conn - host *namenodeHost - hostList []*namenodeHost + dialFunc func(ctx context.Context, network, addr string) (net.Conn, error) + conn net.Conn + host *namenodeHost + hostList []*namenodeHost + transport transport reqLock sync.Mutex done chan struct{} @@ -115,8 +116,9 @@ func NewNamenodeConnection(options NamenodeConnectionOptions) (*NamenodeConnecti kerberosServicePrincipleName: options.KerberosServicePrincipleName, kerberosRealm: realm, - dialFunc: options.DialFunc, - hostList: hostList, + dialFunc: options.DialFunc, + hostList: hostList, + transport: &basicTransport{clientID: clientId}, done: make(chan struct{}), } @@ -190,6 +192,7 @@ func (c *NamenodeConnection) Execute(method string, req proto.Message, resp prot defer c.reqLock.Unlock() c.currentRequestID++ + requestID := c.currentRequestID for { err := c.resolveConnection() @@ -197,13 +200,13 @@ func (c *NamenodeConnection) Execute(method string, req proto.Message, resp prot return err } - err = c.writeRequest(method, req) + err = c.transport.writeRequest(c.conn, method, requestID, req) if err != nil { c.markFailure(err) continue } - err = c.readResponse(method, resp) + err = c.transport.readResponse(c.conn, method, requestID, resp) if err != nil { // Only retry on a standby exception. if nerr, ok := err.(*NamenodeError); ok && nerr.exception == standbyExceptionClass { @@ -220,62 +223,6 @@ func (c *NamenodeConnection) Execute(method string, req proto.Message, resp prot return nil } -// addLease increases the lease counter on the namenode. As long as the lease -// counter is greater than zero, all leases will automatically be renewed every -// - -// RPC definitions - -// A request packet: -// +-----------------------------------------------------------+ -// | uint32 length of the next three parts | -// +-----------------------------------------------------------+ -// | varint length + RpcRequestHeaderProto | -// +-----------------------------------------------------------+ -// | varint length + RequestHeaderProto | -// +-----------------------------------------------------------+ -// | varint length + Request | -// +-----------------------------------------------------------+ -func (c *NamenodeConnection) writeRequest(method string, req proto.Message) error { - rrh := newRPCRequestHeader(c.currentRequestID, c.ClientID) - rh := newRequestHeader(method) - - reqBytes, err := makeRPCPacket(rrh, rh, req) - if err != nil { - return err - } - - _, err = c.conn.Write(reqBytes) - return err -} - -// A response from the namenode: -// +-----------------------------------------------------------+ -// | uint32 length of the next two parts | -// +-----------------------------------------------------------+ -// | varint length + RpcResponseHeaderProto | -// +-----------------------------------------------------------+ -// | varint length + Response | -// +-----------------------------------------------------------+ -func (c *NamenodeConnection) readResponse(method string, resp proto.Message) error { - rrh := &hadoop.RpcResponseHeaderProto{} - err := readRPCPacket(c.conn, rrh, resp) - if err != nil { - return err - } else if int32(rrh.GetCallId()) != c.currentRequestID { - return errors.New("unexpected sequence number") - } else if rrh.GetStatus() != hadoop.RpcResponseHeaderProto_SUCCESS { - return &NamenodeError{ - method: method, - message: rrh.GetErrorMsg(), - code: int(rrh.GetErrorDetail()), - exception: rrh.GetExceptionClassName(), - } - } - - return nil -} - // A handshake packet: // +-----------------------------------------------------------+ // | Header, 4 bytes ("hrpc") | @@ -320,9 +267,6 @@ func (c *NamenodeConnection) doNamenodeHandshake() error { if err != nil { return fmt.Errorf("SASL handshake: %s", err) } - - // Reset the sequence number here, since we set it to -33 for the SASL bits. - c.currentRequestID = 0 } rrh := newRPCRequestHeader(handshakeCallID, c.ClientID) diff --git a/internal/rpc/sasl_transport.go b/internal/rpc/sasl_transport.go new file mode 100644 index 00000000..d1c7c90b --- /dev/null +++ b/internal/rpc/sasl_transport.go @@ -0,0 +1,82 @@ +package rpc + +import ( + "bytes" + "fmt" + "io" + + hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common" + "github.com/golang/protobuf/proto" + "gopkg.in/jcmturner/gokrb5.v7/crypto" + "gopkg.in/jcmturner/gokrb5.v7/gssapi" + "gopkg.in/jcmturner/gokrb5.v7/iana/keyusage" + krbtypes "gopkg.in/jcmturner/gokrb5.v7/types" +) + +// saslTransport implements encrypted or signed RPC. +type saslTransport struct { + basicTransport + + // sessionKey is the encryption key used to decrypt and encrypt the payload. + sessionKey krbtypes.EncryptionKey + // privacy indicates full message encryption + privacy bool +} + +// readResponse reads a SASL-wrapped RPC response. +func (t *saslTransport) readResponse(r io.Reader, method string, requestID int32, resp proto.Message) error { + // First, read the sasl payload as a standard rpc response. + sasl := hadoop.RpcSaslProto{} + err := t.basicTransport.readResponse(r, method, saslRpcCallId, &sasl) + if err != nil { + return err + } else if sasl.GetState() != hadoop.RpcSaslProto_WRAP { + return fmt.Errorf("unexpected SASL state: %s", sasl.GetState().String()) + } + + // The SaslProto contains the actual payload. + var wrapToken gssapi.WrapToken + err = wrapToken.Unmarshal(sasl.GetToken(), true) + if err != nil { + return err + } + + rrh := &hadoop.RpcResponseHeaderProto{} + + if t.privacy { + // Decrypt the blob, which then looks like a normal RPC response. + decrypted, err := crypto.DecryptMessage(wrapToken.Payload, t.sessionKey, keyusage.GSSAPI_ACCEPTOR_SEAL) + if err != nil { + return err + } + + err = readRPCPacket(bytes.NewReader(decrypted), rrh, resp) + if err != nil { + return err + } + } else { + // Verify the checksum; the blob is just a normal RPC response. + _, err = wrapToken.Verify(t.sessionKey, keyusage.GSSAPI_ACCEPTOR_SEAL) + if err != nil { + return fmt.Errorf("unverifiable message from namenode: %s", err) + } + + err = readRPCPacket(bytes.NewReader(wrapToken.Payload), rrh, resp) + if err != nil { + return err + } + } + + if int32(rrh.GetCallId()) != requestID { + return errUnexpectedSequenceNumber + } else if rrh.GetStatus() != hadoop.RpcResponseHeaderProto_SUCCESS { + return &NamenodeError{ + method: method, + message: rrh.GetErrorMsg(), + code: int(rrh.GetErrorDetail()), + exception: rrh.GetExceptionClassName(), + } + } + + return nil +} diff --git a/internal/rpc/transport.go b/internal/rpc/transport.go new file mode 100644 index 00000000..1600a1c6 --- /dev/null +++ b/internal/rpc/transport.go @@ -0,0 +1,76 @@ +package rpc + +import ( + "errors" + "io" + + hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common" + "github.com/golang/protobuf/proto" +) + +var errUnexpectedSequenceNumber = errors.New("unexpected sequence number") + +type transport interface { + writeRequest(w io.Writer, method string, requestID int32, req proto.Message) error + readResponse(r io.Reader, method string, requestID int32, resp proto.Message) error +} + +// basicTransport implements plain RPC. +type basicTransport struct { + // clientID is the client ID of this writer. + clientID []byte +} + +// writeRequest writes an RPC message. +// +// A request packet: +// +-----------------------------------------------------------+ +// | uint32 length of the next three parts | +// +-----------------------------------------------------------+ +// | varint length + RpcRequestHeaderProto | +// +-----------------------------------------------------------+ +// | varint length + RequestHeaderProto | +// +-----------------------------------------------------------+ +// | varint length + Request | +// +-----------------------------------------------------------+ +func (t *basicTransport) writeRequest(w io.Writer, method string, requestID int32, req proto.Message) error { + rrh := newRPCRequestHeader(requestID, t.clientID) + rh := newRequestHeader(method) + + reqBytes, err := makeRPCPacket(rrh, rh, req) + if err != nil { + return err + } + + _, err = w.Write(reqBytes) + return err +} + +// ReadResponse reads a response message. +// +// A response from the namenode: +// +-----------------------------------------------------------+ +// | uint32 length of the next two parts | +// +-----------------------------------------------------------+ +// | varint length + RpcResponseHeaderProto | +// +-----------------------------------------------------------+ +// | varint length + Response | +// +-----------------------------------------------------------+ +func (t *basicTransport) readResponse(r io.Reader, method string, requestID int32, resp proto.Message) error { + rrh := &hadoop.RpcResponseHeaderProto{} + err := readRPCPacket(r, rrh, resp) + if err != nil { + return err + } else if int32(rrh.GetCallId()) != requestID { + return errUnexpectedSequenceNumber + } else if rrh.GetStatus() != hadoop.RpcResponseHeaderProto_SUCCESS { + return &NamenodeError{ + method: method, + message: rrh.GetErrorMsg(), + code: int(rrh.GetErrorDetail()), + exception: rrh.GetExceptionClassName(), + } + } + + return nil +} diff --git a/travis-setup-cdh5.sh b/travis-setup-cdh5.sh index 1739f571..defde3d3 100755 --- a/travis-setup-cdh5.sh +++ b/travis-setup-cdh5.sh @@ -107,6 +107,10 @@ sudo tee /etc/hadoop/conf.gohdfs/core-site.xml <dfs.datanode.kerberos.principal dn/localhost@$KERBEROS_REALM + + hadoop.rpc.protection + $RPC_PROTECTION + EOF diff --git a/travis-setup-cdh6.sh b/travis-setup-cdh6.sh index 83e41e13..2ad9b2db 100755 --- a/travis-setup-cdh6.sh +++ b/travis-setup-cdh6.sh @@ -109,6 +109,10 @@ sudo tee /etc/hadoop/conf.gohdfs/core-site.xml <dfs.datanode.kerberos.principal dn/localhost@$KERBEROS_REALM + + hadoop.rpc.protection + $RPC_PROTECTION + EOF