services/pipewire: add reconnect support

This commit is contained in:
outfoxxed 2026-01-08 03:58:17 -08:00
parent 8d19beb69e
commit 5d8354a88b
No known key found for this signature in database
GPG key ID: 4C88A185FB89301E
10 changed files with 277 additions and 32 deletions

View file

@ -18,6 +18,7 @@ set shell id.
- Added the ability to override Quickshell.cacheDir with a custom path. - Added the ability to override Quickshell.cacheDir with a custom path.
- Added minimized, maximized, and fullscreen properties to FloatingWindow. - Added minimized, maximized, and fullscreen properties to FloatingWindow.
- Added the ability to handle move and resize events to FloatingWindow. - Added the ability to handle move and resize events to FloatingWindow.
- Pipewire service now reconnects if pipewire dies or a protocol error occurs.
## Other Changes ## Other Changes

View file

@ -1,15 +1,137 @@
#include "connection.hpp" #include "connection.hpp"
#include <qdir.h>
#include <qfilesystemwatcher.h>
#include <qlogging.h>
#include <qloggingcategory.h>
#include <qnamespace.h>
#include <qobject.h> #include <qobject.h>
#include <qtenvironmentvariables.h>
#include <unistd.h>
#include "../../core/logcat.hpp"
#include "core.hpp"
namespace qs::service::pipewire { namespace qs::service::pipewire {
namespace {
QS_LOGGING_CATEGORY(logConnection, "quickshell.service.pipewire.connection", QtWarningMsg);
}
PwConnection::PwConnection(QObject* parent): QObject(parent) { PwConnection::PwConnection(QObject* parent): QObject(parent) {
if (this->core.isValid()) { this->runtimeDir = PwConnection::resolveRuntimeDir();
this->registry.init(this->core);
QObject::connect(&this->core, &PwCore::fatalError, this, &PwConnection::queueFatalError);
if (!this->tryConnect(false)
&& qEnvironmentVariableIntValue("QS_PIPEWIRE_IMMEDIATE_RECONNECT") == 1)
{
this->beginReconnect();
} }
} }
QString PwConnection::resolveRuntimeDir() {
auto runtimeDir = qEnvironmentVariable("PIPEWIRE_RUNTIME_DIR");
if (runtimeDir.isEmpty()) {
runtimeDir = qEnvironmentVariable("XDG_RUNTIME_DIR");
}
if (runtimeDir.isEmpty()) {
runtimeDir = QString("/run/user/%1").arg(getuid());
}
return runtimeDir;
}
void PwConnection::beginReconnect() {
if (this->core.isValid()) {
this->stopSocketWatcher();
return;
}
if (!qEnvironmentVariableIsEmpty("PIPEWIRE_REMOTE")) return;
if (this->runtimeDir.isEmpty()) {
qCWarning(
logConnection
) << "Cannot watch runtime dir for pipewire reconnects: runtime dir is empty.";
return;
}
this->startSocketWatcher();
this->tryConnect(true);
}
bool PwConnection::tryConnect(bool retry) {
if (this->core.isValid()) return true;
qCDebug(logConnection) << "Attempting reconnect...";
if (!this->core.start(retry)) {
return false;
}
qCInfo(logConnection) << "Connection established";
this->stopSocketWatcher();
this->registry.init(this->core);
return true;
}
void PwConnection::startSocketWatcher() {
if (this->socketWatcher != nullptr) return;
if (!qEnvironmentVariableIsEmpty("PIPEWIRE_REMOTE")) return;
auto dir = QDir(this->runtimeDir);
if (!dir.exists()) {
qCWarning(logConnection) << "Cannot wait for a new pipewire socket, runtime dir does not exist:"
<< this->runtimeDir;
return;
}
this->socketWatcher = new QFileSystemWatcher(this);
this->socketWatcher->addPath(this->runtimeDir);
QObject::connect(
this->socketWatcher,
&QFileSystemWatcher::directoryChanged,
this,
&PwConnection::onRuntimeDirChanged
);
}
void PwConnection::stopSocketWatcher() {
if (this->socketWatcher == nullptr) return;
this->socketWatcher->deleteLater();
this->socketWatcher = nullptr;
}
void PwConnection::queueFatalError() {
if (this->fatalErrorQueued) return;
this->fatalErrorQueued = true;
QMetaObject::invokeMethod(this, &PwConnection::onFatalError, Qt::QueuedConnection);
}
void PwConnection::onFatalError() {
this->fatalErrorQueued = false;
this->defaults.reset();
this->registry.reset();
this->core.shutdown();
this->beginReconnect();
}
void PwConnection::onRuntimeDirChanged(const QString& /*path*/) {
if (this->core.isValid()) {
this->stopSocketWatcher();
return;
}
this->tryConnect(true);
}
PwConnection* PwConnection::instance() { PwConnection* PwConnection::instance() {
static PwConnection* instance = nullptr; // NOLINT static PwConnection* instance = nullptr; // NOLINT

View file

@ -1,9 +1,13 @@
#pragma once #pragma once
#include <qstring.h>
#include "core.hpp" #include "core.hpp"
#include "defaults.hpp" #include "defaults.hpp"
#include "registry.hpp" #include "registry.hpp"
class QFileSystemWatcher;
namespace qs::service::pipewire { namespace qs::service::pipewire {
class PwConnection: public QObject { class PwConnection: public QObject {
@ -18,6 +22,23 @@ public:
static PwConnection* instance(); static PwConnection* instance();
private: private:
static QString resolveRuntimeDir();
void beginReconnect();
bool tryConnect(bool retry);
void startSocketWatcher();
void stopSocketWatcher();
private slots:
void queueFatalError();
void onFatalError();
void onRuntimeDirChanged(const QString& path);
private:
QString runtimeDir;
QFileSystemWatcher* socketWatcher = nullptr;
bool fatalErrorQueued = false;
// init/destroy order is important. do not rearrange. // init/destroy order is important. do not rearrange.
PwCore core; PwCore core;
}; };

View file

@ -27,7 +27,7 @@ const pw_core_events PwCore::EVENTS = {
.info = nullptr, .info = nullptr,
.done = &PwCore::onSync, .done = &PwCore::onSync,
.ping = nullptr, .ping = nullptr,
.error = nullptr, .error = &PwCore::onError,
.remove_id = nullptr, .remove_id = nullptr,
.bound_id = nullptr, .bound_id = nullptr,
.add_mem = nullptr, .add_mem = nullptr,
@ -36,26 +36,46 @@ const pw_core_events PwCore::EVENTS = {
}; };
PwCore::PwCore(QObject* parent): QObject(parent), notifier(QSocketNotifier::Read) { PwCore::PwCore(QObject* parent): QObject(parent), notifier(QSocketNotifier::Read) {
qCInfo(logLoop) << "Creating pipewire event loop.";
pw_init(nullptr, nullptr); pw_init(nullptr, nullptr);
}
bool PwCore::start(bool retry) {
if (this->core != nullptr) return true;
qCInfo(logLoop) << "Creating pipewire event loop.";
this->loop = pw_loop_new(nullptr); this->loop = pw_loop_new(nullptr);
if (this->loop == nullptr) { if (this->loop == nullptr) {
qCCritical(logLoop) << "Failed to create pipewire event loop."; if (retry) {
return; qCInfo(logLoop) << "Failed to create pipewire event loop.";
} else {
qCCritical(logLoop) << "Failed to create pipewire event loop.";
}
this->shutdown();
return false;
} }
this->context = pw_context_new(this->loop, nullptr, 0); this->context = pw_context_new(this->loop, nullptr, 0);
if (this->context == nullptr) { if (this->context == nullptr) {
qCCritical(logLoop) << "Failed to create pipewire context."; if (retry) {
return; qCInfo(logLoop) << "Failed to create pipewire context.";
} else {
qCCritical(logLoop) << "Failed to create pipewire context.";
}
this->shutdown();
return false;
} }
qCInfo(logLoop) << "Connecting to pipewire server."; qCInfo(logLoop) << "Connecting to pipewire server.";
this->core = pw_context_connect(this->context, nullptr, 0); this->core = pw_context_connect(this->context, nullptr, 0);
if (this->core == nullptr) { if (this->core == nullptr) {
qCCritical(logLoop) << "Failed to connect pipewire context. Errno:" << errno; if (retry) {
return; qCInfo(logLoop) << "Failed to connect pipewire context. Errno:" << errno;
} else {
qCCritical(logLoop) << "Failed to connect pipewire context. Errno:" << errno;
}
this->shutdown();
return false;
} }
pw_core_add_listener(this->core, &this->listener.hook, &PwCore::EVENTS, this); pw_core_add_listener(this->core, &this->listener.hook, &PwCore::EVENTS, this);
@ -66,22 +86,34 @@ PwCore::PwCore(QObject* parent): QObject(parent), notifier(QSocketNotifier::Read
this->notifier.setSocket(fd); this->notifier.setSocket(fd);
QObject::connect(&this->notifier, &QSocketNotifier::activated, this, &PwCore::poll); QObject::connect(&this->notifier, &QSocketNotifier::activated, this, &PwCore::poll);
this->notifier.setEnabled(true); this->notifier.setEnabled(true);
return true;
}
void PwCore::shutdown() {
if (this->core != nullptr) {
this->listener.remove();
pw_core_disconnect(this->core);
this->core = nullptr;
}
if (this->context != nullptr) {
pw_context_destroy(this->context);
this->context = nullptr;
}
if (this->loop != nullptr) {
pw_loop_destroy(this->loop);
this->loop = nullptr;
}
this->notifier.setEnabled(false);
QObject::disconnect(&this->notifier, nullptr, this, nullptr);
} }
PwCore::~PwCore() { PwCore::~PwCore() {
qCInfo(logLoop) << "Destroying PwCore."; qCInfo(logLoop) << "Destroying PwCore.";
this->shutdown();
if (this->loop != nullptr) {
if (this->context != nullptr) {
if (this->core != nullptr) {
pw_core_disconnect(this->core);
}
pw_context_destroy(this->context);
}
pw_loop_destroy(this->loop);
}
} }
bool PwCore::isValid() const { bool PwCore::isValid() const {
@ -90,6 +122,7 @@ bool PwCore::isValid() const {
} }
void PwCore::poll() { void PwCore::poll() {
if (this->loop == nullptr) return;
qCDebug(logLoop) << "Pipewire event loop received new events, iterating."; qCDebug(logLoop) << "Pipewire event loop received new events, iterating.";
// Spin pw event loop. // Spin pw event loop.
pw_loop_iterate(this->loop, 0); pw_loop_iterate(this->loop, 0);
@ -107,6 +140,18 @@ void PwCore::onSync(void* data, quint32 id, qint32 seq) {
emit self->synced(id, seq); emit self->synced(id, seq);
} }
void PwCore::onError(void* data, quint32 id, qint32 /*seq*/, qint32 res, const char* message) {
auto* self = static_cast<PwCore*>(data);
if (message != nullptr) {
qCWarning(logLoop) << "Fatal pipewire error on object" << id << "with code" << res << message;
} else {
qCWarning(logLoop) << "Fatal pipewire error on object" << id << "with code" << res;
}
emit self->fatalError();
}
SpaHook::SpaHook() { // NOLINT SpaHook::SpaHook() { // NOLINT
spa_zero(this->hook); spa_zero(this->hook);
} }

View file

@ -30,6 +30,9 @@ public:
~PwCore() override; ~PwCore() override;
Q_DISABLE_COPY_MOVE(PwCore); Q_DISABLE_COPY_MOVE(PwCore);
bool start(bool retry);
void shutdown();
[[nodiscard]] bool isValid() const; [[nodiscard]] bool isValid() const;
[[nodiscard]] qint32 sync(quint32 id) const; [[nodiscard]] qint32 sync(quint32 id) const;
@ -40,6 +43,7 @@ public:
signals: signals:
void polled(); void polled();
void synced(quint32 id, qint32 seq); void synced(quint32 id, qint32 seq);
void fatalError();
private slots: private slots:
void poll(); void poll();
@ -48,6 +52,7 @@ private:
static const pw_core_events EVENTS; static const pw_core_events EVENTS;
static void onSync(void* data, quint32 id, qint32 seq); static void onSync(void* data, quint32 id, qint32 seq);
static void onError(void* data, quint32 id, qint32 seq, qint32 res, const char* message);
QSocketNotifier notifier; QSocketNotifier notifier;
SpaHook listener; SpaHook listener;

View file

@ -31,6 +31,22 @@ PwDefaultTracker::PwDefaultTracker(PwRegistry* registry): registry(registry) {
QObject::connect(registry, &PwRegistry::nodeAdded, this, &PwDefaultTracker::onNodeAdded); QObject::connect(registry, &PwRegistry::nodeAdded, this, &PwDefaultTracker::onNodeAdded);
} }
void PwDefaultTracker::reset() {
if (auto* meta = this->defaultsMetadata.object()) {
QObject::disconnect(meta, nullptr, this, nullptr);
}
this->defaultsMetadata.setObject(nullptr);
this->setDefaultSink(nullptr);
this->setDefaultSinkName(QString());
this->setDefaultSource(nullptr);
this->setDefaultSourceName(QString());
this->setDefaultConfiguredSink(nullptr);
this->setDefaultConfiguredSinkName(QString());
this->setDefaultConfiguredSource(nullptr);
this->setDefaultConfiguredSourceName(QString());
}
void PwDefaultTracker::onMetadataAdded(PwMetadata* metadata) { void PwDefaultTracker::onMetadataAdded(PwMetadata* metadata) {
if (metadata->name() == "default") { if (metadata->name() == "default") {
qCDebug(logDefaults) << "Got new defaults metadata object" << metadata; qCDebug(logDefaults) << "Got new defaults metadata object" << metadata;

View file

@ -12,6 +12,7 @@ class PwDefaultTracker: public QObject {
public: public:
explicit PwDefaultTracker(PwRegistry* registry); explicit PwDefaultTracker(PwRegistry* registry);
void reset();
[[nodiscard]] PwNode* defaultSink() const; [[nodiscard]] PwNode* defaultSink() const;
[[nodiscard]] PwNode* defaultSource() const; [[nodiscard]] PwNode* defaultSource() const;

View file

@ -2,7 +2,6 @@
#include <qcontainerfwd.h> #include <qcontainerfwd.h>
#include <qlist.h> #include <qlist.h>
#include <qnamespace.h>
#include <qobject.h> #include <qobject.h>
#include <qqmllist.h> #include <qqmllist.h>
#include <qtmetamacros.h> #include <qtmetamacros.h>
@ -99,15 +98,8 @@ Pipewire::Pipewire(QObject* parent): QObject(parent) {
&Pipewire::defaultConfiguredAudioSourceChanged &Pipewire::defaultConfiguredAudioSourceChanged
); );
if (!connection->registry.isInitialized()) { QObject::connect(&connection->registry, &PwRegistry::initialized, this, &Pipewire::readyChanged);
QObject::connect( QObject::connect(&connection->registry, &PwRegistry::cleared, this, &Pipewire::readyChanged);
&connection->registry,
&PwRegistry::initialized,
this,
&Pipewire::readyChanged,
Qt::SingleShotConnection
);
}
} }
ObjectModel<PwNodeIface>* Pipewire::nodes() { return &this->mNodes; } ObjectModel<PwNodeIface>* Pipewire::nodes() { return &this->mNodes; }

View file

@ -134,6 +134,46 @@ void PwRegistry::init(PwCore& core) {
this->coreSyncSeq = this->core->sync(PW_ID_CORE); this->coreSyncSeq = this->core->sync(PW_ID_CORE);
} }
void PwRegistry::reset() {
if (this->core != nullptr) {
QObject::disconnect(this->core, nullptr, this, nullptr);
}
this->listener.remove();
if (this->object != nullptr) {
pw_proxy_destroy(reinterpret_cast<pw_proxy*>(this->object));
this->object = nullptr;
}
for (auto* meta: this->metadata.values()) {
meta->safeDestroy();
}
this->metadata.clear();
for (auto* link: this->links.values()) {
link->safeDestroy();
}
this->links.clear();
for (auto* node: this->nodes.values()) {
node->safeDestroy();
}
this->nodes.clear();
for (auto* device: this->devices.values()) {
device->safeDestroy();
}
this->devices.clear();
this->linkGroups.clear();
this->initState = InitState::SendingObjects;
this->coreSyncSeq = 0;
this->core = nullptr;
emit this->cleared();
}
void PwRegistry::onCoreSync(quint32 id, qint32 seq) { void PwRegistry::onCoreSync(quint32 id, qint32 seq) {
if (id != PW_ID_CORE || seq != this->coreSyncSeq) return; if (id != PW_ID_CORE || seq != this->coreSyncSeq) return;

View file

@ -116,6 +116,7 @@ class PwRegistry
public: public:
void init(PwCore& core); void init(PwCore& core);
void reset();
[[nodiscard]] bool isInitialized() const { return this->initState == InitState::Done; } [[nodiscard]] bool isInitialized() const { return this->initState == InitState::Done; }
@ -136,6 +137,7 @@ signals:
void linkGroupAdded(PwLinkGroup* group); void linkGroupAdded(PwLinkGroup* group);
void metadataAdded(PwMetadata* metadata); void metadataAdded(PwMetadata* metadata);
void initialized(); void initialized();
void cleared();
private slots: private slots:
void onLinkGroupDestroyed(QObject* object); void onLinkGroupDestroyed(QObject* object);