Skip to content

Commit

Permalink
Added span in partialresultStream
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 committed Nov 21, 2024
1 parent 430a33c commit f5d21a3
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 42 deletions.
4 changes: 2 additions & 2 deletions observability-test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ describe('Database', () => {
'Expected that secondRetrySpan is the child to parentSpan'
);

const expectedEventNames = ['No session available'];
const expectedEventNames = ['No session available', 'Using Session'];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
Expand Down Expand Up @@ -1558,7 +1558,7 @@ describe('Database', () => {
);

// We don't expect events.
const expectedEventNames = [];
const expectedEventNames = ['Using Session'];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
Expand Down
16 changes: 14 additions & 2 deletions observability-test/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const cacheSessionEvents = [
'Acquiring session',
'Cache hit: has usable session',
'Acquired session',
'Using Session',
];

/**
Expand Down Expand Up @@ -82,14 +83,25 @@ export async function verifySpansAndEvents(
actualEventNames.push(event.name);
});
});

assert.strictEqual(
actualSpanNames.length,
expectedSpans.length,
`Span count mismatch: Expected ${expectedSpans.length} spans, but received ${actualSpanNames.length} spans`
);
assert.deepStrictEqual(
actualSpanNames,
expectedSpans,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpans}`
);
assert.strictEqual(
actualEventNames.length,
expectedEvents.length,
`Event count mismatch: Expected ${expectedEvents.length} events, but received ${actualEventNames.length} events`
);
assert.deepStrictEqual(
actualEventNames,
expectedEvents,
actualEventNames.sort(),
expectedEvents.sort(),
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEvents}`
);
}
34 changes: 9 additions & 25 deletions observability-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ describe('EndToEnd', async () => {
transaction!.commit();

const expectedSpanNames = ['CloudSpanner.Database.getTransaction'];
const expectedEventNames = [...cacheSessionEvents, 'Using Session'];
const expectedEventNames = [...cacheSessionEvents];
await verifySpansAndEvents(
traceExporter,
expectedSpanNames,
Expand All @@ -245,11 +245,7 @@ describe('EndToEnd', async () => {
'CloudSpanner.Snapshot.runStream',
'CloudSpanner.Database.runStream',
];
const expectedEventNames = [
'Starting stream',
...cacheSessionEvents,
'Using Session',
];
const expectedEventNames = ['Starting stream', ...cacheSessionEvents];
await verifySpansAndEvents(
traceExporter,
expectedSpanNames,
Expand All @@ -267,11 +263,7 @@ describe('EndToEnd', async () => {
'CloudSpanner.Database.runStream',
'CloudSpanner.Database.run',
];
const expectedEventNames = [
'Starting stream',
...cacheSessionEvents,
'Using Session',
];
const expectedEventNames = ['Starting stream', ...cacheSessionEvents];
await verifySpansAndEvents(
traceExporter,
expectedSpanNames,
Expand Down Expand Up @@ -319,7 +311,6 @@ describe('EndToEnd', async () => {
const expectedEventNames = [
'Starting stream',
...cacheSessionEvents,
'Using Session',
'Transaction Creation Done',
];
await verifySpansAndEvents(
Expand Down Expand Up @@ -350,22 +341,21 @@ describe('EndToEnd', async () => {
'CloudSpanner.Database.batchCreateSessions',
'CloudSpanner.SessionPool.createSessions',
'CloudSpanner.Snapshot.runStream',
'CloudSpanner.Snapshot.begin',
'CloudSpanner.Snapshot.runStream',
'CloudSpanner.Transaction.commit',
'CloudSpanner.Snapshot.begin',
'CloudSpanner.Database.runTransaction',
];
const expectedEventNames = [
...waitingSessionsEvents,
'Retrying Transaction',
...batchCreateSessionsEvents,
'Starting stream',
'exception',
'Stream broken. Not safe to retry',
'Begin Transaction',
'Transaction Creation Done',
'Starting stream',
'Starting Commit',
'Commit Done',
...waitingSessionsEvents,
'Retrying transaction',
];
await verifySpansAndEvents(
traceExporter,
Expand All @@ -381,7 +371,7 @@ describe('EndToEnd', async () => {
});
});

it.skip('runTransactionAsync with abort', async () => {
it('runTransactionAsync with abort', async () => {
let attempts = 0;
const database = newTestDatabase();
await database.runTransactionAsync((transaction): Promise<number> => {
Expand All @@ -406,11 +396,8 @@ describe('EndToEnd', async () => {
'CloudSpanner.Database.runTransactionAsync',
];
const expectedEventNames = [
'Requesting 25 sessions',
'Creating 25 sessions',
'Requested for 25 sessions returned 25',
...batchCreateSessionsEvents,
'Starting stream',
'exception',
'Stream broken. Not safe to retry',
'Begin Transaction',
'Transaction Creation Done',
Expand Down Expand Up @@ -441,7 +428,6 @@ describe('EndToEnd', async () => {
'Starting Commit',
'Commit Done',
...cacheSessionEvents,
'Using Session',
];
await verifySpansAndEvents(
traceExporter,
Expand Down Expand Up @@ -639,7 +625,6 @@ describe('ObservabilityOptions injection and propagation', async () => {

const expectedEventNames = [
...cacheSessionEvents,
'Using Session',
'Starting stream',
'Transaction Creation Done',
];
Expand Down Expand Up @@ -747,7 +732,6 @@ describe('ObservabilityOptions injection and propagation', async () => {

const expectedEventNames = [
...cacheSessionEvents,
'Using Session',
'Starting stream',
];
assert.deepStrictEqual(
Expand Down
3 changes: 2 additions & 1 deletion src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2110,14 +2110,14 @@ class Database extends common.GrpcServiceObject {
span.end();
this.getSnapshot(options, callback!);
} else {
span.addEvent('Using Session', {'session.id': session?.id});
this.pool_.release(session!);
span.end();
callback!(err);
}
return;
}

span.addEvent('Using Session', {'session.id': session?.id});
this._releaseOnEnd(session!, snapshot, span);
span.end();
callback!(err, snapshot);
Expand Down Expand Up @@ -3244,6 +3244,7 @@ class Database extends common.GrpcServiceObject {
return;
}

span.addEvent('Using Session', {'session.id': session?.id});
transaction!._observabilityOptions = this._observabilityOptions;
if (options.optimisticLock) {
transaction!.useOptimisticLock();
Expand Down
12 changes: 12 additions & 0 deletions src/instrument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,18 @@ export function setSpanError(span: Span, err: Error | String): boolean {
return true;
}

/**
* Sets the span status with err and end, if non-null onto the span with
* status.code=ERROR and the message of err.toString()
*
* @returns {boolean} to signify if the status was set.
*/
export function setSpanErrorAndEnd(span: Span, err: Error | String): boolean {
const status = setSpanError(span, err);
span.end();
return status;
}

/**
* Sets err, if non-null onto the span with
* status.code=ERROR and the message of err.toString()
Expand Down
17 changes: 8 additions & 9 deletions src/partial-result-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {Readable, Transform} from 'stream';
import * as streamEvents from 'stream-events';
import {grpc, CallOptions} from 'google-gax';
import {DeadlineError, isRetryableInternalError} from './transaction-runner';
import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument';
import {Span} from './instrument';
import {codec, JSONOptions, Json, Field, Value} from './codec';
import {google} from '../protos/protos';
import * as stream from 'stream';
Expand Down Expand Up @@ -97,6 +97,7 @@ export interface RowOptions {
*/
columnsMetadata?: object;
gaxOptions?: CallOptions;
span?: Span;
}

/**
Expand Down Expand Up @@ -183,16 +184,16 @@ interface ResultEvents {
export class PartialResultStream extends Transform implements ResultEvents {
private _destroyed: boolean;
private _fields!: google.spanner.v1.StructType.Field[];
private _options: RowOptions;
private _pendingValue?: p.IValue;
private _pendingValueForResume?: p.IValue;
private _values: p.IValue[];
private _numPushFailed = 0;
options: RowOptions;
constructor(options = {}) {
super({objectMode: true});

this._destroyed = false;
this._options = Object.assign({maxResumeRetries: 20}, options);
this.options = Object.assign({maxResumeRetries: 20}, options);
this._values = [];
}
/**
Expand Down Expand Up @@ -271,7 +272,7 @@ export class PartialResultStream extends Transform implements ResultEvents {
// Downstream returned false indicating that it is still not ready for
// more data.
this._numPushFailed++;
if (this._numPushFailed === this._options.maxResumeRetries) {
if (this._numPushFailed === this.options.maxResumeRetries) {
this.destroy(
new Error(
`Stream is still not ready to receive data after ${this._numPushFailed} attempts to resume.`
Expand Down Expand Up @@ -359,8 +360,8 @@ export class PartialResultStream extends Transform implements ResultEvents {

const row: Row = this._createRow(values);

if (this._options.json) {
return this.push(row.toJSON(this._options.jsonOptions));
if (this.options.json) {
return this.push(row.toJSON(this.options.jsonOptions));
}

return this.push(row);
Expand All @@ -376,7 +377,7 @@ export class PartialResultStream extends Transform implements ResultEvents {
private _createRow(values: Value[]): Row {
const fields = values.map((value, index) => {
const {name, type} = this._fields[index];
const columnMetadata = this._options.columnsMetadata?.[name];
const columnMetadata = this.options.columnsMetadata?.[name];
return {
name,
value: codec.decode(
Expand Down Expand Up @@ -494,7 +495,6 @@ export function partialResultStream(
let lastRequestStream: Readable;
const startTime = Date.now();
const timeout = options?.gaxOptions?.timeout ?? Infinity;
const span = getActiveOrNoopSpan();

// mergeStream allows multiple streams to be connected into one. This is good;
// if we need to retry a request and pipe more data to the user's stream.
Expand Down Expand Up @@ -569,7 +569,6 @@ export function partialResultStream(
// checkpoint stream has queued. After that, we will destroy the
// user's stream with the same error.
setImmediate(() => batchAndSplitOnTokenStream.destroy(err));
// setSpanErrorAndException(span, err as Error);
return;
}

Expand Down
5 changes: 4 additions & 1 deletion src/transaction-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {Session} from './session';
import {Transaction} from './transaction';
import {NormalCallback} from './common';
import {isSessionNotFoundError} from './session-pool';
import {getActiveOrNoopSpan} from './instrument';
import {getActiveOrNoopSpan, setSpanErrorAndEnd} from './instrument';
import {Database} from './database';
import {google} from '../protos/protos';
import IRequestOptions = google.spanner.v1.IRequestOptions;
Expand Down Expand Up @@ -314,9 +314,12 @@ export class TransactionRunner extends Runner<void> {
transaction.requestStream = (config: object) => {
const proxyStream = through.obj();
const stream = requestStream(config);
const resultStream = transaction.resultStream;

stream
.on('error', (err: grpc.ServiceError) => {
resultStream?.options.span &&
setSpanErrorAndEnd(resultStream?.options.span, err);
if (!this.shouldRetry(err)) {
proxyStream.destroy(err);
return;
Expand Down
5 changes: 5 additions & 0 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ export class Snapshot extends EventEmitter {
readTimestampProto?: spannerClient.protobuf.ITimestamp;
request: (config: {}, callback: Function) => void;
requestStream: (config: {}) => Readable;
resultStream?: PartialResultStream;
session: Session;
queryOptions?: IQueryOptions;
resourceHeader_: {[k: string]: string};
Expand Down Expand Up @@ -751,6 +752,7 @@ export class Snapshot extends EventEmitter {
maxResumeRetries,
columnsMetadata,
gaxOptions,
span,
}
)
?.on('response', response => {
Expand Down Expand Up @@ -789,6 +791,7 @@ export class Snapshot extends EventEmitter {
});
}

this.resultStream = resultStream;
return resultStream;
});
}
Expand Down Expand Up @@ -1332,6 +1335,7 @@ export class Snapshot extends EventEmitter {
maxResumeRetries,
columnsMetadata,
gaxOptions,
span,
}
)
.on('response', response => {
Expand Down Expand Up @@ -1371,6 +1375,7 @@ export class Snapshot extends EventEmitter {
});
}

this.resultStream = resultStream;
return resultStream;
});
}
Expand Down
4 changes: 2 additions & 2 deletions test/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ describe('Transaction', () => {
assert.strictEqual(reqOpts.jsonOptions, undefined);
assert.strictEqual(reqOpts.maxResumeRetries, undefined);

const options = PARTIAL_RESULT_STREAM.lastCall.args[1];
const {span, ...options} = PARTIAL_RESULT_STREAM.lastCall.args[1];

assert.deepStrictEqual(options, fakeOptions);
});
Expand Down Expand Up @@ -791,7 +791,7 @@ describe('Transaction', () => {
assert.strictEqual(reqOpts.jsonOptions, undefined);
assert.strictEqual(reqOpts.maxResumeRetries, undefined);

const options = PARTIAL_RESULT_STREAM.lastCall.args[1];
const {span, ...options} = PARTIAL_RESULT_STREAM.lastCall.args[1];

assert.deepStrictEqual(options, expectedOptions);
});
Expand Down

0 comments on commit f5d21a3

Please sign in to comment.