Skip to content

Commit

Permalink
fix [Task]: 支持mongodb 7 #7099
Browse files Browse the repository at this point in the history
  • Loading branch information
baozhoutao committed Nov 29, 2024
1 parent b066e99 commit f36ec25
Show file tree
Hide file tree
Showing 18 changed files with 440 additions and 182 deletions.
9 changes: 8 additions & 1 deletion creator/packages/mongo/oplog_observe_driver.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { oplogV2V1Converter } from './oplog_v2_converter';

var Future = Npm.require('fibers/future');

var PHASE = {
Expand Down Expand Up @@ -598,11 +600,16 @@ _.extend(OplogObserveDriver.prototype, {
if (self._matcher.documentMatches(op.o).result)
self._addMatching(op.o);
} else if (op.op === 'u') {
// we are mapping the new oplog format on mongo 5
// to what we know better, $set
op.o = oplogV2V1Converter(op.o)
// Is this a modifier ($set/$unset, which may require us to poll the
// database to figure out if the whole document matches the selector) or
// a replacement (in which case we can just directly re-evaluate the
// selector)?
var isReplace = !_.has(op.o, '$set') && !_.has(op.o, '$unset');
// oplog format has changed on mongodb 5, we have to support both now
// diff is the format in Mongo 5+ (oplog v2)
var isReplace = !_.has(op.o, '$set') && !_.has(op.o, 'diff') && !_.has(op.o, '$unset');
// If this modifier modifies something inside an EJSON custom type (ie,
// anything with EJSON$), then we can't try to use
// LocalCollection._modify, since that just mutates the EJSON encoding,
Expand Down
105 changes: 105 additions & 0 deletions creator/packages/mongo/oplog_v2_converter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@


function join(prefix, key) {
return prefix ? `${prefix}.${key}` : key;
}

const arrayOperatorKeyRegex = /^(a|[su]\d+)$/;

function isArrayOperatorKey(field) {
return arrayOperatorKeyRegex.test(field);
}

function isArrayOperator(operator) {
return operator.a === true && Object.keys(operator).every(isArrayOperatorKey);
}

function flattenObjectInto(target, source, prefix) {
if (Array.isArray(source) || typeof source !== 'object' || source === null ||
source instanceof Mongo.ObjectID) {
target[prefix] = source;
} else {
const entries = Object.entries(source);
if (entries.length) {
entries.forEach(([key, value]) => {
flattenObjectInto(target, value, join(prefix, key));
});
} else {
target[prefix] = source;
}
}
}

const logDebugMessages = !!process.env.OPLOG_CONVERTER_DEBUG;

function convertOplogDiff(oplogEntry, diff, prefix) {
if (logDebugMessages) {
console.log(`convertOplogDiff(${JSON.stringify(oplogEntry)}, ${JSON.stringify(diff)}, ${JSON.stringify(prefix)})`);
}

Object.entries(diff).forEach(([diffKey, value]) => {
if (diffKey === 'd') {
// Handle `$unset`s.
if (oplogEntry.$unset === null || oplogEntry.$unset === undefined) {
oplogEntry.$unset = {};
}
Object.keys(value).forEach(key => {
oplogEntry.$unset[join(prefix, key)] = true;
});
} else if (diffKey === 'i') {
// Handle (potentially) nested `$set`s.
if (oplogEntry.$set === null || oplogEntry.$set === undefined) {
oplogEntry.$set = {};
}
flattenObjectInto(oplogEntry.$set, value, prefix);
} else if (diffKey === 'u') {
// Handle flat `$set`s.
if (oplogEntry.$set === null || oplogEntry.$set === undefined) {
oplogEntry.$set = {};
}
Object.entries(value).forEach(([key, value]) => {
oplogEntry.$set[join(prefix, key)] = value;
});
} else {
// Handle s-fields.
const key = diffKey.slice(1);
if (isArrayOperator(value)) {
// Array operator.
Object.entries(value).forEach(([position, value]) => {
if (position === 'a') {
return;
}

const positionKey = join(join(prefix, key), position.slice(1));
if (position[0] === 's') {
convertOplogDiff(oplogEntry, value, positionKey);
} else if (value === null) {
if (oplogEntry.$unset === null || oplogEntry.$unset === undefined) {
oplogEntry.$unset = {};
}
oplogEntry.$unset[positionKey] = true;
} else {
if (oplogEntry.$set === null || oplogEntry.$set === undefined) {
oplogEntry.$set = {};
}
oplogEntry.$set[positionKey] = value;
}
});
} else if (key) {
// Nested object.
convertOplogDiff(oplogEntry, value, join(prefix, key));
}
}
});
}

export function oplogV2V1Converter(oplogEntry) {
// Pass-through v1 and (probably) invalid entries.
if (oplogEntry.$v !== 2 || !oplogEntry.diff) {
return oplogEntry;
}

const convertedOplogEntry = { $v: 2 };
convertOplogDiff(convertedOplogEntry, oplogEntry.diff, '');
return convertedOplogEntry;
}
2 changes: 1 addition & 1 deletion creator/packages/mongo/package.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Package.onUse(function (api) {

api.addFiles(['mongo_driver.js', 'oplog_tailing.js',
'observe_multiplex.js', 'doc_fetcher.js',
'polling_observe_driver.js','oplog_observe_driver.js'],
'polling_observe_driver.js','oplog_observe_driver.js', 'oplog_v2_converter.js'],
'server');
api.addFiles('local_collection_driver.js', ['client', 'server']);
api.addFiles('remote_collection_driver.js', 'server');
Expand Down
2 changes: 1 addition & 1 deletion server/bundle/programs/server/packages/aldeed_tabular.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var Deps = Package.tracker.Deps;
var HTML = Package.htmljs.HTML;

/* Package-scope variables */
var Tabular;
var Tabular, getPubSelector, tableInit, Util;

(function(){

Expand Down
Loading

0 comments on commit f36ec25

Please sign in to comment.