diff --git a/api/paidAction/buyCredits.js b/api/paidAction/buyCredits.js new file mode 100644 index 000000000..731f37d75 --- /dev/null +++ b/api/paidAction/buyCredits.js @@ -0,0 +1,35 @@ +// XXX we don't use this yet ... +// it's just showing that even buying credits +// can eventually be a paid action + +import { ANON_USER_ID } from '@/lib/constants' + +export const anonable = true +export const supportsPessimism = true +export const supportsOptimism = true + +export async function getCost ({ amount }) { + return BigInt(amount) * BigInt(1000) +} + +export async function doStatements () { + return [] +} + +export async function onPaidStatements ({ invoice }, { models }) { + return [ + models.users.update({ + where: { id: invoice.userId }, + data: { balance: { increment: invoice.msatsReceived } } + }) + ] +} + +export async function resultsToResponse (results, args, context) { + return null +} + +export async function describe ({ amount }, { models, me }) { + const user = await models.user.findUnique({ where: { id: me?.id ?? ANON_USER_ID } }) + return `SN: buying credits for @${user.name}` +} diff --git a/api/paidAction/createItem.js b/api/paidAction/createItem.js index 4bda47a31..3e1b80bb7 100644 --- a/api/paidAction/createItem.js +++ b/api/paidAction/createItem.js @@ -1,13 +1,26 @@ +import { ANON_USER_ID } from '@/lib/constants' +import { getDeleteAt, getRemindAt } from '@/lib/item' +import { notifyItemParents, notifyTerritorySubscribers, notifyUserSubscribers } from '@/lib/webPush' + export const anonable = true export const supportsPessimism = true export const supportsOptimism = true -export async function getCost () { - // TODO - return null +export async function getCost ({ subName, parentId, uploadIds, boost }, { models, user }) { + const sub = await models.sub.findUnique({ where: { name: subName } }) + const baseCost = sub.baseCost * BigInt(1000) + // baseCost * 10^num_items_in_10m * 100 (anon) or 1 (user) + image fees + const [cost] = await models.$queryRaw` + SELECT ${baseCost}::INTEGER + * POWER(10, item_spam(${parentId}::INTEGER, ${user?.id || ANON_USER_ID}::INTEGER, '10m'::INTERVAL)) + * ${user ? 1 : 100}::INTEGER + + (SELECT "nUnpaid" * "imageFeeMsats" FROM image_fees_info(${user?.id || ANON_USER_ID}::INTEGER, ${uploadIds}))` + // freebies must be allowed, cost must be less than baseCost, no boost, user must exist, and cost must be less or equal to user's balance + const freebie = sub.allowFreebies && cost <= baseCost && !boost && !!user && cost <= user?.privates?.msats + return freebie ? 0 : cost } -export async function performStatements ( +export async function doStatements ( { invoiceId, uploadIds = [], itemForwards = [], pollOptions = [], boost = 0, ...data }, { me, models, cost }) { const boostMsats = BigInt(boost) * BigInt(1000) @@ -22,60 +35,136 @@ export async function performStatements ( itemActs.push({ msats: cost - boostMsats, act: 'FEE', invoiceId, invoiceActionState: 'PENDING', userId: me.id }) + } else { + data.freebie = true + } + + const mentions = [] + const text = data.text + if (text) { + const mentionPattern = /\B@[\w_]+/gi + const names = text.match(mentionPattern)?.map(m => m.slice(1)) + if (names?.length > 0) { + const users = await models.user.findMany({ where: { name: { in: names } } }) + mentions.push(...users.map(({ id }) => ({ userId: id })) + .filter(({ userId }) => userId !== me.id)) + } + data.deleteAt = getDeleteAt(text) + data.remindAt = getRemindAt(text) + } + + if (me) { + const [medianVote] = await models.$executeRaw` + SELECT LEAST( + percentile_cont(0.5) WITHIN GROUP(ORDER BY "weightedVotes" - "weightedDownVotes"), 0) + FROM "Item" WHERE "userId" = ${me.id}` + + data.weightedDownVotes = Math.abs(medianVote) + } + + const itemData = { + ...data, + text, + boost, + invoiceId, + userId: me.id || ANON_USER_ID, + actionInvoiceState: 'PENDING', + threadSubscription: { + createMany: [ + { userId: me.id }, + ...itemForwards.map(({ userId }) => ({ userId })) + ] + }, + itemForwards: { + createMany: itemForwards + }, + pollOptions: { + createMany: pollOptions + }, + itemUploads: { + connect: uploadIds.map(id => ({ uploadId: id })) + }, + itemAct: { + createMany: itemActs + }, + mention: { + createMany: mentions + } + } + + const stmts = [] + if (data.bio && me) { + stmts.push(models.user.update({ + where: { id: me.id }, + data: { + bio: { + create: itemData + } + } + })) + } else { + stmts.push(models.item.create({ + data: itemData + })) } return [ + ...stmts, models.upload.updateMany({ where: { id: { in: uploadIds } }, data: { invoiceId, actionInvoiceState: 'PENDING' } - }), - models.item.create({ - data: { - ...data, - boost, - invoiceId, - actionInvoiceState: 'PENDING' - }, - threadSubscription: { - createMany: [ - { userId: me.id }, - ...itemForwards.map(({ userId }) => ({ userId })) - ] - }, - itemForwards: { - createMany: itemForwards - }, - pollOptions: { - createMany: pollOptions - }, - itemUploads: { - connect: uploadIds.map(id => ({ uploadId: id })) - }, - itemAct: { - createMany: itemActs - } }) ] - // TODO: run_auction for job or just remove jobs? } export async function resultsToResponse (results, args, context) { - // TODO - return null + return args.bio ? results[0].bio : results[0] } export async function onPaidStatements ({ invoice }, { models }) { const item = await models.item.findFirst({ where: { invoiceId: invoice.id } }) + const stmts = [] + if (item.deleteAt) { + stmts.push(models.$queryRaw` + INSERT INTO pgboss.job (name, data, startafter, expirein) + VALUES ( + 'deleteItem', + jsonb_build_object('id', ${item.id}), + ${item.deleteAt}, + ${item.deleteAt} - now() + interval '1 minute')`) + } + if (item.remindAt) { + stmts.push(models.$queryRaw` + INSERT INTO pgboss.job (name, data, startafter, expirein) + VALUES ( + 'remindItem', + jsonb_build_object('id', ${item.id}), + ${item.remindAt}, + ${item.remindAt} - now() + interval '1 minute')`) + } + if (item.maxBid) { + stmts.push(models.$executeRaw`SELECT run_auction(${item.id}::INTEGER)`) + } + return [ models.itemAct.updateMany({ where: { invoiceId: invoice.id }, data: { invoiceActionState: 'PAID' } }), models.item.updateMany({ where: { invoiceId: invoice.id }, data: { invoiceActionState: 'PAID' } }), models.upload.updateMany({ where: { invoiceId: invoice.id }, data: { actionInvoiceState: 'PAID' } }), - // TODO: this doesn't work because it's a trigger - models.$executeRaw`SELECT ncomments_after_comment(${item.id}::INTEGER)` - // TODO: create mentions - // TODO: bot stuff - // TODO: notifications + // TODO: this don't work because it's a trigger + models.$executeRaw`SELECT ncomments_after_comment(${item.id}::INTEGER)`, + // jobs ... TODO: remove the triggers for these + models.$executeRaw`INSERT INTO pgboss.job (name, data, startafter, priority) + VALUES ('timestampItem', jsonb_build_object('id', ${item.id}), now() + interval '10 minutes', -2)`, + models.$executeRaw`INSERT INTO pgboss.job (name, data, priority) VALUES ('indexItem', jsonb_build_object('id', NEW.id), -100)`, + models.$executeRaw` + INSERT INTO pgboss.job (name, data, retrylimit, retrybackoff, startafter) + VALUES ('imgproxy', jsonb_build_object('id', item.id), 21, true, now() + interval '5 seconds')`, + ...stmts, + // TODO: this doesn't work because we expect prisma queries + notifyItemParents({ item, me: item.userId, models }), + notifyUserSubscribers({ models, item }), + notifyTerritorySubscribers({ models, item }) ] } diff --git a/api/paidAction/index.js b/api/paidAction/index.js index 36d3d588c..8b032660e 100644 --- a/api/paidAction/index.js +++ b/api/paidAction/index.js @@ -4,6 +4,7 @@ import { verifyPayment } from '../resolvers/serial' import { ANON_USER_ID } from '@/lib/constants' export const paidActions = { + BUY_CREDITS: require('./buyCredits'), CREATE_ITEM: require('./createItem'), UPDATE_ITEM: require('./updateItem'), ZAP: require('./zap'), @@ -31,8 +32,8 @@ export default async function doPaidAction (actionType, args, context) { return await doPessimiticAction(actionType, args, context) } - context.cost = await paidAction.getCost(args, context) context.user = await models.user.findUnique({ where: { id: me.id } }) + context.cost = await paidAction.getCost(args, context) const isRich = context.cost <= context.user.privates.msats if (!isRich && !paidAction.supportsOptimism) { diff --git a/lib/item.js b/lib/item.js index a7d111069..6a58695ae 100644 --- a/lib/item.js +++ b/lib/item.js @@ -30,6 +30,24 @@ export const getDeleteCommand = (text) => { return commands.length ? commands[commands.length - 1] : undefined } +export const getDeleteAt = (text) => { + const command = getDeleteCommand(text) + if (command) { + const { number, unit } = command + return datePivot(new Date(), { [`${unit}s`]: number }) + } + return null +} + +export const getRemindAt = (text) => { + const command = getReminderCommand(text) + if (command) { + const { number, unit } = command + return datePivot(new Date(), { [`${unit}s`]: number }) + } + return null +} + export const hasDeleteCommand = (text) => !!getDeleteCommand(text) export const hasReminderMention = (text) => reminderMentionPattern.test(text ?? '') diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 175ad2ae4..557cbf75b 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -284,18 +284,20 @@ model ItemUpload { } model Upload { - id Int @id @default(autoincrement()) - createdAt DateTime @default(now()) @map("created_at") - updatedAt DateTime @default(now()) @updatedAt @map("updated_at") - type String - size Int - width Int? - height Int? - userId Int - paid Boolean? - user User @relation("Uploads", fields: [userId], references: [id], onDelete: Cascade) - User User[] - ItemUpload ItemUpload[] + id Int @id @default(autoincrement()) + createdAt DateTime @default(now()) @map("created_at") + updatedAt DateTime @default(now()) @updatedAt @map("updated_at") + type String + size Int + width Int? + height Int? + userId Int + invoiceId Int? + invoiceActionState InvoiceActionState? + invoice Invoice? @relation(fields: [invoiceId], references: [id], onDelete: SetNull) + user User @relation("Uploads", fields: [userId], references: [id], onDelete: Cascade) + User User[] + ItemUpload ItemUpload[] @@index([createdAt], map: "Upload.created_at_index") @@index([userId], map: "Upload.userId_index") @@ -398,6 +400,8 @@ model Item { bounty Int? noteId String? @unique(map: "Item.noteId_unique") rootId Int? + deleteAt DateTime? + remindAt DateTime? bountyPaidTo Int[] upvotes Int @default(0) weightedComments Float @default(0) @@ -668,6 +672,7 @@ model Mention { } enum InvoiceActionType { + BUY_CREDITS ITEM_CREATE ITEM_UPDATE ZAP @@ -709,6 +714,7 @@ model Invoice { actionType InvoiceActionType? ItemAct ItemAct[] Item Item[] + Upload Upload[] @@index([createdAt], map: "Invoice.created_at_index") @@index([userId], map: "Invoice.userId_index") diff --git a/worker/index.js b/worker/index.js index 2d5b8955a..81756952c 100644 --- a/worker/index.js +++ b/worker/index.js @@ -25,6 +25,7 @@ import { ofac } from './ofac.js' import { autoWithdraw } from './autowithdraw.js' import { saltAndHashEmails } from './saltAndHashEmails.js' import { remindUser } from './reminder.js' +import { settleAction, settleActionError } from './paidAction.js' const { loadEnvConfig } = nextEnv const { ApolloClient, HttpLink, InMemoryCache } = apolloClient @@ -104,6 +105,8 @@ async function work () { await boss.work('ofac', jobWrapper(ofac)) await boss.work('saltAndHashEmails', jobWrapper(saltAndHashEmails)) await boss.work('reminder', jobWrapper(remindUser)) + await boss.work('settleActionError', jobWrapper(settleActionError)) + await boss.work('settleAction', jobWrapper(settleAction)) console.log('working jobs') } diff --git a/worker/paidAction.js b/worker/paidAction.js index 9e62ac0bb..2a053c0e1 100644 --- a/worker/paidAction.js +++ b/worker/paidAction.js @@ -1,31 +1,82 @@ import { paidActions } from '@/api/paidAction' +import { datePivot } from '@/lib/time' +import { Prisma } from '@prisma/client' +import { getInvoice } from 'ln-service' -export async function settleAction ({ data: { invoiceId }, models }) { - const invoice = await models.invoice.findUnique({ +export async function settleAction ({ data: { invoiceId }, models, lnd, boss }) { + const dbInv = await models.invoice.findUnique({ where: { id: invoiceId } }) + const invoice = await getInvoice({ id: dbInv.hash, lnd }) - await models.$transaction([ - // optimistic concurrency control (aborts if invoice is not in PENDING state) - models.invoice.update({ - where: { id: invoice.id, actionState: 'PENDING' }, - data: { actionState: 'PAID' } - }), - ...await paidActions[invoice.actionType].onPaidStatements({ invoice }, { models }) - ]) + if (!invoice.is_confirmed) { + throw new Error('invoice is not confirmed') + } + + try { + await models.$transaction([ + // optimistic concurrency control (aborts if invoice is not in PENDING state) + models.invoice.update({ + where: { id: invoice.id, actionState: 'PENDING' }, + data: { + actionState: 'PAID', + confirmedAt: new Date(invoice.confirmed_at), + confirmedIndex: invoice.confirmed_index, + msatsReceived: BigInt(invoice.received_mtokens) + } + }), + ...await paidActions[dbInv.actionType].onPaidStatements({ invoice: dbInv }, { models }) + ]) + } catch (e) { + if (e instanceof Prisma.PrismaClientKnownRequestError) { + // this error is thrown when we try to update a record that has been updated by another worker + // so we just ignore it and let the other worker take the transition "lock" and perform the transition + if (e.code === 'P2025') { + return + } + } + console.error(`unexpected error transitioning action ${dbInv.actionType} to PAID`, e) + boss.send( + 'settleAction', + { invoiceId }, + { startAfter: datePivot(new Date(), { minutes: 1 }), priority: 1000 }) + } } -export async function settleActionError ({ data: { invoiceId }, models }) { - const invoice = await models.invoice.findUnique({ +export async function settleActionError ({ data: { invoiceId }, models, lnd, boss }) { + const dbInv = await models.invoice.findUnique({ where: { id: invoiceId } }) + const invoice = await getInvoice({ id: dbInv.hash, lnd }) + + if (!invoice.is_cancelled) { + throw new Error('invoice is not cancelled') + } - await models.$transaction([ - // optimistic concurrency control (aborts if invoice is not in PENDING state) - models.invoice.update({ - where: { id: invoice.id, actionState: 'PENDING' }, - data: { actionState: 'FAILED' } - }), - ...await paidActions[invoice.actionType].onFailedStatements({ invoice }, { models }) - ]) + try { + await models.$transaction([ + // optimistic concurrency control (aborts if invoice is not in PENDING state) + models.invoice.update({ + where: { id: dbInv.id, actionState: 'PENDING' }, + data: { + actionState: 'FAILED', + cancelled: true + } + }), + ...await paidActions[invoice.actionType].onFailedStatements({ invoice: dbInv }, { models }) + ]) + } catch (e) { + if (e instanceof Prisma.PrismaClientKnownRequestError) { + // this error is thrown when we try to update a record that has been updated by another worker + // so we just ignore it and let the other worker take the transition "lock" and perform the transition + if (e.code === 'P2025') { + return + } + } + console.error(`unexpected error transitioning action ${dbInv.actionType} to FAILED`, e) + boss.send( + 'settleActionError', + { invoiceId }, + { startAfter: datePivot(new Date(), { minutes: 1 }), priority: 1000 }) + } } diff --git a/worker/wallet.js b/worker/wallet.js index 80507d45c..df630675d 100644 --- a/worker/wallet.js +++ b/worker/wallet.js @@ -9,6 +9,7 @@ import { datePivot, sleep } from '@/lib/time.js' import retry from 'async-retry' import { addWalletLog } from '@/api/resolvers/wallet' import { msatsToSats, numWithUnits } from '@/lib/format' +import { settleAction, settleActionError } from './paidAction' export async function subscribeToWallet (args) { await subscribeToDeposits(args) @@ -119,6 +120,10 @@ async function checkInvoice ({ data: { hash }, boss, models, lnd }) { } if (inv.is_confirmed) { + if (dbInv.actionType) { + return await settleAction({ data: { invoiceId: dbInv.id }, ...arguments }) + } + // NOTE: confirm invoice prevents double confirmations (idempotent) // ALSO: is_confirmed and is_held are mutually exclusive // that is, a hold invoice will first be is_held but not is_confirmed @@ -161,6 +166,10 @@ async function checkInvoice ({ data: { hash }, boss, models, lnd }) { } if (inv.is_canceled) { + if (dbInv.actionType) { + return await settleActionError({ data: { invoiceId: dbInv.id }, ...arguments }) + } + return await serialize( models.invoice.update({ where: {