Skip to content

Commit

Permalink
feat: support for common interface to get a session
Browse files Browse the repository at this point in the history
  • Loading branch information
alkatrivedi committed Dec 16, 2024
1 parent 92248c1 commit 421a5a1
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 78 deletions.
10 changes: 3 additions & 7 deletions observability-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ describe('ObservabilityOptions injection and propagation', async () => {
db.formattedName_
);

it('run', done => {
it('run', () => {
database.getTransaction((err, tx) => {
assert.ifError(err);

Expand Down Expand Up @@ -549,8 +549,6 @@ describe('ObservabilityOptions injection and propagation', async () => {
true,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);

done();
});
});
});
Expand Down Expand Up @@ -610,14 +608,14 @@ describe('ObservabilityOptions injection and propagation', async () => {
});
});

it('runStream', done => {
it('runStream', () => {
let rowCount = 0;
database.getTransaction((err, tx) => {
assert.ifError(err);
tx!
.runStream(selectSql)
.on('data', () => rowCount++)
.on('error', assert.ifError)
.on('error', () => {assert.ifError})

Check failure on line 618 in observability-test/spanner.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `assert.ifError` with `⏎············assert.ifError;⏎··········`
.on('stats', () => {})
.on('end', async () => {
tx!.end();
Expand Down Expand Up @@ -657,8 +655,6 @@ describe('ObservabilityOptions injection and propagation', async () => {
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);

done();
});
});
});
Expand Down
23 changes: 10 additions & 13 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import Policy = google.iam.v1.Policy;
import FieldMask = google.protobuf.FieldMask;
import IDatabase = google.spanner.admin.database.v1.IDatabase;
import snakeCase = require('lodash.snakecase');
import {SessionFactory, SessionFactoryInterface} from './session-factory';
import {
ObservabilityOptions,
Span,
Expand Down Expand Up @@ -338,7 +339,7 @@ export interface RestoreOptions {
class Database extends common.GrpcServiceObject {
private instance: Instance;
formattedName_: string;
pool_: SessionPoolInterface;
sessionFactory_: SessionFactoryInterface;
queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions;
resourceHeader_: {[k: string]: string};
request: DatabaseRequest;
Expand Down Expand Up @@ -450,22 +451,12 @@ class Database extends common.GrpcServiceObject {
},
} as {} as ServiceObjectConfig);

this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this, null)
: new SessionPool(this, poolOptions);
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
if (typeof poolOptions === 'object') {
this.databaseRole = poolOptions.databaseRole || null;
this.labels = poolOptions.labels || null;
}
this.formattedName_ = formattedName_;
this.instance = instance;
this._observabilityOptions = instance._observabilityOptions;
this._traceConfig = {
opts: this._observabilityOptions,
dbName: this.formattedName_,
Expand All @@ -478,8 +469,14 @@ class Database extends common.GrpcServiceObject {
this._observabilityOptions = instance._observabilityOptions;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.requestStream = instance.requestStream as any;
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.open();
this.sessionFactory_ = new SessionFactory(this, name, poolOptions);
this.pool_ = this.sessionFactory_.getPool();
this.multiplexedSession_ = this.sessionFactory_.getMultiplexedSession();
const sessionPoolInstance = this.pool_ as SessionPool;
if (sessionPoolInstance) {
sessionPoolInstance._observabilityOptions =
instance._observabilityOptions;
}
this.queryOptions_ = Object.assign(
Object.assign({}, queryOptions),
Database.getEnvironmentQueryOptions()
Expand Down
4 changes: 2 additions & 2 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import {EventEmitter} from 'events';
import {Database} from './database';
import {Session} from './session';
import {GetSessionCallback} from './session-pool';
import {GetSessionCallback} from './session-factory';
import {
ObservabilityOptions,
getActiveOrNoopSpan,
Expand All @@ -38,7 +38,7 @@ export const MUX_SESSION_CREATE_ERROR = 'mux-session-create-error';
* @constructs MultiplexedSessionInterface
* @param {Database} database The database to create a multiplexed session for.
*/
export interface MultiplexedSessionInterface {
export interface MultiplexedSessionInterface extends EventEmitter {
/**
* When called creates a multiplexed session.
*
Expand Down
99 changes: 99 additions & 0 deletions src/session-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*!
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {Database, Session, Transaction} from '.';
import {
MultiplexedSession,
MultiplexedSessionInterface,
} from './multiplexed-session';
import {
SessionPool,
SessionPoolInterface,
SessionPoolOptions,
} from './session-pool';
import {SessionPoolConstructor} from './database';
import {ServiceObjectConfig} from '@google-cloud/common';
const common = require('./common-grpc/service-object');

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

export interface SessionFactoryInterface {
getSession(callback: GetSessionCallback): void;
getPool(): SessionPoolInterface;
getMultiplexedSession(): MultiplexedSessionInterface | undefined;
}

export class SessionFactory
extends common.GrpcServiceObject
implements SessionFactoryInterface
{
multiplexedSession_?: MultiplexedSessionInterface;
pool_: SessionPoolInterface;
constructor(
database: Database,
name: String,
poolOptions?: SessionPoolConstructor | SessionPoolOptions
) {
super({
parent: database,
id: name,
} as {} as ServiceObjectConfig);
this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(database, null)
: new SessionPool(database, poolOptions);
this.multiplexedSession_ = new MultiplexedSession(database);
this.pool_.on('error', this.emit.bind(this, 'error'));
this.pool_.open();
// multiplexed session should only get created if the env varaible is enabled
if (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true') {
this.multiplexedSession_.on('error', this.emit.bind(this, 'error'));
this.multiplexedSession_.createSession();
}
}

getSession(callback: GetSessionCallback): void {
if (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true') {
this.multiplexedSession_?.getSession((err, session) => {
err ? callback(err, null) : callback(null, session);
});
} else {
this.pool_?.getSession((err, session) => {
err ? callback(err, null) : callback(null, session);
});
}
}

getPool(): SessionPoolInterface {
return this.pool_;
}

getMultiplexedSession(): MultiplexedSessionInterface | undefined {
return this.multiplexedSession_;
}
}
15 changes: 1 addition & 14 deletions src/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {Transaction} from './transaction';
import {NormalCallback} from './common';
import {GoogleError, grpc, ServiceError} from 'google-gax';
import trace = require('stack-trace');
import {GetSessionCallback} from './session-factory';
import {
ObservabilityOptions,
getActiveOrNoopSpan,
Expand Down Expand Up @@ -59,20 +60,6 @@ export interface GetWriteSessionCallback {
): void;
}

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

/**
* Interface for implementing custom session pooling logic, it should extend the
* {@link https://nodejs.org/api/events.html|EventEmitter} class and emit any
Expand Down
72 changes: 30 additions & 42 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class FakeBatchTransaction {
}
}

class FakeGrpcServiceObject extends EventEmitter {
export class FakeGrpcServiceObject extends EventEmitter {
calledWith_: IArguments;
constructor() {
super();
Expand Down Expand Up @@ -109,7 +109,24 @@ class FakeSession {
}
}

class FakeSessionPool extends EventEmitter {
export class FakeSessionFactory extends EventEmitter {
calledWith_: IArguments;
constructor() {
super();
this.calledWith_ = arguments;
}
getSession(): FakeSession {
return new FakeSession();
}
getPool(): FakeSessionPool {
return new FakeSessionPool();
}
getMultiplexedSession(): FakeMultiplexedSession {
return new FakeMultiplexedSession();
}
}

export class FakeSessionPool extends EventEmitter {
calledWith_: IArguments;
constructor() {
super();
Expand All @@ -120,6 +137,16 @@ class FakeSessionPool extends EventEmitter {
release() {}
}

export class FakeMultiplexedSession extends EventEmitter {
calledWith_: IArguments;
constructor() {
super();
this.calledWith_ = arguments;
}
createSession() {}
getSession() {}
}

class FakeTable {
calledWith_: IArguments;
constructor() {
Expand Down Expand Up @@ -242,8 +269,8 @@ describe('Database', () => {
'./batch-transaction': {BatchTransaction: FakeBatchTransaction},
'./codec': {codec: fakeCodec},
'./partial-result-stream': {partialResultStream: fakePartialResultStream},
'./session-pool': {SessionPool: FakeSessionPool},
'./session': {Session: FakeSession},
'./session-factory': {SessionFactory: FakeSessionFactory},
'./table': {Table: FakeTable},
'./transaction-runner': {
TransactionRunner: FakeTransactionRunner,
Expand Down Expand Up @@ -295,45 +322,6 @@ describe('Database', () => {
assert(database.formattedName_, formattedName);
});

it('should create a SessionPool object', () => {
assert(database.pool_ instanceof FakeSessionPool);
assert.strictEqual(database.pool_.calledWith_[0], database);
assert.strictEqual(database.pool_.calledWith_[1], POOL_OPTIONS);
});

it('should accept a custom Pool class', () => {
function FakePool() {}
FakePool.prototype.on = util.noop;
FakePool.prototype.open = util.noop;

const database = new Database(
INSTANCE,
NAME,
FakePool as {} as db.SessionPoolConstructor
);
assert(database.pool_ instanceof FakePool);
});

it('should re-emit SessionPool errors', done => {
const error = new Error('err');

database.on('error', err => {
assert.strictEqual(err, error);
done();
});

database.pool_.emit('error', error);
});

it('should open the pool', done => {
FakeSessionPool.prototype.open = () => {
FakeSessionPool.prototype.open = util.noop;
done();
};

new Database(INSTANCE, NAME);
});

it('should inherit from ServiceObject', done => {
const options = {};

Expand Down
Loading

0 comments on commit 421a5a1

Please sign in to comment.