Skip to content

Commit

Permalink
renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
stneng committed Jan 3, 2023
1 parent cad6fad commit bf6a7a7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
10 changes: 6 additions & 4 deletions src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct CoLink {
pub(crate) ca_certificate: Option<Certificate>,
pub(crate) identity: Option<Identity>,
#[cfg(feature = "variable_transfer")]
pub(crate) vt_p2p: Arc<crate::extensions::variable_transfer::p2p_inbox::VTP2PCTX>,
pub(crate) vt_p2p_ctx: Arc<crate::extensions::variable_transfer::p2p_inbox::VtP2pCtx>,
}

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand All @@ -46,7 +46,9 @@ impl CoLink {
ca_certificate: None,
identity: None,
#[cfg(feature = "variable_transfer")]
vt_p2p: Arc::new(crate::extensions::variable_transfer::p2p_inbox::VTP2PCTX::default()),
vt_p2p_ctx: Arc::new(
crate::extensions::variable_transfer::p2p_inbox::VtP2pCtx::default(),
),
}
}

Expand Down Expand Up @@ -89,8 +91,8 @@ impl CoLink {
self.task_id = task_id.to_string();
#[cfg(feature = "variable_transfer")]
{
self.vt_p2p =
Arc::new(crate::extensions::variable_transfer::p2p_inbox::VTP2PCTX::default());
self.vt_p2p_ctx =
Arc::new(crate::extensions::variable_transfer::p2p_inbox::VtP2pCtx::default());
}
}

Expand Down
42 changes: 25 additions & 17 deletions src/extensions/variable_transfer/p2p_inbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub(crate) struct VTInbox {
}

#[derive(Default)]
pub(crate) struct VTP2PCTX {
pub(crate) struct VtP2pCtx {
pub(crate) public_addr: Option<String>,
pub(crate) has_created_inbox: Mutex<bool>,
pub(crate) inbox_server: RwLock<Option<VTInboxServer>>,
Expand All @@ -158,7 +158,7 @@ impl crate::application::CoLink {
receiver: &Participant,
) -> Result<(), Error> {
if !self
.vt_p2p
.vt_p2p_ctx
.remote_inboxes
.read()
.await
Expand All @@ -173,14 +173,14 @@ impl crate::application::CoLink {
} else {
Some(inbox)
};
self.vt_p2p
self.vt_p2p_ctx
.remote_inboxes
.write()
.await
.insert(receiver.user_id.clone(), inbox);
}
match self
.vt_p2p
.vt_p2p_ctx
.remote_inboxes
.read()
.await
Expand Down Expand Up @@ -225,29 +225,31 @@ impl crate::application::CoLink {
) -> Result<Vec<u8>, Error> {
// send inbox information to the sender by remote_storage
if !self
.vt_p2p
.vt_p2p_ctx
.has_configured_inbox
.read()
.await
.contains(&sender.user_id)
{
// create inbox if it does not exist
if self.vt_p2p.public_addr.is_some() && !(*self.vt_p2p.has_created_inbox.lock().await) {
let mut has_created_inbox = self.vt_p2p.has_created_inbox.lock().await;
if self.vt_p2p_ctx.public_addr.is_some()
&& !(*self.vt_p2p_ctx.has_created_inbox.lock().await)
{
let mut has_created_inbox = self.vt_p2p_ctx.has_created_inbox.lock().await;
let inbox_server = VTInboxServer::new();
*self.vt_p2p.inbox_server.write().await = Some(inbox_server);
*self.vt_p2p_ctx.inbox_server.write().await = Some(inbox_server);
*has_created_inbox = true;
}
// generate vt_inbox information for the sender
let vt_inbox = if self.vt_p2p.public_addr.is_none() {
let vt_inbox = if self.vt_p2p_ctx.public_addr.is_none() {
VTInbox {
addr: "".to_string(),
vt_jwt: "".to_string(),
tls_cert: Vec::new(),
}
} else {
let jwt_secret = self
.vt_p2p
.vt_p2p_ctx
.inbox_server
.read()
.await
Expand All @@ -264,12 +266,18 @@ impl crate::application::CoLink {
VTInbox {
addr: format!(
"https://{}:{}",
self.vt_p2p.public_addr.as_ref().unwrap(),
self.vt_p2p.inbox_server.read().await.as_ref().unwrap().port
self.vt_p2p_ctx.public_addr.as_ref().unwrap(),
self.vt_p2p_ctx
.inbox_server
.read()
.await
.as_ref()
.unwrap()
.port
),
vt_jwt,
tls_cert: self
.vt_p2p
.vt_p2p_ctx
.inbox_server
.read()
.await
Expand All @@ -285,18 +293,18 @@ impl crate::application::CoLink {
&[sender.clone()],
)
.await?;
self.vt_p2p
self.vt_p2p_ctx
.has_configured_inbox
.write()
.await
.insert(sender.user_id.clone());
}

if self.vt_p2p.public_addr.is_none() {
if self.vt_p2p_ctx.public_addr.is_none() {
Err("Remote inbox: not available")?;
}
loop {
let inbox_server = self.vt_p2p.inbox_server.read().await;
let inbox_server = self.vt_p2p_ctx.inbox_server.read().await;
let data_map = inbox_server.as_ref().unwrap().data_map.read().await;
let data = data_map.get(&(sender.user_id.clone(), key.to_string()));
if data.is_some() {
Expand All @@ -320,7 +328,7 @@ impl crate::application::CoLink {
drop(data_map);
drop(inbox_server);
rx.recv().await;
let inbox_server = self.vt_p2p.inbox_server.read().await;
let inbox_server = self.vt_p2p_ctx.inbox_server.read().await;
let data_map = inbox_server.as_ref().unwrap().data_map.read().await;
let data = data_map.get(&(sender.user_id.clone(), key.to_string()));
if data.is_some() {
Expand Down
13 changes: 7 additions & 6 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ impl CoLinkProtocol {
cl.set_task_id(&task.task_id);
#[cfg(feature = "variable_transfer")]
{
cl.vt_p2p = Arc::new(crate::extensions::variable_transfer::p2p_inbox::VTP2PCTX {
public_addr: self.vt_public_addr.clone(),
..Default::default()
});
cl.vt_p2p_ctx =
Arc::new(crate::extensions::variable_transfer::p2p_inbox::VtP2pCtx {
public_addr: self.vt_public_addr.clone(),
..Default::default()
});
}
let cl_clone = cl.clone();
match self
Expand All @@ -100,8 +101,8 @@ impl CoLinkProtocol {
Ok(_) => {}
Err(e) => error!("Task {}: {}.", task.task_id, e),
}
if cl_clone.vt_p2p.inbox_server.write().await.is_some() {
let inbox_server = cl_clone.vt_p2p.inbox_server.write().await;
if cl_clone.vt_p2p_ctx.inbox_server.write().await.is_some() {
let inbox_server = cl_clone.vt_p2p_ctx.inbox_server.write().await;
inbox_server
.as_ref()
.unwrap()
Expand Down

0 comments on commit bf6a7a7

Please sign in to comment.