-
I'm working on a tool that needs to move a LOT of data (GB/s) between processes. Since I'm trying to operate as fast as possible, I don't want to reallocate memory for anything, I'd like to do all the processing in-place, but I'm having trouble figuring out how to correctly recycle a memory address. I can't share my actual code, but I took the complex data example and mutated it into a pseudo recreation of it. In my actual implementation, these sub/pub instances are in different processes, but this example is enough to trigger the error. use iceoryx2::prelude::*;
use iceoryx2_bb_container::vec::FixedSizeVec;
#[derive(Debug, Default, PlacementDefault)]
#[repr(C)]
pub struct Data {
buffer: FixedSizeVec<u8, { 1024 * 1024 * 8 }>,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let node = NodeBuilder::new().create::<ipc::Service>()?;
let service_out = node
.service_builder(&"DataOut".try_into()?)
.publish_subscribe::<Data>()
.max_publishers(16)
.max_subscribers(16)
.open_or_create()?;
let service_in = node
.service_builder(&"DataReturn".try_into()?)
.publish_subscribe::<Data>()
.max_publishers(16)
.max_subscribers(16)
.open_or_create()?;
let publisher_out = service_out.publisher_builder().create()?;
let publisher_return = service_in.publisher_builder().create()?;
let subscriber_out = service_out.subscriber_builder().create()?;
let subscriber_return = service_in.subscriber_builder().create()?;
let mut alldata = Vec::with_capacity(5);
for _ in 0..5 {
alldata.push(Data::default());
}
loop {
let mut sample = publisher_out.loan_uninit()?;
unsafe { Data::placement_default(sample.payload_mut().as_mut_ptr()) };
let mut sample = unsafe { sample.assume_init() };
let payload = sample.payload_mut();
payload.buffer = alldata.pop().unwrap().buffer;
sample.send()?;
while let Some(sample) = subscriber_out.receive()? {
println!("received data. {}", sample.buffer.len());
}
let mut ret_sample = publisher_return.loan_uninit()?;
unsafe { Data::placement_default(ret_sample.payload_mut().as_mut_ptr()) };
let mut ret_sample = unsafe { ret_sample.assume_init() };
ret_sample.send()?;
if let Some(returned_buf) = subscriber_return.receive()? {
alldata.push(returned_buf.payload());
}
}
} The crux of my issue comes in when I try to send the buffer back to the originator. I was storing the available buffers in a Vec, but the returned type in the message is Did I miss something really obvious? Is there a better/correct way to do this? My main goal is to avoid reallocations, I'm already pushing this system to the edge. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 3 replies
-
What kind of reallocations? iceoryx2 allocates all the memory it requires at once when you create the publisher - it is the data segment of the publisher and is managed by a pool allocator. This pool allocator is incredibly fast, so you do not have to worry about overhead here.
I am a bit confused here. Do you mean that Just let me rephrase your use case so that I am sure that I understood it.
|
Beta Was this translation helpful? Give feedback.
@nosjojo This will not yet work with iceoryx2.
What you are describing is something we describe as a pipeline. Process A produces data and sends it to process B, it consumes and modifies the data a bit and sends it to process C which again modifies the data. In your case process A and C would be the same and it would be a circular pipeline - which is not a problem.
But a pipeline comes with a restriction - only one subscriber is allowed. Otherwise one subscriber is reading a sample while another one is modifying it and causing data races.
Another challenge you would have here is what happens when, in this pipeline, one process crashes? We must be able to at least reclaim the lost sample s…