diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index c60549776..9130e2989 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -505,7 +505,7 @@ describe('ObservabilityOptions injection and propagation', async () => { db.formattedName_ ); - it('run', done => { + it('run', () => { database.getTransaction((err, tx) => { assert.ifError(err); @@ -549,8 +549,6 @@ describe('ObservabilityOptions injection and propagation', async () => { true, `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` ); - - done(); }); }); }); @@ -610,14 +608,14 @@ describe('ObservabilityOptions injection and propagation', async () => { }); }); - it('runStream', done => { + it('runStream', () => { let rowCount = 0; database.getTransaction((err, tx) => { assert.ifError(err); tx! .runStream(selectSql) .on('data', () => rowCount++) - .on('error', assert.ifError) + .on('error', () => {assert.ifError}) .on('stats', () => {}) .on('end', async () => { tx!.end(); @@ -657,8 +655,6 @@ describe('ObservabilityOptions injection and propagation', async () => { expectedEventNames, `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` ); - - done(); }); }); }); diff --git a/src/database.ts b/src/database.ts index e8b91480f..5fdf288a8 100644 --- a/src/database.ts +++ b/src/database.ts @@ -102,6 +102,7 @@ import Policy = google.iam.v1.Policy; import FieldMask = google.protobuf.FieldMask; import IDatabase = google.spanner.admin.database.v1.IDatabase; import snakeCase = require('lodash.snakecase'); +import {SessionFactory, SessionFactoryInterface} from './session-factory'; import { ObservabilityOptions, Span, @@ -339,6 +340,7 @@ class Database extends common.GrpcServiceObject { private instance: Instance; formattedName_: string; pool_: SessionPoolInterface; + sessionFactory_: SessionFactoryInterface; queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions; resourceHeader_: {[k: string]: string}; request: DatabaseRequest; @@ -450,22 +452,12 @@ class Database extends common.GrpcServiceObject { }, } as {} as ServiceObjectConfig); - this.pool_ = - typeof poolOptions === 'function' - ? new (poolOptions as SessionPoolConstructor)(this, null) - : new SessionPool(this, poolOptions); - const sessionPoolInstance = this.pool_ as SessionPool; - if (sessionPoolInstance) { - sessionPoolInstance._observabilityOptions = - instance._observabilityOptions; - } if (typeof poolOptions === 'object') { this.databaseRole = poolOptions.databaseRole || null; this.labels = poolOptions.labels || null; } this.formattedName_ = formattedName_; this.instance = instance; - this._observabilityOptions = instance._observabilityOptions; this._traceConfig = { opts: this._observabilityOptions, dbName: this.formattedName_, @@ -478,8 +470,14 @@ class Database extends common.GrpcServiceObject { this._observabilityOptions = instance._observabilityOptions; // eslint-disable-next-line @typescript-eslint/no-explicit-any this.requestStream = instance.requestStream as any; - this.pool_.on('error', this.emit.bind(this, 'error')); - this.pool_.open(); + this.sessionFactory_ = new SessionFactory(this, name, poolOptions); + this.pool_ = this.sessionFactory_.getPool(); + this.multiplexedSession_ = this.sessionFactory_.getMultiplexedSession(); + const sessionPoolInstance = this.pool_ as SessionPool; + if (sessionPoolInstance) { + sessionPoolInstance._observabilityOptions = + instance._observabilityOptions; + } this.queryOptions_ = Object.assign( Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() diff --git a/src/multiplexed-session.ts b/src/multiplexed-session.ts index 6b0deb71b..4255fcba0 100644 --- a/src/multiplexed-session.ts +++ b/src/multiplexed-session.ts @@ -17,7 +17,7 @@ import {EventEmitter} from 'events'; import {Database} from './database'; import {Session} from './session'; -import {GetSessionCallback} from './session-pool'; +import {GetSessionCallback} from './session-factory'; import { ObservabilityOptions, getActiveOrNoopSpan, @@ -38,7 +38,7 @@ export const MUX_SESSION_CREATE_ERROR = 'mux-session-create-error'; * @constructs MultiplexedSessionInterface * @param {Database} database The database to create a multiplexed session for. */ -export interface MultiplexedSessionInterface { +export interface MultiplexedSessionInterface extends EventEmitter { /** * When called creates a multiplexed session. * diff --git a/src/session-factory.ts b/src/session-factory.ts new file mode 100644 index 000000000..899aa8625 --- /dev/null +++ b/src/session-factory.ts @@ -0,0 +1,99 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {Database, Session, Transaction} from '.'; +import { + MultiplexedSession, + MultiplexedSessionInterface, +} from './multiplexed-session'; +import { + SessionPool, + SessionPoolInterface, + SessionPoolOptions, +} from './session-pool'; +import {SessionPoolConstructor} from './database'; +import {ServiceObjectConfig} from '@google-cloud/common'; +const common = require('./common-grpc/service-object'); + +/** + * @callback GetSessionCallback + * @param {?Error} error Request error, if any. + * @param {Session} session The read-write session. + * @param {Transaction} transaction The transaction object. + */ +export interface GetSessionCallback { + ( + err: Error | null, + session?: Session | null, + transaction?: Transaction | null + ): void; +} + +export interface SessionFactoryInterface { + getSession(callback: GetSessionCallback): void; + getPool(): SessionPoolInterface; + getMultiplexedSession(): MultiplexedSessionInterface | undefined; +} + +export class SessionFactory + extends common.GrpcServiceObject + implements SessionFactoryInterface +{ + multiplexedSession_?: MultiplexedSessionInterface; + pool_: SessionPoolInterface; + constructor( + database: Database, + name: String, + poolOptions?: SessionPoolConstructor | SessionPoolOptions + ) { + super({ + parent: database, + id: name, + } as {} as ServiceObjectConfig); + this.pool_ = + typeof poolOptions === 'function' + ? new (poolOptions as SessionPoolConstructor)(database, null) + : new SessionPool(database, poolOptions); + this.multiplexedSession_ = new MultiplexedSession(database); + this.pool_.on('error', this.emit.bind(this, 'error')); + this.pool_.open(); + // multiplexed session should only get created if the env varaible is enabled + if (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true') { + this.multiplexedSession_.on('error', this.emit.bind(this, 'error')); + this.multiplexedSession_.createSession(); + } + } + + getSession(callback: GetSessionCallback): void { + if (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true') { + this.multiplexedSession_?.getSession((err, session) => { + err ? callback(err, null) : callback(null, session); + }); + } else { + this.pool_?.getSession((err, session) => { + err ? callback(err, null) : callback(null, session); + }); + } + } + + getPool(): SessionPoolInterface { + return this.pool_; + } + + getMultiplexedSession(): MultiplexedSessionInterface | undefined { + return this.multiplexedSession_; + } +} diff --git a/src/session-pool.ts b/src/session-pool.ts index 9b75cdb9e..daa6e50f1 100644 --- a/src/session-pool.ts +++ b/src/session-pool.ts @@ -24,6 +24,7 @@ import {Transaction} from './transaction'; import {NormalCallback} from './common'; import {GoogleError, grpc, ServiceError} from 'google-gax'; import trace = require('stack-trace'); +import {GetSessionCallback} from './session-factory'; import { ObservabilityOptions, getActiveOrNoopSpan, @@ -59,20 +60,6 @@ export interface GetWriteSessionCallback { ): void; } -/** - * @callback GetSessionCallback - * @param {?Error} error Request error, if any. - * @param {Session} session The read-write session. - * @param {Transaction} transaction The transaction object. - */ -export interface GetSessionCallback { - ( - err: Error | null, - session?: Session | null, - transaction?: Transaction | null - ): void; -} - /** * Interface for implementing custom session pooling logic, it should extend the * {@link https://nodejs.org/api/events.html|EventEmitter} class and emit any diff --git a/test/database.ts b/test/database.ts index 684b0c1d3..63aabf74b 100644 --- a/test/database.ts +++ b/test/database.ts @@ -78,7 +78,7 @@ class FakeBatchTransaction { } } -class FakeGrpcServiceObject extends EventEmitter { +export class FakeGrpcServiceObject extends EventEmitter { calledWith_: IArguments; constructor() { super(); @@ -109,7 +109,24 @@ class FakeSession { } } -class FakeSessionPool extends EventEmitter { +export class FakeSessionFactory extends EventEmitter { + calledWith_: IArguments; + constructor() { + super(); + this.calledWith_ = arguments; + } + getSession(): FakeSession { + return new FakeSession(); + } + getPool(): FakeSessionPool { + return new FakeSessionPool(); + } + getMultiplexedSession(): FakeMultiplexedSession { + return new FakeMultiplexedSession(); + } +} + +export class FakeSessionPool extends EventEmitter { calledWith_: IArguments; constructor() { super(); @@ -120,6 +137,16 @@ class FakeSessionPool extends EventEmitter { release() {} } +export class FakeMultiplexedSession extends EventEmitter { + calledWith_: IArguments; + constructor() { + super(); + this.calledWith_ = arguments; + } + createSession() {} + getSession() {} +} + class FakeTable { calledWith_: IArguments; constructor() { @@ -242,8 +269,8 @@ describe('Database', () => { './batch-transaction': {BatchTransaction: FakeBatchTransaction}, './codec': {codec: fakeCodec}, './partial-result-stream': {partialResultStream: fakePartialResultStream}, - './session-pool': {SessionPool: FakeSessionPool}, './session': {Session: FakeSession}, + './session-factory': {SessionFactory: FakeSessionFactory}, './table': {Table: FakeTable}, './transaction-runner': { TransactionRunner: FakeTransactionRunner, @@ -295,45 +322,6 @@ describe('Database', () => { assert(database.formattedName_, formattedName); }); - it('should create a SessionPool object', () => { - assert(database.pool_ instanceof FakeSessionPool); - assert.strictEqual(database.pool_.calledWith_[0], database); - assert.strictEqual(database.pool_.calledWith_[1], POOL_OPTIONS); - }); - - it('should accept a custom Pool class', () => { - function FakePool() {} - FakePool.prototype.on = util.noop; - FakePool.prototype.open = util.noop; - - const database = new Database( - INSTANCE, - NAME, - FakePool as {} as db.SessionPoolConstructor - ); - assert(database.pool_ instanceof FakePool); - }); - - it('should re-emit SessionPool errors', done => { - const error = new Error('err'); - - database.on('error', err => { - assert.strictEqual(err, error); - done(); - }); - - database.pool_.emit('error', error); - }); - - it('should open the pool', done => { - FakeSessionPool.prototype.open = () => { - FakeSessionPool.prototype.open = util.noop; - done(); - }; - - new Database(INSTANCE, NAME); - }); - it('should inherit from ServiceObject', done => { const options = {}; diff --git a/test/session-factory.ts b/test/session-factory.ts new file mode 100644 index 000000000..96d974da8 --- /dev/null +++ b/test/session-factory.ts @@ -0,0 +1,209 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {Database, Session, SessionPool} from '../src'; +import {SessionFactory} from '../src/session-factory'; +import * as sinon from 'sinon'; +import * as assert from 'assert'; +import {MultiplexedSession} from '../src/multiplexed-session'; +import {util} from '@google-cloud/common'; +import * as db from '../src/database'; +import {FakeTransaction} from './session-pool'; + +describe('SessionFactory', () => { + let sessionFactory; + const sandbox = sinon.createSandbox(); + const NAME = 'table-name'; + const POOL_OPTIONS = {}; + function noop() {} + const DATABASE = { + createSession: noop, + databaseRole: 'parent_role', + } as unknown as Database; + + const createMuxSession = (name = 'id', props?): Session => { + props = props || {multiplexed: true}; + + return Object.assign(new Session(DATABASE, name), props, { + create: sandbox.stub().resolves(), + transaction: sandbox.stub().returns(new FakeTransaction()), + }); + }; + + const createSession = (name = 'id', props?): Session => { + props = props || {}; + + return Object.assign(new Session(DATABASE, name), props, { + create: sandbox.stub().resolves(), + transaction: sandbox.stub().returns(new FakeTransaction()), + }); + }; + + beforeEach(() => { + sessionFactory = new SessionFactory(DATABASE, NAME, POOL_OPTIONS); + sessionFactory.parent = DATABASE; + }); + + afterEach(() => { + sandbox.restore(); + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false'; + }); + + describe('instantiation', () => { + it('should create a SessionPool object', () => { + assert(sessionFactory.pool_ instanceof SessionPool); + }); + + it('should create a MultiplexedSession object', () => { + assert(sessionFactory.multiplexedSession_ instanceof MultiplexedSession); + }); + + it('should accept a custom Pool class', () => { + function FakePool() {} + FakePool.prototype.on = util.noop; + FakePool.prototype.open = util.noop; + + const getSession = new SessionFactory( + DATABASE, + NAME, + FakePool as {} as db.SessionPoolConstructor + ); + assert(getSession.pool_ instanceof FakePool); + }); + + it('should re-emit SessionPool errors', done => { + const error = new Error('err'); + + sessionFactory.on('error', err => { + assert.strictEqual(err, error); + done(); + }); + + sessionFactory.pool_.emit('error', error); + }); + + it('should open the pool', () => { + const openStub = sandbox + .stub(SessionPool.prototype, 'open') + .callsFake(() => {}); + + new SessionFactory(DATABASE, NAME, POOL_OPTIONS); + + assert.strictEqual(openStub.callCount, 1); + }); + + it('should re-emit MultiplexedSession errors', done => { + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; + const error = new Error('err'); + + sessionFactory.on('error', err => { + assert.strictEqual(err, error); + done(); + }); + + sessionFactory.pool_.emit('error', error); + }); + + it('should initiate the multiplexed session creation if the env is enabled', () => { + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; + const createSessionStub = sandbox + .stub(MultiplexedSession.prototype, 'createSession') + .callsFake(() => {}); + + new SessionFactory(DATABASE, NAME, POOL_OPTIONS); + + assert.strictEqual(createSessionStub.callCount, 1); + }); + }); + + describe('getSession', () => { + let multiplexedSession; + let fakeMuxSession; + let sessionPool; + let fakeSession; + + beforeEach(() => { + multiplexedSession = new MultiplexedSession(DATABASE); + fakeMuxSession = createMuxSession(); + sessionPool = new SessionPool(DATABASE, POOL_OPTIONS); + fakeSession = createSession(); + }); + + afterEach(() => { + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false'; + }); + + it('should return the multiplexed session if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is enabled', () => { + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; + ( + sandbox.stub(multiplexedSession, 'getSession') as sinon.SinonStub + ).callsFake(callback => callback(null, fakeMuxSession)); + sessionFactory.getSession((err, resp) => { + assert.strictEqual(err, null); + assert.strictEqual(resp, fakeMuxSession); + assert.strictEqual(resp.multiplexed, true); + assert.strictEqual(fakeMuxSession.multiplexed, true); + }); + }); + + it('should return the err for getSession if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is enabled', () => { + process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true'; + const fakeError = new Error(); + ( + sandbox.stub(multiplexedSession, 'getSession') as sinon.SinonStub + ).callsFake(callback => callback(fakeError, null)); + sessionFactory.getSession((err, resp) => { + assert.strictEqual(err, fakeError); + assert.strictEqual(resp, null); + }); + }); + + it('should return the multiplexed session if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is disabled', () => { + (sandbox.stub(sessionPool, 'getSession') as sinon.SinonStub).callsFake( + callback => callback(null, fakeSession) + ); + sessionFactory.getSession((err, resp) => { + assert.strictEqual(err, null); + assert.strictEqual(resp, fakeSession); + }); + }); + + it('should return the err for getSession if GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS env is disabled', () => { + const fakeError = new Error(); + (sandbox.stub(sessionPool, 'getSession') as sinon.SinonStub).callsFake( + callback => callback(fakeError, null) + ); + sessionFactory.getSession((err, resp) => { + assert.strictEqual(err, fakeError); + assert.strictEqual(resp, null); + }); + }); + }); + + describe('getPool', () => { + it('should return an instance of SessionPool', () => { + const sessionFactory = new SessionFactory(DATABASE, NAME, POOL_OPTIONS); + assert(sessionFactory.getPool() instanceof SessionPool); + }); + }); + + describe('getMultiplexedSession', () => { + it('should return an instance of MultiplexedSession', () => { + const sessionFactory = new SessionFactory(DATABASE, NAME, POOL_OPTIONS); + assert(sessionFactory.getMultiplexedSession() instanceof MultiplexedSession); + }); + }); +});