@@ -76,10 +76,6 @@ std::string strSubVersion;
7676
7777limitedmap<uint256, int64_t > mapAlreadyAskedFor (MAX_INV_SZ);
7878
79- // Signals for message handling
80- static CNodeSignals g_signals;
81- CNodeSignals& GetNodeSignals () { return g_signals; }
82-
8379void CConnman::AddOneShot (const std::string& strDest)
8480{
8581 LOCK (cs_vOneShots);
@@ -393,7 +389,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
393389 pnode->nServicesExpected = ServiceFlags (addrConnect.nServices & nRelevantServices);
394390 pnode->nTimeConnected = GetTime ();
395391 pnode->AddRef ();
396- GetNodeSignals ().InitializeNode (pnode, *this );
392+ if (msgProc)
393+ msgProc->InitializeNode (pnode);
397394 {
398395 LOCK (cs_vNodes);
399396 vNodes.push_back (pnode);
@@ -1027,7 +1024,8 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
10271024 CNode* pnode = new CNode (id, nLocalServices, GetBestHeight (), hSocket, addr, CalculateKeyedNetGroup (addr), nonce, " " , true );
10281025 pnode->AddRef ();
10291026 pnode->fWhitelisted = whitelisted;
1030- GetNodeSignals ().InitializeNode (pnode, *this );
1027+ if (msgProc)
1028+ msgProc->InitializeNode (pnode);
10311029
10321030 LogPrint (" net" , " connection from %s accepted\n " , addr.ToString ());
10331031
@@ -1158,19 +1156,14 @@ void CConnman::ThreadSocketHandler()
11581156 // * We wait for data to be received (and disconnect after timeout).
11591157 // * We process a message in the buffer (message handler thread).
11601158 {
1161- TRY_LOCK (pnode->cs_vSend , lockSend);
1162- if (lockSend) {
1163- if (!pnode->vSendMsg .empty ()) {
1159+ if (pnode->nSendSize ) {
1160+ LOCK (pnode->cs_vSend );
11641161 FD_SET (pnode->hSocket , &fdsetSend);
11651162 continue ;
11661163 }
1167- }
11681164 }
11691165 {
1170- TRY_LOCK (pnode->cs_vRecvMsg , lockRecv);
1171- if (lockRecv && (
1172- pnode->vRecvMsg .empty () || !pnode->vRecvMsg .front ().complete () ||
1173- pnode->GetTotalRecvSize () <= GetReceiveFloodSize ()))
1166+ if (!pnode->fPauseRecv )
11741167 FD_SET (pnode->hSocket , &fdsetRecv);
11751168 }
11761169 }
@@ -1229,23 +1222,43 @@ void CConnman::ThreadSocketHandler()
12291222 continue ;
12301223 if (FD_ISSET (pnode->hSocket , &fdsetRecv) || FD_ISSET (pnode->hSocket , &fdsetError))
12311224 {
1232- TRY_LOCK (pnode->cs_vRecvMsg , lockRecv);
1233- if (lockRecv)
1225+ bool notify = false ;
12341226 {
12351227 {
12361228 // typical socket buffer is 8K-64K
12371229 char pchBuf[0x10000 ];
12381230 int nBytes = recv (pnode->hSocket , pchBuf, sizeof (pchBuf), MSG_DONTWAIT);
12391231 if (nBytes > 0 )
12401232 {
1241- bool notify = false ;
12421233 if (!pnode->ReceiveMsgBytes (pchBuf, nBytes, notify))
12431234 pnode->CloseSocketDisconnect ();
1244- if (notify)
1245- condMsgProc. notify_one ( );
1235+
1236+ RecordBytesRecv (nBytes );
12461237 pnode->nLastRecv = GetTime ();
12471238 pnode->nRecvBytes += nBytes;
1248- RecordBytesRecv (nBytes);
1239+
1240+ auto it (pnode->vRecvMsg .rbegin ());
1241+ if (it != pnode->vRecvMsg .rend () && ( it->complete () || ++it != pnode->vRecvMsg .rend ())) {
1242+ std::list<CNetMessage> messages;
1243+ messages.splice (messages.end (), pnode->vRecvMsg , pnode->vRecvMsg .begin (), it.base ());
1244+ {
1245+ LOCK (pnode->cs_vRecvMsg );
1246+ for (const auto & msg : messages) {
1247+ mapMsgCmdSize::iterator i = pnode->mapRecvBytesPerMsgCmd .find (msg.hdr .pchCommand );
1248+ if (i == pnode->mapRecvBytesPerMsgCmd .end ())
1249+ i = pnode->mapRecvBytesPerMsgCmd .find (NET_MESSAGE_COMMAND_OTHER);
1250+ assert (i != pnode->mapRecvBytesPerMsgCmd .end ());
1251+ i->second += msg.hdr .nMessageSize + CMessageHeader::HEADER_SIZE;
1252+ }
1253+ }
1254+ if (msgProc) {
1255+ bool ok = msgProc->OnNewMessages (pnode->id , std::move (messages));
1256+ if (!ok) {
1257+ LogPrint (" net" , " pausing receive for node: %lu\n " , pnode->id );
1258+ pnode->fPauseRecv = true ;
1259+ }
1260+ }
1261+ }
12491262 }
12501263 else if (nBytes == 0 )
12511264 {
@@ -1276,12 +1289,10 @@ void CConnman::ThreadSocketHandler()
12761289 continue ;
12771290 if (FD_ISSET (pnode->hSocket , &fdsetSend))
12781291 {
1279- TRY_LOCK (pnode->cs_vSend , lockSend);
1280- if (lockSend) {
1292+ LOCK (pnode->cs_vSend );
12811293 size_t nBytes = SocketSendData (pnode);
12821294 if (nBytes)
12831295 RecordBytesSent (nBytes);
1284- }
12851296 }
12861297
12871298 //
@@ -1828,74 +1839,6 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
18281839 return true ;
18291840}
18301841
1831- void CConnman::ThreadMessageHandler ()
1832- {
1833- while (!flagInterruptMsgProc)
1834- {
1835- std::vector<CNode*> vNodesCopy;
1836- {
1837- LOCK (cs_vNodes);
1838- vNodesCopy = vNodes;
1839- BOOST_FOREACH (CNode* pnode, vNodesCopy) {
1840- pnode->AddRef ();
1841- }
1842- }
1843-
1844- bool fSleep = true ;
1845-
1846- BOOST_FOREACH (CNode* pnode, vNodesCopy)
1847- {
1848- if (pnode->fDisconnect )
1849- continue ;
1850-
1851- // Receive messages
1852- {
1853- TRY_LOCK (pnode->cs_vRecvMsg , lockRecv);
1854- if (lockRecv)
1855- {
1856- if (!GetNodeSignals ().ProcessMessages (pnode, *this , flagInterruptMsgProc))
1857- pnode->CloseSocketDisconnect ();
1858-
1859- if (pnode->nSendSize < GetSendBufferSize ())
1860- {
1861- if (!pnode->vRecvGetData .empty () || (!pnode->vRecvMsg .empty () && pnode->vRecvMsg [0 ].complete ()))
1862- {
1863- fSleep = false ;
1864- }
1865- }
1866- }
1867- }
1868- if (flagInterruptMsgProc)
1869- return ;
1870-
1871- // Send messages
1872- {
1873- TRY_LOCK (pnode->cs_vSend , lockSend);
1874- if (lockSend)
1875- GetNodeSignals ().SendMessages (pnode, *this , flagInterruptMsgProc);
1876- }
1877- if (flagInterruptMsgProc)
1878- return ;
1879- }
1880-
1881- {
1882- LOCK (cs_vNodes);
1883- BOOST_FOREACH (CNode* pnode, vNodesCopy)
1884- pnode->Release ();
1885- }
1886-
1887- if (fSleep ) {
1888- std::unique_lock<std::mutex> lock (mutexMsgProc);
1889- condMsgProc.wait_until (lock, std::chrono::steady_clock::now () + std::chrono::milliseconds (100 ));
1890- }
1891- }
1892- }
1893-
1894-
1895-
1896-
1897-
1898-
18991842bool CConnman::BindListenPort (const CService &addrBind, std::string& strError, bool fWhitelisted )
19001843{
19011844 strError = " " ;
@@ -2079,7 +2022,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe
20792022 nMaxOutbound = 0 ;
20802023 nBestHeight = 0 ;
20812024 clientInterface = NULL ;
2082- flagInterruptMsgProc = false ;
2025+ msgProc = nullptr ;
20832026}
20842027
20852028NodeId CConnman::GetNewNodeId ()
@@ -2101,14 +2044,16 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
21012044 nMaxFeeler = connOptions.nMaxFeeler ;
21022045
21032046 nSendBufferMaxSize = connOptions.nSendBufferMaxSize ;
2104- nReceiveFloodSize = connOptions.nSendBufferMaxSize ;
2047+ nReceiveFloodSize = connOptions.nReceiveFloodSize ;
21052048
21062049 nMaxOutboundLimit = connOptions.nMaxOutboundLimit ;
21072050 nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe ;
21082051
21092052 SetBestHeight (connOptions.nBestHeight );
21102053
21112054 clientInterface = connOptions.uiInterface ;
2055+ msgProc = connOptions.msgProc ;
2056+
21122057 if (clientInterface)
21132058 clientInterface->InitMessage (_ (" Loading addresses..." ));
21142059 // Load addresses from peers.dat
@@ -2156,7 +2101,6 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
21562101 //
21572102 InterruptSocks5 (false );
21582103 interruptNet.reset ();
2159- flagInterruptMsgProc = false ;
21602104
21612105 // Send and receive from sockets, accept connections
21622106 threadSocketHandler = std::thread (&TraceThread<std::function<void ()> >, " net" , std::function<void ()>(std::bind (&CConnman::ThreadSocketHandler, this )));
@@ -2173,8 +2117,8 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
21732117 if (!mapArgs.count (" -connect" ) || mapMultiArgs[" -connect" ].size () != 1 || mapMultiArgs[" -connect" ][0 ] != " 0" )
21742118 threadOpenConnections = std::thread (&TraceThread<std::function<void ()> >, " opencon" , std::function<void ()>(std::bind (&CConnman::ThreadOpenConnections, this )));
21752119
2176- // Process messages
2177- threadMessageHandler = std::thread (&TraceThread<std::function< void ()> >, " msghand " , std::function< void ()>( std::bind (&CConnman::ThreadMessageHandler, this )) );
2120+ if (msgProc)
2121+ msgProc-> OnStartup ( );
21782122
21792123 // Dump network addresses
21802124 scheduler.scheduleEvery (boost::bind (&CConnman::DumpData, this ), DUMP_ADDRESSES_INTERVAL);
@@ -2200,14 +2144,10 @@ instance_of_cnetcleanup;
22002144void CConnman::Interrupt ()
22012145{
22022146 LogPrintf (" %s\n " ,__func__);
2203- {
2204- std::lock_guard<std::mutex> lock (mutexMsgProc);
2205- flagInterruptMsgProc = true ;
2206- }
2207- condMsgProc.notify_all ();
2208-
22092147 interruptNet ();
22102148 InterruptSocks5 (true );
2149+ if (msgProc)
2150+ msgProc->OnInterrupt ();
22112151
22122152 if (semOutbound)
22132153 for (int i=0 ; i<(nMaxOutbound + nMaxFeeler); i++)
@@ -2217,8 +2157,6 @@ void CConnman::Interrupt()
22172157void CConnman::Stop ()
22182158{
22192159 LogPrintf (" %s\n " ,__func__);
2220- if (threadMessageHandler.joinable ())
2221- threadMessageHandler.join ();
22222160 if (threadOpenConnections.joinable ())
22232161 threadOpenConnections.join ();
22242162 if (threadOpenAddedConnections.joinable ())
@@ -2228,12 +2166,6 @@ void CConnman::Stop()
22282166 if (threadSocketHandler.joinable ())
22292167 threadSocketHandler.join ();
22302168
2231- if (fAddressesInitialized )
2232- {
2233- DumpData ();
2234- fAddressesInitialized = false ;
2235- }
2236-
22372169 // Close sockets
22382170 BOOST_FOREACH (CNode* pnode, vNodes)
22392171 if (pnode->hSocket != INVALID_SOCKET)
@@ -2250,6 +2182,17 @@ void CConnman::Stop()
22502182 BOOST_FOREACH (CNode *pnode, vNodesDisconnected) {
22512183 DeleteNode (pnode);
22522184 }
2185+
2186+ if (msgProc)
2187+ msgProc->OnShutdown ();
2188+ msgProc = nullptr ;
2189+
2190+ if (fAddressesInitialized )
2191+ {
2192+ DumpData ();
2193+ fAddressesInitialized = false ;
2194+ }
2195+
22532196 vNodes.clear ();
22542197 vNodesDisconnected.clear ();
22552198 vhListenSocket.clear ();
@@ -2261,7 +2204,8 @@ void CConnman::DeleteNode(CNode* pnode)
22612204{
22622205 assert (pnode);
22632206 bool fUpdateConnectionTime = false ;
2264- GetNodeSignals ().FinalizeNode (pnode->GetId (), fUpdateConnectionTime );
2207+ if (msgProc)
2208+ msgProc->FinalizeNode (pnode->GetId (), fUpdateConnectionTime );
22652209 if (fUpdateConnectionTime )
22662210 addrman.Connected (pnode->addr );
22672211 delete pnode;
@@ -2584,6 +2528,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25842528 minFeeFilter = 0 ;
25852529 lastSentFeeFilter = 0 ;
25862530 nextSendTimeFeeFilter = 0 ;
2531+ fPauseRecv = false ;
25872532
25882533 BOOST_FOREACH (const std::string &msg, getAllNetMessageTypes ())
25892534 mapRecvBytesPerMsgCmd[msg] = 0 ;
0 commit comments