Skip to content

Commit 1a6b10a

Browse files
committed
Wildly experimental, massively un-threadsafe. Do not use.
1 parent 5ad858b commit 1a6b10a

File tree

5 files changed

+318
-236
lines changed

5 files changed

+318
-236
lines changed

src/init.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ static const bool DEFAULT_STOPAFTERBLOCKIMPORT = false;
7575

7676
std::unique_ptr<CConnman> g_connman;
7777
std::unique_ptr<PeerLogicValidation> peerLogic;
78+
std::unique_ptr<CMessageProcessorInterface> msgProc;
7879

7980
#if ENABLE_ZMQ
8081
static CZMQNotificationInterface* pzmqNotificationInterface = NULL;
@@ -207,10 +208,10 @@ void Shutdown()
207208
MapPort(false);
208209
UnregisterValidationInterface(peerLogic.get());
209210
peerLogic.reset();
211+
g_connman->Stop();
212+
msgProc.reset();
210213
g_connman.reset();
211-
212214
StopTorControl();
213-
UnregisterNodeSignals(GetNodeSignals());
214215
DumpMempool();
215216

216217
if (fFeeEstimatesInitialized)
@@ -1150,9 +1151,10 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
11501151
g_connman = std::unique_ptr<CConnman>(new CConnman(GetRand(std::numeric_limits<uint64_t>::max()), GetRand(std::numeric_limits<uint64_t>::max())));
11511152
CConnman& connman = *g_connman;
11521153

1154+
msgProc = std::unique_ptr<CMessageProcessorInterface>(new CMessageProcessor(connman));
1155+
11531156
peerLogic.reset(new PeerLogicValidation(&connman));
11541157
RegisterValidationInterface(peerLogic.get());
1155-
RegisterNodeSignals(GetNodeSignals());
11561158

11571159
// sanitize comments per BIP-0014, format user agent and check total size
11581160
std::vector<string> uacomments;
@@ -1559,6 +1561,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
15591561
connOptions.nMaxOutbound = std::min(MAX_OUTBOUND_CONNECTIONS, connOptions.nMaxConnections);
15601562
connOptions.nMaxFeeler = 1;
15611563
connOptions.nBestHeight = chainActive.Height();
1564+
connOptions.msgProc = msgProc.get();
15621565
connOptions.uiInterface = &uiInterface;
15631566
connOptions.nSendBufferMaxSize = 1000*GetArg("-maxsendbuffer", DEFAULT_MAXSENDBUFFER);
15641567
connOptions.nReceiveFloodSize = 1000*GetArg("-maxreceivebuffer", DEFAULT_MAXRECEIVEBUFFER);

src/net.cpp

Lines changed: 56 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,6 @@ std::string strSubVersion;
7676

7777
limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ);
7878

79-
// Signals for message handling
80-
static CNodeSignals g_signals;
81-
CNodeSignals& GetNodeSignals() { return g_signals; }
82-
8379
void CConnman::AddOneShot(const std::string& strDest)
8480
{
8581
LOCK(cs_vOneShots);
@@ -393,7 +389,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
393389
pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices);
394390
pnode->nTimeConnected = GetTime();
395391
pnode->AddRef();
396-
GetNodeSignals().InitializeNode(pnode, *this);
392+
if (msgProc)
393+
msgProc->InitializeNode(pnode);
397394
{
398395
LOCK(cs_vNodes);
399396
vNodes.push_back(pnode);
@@ -1027,7 +1024,8 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
10271024
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, "", true);
10281025
pnode->AddRef();
10291026
pnode->fWhitelisted = whitelisted;
1030-
GetNodeSignals().InitializeNode(pnode, *this);
1027+
if (msgProc)
1028+
msgProc->InitializeNode(pnode);
10311029

10321030
LogPrint("net", "connection from %s accepted\n", addr.ToString());
10331031

