Skip to content

Commit

Permalink
Merge pull request #131 from monti-apm/feature/job-monitoring
Browse files Browse the repository at this point in the history
Job monitoring
  • Loading branch information
zodern authored Dec 20, 2024
2 parents d9b56d3 + 6f416e7 commit 5fc9d5f
Show file tree
Hide file tree
Showing 18 changed files with 797 additions and 23 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## Next

* Create custom traces with `Monti.traceJob`
* Add monitoring for job queues, cron jobs, and custom functions
* Add `disableInstrumentation` option
* Use websockets to start cpu profiling
* Support for recording heap snapshots

## v2.49.4
July 11, 2024

Expand Down
63 changes: 61 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ Now you can deploy your application and it will send information to Monti APM. W

`montiapm:agent` is compatible with:

- Meteor 1.4.3.2 and newer
- Meteor 1.4.3.2 and newer, including Meteor 3. Meteor will install the correct version of `montiapm:agent` for the version of Meteor.
- Internet Explorer 9 and newer web browsers
- Can be used with [Monti APM](https://montiapm.com/) or the open sourced version of Kadira, though many new features are not supported by Kadira

Features that require a newer version of Meteor are only enabled when using a supported version. For example, monitoring incoming HTTP requests is automatically enabled when the app uses Meteor 1.7 or newer.
Some features have a higher minimum version of Meteor, and are disabled in older Meteor versions. For example, monitoring incoming HTTP requests is enabled for apps using Meteor 1.7 or newer.

### Auto Connect

Expand Down Expand Up @@ -195,6 +195,65 @@ The fields are redacted from:
By default, the `password` field is redacted.
## Job Monitoring
### Custom Traces
You can create custom traces for any block of code. The code will be traced as a job, and appear in the Jobs dashboard in Monti APM.
```js
Monti.traceJob(options, functionToTrace);
Monti.traceJob({ name: 'job name' }, () => {
// ... code to trace
});
```
Options can have these properties. `name` is the only property required.
- `name`, which is the name of the trace or job. It is used to group metrics and traces together for the same job.
- `waitTime`, which is how long the job waited to run after it was scheduled to run. Shown in Monti APM as the job delay
- `data`, which can have the job data or any other details you want to store in the trace. It is shown under the `Start` event in the trace.
The `functionToTrace` is called immediately, and it's return value is returned by `Monti.traceJob`.

When `Monti.traceJob` is called inside a trace, it does nothing and simply runs `functionToTrace`.

We recommend not using more than a few dozen names for custom traces.

### Pending Jobs

Monti APM does not automatically track pending jobs to avoid causing performance issues.
However, your app can report the metric to Monti APM.

The pending metric is intended for job queues to know how many jobs are waiting to be processed. We recommend reporting the metric within 20 seconds of the app starting, and every 10 - 50 seconds afterwards.

```js
async function reportPending() {
// How you get the pending count depends on the job queue library
// This is one way you can with BullMQ.
let counts = await queue.getJobCounts('wait');
Monti.recordPendingJobs('job name', counts.wait);
}
// Report the count when the app starts
reportPending();
// Update the count every 20 seconds
setInterval(() => reportPending(), 1000 * 20);
```

### New Jobs

When using `Monti.traceJob`, the `added` metric for the job is not recorded. This metric is intended for job queues to track how many new jobs were created, to understand how the rate of new jobs and completed jobs compares. You can manually record new jobs with:

```js
Monti.recordNewJob('job name');
```

Each time the function is called, it increments the `added` metric by 1.

### Development

#### Tests:
Expand Down
2 changes: 1 addition & 1 deletion lib/auto_connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ if (
}

Kadira._connectWithEnv(envOptions);
Kadira._connectWithSettings(settingsOptions);
Kadira._connectWithSettings(montiSettings);
53 changes: 53 additions & 0 deletions lib/hijack/agenda.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { checkModuleUsed } from './commonjs-utils';

export function wrapAgenda () {
Meteor.startup(() => {
if (checkModuleUsed('@hokify/agenda')) {
instrumentAgendaTs();
}
if (checkModuleUsed('agenda')) {
instrumentAgenda();
}
});
}

function instrumentAgendaTs () {
// eslint-disable-next-line global-require
let agenda = require('@hokify/agenda');
let Job = agenda.Job;

instrumentJob(Job.prototype);
}

function instrumentAgenda () {
// eslint-disable-next-line global-require
let Job = require('agenda/dist/job').Job;
instrumentJob(Job.prototype);
}

function instrumentJob (JobMethods) {
let oldSaveJob = JobMethods.save;
JobMethods.save = function () {
let id = this.attrs._id;

if (!id) {
let name = this.attrs.name;
Kadira.models.jobs.trackNewJob(name);
}

return oldSaveJob.apply(this, arguments);
};

let oldRun = JobMethods.run;
JobMethods.run = function (...args) {
let name = this.attrs.name;
let waitTime = Date.now() - this.attrs.nextRunAt;
let details = {
name,
waitTime,
data: this.attrs.data
};

return Kadira.traceJob(details, () => oldRun.apply(this, args));
};
}
48 changes: 48 additions & 0 deletions lib/hijack/bullmq.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { checkModuleUsed, tryResolve } from './commonjs-utils';

export function wrapBullMQ () {
Meteor.startup(() => {
if (checkModuleUsed('bullmq')) {
instrumentBullMQ(tryResolve('bullmq'));
}
});
}

function instrumentBullMQ (modulePath) {
let bullMq = Npm.require(modulePath);

let oldAdd = bullMq.Queue.prototype.addJob;
bullMq.Queue.prototype.addJob = function () {
Kadira.models.jobs.trackNewJob(this.name);
return oldAdd.apply(this, arguments);
};

let oldAddBulk = bullMq.Queue.prototype.addJobs;
bullMq.Queue.prototype.addJobs = function (jobs) {
let count = jobs && jobs.length || 0;

Kadira.models.jobs.trackNewJob(this.name, count);

return oldAddBulk.apply(this, arguments);
};

let oldProcessJob = bullMq.Worker.prototype.callProcessJob;
bullMq.Worker.prototype.callProcessJob = function (...args) {
let job = args[0];
let name = this.name;

return Kadira.traceJob({
name,
waitTime: Date.now() - (job.timestamp + (job.delay || 0)),
_attributes: {
jobId: job.id,
jobName: job.name,
jobCreated: new Date(job.timestamp),
jobDelay: job.delay || 0,
queueName: job.queueName,
attemptsMade: job.attemptsMade,
},
data: job.data
}, () => oldProcessJob.apply(this, args));
};
}
46 changes: 46 additions & 0 deletions lib/hijack/commonjs-utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
const logger = Npm.require('debug')('kadira:apm');
const path = require('path');

let meteorBootstrap = typeof __meteor_bootstrap__ === 'object' && __meteor_bootstrap__;
let serverDir = meteorBootstrap ? meteorBootstrap.serverDir : process.cwd();
let nodeRequire;

try {
// eslint-disable-next-line global-require
let nodeModule = require('node:module');

nodeRequire = nodeModule.createRequire(serverDir);
} catch (err) {
logger(`Failed to create native require: ${err}`);
}

export function tryResolve (modulePath) {
if (!meteorBootstrap || !nodeRequire) {
return false;
}

try {
return nodeRequire.resolve(modulePath, {
paths: [
serverDir,
path.resolve(serverDir, 'npm')
]
});
} catch (err) {
if (err.code === 'MODULE_NOT_FOUND') {
return false;
}

throw err;
}
}

export function checkModuleUsed (name) {
let resolved = tryResolve(name);

if (!resolved) {
return false;
}

return !!nodeRequire.cache[resolved];
}
6 changes: 6 additions & 0 deletions lib/hijack/instrument.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import { wrapStringifyDDP } from './wrap_ddp_stringify';
import { setLabels } from './set_labels';
import { hijackDBOps } from './db';
import { wrapRedisOplogObserveDriver } from './redis_oplog';
import { wrapSyncedCron } from './synced-cron.js';
import { wrapAgenda } from './agenda.js';
import { wrapBullMQ } from './bullmq.js';

let instrumented = false;
Kadira._startInstrumenting = function (callback = () => {}) {
Expand All @@ -33,6 +36,9 @@ Kadira._startInstrumenting = function (callback = () => {}) {
wrapPicker();
wrapFs();
wrapRouters();
wrapSyncedCron();
wrapAgenda();
wrapBullMQ();

MeteorX.onReady(function () {
// instrumenting session
Expand Down
19 changes: 19 additions & 0 deletions lib/hijack/synced-cron.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
export function wrapSyncedCron () {
Meteor.startup(() => {
let cronPackage = Package['littledata:synced-cron'] || Package['percolate:synced-cron'];

if (!cronPackage) {
return;
}

let cron = cronPackage.SyncedCron;

Object.values(cron._entries).forEach(entry => {
let oldJob = entry.job;

entry.job = function (...args) {
return Kadira.traceJob({ name: entry.name },() => oldJob.apply(this, args));
};
});
});
}
Loading

0 comments on commit 5fc9d5f

Please sign in to comment.