diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f4abfa6..0ec03f60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## Next + +* Create custom traces with `Monti.traceJob` +* Add monitoring for job queues, cron jobs, and custom functions +* Add `disableInstrumentation` option +* Use websockets to start cpu profiling +* Support for recording heap snapshots + ## v2.49.4 July 11, 2024 diff --git a/README.md b/README.md index c4361644..01a3d7b8 100644 --- a/README.md +++ b/README.md @@ -26,11 +26,11 @@ Now you can deploy your application and it will send information to Monti APM. W `montiapm:agent` is compatible with: -- Meteor 1.4.3.2 and newer +- Meteor 1.4.3.2 and newer, including Meteor 3. Meteor will install the correct version of `montiapm:agent` for the version of Meteor. - Internet Explorer 9 and newer web browsers - Can be used with [Monti APM](https://montiapm.com/) or the open sourced version of Kadira, though many new features are not supported by Kadira -Features that require a newer version of Meteor are only enabled when using a supported version. For example, monitoring incoming HTTP requests is automatically enabled when the app uses Meteor 1.7 or newer. +Some features have a higher minimum version of Meteor, and are disabled in older Meteor versions. For example, monitoring incoming HTTP requests is enabled for apps using Meteor 1.7 or newer. ### Auto Connect @@ -195,6 +195,65 @@ The fields are redacted from: By default, the `password` field is redacted. +## Job Monitoring + +### Custom Traces + +You can create custom traces for any block of code. The code will be traced as a job, and appear in the Jobs dashboard in Monti APM. + +```js + +Monti.traceJob(options, functionToTrace); + +Monti.traceJob({ name: 'job name' }, () => { + // ... code to trace +}); +``` + +Options can have these properties. `name` is the only property required. +- `name`, which is the name of the trace or job. It is used to group metrics and traces together for the same job. +- `waitTime`, which is how long the job waited to run after it was scheduled to run. Shown in Monti APM as the job delay +- `data`, which can have the job data or any other details you want to store in the trace. It is shown under the `Start` event in the trace. + +The `functionToTrace` is called immediately, and it's return value is returned by `Monti.traceJob`. + +When `Monti.traceJob` is called inside a trace, it does nothing and simply runs `functionToTrace`. + +We recommend not using more than a few dozen names for custom traces. + +### Pending Jobs + +Monti APM does not automatically track pending jobs to avoid causing performance issues. +However, your app can report the metric to Monti APM. + +The pending metric is intended for job queues to know how many jobs are waiting to be processed. We recommend reporting the metric within 20 seconds of the app starting, and every 10 - 50 seconds afterwards. + +```js +async function reportPending() { + // How you get the pending count depends on the job queue library + // This is one way you can with BullMQ. + let counts = await queue.getJobCounts('wait'); + + Monti.recordPendingJobs('job name', counts.wait); +} + +// Report the count when the app starts +reportPending(); + +// Update the count every 20 seconds +setInterval(() => reportPending(), 1000 * 20); +``` + +### New Jobs + +When using `Monti.traceJob`, the `added` metric for the job is not recorded. This metric is intended for job queues to track how many new jobs were created, to understand how the rate of new jobs and completed jobs compares. You can manually record new jobs with: + +```js +Monti.recordNewJob('job name'); +``` + +Each time the function is called, it increments the `added` metric by 1. + ### Development #### Tests: diff --git a/lib/auto_connect.js b/lib/auto_connect.js index 5c94d130..be7f20ec 100644 --- a/lib/auto_connect.js +++ b/lib/auto_connect.js @@ -52,4 +52,4 @@ if ( } Kadira._connectWithEnv(envOptions); -Kadira._connectWithSettings(settingsOptions); +Kadira._connectWithSettings(montiSettings); diff --git a/lib/hijack/agenda.js b/lib/hijack/agenda.js new file mode 100644 index 00000000..dbdc9420 --- /dev/null +++ b/lib/hijack/agenda.js @@ -0,0 +1,53 @@ +import { checkModuleUsed } from './commonjs-utils'; + +export function wrapAgenda () { + Meteor.startup(() => { + if (checkModuleUsed('@hokify/agenda')) { + instrumentAgendaTs(); + } + if (checkModuleUsed('agenda')) { + instrumentAgenda(); + } + }); +} + +function instrumentAgendaTs () { + // eslint-disable-next-line global-require + let agenda = require('@hokify/agenda'); + let Job = agenda.Job; + + instrumentJob(Job.prototype); +} + +function instrumentAgenda () { + // eslint-disable-next-line global-require + let Job = require('agenda/dist/job').Job; + instrumentJob(Job.prototype); +} + +function instrumentJob (JobMethods) { + let oldSaveJob = JobMethods.save; + JobMethods.save = function () { + let id = this.attrs._id; + + if (!id) { + let name = this.attrs.name; + Kadira.models.jobs.trackNewJob(name); + } + + return oldSaveJob.apply(this, arguments); + }; + + let oldRun = JobMethods.run; + JobMethods.run = function (...args) { + let name = this.attrs.name; + let waitTime = Date.now() - this.attrs.nextRunAt; + let details = { + name, + waitTime, + data: this.attrs.data + }; + + return Kadira.traceJob(details, () => oldRun.apply(this, args)); + }; +} diff --git a/lib/hijack/bullmq.js b/lib/hijack/bullmq.js new file mode 100644 index 00000000..b3faaa74 --- /dev/null +++ b/lib/hijack/bullmq.js @@ -0,0 +1,48 @@ +import { checkModuleUsed, tryResolve } from './commonjs-utils'; + +export function wrapBullMQ () { + Meteor.startup(() => { + if (checkModuleUsed('bullmq')) { + instrumentBullMQ(tryResolve('bullmq')); + } + }); +} + +function instrumentBullMQ (modulePath) { + let bullMq = Npm.require(modulePath); + + let oldAdd = bullMq.Queue.prototype.addJob; + bullMq.Queue.prototype.addJob = function () { + Kadira.models.jobs.trackNewJob(this.name); + return oldAdd.apply(this, arguments); + }; + + let oldAddBulk = bullMq.Queue.prototype.addJobs; + bullMq.Queue.prototype.addJobs = function (jobs) { + let count = jobs && jobs.length || 0; + + Kadira.models.jobs.trackNewJob(this.name, count); + + return oldAddBulk.apply(this, arguments); + }; + + let oldProcessJob = bullMq.Worker.prototype.callProcessJob; + bullMq.Worker.prototype.callProcessJob = function (...args) { + let job = args[0]; + let name = this.name; + + return Kadira.traceJob({ + name, + waitTime: Date.now() - (job.timestamp + (job.delay || 0)), + _attributes: { + jobId: job.id, + jobName: job.name, + jobCreated: new Date(job.timestamp), + jobDelay: job.delay || 0, + queueName: job.queueName, + attemptsMade: job.attemptsMade, + }, + data: job.data + }, () => oldProcessJob.apply(this, args)); + }; +} diff --git a/lib/hijack/commonjs-utils.js b/lib/hijack/commonjs-utils.js new file mode 100644 index 00000000..26302647 --- /dev/null +++ b/lib/hijack/commonjs-utils.js @@ -0,0 +1,46 @@ +const logger = Npm.require('debug')('kadira:apm'); +const path = require('path'); + +let meteorBootstrap = typeof __meteor_bootstrap__ === 'object' && __meteor_bootstrap__; +let serverDir = meteorBootstrap ? meteorBootstrap.serverDir : process.cwd(); +let nodeRequire; + +try { + // eslint-disable-next-line global-require + let nodeModule = require('node:module'); + + nodeRequire = nodeModule.createRequire(serverDir); +} catch (err) { + logger(`Failed to create native require: ${err}`); +} + +export function tryResolve (modulePath) { + if (!meteorBootstrap || !nodeRequire) { + return false; + } + + try { + return nodeRequire.resolve(modulePath, { + paths: [ + serverDir, + path.resolve(serverDir, 'npm') + ] + }); + } catch (err) { + if (err.code === 'MODULE_NOT_FOUND') { + return false; + } + + throw err; + } +} + +export function checkModuleUsed (name) { + let resolved = tryResolve(name); + + if (!resolved) { + return false; + } + + return !!nodeRequire.cache[resolved]; +} diff --git a/lib/hijack/instrument.js b/lib/hijack/instrument.js index e1a546ef..ffe60cf7 100644 --- a/lib/hijack/instrument.js +++ b/lib/hijack/instrument.js @@ -17,6 +17,9 @@ import { wrapStringifyDDP } from './wrap_ddp_stringify'; import { setLabels } from './set_labels'; import { hijackDBOps } from './db'; import { wrapRedisOplogObserveDriver } from './redis_oplog'; +import { wrapSyncedCron } from './synced-cron.js'; +import { wrapAgenda } from './agenda.js'; +import { wrapBullMQ } from './bullmq.js'; let instrumented = false; Kadira._startInstrumenting = function (callback = () => {}) { @@ -33,6 +36,9 @@ Kadira._startInstrumenting = function (callback = () => {}) { wrapPicker(); wrapFs(); wrapRouters(); + wrapSyncedCron(); + wrapAgenda(); + wrapBullMQ(); MeteorX.onReady(function () { // instrumenting session diff --git a/lib/hijack/synced-cron.js b/lib/hijack/synced-cron.js new file mode 100644 index 00000000..8cbb2e03 --- /dev/null +++ b/lib/hijack/synced-cron.js @@ -0,0 +1,19 @@ +export function wrapSyncedCron () { + Meteor.startup(() => { + let cronPackage = Package['littledata:synced-cron'] || Package['percolate:synced-cron']; + + if (!cronPackage) { + return; + } + + let cron = cronPackage.SyncedCron; + + Object.values(cron._entries).forEach(entry => { + let oldJob = entry.job; + + entry.job = function (...args) { + return Kadira.traceJob({ name: entry.name },() => oldJob.apply(this, args)); + }; + }); + }); +} diff --git a/lib/kadira.js b/lib/kadira.js index 8dc93830..9c29af6a 100644 --- a/lib/kadira.js +++ b/lib/kadira.js @@ -18,6 +18,7 @@ import { getClientVersions } from './utils'; import { handleApiResponse } from './sourcemaps'; import { TrackMeteorDebug, TrackUncaughtExceptions, TrackUnhandledRejections } from './hijack/error'; import { queueJob } from './jobs'; +import { JobsModel } from './models/jobs'; const hostname = Npm.require('os').hostname(); const logger = Npm.require('debug')('kadira:apm'); @@ -43,6 +44,7 @@ Kadira.models.methods = new MethodsModel(); Kadira.models.pubsub = new PubsubModel(); Kadira.models.system = new SystemModel(); Kadira.models.http = new HttpModel(); +Kadira.models.jobs = new JobsModel(); Kadira.docSzCache = new DocSzCache(100000, 10); Kadira.syncedDate = new Ntp(); @@ -190,11 +192,12 @@ Kadira.connect = function (appId, appSecret, options) { // track how many times we've sent the data (once per minute) Kadira._buildPayload = function () { - let payload = {host: Kadira.options.hostname, clientVersions: getClientVersions()}; + let payload = { host: Kadira.options.hostname, clientVersions: getClientVersions() }; let buildDetailedInfo = Kadira._isDetailedInfo(); _.extend(payload, Kadira.models.methods.buildPayload(buildDetailedInfo)); _.extend(payload, Kadira.models.pubsub.buildPayload(buildDetailedInfo)); _.extend(payload, Kadira.models.system.buildPayload()); + _.extend(payload, Kadira.models.jobs.buildPayload()); _.extend(payload, Kadira.models.http.buildPayload()); if (Kadira.options.enableErrorTracking) { @@ -363,7 +366,7 @@ Kadira.startContinuousProfiling = function () { return; } - Kadira.coreApi.sendData({ profiles: [{profile, startTime, endTime }]}) + Kadira.coreApi.sendData({ profiles: [{ profile, startTime, endTime }] }) .catch(e => console.log('Monti: err sending cpu profile', e)); }); }; @@ -482,3 +485,97 @@ Kadira.endEvent = function (event, data) { Kadira.tracer.eventEnd(kadiraInfo.trace, event, data); } }; + +Kadira.traceJob = function (details, processor) { + let info = Kadira._getInfo(); + if (info || !details) { + return processor.apply(this, arguments); + } + + if (!Fibers.current) { + return Fibers(function () { + return Kadira.traceJob(details, processor); + }).run(); + } + + let wait = details.waitTime || 0; + + let data = details.data; + if (typeof data === 'object' && data !== null) { + data = Kadira.tracer._applyObjectFilters(data); + } + + let stringifiedParams = JSON.stringify(data); + + let trace = Kadira.tracer.start(details.name, 'job'); + + let startData = { + waitTime: wait, + data: stringifiedParams + }; + + if (details._attributes) { + Object.assign(startData, details._attributes); + } + + Kadira.tracer.event(trace, 'start', startData); + + Kadira._setInfo({ trace }); + Kadira.models.jobs.trackActiveJobs(details.name, 1); + + let ended = false; + function end (err) { + if (ended) { + throw new Error('Monti APM: Job ended more than once'); + } + + ended = true; + + Kadira.tracer.endLastEvent(trace); + + let cleanError; + + if (err) { + cleanError = _.pick(err, ['message', 'stack', 'details']); + Kadira.tracer.event(trace, 'error', { error: cleanError }); + } else { + Kadira.tracer.event(trace, 'complete'); + } + + let built = Kadira.tracer.buildTrace(trace); + Kadira.models.jobs.processJob(built, wait); + Kadira.models.jobs.trackActiveJobs(details.name, -1); + + if (cleanError && Kadira.options.enableErrorTracking) { + Kadira.models.error.trackError(cleanError, trace); + } + } + + let result; + try { + result = processor.apply(this, arguments); + + if (result && typeof result.then === 'function') { + result.then(() => end(), err => end(err)); + } else { + end(); + } + } catch (err) { + end(err); + throw err; + } finally { + // Synchronously reset info so outside code doesn't see it. + // It'll still be set in any child fibers if the processor uses promises + Kadira._setInfo(null); + } + + return result; +}; + +Kadira.recordNewJob = function (jobName) { + Kadira.models.jobs.trackNewJob(jobName); +}; + +Kadira.recordPendingJobs = function (jobName, count) { + Kadira.models.jobs.trackPendingJobs(jobName, count); +}; diff --git a/lib/models/jobs.js b/lib/models/jobs.js new file mode 100644 index 00000000..30a2fa27 --- /dev/null +++ b/lib/models/jobs.js @@ -0,0 +1,206 @@ +import { _ } from 'meteor/underscore'; +import { KadiraModel } from './0model'; +import { TracerStore } from '../tracer/tracer_store'; +import { Ntp } from '../ntp'; + +const { DDSketch } = require('monti-apm-sketches-js'); + +const JOB_METRICS_FIELDS = ['wait', 'db', 'http', 'email', 'async', 'compute', 'total']; + +export function JobsModel (metricsThreshold) { + this.jobMetricsByMinute = Object.create(null); + this.errorMap = Object.create(null); + + this._metricsThreshold = _.extend({ + db: 100, + http: 1000, + email: 100, + async: 100, + compute: 100, + total: 200 + }, metricsThreshold || Object.create(null)); + + // store max time elapsed methods for each job, event(metrics-field) + this.maxEventTimesForJobs = Object.create(null); + + this.tracerStore = new TracerStore({ + // process traces every minute + interval: 1000 * 60, + // for 30 minutes + maxTotalPoints: 30, + // always trace for every 5 minutes + archiveEvery: 5 + }); + + // Not part of the jobMetricsByMinute since we don't want to + // reset these metrics each minute + this.activeJobCounts = new Map(); + this.pendingJobCounts = new Map(); + + this.tracerStore.start(); +} + +_.extend(JobsModel.prototype, KadiraModel.prototype); + +JobsModel.prototype._getMetrics = function (timestamp, jobName) { + const dateId = this._getDateId(timestamp); + + if (!this.jobMetricsByMinute[dateId]) { + this.jobMetricsByMinute[dateId] = { + jobs: Object.create(null), + }; + } + + let jobs = this.jobMetricsByMinute[dateId].jobs; + + // initialize method + if (!jobs[jobName]) { + jobs[jobName] = { + count: 0, + errors: 0, + fetchedDocSize: 0, + added: 0, + histogram: new DDSketch({ + alpha: 0.02 + }) + }; + + JOB_METRICS_FIELDS.forEach(function (field) { + jobs[jobName][field] = 0; + }); + } + + return this.jobMetricsByMinute[dateId].jobs[jobName]; +}; + +JobsModel.prototype.trackNewJob = function (jobName, count = 1) { + const timestamp = Ntp._now(); + const dateId = this._getDateId(timestamp); + + let jobMetrics = this._getMetrics(dateId, jobName); + jobMetrics.added += count; +}; + +JobsModel.prototype.trackActiveJobs = function (jobName, diff) { + let updatedValue = (this.activeJobCounts.get(jobName) || 0) + diff; + + if (updatedValue === 0) { + this.activeJobCounts.delete(jobName); + } else { + this.activeJobCounts.set(jobName, updatedValue); + } + + // Ensure there's an entry for this date id so it is included in the payload + const timestamp = Ntp._now(); + const dateId = this._getDateId(timestamp); + this._getMetrics(dateId, jobName); +}; + +JobsModel.prototype.trackPendingJobs = function (jobName, pendingCount) { + if (pendingCount === 0) { + this.pendingJobCounts.delete(jobName); + } else { + // Ensure there's an entry for this date id so it is included in the payload + const timestamp = Ntp._now(); + const dateId = this._getDateId(timestamp); + this._getMetrics(dateId, jobName); + this.pendingJobCounts.set(jobName, pendingCount); + } +}; + +JobsModel.prototype.processJob = function (jobTrace, waitTime) { + const dateId = this._getDateId(jobTrace.at); + + // append metrics to previous values + this._appendMetrics(dateId, jobTrace, waitTime); + if (jobTrace.errored) { + this.jobMetricsByMinute[dateId].jobs[jobTrace.name].errors++; + } + + this.tracerStore.addTrace(jobTrace); +}; + +JobsModel.prototype._appendMetrics = function (id, jobTrace, waitTime = 0) { + const jobMetrics = this._getMetrics(id, jobTrace.name); + + // startTime needs to be converted into serverTime before sending + if (!this.jobMetricsByMinute[id].startTime) { + this.jobMetricsByMinute[id].startTime = jobTrace.at; + } + + // merge + JOB_METRICS_FIELDS.forEach(function (field) { + let value = jobTrace.metrics[field]; + if (value > 0) { + jobMetrics[field] += value; + } + }); + + jobMetrics.wait += waitTime; + jobMetrics.count++; + jobMetrics.histogram.add(jobTrace.metrics.total); + this.jobMetricsByMinute[id].endTime = jobTrace.metrics.at; +}; + +/* + There are two types of data + + 1. jobMetrics - metrics about the jobs (for every 20 secs) + 2. jobRequests - raw job request. normally max, min for every 1 min and errors always +*/ +JobsModel.prototype.buildPayload = function () { + const payload = { + jobMetrics: [], + jobRequests: [] + }; + + // handling metrics + let jobMetricsByMinute = this.jobMetricsByMinute; + this.jobMetricsByMinute = Object.create(null); + + // create final payload for jobMetrics + for (let key in jobMetricsByMinute) { + const jobMetrics = jobMetricsByMinute[key]; + // converting startTime into the actual serverTime + let startTime = jobMetrics.startTime; + jobMetrics.startTime = Kadira.syncedDate.syncTime(startTime); + + for (let jobName in jobMetrics.jobs) { + JOB_METRICS_FIELDS.forEach(function (field) { + jobMetrics.jobs[jobName][field] /= + jobMetrics.jobs[jobName].count; + }); + } + + this.activeJobCounts.forEach((value, jobName) => { + if (value === 0) { + return; + } + + if (!jobMetrics.jobs[jobName]) { + jobMetrics.jobs[jobName] = {}; + } + + jobMetrics.jobs[jobName].active = value; + }); + + this.pendingJobCounts.forEach((value, jobName) => { + if (value === 0) { + return; + } + + if (!jobMetrics.jobs[jobName]) { + jobMetrics.jobs[jobName] = {}; + } + + jobMetrics.jobs[jobName].pending = value; + }); + + payload.jobMetrics.push(jobMetricsByMinute[key]); + } + + // collect traces and send them with the payload + payload.jobRequests = this.tracerStore.collectTraces(); + + return payload; +}; diff --git a/lib/tracer/tracer.js b/lib/tracer/tracer.js index 58e2ed07..64c72f95 100644 --- a/lib/tracer/tracer.js +++ b/lib/tracer/tracer.js @@ -7,7 +7,7 @@ import { Ntp } from '../ntp'; let eventLogger = Npm.require('debug')('kadira:tracer'); let REPETITIVE_EVENTS = {db: true, http: true, email: true, wait: true, async: true, custom: true, fs: true}; -let TRACE_TYPES = ['sub', 'method', 'http']; +let TRACE_TYPES = ['sub', 'method', 'http', 'job']; let MAX_TRACE_EVENTS = 1500; diff --git a/package.js b/package.js index af0a37a6..421346f6 100644 --- a/package.js +++ b/package.js @@ -2,7 +2,7 @@ Package.describe({ summary: 'Performance Monitoring for Meteor', - version: '2.50.0-beta.2', + version: '2.50.0-beta.4', git: 'https://github.com/monti-apm/monti-apm-agent.git', name: 'montiapm:agent' }); @@ -72,6 +72,7 @@ Package.onTest(function (api) { 'tests/models/pubsub.js', 'tests/models/system.js', 'tests/models/errors.js', + 'tests/models/jobs.js', 'tests/tracer/tracer_store.js', 'tests/tracer/tracer.js', 'tests/tracer/default_filters.js', diff --git a/tests/_helpers/helpers.js b/tests/_helpers/helpers.js index 1437766e..3b858bc9 100644 --- a/tests/_helpers/helpers.js +++ b/tests/_helpers/helpers.js @@ -125,6 +125,8 @@ export const CleanTestData = function () { TestData.remove({}); Kadira.models.pubsub.metricsByMinute = {}; Kadira.models.pubsub.subscriptions = {}; + Kadira.models.jobs.jobMetricsByMinute = {}; + Kadira.models.jobs.activeJobCounts.clear(); }; export const SubscribeAndWait = function (client, name, args) { @@ -150,6 +152,31 @@ export const SubscribeAndWait = function (client, name, args) { } }; +export const callPromise = function (client, ...args) { + return new Promise((resolve, reject) => { + client.call(...args, (err, result) => { + if (err) { + return reject(err); + } + + resolve(result); + }); + }); +}; + +export const subscribePromise = function (client, ...args) { + return new Promise((resolve, reject) => { + client.subscribe(...args, { + onError (err) { + reject(err); + }, + onReady () { + resolve(); + } + }); + }); +}; + export function compareNear (v1, v2, maxDifference) { maxDifference = maxDifference || 30; let diff = Math.abs(v1 - v2); diff --git a/tests/hijack/subscriptions.js b/tests/hijack/subscriptions.js index 99520fb3..9a11f320 100644 --- a/tests/hijack/subscriptions.js +++ b/tests/hijack/subscriptions.js @@ -111,6 +111,7 @@ Tinytest.add( CleanTestData(); EnableTrackingMethods(); let client = GetMeteorClient(); + waitForConnection(client); let h1 = SubscribeAndWait(client, 'tinytest-data'); Wait(50); h1.stop(); @@ -165,7 +166,7 @@ Tinytest.add( TestHelpers.wait(100); - Kadira.EventBus.once('pubsub', 'observerDeleted', (ownerInfo) => console.log('on sub stop:', JSON.stringify(ownerInfo))); + Kadira.EventBus.once('pubsub', 'observerDeleted', (ownerInfo) => console.log('on sub stop:', Date.now(), JSON.stringify(ownerInfo))); st = Date.now(); h1.stop(); @@ -176,9 +177,9 @@ Tinytest.add( TestHelpers.wait(100); let metrics = TestHelpers.findMetricsForPub('tinytest-data'); - + console.dir(metrics); console.log({elapsedTime}); - test.isTrue(TestHelpers.compareNear(metrics.observerLifetime, 100 + elapsedTime, 60)); + test.isTrue(TestHelpers.compareNear(metrics.observerLifetime, 100, 60)); TestHelpers.closeClient(client); } ); diff --git a/tests/models/jobs.js b/tests/models/jobs.js new file mode 100644 index 00000000..0eb1f27b --- /dev/null +++ b/tests/models/jobs.js @@ -0,0 +1,189 @@ +import { EJSON } from 'meteor/ejson'; +import { JobsModel } from '../../lib/models/jobs'; +import { CleanTestData } from '../_helpers/helpers'; +import { TestData } from '../_helpers/globals'; + +const model = new JobsModel(); + +Tinytest.add( + 'Models - Jobs - buildPayload simple', + function (test) { + createCompletedJob('hello', 100, 5); + createCompletedJob('hello', 800, 10); + + let payload = model.buildPayload(); + payload.jobRequests = []; + + let expected = { + jobMetrics: [ + { + startTime: 100, + jobs: { + hello: { + count: 2, + added: 0, + errors: 0, + wait: 0, + db: 0, + http: 0, + email: 0, + async: 0, + compute: 7.5, + total: 7.5, + fetchedDocSize: 0, + histogram: { + alpha: 0.02, + bins: { + 41: 1, + 58: 1 + }, + maxNumBins: 2048, + n: 2, + gamma: 1.0408163265306123, + numBins: 2 + } + } + } + } + ], + jobRequests: [] + }; + + let startTime = expected.jobMetrics[0].startTime; + expected.jobMetrics[0].startTime = Kadira.syncedDate.syncTime(startTime); + // TODO comparing without parsing and stringifing fails + test.equal(EJSON.parse(EJSON.stringify(payload)), EJSON.parse(EJSON.stringify(expected))); + CleanTestData(); + } +); + +Tinytest.add( + 'Models - Jobs - track new jobs', + function (test) { + model.trackNewJob('analyze'); + model.trackNewJob('analyze'); + model.trackNewJob('analyze'); + + let payload = model.buildPayload(); + test.equal(payload.jobMetrics[0].jobs.analyze.added, 3); + CleanTestData(); + } +); + +Tinytest.add( + 'Models - Jobs - track active jobs', + function (test) { + model.activeJobCounts.clear(); + model.trackActiveJobs('analyze', 1); + model.trackActiveJobs('analyze', 1); + + let payload = model.buildPayload(); + console.dir(payload, { depth: 10 }); + test.equal(payload.jobMetrics[0].jobs.analyze.active, 2); + + model.trackActiveJobs('analyze', -1); + + payload = model.buildPayload(); + test.equal(payload.jobMetrics[0].jobs.analyze.active, 1); + + CleanTestData(); + } +); + +Tinytest.add( + 'Models - Jobs - Monti.recordNewJob', + async function (test) { + Kadira.recordNewJob('hello'); + let payload = Kadira.models.jobs.buildPayload(); + test.equal(payload.jobMetrics[0].jobs.hello.added, 1); + CleanTestData(); + } +); + +Tinytest.add( + 'Models - Jobs - Monti.recordPendingJobs', + async function (test) { + Kadira.recordPendingJobs('hello', 5); + let payload = Kadira.models.jobs.buildPayload(); + test.equal(payload.jobMetrics[0].jobs.hello.pending, 5); + + Kadira.recordPendingJobs('hello', 0); + payload = Kadira.models.jobs.buildPayload(); + test.equal(payload.jobMetrics.length, 0); + CleanTestData(); + } +); + +Tinytest.add( + 'Models - Jobs - traceJob - return sync value', + function (test) { + let result = Kadira.traceJob({ name: 'hello' }, () => 5); + + test.equal(result, 5); + CleanTestData(); + } +); + +Tinytest.addAsync( + 'Models - Jobs - traceJob - return async value', + async function (test, done) { + let result = await Kadira.traceJob({ name: 'hello' }, () => Promise.resolve(5)); + + test.equal(result, 5); + CleanTestData(); + done(); + } +); + +Tinytest.add( + 'Models - Jobs - traceJob - track sync processor', + function (test) { + Kadira.traceJob({ name: 'hello' }, () => { + TestData.find().fetch(); + }); + + let payload = Kadira.models.jobs.buildPayload(); + test.equal(payload.jobMetrics[0].jobs.hello.count, 1); + test.ok(payload.jobMetrics[0].jobs.hello.total > 0); + test.ok(payload.jobMetrics[0].jobs.hello.db > 0); + CleanTestData(); + } +); + +Tinytest.addAsync( + 'Models - Jobs - traceJob - track active status', + async function (test, done) { + model.activeJobCounts.clear(); + + let resolver; + let promise = new Promise(resolve => { + resolver = resolve; + }); + + let jobPromise = Kadira.traceJob({ name: 'hello' }, async () => { + await promise; + }); + + let payload = Kadira.models.jobs.buildPayload(); + test.equal(payload.jobMetrics[0].jobs.hello.active, 1); + test.equal(payload.jobMetrics[0].jobs.hello.count, 0); + + resolver(); + await jobPromise; + + payload = Kadira.models.jobs.buildPayload(); + test.equal(payload.jobMetrics[0].jobs.hello.active, undefined); + test.equal(payload.jobMetrics[0].jobs.hello.count, 1); + + CleanTestData(); + done(); + } +); + +function createCompletedJob (jobName, startTime, totalTime = 5) { + let method = { type: 'job', name: jobName, events: [] }; + method.events.push({ type: 'start', at: startTime }); + method.events.push({ type: 'complete', at: startTime + totalTime }); + method = Kadira.tracer.buildTrace(method); + model.processJob(method); +} diff --git a/tests/models/methods.js b/tests/models/methods.js index e86b2dd5..e6bce0f6 100644 --- a/tests/models/methods.js +++ b/tests/models/methods.js @@ -1,7 +1,7 @@ import { EJSON } from 'meteor/ejson'; import { MethodsModel } from '../../lib/models/methods'; import { TestData } from '../_helpers/globals'; -import { CleanTestData, CloseClient, GetMeteorClient, RegisterMethod, Wait, WithDocCacheGetSize, findMetricsForMethod, waitForConnection } from '../_helpers/helpers'; +import { CleanTestData, CloseClient, GetMeteorClient, RegisterMethod, Wait, WithDocCacheGetSize, callPromise, findMetricsForMethod, waitForConnection } from '../_helpers/helpers'; import { Meteor } from 'meteor/meteor'; Tinytest.add( @@ -189,6 +189,7 @@ Tinytest.add( } ); + Tinytest.addAsync('Models - Method - Waited On - track wait time of queued messages', async (test, done) => { let methodId = RegisterMethod( function (id) { Meteor._sleepForMs(25); @@ -197,11 +198,12 @@ Tinytest.addAsync('Models - Method - Waited On - track wait time of queued messa let client = GetMeteorClient(); + let promises = []; for (let i = 0; i < 10; i++) { - client.call(methodId, i, () => {}); + promises.push(callPromise(client, methodId, i)); } - Meteor._sleepForMs(1000); + await Promise.all(promises); const metrics = findMetricsForMethod(methodId); @@ -255,11 +257,12 @@ Tinytest.addAsync('Models - Method - Waited On - check unblock time', async (tes let client = GetMeteorClient(); + let promises = []; for (let i = 0; i < 10; i++) { - client.call(methodId, i, () => {}); + promises.push(callPromise(client, methodId, i)); } - Meteor._sleepForMs(1000); + await Promise.all(promises); const metrics = findMetricsForMethod(methodId); @@ -276,10 +279,10 @@ Tinytest.addAsync('Models - Method - Waited On - track wait time of next message let client = GetMeteorClient(); - client.call(slowMethod, () => {}); - client.call(fastMethod, () => {}); - - Meteor._sleepForMs(200); + await Promise.all([ + callPromise(client, slowMethod), + callPromise(client, fastMethod), + ]); const metrics = findMetricsForMethod(slowMethod); test.isTrue(metrics.waitedOn >= 20, `${metrics.waitedOn} should be greater than 20`); diff --git a/tests/models/pubsub.js b/tests/models/pubsub.js index 54a0e6f7..c33efef8 100644 --- a/tests/models/pubsub.js +++ b/tests/models/pubsub.js @@ -9,6 +9,7 @@ import { RegisterMethod, releaseParts, SubscribeAndWait, + subscribePromise, Wait, waitForConnection, WithDocCacheGetSize @@ -765,15 +766,16 @@ Tinytest.addAsync('Models - PubSub - Waited On - track wait time of queued messa const pubName = 'tinytest-waited-on'; + let promises = []; for (let i = 0; i < 10; i++) { - client.subscribe(pubName); + promises.push(subscribePromise(client, pubName)); } - Meteor._sleepForMs(1000); + await Promise.all(promises); const metrics = FindMetricsForPub(pubName); - test.isTrue(metrics.waitedOn > 1000, `${metrics.waitedOn} should be greater than 1000`); + test.isTrue(metrics.waitedOn > 500, `${metrics.waitedOn} should be greater than 500`); CloseClient(client); done(); diff --git a/types.d.ts b/types.d.ts index d4bfcf1d..16edbd84 100644 --- a/types.d.ts +++ b/types.d.ts @@ -29,6 +29,12 @@ export namespace Tracer { function addFilter(filterFunction: (eventType: EventType, data: Record, info: TraceInfo) => any): void } +export interface TraceJobOptions { + name: string; + data?: object + waitTime?: number +} + export namespace Monti { var tracer: typeof Tracer @@ -44,6 +50,10 @@ export namespace Monti { function startEvent(name: string, data?: Record): MontiEvent | false; function endEvent(event: MontiEvent | false, data?: Record): void; + + function traceJob any>(options: TraceJobOptions, fn: T): ReturnType + function recordNewJob (jobName: string): void; + function recordPendingJobs (jobName: string, count: number): void; } declare var MontiNamespace: typeof Monti @@ -52,4 +62,3 @@ declare global { var Monti: typeof MontiNamespace var Kadira: typeof MontiNamespace } -