Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark unrecoverable errors so they don't spawn #412

Merged
merged 3 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 76 additions & 55 deletions lib/_pkg/payjoin/manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ const List<String> _ohttpRelayUrls = [

const payjoinDirectoryUrl = 'https://payjo.in';

sealed class SessionError {
final String message;

const SessionError._(this.message);

factory SessionError.recoverable(String message) = RecoverableError;
factory SessionError.unrecoverable(String message) = UnrecoverableError;
}

class RecoverableError extends SessionError {
const RecoverableError(super.message) : super._();
}

class UnrecoverableError extends SessionError {
const UnrecoverableError(super.message) : super._();
}

class PayjoinManager {
PayjoinManager(this._walletTx, this._payjoinStorage);
final WalletTx _walletTx;
Expand Down Expand Up @@ -123,12 +140,15 @@ class PayjoinManager {
await _payjoinStorage.markSenderSessionComplete(pjUri);
completer.complete(null);
}
} else if (message is Err) {
} else if (message is SessionError) {
PayjoinEventBus().emit(
PayjoinSendFailureEvent(pjUri: pjUri, error: message),
PayjoinSendFailureEvent(pjUri: pjUri, error: message.message),
);
if (message is UnrecoverableError) {
await _payjoinStorage.markSenderSessionUnrecoverable(pjUri);
}
await _cleanupSession(pjUri);
completer.complete(message);
completer.complete(Err(message.message));
}
});

Expand Down Expand Up @@ -254,15 +274,21 @@ class PayjoinManager {
// TODO PROPAGATE ERROR TO UI TOAST / TRANSACTION HISTORY
debugPrint(e.toString());
await _cleanupSession(receiver.id());
await _payjoinStorage
.markReceiverSessionUnrecoverable(receiver.id());
completer.complete(
Err(
e.toString(),
),
);
}
} else if (message is Err) {
} else if (message is SessionError) {
await _cleanupSession(receiver.id());
completer.complete(message);
if (message is UnrecoverableError) {
await _payjoinStorage
.markReceiverSessionUnrecoverable(receiver.id());
}
completer.complete(Err(message.toString()));
}
});

Expand Down Expand Up @@ -301,11 +327,13 @@ class PayjoinManager {
final filteredReceivers = receiverSessions
.where((session) =>
session.walletId == wallet.id &&
session.status != PayjoinSessionStatus.success)
session.status != PayjoinSessionStatus.success &&
session.status != PayjoinSessionStatus.unrecoverable)
.toList();
final filteredSenders = senderSessions.where((session) {
return session.walletId == wallet.id &&
session.status != PayjoinSessionStatus.success;
session.status != PayjoinSessionStatus.success &&
session.status != PayjoinSessionStatus.unrecoverable;
}).toList();

