fix(rpc): abort in-flight curl on disconnect/shutdown to avoid UI freezes
stop()-ing a worker that is mid curl_easy_perform joined on the UI thread, so a slow/hung transfer froze the UI until the request timeout. Add RPCClient:: requestAbort() (a thread-safe atomic read by a curl progress callback that aborts the transfer), and call it before stopping the workers on disconnect (onDisconnected) and shutdown (beginShutdown + the synchronous fallback). The flag is cleared on each connect() so a fresh connection never starts aborted. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -2693,9 +2693,12 @@ void App::beginShutdown()
|
|||||||
shutdown_start_time_ = std::chrono::steady_clock::now();
|
shutdown_start_time_ = std::chrono::steady_clock::now();
|
||||||
async_tasks_.cancelAll();
|
async_tasks_.cancelAll();
|
||||||
|
|
||||||
// Signal the RPC worker to stop accepting new tasks (non-blocking).
|
// Signal the RPC worker to stop accepting new tasks (non-blocking), and abort any call
|
||||||
|
// already in flight so the later join() doesn't wait out a request timeout.
|
||||||
// The actual thread join + rpc disconnect happen in shutdown() after
|
// The actual thread join + rpc disconnect happen in shutdown() after
|
||||||
// the render loop exits, so the UI stays responsive.
|
// the render loop exits, so the UI stays responsive.
|
||||||
|
if (rpc_) rpc_->requestAbort();
|
||||||
|
if (fast_rpc_) fast_rpc_->requestAbort();
|
||||||
if (worker_) {
|
if (worker_) {
|
||||||
worker_->requestStop();
|
worker_->requestStop();
|
||||||
}
|
}
|
||||||
@@ -3308,6 +3311,8 @@ void App::shutdown()
|
|||||||
DEBUG_LOGF("Synchronous shutdown fallback...\n");
|
DEBUG_LOGF("Synchronous shutdown fallback...\n");
|
||||||
async_tasks_.cancelAll();
|
async_tasks_.cancelAll();
|
||||||
async_tasks_.joinAll();
|
async_tasks_.joinAll();
|
||||||
|
if (rpc_) rpc_->requestAbort(); // unblock any in-flight curl before joining
|
||||||
|
if (fast_rpc_) fast_rpc_->requestAbort();
|
||||||
if (worker_) {
|
if (worker_) {
|
||||||
worker_->stop();
|
worker_->stop();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -507,7 +507,9 @@ void App::onDisconnected(const std::string& reason)
|
|||||||
address_validation_cache_dirty_ = true;
|
address_validation_cache_dirty_ = true;
|
||||||
resetTransactionHistoryCacheSession();
|
resetTransactionHistoryCacheSession();
|
||||||
|
|
||||||
// Tear down the fast-lane connection
|
// Tear down the fast-lane connection. Signal abort first so a fast-lane call blocked in
|
||||||
|
// curl_easy_perform unblocks and stop()'s join() returns promptly (no UI freeze).
|
||||||
|
if (fast_rpc_) fast_rpc_->requestAbort();
|
||||||
if (fast_worker_) {
|
if (fast_worker_) {
|
||||||
fast_worker_->stop();
|
fast_worker_->stop();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -88,6 +88,14 @@ static size_t WriteCallback(void* contents, size_t size, size_t nmemb, std::stri
|
|||||||
return totalSize;
|
return totalSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// curl progress callback: a non-zero return aborts the in-flight transfer. This lets a
|
||||||
|
// requestAbort() from another thread (disconnect/shutdown) unblock curl_easy_perform so the
|
||||||
|
// UI thread's worker join() returns promptly instead of waiting out the request timeout.
|
||||||
|
static int xferInfoCallback(void* clientp, curl_off_t, curl_off_t, curl_off_t, curl_off_t) {
|
||||||
|
const auto* self = static_cast<const RPCClient*>(clientp);
|
||||||
|
return (self != nullptr && self->abortRequested()) ? 1 : 0;
|
||||||
|
}
|
||||||
|
|
||||||
// Private implementation using libcurl
|
// Private implementation using libcurl
|
||||||
class RPCClient::Impl {
|
class RPCClient::Impl {
|
||||||
public:
|
public:
|
||||||
@@ -170,6 +178,11 @@ bool RPCClient::connect(const std::string& host, const std::string& port,
|
|||||||
curl_easy_setopt(impl_->curl, CURLOPT_URL, impl_->url.c_str());
|
curl_easy_setopt(impl_->curl, CURLOPT_URL, impl_->url.c_str());
|
||||||
curl_easy_setopt(impl_->curl, CURLOPT_HTTPHEADER, impl_->headers);
|
curl_easy_setopt(impl_->curl, CURLOPT_HTTPHEADER, impl_->headers);
|
||||||
curl_easy_setopt(impl_->curl, CURLOPT_WRITEFUNCTION, WriteCallback);
|
curl_easy_setopt(impl_->curl, CURLOPT_WRITEFUNCTION, WriteCallback);
|
||||||
|
// Progress callback so requestAbort() can unblock an in-flight curl_easy_perform.
|
||||||
|
clearAbort(); // a fresh connection must not start in the aborted state
|
||||||
|
curl_easy_setopt(impl_->curl, CURLOPT_NOPROGRESS, 0L);
|
||||||
|
curl_easy_setopt(impl_->curl, CURLOPT_XFERINFOFUNCTION, xferInfoCallback);
|
||||||
|
curl_easy_setopt(impl_->curl, CURLOPT_XFERINFODATA, this);
|
||||||
curl_easy_setopt(impl_->curl, CURLOPT_TIMEOUT, 30L);
|
curl_easy_setopt(impl_->curl, CURLOPT_TIMEOUT, 30L);
|
||||||
// Localhost fails fast if nothing is listening; a remote/TLS daemon needs a larger
|
// Localhost fails fast if nothing is listening; a remote/TLS daemon needs a larger
|
||||||
// budget for the TCP + TLS handshake over real network latency (1s would spuriously fail).
|
// budget for the TCP + TLS handshake over real network latency (1s would spuriously fail).
|
||||||
@@ -230,6 +243,18 @@ json RPCClient::getLastConnectInfo() const
|
|||||||
return last_connect_info_;
|
return last_connect_info_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RPCClient::requestAbort()
|
||||||
|
{
|
||||||
|
// Deliberately NOT taking curl_mutex_ — the whole point is to interrupt a call() that is
|
||||||
|
// currently holding it inside curl_easy_perform. The atomic is read by xferInfoCallback.
|
||||||
|
abort_.store(true, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
void RPCClient::clearAbort()
|
||||||
|
{
|
||||||
|
abort_.store(false, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
void RPCClient::disconnect()
|
void RPCClient::disconnect()
|
||||||
{
|
{
|
||||||
std::lock_guard<std::recursive_mutex> lk(curl_mutex_);
|
std::lock_guard<std::recursive_mutex> lk(curl_mutex_);
|
||||||
|
|||||||
@@ -5,6 +5,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "types.h"
|
#include "types.h"
|
||||||
|
#include <atomic>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
@@ -87,6 +88,18 @@ public:
|
|||||||
*/
|
*/
|
||||||
bool isConnected() const { return connected_; }
|
bool isConnected() const { return connected_; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Ask an in-flight call() to abort as soon as possible (thread-safe).
|
||||||
|
*
|
||||||
|
* Set from another thread (e.g. before stop()-ing the worker on disconnect/shutdown):
|
||||||
|
* a curl progress callback aborts the transfer, so a blocked curl_easy_perform returns
|
||||||
|
* promptly instead of freezing the UI thread's join() until the request timeout. Cleared
|
||||||
|
* on the next connect(); abortRequested() is read by the progress callback.
|
||||||
|
*/
|
||||||
|
void requestAbort();
|
||||||
|
void clearAbort();
|
||||||
|
bool abortRequested() const noexcept { return abort_.load(std::memory_order_relaxed); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief True if the last connect() succeeded but daemon returned a warmup error.
|
* @brief True if the last connect() succeeded but daemon returned a warmup error.
|
||||||
* The curl handle is valid and auth succeeded — RPC calls will throw warmup errors
|
* The curl handle is valid and auth succeeded — RPC calls will throw warmup errors
|
||||||
@@ -236,6 +249,7 @@ private:
|
|||||||
std::string port_;
|
std::string port_;
|
||||||
std::string auth_; // Base64 encoded "user:password"
|
std::string auth_; // Base64 encoded "user:password"
|
||||||
bool connected_ = false;
|
bool connected_ = false;
|
||||||
|
std::atomic<bool> abort_{false}; // set cross-thread to abort an in-flight transfer
|
||||||
bool warming_up_ = false;
|
bool warming_up_ = false;
|
||||||
std::string warmup_status_;
|
std::string warmup_status_;
|
||||||
std::string last_connect_error_;
|
std::string last_connect_error_;
|
||||||
|
|||||||
Reference in New Issue
Block a user