Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mongo): refactor _fields #116

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 52 additions & 35 deletions packages/mongo/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BSONType, ClientSession, Collection, Db, IndexDescription, Long, MongoClient, MongoClientOptions, MongoError, ObjectId } from 'mongodb'
import { Binary, Dict, isNullable, makeArray, mapValues, noop, omit, pick } from 'cosmokit'
import { Binary, Dict, isNullable, makeArray, mapValues, noop, omit, pick, remove } from 'cosmokit'
import { Driver, Eval, executeUpdate, Field, hasSubquery, Query, RuntimeError, Selection, z } from 'minato'
import { URLSearchParams } from 'url'
import { Builder } from './builder'
Expand All @@ -8,6 +8,14 @@

const tempKey = '__temp_minato_mongo__'

interface TableMeta {
_id: string
virtual?: boolean
migrate?: boolean
autoInc?: number
fields?: string[]
}

export class MongoDriver extends Driver<MongoDriver.Config> {
static name = 'mongo'

Expand Down Expand Up @@ -128,52 +136,62 @@
const coll = this.db.collection(table)
const bulk = coll.initializeOrderedBulkOp()
const virtualKey = this.getVirtualKey(table)
const metaTable = this.db.collection<TableMeta>('_fields')
const meta = { _id: table }, found = await metaTable.findOne(meta)
if (!found?.fields) {
this.logger.info('initializing fields for table %s', table)
await metaTable.updateOne(meta, { $set: { fields: Object.keys(fields) } }, { upsert: true })
return
}
for (const key in fields) {
if (virtualKey === key) continue
const { initial, legacy = [] } = fields[key]!
if (!Field.available(fields[key])) continue
const filter = { [key]: { $exists: false } }
for (const oldKey of legacy) {
bulk
.find({ ...filter, [oldKey]: { $exists: true } })
.update({ $rename: { [oldKey]: key } })
filter[oldKey] = { $exists: false }
}
bulk.find(filter).update({ $set: { [key]: initial ?? null } })
if (legacy.length) {
const $unset = Object.fromEntries(legacy.map(key => [key, '']))
bulk.find({}).update({ $unset })
if (found.fields.includes(key)) continue
this.logger.info('auto migrating field %s for table %s', key, table)

const oldKey = found.fields.find(field => legacy.includes(field))
if (oldKey) {
remove(found.fields, oldKey)
found.fields.push(key)
bulk.find({ [oldKey]: { $exists: true } }).update({ $rename: { [oldKey]: key } })
} else {
found.fields.push(key)
bulk.find({}).update({ $set: { [key]: initial ?? null } })
}
}
if (bulk.batches.length) await bulk.execute()
if (bulk.batches.length) {
await bulk.execute()
await metaTable.updateOne(meta, { $set: { fields: found.fields } })
}
}

private async _migrateVirtual(table: string) {
const { primary, fields: modelFields } = this.model(table)
const { primary, fields } = this.model(table)
if (Array.isArray(primary)) return
const fields = this.db.collection('_fields')
const meta: Dict = { table, field: primary }
const found = await fields.findOne(meta)
const metaTable = this.db.collection<TableMeta>('_fields')
const meta = { _id: table }, found = await metaTable.findOne(meta)
let virtual = !!found?.virtual
const useVirtualKey = !!this.getVirtualKey(table)
// If _fields table was missing for any reason
// Test the type of _id to get its possible preference
if (!found) {
const doc = await this.db.collection(table).findOne()
if (doc) {
virtual = typeof doc._id !== 'object' || (typeof primary === 'string' && modelFields[primary]?.deftype === 'primary')
} else {
// Empty collection, just set meta and return
fields.updateOne(meta, { $set: { virtual: useVirtualKey } }, { upsert: true })
this.logger.info('Successfully reconfigured table %s', table)
virtual = typeof doc._id !== 'object' || (typeof primary === 'string' && fields[primary]?.deftype === 'primary')
}
if (!doc || virtual === useVirtualKey) {
// Empty table or already configured
await metaTable.updateOne(meta, { $set: { virtual: useVirtualKey } }, { upsert: true })
this.logger.info('successfully reconfigured table %s', table)
return
}
}
if (virtual === useVirtualKey) return
this.logger.info('Start migrating table %s', table)
this.logger.info('start migrating table %s', table)

if (found?.migrate && await this.db.listCollections({ name: '_migrate_' + table }).hasNext()) {
this.logger.info('Last time crashed, recover')
this.logger.info('last time crashed, recover')

Check warning on line 194 in packages/mongo/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/mongo/src/index.ts#L194

Added line #L194 was not covered by tests
} else {
await this.db.dropCollection('_migrate_' + table).catch(noop)
await this.db.collection(table).aggregate([
Expand All @@ -183,23 +201,22 @@
{ $unset: ['_temp_id', ...useVirtualKey ? [primary] : []] },
{ $out: '_migrate_' + table },
]).toArray()
await fields.updateOne(meta, { $set: { migrate: true } }, { upsert: true })
await metaTable.updateOne(meta, { $set: { migrate: true } }, { upsert: true })
}
await this.db.dropCollection(table).catch(noop)
await this.db.renameCollection('_migrate_' + table, table)
await fields.updateOne(meta,
await metaTable.updateOne(meta,
{ $set: { virtual: useVirtualKey, migrate: false } },
{ upsert: true },
)
this.logger.info('Successfully migrated table %s', table)
this.logger.info('successfully migrated table %s', table)
}

private async _migratePrimary(table: string) {
const { primary, autoInc } = this.model(table)
if (Array.isArray(primary) || !autoInc) return
const fields = this.db.collection('_fields')
const meta: Dict = { table, field: primary }
const found = await fields.findOne(meta)
const metaTable = this.db.collection<TableMeta>('_fields')
const meta = { _id: table }, found = await metaTable.findOne(meta)
if (!isNullable(found?.autoInc)) return

const coll = this.db.collection(table)
Expand All @@ -215,9 +232,8 @@
}

const [latest] = await coll.find().sort(this.getVirtualKey(table) ? '_id' : primary, -1).limit(1).toArray()
await fields.updateOne(meta, {
$set: { autoInc: latest ? +latest[this.getVirtualKey(table) ? '_id' : primary] : 0 },
$setOnInsert: { virtual: !!this.getVirtualKey(table) },
await metaTable.updateOne(meta, {
$set: { autoInc: latest ? +latest[this.getVirtualKey(table) ? '_id' : primary] : 0, virtual: !!this.getVirtualKey(table) },
}, { upsert: true })
}

Expand Down Expand Up @@ -255,6 +271,7 @@
}

async drop(table: string) {
await this.db.collection<TableMeta>('_fields').deleteOne({ _id: table }, { session: this.session })
await this.db.dropCollection(table, { session: this.session })
}

Expand Down Expand Up @@ -397,8 +414,8 @@
if (typeof primary === 'string' && autoInc && model.fields[primary]?.deftype !== 'primary') {
const missing = data.filter(item => !(primary in item))
if (!missing.length) return
const doc = await this.db.collection('_fields').findOneAndUpdate(
{ table, field: primary },
const doc = await this.db.collection<TableMeta>('_fields').findOneAndUpdate(
{ _id: table },
{ $inc: { autoInc: missing.length } },
{ session: this.session, upsert: true },
)
Expand Down