final spawnedReceivers = filteredReceivers.map((session) {
Expand Down Expand Up @@ -368,6 +396,7 @@ class PayjoinManager {

enum PayjoinSessionStatus {
pending,
unrecoverable,
success,
}

Expand Down Expand Up @@ -473,21 +502,19 @@ Future<void> _isolateSender(List<dynamic> args) async {
// Reconstruct the Sender from the JSON
final sender = Sender.fromJson(senderJson);

// Run the sender logic inside the isolate
try {
final proposalPsbt = await _runSender(sender, sendPort: sendPort);
if (proposalPsbt == null) throw Exception('proposalPsbt is null');
sendPort.send({
'type': 'psbt_to_sign',
'psbt': proposalPsbt,
});
} catch (e) {
sendPort.send(Err(e.toString()));
sendPort.send(e);
}
}

/// Top-level function that attempts to run payjoin sender (V2 protocol first, fallback to V1).
Future<String?> _runSender(Sender sender, {required SendPort sendPort}) async {
Future<String> _runSender(Sender sender, {required SendPort sendPort}) async {
final dio = Dio();

try {
Expand All @@ -506,24 +533,26 @@ Future<String?> _runSender(Sender sender, {required SendPort sendPort}) async {
sendPort.send({'type': 'request_posted'});

while (true) {
try {
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
final proposalPsbt = await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
if (proposalPsbt != null) return proposalPsbt;
} catch (e) {
print('Error occurred while processing payjoin: $e');
// Loop until a valid response is found
}
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
final proposalPsbt = await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
if (proposalPsbt != null) return proposalPsbt;
}
} catch (e) {
// If V2 fails, attempt V1
return await _runSenderV1(sender, dio, sendPort);
if (e is PayjoinException &&
// TODO condition on error type instead of message content
e.message?.contains('parse receiver public key') == true) {
return await _runSenderV1(sender, dio, sendPort);
} else if (e is DioException) {
throw Exception(SessionError.recoverable(e.toString()));
} else {
throw Exception(SessionError.unrecoverable(e.toString()));
}
}
}

Expand Down Expand Up @@ -651,7 +680,7 @@ Future<void> _isolateReceiver(List<dynamic> args) async {
return payjoinProposal;
} catch (e) {
print('Error occurred while finalizing proposal: $e');
throw Exception('Error occurred while finalizing proposal');
rethrow;
}
}

Expand All @@ -668,10 +697,10 @@ Future<void> _isolateReceiver(List<dynamic> args) async {
'type': 'proposal_sent',
});
} catch (e) {
try {
isolateTomainSendPort.send(Err(e.toString()));
} catch (e) {
print('$e');
if (e is DioException) {
isolateTomainSendPort.send(SessionError.recoverable(e.toString()));
} else {
isolateTomainSendPort.send(SessionError.unrecoverable(e.toString()));
}
}
}
Expand All @@ -680,34 +709,26 @@ Future<UncheckedProposal> _receiveUncheckedProposal(
Dio dio,
Receiver receiver,
) async {
try {
while (true) {
final (req, context) = await receiver.extractReq();
final ohttpResponse = await _postRequest(dio, req);
final proposal = await receiver.processRes(
body: ohttpResponse.data as List<int>,
ctx: context,
);
if (proposal != null) {
return proposal;
}
while (true) {
final (req, context) = await receiver.extractReq();
final ohttpResponse = await _postRequest(dio, req);
final proposal = await receiver.processRes(
body: ohttpResponse.data as List<int>,
ctx: context,
);
if (proposal != null) {
return proposal;
}
} catch (e) {
throw Exception('Error occurred while processing payjoin receiver: $e');
}
}

Future<void> _respondProposal(Dio dio, PayjoinProposal proposal) async {
try {
final (postReq, ohttpCtx) = await proposal.extractV2Req();
final postRes = await _postRequest(dio, postReq);
await proposal.processRes(
res: postRes.data as List<int>,
ohttpContext: ohttpCtx,
);
} catch (e) {
throw Exception('Error occurred while processing payjoin: $e');
}
final (postReq, ohttpCtx) = await proposal.extractV2Req();
final postRes = await _postRequest(dio, postReq);
await proposal.processRes(
res: postRes.data as List<int>,
ohttpContext: ohttpCtx,
);
}

/// Posts a request via dio and returns the response.
Expand Down
45 changes: 45 additions & 0 deletions lib/_pkg/payjoin/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,28 @@ class PayjoinStorage {
}
}

Future<Err?> markReceiverSessionUnrecoverable(String id) async {
try {
final (session, err) = await readReceiverSession(id);
if (err != null) return err;

final updatedSession = RecvSession(
session!.isTestnet,
session.receiver,
session.walletId,
PayjoinSessionStatus.unrecoverable,
);

await _hiveStorage.saveValue(
key: receiverPrefix + id,
value: jsonEncode(updatedSession.toJson()),
);
return null;
} catch (e) {
return Err(e.toString());
}
}

Future<(List<RecvSession>, Err?)> readAllReceivers() async {
//deleteAllSessions();
try {
Expand Down Expand Up @@ -186,6 +208,29 @@ class PayjoinStorage {
}
}

Future<Err?> markSenderSessionUnrecoverable(String pjUri) async {
try {
final (session, err) = await readSenderSession(pjUri);
if (err != null) return err;

final updatedSession = SendSession(
session!.isTestnet,
session.sender,
session.walletId,
session.pjUri,
PayjoinSessionStatus.unrecoverable,
);

await _hiveStorage.saveValue(
key: senderPrefix + pjUri,
value: jsonEncode(updatedSession.toJson()),
);
return null;
} catch (e) {
return Err(e.toString());
}
}

Future<(List<SendSession>, Err?)> readAllSenders() async {
try {
final (allData, err) = await _hiveStorage.getAll();
Expand Down