// DragonX Wallet - ImGui Edition // Copyright 2024-2026 The Hush Developers // Released under the GPLv3 #include "rpc_worker.h" #include #include "../util/logger.h" namespace dragonx { namespace rpc { RPCWorker::RPCWorker() = default; RPCWorker::~RPCWorker() { stop(); } void RPCWorker::start() { if (running_.load(std::memory_order_relaxed)) return; running_.store(true, std::memory_order_release); thread_ = std::thread(&RPCWorker::run, this); } void RPCWorker::stop() { if (!running_.load(std::memory_order_relaxed) && !thread_.joinable()) return; // Signal stop if not already signaled requestStop(); if (thread_.joinable()) { thread_.join(); } // Discard pending tasks { std::lock_guard lk(taskMtx_); tasks_.clear(); } } void RPCWorker::requestStop() { if (!running_.load(std::memory_order_relaxed)) return; { std::lock_guard lk(taskMtx_); running_.store(false, std::memory_order_release); } taskCv_.notify_one(); } void RPCWorker::post(WorkFn work) { { std::lock_guard lk(taskMtx_); tasks_.push_back(std::move(work)); } taskCv_.notify_one(); } int RPCWorker::drainResults() { // Swap the result queue under the lock, then execute outside the lock // to minimise contention with the worker thread. std::deque batch; { std::lock_guard lk(resultMtx_); batch.swap(results_); } int count = 0; for (auto& cb : batch) { if (cb) { try { cb(); } catch (const std::exception& e) { DEBUG_LOGF("[RPCWorker] Main-thread callback threw: %s\n", e.what()); } catch (...) { DEBUG_LOGF("[RPCWorker] Main-thread callback threw unknown exception\n"); } ++count; } } return count; } bool RPCWorker::hasPendingResults() const { std::lock_guard lk(resultMtx_); return !results_.empty(); } void RPCWorker::run() { while (true) { WorkFn task; // Wait for a task or stop signal { std::unique_lock lk(taskMtx_); taskCv_.wait(lk, [this] { return !tasks_.empty() || !running_.load(std::memory_order_acquire); }); if (!running_.load(std::memory_order_acquire) && tasks_.empty()) { break; } if (!tasks_.empty()) { task = std::move(tasks_.front()); tasks_.pop_front(); } } if (!task) continue; // Execute the work function (blocking I/O happens here) try { MainCb result = task(); if (result) { std::lock_guard lk(resultMtx_); results_.push_back(std::move(result)); } } catch (const std::exception& e) { DEBUG_LOGF("[RPCWorker] Task threw: %s\n", e.what()); } } } } // namespace rpc } // namespace dragonx