Add opt-in bulk block streaming (-bulkblocksync)
A single getblockstrm request makes a peer stream a contiguous range of old blocks back-to-back as ordinary BLOCK messages, amortizing the per-block round-trip over the whole range instead of the MAX_BLOCKS_IN_TRANSIT_PER_PEER window. This targets the bandwidth-delay-product ceiling that dominates IBD from few/high-latency peers below the checkpoint. Design (off by default; negotiated via a NODE_BULKBLOCKS service bit; the default getdata IBD path is untouched when disabled): - protocol: NODE_BULKBLOCKS service bit + getblockstrm/blockstream messages. - requester: in SendMessages, after FindNextBlocksToDownload, when the first needed block is >= BULK_TIP_MARGIN (5000) below the network tip and the peer advertises the bit and we are in IBD, request a contiguous range (<=128 blocks) instead of per-block getdata; mark the range in-flight. - server: stream the range (caps 128 blocks / 8 MiB; reads outside cs_main; per-peer flood throttle), then a trailing blockstream header with the actual count sent. Self-suppresses while the server itself is in IBD. - received blocks ride the existing BLOCK -> ProcessNewBlock path (fully validated; checkpoints below 2.84M still apply); the trailing header reconciles partial deliveries and the range is freed on a 90s timeout, so a partial/withheld/refused batch falls back to the normal path (no leak, no permanent gap, no disconnect). In-flight tracking is by literal hash, so a reorg cannot orphan range entries. Hardened against the issues found in two adversarial review passes (drain vs timeout, partial reconciliation, ownership-guarded frees, one-shot header, reorg-proof helpers, cs_main hold). Validated end-to-end between two local v1.0.3 nodes (128/128 and partial serves; height advanced; no errors). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1448,6 +1448,11 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
|
|||||||
MAX_BLOCKS_IN_TRANSIT_PER_PEER = 4096;
|
MAX_BLOCKS_IN_TRANSIT_PER_PEER = 4096;
|
||||||
LogPrintf("Per-peer max blocks in transit: %d\n", MAX_BLOCKS_IN_TRANSIT_PER_PEER);
|
LogPrintf("Per-peer max blocks in transit: %d\n", MAX_BLOCKS_IN_TRANSIT_PER_PEER);
|
||||||
|
|
||||||
|
// Opt-in bulk block streaming (DragonX). Drives the requester branch in SendMessages and, when
|
||||||
|
// set, also advertises NODE_BULKBLOCKS below so we serve bulk ranges to peers. OFF by default.
|
||||||
|
fBulkBlockSync = GetBoolArg("-bulkblocksync", DEFAULT_BULKBLOCKSYNC);
|
||||||
|
LogPrintf("Bulk block streaming: %s\n", fBulkBlockSync ? "enabled" : "disabled");
|
||||||
|
|
||||||
fServer = GetBoolArg("-server", false);
|
fServer = GetBoolArg("-server", false);
|
||||||
//fprintf(stderr,"%s tik6\n", __FUNCTION__);
|
//fprintf(stderr,"%s tik6\n", __FUNCTION__);
|
||||||
|
|
||||||
@@ -2574,6 +2579,9 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
|
|||||||
nLocalServices |= NODE_ADDRINDEX;
|
nLocalServices |= NODE_ADDRINDEX;
|
||||||
if ( GetBoolArg("-spentindex", DEFAULT_SPENTINDEX) != 0 )
|
if ( GetBoolArg("-spentindex", DEFAULT_SPENTINDEX) != 0 )
|
||||||
nLocalServices |= NODE_SPENTINDEX;
|
nLocalServices |= NODE_SPENTINDEX;
|
||||||
|
// Advertise willingness to SERVE bulk block streams (full nodes only) when opted in.
|
||||||
|
if ( fBulkBlockSync )
|
||||||
|
nLocalServices |= NODE_BULKBLOCKS;
|
||||||
fprintf(stderr,"nLocalServices %llx %d, %d\n",(long long)nLocalServices,GetBoolArg("-addressindex", DEFAULT_ADDRESSINDEX),GetBoolArg("-spentindex", DEFAULT_SPENTINDEX));
|
fprintf(stderr,"nLocalServices %llx %d, %d\n",(long long)nLocalServices,GetBoolArg("-addressindex", DEFAULT_ADDRESSINDEX),GetBoolArg("-spentindex", DEFAULT_SPENTINDEX));
|
||||||
}
|
}
|
||||||
// ********************************************************* Step 10: import blocks
|
// ********************************************************* Step 10: import blocks
|
||||||
|
|||||||
238
src/main.cpp
238
src/main.cpp
@@ -91,6 +91,10 @@ CWaitableCriticalSection csBestBlock;
|
|||||||
CConditionVariable cvBlockChange;
|
CConditionVariable cvBlockChange;
|
||||||
int nScriptCheckThreads = 0;
|
int nScriptCheckThreads = 0;
|
||||||
int MAX_BLOCKS_IN_TRANSIT_PER_PEER = DEFAULT_MAX_BLOCKS_IN_TRANSIT_PER_PEER;
|
int MAX_BLOCKS_IN_TRANSIT_PER_PEER = DEFAULT_MAX_BLOCKS_IN_TRANSIT_PER_PEER;
|
||||||
|
bool fBulkBlockSync = DEFAULT_BULKBLOCKSYNC;
|
||||||
|
// Server-side flood throttle: minimum interval between bulk serves to the same peer (main.cpp-local
|
||||||
|
// since only the serve handler uses it; kept out of main.h to avoid a full-tree recompile).
|
||||||
|
static const int64_t BULK_MIN_SERVE_INTERVAL_US = 50000; // 50 ms => <= 20 bulk serves/s/peer
|
||||||
int nRandomXVerifyThreads = 0; // parallel RandomX pre-verification worker count (0 = inline only)
|
int nRandomXVerifyThreads = 0; // parallel RandomX pre-verification worker count (0 = inline only)
|
||||||
bool fExperimentalMode = true;
|
bool fExperimentalMode = true;
|
||||||
bool fImporting = false;
|
bool fImporting = false;
|
||||||
@@ -250,6 +254,7 @@ namespace {
|
|||||||
int64_t nTime; //! Time of "getdata" request in microseconds.
|
int64_t nTime; //! Time of "getdata" request in microseconds.
|
||||||
bool fValidatedHeaders; //! Whether this block has validated headers at the time of request.
|
bool fValidatedHeaders; //! Whether this block has validated headers at the time of request.
|
||||||
int64_t nTimeDisconnect; //! The timeout for this block request (for disconnecting a slow peer)
|
int64_t nTimeDisconnect; //! The timeout for this block request (for disconnecting a slow peer)
|
||||||
|
bool fBulk; //! Requested as part of a bulk stream range (exempt from the front() stall-disconnect).
|
||||||
};
|
};
|
||||||
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
|
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
|
||||||
|
|
||||||
@@ -309,6 +314,21 @@ namespace {
|
|||||||
int nBlocksInFlightValidHeaders;
|
int nBlocksInFlightValidHeaders;
|
||||||
//! Whether we consider this a preferred download peer.
|
//! Whether we consider this a preferred download peer.
|
||||||
bool fPreferredDownload;
|
bool fPreferredDownload;
|
||||||
|
//! Opt-in bulk block streaming (DragonX): whether a bulk range request is outstanding to this peer.
|
||||||
|
bool fBulkInFlight;
|
||||||
|
//! Time (us) the outstanding bulk request was issued, for the response timeout/fallback.
|
||||||
|
int64_t nBulkSince;
|
||||||
|
//! Height of the first block in the outstanding bulk range.
|
||||||
|
int nBulkRangeStart;
|
||||||
|
//! Number of blocks requested in the outstanding bulk range.
|
||||||
|
int nBulkRangeCount;
|
||||||
|
//! Hash of the first block of the outstanding bulk range (request identity; the server echoes it
|
||||||
|
//! in the BLOCKSTREAM header so a stale/duplicate header for an old request can be ignored).
|
||||||
|
uint256 nBulkHashStart;
|
||||||
|
//! Whether the (one-shot) trailing BLOCKSTREAM header for the outstanding request was processed.
|
||||||
|
bool fBulkHeaderSeen;
|
||||||
|
//! (server side) time (us) we last served a bulk stream to this peer, for flood throttling.
|
||||||
|
int64_t nLastBulkServeTime;
|
||||||
|
|
||||||
CNodeState() {
|
CNodeState() {
|
||||||
fCurrentlyConnected = false;
|
fCurrentlyConnected = false;
|
||||||
@@ -322,6 +342,13 @@ namespace {
|
|||||||
nBlocksInFlight = 0;
|
nBlocksInFlight = 0;
|
||||||
nBlocksInFlightValidHeaders = 0;
|
nBlocksInFlightValidHeaders = 0;
|
||||||
fPreferredDownload = false;
|
fPreferredDownload = false;
|
||||||
|
fBulkInFlight = false;
|
||||||
|
nBulkSince = 0;
|
||||||
|
nBulkRangeStart = 0;
|
||||||
|
nBulkRangeCount = 0;
|
||||||
|
nBulkHashStart.SetNull();
|
||||||
|
fBulkHeaderSeen = false;
|
||||||
|
nLastBulkServeTime = 0;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -416,7 +443,7 @@ namespace {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Requires cs_main.
|
// Requires cs_main.
|
||||||
void MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, CBlockIndex *pindex = NULL) {
|
void MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, CBlockIndex *pindex = NULL, bool fBulk = false) {
|
||||||
CNodeState *state = State(nodeid);
|
CNodeState *state = State(nodeid);
|
||||||
assert(state != NULL);
|
assert(state != NULL);
|
||||||
|
|
||||||
@@ -424,7 +451,7 @@ namespace {
|
|||||||
MarkBlockAsReceived(hash);
|
MarkBlockAsReceived(hash);
|
||||||
|
|
||||||
int64_t nNow = GetTimeMicros();
|
int64_t nNow = GetTimeMicros();
|
||||||
QueuedBlock newentry = {hash, pindex, nNow, pindex != NULL, GetBlockTimeout(nNow, nQueuedValidatedHeaders, consensusParams)};
|
QueuedBlock newentry = {hash, pindex, nNow, pindex != NULL, GetBlockTimeout(nNow, nQueuedValidatedHeaders, consensusParams), fBulk};
|
||||||
nQueuedValidatedHeaders += newentry.fValidatedHeaders;
|
nQueuedValidatedHeaders += newentry.fValidatedHeaders;
|
||||||
list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry);
|
list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry);
|
||||||
state->nBlocksInFlight++;
|
state->nBlocksInFlight++;
|
||||||
@@ -432,6 +459,36 @@ namespace {
|
|||||||
mapBlocksInFlight[hash] = std::make_pair(nodeid, it);
|
mapBlocksInFlight[hash] = std::make_pair(nodeid, it);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Opt-in bulk block streaming (DragonX): free this peer's still-in-flight bulk blocks whose height
|
||||||
|
// falls in [hStart, hEnd), so the normal per-block path re-fetches them. We scan the peer's OWN
|
||||||
|
// vBlocksInFlight by the LITERAL hash marked at request time (via the stored pindex) rather than
|
||||||
|
// re-deriving hashes from the mutable pindexBestKnownBlock - the latter would miss the real entries
|
||||||
|
// after a reorg (leaking in-flight slots) and can never touch another peer's blocks. Requires cs_main.
|
||||||
|
void FreeBulkRangeInFlight(CNodeState* state, int hStart, int hEnd) {
|
||||||
|
if (state == NULL) return;
|
||||||
|
std::vector<uint256> toFree; // collect first: MarkBlockAsReceived erases from vBlocksInFlight
|
||||||
|
BOOST_FOREACH(const QueuedBlock& q, state->vBlocksInFlight) {
|
||||||
|
if (q.fBulk && q.pindex != NULL) {
|
||||||
|
int h = q.pindex->GetHeight();
|
||||||
|
if (h >= hStart && h < hEnd) toFree.push_back(q.hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
BOOST_FOREACH(const uint256& hh, toFree)
|
||||||
|
MarkBlockAsReceived(hh);
|
||||||
|
}
|
||||||
|
// True if any of this peer's bulk blocks with height in [hStart, hEnd) is still in flight (range not
|
||||||
|
// fully drained). Completion is decided by the RANGE draining, not the global per-peer window count.
|
||||||
|
bool BulkRangeInFlight(CNodeState* state, int hStart, int hEnd) {
|
||||||
|
if (state == NULL) return false;
|
||||||
|
BOOST_FOREACH(const QueuedBlock& q, state->vBlocksInFlight) {
|
||||||
|
if (q.fBulk && q.pindex != NULL) {
|
||||||
|
int h = q.pindex->GetHeight();
|
||||||
|
if (h >= hStart && h < hEnd) return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/** Check whether the last unknown block a peer advertized is not yet known. */
|
/** Check whether the last unknown block a peer advertized is not yet known. */
|
||||||
void ProcessBlockAvailability(NodeId nodeid) {
|
void ProcessBlockAvailability(NodeId nodeid) {
|
||||||
CNodeState *state = State(nodeid);
|
CNodeState *state = State(nodeid);
|
||||||
@@ -7794,6 +7851,118 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||||||
}
|
}
|
||||||
|
|
||||||
CheckBlockIndex();
|
CheckBlockIndex();
|
||||||
|
} else if (strCommand == NetMsgType::GETBLOCKSTREAM) {
|
||||||
|
// Opt-in bulk block streaming (DragonX): a peer asks us to stream a contiguous range of
|
||||||
|
// old blocks as back-to-back BLOCK messages. We only honor it if we advertised the bit
|
||||||
|
// (i.e. were started with -bulkblocksync) and we are not mid-import/reindex.
|
||||||
|
if ((nLocalServices & NODE_BULKBLOCKS) == 0 || fImporting || fReindex)
|
||||||
|
return true;
|
||||||
|
uint256 hashStart; int32_t nStartHeight; uint16_t nCount;
|
||||||
|
vRecv >> hashStart >> nStartHeight >> nCount;
|
||||||
|
|
||||||
|
// Resolve the range under cs_main (cheap, no disk I/O), then read + stream the blocks WITHOUT
|
||||||
|
// holding the lock, so a 128-block / 8 MiB serve never holds cs_main across disk reads (the
|
||||||
|
// analogous ProcessGetData caps per-pass work precisely because it reads under cs_main).
|
||||||
|
std::vector<CBlockIndex*> vSend;
|
||||||
|
int firstH = -1;
|
||||||
|
bool refuse = false;
|
||||||
|
{
|
||||||
|
LOCK(cs_main);
|
||||||
|
if (nCount == 0 || nCount > BULK_MAX_BLOCKS_PER_REQUEST) {
|
||||||
|
Misbehaving(pfrom->GetId(), 20); // mirrors the getdata MAX_INV_SZ penalty
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// Light flood throttle: at most one bulk serve per peer per BULK_MIN_SERVE_INTERVAL_US. On
|
||||||
|
// throttle, send a refusal header so the requester falls back immediately (not after 90s).
|
||||||
|
int64_t nNowServe = GetTimeMicros();
|
||||||
|
CNodeState* sst = State(pfrom->GetId());
|
||||||
|
if (sst != NULL && sst->nLastBulkServeTime > nNowServe - BULK_MIN_SERVE_INTERVAL_US) {
|
||||||
|
pfrom->PushMessage(NetMsgType::BLOCKSTREAM, hashStart, (int32_t)-1, (uint16_t)0);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (sst != NULL) sst->nLastBulkServeTime = nNowServe;
|
||||||
|
|
||||||
|
BlockMap::iterator mi = mapBlockIndex.find(hashStart);
|
||||||
|
// Don't flood old blocks while WE are still syncing (unless allowlisted); only serve blocks
|
||||||
|
// on our active chain at the height the requester expects (nStartHeight, tamper-checked).
|
||||||
|
if ((IsInitialBlockDownload() && !pfrom->fAllowlisted) ||
|
||||||
|
mi == mapBlockIndex.end() || !chainActive.Contains(mi->second) ||
|
||||||
|
mi->second->GetHeight() != nStartHeight) {
|
||||||
|
refuse = true;
|
||||||
|
} else {
|
||||||
|
CBlockIndex* pindex = mi->second;
|
||||||
|
firstH = pindex->GetHeight();
|
||||||
|
for (uint16_t i = 0; i < nCount && pindex != NULL; i++, pindex = chainActive.Next(pindex)) {
|
||||||
|
if ((pindex->nStatus & BLOCK_HAVE_DATA) == 0) break; // pruned/missing
|
||||||
|
vSend.push_back(pindex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (refuse) {
|
||||||
|
pfrom->PushMessage(NetMsgType::BLOCKSTREAM, hashStart, (int32_t)-1, (uint16_t)0);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// Read from disk + stream OUTSIDE cs_main. CBlockIndex pointers are stable and block files are
|
||||||
|
// append-only, so reading by pindex without the lock is safe (a concurrent reorg cannot delete
|
||||||
|
// block data, and the requester validates every block against its own headers regardless).
|
||||||
|
uint16_t nSent = 0;
|
||||||
|
size_t cumBytes = 0;
|
||||||
|
BOOST_FOREACH(CBlockIndex* pb, vSend) {
|
||||||
|
if (pfrom->nSendSize >= SendBufferSize()) break; // send-buffer backpressure
|
||||||
|
boost::this_thread::interruption_point();
|
||||||
|
CBlock block;
|
||||||
|
if (!ReadBlockFromDisk(block, pb, 1)) break; // graceful, never assert
|
||||||
|
size_t sz = GetSerializeSize(block, SER_NETWORK, PROTOCOL_VERSION);
|
||||||
|
if (nSent > 0 && cumBytes + sz > BULK_MAX_RESPONSE_BYTES) break; // total byte cap
|
||||||
|
cumBytes += sz;
|
||||||
|
pfrom->PushMessage(NetMsgType::BLOCK, block);
|
||||||
|
nSent++;
|
||||||
|
}
|
||||||
|
// Trailing control header carries the ACTUAL count sent (authoritative), so the requester can
|
||||||
|
// free any undelivered tail immediately rather than waiting for the bulk response timeout.
|
||||||
|
pfrom->PushMessage(NetMsgType::BLOCKSTREAM, hashStart, (int32_t)firstH, nSent);
|
||||||
|
LogPrint("net", "Bulk stream serve: %u/%u blocks from height %d (%lu bytes) peer=%d\n",
|
||||||
|
(unsigned)nSent, (unsigned)nCount, firstH, (unsigned long)cumBytes, pfrom->id);
|
||||||
|
return true;
|
||||||
|
} else if (strCommand == NetMsgType::BLOCKSTREAM) {
|
||||||
|
// Opt-in bulk block streaming (DragonX): the trailing control header for a streamed range. The
|
||||||
|
// blocks themselves arrive as ordinary BLOCK messages (handled below); this reconciles what the
|
||||||
|
// peer actually delivered so the undelivered tail (or a refusal) falls back at once instead of
|
||||||
|
// waiting for the bulk timeout. Service bits are unauthenticated, so we ignore anything that
|
||||||
|
// doesn't match our exact outstanding request.
|
||||||
|
uint256 hashStart; int32_t nFirstHeight; uint16_t nBlocks;
|
||||||
|
vRecv >> hashStart >> nFirstHeight >> nBlocks;
|
||||||
|
|
||||||
|
LOCK(cs_main);
|
||||||
|
CNodeState* state = State(pfrom->GetId());
|
||||||
|
if (state == NULL || !state->fBulkInFlight)
|
||||||
|
return true; // nothing outstanding
|
||||||
|
if (hashStart != state->nBulkHashStart)
|
||||||
|
return true; // header for a different/stale request; ignore
|
||||||
|
if (state->fBulkHeaderSeen)
|
||||||
|
return true; // one-shot: already reconciled this request
|
||||||
|
state->fBulkHeaderSeen = true;
|
||||||
|
|
||||||
|
// nBlocks==0 (refusal) or an over-count => free our whole outstanding range and fall back.
|
||||||
|
// 0 < nBlocks <= count => the peer commits to that many; free only the undelivered tail now.
|
||||||
|
// FreeBulkRangeInFlight scans THIS peer's vBlocksInFlight by literal hash, so it only ever frees
|
||||||
|
// heights still genuinely in flight to this peer (no cross-peer effect, reorg-proof).
|
||||||
|
bool refuse = (nBlocks == 0 || nBlocks > state->nBulkRangeCount);
|
||||||
|
int deliver = refuse ? 0 : (int)nBlocks;
|
||||||
|
FreeBulkRangeInFlight(state, state->nBulkRangeStart + deliver,
|
||||||
|
state->nBulkRangeStart + state->nBulkRangeCount);
|
||||||
|
if (refuse) {
|
||||||
|
state->fBulkInFlight = false;
|
||||||
|
pfrom->nServices &= ~(uint64_t)NODE_BULKBLOCKS; // local hint: don't retry bulk on this peer
|
||||||
|
LogPrint("net", "Bulk stream refused by peer=%d (nBlocks=%u), falling back\n", pfrom->id, (unsigned)nBlocks);
|
||||||
|
} else {
|
||||||
|
// Track only what was promised; fBulkInFlight clears once that prefix fully drains
|
||||||
|
// (range-drain check in SendMessages) or via the timeout fallback.
|
||||||
|
state->nBulkRangeCount = deliver;
|
||||||
|
if (deliver == 0)
|
||||||
|
state->fBulkInFlight = false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
} else if (strCommand == NetMsgType::BLOCK && !fImporting && !fReindex) // Ignore blocks received while importing
|
} else if (strCommand == NetMsgType::BLOCK && !fImporting && !fReindex) // Ignore blocks received while importing
|
||||||
{
|
{
|
||||||
CBlock block;
|
CBlock block;
|
||||||
@@ -8239,25 +8408,86 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
|
|||||||
LogPrint("net", "Reducing block download timeout for peer=%d block=%s, orig=%d new=%d\n", pto->id, queuedBlock.hash.ToString(), queuedBlock.nTimeDisconnect, nTimeoutIfRequestedNow);
|
LogPrint("net", "Reducing block download timeout for peer=%d block=%s, orig=%d new=%d\n", pto->id, queuedBlock.hash.ToString(), queuedBlock.nTimeDisconnect, nTimeoutIfRequestedNow);
|
||||||
queuedBlock.nTimeDisconnect = nTimeoutIfRequestedNow;
|
queuedBlock.nTimeDisconnect = nTimeoutIfRequestedNow;
|
||||||
}
|
}
|
||||||
if (queuedBlock.nTimeDisconnect < nNow) {
|
if (queuedBlock.nTimeDisconnect < nNow && !queuedBlock.fBulk) {
|
||||||
|
// Bulk-stream blocks are exempt: a 128-block batch shares one request time, so the
|
||||||
|
// front() entry could expire before the tail streams in. The bulk response timeout
|
||||||
|
// below frees the range without disconnecting instead.
|
||||||
LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.hash.ToString(), pto->id);
|
LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.hash.ToString(), pto->id);
|
||||||
pto->fDisconnect = true;
|
pto->fDisconnect = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Opt-in bulk block streaming (DragonX): manage the outstanding bulk range, then (below)
|
||||||
|
// possibly issue a new one. Clearing fBulkInFlight once the batch has drained below the
|
||||||
|
// normal window re-enables the next bulk request; a never-fully-delivered batch is freed
|
||||||
|
// after BULK_RESPONSE_TIMEOUT_US so the normal per-block path re-fetches it (no disconnect).
|
||||||
|
if (state.fBulkInFlight) {
|
||||||
|
int hEnd = state.nBulkRangeStart + state.nBulkRangeCount;
|
||||||
|
if (!BulkRangeInFlight(&state, state.nBulkRangeStart, hEnd)) {
|
||||||
|
// Whole (possibly shrunk) range received -> done. Completion is keyed on the RANGE
|
||||||
|
// draining, NOT on the global in-flight count crossing the window, so a partially
|
||||||
|
// delivered batch can never leave undelivered heights stuck in-flight.
|
||||||
|
state.fBulkInFlight = false;
|
||||||
|
} else if (state.nBulkSince > 0 && state.nBulkSince < nNow - BULK_RESPONSE_TIMEOUT_US) {
|
||||||
|
// Promised blocks never fully arrived: free the still-in-flight remainder (the normal
|
||||||
|
// per-block path re-fetches it), give up bulk on this unresponsive peer. No disconnect.
|
||||||
|
FreeBulkRangeInFlight(&state, state.nBulkRangeStart, hEnd);
|
||||||
|
state.fBulkInFlight = false;
|
||||||
|
pto->nServices &= ~(uint64_t)NODE_BULKBLOCKS;
|
||||||
|
LogPrint("net", "Bulk stream timeout peer=%d, freed range [%d,%d)\n",
|
||||||
|
pto->id, state.nBulkRangeStart, hEnd);
|
||||||
|
}
|
||||||
|
}
|
||||||
// Message: getdata (blocks)
|
// Message: getdata (blocks)
|
||||||
static uint256 zero;
|
static uint256 zero;
|
||||||
vector<CInv> vGetData;
|
vector<CInv> vGetData;
|
||||||
if (!pto->fDisconnect && !pto->fClient && (fFetch || !IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
if (!pto->fDisconnect && !pto->fClient && (fFetch || !IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER && !state.fBulkInFlight) {
|
||||||
vector<CBlockIndex*> vToDownload;
|
vector<CBlockIndex*> vToDownload;
|
||||||
NodeId staller = -1;
|
NodeId staller = -1;
|
||||||
CBlockIndex *pFrontierStuck = NULL;
|
CBlockIndex *pFrontierStuck = NULL;
|
||||||
FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, &pFrontierStuck);
|
FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, &pFrontierStuck);
|
||||||
|
|
||||||
|
// Opt-in bulk block streaming (DragonX): if the first block we need is in the deep,
|
||||||
|
// stable region (>= BULK_TIP_MARGIN below the NETWORK tip) and the peer advertised the
|
||||||
|
// capability, request a whole contiguous range in one shot instead of per-block getdata.
|
||||||
|
// FindNextBlocksToDownload already advanced the cursor past what we have, so
|
||||||
|
// vToDownload.front() is the correct, cursor-managed starting point.
|
||||||
|
bool didBulk = false;
|
||||||
|
if (fBulkBlockSync && (pto->nServices & NODE_BULKBLOCKS) && IsInitialBlockDownload()
|
||||||
|
&& !vToDownload.empty() && state.pindexBestKnownBlock != NULL) {
|
||||||
|
CBlockIndex* pfirst = vToDownload.front();
|
||||||
|
int cursorH = pfirst->GetHeight();
|
||||||
|
int maxH = state.pindexBestKnownBlock->GetHeight() - BULK_TIP_MARGIN;
|
||||||
|
if (cursorH <= maxH) {
|
||||||
|
int want = std::min(maxH - cursorH + 1, (int)BULK_MAX_BLOCKS_PER_REQUEST);
|
||||||
|
uint16_t n = 0;
|
||||||
|
for (int i = 0; i < want; i++) {
|
||||||
|
CBlockIndex* pb = state.pindexBestKnownBlock->GetAncestor(cursorH + i);
|
||||||
|
if (pb == NULL || mapBlocksInFlight.count(pb->GetBlockHash())) break;
|
||||||
|
MarkBlockAsInFlight(pto->GetId(), pb->GetBlockHash(), consensusParams, pb, true);
|
||||||
|
n++;
|
||||||
|
}
|
||||||
|
if (n > 0) {
|
||||||
|
pto->PushMessage(NetMsgType::GETBLOCKSTREAM, pfirst->GetBlockHash(), (int32_t)cursorH, n);
|
||||||
|
state.fBulkInFlight = true;
|
||||||
|
state.nBulkSince = nNow;
|
||||||
|
state.nBulkRangeStart = cursorH;
|
||||||
|
state.nBulkRangeCount = n;
|
||||||
|
state.nBulkHashStart = pfirst->GetBlockHash(); // request identity (matched in BLOCKSTREAM)
|
||||||
|
state.fBulkHeaderSeen = false; // arm the one-shot header reconciliation
|
||||||
|
didBulk = true;
|
||||||
|
LogPrint("net", "Requesting bulk stream [%d..%d] (%u blocks) peer=%d\n",
|
||||||
|
cursorH, cursorH + n - 1, (unsigned)n, pto->id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!didBulk) {
|
||||||
BOOST_FOREACH(CBlockIndex *pindex, vToDownload) {
|
BOOST_FOREACH(CBlockIndex *pindex, vToDownload) {
|
||||||
vGetData.push_back(CInv(MSG_BLOCK, pindex->GetBlockHash()));
|
vGetData.push_back(CInv(MSG_BLOCK, pindex->GetBlockHash()));
|
||||||
MarkBlockAsInFlight(pto->GetId(), pindex->GetBlockHash(), consensusParams, pindex);
|
MarkBlockAsInFlight(pto->GetId(), pindex->GetBlockHash(), consensusParams, pindex);
|
||||||
LogPrint("net", "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(), pindex->GetHeight(), pto->id);
|
LogPrint("net", "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(), pindex->GetHeight(), pto->id);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Frontier reassignment: when this peer has nothing new to fetch because the next-needed
|
// Frontier reassignment: when this peer has nothing new to fetch because the next-needed
|
||||||
// (frontier) block is in flight from another, slow peer and has been stuck beyond a short
|
// (frontier) block is in flight from another, slow peer and has been stuck beyond a short
|
||||||
// threshold, re-request it from THIS (responsive) peer instead of waiting out the long
|
// threshold, re-request it from THIS (responsive) peer instead of waiting out the long
|
||||||
|
|||||||
16
src/main.h
16
src/main.h
@@ -102,6 +102,22 @@ static const int DEFAULT_SCRIPTCHECK_THREADS = 0;
|
|||||||
* ceiling at negligible bandwidth cost. */
|
* ceiling at negligible bandwidth cost. */
|
||||||
static const int DEFAULT_MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16;
|
static const int DEFAULT_MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16;
|
||||||
extern int MAX_BLOCKS_IN_TRANSIT_PER_PEER;
|
extern int MAX_BLOCKS_IN_TRANSIT_PER_PEER;
|
||||||
|
/** Opt-in bulk block streaming (DragonX, -bulkblocksync). A single GETBLOCKSTREAM request makes a
|
||||||
|
* peer stream a contiguous range of old blocks as back-to-back BLOCK messages, amortizing the
|
||||||
|
* per-block round-trip over the whole range instead of the MAX_BLOCKS_IN_TRANSIT_PER_PEER window.
|
||||||
|
* OFF by default; negotiated via NODE_BULKBLOCKS; only used during IBD for blocks more than
|
||||||
|
* BULK_TIP_MARGIN below the active tip; never alters the default getdata path. */
|
||||||
|
static const bool DEFAULT_BULKBLOCKSYNC = false;
|
||||||
|
extern bool fBulkBlockSync;
|
||||||
|
/** Only bulk-stream blocks at least this far below the active tip (near-tip uses the normal path). */
|
||||||
|
static const int BULK_TIP_MARGIN = 5000;
|
||||||
|
/** Hard DoS cap: max blocks a single GETBLOCKSTREAM may request/serve. */
|
||||||
|
static const uint16_t BULK_MAX_BLOCKS_PER_REQUEST = 128;
|
||||||
|
/** Hard DoS cap: max total bytes streamed in response to one GETBLOCKSTREAM. */
|
||||||
|
static const size_t BULK_MAX_RESPONSE_BYTES = 8 * 1024 * 1024;
|
||||||
|
/** Requester fallback: if a promised bulk range doesn't fully arrive within this many microseconds,
|
||||||
|
* free the in-flight range so the normal per-block path re-fetches it. */
|
||||||
|
static const int64_t BULK_RESPONSE_TIMEOUT_US = 90 * 1000000LL;
|
||||||
/** Timeout in seconds during which a peer must stall block download progress before being disconnected. */
|
/** Timeout in seconds during which a peer must stall block download progress before being disconnected. */
|
||||||
static const unsigned int BLOCK_STALLING_TIMEOUT = 2;
|
static const unsigned int BLOCK_STALLING_TIMEOUT = 2;
|
||||||
/** Number of headers sent in one getheaders result. We rely on the assumption that if a peer sends
|
/** Number of headers sent in one getheaders result. We rely on the assumption that if a peer sends
|
||||||
|
|||||||
@@ -75,6 +75,8 @@ const char *GETNSPV="getnSPV"; //used
|
|||||||
const char *NSPV="nSPV"; //used
|
const char *NSPV="nSPV"; //used
|
||||||
const char *ALERT="alert"; //used
|
const char *ALERT="alert"; //used
|
||||||
const char *REJECT="reject"; //used
|
const char *REJECT="reject"; //used
|
||||||
|
const char *GETBLOCKSTREAM="getblockstrm"; // 12 chars (COMMAND_SIZE max); "getblockstream" would truncate
|
||||||
|
const char *BLOCKSTREAM="blockstream";
|
||||||
} // namespace NetMsgType
|
} // namespace NetMsgType
|
||||||
|
|
||||||
/** All known message types. Keep this in the same order as the list of
|
/** All known message types. Keep this in the same order as the list of
|
||||||
@@ -119,6 +121,8 @@ const static std::string allNetMessageTypes[] = {
|
|||||||
NetMsgType::NSPV,
|
NetMsgType::NSPV,
|
||||||
NetMsgType::ALERT,
|
NetMsgType::ALERT,
|
||||||
NetMsgType::REJECT,
|
NetMsgType::REJECT,
|
||||||
|
NetMsgType::GETBLOCKSTREAM,
|
||||||
|
NetMsgType::BLOCKSTREAM,
|
||||||
};
|
};
|
||||||
|
|
||||||
CMessageHeader::CMessageHeader(const MessageStartChars& pchMessageStartIn)
|
CMessageHeader::CMessageHeader(const MessageStartChars& pchMessageStartIn)
|
||||||
|
|||||||
@@ -285,6 +285,10 @@ extern const char* GETNSPV;
|
|||||||
extern const char* NSPV;
|
extern const char* NSPV;
|
||||||
extern const char* ALERT;
|
extern const char* ALERT;
|
||||||
extern const char* REJECT;
|
extern const char* REJECT;
|
||||||
|
/** Opt-in bulk block streaming (DragonX): request a contiguous range of old blocks. */
|
||||||
|
extern const char* GETBLOCKSTREAM;
|
||||||
|
/** Opt-in bulk block streaming (DragonX): control header preceding a streamed block range. */
|
||||||
|
extern const char* BLOCKSTREAM;
|
||||||
}; // namespace NetMsgType
|
}; // namespace NetMsgType
|
||||||
|
|
||||||
/* Get a vector of all valid message types (see above) */
|
/* Get a vector of all valid message types (see above) */
|
||||||
@@ -304,6 +308,9 @@ enum ServiceFlags : uint64_t {
|
|||||||
NODE_NSPV = (1 << 30),
|
NODE_NSPV = (1 << 30),
|
||||||
NODE_ADDRINDEX = (1 << 29),
|
NODE_ADDRINDEX = (1 << 29),
|
||||||
NODE_SPENTINDEX = (1 << 28),
|
NODE_SPENTINDEX = (1 << 28),
|
||||||
|
// Opt-in bulk block streaming (DragonX). Unauthenticated advertisement; serve/request
|
||||||
|
// handlers validate every block regardless, so robustness against false advertisement holds.
|
||||||
|
NODE_BULKBLOCKS = (1 << 27),
|
||||||
|
|
||||||
// Bits 24-31 are reserved for temporary experiments. Just pick a bit that
|
// Bits 24-31 are reserved for temporary experiments. Just pick a bit that
|
||||||
// isn't getting used, or one not being used much, and notify the
|
// isn't getting used, or one not being used much, and notify the
|
||||||
|
|||||||
Reference in New Issue
Block a user