Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Events With Await Detector #104

Merged
merged 18 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ jobs:
fail-fast: false
matrix:
meteorRelease:
- 3.0-alpha.16
# Meteor 3 Alpha
- '--release 3.0-alpha.16'
steps:
- name: Checkout code
uses: actions/checkout@v2
Expand All @@ -25,12 +26,11 @@ jobs:
- name: Install Dependencies
run: |
curl https://install.meteor.com | /bin/sh
npm i -g @zodern/mtest

- name: Run Tests
run: |
# Fix using old versions of Meteor
export NODE_TLS_REJECT_UNAUTHORIZED=0
export NODE_OPTIONS="--trace-warnings --trace-uncaught"
export FORCE_COLOR=true
export NODE_TLS_REJECT_UNAUTHORIZED=0

meteor test-packages ./ --raw-logs --exclude-archs="web.browser.legacy,web.cordova" --driver-package=test-in-console --release=${{ matrix.meteorRelease }}
mtest --package ./ --once ${{ matrix.meteorRelease }}
16 changes: 10 additions & 6 deletions lib/async/als.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ import crypto from 'crypto';
export const getId = () => crypto.randomUUID();

export const MontiAsyncStorage = new AsyncLocalStorage();
export const MontiAsyncIgnoreStorage = new AsyncLocalStorage();

const oldBindEnv = Meteor.bindEnvironment;

Meteor.bindEnvironment = function (...args) {
const func = oldBindEnv.apply(this, args);
return function () {
return MontiAsyncIgnoreStorage.run(true, () => func.apply(this, arguments));
leonardoventurini marked this conversation as resolved.
Show resolved Hide resolved
};
};

