Skip to content

Commit

Permalink
PipeWire: Implement node addition/removal callback(s)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidebeatrici committed Oct 3, 2024
1 parent 08431c4 commit 506fa11
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 55 deletions.
47 changes: 28 additions & 19 deletions src/backends/PipeWire/EventManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,53 +20,62 @@ static constexpr pw_registry_events eventsRegistry = { PW_VERSION_REGISTRY_EVENT
const uint32_t /*version*/, const spa_dict * /*props*/) {
if (spa_streq(type, NODE_TYPE_ID)) {
auto &manager = *static_cast< EventManager * >(userData);

new NodeInfoData(manager, id);
manager.addNode(id);
}
},
[](void *userData, const uint32_t id) {
auto &manager = *static_cast< EventManager* >(userData);

manager.feedback().nodeRemoved(id);
auto &manager = *static_cast< EventManager * >(userData);
manager.removeNode(id);
} };

static constexpr pw_node_events eventsNode = { PW_VERSION_NODE_EVENTS,
[](void *userData, const pw_node_info *info) {
auto data = static_cast< NodeInfoData* >(userData);

data->manager().feedback().nodeAdded(info);

delete data;
auto &manager = *static_cast< EventManager * >(userData);
manager.updateNode(info);
},
nullptr };

EventManager::EventManager(pw_core *core, const Feedback &feedback)
: m_feedback(feedback), m_registry(pw_core_get_registry(core, PW_VERSION_REGISTRY, 0)) {
if (m_registry) {
pw_registry_add_listener(m_registry, &m_registryListener, &eventsRegistry, this);
pw_registry_add_listener(m_registry, &m_listener, &eventsRegistry, this);
}
}

EventManager::~EventManager() {
if (m_registry) {
spa_hook_remove(&m_registryListener);
spa_hook_remove(&m_listener);
lib().proxy_destroy(reinterpret_cast< pw_proxy * >(m_registry));
}
}

NodeInfoData::NodeInfoData(EventManager &manager, const uint32_t id)
: m_manager(manager),
m_proxy(static_cast< pw_proxy * >(pw_registry_bind(manager.registry(), id, NODE_TYPE_ID, PW_VERSION_NODE, 0))),
void EventManager::addNode(const uint32_t id) {
if (m_nodes.try_emplace(id, id, *this).second) {
m_feedback.nodeAdded(id);
}
}

void EventManager::removeNode(const uint32_t id) {
if (m_nodes.erase(id)) {
m_feedback.nodeRemoved(id);
}
}

void EventManager::updateNode(const pw_node_info *info) {
m_feedback.nodeUpdated(info);
}

EventManager::Node::Node(const uint32_t id, EventManager &manager)
: m_proxy(static_cast< pw_proxy * >(pw_registry_bind(manager.registry(), id, NODE_TYPE_ID, PW_VERSION_NODE, 0))),
m_listener() {
if (m_proxy) {
lib().proxy_add_object_listener(m_proxy, &m_listener, &eventsNode, this);
lib().proxy_add_object_listener(m_proxy, &m_listener, &eventsNode, &manager);
}
}

