Skip to content

Commit

Permalink
Improve dynamic tasking support
Browse files Browse the repository at this point in the history
  • Loading branch information
Levi-Armstrong committed Nov 17, 2023
1 parent ea4be31 commit 2f83485
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void UpsampleTrajectoryTask::upsample(CompositeInstruction& composite,
if (start_instruction.isNull())
{
start_instruction = i.as<MoveInstructionPoly>();
composite.push_back(i); // Prevents loss of very first waypoint when upsampling
composite.push_back(i); // Prevents loss of very first waypoint when upsampling
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class TaskflowTaskComposerExecutor : public TaskComposerExecutor
std::size_t num_threads_;
std::unique_ptr<tf::Executor> executor_;

std::mutex futures_mutex_;
std::map<boost::uuids::uuid, TaskComposerFuture::UPtr> futures_;
void removeFuture(const boost::uuids::uuid& uuid);

TaskComposerFuture::UPtr run(const TaskComposerNode& node, TaskComposerContext::Ptr context) override final;

static tf::Task convertToTaskflow(const TaskComposerGraph& task_graph,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include <tesseract_common/utils.h>
#include <tesseract_common/timer.h>
#include <taskflow/taskflow.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>

namespace tesseract_planning
{
Expand Down Expand Up @@ -71,6 +73,12 @@ TaskflowTaskComposerExecutor::TaskflowTaskComposerExecutor(std::string name, con

TaskflowTaskComposerExecutor::~TaskflowTaskComposerExecutor() = default;

void TaskflowTaskComposerExecutor::removeFuture(const boost::uuids::uuid& uuid)
{
std::unique_lock<std::mutex> lock(futures_mutex_);
futures_.erase(uuid);
}

TaskComposerFuture::UPtr TaskflowTaskComposerExecutor::run(const TaskComposerNode& node,
TaskComposerContext::Ptr context)
{
Expand All @@ -84,9 +92,14 @@ TaskComposerFuture::UPtr TaskflowTaskComposerExecutor::run(const TaskComposerNod
else
throw std::runtime_error("TaskComposerExecutor, unsupported node type!");

std::shared_future<void> f = executor_->run(*taskflow);

return std::make_unique<TaskflowTaskComposerFuture>(f, std::move(taskflow), std::move(context));
// Inorder to better support dynamic tasking within pipelines we store all futures internally
// and cleanup when finished because the data cannot go out of scope.
std::unique_lock<std::mutex> lock(futures_mutex_);
boost::uuids::uuid uuid = boost::uuids::random_generator()();
std::shared_future<void> f = executor_->run(*taskflow, [this, uuid]() { removeFuture(uuid); });
auto future = std::make_unique<TaskflowTaskComposerFuture>(f, std::move(taskflow), std::move(context));
futures_[uuid] = future->copy();
return future;
}

long TaskflowTaskComposerExecutor::getWorkerCount() const { return static_cast<long>(executor_->num_workers()); }
Expand Down

0 comments on commit 2f83485

Please sign in to comment.