asmap bucketing ported from Komodo

This commit is contained in:
miodragpop
2020-09-30 14:28:55 +02:00
parent 9a3e68a832
commit 56d9c00749
19 changed files with 1562 additions and 119 deletions

View File

@@ -400,8 +400,6 @@ CNode* FindNode(const CService& addr)
return NULL;
}
CNode* ConnectNode(CAddress addrConnect, const char *pszDest)
{
if (pszDest == NULL) {
@@ -515,11 +513,11 @@ void CNode::CloseSocketDisconnect()
{
LOCK(cs_hSocket);
if (hSocket != INVALID_SOCKET)
{
if (hSocket != INVALID_SOCKET)
{
try
{
LogPrint("net", "disconnecting peer=%d\n", id);
LogPrint("net", "disconnecting peer=%d\n", id);
}
catch(std::bad_alloc&)
{
@@ -535,8 +533,8 @@ void CNode::CloseSocketDisconnect()
SSL_free(ssl);
ssl = NULL;
}
CloseSocket(hSocket);
}
CloseSocket(hSocket);
}
}
// in case this fails, we'll empty the recv buffer when the CNode is deleted
@@ -668,10 +666,13 @@ void CNode::AddWhitelistedRange(const CSubNet &subnet) {
vWhitelistedRange.push_back(subnet);
}
void CNode::copyStats(CNodeStats &stats)
void CNode::copyStats(CNodeStats &stats, const std::vector<bool> &m_asmap)
{
stats.nodeid = this->GetId();
stats.nServices = nServices;
stats.addr = addr;
// stats.addrBind = addrBind;
stats.m_mapped_as = addr.GetMappedAS(m_asmap);
stats.nLastSend = nLastSend;
stats.nLastRecv = nLastRecv;
stats.nTimeConnected = nTimeConnected;
@@ -855,15 +856,15 @@ void SocketSendData(CNode *pnode)
if (nRet != SSL_ERROR_WANT_READ && nRet != SSL_ERROR_WANT_WRITE)
{
LogPrintf("ERROR: SSL_write %s; closing connection\n", ERR_error_string(nRet, NULL));
pnode->CloseSocketDisconnect();
}
pnode->CloseSocketDisconnect();
}
else
{
// preventive measure from exhausting CPU usage
//
MilliSleep(1); // 1 msec
}
}
}
else
{
if (nRet != WSAEWOULDBLOCK && nRet != WSAEMSGSIZE && nRet != WSAEINTR && nRet != WSAEINPROGRESS)
@@ -951,8 +952,8 @@ public:
CSHA256 hashA, hashB;
std::vector<unsigned char> vchA(32), vchB(32);
vchGroupA = a->addr.GetGroup();
vchGroupB = b->addr.GetGroup();
vchGroupA = a->addr.GetGroup(addrman.m_asmap);
vchGroupB = b->addr.GetGroup(addrman.m_asmap);
hashA.Write(begin_ptr(vchGroupA), vchGroupA.size());
hashB.Write(begin_ptr(vchGroupB), vchGroupB.size());
@@ -1048,14 +1049,14 @@ static bool AttemptToEvictConnection(bool fPreferNewConnection) {
int64_t nMostConnectionsTime = 0;
std::map<std::vector<unsigned char>, std::vector<CNodeRef> > mapAddrCounts;
BOOST_FOREACH(const CNodeRef &node, vEvictionCandidates) {
mapAddrCounts[node->addr.GetGroup()].push_back(node);
int64_t grouptime = mapAddrCounts[node->addr.GetGroup()][0]->nTimeConnected;
size_t groupsize = mapAddrCounts[node->addr.GetGroup()].size();
mapAddrCounts[node->addr.GetGroup(addrman.m_asmap)].push_back(node);
int64_t grouptime = mapAddrCounts[node->addr.GetGroup(addrman.m_asmap)][0]->nTimeConnected;
size_t groupsize = mapAddrCounts[node->addr.GetGroup(addrman.m_asmap)].size();
if (groupsize > nMostConnections || (groupsize == nMostConnections && grouptime > nMostConnectionsTime)) {
nMostConnections = groupsize;
nMostConnectionsTime = grouptime;
naMostConnections = node->addr.GetGroup();
naMostConnections = node->addr.GetGroup(addrman.m_asmap);
}
}
@@ -1074,7 +1075,6 @@ static bool AttemptToEvictConnection(bool fPreferNewConnection) {
return true;
}
static void AcceptConnection(const ListenSocket& hListenSocket) {
struct sockaddr_storage sockaddr;
socklen_t len = sizeof(sockaddr);
@@ -1347,7 +1347,6 @@ void ThreadSocketHandler()
// * We send some data.
// * We wait for data to be received (and disconnect after timeout).
// * We process a message in the buffer (message handler thread).
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend && !pnode->vSendMsg.empty()) {
@@ -1588,7 +1587,7 @@ void ThreadOpenConnections()
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes) {
if (!pnode->fInbound) {
setConnected.insert(pnode->addr.GetGroup());
setConnected.insert(pnode->addr.GetGroup(addrman.m_asmap));
nOutbound++;
}
}
@@ -1602,7 +1601,7 @@ void ThreadOpenConnections()
CAddrInfo addr = addrman.Select();
// if we selected an invalid address, restart
if (!addr.IsValid() || setConnected.count(addr.GetGroup()) || IsLocal(addr))
if (!addr.IsValid() || setConnected.count(addr.GetGroup(addrman.m_asmap)) || IsLocal(addr))
break;
// If we didn't find an appropriate destination after trying 100 addresses fetched from addrman,
@@ -2388,7 +2387,7 @@ CNode::~CNode()
ssl = NULL;
}
CloseSocket(hSocket);
CloseSocket(hSocket);
}
if (pfilter)
@@ -2490,3 +2489,16 @@ void CNode::EndMessage() UNLOCK_FUNCTION(cs_vSend)
LEAVE_CRITICAL_SECTION(cs_vSend);
}
void CopyNodeStats(std::vector<CNodeStats>& vstats)
{
vstats.clear();
LOCK(cs_vNodes);
vstats.reserve(vNodes.size());
BOOST_FOREACH(CNode* pnode, vNodes) {
CNodeStats stats;
pnode->copyStats(stats, addrman.m_asmap);
vstats.push_back(stats);
}
}