Skip to content

Commit

Permalink
test: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alkatrivedi committed Dec 21, 2024
1 parent bfb68aa commit e24245d
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 114 deletions.
2 changes: 2 additions & 0 deletions observability-test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {Instance, MutationGroup, Spanner} from '../src';
import * as pfy from '@google-cloud/promisify';
import {grpc} from 'google-gax';
import {MockError} from '../test/mockserver/mockspanner';
import {FakeSessionFactory} from '../test/database';
const {generateWithAllSpansHaveDBName} = require('./helper');

const fakePfy = extend({}, pfy, {
Expand Down Expand Up @@ -234,6 +235,7 @@ describe('Database', () => {
'./codec': {codec: fakeCodec},
'./partial-result-stream': {partialResultStream: fakePartialResultStream},
'./session-pool': {SessionPool: FakeSessionPool},
'./session-factory': {SessionFactory: FakeSessionFactory},
'./session': {Session: FakeSession},
'./table': {Table: FakeTable},
'./transaction-runner': {
Expand Down
12 changes: 7 additions & 5 deletions observability-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ describe('ObservabilityOptions injection and propagation', async () => {
db.formattedName_
);

it('run', () => {
it('run', done => {
database.getTransaction((err, tx) => {
assert.ifError(err);

Expand Down Expand Up @@ -549,6 +549,8 @@ describe('ObservabilityOptions injection and propagation', async () => {
true,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);

done();
});
});
});
Expand Down Expand Up @@ -608,16 +610,14 @@ describe('ObservabilityOptions injection and propagation', async () => {
});
});

it('runStream', () => {
it('runStream', done => {
let rowCount = 0;
database.getTransaction((err, tx) => {
assert.ifError(err);
tx!
.runStream(selectSql)
.on('data', () => rowCount++)
.on('error', () => {
assert.ifError;
})
.on('error', assert.ifError)
.on('stats', () => {})
.on('end', async () => {
tx!.end();
Expand Down Expand Up @@ -657,6 +657,8 @@ describe('ObservabilityOptions injection and propagation', async () => {
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);

done();
});
});
});
Expand Down
7 changes: 4 additions & 3 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from 'google-gax';
import {Backup} from './backup';
import {BatchTransaction, TransactionIdentifier} from './batch-transaction';
import {SessionFactory, SessionFactoryInterface} from './session-factory';
import {
google as databaseAdmin,
google,
Expand Down Expand Up @@ -102,7 +103,6 @@ import Policy = google.iam.v1.Policy;
import FieldMask = google.protobuf.FieldMask;
import IDatabase = google.spanner.admin.database.v1.IDatabase;
import snakeCase = require('lodash.snakecase');
import {SessionFactory, SessionFactoryInterface} from './session-factory';
import {
ObservabilityOptions,
Span,
Expand All @@ -112,7 +112,7 @@ import {
setSpanErrorAndException,
traceConfig,
} from './instrument';

import {MultiplexedSessionInterface} from './multiplexed-session';
export type GetDatabaseRolesCallback = RequestCallback<
IDatabaseRole,
databaseAdmin.spanner.admin.database.v1.IListDatabaseRolesResponse
Expand Down Expand Up @@ -341,6 +341,7 @@ class Database extends common.GrpcServiceObject {
formattedName_: string;
pool_: SessionPoolInterface;
sessionFactory_: SessionFactoryInterface;
multiplexedSession_: MultiplexedSessionInterface | undefined;
queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions;
resourceHeader_: {[k: string]: string};
request: DatabaseRequest;
Expand Down Expand Up @@ -458,6 +459,7 @@ class Database extends common.GrpcServiceObject {
}
this.formattedName_ = formattedName_;
this.instance = instance;
this._observabilityOptions = instance._observabilityOptions;
this._traceConfig = {
opts: this._observabilityOptions,
dbName: this.formattedName_,
Expand All @@ -472,7 +474,6 @@ class Database extends common.GrpcServiceObject {
this.requestStream = instance.requestStream as any;
this.sessionFactory_ = new SessionFactory(this, name, poolOptions);
this.pool_ = this.sessionFactory_.getPool();
this.multiplexedSession_ = this.sessionFactory_.getMultiplexedSession();
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
Expand Down
33 changes: 30 additions & 3 deletions src/session-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import {
import {SessionPoolConstructor} from './database';
import {ServiceObjectConfig} from '@google-cloud/common';
const common = require('./common-grpc/service-object');

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
Expand Down Expand Up @@ -68,15 +67,28 @@ export class SessionFactory
? new (poolOptions as SessionPoolConstructor)(database, null)
: new SessionPool(database, poolOptions);
this.multiplexedSession_ = new MultiplexedSession(database);
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.on('error', this.emit.bind(database, 'error'));

// create session pool
this.pool_.open();
// multiplexed session should only get created if the env varaible is enabled
if (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true') {
this.multiplexedSession_.on('error', this.emit.bind(this, 'error'));
this.multiplexedSession_.on('error', this.emit.bind(database, 'error'));

// create multiplexed session
this.multiplexedSession_.createSession();
}
}

/**
* Retrieves the session, either the regular session or the multiplexed session based upon the environment varibale
* If the environment variable GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS is set to `true` the method will attempt to
* retrieve the multiplexed session. Otherwise it will retrieve the session from the pool.
*
* The session is returned asynchronously via the provided callback, which will receive either an error or the session object.
* @param callback
*/

getSession(callback: GetSessionCallback): void {
if (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true') {
this.multiplexedSession_?.getSession((err, session) => {
Expand All @@ -89,10 +101,25 @@ export class SessionFactory
}
}

/**
* Returns the SessionPoolInterface used by the current instance, which provide access to the session pool
* for obtaining database sessions.
*
* @returns {SessionPoolInterface} The session pool used by current instance.
* This object allows interaction with the pool for acquiring and managing sessions.
*/

getPool(): SessionPoolInterface {
return this.pool_;
}

/**
* Returns the MultiplexedSession used bt the current instance.
*
* @returns {MultiplexedSessionInterface | undefined} The multiplexed session used by current instance.
* The object allows interaction with the multiplexed session.
*/

getMultiplexedSession(): MultiplexedSessionInterface | undefined {
return this.multiplexedSession_;
}
Expand Down
3 changes: 1 addition & 2 deletions src/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import {Transaction} from './transaction';
import {NormalCallback} from './common';
import {GoogleError, grpc, ServiceError} from 'google-gax';
import trace = require('stack-trace');
import {GetSessionCallback} from './session-factory';
import {
ObservabilityOptions,
getActiveOrNoopSpan,
setSpanErrorAndException,
startTrace,
} from './instrument';

import {GetSessionCallback} from './session-factory';
import {
isDatabaseNotFoundError,
isInstanceNotFoundError,
Expand Down
70 changes: 51 additions & 19 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import {
CommitOptions,
MutationSet,
} from '../src/transaction';

import {SessionFactory} from '../src/session-factory';
let promisified = false;
const fakePfy = extend({}, pfy, {
promisifyAll(klass, options) {
Expand Down Expand Up @@ -91,7 +91,7 @@ function fakePartialResultStream(this: Function & {calledWith_: IArguments}) {
return this;
}

class FakeSession {
export class FakeSession {
calledWith_: IArguments;
formattedName_: any;
constructor() {
Expand All @@ -109,42 +109,46 @@ class FakeSession {
}
}

export class FakeSessionFactory extends EventEmitter {
export class FakeSessionPool extends EventEmitter {
calledWith_: IArguments;
constructor() {
super();
this.calledWith_ = arguments;
}
getSession(): FakeSession {
return new FakeSession();
}
getPool(): FakeSessionPool {
return new FakeSessionPool();
}
getMultiplexedSession(): FakeMultiplexedSession {
return new FakeMultiplexedSession();
}
open() {}
getSession() {}
release() {}
}

export class FakeSessionPool extends EventEmitter {
export class FakeMultiplexedSession extends EventEmitter {
calledWith_: IArguments;
constructor() {
super();
this.calledWith_ = arguments;
}
open() {}
createSession() {}
getSession() {}
release() {}
}

export class FakeMultiplexedSession extends EventEmitter {
export class FakeSessionFactory extends EventEmitter {
calledWith_: IArguments;
constructor() {
super();
this.calledWith_ = arguments;
}
createSession() {}
getSession() {}
getSession(): FakeSession | FakeMultiplexedSession {
if (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS) {
return new FakeSession();
} else {
return new FakeMultiplexedSession();
}
}
getPool(): FakeSessionPool {
return new FakeSessionPool();
}
getMultiplexedSession(): FakeMultiplexedSession {
return new FakeMultiplexedSession();
}
}

class FakeTable {
Expand Down Expand Up @@ -269,8 +273,9 @@ describe('Database', () => {
'./batch-transaction': {BatchTransaction: FakeBatchTransaction},
'./codec': {codec: fakeCodec},
'./partial-result-stream': {partialResultStream: fakePartialResultStream},
'./session': {Session: FakeSession},
'./session-pool': {SessionPool: FakeSessionPool},
'./session-factory': {SessionFactory: FakeSessionFactory},
'./session': {Session: FakeSession},
'./table': {Table: FakeTable},
'./transaction-runner': {
TransactionRunner: FakeTransactionRunner,
Expand Down Expand Up @@ -322,6 +327,33 @@ describe('Database', () => {
assert(database.formattedName_, formattedName);
});

it('should re-emit SessionPool errors', done => {
const error = new Error('err');

const sessionFactory = new SessionFactory(database, NAME);

database.on('error', err => {
assert.strictEqual(err, error);
done();
});

sessionFactory.pool_.emit('error', error);
});

it('should re-emit Multiplexed Session errors', done => {
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true';
const error = new Error('err');

const sessionFactory = new SessionFactory(database, NAME);

database.on('error', err => {
assert.strictEqual(err, error);
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false';
done();
});
sessionFactory.multiplexedSession_?.emit('error', error);
});

it('should inherit from ServiceObject', done => {
const options = {};

Expand Down
Loading

0 comments on commit e24245d

Please sign in to comment.