Skip to content

Commit

Permalink
Plumb grpc.Server interceptors to assert for each call to aid in triv…
Browse files Browse the repository at this point in the history
…ial elimination
  • Loading branch information
odeke-em committed Dec 22, 2024
1 parent d8277b2 commit 571bcee
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 1 deletion.
15 changes: 15 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ export interface SpannerOptions extends GrpcClientOptions {
routeToLeaderEnabled?: boolean;
directedReadOptions?: google.spanner.v1.IDirectedReadOptions | null;
observabilityOptions?: ObservabilityOptions;
unaryInterceptors?: any[];
streamInterceptors?: any[];
}
export interface RequestConfig {
client: string;
Expand Down Expand Up @@ -309,6 +311,14 @@ class Spanner extends GrpcService {
}
}
}

let unaryInterceptors: any[] = [];
let streamInterceptors: any[] = [];
if (options) {
unaryInterceptors = options.unaryInterceptors || [];
streamInterceptors = options.streamInterceptors || [];
}

options = Object.assign(
{
libName: 'gccl',
Expand All @@ -321,6 +331,11 @@ class Spanner extends GrpcService {
'grpc.callInvocationTransformer': grpcGcp.gcpCallInvocationTransformer,
'grpc.channelFactoryOverride': grpcGcp.gcpChannelFactoryOverride,
'grpc.gcpApiConfig': grpcGcp.createGcpApiConfig(gcpApiConfig),

// TODO: Negotiate with the Google team to plumb gRPC
// settings such as interceptors to the gRPC client.
'grpc.unaryInterceptors': unaryInterceptors,
'grpc.streamInterceptors': streamInterceptors,
grpc,
},
options || {}
Expand Down
79 changes: 79 additions & 0 deletions src/request_id_header.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import {randomBytes} from 'crypto';
import * as grpc from '@grpc/grpc-js';
const randIdForProcess = randomBytes(8).readBigUint64LE(0).toString();
const X_GOOG_SPANNER_REQUEST_ID_HEADER = 'x-goog-spanner-request-id';

Expand Down Expand Up @@ -72,10 +73,88 @@ function newAtomicCounter(n?: number): AtomicCounter {
return new AtomicCounter(n);
}

const X_GOOG_REQ_ID_REGEX = /(\d+\.){5}\d+/;

class XGoogRequestHeaderInterceptor {
private nStream: number;
private nUnary: number;
private streamCalls: any[];
private unaryCalls: any[];
constructor() {
this.nStream = 0;
this.streamCalls = [];
this.nUnary = 0;
this.unaryCalls = [];
}

assertHasHeader(call): string|unknown {
const metadata = call.metadata;
const gotReqId = metadata[X_GOOG_SPANNER_REQUEST_ID_HEADER];
if (!gotReqId) {
throw new Error(`${call.method} is missing ${X_GOOG_SPANNER_REQUEST_ID_HEADER} header`);
}

if (!gotReqId.match(X_GOOG_REQ_ID_REGEX)) {
throw new Error(`${call.method} reqID header ${gotReqId} does not match ${X_GOOG_REQ_ID_REGEX}`);
}
return gotReqId;
}

interceptUnary(call, next) {
const gotReqId = this.assertHasHeader(call);
this.unaryCalls.push({method: call.method, reqId: gotReqId});
this.nUnary++;
next(call);
}

interceptStream(call, next) {
const gotReqId = this.assertHasHeader(call);
this.streamCalls.push({method: call.method, reqId: gotReqId});
this.nStream++;
next(call);
}

serverInterceptor(methodDescriptor, call) {
const method = call.handler.path;
const isUnary = call.handler.type === 'unary';
const listener = new grpc.ServerListenerBuilder().
withOnReceiveMetadata((metadata, next) => {
const gotReqId = metadata[X_GOOG_SPANNER_REQUEST_ID_HEADER];
if (!gotReqId) {
call.sendStatus({
code: grpc.status.INVALID_ARGUMENT,
details: `${method} is missing ${X_GOOG_SPANNER_REQUEST_ID_HEADER} header`,
});
return;
}

if (!gotReqId.match(X_GOOG_REQ_ID_REGEX)) {
call.sendStatus({
code: grpc.status.INVALID_ARGUMENT,
details: `${method} reqID header ${gotReqId} does not match ${X_GOOG_REQ_ID_REGEX}`,
});
}

// Otherwise it matched all good.
if (isUnary) {
this.unaryCalls.push({method: method, reqId: gotReqId});
this.nUnary++;
} else {
this.streamCalls.push({method: method, reqId: gotReqId});
this.nStream++;
}
}).build();

const responder = (new grpc.ResponderBuilder()).withStart(next => next(listener)).build();
return new grpc.ServerInterceptingCall(call, responder);
}
}

export {
AtomicCounter,
X_GOOG_SPANNER_REQUEST_ID_HEADER,
craftRequestId,
nextSpannerClientId,
newAtomicCounter,
XGoogRequestHeaderInterceptor,
};
8 changes: 7 additions & 1 deletion test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import {
CLOUD_RESOURCE_HEADER,
LEADER_AWARE_ROUTING_HEADER,
} from '../src/common';
import {XGoogRequestHeaderInterceptor} from '../src/request_id_header';
import CreateInstanceMetadata = google.spanner.admin.instance.v1.CreateInstanceMetadata;
import QueryOptions = google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import v1 = google.spanner.v1;
Expand Down Expand Up @@ -102,7 +103,10 @@ describe('Spanner with mock server', () => {
const fooNotFoundErr = Object.assign(new Error('Table FOO not found'), {
code: grpc.status.NOT_FOUND,
});
const server = new grpc.Server();
const xGoogReqIDInterceptor = new XGoogRequestHeaderInterceptor();
const server = new grpc.Server({
interceptors: [xGoogReqIDInterceptor.serverInterceptor],
});
const spannerMock = mock.createMockSpanner(server);
mockInstanceAdmin.createMockInstanceAdmin(server);
mockDatabaseAdmin.createMockDatabaseAdmin(server);
Expand Down Expand Up @@ -167,6 +171,8 @@ describe('Spanner with mock server', () => {
servicePath: 'localhost',
port,
sslCreds: grpc.credentials.createInsecure(),
streamInterceptors: [xGoogReqIDInterceptor.interceptStream],
unaryInterceptors: [xGoogReqIDInterceptor.interceptUnary],
});
// Gets a reference to a Cloud Spanner instance and database
instance = spanner.instance('instance');
Expand Down

0 comments on commit 571bcee

Please sign in to comment.