export const runWithALS = (
fn,
Expand All @@ -29,12 +39,6 @@ export const getStore = () => MontiAsyncStorage.getStore() ?? {};

export const getInfo = () => MontiAsyncStorage.getStore()?.info;

export const setActiveEvent = (event, store = MontiAsyncStorage.getStore()) => {
if (!store) return;

MontiAsyncStorage.enterWith({ ...store, activeEvent: event });
};

export const getActiveEvent = (store = MontiAsyncStorage.getStore()) => store?.activeEvent;

export const debug = label => (...args) => {
Expand Down
67 changes: 28 additions & 39 deletions lib/async/async-hook.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncHooks from 'async_hooks';
import { getActiveEvent, getInfo } from './als';
import { MaxAsyncLevel } from '../constants';
import { getActiveEvent, getInfo, MontiAsyncIgnoreStorage } from './als';
import { EventType, MaxAsyncLevel } from '../constants';
import { Ntp } from '../ntp';
import { stackTrace } from '../utils';
import { pick, stackTrace } from '../utils';
import { AwaitDetector } from '@monti-apm/core/dist/await-detector';


export const AsyncMetrics = {
totalAsyncCount: 0,
Expand All @@ -29,33 +31,13 @@ export const getResource = (asyncId) => {
return resources?.get(asyncId);
};

const captureEnd = (type) => (asyncId) => {
AsyncMetrics.activeAsyncCount--;

const resource = getResource(asyncId);

if (!resource) return;

resource.end = Ntp._now();
resource.duration = resource.end - resource.start;
resource.hooks = { ...resource.hooks, [type]: true };
};

export const AsyncResourceType = {
PROMISE: 'PROMISE',
};
export const awaitDetector = new AwaitDetector({
onAwaitStart (asyncId, triggerAsyncId) {
const info = getInfo();

// @todo Keep nested async events for all events, except `db`.
// @todo Generate metrics for nested events as if they are root when nested in `custom` events.
const hook = asyncHooks.createHook({
init (asyncId, type, triggerAsyncId) {
// We don't want to capture anything other than promise resources.
if (type !== AsyncResourceType.PROMISE) return;
if (!info) return;

AsyncMetrics.totalAsyncCount++;
AsyncMetrics.activeAsyncCount++;

const info = getInfo();
if (MontiAsyncIgnoreStorage.getStore()) return;

const activeEvent = getActiveEvent();

Expand All @@ -71,37 +53,44 @@ const hook = asyncHooks.createHook({

const executionAsyncId = asyncHooks.executionAsyncId();

const executionChanged = trigger?.executionAsyncId !== executionAsyncId;

const res = {
asyncId,
triggerAsyncId,
executionAsyncId,
type,
start: Ntp._now(),
end: null,
startTime: Ntp._now(),
endTime: null,
activeEvent,
executionChanged,
};

if (Kadira.options.enableAsyncStackTraces || process.env.ENABLE_ASYNC_STACK_TRACES) {
res.stack = stackTrace();
}

res.event = Kadira.tracer.event(info.trace, EventType.Async, pick(res, ['asyncId', 'stack', 'asyncId', 'triggerAsyncId', 'executionAsyncId']));
info.resources.set(asyncId, res);
leonardoventurini marked this conversation as resolved.
Show resolved Hide resolved

if (trigger) {
trigger.children = trigger.children || [];
trigger.children.push(asyncId);
}
},
// @todo Perhaps we need to handle rejected promises.
// All promises call `promiseResolve` at the end, except when they are rejected.
promiseResolve: captureEnd('promiseResolve'),
});
onAwaitEnd (asyncId) {
const resource = getResource(asyncId);

if (!resource) return;

resource.end = Ntp._now();
resource.duration = resource.end - resource.start;

const info = getInfo();

if (!info) return;

hook.enable();
// Some events finish after the trace is already processed.
if (info.trace.isEventsProcessed) return;

process.once('beforeExit', function () {
hook.disable();
Kadira.tracer.eventEnd(info.trace, resource.event);
}
});
44 changes: 37 additions & 7 deletions lib/hijack/wrap_session.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { MeteorDebugIgnore } from './error';
import { TimeoutManager } from './timeout_manager';
import { runWithALS } from '../async/als';
import { EventType } from '../constants';
import { awaitDetector } from '../async/async-hook';

const MAX_PARAMS_LENGTH = 4000;

Expand Down Expand Up @@ -71,10 +72,7 @@ export function wrapSession (sessionProto) {

unblock = Kadira.waitTimeBuilder.trackWaitTime(this, msg, unblock);


const promise = originalMethodHandler.call(self, msg, unblock);

response = await promise;
response = await originalMethodHandler.call(self, msg, unblock);

unblock();
} else {
Expand All @@ -86,7 +84,7 @@ export function wrapSession (sessionProto) {

// to capture the currently processing message
let orginalSubHandler = sessionProto.protocol_handlers.sub;
sessionProto.protocol_handlers.sub = runWithALS(function (msg, unblock) {
sessionProto.protocol_handlers.sub = runWithALS(async function (msg, unblock) {
let self = this;
// add context
let kadiraInfo = msg.__kadiraInfo;
Expand All @@ -101,14 +99,16 @@ export function wrapSession (sessionProto) {

// end wait event
let waitList = Kadira.waitTimeBuilder.build(this, msg.id);

Kadira.tracer.eventEnd(kadiraInfo.trace, msg._waitEventId, {waitOn: waitList});

unblock = Kadira.waitTimeBuilder.trackWaitTime(this, msg, unblock);

response = orginalSubHandler.call(self, msg, unblock);
response = await orginalSubHandler.call(self, msg, unblock);

unblock();
} else {
response = orginalSubHandler.call(self, msg, unblock);
response = await orginalSubHandler.call(self, msg, unblock);
}

return response;
Expand Down Expand Up @@ -170,6 +170,36 @@ export function wrapSession (sessionProto) {
};
}

// eslint-disable-next-line camelcase
Meteor.server.method_handlers = new Proxy(Meteor.server.method_handlers, {
get (target, propKey) {
const origMethod = target[propKey];

if (typeof origMethod !== 'function') {
return origMethod;
}

return function (...args) {
return awaitDetector.detect(() => origMethod.apply(this, args));
};
}
});

// eslint-disable-next-line camelcase
Meteor.server.publish_handlers = new Proxy(Meteor.server.publish_handlers, {
get (target, propKey) {
const origMethod = target[propKey];

if (typeof origMethod !== 'function') {
return origMethod;
}

return function (...args) {
return awaitDetector.detect(() => origMethod.apply(this, args));
};
}
});

// wrap existing method handlers for capturing errors
_.each(Meteor.server.method_handlers, function (handler, name) {
wrapMethodHandlerForErrors(name, handler, Meteor.server.method_handlers);
Expand Down
14 changes: 9 additions & 5 deletions lib/tracer/tracer.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ Tracer.prototype.event = function (trace, type, data, meta) {
eventLogger('%s %s', type, trace._id);

if (activeEvent) {
event.parentEvent = activeEvent;
activeEvent.nested.push(event);
return event;
}
Expand Down Expand Up @@ -157,6 +156,10 @@ Tracer.prototype.asyncEvent = function (type, data, meta, fn) {

const event = this.event(info.trace, type, data, meta);

if (event) {
event.parentEvent = getActiveEvent();
}

let callback = () => {
try {
let result = Reflect.apply(fn, this, [event]);
Expand Down Expand Up @@ -351,13 +354,14 @@ Tracer.prototype.optimizeEvent = function (objectEvent, metrics) {
if (![EventType.Complete, EventType.Start].includes(type)) {
setEventDuration(objectEvent);

const {duration} = objectEvent;

if (!endAt) {
endAt = at;
endAt = Ntp._now();
objectEvent.duration = endAt - at;
extraInfo.forcedEnd = true;
}

const {duration} = objectEvent;

// We need this info as events are not always in order or in series.
extraInfo.at = at;
extraInfo.endAt = endAt;
Expand All @@ -369,7 +373,7 @@ Tracer.prototype.optimizeEvent = function (objectEvent, metrics) {
extraInfo.stack = objectEvent.stack;
}

if (duration > 0) {
if (metrics && duration > 0) {
const isAsyncAndRootOrOther = type === EventType.Async && level === 0 || type !== EventType.Async;

if (isAsyncAndRootOrOther) {
Expand Down
58 changes: 55 additions & 3 deletions tests/tracer/tracer.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import { _ } from 'meteor/underscore';
import { Tracer } from '../../lib/tracer/tracer';
import { addAsyncTest, callAsync, cleanTrace, getLastMethodEvents, registerMethod } from '../_helpers/helpers';
import {
addAsyncTest,
callAsync,
cleanTrace,
getLastMethodEvents,
registerMethod,
subscribeAndWait
} from '../_helpers/helpers';
import { sleep } from '../../lib/utils';
import { TestData } from '../_helpers/globals';
import { getInfo } from '../../lib/async/als';
import { mergeIntervals, subtractIntervals } from '../../lib/utils/time';
import { diffObjects } from '../_helpers/pretty-log';
import { diffObjects, prettyLog } from '../_helpers/pretty-log';
import { EventType } from '../../lib/constants';
import { Meteor } from 'meteor/meteor';
import { Random } from 'meteor/random';

let eventDefaults = {
endAt: 0,
Expand Down Expand Up @@ -359,7 +368,7 @@ addAsyncTest(

const expected = [
['start'],
['wait', 0, null, { at: 0, endAt: 0, forcedEnd: true }],
['wait', traceInfo.events[1][1], null, { at: 0, endAt: traceInfo.events[1][3].endAt, forcedEnd: true }],
['db', 500, null, { at: 2000, endAt: 2500}],
['complete']
];
Expand Down Expand Up @@ -522,6 +531,7 @@ addAsyncTest('Tracer - Build Trace - custom with nested parallel events', async
let methodId = registerMethod(async function () {
let backgroundPromise;

// Compute
await sleep(30);

await Kadira.event('test', async (event) => {
Expand Down Expand Up @@ -564,6 +574,48 @@ addAsyncTest('Tracer - Build Trace - custom with nested parallel events', async
test.stableEqual(events, expected);
});

addAsyncTest('Tracer - Build Trace - the correct number of async events are captured for methods', async (test) => {
let info;

const methodId = registerMethod(async function () {
await sleep(100);
await sleep(200);

info = getInfo();

return await sleep(300);
});

await callAsync(methodId);

const asyncEvents = info.trace.events.filter(([type, duration]) => type === EventType.Async && duration >= 100);

prettyLog(info.trace.events);

test.equal(asyncEvents.length, 3);
});

addAsyncTest('Tracer - Build Trace - the correct number of async events are captured for pubsub', async (test, client) => {
const subName = `sub_${Random.id()}`;

let info;

Meteor.publish(subName, async function () {
await sleep(100);

info = getInfo();

return [];
});

await subscribeAndWait(client, subName);

prettyLog(info.trace.events);

const asyncEvents = info.trace.events.filter(([type, duration]) => type === EventType.Async && duration >= 100);

test.equal(asyncEvents.length,1);
});

addAsyncTest('Tracer - Time - Subtract Intervals', async function (test) {
function testSubtractIntervals (arr1, arr2, expected) {
Expand Down
Loading