Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Events With Await Detector #104

Merged
merged 18 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions lib/async/als.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ export const getStore = () => MontiAsyncStorage.getStore() ?? {};

export const getInfo = () => MontiAsyncStorage.getStore()?.info;

export const setActiveEvent = (event, store = MontiAsyncStorage.getStore()) => {
if (!store) return;

MontiAsyncStorage.enterWith({ ...store, activeEvent: event });
};

export const getActiveEvent = (store = MontiAsyncStorage.getStore()) => store?.activeEvent;

export const debug = label => (...args) => {
Expand Down
91 changes: 38 additions & 53 deletions lib/async/async-hook.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncHooks from 'async_hooks';
import { getActiveEvent, getInfo } from './als';
import { MaxAsyncLevel } from '../constants';
import { EventType, MaxAsyncLevel } from '../constants';
import { Ntp } from '../ntp';
import { stackTrace } from '../utils';
import { pick, stackTrace } from '../utils';
import { AwaitDetector } from '@monti-apm/core/dist/await-detector';


export const AsyncMetrics = {
totalAsyncCount: 0,
Expand All @@ -15,12 +17,6 @@ export function getResources () {
return info?.resources;
}

export function getResourcesAsArray () {
const resources = getResources();

return resources ? Array.from(resources.values()) : [];
}

export const getResource = (asyncId) => {
if (!asyncId) return;

Expand All @@ -29,34 +25,12 @@ export const getResource = (asyncId) => {
return resources?.get(asyncId);
};

const captureEnd = (type) => (asyncId) => {
AsyncMetrics.activeAsyncCount--;

const resource = getResource(asyncId);

if (!resource) return;

resource.end = Ntp._now();
resource.duration = resource.end - resource.start;
resource.hooks = { ...resource.hooks, [type]: true };
};

export const AsyncResourceType = {
PROMISE: 'PROMISE',
};

// @todo Keep nested async events for all events, except `db`.
// @todo Generate metrics for nested events as if they are root when nested in `custom` events.
const hook = asyncHooks.createHook({
init (asyncId, type, triggerAsyncId) {
// We don't want to capture anything other than promise resources.
if (type !== AsyncResourceType.PROMISE) return;

AsyncMetrics.totalAsyncCount++;
AsyncMetrics.activeAsyncCount++;

export const awaitDetector = new AwaitDetector({
onAwaitStart (asyncId, triggerAsyncId) {
const info = getInfo();

if (!info) return;

const activeEvent = getActiveEvent();

if (activeEvent?.level > MaxAsyncLevel) return;
Expand All @@ -67,41 +41,52 @@ const hook = asyncHooks.createHook({

info.resources = info.resources || new Map();

const trigger = info.resources.get(triggerAsyncId);

const executionAsyncId = asyncHooks.executionAsyncId();

const executionChanged = trigger?.executionAsyncId !== executionAsyncId;

const res = {
asyncId,
triggerAsyncId,
executionAsyncId,
type,
start: Ntp._now(),
end: null,
startTime: Ntp._now(),
endTime: null,
activeEvent,
executionChanged,
};

if (Kadira.options.enableAsyncStackTraces || process.env.ENABLE_ASYNC_STACK_TRACES) {
res.stack = stackTrace();
}

info.resources.set(asyncId, res);

if (trigger) {
trigger.children = trigger.children || [];
trigger.children.push(asyncId);
}
res.event = Kadira.tracer.event(info.trace, EventType.Async, pick(res, ['asyncId', 'stack', 'asyncId', 'triggerAsyncId', 'executionAsyncId']));
},
// @todo Perhaps we need to handle rejected promises.
// All promises call `promiseResolve` at the end, except when they are rejected.
promiseResolve: captureEnd('promiseResolve'),
});
onAwaitEnd (asyncId) {
const resource = getResource(asyncId);

hook.enable();
if (!resource) return;

process.once('beforeExit', function () {
hook.disable();
const info = getInfo();

if (!info) return;

// Some events finish after the trace is already processed.
if (info.trace.isEventsProcessed) return;

Kadira.tracer.eventEnd(info.trace, resource.event);
}
});

const oldBindEnv = Meteor.bindEnvironment;

/**
* We ignore `bindEnvironment` awaits because they pollute the traces with Meteor internals.
*
* To reproduce, go to the test "Tracer - Build Trace - the correct number of async events are captured for methods"
* and remove the awaitDetector.ignore() call below to see the difference.
*/
Meteor.bindEnvironment = function (...args) {
const func = oldBindEnv.apply(this, args);
return function () {
return awaitDetector.ignore(() => func.apply(this, arguments));
};
};
44 changes: 37 additions & 7 deletions lib/hijack/wrap_session.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { MeteorDebugIgnore } from './error';
import { TimeoutManager } from './timeout_manager';
import { runWithALS } from '../async/als';
import { EventType } from '../constants';
import { awaitDetector } from '../async/async-hook';

const MAX_PARAMS_LENGTH = 4000;

Expand Down Expand Up @@ -71,10 +72,7 @@ export function wrapSession (sessionProto) {

unblock = Kadira.waitTimeBuilder.trackWaitTime(this, msg, unblock);


const promise = originalMethodHandler.call(self, msg, unblock);

response = await promise;
response = await originalMethodHandler.call(self, msg, unblock);

unblock();
} else {
Expand All @@ -86,7 +84,7 @@ export function wrapSession (sessionProto) {

// to capture the currently processing message
let orginalSubHandler = sessionProto.protocol_handlers.sub;
sessionProto.protocol_handlers.sub = runWithALS(function (msg, unblock) {
sessionProto.protocol_handlers.sub = runWithALS(async function (msg, unblock) {
let self = this;
// add context
let kadiraInfo = msg.__kadiraInfo;
Expand All @@ -101,14 +99,16 @@ export function wrapSession (sessionProto) {

// end wait event
let waitList = Kadira.waitTimeBuilder.build(this, msg.id);

Kadira.tracer.eventEnd(kadiraInfo.trace, msg._waitEventId, {waitOn: waitList});

unblock = Kadira.waitTimeBuilder.trackWaitTime(this, msg, unblock);

response = orginalSubHandler.call(self, msg, unblock);
response = await orginalSubHandler.call(self, msg, unblock);

unblock();
} else {
response = orginalSubHandler.call(self, msg, unblock);
response = await orginalSubHandler.call(self, msg, unblock);
}

return response;
Expand Down Expand Up @@ -170,6 +170,36 @@ export function wrapSession (sessionProto) {
};
}

// eslint-disable-next-line camelcase
Meteor.server.method_handlers = new Proxy(Meteor.server.method_handlers, {
get (target, propKey) {
const origMethod = target[propKey];

if (typeof origMethod !== 'function') {
return origMethod;
}

return function (...args) {
return awaitDetector.detect(() => origMethod.apply(this, args));
};
}
});

// eslint-disable-next-line camelcase
Meteor.server.publish_handlers = new Proxy(Meteor.server.publish_handlers, {
get (target, propKey) {
const origMethod = target[propKey];

if (typeof origMethod !== 'function') {
return origMethod;
}

return function (...args) {
return awaitDetector.detect(() => origMethod.apply(this, args));
};
}
});

// wrap existing method handlers for capturing errors
_.each(Meteor.server.method_handlers, function (handler, name) {
wrapMethodHandlerForErrors(name, handler, Meteor.server.method_handlers);
Expand Down
4 changes: 1 addition & 3 deletions lib/kadira.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { EventType } from './constants';

const hostname = require('os').hostname();
const logger = require('debug')('kadira:apm');
const KadiraCore = require('@monti-apm/core').Kadira;
const KadiraCore = require('@monti-apm/core').Monti;

const DEBUG_PAYLOAD_SIZE = process.env.MONTI_DEBUG_PAYLOAD_SIZE === 'true';

Expand Down Expand Up @@ -329,8 +329,6 @@ Kadira._sendPayload = function () {
// WARNNING: returned info object is the reference object.
// Changing it might cause issues when building traces. So use with care
Kadira._getInfo = function () {
// prettyLog(`Getting Info ${new Error().stack.split('\n')[2]?.trim()}`, MontiAsyncStorage.getStore());

return MontiAsyncStorage.getStore()?.info;
};

Expand Down
Loading
Loading