@@ -1158,19 +1156,14 @@ void CConnman::ThreadSocketHandler()
11581156
// * We wait for data to be received (and disconnect after timeout).
11591157
// * We process a message in the buffer (message handler thread).
11601158
{
1161-
TRY_LOCK(pnode->cs_vSend, lockSend);
1162-
if (lockSend) {
1163-
if (!pnode->vSendMsg.empty()) {
1159+
if (pnode->nSendSize) {
1160+
LOCK(pnode->cs_vSend);
11641161
FD_SET(pnode->hSocket, &fdsetSend);
11651162
continue;
11661163
}
1167-
}
11681164
}
11691165
{
1170-
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
1171-
if (lockRecv && (
1172-
pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
1173-
pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
1166+
if (!pnode->fPauseRecv)
11741167
FD_SET(pnode->hSocket, &fdsetRecv);
11751168
}
11761169
}
@@ -1229,23 +1222,43 @@ void CConnman::ThreadSocketHandler()
12291222
continue;
12301223
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
12311224
{
1232-
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
1233-
if (lockRecv)
1225+
bool notify = false;
12341226
{
12351227
{
12361228
// typical socket buffer is 8K-64K
12371229
char pchBuf[0x10000];
12381230
int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
12391231
if (nBytes > 0)
12401232
{
1241-
bool notify = false;
12421233
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
12431234
pnode->CloseSocketDisconnect();
1244-
if(notify)
1245-
condMsgProc.notify_one();
1235+
1236+
RecordBytesRecv(nBytes);
12461237
pnode->nLastRecv = GetTime();
12471238
pnode->nRecvBytes += nBytes;
1248-
RecordBytesRecv(nBytes);
1239+
1240+
auto it(pnode->vRecvMsg.rbegin());
1241+
if (it != pnode->vRecvMsg.rend() && ( it->complete() || ++it != pnode->vRecvMsg.rend())) {
1242+
std::list<CNetMessage> messages;
1243+
messages.splice(messages.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it.base());
1244+
{
1245+
LOCK(pnode->cs_vRecvMsg);
1246+
for (const auto& msg : messages) {
1247+
mapMsgCmdSize::iterator i = pnode->mapRecvBytesPerMsgCmd.find(msg.hdr.pchCommand);
1248+
if (i == pnode->mapRecvBytesPerMsgCmd.end())
1249+
i = pnode->mapRecvBytesPerMsgCmd.find(NET_MESSAGE_COMMAND_OTHER);
1250+
assert(i != pnode->mapRecvBytesPerMsgCmd.end());
1251+
i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;
1252+
}
1253+
}
1254+
if(msgProc) {
1255+
bool ok = msgProc->OnNewMessages(pnode->id, std::move(messages));
1256+
if (!ok) {
1257+
LogPrint("net", "pausing receive for node: %lu\n", pnode->id);
1258+
pnode->fPauseRecv = true;
1259+
}
1260+
}
1261+
}
12491262
}
12501263
else if (nBytes == 0)
12511264
{
@@ -1276,12 +1289,10 @@ void CConnman::ThreadSocketHandler()
12761289
continue;
12771290
if (FD_ISSET(pnode->hSocket, &fdsetSend))
12781291
{
1279-
TRY_LOCK(pnode->cs_vSend, lockSend);
1280-
if (lockSend) {
1292+
LOCK(pnode->cs_vSend);
12811293
size_t nBytes = SocketSendData(pnode);
12821294
if (nBytes)
12831295
RecordBytesSent(nBytes);
1284-
}
12851296
}
12861297

12871298
//
@@ -1828,74 +1839,6 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
18281839
return true;
18291840
}
18301841

1831-
void CConnman::ThreadMessageHandler()
1832-
{
1833-
while (!flagInterruptMsgProc)
1834-
{
1835-
std::vector<CNode*> vNodesCopy;
1836-
{
1837-
LOCK(cs_vNodes);
1838-
vNodesCopy = vNodes;
1839-
BOOST_FOREACH(CNode* pnode, vNodesCopy) {
1840-
pnode->AddRef();
1841-
}
1842-
}
1843-
1844-
bool fSleep = true;
1845-
1846-
BOOST_FOREACH(CNode* pnode, vNodesCopy)
1847-
{
1848-
if (pnode->fDisconnect)
1849-
continue;
1850-
1851-
// Receive messages
1852-
{
1853-
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
1854-
if (lockRecv)
1855-
{
1856-
if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
1857-
pnode->CloseSocketDisconnect();
1858-
1859-
if (pnode->nSendSize < GetSendBufferSize())
1860-
{
1861-
if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
1862-
{
1863-
fSleep = false;
1864-
}
1865-
}
1866-
}
1867-
}
1868-
if(flagInterruptMsgProc)
1869-
return;
1870-
1871-
// Send messages
1872-
{
1873-
TRY_LOCK(pnode->cs_vSend, lockSend);
1874-
if (lockSend)
1875-
GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
1876-
}
1877-
if(flagInterruptMsgProc)
1878-
return;
1879-
}
1880-
1881-
{
1882-
LOCK(cs_vNodes);
1883-
BOOST_FOREACH(CNode* pnode, vNodesCopy)
1884-
pnode->Release();
1885-
}
1886-
1887-
if(fSleep) {
1888-
std::unique_lock<std::mutex> lock(mutexMsgProc);
1889-
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
1890-
}
1891-
}
1892-
}
1893-
1894-
1895-
1896-
1897-
1898-
18991842
bool CConnman::BindListenPort(const CService &addrBind, std::string& strError, bool fWhitelisted)
19001843
{
19011844
strError = "";
@@ -2079,7 +2022,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe
20792022
nMaxOutbound = 0;
20802023
nBestHeight = 0;
20812024
clientInterface = NULL;
2082-
flagInterruptMsgProc = false;
2025+
msgProc = nullptr;
20832026
}
20842027

20852028
NodeId CConnman::GetNewNodeId()
@@ -2101,14 +2044,16 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
21012044
nMaxFeeler = connOptions.nMaxFeeler;
21022045

21032046
nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
2104-
nReceiveFloodSize = connOptions.nSendBufferMaxSize;
2047+
nReceiveFloodSize = connOptions.nReceiveFloodSize;
21052048

21062049
nMaxOutboundLimit = connOptions.nMaxOutboundLimit;
21072050
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe;
21082051

21092052
SetBestHeight(connOptions.nBestHeight);
21102053

21112054
clientInterface = connOptions.uiInterface;
2055+
msgProc = connOptions.msgProc;
2056+
21122057
if (clientInterface)
21132058
clientInterface->InitMessage(_("Loading addresses..."));
21142059
// Load addresses from peers.dat
@@ -2156,7 +2101,6 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
21562101
//
21572102
InterruptSocks5(false);
21582103
interruptNet.reset();
2159-
flagInterruptMsgProc = false;
21602104

21612105
// Send and receive from sockets, accept connections
21622106
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
@@ -2173,8 +2117,8 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
21732117
if (!mapArgs.count("-connect") || mapMultiArgs["-connect"].size() != 1 || mapMultiArgs["-connect"][0] != "0")
21742118
threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this)));
21752119

2176-
// Process messages
2177-
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
2120+
if (msgProc)
2121+
msgProc->OnStartup();
21782122

21792123
// Dump network addresses
21802124
scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
@@ -2200,14 +2144,10 @@ instance_of_cnetcleanup;
22002144
void CConnman::Interrupt()
22012145
{
22022146
LogPrintf("%s\n",__func__);
2203-
{
2204-
std::lock_guard<std::mutex> lock(mutexMsgProc);
2205-
flagInterruptMsgProc = true;
2206-
}
2207-
condMsgProc.notify_all();
2208-
22092147
interruptNet();
22102148
InterruptSocks5(true);
2149+
if (msgProc)
2150+
msgProc->OnInterrupt();
22112151

22122152
if (semOutbound)
22132153
for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++)
@@ -2217,8 +2157,6 @@ void CConnman::Interrupt()
22172157
void CConnman::Stop()
22182158
{
22192159
LogPrintf("%s\n",__func__);
2220-
if (threadMessageHandler.joinable())
2221-
threadMessageHandler.join();
22222160
if (threadOpenConnections.joinable())
22232161
threadOpenConnections.join();
22242162
if (threadOpenAddedConnections.joinable())
@@ -2228,12 +2166,6 @@ void CConnman::Stop()
22282166
if (threadSocketHandler.joinable())
22292167
threadSocketHandler.join();
22302168

2231-
if (fAddressesInitialized)
2232-
{
2233-
DumpData();
2234-
fAddressesInitialized = false;
2235-
}
2236-
22372169
// Close sockets
22382170
BOOST_FOREACH(CNode* pnode, vNodes)
22392171
if (pnode->hSocket != INVALID_SOCKET)
@@ -2250,6 +2182,17 @@ void CConnman::Stop()
22502182
BOOST_FOREACH(CNode *pnode, vNodesDisconnected) {
22512183
DeleteNode(pnode);
22522184
}
2185+
2186+
if (msgProc)
2187+
msgProc->OnShutdown();
2188+
msgProc = nullptr;
2189+
2190+
if (fAddressesInitialized)
2191+
{
2192+
DumpData();
2193+
fAddressesInitialized = false;
2194+
}
2195+
22532196
vNodes.clear();
22542197
vNodesDisconnected.clear();
22552198
vhListenSocket.clear();
@@ -2261,7 +2204,8 @@ void CConnman::DeleteNode(CNode* pnode)
22612204
{
22622205
assert(pnode);
22632206
bool fUpdateConnectionTime = false;
2264-
GetNodeSignals().FinalizeNode(pnode->GetId(), fUpdateConnectionTime);
2207+
if (msgProc)
2208+
msgProc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime);
22652209
if(fUpdateConnectionTime)
22662210
addrman.Connected(pnode->addr);
22672211
delete pnode;
@@ -2584,6 +2528,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25842528
minFeeFilter = 0;
25852529
lastSentFeeFilter = 0;
25862530
nextSendTimeFeeFilter = 0;
2531+
fPauseRecv = false;
25872532

25882533
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
25892534
mapRecvBytesPerMsgCmd[msg] = 0;

0 commit comments

Comments
 (0)