NodeInfoData::~NodeInfoData() {
spa_hook_remove(&m_listener);

EventManager::Node::~Node() {
if (m_proxy) {
spa_hook_remove(&m_listener);
lib().proxy_destroy(m_proxy);
}
}
36 changes: 21 additions & 15 deletions src/backends/PipeWire/EventManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <cstdint>
#include <functional>
#include <unordered_map>

#include <spa/utils/hook.h>

Expand All @@ -20,36 +21,41 @@ namespace pipewire {
class EventManager {
public:
struct Feedback {
std::function< void(const pw_node_info *info) > nodeAdded;
std::function< void(uint32_t id) > nodeAdded;
std::function< void(uint32_t id) > nodeRemoved;
std::function< void(const pw_node_info *info) > nodeUpdated;
};

class Node {
public:
Node(uint32_t id, EventManager &manager);
~Node();

private:
Node(const Node &) = delete;
Node &operator=(const Node &) = delete;

pw_proxy *m_proxy;
spa_hook m_listener;
};

EventManager(pw_core *core, const Feedback &feedback);
~EventManager();

constexpr auto &feedback() { return m_feedback; }
constexpr auto registry() { return m_registry; }

void addNode(uint32_t id);
void removeNode(uint32_t id);
void updateNode(const pw_node_info *info);

private:
EventManager(const EventManager &) = delete;
EventManager &operator=(const EventManager &) = delete;

Feedback m_feedback;
pw_registry *m_registry;
spa_hook m_registryListener;
};

class NodeInfoData {
public:
NodeInfoData(EventManager &manager, const uint32_t id);
~NodeInfoData();

constexpr auto &manager() { return m_manager; }

private:
EventManager &m_manager;
pw_proxy *m_proxy;
spa_hook m_listener;
std::unordered_map< uint32_t, Node > m_nodes;
};
} // namespace pipewire

Expand Down
91 changes: 72 additions & 19 deletions src/backends/PipeWire/PipeWire.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ static ErrorCode engineFree(BE_Engine *engine) {
return CROSSAUDIO_EC_OK;
}

static ErrorCode engineStart(BE_Engine *engine, const EngineFeedback *) {
return toImpl(engine)->start();
static ErrorCode engineStart(BE_Engine *engine, const EngineFeedback *feedback) {
return toImpl(engine)->start(feedback ? *feedback : EngineFeedback());
}

static ErrorCode engineStop(BE_Engine *engine) {
Expand Down Expand Up @@ -211,9 +211,11 @@ void Engine::unlock() {
}
}

ErrorCode Engine::start() {
const EventManager::Feedback eventManagerFeedback{ .nodeAdded = [this](const pw_node_info *info) { addNode(info); },
.nodeRemoved = [this](const uint32_t id) { removeNode(id); } };
ErrorCode Engine::start(const EngineFeedback &feedback) {
const EventManager::Feedback eventManagerFeedback{ .nodeAdded = [this](const uint32_t id) { addNode(id); },
.nodeRemoved = [this](const uint32_t id) { removeNode(id); },
.nodeUpdated =
[this](const pw_node_info *info) { updateNode(info); } };

if (m_core) {
return CROSSAUDIO_EC_INIT;
Expand All @@ -223,6 +225,7 @@ ErrorCode Engine::start() {
return CROSSAUDIO_EC_CONNECT;
}

m_feedback = feedback;
m_eventManager = std::make_unique< EventManager >(m_core, eventManagerFeedback);

if (lib().thread_loop_start(m_threadLoop) < 0) {
Expand Down Expand Up @@ -285,7 +288,11 @@ Nodes *Engine::engineNodesGet() {

for (const auto &iter : m_nodes) {
const auto &nodeIn = iter.second;
auto &nodeOut = nodes->items[i++];
if (!nodeIn.advertised) {
continue;
}

auto &nodeOut = nodes->items[i++];

nodeOut.id = strdup(nodeIn.id.data());
nodeOut.name = strdup(nodeIn.name.data());
Expand All @@ -295,17 +302,59 @@ Nodes *Engine::engineNodesGet() {
return nodes;
}

void Engine::addNode(const pw_node_info *info) {
const spa_dict *props = info->props;
void Engine::addNode(const uint32_t id) {
lock();
m_nodes.try_emplace(id, Node());
unlock();
}

void Engine::removeNode(const uint32_t id) {
lock();
const auto iter = m_nodes.extract(id);
unlock();

if (!iter.empty() && m_feedback.nodeRemoved) {
const Node &node = iter.mapped();
::Node *nodeNotif = nodeNew();

nodeNotif->id = strdup(node.id.data());
nodeNotif->name = strdup(node.name.data());
nodeNotif->direction = node.direction;

m_feedback.nodeRemoved(m_feedback.userData, nodeNotif);
}
}

void Engine::updateNode(const pw_node_info *info) {
const auto lock = locker();

const auto iter = m_nodes.find(info->id);
if (iter == m_nodes.cend()) {
return;
}

const char *id = spa_dict_lookup(props, PW_KEY_NODE_NAME);
if (!id) {
auto &node = iter->second;
if (node.advertised) {
return;
}

const char *name = spa_dict_lookup(props, PW_KEY_NODE_DESCRIPTION);
if (!name) {
name = id;
if (node.id.empty()) {
const char *id = spa_dict_lookup(info->props, PW_KEY_NODE_NAME);
if (id) {
node.id = id;
}
}

if (node.name.empty()) {
const char *name = spa_dict_lookup(info->props, PW_KEY_NODE_DESCRIPTION);
if (name) {
node.name = name;
}
}

if (!(info->n_input_ports || info->n_output_ports) || node.id.empty()) {
// Don't advertise the node if it has no ports or ID.
return;
}

uint8_t direction = CROSSAUDIO_DIR_NONE;
Expand All @@ -316,15 +365,19 @@ void Engine::addNode(const pw_node_info *info) {
direction |= CROSSAUDIO_DIR_IN;
}

const auto lock = locker();
node.direction = static_cast< Direction >(direction);

m_nodes.emplace(info->id, Node(id, name, static_cast< Direction >(direction)));
}
if (m_feedback.nodeAdded) {
::Node *nodeNotif = nodeNew();

void Engine::removeNode(const uint32_t id) {
const auto lock = locker();
nodeNotif->id = strdup(node.id.data());
nodeNotif->name = strdup(node.name.data());
nodeNotif->direction = node.direction;

m_nodes.erase(id);
m_feedback.nodeAdded(m_feedback.userData, nodeNotif);

node.advertised = true;
}
}

static constexpr pw_stream_events eventsInput = { PW_VERSION_STREAM_EVENTS,
Expand Down
8 changes: 6 additions & 2 deletions src/backends/PipeWire/PipeWire.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "Backend.h"

#include "crossaudio/Direction.h"
#include "crossaudio/Engine.h"
#include "crossaudio/ErrorCode.h"
#include "crossaudio/Flux.h"
#include "crossaudio/Node.h"
Expand Down Expand Up @@ -50,6 +51,7 @@ class Engine {
std::string id;
std::string name;
Direction direction;
bool advertised;
};

Engine();
Expand All @@ -67,7 +69,7 @@ class Engine {

Nodes *engineNodesGet();

ErrorCode start();
ErrorCode start(const EngineFeedback &feedback);
ErrorCode stop();

pw_thread_loop *m_threadLoop;
Expand All @@ -80,9 +82,11 @@ class Engine {
Engine(const Engine &) = delete;
Engine &operator=(const Engine &) = delete;

void addNode(const pw_node_info *info);
void addNode(uint32_t id);
void removeNode(uint32_t id);
void updateNode(const pw_node_info *info);

EngineFeedback m_feedback;
std::map< uint32_t, Node > m_nodes;
};

Expand Down

0 comments on commit 506fa11

Please sign in to comment.