diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 9c033e415..59ba7c3bc 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -72,13 +72,35 @@ private: std::deque queue; bool running; size_t maxDepth; + int numThreads; + + /** RAII object to keep track of number of running worker threads */ + class ThreadCounter + { + public: + WorkQueue &wq; + ThreadCounter(WorkQueue &w): wq(w) + { + boost::lock_guard lock(wq.cs); + wq.numThreads += 1; + } + ~ThreadCounter() + { + boost::lock_guard lock(wq.cs); + wq.numThreads -= 1; + wq.cond.notify_all(); + } + }; public: WorkQueue(size_t maxDepth) : running(true), - maxDepth(maxDepth) + maxDepth(maxDepth), + numThreads(0) { } - /* Precondition: worker threads have all stopped */ + /*( Precondition: worker threads have all stopped + * (call WaitExit) + */ ~WorkQueue() { while (!queue.empty()) { @@ -100,6 +122,7 @@ public: /** Thread function */ void Run() { + ThreadCounter count(*this); while (running) { WorkItem* i = 0; { @@ -122,6 +145,13 @@ public: running = false; cond.notify_all(); } + /** Wait for worker threads to exit */ + void WaitExit() + { + boost::unique_lock lock(cs); + while (numThreads > 0) + cond.wait(lock); + } /** Return current depth of queue */ size_t Depth() @@ -155,6 +185,8 @@ static std::vector rpc_allow_subnets; static WorkQueue* workQueue = 0; //! Handlers for (sub)paths std::vector pathHandlers; +//! Bound listening sockets +std::vector boundSockets; /** Check if a network address is allowed to access the HTTP server */ static bool ClientAllowed(const CNetAddr& netaddr) @@ -264,6 +296,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg) } } +/** Callback to reject HTTP requests after shutdown. */ +static void http_reject_request_cb(struct evhttp_request* req, void*) +{ + LogPrint("http", "Rejecting request while shutting down\n"); + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); +} + /** Event dispatcher thread */ static void ThreadHTTP(struct event_base* base, struct evhttp* http) { @@ -278,7 +317,6 @@ static void ThreadHTTP(struct event_base* base, struct evhttp* http) static bool HTTPBindAddresses(struct evhttp* http) { int defaultPort = GetArg("-rpcport", BaseParams().RPCPort()); - int nBound = 0; std::vector > endpoints; // Determine what addresses to bind to @@ -304,13 +342,14 @@ static bool HTTPBindAddresses(struct evhttp* http) // Bind addresses for (std::vector >::iterator i = endpoints.begin(); i != endpoints.end(); ++i) { LogPrint("http", "Binding RPC on address %s port %i\n", i->first, i->second); - if (evhttp_bind_socket(http, i->first.empty() ? NULL : i->first.c_str(), i->second) == 0) { - nBound += 1; + evhttp_bound_socket *bind_handle = evhttp_bind_socket_with_handle(http, i->first.empty() ? NULL : i->first.c_str(), i->second); + if (bind_handle) { + boundSockets.push_back(bind_handle); } else { LogPrintf("Binding RPC on address %s port %i failed.\n", i->first, i->second); } } - return nBound > 0; + return !boundSockets.empty(); } /** Simple wrapper to set thread name and run work queue */ @@ -399,23 +438,33 @@ bool InitHTTPServer() return true; } -bool StartHTTPServer(boost::thread_group& threadGroup) +boost::thread threadHTTP; + +bool StartHTTPServer() { LogPrint("http", "Starting HTTP server\n"); int rpcThreads = std::max((long)GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L); LogPrintf("HTTP: starting %d worker threads\n", rpcThreads); - threadGroup.create_thread(boost::bind(&ThreadHTTP, eventBase, eventHTTP)); + threadHTTP = boost::thread(boost::bind(&ThreadHTTP, eventBase, eventHTTP)); - for (int i = 0; i < rpcThreads; i++) - threadGroup.create_thread(boost::bind(&HTTPWorkQueueRun, workQueue)); + for (int i = 0; i < rpcThreads; i++) { + boost::thread rpc_worker(HTTPWorkQueueRun, workQueue); + rpc_worker.detach(); + } return true; } void InterruptHTTPServer() { LogPrint("http", "Interrupting HTTP server\n"); - if (eventBase) - event_base_loopbreak(eventBase); + if (eventHTTP) { + // Unlisten sockets + BOOST_FOREACH (evhttp_bound_socket *socket, boundSockets) { + evhttp_del_accept_socket(eventHTTP, socket); + } + // Reject requests on current connections + evhttp_set_gencb(eventHTTP, http_reject_request_cb, NULL); + } if (workQueue) workQueue->Interrupt(); } @@ -423,7 +472,27 @@ void InterruptHTTPServer() void StopHTTPServer() { LogPrint("http", "Stopping HTTP server\n"); - delete workQueue; + if (workQueue) { + LogPrint("http", "Waiting for HTTP worker threads to exit\n"); + workQueue->WaitExit(); + delete workQueue; + } + if (eventBase) { + LogPrint("http", "Waiting for HTTP event thread to exit\n"); + // Exit the event loop as soon as there are no active events. + event_base_loopexit(eventBase, nullptr); + // Give event loop a few seconds to exit (to send back last RPC responses), then break it + // Before this was solved with event_base_loopexit, but that didn't work as expected in + // at least libevent 2.0.21 and always introduced a delay. In libevent + // master that appears to be solved, so in the future that solution + // could be used again (if desirable). + // (see discussion in https://github.com/bitcoin/bitcoin/pull/6990) + if (!threadHTTP.try_join_for(boost::chrono::milliseconds(2000))) { + LogPrintf("HTTP event loop did not exit within allotted time, sending loopbreak\n"); + event_base_loopbreak(eventBase); + threadHTTP.join(); + } + } if (eventHTTP) { evhttp_free(eventHTTP); eventHTTP = 0; @@ -432,6 +501,7 @@ void StopHTTPServer() event_base_free(eventBase); eventBase = 0; } + LogPrint("http", "Stopped HTTP server\n"); } struct event_base* EventBase() diff --git a/src/httpserver.h b/src/httpserver.h index 347e7c3ab..93fb5d8d6 100644 --- a/src/httpserver.h +++ b/src/httpserver.h @@ -28,7 +28,7 @@ bool InitHTTPServer(); * This is separate from InitHTTPServer to give users race-condition-free time * to register their handlers between InitHTTPServer and StartHTTPServer. */ -bool StartHTTPServer(boost::thread_group& threadGroup); +bool StartHTTPServer(); /** Interrupt HTTP server threads */ void InterruptHTTPServer(); /** Stop HTTP server */ diff --git a/src/init.cpp b/src/init.cpp index bb57f257d..6f87917a0 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -714,7 +714,7 @@ bool AppInitServers(boost::thread_group& threadGroup) return false; if (GetBoolArg("-rest", false) && !StartREST()) return false; - if (!StartHTTPServer(threadGroup)) + if (!StartHTTPServer()) return false; return true; } diff --git a/src/rpcserver.cpp b/src/rpcserver.cpp index 08a556bb1..0bd8fb1b8 100644 --- a/src/rpcserver.cpp +++ b/src/rpcserver.cpp @@ -246,7 +246,8 @@ UniValue stop(const UniValue& params, bool fHelp) throw runtime_error( "stop\n" "\nStop Zcash server."); - // Shutdown will take long enough that the response should get back + // Event loop will exit after current HTTP requests have been handled, so + // this reply will get back to the client. StartShutdown(); return "Zcash server stopping"; }