From 5d8354a88be2ce2c16add7457c94e29f6e7c3684 Mon Sep 17 00:00:00 2001 From: outfoxxed Date: Thu, 8 Jan 2026 03:58:17 -0800 Subject: [PATCH] services/pipewire: add reconnect support --- changelog/next.md | 1 + src/services/pipewire/connection.cpp | 126 ++++++++++++++++++++++++++- src/services/pipewire/connection.hpp | 21 +++++ src/services/pipewire/core.cpp | 85 +++++++++++++----- src/services/pipewire/core.hpp | 5 ++ src/services/pipewire/defaults.cpp | 16 ++++ src/services/pipewire/defaults.hpp | 1 + src/services/pipewire/qml.cpp | 12 +-- src/services/pipewire/registry.cpp | 40 +++++++++ src/services/pipewire/registry.hpp | 2 + 10 files changed, 277 insertions(+), 32 deletions(-) diff --git a/changelog/next.md b/changelog/next.md index 7857103..f79900f 100644 --- a/changelog/next.md +++ b/changelog/next.md @@ -18,6 +18,7 @@ set shell id. - Added the ability to override Quickshell.cacheDir with a custom path. - Added minimized, maximized, and fullscreen properties to FloatingWindow. - Added the ability to handle move and resize events to FloatingWindow. +- Pipewire service now reconnects if pipewire dies or a protocol error occurs. ## Other Changes diff --git a/src/services/pipewire/connection.cpp b/src/services/pipewire/connection.cpp index ac4c5e6..c2f505f 100644 --- a/src/services/pipewire/connection.cpp +++ b/src/services/pipewire/connection.cpp @@ -1,15 +1,137 @@ #include "connection.hpp" +#include +#include +#include +#include +#include #include +#include +#include + +#include "../../core/logcat.hpp" +#include "core.hpp" namespace qs::service::pipewire { +namespace { +QS_LOGGING_CATEGORY(logConnection, "quickshell.service.pipewire.connection", QtWarningMsg); +} + PwConnection::PwConnection(QObject* parent): QObject(parent) { - if (this->core.isValid()) { - this->registry.init(this->core); + this->runtimeDir = PwConnection::resolveRuntimeDir(); + + QObject::connect(&this->core, &PwCore::fatalError, this, &PwConnection::queueFatalError); + + if (!this->tryConnect(false) + && qEnvironmentVariableIntValue("QS_PIPEWIRE_IMMEDIATE_RECONNECT") == 1) + { + this->beginReconnect(); } } +QString PwConnection::resolveRuntimeDir() { + auto runtimeDir = qEnvironmentVariable("PIPEWIRE_RUNTIME_DIR"); + if (runtimeDir.isEmpty()) { + runtimeDir = qEnvironmentVariable("XDG_RUNTIME_DIR"); + } + + if (runtimeDir.isEmpty()) { + runtimeDir = QString("/run/user/%1").arg(getuid()); + } + + return runtimeDir; +} + +void PwConnection::beginReconnect() { + if (this->core.isValid()) { + this->stopSocketWatcher(); + return; + } + + if (!qEnvironmentVariableIsEmpty("PIPEWIRE_REMOTE")) return; + + if (this->runtimeDir.isEmpty()) { + qCWarning( + logConnection + ) << "Cannot watch runtime dir for pipewire reconnects: runtime dir is empty."; + return; + } + + this->startSocketWatcher(); + this->tryConnect(true); +} + +bool PwConnection::tryConnect(bool retry) { + if (this->core.isValid()) return true; + + qCDebug(logConnection) << "Attempting reconnect..."; + if (!this->core.start(retry)) { + return false; + } + + qCInfo(logConnection) << "Connection established"; + this->stopSocketWatcher(); + + this->registry.init(this->core); + return true; +} + +void PwConnection::startSocketWatcher() { + if (this->socketWatcher != nullptr) return; + if (!qEnvironmentVariableIsEmpty("PIPEWIRE_REMOTE")) return; + + auto dir = QDir(this->runtimeDir); + if (!dir.exists()) { + qCWarning(logConnection) << "Cannot wait for a new pipewire socket, runtime dir does not exist:" + << this->runtimeDir; + return; + } + + this->socketWatcher = new QFileSystemWatcher(this); + this->socketWatcher->addPath(this->runtimeDir); + + QObject::connect( + this->socketWatcher, + &QFileSystemWatcher::directoryChanged, + this, + &PwConnection::onRuntimeDirChanged + ); +} + +void PwConnection::stopSocketWatcher() { + if (this->socketWatcher == nullptr) return; + + this->socketWatcher->deleteLater(); + this->socketWatcher = nullptr; +} + +void PwConnection::queueFatalError() { + if (this->fatalErrorQueued) return; + + this->fatalErrorQueued = true; + QMetaObject::invokeMethod(this, &PwConnection::onFatalError, Qt::QueuedConnection); +} + +void PwConnection::onFatalError() { + this->fatalErrorQueued = false; + + this->defaults.reset(); + this->registry.reset(); + this->core.shutdown(); + + this->beginReconnect(); +} + +void PwConnection::onRuntimeDirChanged(const QString& /*path*/) { + if (this->core.isValid()) { + this->stopSocketWatcher(); + return; + } + + this->tryConnect(true); +} + PwConnection* PwConnection::instance() { static PwConnection* instance = nullptr; // NOLINT diff --git a/src/services/pipewire/connection.hpp b/src/services/pipewire/connection.hpp index 2b3e860..d0374f8 100644 --- a/src/services/pipewire/connection.hpp +++ b/src/services/pipewire/connection.hpp @@ -1,9 +1,13 @@ #pragma once +#include + #include "core.hpp" #include "defaults.hpp" #include "registry.hpp" +class QFileSystemWatcher; + namespace qs::service::pipewire { class PwConnection: public QObject { @@ -18,6 +22,23 @@ public: static PwConnection* instance(); private: + static QString resolveRuntimeDir(); + + void beginReconnect(); + bool tryConnect(bool retry); + void startSocketWatcher(); + void stopSocketWatcher(); + +private slots: + void queueFatalError(); + void onFatalError(); + void onRuntimeDirChanged(const QString& path); + +private: + QString runtimeDir; + QFileSystemWatcher* socketWatcher = nullptr; + bool fatalErrorQueued = false; + // init/destroy order is important. do not rearrange. PwCore core; }; diff --git a/src/services/pipewire/core.cpp b/src/services/pipewire/core.cpp index 22445aa..e40bc54 100644 --- a/src/services/pipewire/core.cpp +++ b/src/services/pipewire/core.cpp @@ -27,7 +27,7 @@ const pw_core_events PwCore::EVENTS = { .info = nullptr, .done = &PwCore::onSync, .ping = nullptr, - .error = nullptr, + .error = &PwCore::onError, .remove_id = nullptr, .bound_id = nullptr, .add_mem = nullptr, @@ -36,26 +36,46 @@ const pw_core_events PwCore::EVENTS = { }; PwCore::PwCore(QObject* parent): QObject(parent), notifier(QSocketNotifier::Read) { - qCInfo(logLoop) << "Creating pipewire event loop."; pw_init(nullptr, nullptr); +} + +bool PwCore::start(bool retry) { + if (this->core != nullptr) return true; + + qCInfo(logLoop) << "Creating pipewire event loop."; this->loop = pw_loop_new(nullptr); if (this->loop == nullptr) { - qCCritical(logLoop) << "Failed to create pipewire event loop."; - return; + if (retry) { + qCInfo(logLoop) << "Failed to create pipewire event loop."; + } else { + qCCritical(logLoop) << "Failed to create pipewire event loop."; + } + this->shutdown(); + return false; } this->context = pw_context_new(this->loop, nullptr, 0); if (this->context == nullptr) { - qCCritical(logLoop) << "Failed to create pipewire context."; - return; + if (retry) { + qCInfo(logLoop) << "Failed to create pipewire context."; + } else { + qCCritical(logLoop) << "Failed to create pipewire context."; + } + this->shutdown(); + return false; } qCInfo(logLoop) << "Connecting to pipewire server."; this->core = pw_context_connect(this->context, nullptr, 0); if (this->core == nullptr) { - qCCritical(logLoop) << "Failed to connect pipewire context. Errno:" << errno; - return; + if (retry) { + qCInfo(logLoop) << "Failed to connect pipewire context. Errno:" << errno; + } else { + qCCritical(logLoop) << "Failed to connect pipewire context. Errno:" << errno; + } + this->shutdown(); + return false; } pw_core_add_listener(this->core, &this->listener.hook, &PwCore::EVENTS, this); @@ -66,22 +86,34 @@ PwCore::PwCore(QObject* parent): QObject(parent), notifier(QSocketNotifier::Read this->notifier.setSocket(fd); QObject::connect(&this->notifier, &QSocketNotifier::activated, this, &PwCore::poll); this->notifier.setEnabled(true); + + return true; +} + +void PwCore::shutdown() { + if (this->core != nullptr) { + this->listener.remove(); + pw_core_disconnect(this->core); + this->core = nullptr; + } + + if (this->context != nullptr) { + pw_context_destroy(this->context); + this->context = nullptr; + } + + if (this->loop != nullptr) { + pw_loop_destroy(this->loop); + this->loop = nullptr; + } + + this->notifier.setEnabled(false); + QObject::disconnect(&this->notifier, nullptr, this, nullptr); } PwCore::~PwCore() { qCInfo(logLoop) << "Destroying PwCore."; - - if (this->loop != nullptr) { - if (this->context != nullptr) { - if (this->core != nullptr) { - pw_core_disconnect(this->core); - } - - pw_context_destroy(this->context); - } - - pw_loop_destroy(this->loop); - } + this->shutdown(); } bool PwCore::isValid() const { @@ -90,6 +122,7 @@ bool PwCore::isValid() const { } void PwCore::poll() { + if (this->loop == nullptr) return; qCDebug(logLoop) << "Pipewire event loop received new events, iterating."; // Spin pw event loop. pw_loop_iterate(this->loop, 0); @@ -107,6 +140,18 @@ void PwCore::onSync(void* data, quint32 id, qint32 seq) { emit self->synced(id, seq); } +void PwCore::onError(void* data, quint32 id, qint32 /*seq*/, qint32 res, const char* message) { + auto* self = static_cast(data); + + if (message != nullptr) { + qCWarning(logLoop) << "Fatal pipewire error on object" << id << "with code" << res << message; + } else { + qCWarning(logLoop) << "Fatal pipewire error on object" << id << "with code" << res; + } + + emit self->fatalError(); +} + SpaHook::SpaHook() { // NOLINT spa_zero(this->hook); } diff --git a/src/services/pipewire/core.hpp b/src/services/pipewire/core.hpp index 262e2d3..967efaf 100644 --- a/src/services/pipewire/core.hpp +++ b/src/services/pipewire/core.hpp @@ -30,6 +30,9 @@ public: ~PwCore() override; Q_DISABLE_COPY_MOVE(PwCore); + bool start(bool retry); + void shutdown(); + [[nodiscard]] bool isValid() const; [[nodiscard]] qint32 sync(quint32 id) const; @@ -40,6 +43,7 @@ public: signals: void polled(); void synced(quint32 id, qint32 seq); + void fatalError(); private slots: void poll(); @@ -48,6 +52,7 @@ private: static const pw_core_events EVENTS; static void onSync(void* data, quint32 id, qint32 seq); + static void onError(void* data, quint32 id, qint32 seq, qint32 res, const char* message); QSocketNotifier notifier; SpaHook listener; diff --git a/src/services/pipewire/defaults.cpp b/src/services/pipewire/defaults.cpp index 88a1dc1..02463f4 100644 --- a/src/services/pipewire/defaults.cpp +++ b/src/services/pipewire/defaults.cpp @@ -31,6 +31,22 @@ PwDefaultTracker::PwDefaultTracker(PwRegistry* registry): registry(registry) { QObject::connect(registry, &PwRegistry::nodeAdded, this, &PwDefaultTracker::onNodeAdded); } +void PwDefaultTracker::reset() { + if (auto* meta = this->defaultsMetadata.object()) { + QObject::disconnect(meta, nullptr, this, nullptr); + } + + this->defaultsMetadata.setObject(nullptr); + this->setDefaultSink(nullptr); + this->setDefaultSinkName(QString()); + this->setDefaultSource(nullptr); + this->setDefaultSourceName(QString()); + this->setDefaultConfiguredSink(nullptr); + this->setDefaultConfiguredSinkName(QString()); + this->setDefaultConfiguredSource(nullptr); + this->setDefaultConfiguredSourceName(QString()); +} + void PwDefaultTracker::onMetadataAdded(PwMetadata* metadata) { if (metadata->name() == "default") { qCDebug(logDefaults) << "Got new defaults metadata object" << metadata; diff --git a/src/services/pipewire/defaults.hpp b/src/services/pipewire/defaults.hpp index f3a8e3f..591c4fd 100644 --- a/src/services/pipewire/defaults.hpp +++ b/src/services/pipewire/defaults.hpp @@ -12,6 +12,7 @@ class PwDefaultTracker: public QObject { public: explicit PwDefaultTracker(PwRegistry* registry); + void reset(); [[nodiscard]] PwNode* defaultSink() const; [[nodiscard]] PwNode* defaultSource() const; diff --git a/src/services/pipewire/qml.cpp b/src/services/pipewire/qml.cpp index 9efb17e..7a0d952 100644 --- a/src/services/pipewire/qml.cpp +++ b/src/services/pipewire/qml.cpp @@ -2,7 +2,6 @@ #include #include -#include #include #include #include @@ -99,15 +98,8 @@ Pipewire::Pipewire(QObject* parent): QObject(parent) { &Pipewire::defaultConfiguredAudioSourceChanged ); - if (!connection->registry.isInitialized()) { - QObject::connect( - &connection->registry, - &PwRegistry::initialized, - this, - &Pipewire::readyChanged, - Qt::SingleShotConnection - ); - } + QObject::connect(&connection->registry, &PwRegistry::initialized, this, &Pipewire::readyChanged); + QObject::connect(&connection->registry, &PwRegistry::cleared, this, &Pipewire::readyChanged); } ObjectModel* Pipewire::nodes() { return &this->mNodes; } diff --git a/src/services/pipewire/registry.cpp b/src/services/pipewire/registry.cpp index c08fc1d..4b670b1 100644 --- a/src/services/pipewire/registry.cpp +++ b/src/services/pipewire/registry.cpp @@ -134,6 +134,46 @@ void PwRegistry::init(PwCore& core) { this->coreSyncSeq = this->core->sync(PW_ID_CORE); } +void PwRegistry::reset() { + if (this->core != nullptr) { + QObject::disconnect(this->core, nullptr, this, nullptr); + } + + this->listener.remove(); + + if (this->object != nullptr) { + pw_proxy_destroy(reinterpret_cast(this->object)); + this->object = nullptr; + } + + for (auto* meta: this->metadata.values()) { + meta->safeDestroy(); + } + this->metadata.clear(); + + for (auto* link: this->links.values()) { + link->safeDestroy(); + } + this->links.clear(); + + for (auto* node: this->nodes.values()) { + node->safeDestroy(); + } + this->nodes.clear(); + + for (auto* device: this->devices.values()) { + device->safeDestroy(); + } + this->devices.clear(); + + this->linkGroups.clear(); + this->initState = InitState::SendingObjects; + this->coreSyncSeq = 0; + this->core = nullptr; + + emit this->cleared(); +} + void PwRegistry::onCoreSync(quint32 id, qint32 seq) { if (id != PW_ID_CORE || seq != this->coreSyncSeq) return; diff --git a/src/services/pipewire/registry.hpp b/src/services/pipewire/registry.hpp index 8473f04..bb2db8c 100644 --- a/src/services/pipewire/registry.hpp +++ b/src/services/pipewire/registry.hpp @@ -116,6 +116,7 @@ class PwRegistry public: void init(PwCore& core); + void reset(); [[nodiscard]] bool isInitialized() const { return this->initState == InitState::Done; } @@ -136,6 +137,7 @@ signals: void linkGroupAdded(PwLinkGroup* group); void metadataAdded(PwMetadata* metadata); void initialized(); + void cleared(); private slots: void onLinkGroupDestroyed(QObject* object);