Skip to content

Commit

Permalink
Use optimistic concurrency lock for retries
Browse files Browse the repository at this point in the history
  • Loading branch information
ekzyis committed Jan 21, 2025
1 parent a7f8916 commit 24df678
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 53 deletions.
9 changes: 6 additions & 3 deletions api/paidAction/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ stateDiagram-v2
PENDING --> FAILED
PAID --> [*]
CANCELING --> FAILED
FAILED --> RETRYING
FAILED --> RETRY_PENDING
FAILED --> [*]
RETRY_PENDING --> RETRYING
RETRYING --> [*]
[*] --> PENDING_HELD
PENDING_HELD --> HELD
Expand Down Expand Up @@ -59,8 +60,9 @@ stateDiagram-v2
PENDING --> FAILED
PAID --> [*]
CANCELING --> FAILED
FAILED --> RETRYING
FAILED --> RETRY_PENDING
FAILED --> [*]
RETRY_PENDING --> RETRYING
RETRYING --> [*]
```
</details>
Expand Down Expand Up @@ -121,8 +123,9 @@ This works by requesting an invoice from the recipient's wallet and reusing the
stateDiagram-v2
PAID --> [*]
CANCELING --> FAILED
FAILED --> RETRYING
FAILED --> RETRY_PENDING
FAILED --> [*]
RETRY_PENDING --> RETRYING
RETRYING --> [*]
[*] --> PENDING_HELD
PENDING_HELD --> FORWARDING
Expand Down
3 changes: 1 addition & 2 deletions api/paidAction/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,10 @@ export async function retryPaidAction (actionType, args, incomingContext) {
return await models.$transaction(async tx => {
const context = { ...retryContext, tx, invoiceArgs }

// update the old invoice to RETRYING, so that it's not confused with FAILED
await tx.invoice.update({
where: {
id: failedInvoice.id,
actionState: 'FAILED'
actionState: 'RETRY_PENDING'
},
data: {
actionState: 'RETRYING'
Expand Down
44 changes: 27 additions & 17 deletions api/resolvers/paidAction.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,45 @@ export default {
}
},
Mutation: {
retryPaidAction: async (parent, { invoiceId }, { models, me, lnd }) => {
retryPaidAction: async (parent, { invoiceId, newAttempt }, { models, me, lnd }) => {
if (!me) {
throw new Error('You must be logged in')
}

const invoice = await models.invoice.findUnique({ where: { id: invoiceId, userId: me.id } })
// optimistic concurrency control:
// make sure only one client at a time can retry by immediately transitioning to an intermediate state
const [invoice] = await models.$queryRaw`
UPDATE "Invoice"
SET "actionState" = 'RETRY_PENDING'
WHERE id in (
SELECT id FROM "Invoice"
WHERE id = ${invoiceId} AND "userId" = ${me.id} AND "actionState" = 'FAILED'
FOR UPDATE
)
RETURNING *`
if (!invoice) {
throw new Error('Invoice not found')
}

if (invoice.actionState !== 'FAILED') {
if (invoice.actionState === 'PAID') {
throw new Error('Invoice is already paid')
}
throw new Error(`Invoice is not in failed state: ${invoice.actionState}`)
}

// a locked invoice means we want to retry a payment from the beginning
// with all sender and receiver wallets so we need to increment the retry count
const paymentAttempt = invoice.lockedAt ? invoice.paymentAttempt + 1 : invoice.paymentAttempt
// do we want to retry a payment from the beginning with all sender and receiver wallets?
const paymentAttempt = newAttempt ? invoice.paymentAttempt + 1 : invoice.paymentAttempt
if (paymentAttempt > WALLET_MAX_RETRIES) {
throw new Error('Payment has been retried too many times')
}

const result = await retryPaidAction(invoice.actionType, { invoice }, { paymentAttempt, models, me, lnd })

return {
...result,
type: paidActionType(invoice.actionType)
try {
const result = await retryPaidAction(invoice.actionType, { invoice }, { paymentAttempt, models, me, lnd })
return {
...result,
type: paidActionType(invoice.actionType)
}
} catch (err) {
// revert state transition to allow new retry for this invoice
await models.invoice.update({
where: { id: invoiceId, actionState: 'RETRY_PENDING' },
data: { actionState: 'FAILED' }
})
throw err
}
}
},
Expand Down
24 changes: 8 additions & 16 deletions api/resolvers/wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
WALLET_CREATE_INVOICE_TIMEOUT_MS,
WALLET_RETRY_AFTER_MS,
WALLET_RETRY_BEFORE_MS,
WALLET_VISIBILITY_TIMEOUT_MS,
WALLET_MAX_RETRIES
} from '@/lib/constants'
import { amountSchema, validateSchema, withdrawlSchema, lnAddrSchema } from '@/lib/validate'
Expand Down Expand Up @@ -467,21 +466,14 @@ const resolvers = {
}
// make sure each invoice is only returned once via visibility timeouts and SKIP LOCKED
return await models.$queryRaw`
UPDATE "Invoice"
SET "lockedAt" = now()
WHERE id IN (
SELECT id FROM "Invoice"
WHERE "userId" = ${me.id}
AND "actionState" = 'FAILED'
AND "userCancel" = false
AND "cancelledAt" < now() - ${`${WALLET_RETRY_AFTER_MS} milliseconds`}::interval
AND "cancelledAt" > now() - ${`${WALLET_RETRY_BEFORE_MS} milliseconds`}::interval
AND ("lockedAt" IS NULL OR "lockedAt" < now() - ${`${WALLET_VISIBILITY_TIMEOUT_MS} milliseconds`}::interval)
AND "paymentAttempt" < ${WALLET_MAX_RETRIES}
ORDER BY id DESC
FOR UPDATE SKIP LOCKED
)
RETURNING *`
SELECT * FROM "Invoice"
WHERE "userId" = ${me.id}
AND "actionState" = 'FAILED'
AND "userCancel" = false
AND "cancelledAt" < now() - ${`${WALLET_RETRY_AFTER_MS} milliseconds`}::interval
AND "cancelledAt" > now() - ${`${WALLET_RETRY_BEFORE_MS} milliseconds`}::interval
AND "paymentAttempt" < ${WALLET_MAX_RETRIES}
ORDER BY id DESC`
}
},
Wallet: {
Expand Down
2 changes: 1 addition & 1 deletion api/typeDefs/paidAction.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ extend type Query {
}
extend type Mutation {
retryPaidAction(invoiceId: Int!): PaidAction!
retryPaidAction(invoiceId: Int!, newAttempt: Boolean): PaidAction!
}
enum PaymentMethod {
Expand Down
4 changes: 2 additions & 2 deletions components/use-invoice.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ export default function useInvoice () {
return data.cancelInvoice
}, [cancelInvoice])

const retry = useCallback(async ({ id, hash, hmac }, { update } = {}) => {
const retry = useCallback(async ({ id, hash, hmac, newAttempt = false }, { update } = {}) => {
console.log('retrying invoice:', hash)
const { data, error } = await retryPaidAction({ variables: { invoiceId: Number(id) }, update })
const { data, error } = await retryPaidAction({ variables: { invoiceId: Number(id), newAttempt }, update })
if (error) throw error

const newInvoice = data.retryPaidAction.invoice
Expand Down
4 changes: 2 additions & 2 deletions fragments/paidAction.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ export const RETRY_PAID_ACTION = gql`
${PAID_ACTION}
${ITEM_PAID_ACTION_FIELDS}
${ITEM_ACT_PAID_ACTION_FIELDS}
mutation retryPaidAction($invoiceId: Int!) {
retryPaidAction(invoiceId: $invoiceId) {
mutation retryPaidAction($invoiceId: Int!, $newAttempt: Boolean) {
retryPaidAction(invoiceId: $invoiceId, newAttempt: $newAttempt) {
__typename
...PaidActionFields
... on ItemPaidAction {
Expand Down
3 changes: 0 additions & 3 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,5 @@ export const WALLET_CREATE_INVOICE_TIMEOUT_MS = 45_000
// be retried by the client due to sender or receiver fallbacks are not returned to the client.
export const WALLET_RETRY_AFTER_MS = 60_000 // 1 minute
export const WALLET_RETRY_BEFORE_MS = 86_400_000 // 24 hours
// timeout after which we give up on waiting for the retry of a previously returned invoice
// and thus we allow returning an invoice to a client again
export const WALLET_VISIBILITY_TIMEOUT_MS = 60_000 // 1 minute
// we want to attempt a payment three times so we retry two times
export const WALLET_MAX_RETRIES = 2
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
-- AlterTable
ALTER TABLE "Invoice" ADD COLUMN "paymentAttempt" INTEGER NOT NULL DEFAULT 0;
ALTER TABLE "Invoice" ADD COLUMN "lockedAt" TIMESTAMP(3);
CREATE INDEX "Invoice_cancelledAt_idx" ON "Invoice"("cancelledAt");
CREATE INDEX "Invoice_cancelledAt_idx" ON "Invoice"("cancelledAt");

-- AlterEnum
ALTER TYPE "InvoiceActionState" ADD VALUE 'RETRY_PENDING';
2 changes: 1 addition & 1 deletion prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ enum InvoiceActionState {
FORWARDING
FORWARDED
FAILED_FORWARD
RETRY_PENDING
RETRYING
CANCELING
}
Expand Down Expand Up @@ -924,7 +925,6 @@ model Invoice {
cancelled Boolean @default(false)
cancelledAt DateTime?
userCancel Boolean?
lockedAt DateTime?
paymentAttempt Int @default(0)
msatsRequested BigInt
msatsReceived BigInt?
Expand Down
10 changes: 9 additions & 1 deletion wallets/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,15 @@ function RetryHandler ({ children }) {
useEffect(() => {
(async () => {
for (const invoice of failedInvoices) {
const newInvoice = await invoiceHelper.retry(invoice)
let newInvoice
try {
// retries can fail since only one client at a time is allowed to retry
// but multiple clients could attempt a retry at the same time
newInvoice = await invoiceHelper.retry({ ...invoice, newAttempt: true })
} catch (err) {
console.error('retry failed:', err.message)
continue
}
waitForWalletPayment(newInvoice).catch(console.error)
}
})()
Expand Down
6 changes: 3 additions & 3 deletions wallets/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ export async function createWrappedInvoice (userId,

export async function getInvoiceableWallets (userId, { paymentAttempt, predecessorId, models }) {
// filter out all wallets that have already been tried by recursively following the retry chain of predecessor invoices.
// the current predecessor invoice is in state 'FAILED' and not in state 'RETRYING' because we are currently retrying it
// so it has not been updated yet.
// the current predecessor invoice is in state 'RETRY_PENDING' and not in state 'RETRYING'
// because we are currently retrying it so it has not been updated yet.
// if predecessorId is not provided, the subquery will be empty and thus no wallets are filtered out.
const wallets = await models.$queryRaw`
SELECT
Expand All @@ -135,7 +135,7 @@ export async function getInvoiceableWallets (userId, { paymentAttempt, predecess
-- this failed invoice will be used to start the recursion
SELECT "Invoice"."id", "Invoice"."predecessorId"
FROM "Invoice"
WHERE "Invoice"."id" = ${predecessorId} AND "Invoice"."actionState" = 'FAILED'
WHERE "Invoice"."id" = ${predecessorId} AND "Invoice"."actionState" = 'RETRY_PENDING'
UNION ALL
Expand Down

0 comments on commit 24df678

Please sign in to comment.