diff --git a/ant-networking/src/cmd.rs b/ant-networking/src/cmd.rs index f64fcdf236..31987e8e72 100644 --- a/ant-networking/src/cmd.rs +++ b/ant-networking/src/cmd.rs @@ -668,6 +668,7 @@ impl SwarmDriver { } RecordKind::ChunkWithPayment | RecordKind::RegisterWithPayment + | RecordKind::TransactionWithPayment | RecordKind::ScratchpadWithPayment => { error!("Record {record_key:?} with payment shall not be stored locally."); return Err(NetworkError::InCorrectRecordHeader); diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index c7dc9928f8..cfe81e6b0b 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -638,6 +638,7 @@ impl Network { match kind { RecordKind::Chunk | RecordKind::ChunkWithPayment + | RecordKind::TransactionWithPayment | RecordKind::RegisterWithPayment | RecordKind::ScratchpadWithPayment => { error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping."); diff --git a/ant-node/src/put_validation.rs b/ant-node/src/put_validation.rs index 29876081b9..002652faa0 100644 --- a/ant-node/src/put_validation.rs +++ b/ant-node/src/put_validation.rs @@ -186,6 +186,55 @@ impl Node { } result } + RecordKind::TransactionWithPayment => { + let (payment, transaction) = + try_deserialize_record::<(ProofOfPayment, Transaction)>(&record)?; + + // check if the deserialized value's TransactionAddress matches the record's key + let net_addr = NetworkAddress::from_transaction_address(transaction.address()); + let key = net_addr.to_record_key(); + let pretty_key = PrettyPrintRecordKey::from(&key); + if record.key != key { + warn!( + "Record's key {pretty_key:?} does not match with the value's TransactionAddress, ignoring PUT." + ); + return Err(Error::RecordKeyMismatch); + } + + let already_exists = self.validate_key_and_existence(&net_addr, &key).await?; + + // The transaction may already exist during the replication. + // The payment shall get deposit to self even the transaction already presents. + // However, if the transaction is already present, the incoming one shall be + // appended with the existing one, if content is different. + if let Err(err) = self + .payment_for_us_exists_and_is_still_valid(&net_addr, payment) + .await + { + if already_exists { + debug!("Payment of the incoming exists transaction {pretty_key:?} having error {err:?}"); + } else { + error!("Payment of the incoming non-exist transaction {pretty_key:?} having error {err:?}"); + return Err(err); + } + } + + let res = self + .validate_merge_and_store_transactions(vec![transaction], &key) + .await; + if res.is_ok() { + let content_hash = XorName::from_content(&record.value); + + // Notify replication_fetcher to mark the attempt as completed. + // Send the notification earlier to avoid it got skipped due to: + // the record becomes stored during the fetch because of other interleaved process. + self.network().notify_fetch_completed( + record.key.clone(), + RecordType::NonChunk(content_hash), + ); + } + res + } RecordKind::Register => { let register = try_deserialize_record::(&record)?; @@ -282,6 +331,7 @@ impl Node { match record_header.kind { // A separate flow handles payment for chunks and registers RecordKind::ChunkWithPayment + | RecordKind::TransactionWithPayment | RecordKind::RegisterWithPayment | RecordKind::ScratchpadWithPayment => { warn!("Prepaid record came with Payment, which should be handled in another flow"); @@ -515,7 +565,7 @@ impl Node { Ok(()) } - /// Validate and store `Vec` to the RecordStore + /// Validate and store `Vec` to the RecordStore /// If we already have a transaction at this address, the Vec is extended and stored. pub(crate) async fn validate_merge_and_store_transactions( &self, diff --git a/ant-protocol/src/storage/header.rs b/ant-protocol/src/storage/header.rs index 6ab7a1148f..7cfd2ffedf 100644 --- a/ant-protocol/src/storage/header.rs +++ b/ant-protocol/src/storage/header.rs @@ -35,6 +35,7 @@ pub enum RecordKind { Chunk, ChunkWithPayment, Transaction, + TransactionWithPayment, Register, RegisterWithPayment, Scratchpad, @@ -54,6 +55,7 @@ impl Serialize for RecordKind { Self::RegisterWithPayment => serializer.serialize_u32(4), Self::Scratchpad => serializer.serialize_u32(5), Self::ScratchpadWithPayment => serializer.serialize_u32(6), + Self::TransactionWithPayment => serializer.serialize_u32(7), } } } @@ -72,6 +74,7 @@ impl<'de> Deserialize<'de> for RecordKind { 4 => Ok(Self::RegisterWithPayment), 5 => Ok(Self::Scratchpad), 6 => Ok(Self::ScratchpadWithPayment), + 7 => Ok(Self::TransactionWithPayment), _ => Err(serde::de::Error::custom( "Unexpected integer for RecordKind variant", )),