diff --git a/changelog/next.md b/changelog/next.md index 7857103..e437e6c 100644 --- a/changelog/next.md +++ b/changelog/next.md @@ -18,11 +18,14 @@ set shell id. - Added the ability to override Quickshell.cacheDir with a custom path. - Added minimized, maximized, and fullscreen properties to FloatingWindow. - Added the ability to handle move and resize events to FloatingWindow. +- Pipewire service now reconnects if pipewire dies or a protocol error occurs. +- Added pipewire audio peak detection. ## Other Changes - FreeBSD is now partially supported. - IPC operations filter available instances to the current display connection by default. +- PwNodeLinkTracker ignores sound level monitoring programs. ## Bug Fixes diff --git a/src/core/logging.cpp b/src/core/logging.cpp index 10ea453..d24225b 100644 --- a/src/core/logging.cpp +++ b/src/core/logging.cpp @@ -27,7 +27,10 @@ #include #include #include -#include +#ifdef __linux__ +#include +#include +#endif #include "instanceinfo.hpp" #include "logcat.hpp" @@ -43,6 +46,57 @@ using namespace qt_logging_registry; QS_LOGGING_CATEGORY(logLogging, "quickshell.logging", QtWarningMsg); +namespace { +bool copyFileData(int sourceFd, int destFd, qint64 size) { + auto usize = static_cast(size); + +#ifdef __linux__ + off_t offset = 0; + auto remaining = usize; + + while (remaining > 0) { + auto r = sendfile(destFd, sourceFd, &offset, remaining); + if (r == -1) { + if (errno == EINTR) continue; + return false; + } + if (r == 0) break; + remaining -= static_cast(r); + } + + return true; +#else + std::array buffer = {}; + auto remaining = totalTarget; + + while (remaining > 0) { + auto chunk = std::min(remaining, buffer.size()); + auto r = ::read(sourceFd, buffer.data(), chunk); + if (r == -1) { + if (errno == EINTR) continue; + return false; + } + if (r == 0) break; + + auto readBytes = static_cast(r); + size_t written = 0; + while (written < readBytes) { + auto w = ::write(destFd, buffer.data() + written, readBytes - written); + if (w == -1) { + if (errno == EINTR) continue; + return false; + } + written += static_cast(w); + } + + remaining -= readBytes; + } + + return true; +#endif +} +} // namespace + bool LogMessage::operator==(const LogMessage& other) const { // note: not including time return this->type == other.type && this->category == other.category && this->body == other.body; @@ -414,7 +468,11 @@ void ThreadLogging::initFs() { auto* oldFile = this->file; if (oldFile) { oldFile->seek(0); - copy_file_range(oldFile->handle(), nullptr, file->handle(), nullptr, oldFile->size(), 0); + + if (!copyFileData(oldFile->handle(), file->handle(), oldFile->size())) { + qCritical(logLogging) << "Failed to copy log from memfd with error code " << errno + << qt_error_string(errno); + } } this->file = file; @@ -426,14 +484,10 @@ void ThreadLogging::initFs() { auto* oldFile = this->detailedFile; if (oldFile) { oldFile->seek(0); - copy_file_range( - oldFile->handle(), - nullptr, - detailedFile->handle(), - nullptr, - oldFile->size(), - 0 - ); + if (!copyFileData(oldFile->handle(), detailedFile->handle(), oldFile->size())) { + qCritical(logLogging) << "Failed to copy detailed log from memfd with error code " << errno + << qt_error_string(errno); + } } crash::CrashInfo::INSTANCE.logFd = detailedFile->handle(); diff --git a/src/services/pam/conversation.cpp b/src/services/pam/conversation.cpp index 500abd5..a9d498b 100644 --- a/src/services/pam/conversation.cpp +++ b/src/services/pam/conversation.cpp @@ -1,4 +1,5 @@ #include "conversation.hpp" +#include #include #include @@ -6,7 +7,6 @@ #include #include #include -#include #include #include diff --git a/src/services/pipewire/CMakeLists.txt b/src/services/pipewire/CMakeLists.txt index fddca6f..fe894c9 100644 --- a/src/services/pipewire/CMakeLists.txt +++ b/src/services/pipewire/CMakeLists.txt @@ -3,6 +3,7 @@ pkg_check_modules(pipewire REQUIRED IMPORTED_TARGET libpipewire-0.3) qt_add_library(quickshell-service-pipewire STATIC qml.cpp + peak.cpp core.cpp connection.cpp registry.cpp diff --git a/src/services/pipewire/connection.cpp b/src/services/pipewire/connection.cpp index ac4c5e6..c2f505f 100644 --- a/src/services/pipewire/connection.cpp +++ b/src/services/pipewire/connection.cpp @@ -1,15 +1,137 @@ #include "connection.hpp" +#include +#include +#include +#include +#include #include +#include +#include + +#include "../../core/logcat.hpp" +#include "core.hpp" namespace qs::service::pipewire { +namespace { +QS_LOGGING_CATEGORY(logConnection, "quickshell.service.pipewire.connection", QtWarningMsg); +} + PwConnection::PwConnection(QObject* parent): QObject(parent) { - if (this->core.isValid()) { - this->registry.init(this->core); + this->runtimeDir = PwConnection::resolveRuntimeDir(); + + QObject::connect(&this->core, &PwCore::fatalError, this, &PwConnection::queueFatalError); + + if (!this->tryConnect(false) + && qEnvironmentVariableIntValue("QS_PIPEWIRE_IMMEDIATE_RECONNECT") == 1) + { + this->beginReconnect(); } } +QString PwConnection::resolveRuntimeDir() { + auto runtimeDir = qEnvironmentVariable("PIPEWIRE_RUNTIME_DIR"); + if (runtimeDir.isEmpty()) { + runtimeDir = qEnvironmentVariable("XDG_RUNTIME_DIR"); + } + + if (runtimeDir.isEmpty()) { + runtimeDir = QString("/run/user/%1").arg(getuid()); + } + + return runtimeDir; +} + +void PwConnection::beginReconnect() { + if (this->core.isValid()) { + this->stopSocketWatcher(); + return; + } + + if (!qEnvironmentVariableIsEmpty("PIPEWIRE_REMOTE")) return; + + if (this->runtimeDir.isEmpty()) { + qCWarning( + logConnection + ) << "Cannot watch runtime dir for pipewire reconnects: runtime dir is empty."; + return; + } + + this->startSocketWatcher(); + this->tryConnect(true); +} + +bool PwConnection::tryConnect(bool retry) { + if (this->core.isValid()) return true; + + qCDebug(logConnection) << "Attempting reconnect..."; + if (!this->core.start(retry)) { + return false; + } + + qCInfo(logConnection) << "Connection established"; + this->stopSocketWatcher(); + + this->registry.init(this->core); + return true; +} + +void PwConnection::startSocketWatcher() { + if (this->socketWatcher != nullptr) return; + if (!qEnvironmentVariableIsEmpty("PIPEWIRE_REMOTE")) return; + + auto dir = QDir(this->runtimeDir); + if (!dir.exists()) { + qCWarning(logConnection) << "Cannot wait for a new pipewire socket, runtime dir does not exist:" + << this->runtimeDir; + return; + } + + this->socketWatcher = new QFileSystemWatcher(this); + this->socketWatcher->addPath(this->runtimeDir); + + QObject::connect( + this->socketWatcher, + &QFileSystemWatcher::directoryChanged, + this, + &PwConnection::onRuntimeDirChanged + ); +} + +void PwConnection::stopSocketWatcher() { + if (this->socketWatcher == nullptr) return; + + this->socketWatcher->deleteLater(); + this->socketWatcher = nullptr; +} + +void PwConnection::queueFatalError() { + if (this->fatalErrorQueued) return; + + this->fatalErrorQueued = true; + QMetaObject::invokeMethod(this, &PwConnection::onFatalError, Qt::QueuedConnection); +} + +void PwConnection::onFatalError() { + this->fatalErrorQueued = false; + + this->defaults.reset(); + this->registry.reset(); + this->core.shutdown(); + + this->beginReconnect(); +} + +void PwConnection::onRuntimeDirChanged(const QString& /*path*/) { + if (this->core.isValid()) { + this->stopSocketWatcher(); + return; + } + + this->tryConnect(true); +} + PwConnection* PwConnection::instance() { static PwConnection* instance = nullptr; // NOLINT diff --git a/src/services/pipewire/connection.hpp b/src/services/pipewire/connection.hpp index 2b3e860..d0374f8 100644 --- a/src/services/pipewire/connection.hpp +++ b/src/services/pipewire/connection.hpp @@ -1,9 +1,13 @@ #pragma once +#include + #include "core.hpp" #include "defaults.hpp" #include "registry.hpp" +class QFileSystemWatcher; + namespace qs::service::pipewire { class PwConnection: public QObject { @@ -18,6 +22,23 @@ public: static PwConnection* instance(); private: + static QString resolveRuntimeDir(); + + void beginReconnect(); + bool tryConnect(bool retry); + void startSocketWatcher(); + void stopSocketWatcher(); + +private slots: + void queueFatalError(); + void onFatalError(); + void onRuntimeDirChanged(const QString& path); + +private: + QString runtimeDir; + QFileSystemWatcher* socketWatcher = nullptr; + bool fatalErrorQueued = false; + // init/destroy order is important. do not rearrange. PwCore core; }; diff --git a/src/services/pipewire/core.cpp b/src/services/pipewire/core.cpp index 22445aa..e40bc54 100644 --- a/src/services/pipewire/core.cpp +++ b/src/services/pipewire/core.cpp @@ -27,7 +27,7 @@ const pw_core_events PwCore::EVENTS = { .info = nullptr, .done = &PwCore::onSync, .ping = nullptr, - .error = nullptr, + .error = &PwCore::onError, .remove_id = nullptr, .bound_id = nullptr, .add_mem = nullptr, @@ -36,26 +36,46 @@ const pw_core_events PwCore::EVENTS = { }; PwCore::PwCore(QObject* parent): QObject(parent), notifier(QSocketNotifier::Read) { - qCInfo(logLoop) << "Creating pipewire event loop."; pw_init(nullptr, nullptr); +} + +bool PwCore::start(bool retry) { + if (this->core != nullptr) return true; + + qCInfo(logLoop) << "Creating pipewire event loop."; this->loop = pw_loop_new(nullptr); if (this->loop == nullptr) { - qCCritical(logLoop) << "Failed to create pipewire event loop."; - return; + if (retry) { + qCInfo(logLoop) << "Failed to create pipewire event loop."; + } else { + qCCritical(logLoop) << "Failed to create pipewire event loop."; + } + this->shutdown(); + return false; } this->context = pw_context_new(this->loop, nullptr, 0); if (this->context == nullptr) { - qCCritical(logLoop) << "Failed to create pipewire context."; - return; + if (retry) { + qCInfo(logLoop) << "Failed to create pipewire context."; + } else { + qCCritical(logLoop) << "Failed to create pipewire context."; + } + this->shutdown(); + return false; } qCInfo(logLoop) << "Connecting to pipewire server."; this->core = pw_context_connect(this->context, nullptr, 0); if (this->core == nullptr) { - qCCritical(logLoop) << "Failed to connect pipewire context. Errno:" << errno; - return; + if (retry) { + qCInfo(logLoop) << "Failed to connect pipewire context. Errno:" << errno; + } else { + qCCritical(logLoop) << "Failed to connect pipewire context. Errno:" << errno; + } + this->shutdown(); + return false; } pw_core_add_listener(this->core, &this->listener.hook, &PwCore::EVENTS, this); @@ -66,22 +86,34 @@ PwCore::PwCore(QObject* parent): QObject(parent), notifier(QSocketNotifier::Read this->notifier.setSocket(fd); QObject::connect(&this->notifier, &QSocketNotifier::activated, this, &PwCore::poll); this->notifier.setEnabled(true); + + return true; +} + +void PwCore::shutdown() { + if (this->core != nullptr) { + this->listener.remove(); + pw_core_disconnect(this->core); + this->core = nullptr; + } + + if (this->context != nullptr) { + pw_context_destroy(this->context); + this->context = nullptr; + } + + if (this->loop != nullptr) { + pw_loop_destroy(this->loop); + this->loop = nullptr; + } + + this->notifier.setEnabled(false); + QObject::disconnect(&this->notifier, nullptr, this, nullptr); } PwCore::~PwCore() { qCInfo(logLoop) << "Destroying PwCore."; - - if (this->loop != nullptr) { - if (this->context != nullptr) { - if (this->core != nullptr) { - pw_core_disconnect(this->core); - } - - pw_context_destroy(this->context); - } - - pw_loop_destroy(this->loop); - } + this->shutdown(); } bool PwCore::isValid() const { @@ -90,6 +122,7 @@ bool PwCore::isValid() const { } void PwCore::poll() { + if (this->loop == nullptr) return; qCDebug(logLoop) << "Pipewire event loop received new events, iterating."; // Spin pw event loop. pw_loop_iterate(this->loop, 0); @@ -107,6 +140,18 @@ void PwCore::onSync(void* data, quint32 id, qint32 seq) { emit self->synced(id, seq); } +void PwCore::onError(void* data, quint32 id, qint32 /*seq*/, qint32 res, const char* message) { + auto* self = static_cast(data); + + if (message != nullptr) { + qCWarning(logLoop) << "Fatal pipewire error on object" << id << "with code" << res << message; + } else { + qCWarning(logLoop) << "Fatal pipewire error on object" << id << "with code" << res; + } + + emit self->fatalError(); +} + SpaHook::SpaHook() { // NOLINT spa_zero(this->hook); } diff --git a/src/services/pipewire/core.hpp b/src/services/pipewire/core.hpp index 262e2d3..967efaf 100644 --- a/src/services/pipewire/core.hpp +++ b/src/services/pipewire/core.hpp @@ -30,6 +30,9 @@ public: ~PwCore() override; Q_DISABLE_COPY_MOVE(PwCore); + bool start(bool retry); + void shutdown(); + [[nodiscard]] bool isValid() const; [[nodiscard]] qint32 sync(quint32 id) const; @@ -40,6 +43,7 @@ public: signals: void polled(); void synced(quint32 id, qint32 seq); + void fatalError(); private slots: void poll(); @@ -48,6 +52,7 @@ private: static const pw_core_events EVENTS; static void onSync(void* data, quint32 id, qint32 seq); + static void onError(void* data, quint32 id, qint32 seq, qint32 res, const char* message); QSocketNotifier notifier; SpaHook listener; diff --git a/src/services/pipewire/defaults.cpp b/src/services/pipewire/defaults.cpp index 88a1dc1..02463f4 100644 --- a/src/services/pipewire/defaults.cpp +++ b/src/services/pipewire/defaults.cpp @@ -31,6 +31,22 @@ PwDefaultTracker::PwDefaultTracker(PwRegistry* registry): registry(registry) { QObject::connect(registry, &PwRegistry::nodeAdded, this, &PwDefaultTracker::onNodeAdded); } +void PwDefaultTracker::reset() { + if (auto* meta = this->defaultsMetadata.object()) { + QObject::disconnect(meta, nullptr, this, nullptr); + } + + this->defaultsMetadata.setObject(nullptr); + this->setDefaultSink(nullptr); + this->setDefaultSinkName(QString()); + this->setDefaultSource(nullptr); + this->setDefaultSourceName(QString()); + this->setDefaultConfiguredSink(nullptr); + this->setDefaultConfiguredSinkName(QString()); + this->setDefaultConfiguredSource(nullptr); + this->setDefaultConfiguredSourceName(QString()); +} + void PwDefaultTracker::onMetadataAdded(PwMetadata* metadata) { if (metadata->name() == "default") { qCDebug(logDefaults) << "Got new defaults metadata object" << metadata; diff --git a/src/services/pipewire/defaults.hpp b/src/services/pipewire/defaults.hpp index f3a8e3f..591c4fd 100644 --- a/src/services/pipewire/defaults.hpp +++ b/src/services/pipewire/defaults.hpp @@ -12,6 +12,7 @@ class PwDefaultTracker: public QObject { public: explicit PwDefaultTracker(PwRegistry* registry); + void reset(); [[nodiscard]] PwNode* defaultSink() const; [[nodiscard]] PwNode* defaultSource() const; diff --git a/src/services/pipewire/module.md b/src/services/pipewire/module.md index d109f05..e34f77d 100644 --- a/src/services/pipewire/module.md +++ b/src/services/pipewire/module.md @@ -2,6 +2,7 @@ name = "Quickshell.Services.Pipewire" description = "Pipewire API" headers = [ "qml.hpp", + "peak.hpp", "link.hpp", "node.hpp", ] diff --git a/src/services/pipewire/node.cpp b/src/services/pipewire/node.cpp index c34fa17..1b396af 100644 --- a/src/services/pipewire/node.cpp +++ b/src/services/pipewire/node.cpp @@ -11,7 +11,7 @@ #include #include #include -#include +#include #include #include #include @@ -90,6 +90,8 @@ QString PwAudioChannel::toString(Enum value) { QString PwNodeType::toString(PwNodeType::Flags type) { switch (type) { + // qstringliteral apparently not imported... + // NOLINTBEGIN case PwNodeType::VideoSource: return QStringLiteral("VideoSource"); case PwNodeType::VideoSink: return QStringLiteral("VideoSink"); case PwNodeType::AudioSource: return QStringLiteral("AudioSource"); @@ -99,6 +101,7 @@ QString PwNodeType::toString(PwNodeType::Flags type) { case PwNodeType::AudioInStream: return QStringLiteral("AudioInStream"); case PwNodeType::Untracked: return QStringLiteral("Untracked"); default: return QStringLiteral("Invalid"); + // NOLINTEND } } @@ -161,6 +164,24 @@ void PwNode::initProps(const spa_dict* props) { this->nick = nodeNick; } + if (const auto* nodeCategory = spa_dict_lookup(props, PW_KEY_MEDIA_CATEGORY)) { + if (strcmp(nodeCategory, "Monitor") == 0 || strcmp(nodeCategory, "Manager") == 0) { + this->isMonitor = true; + } + } + + if (const auto* serial = spa_dict_lookup(props, PW_KEY_OBJECT_SERIAL)) { + auto ok = false; + auto value = QString::fromUtf8(serial).toULongLong(&ok); + if (!ok) { + qCWarning(logNode) << this + << "has an object.serial property but the value is not valid. Value:" + << serial; + } else { + this->objectSerial = value; + } + } + if (const auto* deviceId = spa_dict_lookup(props, PW_KEY_DEVICE_ID)) { auto ok = false; auto id = QString::fromUtf8(deviceId).toInt(&ok); diff --git a/src/services/pipewire/node.hpp b/src/services/pipewire/node.hpp index 45e1551..fdec72d 100644 --- a/src/services/pipewire/node.hpp +++ b/src/services/pipewire/node.hpp @@ -199,6 +199,8 @@ public: [[nodiscard]] QVector volumes() const; void setVolumes(const QVector& volumes); + [[nodiscard]] QVector server() const; + signals: void volumesChanged(); void channelsChanged(); @@ -233,6 +235,8 @@ public: QString description; QString nick; QMap properties; + quint64 objectSerial = 0; + bool isMonitor = false; PwNodeType::Flags type = PwNodeType::Untracked; diff --git a/src/services/pipewire/peak.cpp b/src/services/pipewire/peak.cpp new file mode 100644 index 0000000..64b5c42 --- /dev/null +++ b/src/services/pipewire/peak.cpp @@ -0,0 +1,404 @@ +#include "peak.hpp" +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../../core/logcat.hpp" +#include "connection.hpp" +#include "core.hpp" +#include "node.hpp" +#include "qml.hpp" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wmissing-designated-field-initializers" + +namespace qs::service::pipewire { + +namespace { +QS_LOGGING_CATEGORY(logPeak, "quickshell.service.pipewire.peak", QtWarningMsg); +} + +class PwPeakStream { +public: + PwPeakStream(PwNodePeakMonitor* monitor, PwNode* node): monitor(monitor), node(node) {} + ~PwPeakStream() { this->destroy(); } + Q_DISABLE_COPY_MOVE(PwPeakStream); + + bool start(); + void destroy(); + +private: + static const pw_stream_events EVENTS; + static void onProcess(void* data); + static void onParamChanged(void* data, uint32_t id, const spa_pod* param); + static void + onStateChanged(void* data, pw_stream_state oldState, pw_stream_state state, const char* error); + static void onDestroy(void* data); + + void handleProcess(); + void handleParamChanged(uint32_t id, const spa_pod* param); + void handleStateChanged(pw_stream_state oldState, pw_stream_state state, const char* error); + void resetFormat(); + + PwNodePeakMonitor* monitor = nullptr; + PwNode* node = nullptr; + pw_stream* stream = nullptr; + SpaHook listener; + spa_audio_info_raw format = SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_UNKNOWN); + bool formatReady = false; + QVector channelPeaks; +}; + +const pw_stream_events PwPeakStream::EVENTS = { + .version = PW_VERSION_STREAM_EVENTS, + .destroy = &PwPeakStream::onDestroy, + .state_changed = &PwPeakStream::onStateChanged, + .param_changed = &PwPeakStream::onParamChanged, + .process = &PwPeakStream::onProcess, +}; + +bool PwPeakStream::start() { + auto* core = PwConnection::instance()->registry.core; + if (core == nullptr || !core->isValid()) { + qCWarning(logPeak) << "Cannot start peak monitor stream: pipewire core is not ready."; + return false; + } + + auto target = + QByteArray::number(this->node->objectSerial ? this->node->objectSerial : this->node->id); + + // clang-format off + auto* props = pw_properties_new( + PW_KEY_MEDIA_TYPE, "Audio", + PW_KEY_MEDIA_CATEGORY, "Monitor", + PW_KEY_MEDIA_NAME, "Peak detect", + PW_KEY_APP_NAME, "Quickshell Peak Detect", + PW_KEY_STREAM_MONITOR, "true", + PW_KEY_STREAM_CAPTURE_SINK, this->node->type.testFlags(PwNodeType::Sink) ? "true" : "false", + PW_KEY_TARGET_OBJECT, target.constData(), + nullptr + ); + // clang-format on + + if (props == nullptr) { + qCWarning(logPeak) << "Failed to create properties for peak monitor stream."; + return false; + } + + this->stream = pw_stream_new(core->core, "quickshell-peak-monitor", props); + if (this->stream == nullptr) { + qCWarning(logPeak) << "Failed to create peak monitor stream."; + return false; + } + + pw_stream_add_listener(this->stream, &this->listener.hook, &PwPeakStream::EVENTS, this); + + auto buffer = std::array {}; + auto builder = SPA_POD_BUILDER_INIT(buffer.data(), buffer.size()); // NOLINT + + auto params = std::array {}; + auto raw = SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_F32); + params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &raw); + + auto flags = + static_cast(PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS); + auto res = + pw_stream_connect(this->stream, PW_DIRECTION_INPUT, PW_ID_ANY, flags, params.data(), 1); + + if (res < 0) { + qCWarning(logPeak) << "Failed to connect peak monitor stream:" << res; + this->destroy(); + return false; + } + + return true; +} + +void PwPeakStream::destroy() { + if (this->stream == nullptr) return; + this->listener.remove(); + pw_stream_destroy(this->stream); + this->stream = nullptr; + this->resetFormat(); +} + +void PwPeakStream::onProcess(void* data) { + static_cast(data)->handleProcess(); // NOLINT +} + +void PwPeakStream::onParamChanged(void* data, uint32_t id, const spa_pod* param) { + static_cast(data)->handleParamChanged(id, param); // NOLINT +} + +void PwPeakStream::onStateChanged( + void* data, + pw_stream_state oldState, + pw_stream_state state, + const char* error +) { + static_cast(data)->handleStateChanged(oldState, state, error); // NOLINT +} + +void PwPeakStream::onDestroy(void* data) { + auto* self = static_cast(data); // NOLINT + self->stream = nullptr; + self->listener.remove(); + self->resetFormat(); +} + +void PwPeakStream::handleStateChanged( + pw_stream_state oldState, + pw_stream_state state, + const char* error +) { + if (state == PW_STREAM_STATE_ERROR) { + if (error != nullptr) { + qCWarning(logPeak) << "Peak monitor stream error:" << error; + } else { + qCWarning(logPeak) << "Peak monitor stream error."; + } + } + + if (state == PW_STREAM_STATE_PAUSED && oldState != PW_STREAM_STATE_PAUSED) { + auto peakCount = this->monitor->mChannels.length(); + if (peakCount == 0) { + peakCount = this->monitor->mPeaks.length(); + } + if (peakCount == 0 && this->formatReady) { + peakCount = static_cast(this->format.channels); + } + + if (peakCount > 0) { + auto zeros = QVector(peakCount, 0.0f); + this->monitor->updatePeaks(zeros, 0.0f); + } + } +} + +void PwPeakStream::handleParamChanged(uint32_t id, const spa_pod* param) { + if (param == nullptr || id != SPA_PARAM_Format) return; + + auto info = spa_audio_info {}; + if (spa_format_parse(param, &info.media_type, &info.media_subtype) < 0) return; + + if (info.media_type != SPA_MEDIA_TYPE_audio || info.media_subtype != SPA_MEDIA_SUBTYPE_raw) + return; + + auto raw = SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_UNKNOWN); // NOLINT + if (spa_format_audio_raw_parse(param, &raw) < 0) return; + + if (raw.format != SPA_AUDIO_FORMAT_F32) { + qCWarning(logPeak) << "Unsupported peak monitor format for" << this->node << ":" << raw.format; + this->resetFormat(); + return; + } + + this->format = raw; + this->formatReady = raw.channels > 0; + + auto channels = QVector(); + channels.reserve(static_cast(raw.channels)); + + for (quint32 i = 0; i < raw.channels; i++) { + if ((raw.flags & SPA_AUDIO_FLAG_UNPOSITIONED) != 0) { + channels.push_back(PwAudioChannel::Unknown); + } else { + channels.push_back(static_cast(raw.position[i])); + } + } + + this->channelPeaks.fill(0.0f, channels.size()); + this->monitor->updateChannels(channels); + this->monitor->updatePeaks(this->channelPeaks, 0.0f); +} + +void PwPeakStream::resetFormat() { + this->format = SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_UNKNOWN); + this->formatReady = false; + this->channelPeaks.clear(); + this->monitor->clearPeaks(); +} + +void PwPeakStream::handleProcess() { + if (!this->formatReady || this->stream == nullptr) return; + + auto* buffer = pw_stream_dequeue_buffer(this->stream); + auto requeue = qScopeGuard([&, this] { pw_stream_queue_buffer(this->stream, buffer); }); + + if (buffer == nullptr) { + qCWarning(logPeak) << "Peak monitor ran out of buffers."; + return; + } + + auto* spaBuffer = buffer->buffer; + if (spaBuffer == nullptr || spaBuffer->n_datas < 1) { + return; + } + + auto* data = &spaBuffer->datas[0]; // NOLINT + if (data->data == nullptr || data->chunk == nullptr) { + return; + } + + auto channelCount = static_cast(this->format.channels); + if (channelCount <= 0) { + return; + } + + const auto* base = static_cast(data->data) + data->chunk->offset; // NOLINT + const auto* samples = reinterpret_cast(base); + auto sampleCount = static_cast(data->chunk->size / sizeof(float)); + + if (sampleCount < channelCount) { + return; + } + + QVector volumes; + if (auto* audioData = dynamic_cast(this->node->boundData)) { + if (!this->node->shouldUseDevice()) volumes = audioData->volumes(); + } + + this->channelPeaks.fill(0.0f, channelCount); + + auto maxPeak = 0.0f; + for (auto channel = 0; channel < channelCount; channel++) { + auto peak = 0.0f; + for (auto sample = channel; sample < sampleCount; sample += channelCount) { + peak = std::max(peak, std::abs(samples[sample])); // NOLINT + } + + auto visualPeak = std::cbrt(peak); + if (!volumes.isEmpty() && volumes[channel] != 0.0f) visualPeak *= 1.0f / volumes[channel]; + + this->channelPeaks[channel] = visualPeak; + maxPeak = std::max(maxPeak, visualPeak); + } + + this->monitor->updatePeaks(this->channelPeaks, maxPeak); +} + +PwNodePeakMonitor::PwNodePeakMonitor(QObject* parent): QObject(parent) {} + +PwNodePeakMonitor::~PwNodePeakMonitor() { + delete this->mStream; + this->mStream = nullptr; +} + +PwNodeIface* PwNodePeakMonitor::node() const { return this->mNode; } + +void PwNodePeakMonitor::setNode(PwNodeIface* node) { + if (node == this->mNode) return; + + if (this->mNode != nullptr) { + QObject::disconnect(this->mNode, nullptr, this, nullptr); + } + + if (node != nullptr) { + QObject::connect(node, &QObject::destroyed, this, &PwNodePeakMonitor::onNodeDestroyed); + } + + this->mNode = node; + this->mNodeRef.setObject(node != nullptr ? node->node() : nullptr); + this->rebuildStream(); + emit this->nodeChanged(); +} + +bool PwNodePeakMonitor::isEnabled() const { return this->mEnabled; } + +void PwNodePeakMonitor::setEnabled(bool enabled) { + if (enabled == this->mEnabled) return; + this->mEnabled = enabled; + this->rebuildStream(); + emit this->enabledChanged(); +} + +void PwNodePeakMonitor::onNodeDestroyed() { + this->mNode = nullptr; + this->mNodeRef.setObject(nullptr); + this->rebuildStream(); + emit this->nodeChanged(); +} + +void PwNodePeakMonitor::updatePeaks(const QVector& peaks, float peak) { + if (this->mPeaks != peaks) { + this->mPeaks = peaks; + emit this->peaksChanged(); + } + + if (this->mPeak != peak) { + this->mPeak = peak; + emit this->peakChanged(); + } +} + +void PwNodePeakMonitor::updateChannels(const QVector& channels) { + if (this->mChannels == channels) return; + this->mChannels = channels; + emit this->channelsChanged(); +} + +void PwNodePeakMonitor::clearPeaks() { + if (!this->mPeaks.isEmpty()) { + this->mPeaks.clear(); + emit this->peaksChanged(); + } + + if (!this->mChannels.isEmpty()) { + this->mChannels.clear(); + emit this->channelsChanged(); + } + + if (this->mPeak != 0.0f) { + this->mPeak = 0.0f; + emit this->peakChanged(); + } +} + +void PwNodePeakMonitor::rebuildStream() { + delete this->mStream; + this->mStream = nullptr; + + auto* node = this->mNodeRef.object(); + if (!this->mEnabled || node == nullptr) { + this->clearPeaks(); + return; + } + + if (node == nullptr || !node->type.testFlags(PwNodeType::Audio)) { + this->clearPeaks(); + return; + } + + this->mStream = new PwPeakStream(this, node); + if (!this->mStream->start()) { + delete this->mStream; + this->mStream = nullptr; + this->clearPeaks(); + } +} + +} // namespace qs::service::pipewire + +#pragma GCC diagnostic pop diff --git a/src/services/pipewire/peak.hpp b/src/services/pipewire/peak.hpp new file mode 100644 index 0000000..c4af3c2 --- /dev/null +++ b/src/services/pipewire/peak.hpp @@ -0,0 +1,87 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "node.hpp" + +namespace qs::service::pipewire { + +class PwNodeIface; +class PwPeakStream; + +} // namespace qs::service::pipewire + +Q_DECLARE_OPAQUE_POINTER(qs::service::pipewire::PwNodeIface*); + +namespace qs::service::pipewire { + +///! Monitors peak levels of an audio node. +/// Tracks volume peaks for a node across all its channels. +/// +/// The peak monitor binds nodes similarly to @@PwObjectTracker when enabled. +class PwNodePeakMonitor: public QObject { + Q_OBJECT; + // clang-format off + /// The node to monitor. Must be an audio node. + Q_PROPERTY(qs::service::pipewire::PwNodeIface* node READ node WRITE setNode NOTIFY nodeChanged); + /// If true, the monitor is actively capturing and computing peaks. Defaults to true. + Q_PROPERTY(bool enabled READ isEnabled WRITE setEnabled NOTIFY enabledChanged); + /// Per-channel peak noise levels (0.0-1.0). Length matches @@channels. + /// + /// The channel's volume does not affect this property. + Q_PROPERTY(QVector peaks READ peaks NOTIFY peaksChanged); + /// Maximum value of @@peaks. + Q_PROPERTY(float peak READ peak NOTIFY peakChanged); + /// Channel positions for the captured format. Length matches @@peaks. + Q_PROPERTY(QVector channels READ channels NOTIFY channelsChanged); + // clang-format on + QML_ELEMENT; + +public: + explicit PwNodePeakMonitor(QObject* parent = nullptr); + ~PwNodePeakMonitor() override; + Q_DISABLE_COPY_MOVE(PwNodePeakMonitor); + + [[nodiscard]] PwNodeIface* node() const; + void setNode(PwNodeIface* node); + + [[nodiscard]] bool isEnabled() const; + void setEnabled(bool enabled); + + [[nodiscard]] QVector peaks() const { return this->mPeaks; } + [[nodiscard]] float peak() const { return this->mPeak; } + [[nodiscard]] QVector channels() const { return this->mChannels; } + +signals: + void nodeChanged(); + void enabledChanged(); + void peaksChanged(); + void peakChanged(); + void channelsChanged(); + +private slots: + void onNodeDestroyed(); + +private: + friend class PwPeakStream; + + void updatePeaks(const QVector& peaks, float peak); + void updateChannels(const QVector& channels); + void clearPeaks(); + void rebuildStream(); + + PwNodeIface* mNode = nullptr; + PwBindableRef mNodeRef; + bool mEnabled = true; + QVector mPeaks; + float mPeak = 0.0f; + QVector mChannels; + PwPeakStream* mStream = nullptr; +}; + +} // namespace qs::service::pipewire diff --git a/src/services/pipewire/qml.cpp b/src/services/pipewire/qml.cpp index 9efb17e..e4424c1 100644 --- a/src/services/pipewire/qml.cpp +++ b/src/services/pipewire/qml.cpp @@ -2,7 +2,6 @@ #include #include -#include #include #include #include @@ -99,15 +98,8 @@ Pipewire::Pipewire(QObject* parent): QObject(parent) { &Pipewire::defaultConfiguredAudioSourceChanged ); - if (!connection->registry.isInitialized()) { - QObject::connect( - &connection->registry, - &PwRegistry::initialized, - this, - &Pipewire::readyChanged, - Qt::SingleShotConnection - ); - } + QObject::connect(&connection->registry, &PwRegistry::initialized, this, &Pipewire::readyChanged); + QObject::connect(&connection->registry, &PwRegistry::cleared, this, &Pipewire::readyChanged); } ObjectModel* Pipewire::nodes() { return &this->mNodes; } @@ -221,6 +213,7 @@ void PwNodeLinkTracker::updateLinks() { || (this->mNode->isSink() && link->inputNode() == this->mNode->id())) { auto* iface = PwLinkGroupIface::instance(link); + if (iface->target()->node()->isMonitor) return; // do not connect twice if (!this->mLinkGroups.contains(iface)) { @@ -239,7 +232,7 @@ void PwNodeLinkTracker::updateLinks() { for (auto* iface: this->mLinkGroups) { // only disconnect no longer used nodes - if (!newLinks.contains(iface)) { + if (!newLinks.contains(iface) || iface->target()->node()->isMonitor) { QObject::disconnect(iface, nullptr, this, nullptr); } } @@ -279,6 +272,8 @@ void PwNodeLinkTracker::onLinkGroupCreated(PwLinkGroup* linkGroup) { || (this->mNode->isSink() && linkGroup->inputNode() == this->mNode->id())) { auto* iface = PwLinkGroupIface::instance(linkGroup); + if (iface->target()->node()->isMonitor) return; + QObject::connect(iface, &QObject::destroyed, this, &PwNodeLinkTracker::onLinkGroupDestroyed); this->mLinkGroups.push_back(iface); emit this->linkGroupsChanged(); diff --git a/src/services/pipewire/qml.hpp b/src/services/pipewire/qml.hpp index e3489a1..a43ce19 100644 --- a/src/services/pipewire/qml.hpp +++ b/src/services/pipewire/qml.hpp @@ -171,13 +171,13 @@ private: ObjectModel mLinkGroups {this}; }; -///! Tracks all link connections to a given node. +///! Tracks non-monitor link connections to a given node. class PwNodeLinkTracker: public QObject { Q_OBJECT; // clang-format off /// The node to track connections to. Q_PROPERTY(qs::service::pipewire::PwNodeIface* node READ node WRITE setNode NOTIFY nodeChanged); - /// Link groups connected to the given node. + /// Link groups connected to the given node, excluding monitors. /// /// If the node is a sink, links which target the node will be tracked. /// If the node is a source, links which source the node will be tracked. diff --git a/src/services/pipewire/registry.cpp b/src/services/pipewire/registry.cpp index c08fc1d..4b670b1 100644 --- a/src/services/pipewire/registry.cpp +++ b/src/services/pipewire/registry.cpp @@ -134,6 +134,46 @@ void PwRegistry::init(PwCore& core) { this->coreSyncSeq = this->core->sync(PW_ID_CORE); } +void PwRegistry::reset() { + if (this->core != nullptr) { + QObject::disconnect(this->core, nullptr, this, nullptr); + } + + this->listener.remove(); + + if (this->object != nullptr) { + pw_proxy_destroy(reinterpret_cast(this->object)); + this->object = nullptr; + } + + for (auto* meta: this->metadata.values()) { + meta->safeDestroy(); + } + this->metadata.clear(); + + for (auto* link: this->links.values()) { + link->safeDestroy(); + } + this->links.clear(); + + for (auto* node: this->nodes.values()) { + node->safeDestroy(); + } + this->nodes.clear(); + + for (auto* device: this->devices.values()) { + device->safeDestroy(); + } + this->devices.clear(); + + this->linkGroups.clear(); + this->initState = InitState::SendingObjects; + this->coreSyncSeq = 0; + this->core = nullptr; + + emit this->cleared(); +} + void PwRegistry::onCoreSync(quint32 id, qint32 seq) { if (id != PW_ID_CORE || seq != this->coreSyncSeq) return; diff --git a/src/services/pipewire/registry.hpp b/src/services/pipewire/registry.hpp index 8473f04..bb2db8c 100644 --- a/src/services/pipewire/registry.hpp +++ b/src/services/pipewire/registry.hpp @@ -116,6 +116,7 @@ class PwRegistry public: void init(PwCore& core); + void reset(); [[nodiscard]] bool isInitialized() const { return this->initState == InitState::Done; } @@ -136,6 +137,7 @@ signals: void linkGroupAdded(PwLinkGroup* group); void metadataAdded(PwMetadata* metadata); void initialized(); + void cleared(); private slots: void onLinkGroupDestroyed(QObject* object);