Files
ObsidianDragon/src/services/network_refresh_service.h
dan_s 229373e937 feat(wallet): persist history and surface pending sends
Add an encrypted SQLite transaction history cache with cached tip metadata and
per-address shielded scan progress so startup and full refreshes avoid
re-scanning every z-address while still invalidating on wallet/address/rescan
changes.

Improve wallet history loading by paging transparent transactions, preserving
cached shielded and sent rows, keeping recent/unconfirmed activity visible, and
classifying mining-address receives. Show z_sendmany opid sends immediately in
History and Overview, pin pending rows through refreshes, and apply optimistic
address/balance debits until opids resolve.

Add timestamped RPC console tracing by source/method without logging params or
results, reduce redundant refresh/RPC calls, and cache Explorer recent block
summaries in SQLite.

Expand focused tests for transaction cache encryption, scan-progress
persistence/invalidation, history preservation, operation-status parsing,
pending send visibility, and Explorer/RPC refresh behavior.
2026-05-05 03:22:14 -05:00

383 lines
17 KiB
C++

#pragma once
#include "data/wallet_state.h"
#include "refresh_scheduler.h"
#include "rpc/rpc_worker.h"
#include <nlohmann/json.hpp>
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <ctime>
#include <optional>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
namespace dragonx {
namespace services {
class NetworkRefreshService {
public:
using Timer = RefreshScheduler::Timer;
using Intervals = RefreshScheduler::Intervals;
class RefreshRpcGateway {
public:
virtual ~RefreshRpcGateway() = default;
virtual nlohmann::json call(const std::string& method,
const nlohmann::json& params) = 0;
};
enum class Job {
Core,
Addresses,
Transactions,
Mining,
Peers,
Price,
Encryption,
ConnectionInit,
Count
};
struct DispatchTicket {
Job job = Job::Core;
std::uint64_t generation = 0;
bool accepted = false;
};
struct JobStats {
std::uint64_t started = 0;
std::uint64_t finished = 0;
std::uint64_t skippedInFlight = 0;
std::uint64_t skippedQueuePressure = 0;
std::uint64_t staleCallbacks = 0;
std::size_t lastQueueDepth = 0;
};
struct EnqueueResult {
DispatchTicket ticket;
bool enqueued = false;
std::size_t queueDepth = 0;
};
struct ConnectionInfoResult {
bool ok = false;
std::optional<int> daemonVersion;
std::optional<int> protocolVersion;
std::optional<int> p2pPort;
std::optional<int> longestChain;
std::optional<int> notarized;
std::optional<int> blocks;
};
struct WalletEncryptionResult {
bool ok = false;
bool encrypted = false;
std::int64_t unlockedUntil = 0;
};
struct WarmupPollResult {
bool ready = false;
ConnectionInfoResult info;
std::string errorMessage;
};
struct ConnectionInitResult {
ConnectionInfoResult info;
WalletEncryptionResult encryption;
};
struct CoreRefreshResult {
bool balanceOk = false;
std::optional<double> shieldedBalance;
std::optional<double> transparentBalance;
std::optional<double> totalBalance;
bool blockchainOk = false;
std::optional<int> blocks;
std::optional<int> headers;
std::optional<std::string> bestBlockHash;
std::optional<double> verificationProgress;
std::optional<int> longestChain;
std::optional<int> notarized;
};
struct MiningRefreshResult {
std::optional<double> localHashrate;
bool miningOk = false;
std::optional<bool> generate;
std::optional<int> genproclimit;
std::optional<int> blocks;
std::optional<double> difficulty;
std::optional<double> networkHashrate;
std::optional<std::string> chain;
double daemonMemoryMb = 0.0;
};
struct PeerRefreshResult {
std::vector<PeerInfo> peers;
std::vector<BannedPeer> bannedPeers;
};
struct PriceRefreshResult {
MarketInfo market;
};
struct PriceHttpResponse {
bool transportOk = false;
long httpStatus = 0;
std::string body;
std::string transportError;
};
struct PriceHttpResult {
std::optional<PriceRefreshResult> price;
std::string errorMessage;
};
struct AddressRefreshResult {
std::vector<AddressInfo> shieldedAddresses;
std::vector<AddressInfo> transparentAddresses;
};
struct AddressRefreshSnapshot {
std::unordered_map<std::string, bool> shieldedSpendingKeys;
};
struct TransactionViewCacheEntry {
std::string from_address;
std::int64_t timestamp = 0;
int confirmations = 0;
struct Output {
std::string address;
double value = 0.0;
std::string memo;
};
std::vector<Output> outgoing_outputs;
};
using TransactionViewCache = std::unordered_map<std::string, TransactionViewCacheEntry>;
struct TransactionRefreshSnapshot {
std::vector<std::string> shieldedAddresses;
std::unordered_set<std::string> fullyEnrichedTxids;
TransactionViewCache viewTxCache;
std::unordered_set<std::string> sendTxids;
std::unordered_set<std::string> pendingOpids;
std::vector<TransactionInfo> previousTransactions;
std::set<std::string> miningAddresses;
std::unordered_map<std::string, int> shieldedScanHeights;
std::size_t shieldedScanStartIndex = 0;
std::size_t maxShieldedReceiveScans = 0;
};
struct TransactionRefreshResult {
std::vector<TransactionInfo> transactions;
int blockHeight = -1;
TransactionViewCache newViewTxEntries;
std::size_t nextShieldedScanStartIndex = 0;
std::size_t shieldedAddressesScanned = 0;
std::size_t shieldedAddressCount = 0;
std::unordered_map<std::string, int> shieldedScanHeights;
bool shieldedScanComplete = true;
};
struct OperationStatusPollResult {
std::vector<std::string> doneOpids;
std::vector<std::string> staleOpids;
std::vector<std::string> successTxids;
std::unordered_map<std::string, std::string> successTxidsByOpid;
std::vector<std::string> failureMessages;
bool anySuccess = false;
};
struct TransactionCacheUpdate {
TransactionViewCache& viewTxCache;
std::unordered_set<std::string>& sendTxids;
std::vector<TransactionInfo>& confirmedTxCache;
std::unordered_set<std::string>& confirmedTxIds;
int& confirmedCacheBlock;
int& lastTxBlockHeight;
};
static Intervals intervalsForPage(ui::NavPage page) { return RefreshScheduler::intervalsForPage(page); }
static ConnectionInfoResult parseConnectionInfoResult(const nlohmann::json& info);
static WalletEncryptionResult parseWalletEncryptionResult(const nlohmann::json& walletInfo);
static WarmupPollResult collectWarmupPollResult(RefreshRpcGateway& rpc);
static ConnectionInitResult collectConnectionInitResult(
RefreshRpcGateway& rpc,
const std::optional<ConnectionInfoResult>& prefetchedInfo = std::nullopt);
static CoreRefreshResult parseCoreRefreshResult(const nlohmann::json& totalBalance,
bool balanceOk,
const nlohmann::json& blockInfo,
bool blockOk);
static CoreRefreshResult collectCoreRefreshResult(RefreshRpcGateway& rpc);
static MiningRefreshResult parseMiningRefreshResult(const nlohmann::json& miningInfo,
bool miningOk,
const nlohmann::json& localHashrate,
bool hashrateOk,
double daemonMemoryMb);
static MiningRefreshResult collectMiningRefreshResult(RefreshRpcGateway& rpc,
double daemonMemoryMb,
bool includeSlowRefresh,
bool includeLocalHashrate = true);
static PeerRefreshResult parsePeerRefreshResult(const nlohmann::json& peers,
const nlohmann::json& bannedPeers);
static PeerRefreshResult collectPeerRefreshResult(RefreshRpcGateway& rpc);
static std::optional<PriceRefreshResult> parseCoinGeckoPriceResponse(const std::string& response,
std::time_t fetchedAt);
static PriceHttpResult parsePriceHttpResponse(const PriceHttpResponse& response,
std::time_t fetchedAt);
static AddressInfo buildShieldedAddressInfo(const std::string& address,
const nlohmann::json& validation,
bool validationSucceeded);
static AddressInfo buildTransparentAddressInfo(const std::string& address);
static std::vector<AddressInfo> parseTransparentAddressList(const nlohmann::json& addressList);
static void applyShieldedBalancesFromUnspent(std::vector<AddressInfo>& addresses,
const nlohmann::json& unspent);
static void applyTransparentBalancesFromUnspent(std::vector<AddressInfo>& addresses,
const nlohmann::json& unspent);
static AddressRefreshSnapshot buildAddressRefreshSnapshot(const WalletState& state);
static AddressRefreshResult collectAddressRefreshResult(
RefreshRpcGateway& rpc,
const AddressRefreshSnapshot& snapshot = {});
static TransactionRefreshSnapshot buildTransactionRefreshSnapshot(const WalletState& state,
const TransactionViewCache& viewTxCache,
const std::unordered_set<std::string>& sendTxids);
static void appendTransparentTransactions(std::vector<TransactionInfo>& transactions,
std::set<std::string>& knownTxids,
const nlohmann::json& result,
const std::set<std::string>& miningAddresses = {});
static void appendShieldedReceivedTransactions(std::vector<TransactionInfo>& transactions,
std::set<std::string>& knownTxids,
const std::string& address,
const nlohmann::json& received,
const std::set<std::string>& miningAddresses = {});
static TransactionViewCacheEntry parseViewTransactionCacheEntry(const nlohmann::json& viewTransaction);
static void appendViewTransactionOutputs(std::vector<TransactionInfo>& transactions,
const std::string& txid,
const TransactionViewCacheEntry& entry);
static void sortTransactionsNewestFirst(std::vector<TransactionInfo>& transactions);
static TransactionRefreshResult collectTransactionRefreshResult(RefreshRpcGateway& rpc,
const TransactionRefreshSnapshot& snapshot,
int currentBlockHeight,
int maxViewTransactionsPerCycle);
static TransactionRefreshResult collectRecentTransactionRefreshResult(
RefreshRpcGateway& rpc,
const TransactionRefreshSnapshot& snapshot,
int currentBlockHeight,
int pageSize = 100);
static OperationStatusPollResult parseOperationStatusPoll(const nlohmann::json& result,
const std::vector<std::string>& requestedOpids);
static void applyConnectionInfoResult(WalletState& state, const ConnectionInfoResult& result);
static void applyWalletEncryptionResult(WalletState& state, const WalletEncryptionResult& result);
static void applyConnectionInitResult(WalletState& state, const ConnectionInitResult& result);
static void applyCoreRefreshResult(WalletState& state,
const CoreRefreshResult& result,
std::time_t updatedAt);
static void applyMiningRefreshResult(WalletState& state,
const MiningRefreshResult& result,
std::time_t updatedAt);
static void applyPeerRefreshResult(WalletState& state,
PeerRefreshResult&& result,
std::time_t updatedAt);
static void markPriceRefreshStarted(WalletState& state);
static void applyPriceRefreshResult(WalletState& state,
const PriceRefreshResult& result,
std::chrono::steady_clock::time_point fetchedAt);
static void applyPriceRefreshFailure(WalletState& state,
const std::string& errorMessage);
static void applyAddressRefreshResult(WalletState& state,
AddressRefreshResult&& result);
static void applyTransactionRefreshResult(WalletState& state,
TransactionCacheUpdate cacheUpdate,
TransactionRefreshResult&& result,
std::time_t updatedAt);
void applyPage(ui::NavPage page) { scheduler_.applyPage(page); }
void setIntervals(Intervals intervals) { scheduler_.setIntervals(intervals); }
const Intervals& intervals() const { return scheduler_.intervals(); }
void tick(float deltaSeconds) { scheduler_.tick(deltaSeconds); }
bool isDue(Timer timer) const { return scheduler_.isDue(timer); }
bool consumeDue(Timer timer) { return scheduler_.consumeDue(timer); }
void reset(Timer timer) { scheduler_.reset(timer); }
void markDue(Timer timer) { scheduler_.markDue(timer); }
void setTimer(Timer timer, float seconds) { scheduler_.setTimer(timer, seconds); }
float timer(Timer timer) const { return scheduler_.timer(timer); }
float interval(Timer timer) const { return scheduler_.interval(timer); }
void markImmediateRefresh() { scheduler_.markImmediateRefresh(); }
void markWalletMutationRefresh() { scheduler_.markWalletMutationRefresh(); }
void resetTxAge() { scheduler_.resetTxAge(); }
bool shouldRefreshTransactions(int lastTxBlockHeight,
int currentBlockHeight,
bool transactionsDirty) const;
bool beginJob(Job job);
bool beginJob(Job job, std::size_t queuedWork, std::size_t maxQueuedWork);
void finishJob(Job job);
bool jobInProgress(Job job) const;
void resetJobs();
DispatchTicket beginDispatch(Job job, std::size_t queuedWork = 0, std::size_t maxQueuedWork = 0);
bool completeDispatch(const DispatchTicket& ticket);
void cancelDispatch(const DispatchTicket& ticket);
JobStats stats(Job job) const;
template <typename Worker, typename WorkFn>
EnqueueResult enqueue(Job job, Worker& worker, WorkFn&& work, std::size_t maxQueuedWork = 0)
{
std::size_t queueDepth = worker.pendingTaskCount();
auto ticket = beginDispatch(job, queueDepth, maxQueuedWork);
if (!ticket.accepted) return {ticket, false, queueDepth};
worker.post([this, ticket, work = std::forward<WorkFn>(work)]() mutable -> rpc::RPCWorker::MainCb {
rpc::RPCWorker::MainCb mainCallback;
try {
mainCallback = work();
} catch (...) {
mainCallback = nullptr;
}
return [this, ticket, mainCallback = std::move(mainCallback)]() mutable {
if (!completeDispatch(ticket)) return;
if (mainCallback) mainCallback();
};
});
return {ticket, true, queueDepth};
}
private:
std::atomic<bool>& jobFlag(Job job);
const std::atomic<bool>& jobFlag(Job job) const;
static std::size_t jobIndex(Job job);
RefreshScheduler scheduler_;
std::atomic<bool> coreInProgress_{false};
std::atomic<bool> addressesInProgress_{false};
std::atomic<bool> transactionsInProgress_{false};
std::atomic<bool> miningInProgress_{false};
std::atomic<bool> peersInProgress_{false};
std::atomic<bool> priceInProgress_{false};
std::atomic<bool> encryptionInProgress_{false};
std::atomic<bool> connectionInitInProgress_{false};
std::array<std::atomic<std::uint64_t>, static_cast<std::size_t>(Job::Count)> generations_{};
std::array<std::atomic<std::uint64_t>, static_cast<std::size_t>(Job::Count)> started_{};
std::array<std::atomic<std::uint64_t>, static_cast<std::size_t>(Job::Count)> finished_{};
std::array<std::atomic<std::uint64_t>, static_cast<std::size_t>(Job::Count)> skippedInFlight_{};
std::array<std::atomic<std::uint64_t>, static_cast<std::size_t>(Job::Count)> skippedQueuePressure_{};
std::array<std::atomic<std::uint64_t>, static_cast<std::size_t>(Job::Count)> staleCallbacks_{};
std::array<std::atomic<std::size_t>, static_cast<std::size_t>(Job::Count)> lastQueueDepth_{};
};
} // namespace services
} // namespace dragonx