Auto merge of #2050 - str4d:2020-zmq, r=bitcartel
Add ZeroMQ notifications Cherry-picked from the following upstream PRs: - bitcoin/bitcoin#6103 - bitcoin/bitcoin#6684 - bitcoin/bitcoin#6686 - bitcoin/bitcoin#6736 - bitcoin/bitcoin#6739 - bitcoin/bitcoin#6743 - bitcoin/bitcoin#6768 - bitcoin/bitcoin#6779 - bitcoin/bitcoin#6810 - bitcoin/bitcoin#6927 - bitcoin/bitcoin#6980 (only upgrading zeromq) - bitcoin/bitcoin#6680 - bitcoin/bitcoin#7058 - bitcoin/bitcoin#7621 - bitcoin/bitcoin#7335 (only parts affecting `zmq_test.py`) - bitcoin/bitcoin#7853 (only parts affecting `zmq_test.py`) - bitcoin/bitcoin#7762 - bitcoin/bitcoin#7993 (only upgrading zeromq) - bitcoin/bitcoin#8238 - bitcoin/bitcoin#8701 - bitcoin/bitcoin#6685 Closes #2020.
This commit is contained in:
@@ -50,6 +50,9 @@ if ENABLE_WALLET
|
||||
BITCOIN_INCLUDES += $(BDB_CPPFLAGS)
|
||||
EXTRA_LIBRARIES += libbitcoin_wallet.a
|
||||
endif
|
||||
if ENABLE_ZMQ
|
||||
EXTRA_LIBRARIES += libbitcoin_zmq.a
|
||||
endif
|
||||
|
||||
if BUILD_BITCOIN_LIBS
|
||||
lib_LTLIBRARIES = libzcashconsensus.la
|
||||
@@ -172,7 +175,12 @@ BITCOIN_CORE_H = \
|
||||
wallet/db.h \
|
||||
wallet/wallet.h \
|
||||
wallet/wallet_ismine.h \
|
||||
wallet/walletdb.h
|
||||
wallet/walletdb.h \
|
||||
zmq/zmqabstractnotifier.h \
|
||||
zmq/zmqconfig.h\
|
||||
zmq/zmqnotificationinterface.h \
|
||||
zmq/zmqpublishnotifier.h
|
||||
|
||||
|
||||
JSON_H = \
|
||||
json/json_spirit.h \
|
||||
@@ -229,6 +237,17 @@ libbitcoin_server_a_SOURCES = \
|
||||
$(BITCOIN_CORE_H) \
|
||||
$(LIBZCASH_H)
|
||||
|
||||
if ENABLE_ZMQ
|
||||
LIBBITCOIN_ZMQ=libbitcoin_zmq.a
|
||||
|
||||
libbitcoin_zmq_a_CPPFLAGS = $(BITCOIN_INCLUDES) $(ZMQ_CFLAGS)
|
||||
libbitcoin_zmq_a_SOURCES = \
|
||||
zmq/zmqabstractnotifier.cpp \
|
||||
zmq/zmqnotificationinterface.cpp \
|
||||
zmq/zmqpublishnotifier.cpp
|
||||
endif
|
||||
|
||||
|
||||
# wallet: shared between bitcoind and bitcoin-qt, but only linked
|
||||
# when wallet enabled
|
||||
libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES)
|
||||
@@ -375,6 +394,10 @@ zcashd_LDADD = \
|
||||
$(LIBMEMENV) \
|
||||
$(LIBSECP256K1)
|
||||
|
||||
if ENABLE_ZMQ
|
||||
zcashd_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
|
||||
endif
|
||||
|
||||
if ENABLE_WALLET
|
||||
zcashd_LDADD += libbitcoin_wallet.a
|
||||
endif
|
||||
@@ -388,7 +411,6 @@ zcashd_LDADD += \
|
||||
$(LIBZCASH) \
|
||||
$(LIBBITCOIN_CRYPTO) \
|
||||
$(LIBZCASH_LIBS)
|
||||
#
|
||||
|
||||
# bitcoin-cli binary #
|
||||
zcash_cli_SOURCES = bitcoin-cli.cpp
|
||||
|
||||
@@ -44,6 +44,9 @@ zcash_gtest_CPPFLAGS = -DMULTICORE -fopenmp -DBINARY_OUTPUT -DCURVE_ALT_BN128 -D
|
||||
|
||||
zcash_gtest_LDADD = -lgtest -lgmock $(LIBBITCOIN_SERVER) $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) $(LIBMEMENV) \
|
||||
$(BOOST_LIBS) $(BOOST_UNIT_TEST_FRAMEWORK_LIB) $(LIBSECP256K1)
|
||||
if ENABLE_ZMQ
|
||||
zcash_gtest_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
|
||||
endif
|
||||
if ENABLE_WALLET
|
||||
zcash_gtest_LDADD += $(LIBBITCOIN_WALLET)
|
||||
endif
|
||||
|
||||
@@ -361,6 +361,9 @@ qt_bitcoin_qt_LDADD = qt/libbitcoinqt.a $(LIBBITCOIN_SERVER)
|
||||
if ENABLE_WALLET
|
||||
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
|
||||
endif
|
||||
if ENABLE_ZMQ
|
||||
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
|
||||
endif
|
||||
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) $(LIBMEMENV) \
|
||||
$(BOOST_LIBS) $(QT_LIBS) $(QT_DBUS_LIBS) $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) $(LIBZCASH_LIBS)
|
||||
qt_bitcoin_qt_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(QT_LDFLAGS) $(LIBTOOL_APP_LDFLAGS)
|
||||
|
||||
@@ -30,6 +30,9 @@ qt_test_test_bitcoin_qt_LDADD = $(LIBBITCOINQT) $(LIBBITCOIN_SERVER)
|
||||
if ENABLE_WALLET
|
||||
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
|
||||
endif
|
||||
if ENABLE_ZMQ
|
||||
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
|
||||
endif
|
||||
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) \
|
||||
$(LIBMEMENV) $(BOOST_LIBS) $(QT_DBUS_LIBS) $(QT_TEST_LIBS) $(QT_LIBS) \
|
||||
$(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) $(LIBZCASH_LIBS)
|
||||
|
||||
@@ -107,6 +107,10 @@ endif
|
||||
test_test_bitcoin_LDADD += $(LIBZCASH_CONSENSUS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBZCASH) $(LIBZCASH_LIBS)
|
||||
test_test_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static
|
||||
|
||||
if ENABLE_ZMQ
|
||||
test_test_bitcoin_LDADD += $(ZMQ_LIBS)
|
||||
endif
|
||||
|
||||
nodist_test_test_bitcoin_SOURCES = $(GENERATED_TEST_FILES)
|
||||
|
||||
$(BITCOIN_TESTS): $(GENERATED_TEST_FILES)
|
||||
|
||||
36
src/init.cpp
36
src/init.cpp
@@ -31,7 +31,6 @@
|
||||
#include "wallet/wallet.h"
|
||||
#include "wallet/walletdb.h"
|
||||
#endif
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
|
||||
@@ -50,6 +49,10 @@
|
||||
|
||||
#include "libsnark/common/profiling.hpp"
|
||||
|
||||
#if ENABLE_ZMQ
|
||||
#include "zmq/zmqnotificationinterface.h"
|
||||
#endif
|
||||
|
||||
using namespace std;
|
||||
|
||||
extern void ThreadSendAlert();
|
||||
@@ -61,6 +64,10 @@ CWallet* pwalletMain = NULL;
|
||||
#endif
|
||||
bool fFeeEstimatesInitialized = false;
|
||||
|
||||
#if ENABLE_ZMQ
|
||||
static CZMQNotificationInterface* pzmqNotificationInterface = NULL;
|
||||
#endif
|
||||
|
||||
#ifdef WIN32
|
||||
// Win32 LevelDB doesn't use file descriptors, and the ones used for
|
||||
// accessing block files don't count towards the fd_set size limit
|
||||
@@ -199,6 +206,15 @@ void Shutdown()
|
||||
if (pwalletMain)
|
||||
pwalletMain->Flush(true);
|
||||
#endif
|
||||
|
||||
#if ENABLE_ZMQ
|
||||
if (pzmqNotificationInterface) {
|
||||
UnregisterValidationInterface(pzmqNotificationInterface);
|
||||
delete pzmqNotificationInterface;
|
||||
pzmqNotificationInterface = NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifndef WIN32
|
||||
try {
|
||||
boost::filesystem::remove(GetPidFile());
|
||||
@@ -367,6 +383,14 @@ std::string HelpMessage(HelpMessageMode mode)
|
||||
|
||||
#endif
|
||||
|
||||
#if ENABLE_ZMQ
|
||||
strUsage += HelpMessageGroup(_("ZeroMQ notification options:"));
|
||||
strUsage += HelpMessageOpt("-zmqpubhashblock=<address>", _("Enable publish hash block in <address>"));
|
||||
strUsage += HelpMessageOpt("-zmqpubhashtx=<address>", _("Enable publish hash transaction in <address>"));
|
||||
strUsage += HelpMessageOpt("-zmqpubrawblock=<address>", _("Enable publish raw block in <address>"));
|
||||
strUsage += HelpMessageOpt("-zmqpubrawtx=<address>", _("Enable publish raw transaction in <address>"));
|
||||
#endif
|
||||
|
||||
strUsage += HelpMessageGroup(_("Debugging/Testing options:"));
|
||||
if (showDebug)
|
||||
{
|
||||
@@ -380,7 +404,7 @@ std::string HelpMessage(HelpMessageMode mode)
|
||||
strUsage += HelpMessageOpt("-stopafterblockimport", strprintf("Stop running after importing blocks from disk (default: %u)", 0));
|
||||
}
|
||||
string debugCategories = "addrman, alert, bench, coindb, db, estimatefee, lock, mempool, net, partitioncheck, pow, proxy, prune, "
|
||||
"rand, reindex, rpc, selectcoins, zrpc, zrpcunsafe (implies zrpc)"; // Don't translate these and qt below
|
||||
"rand, reindex, rpc, selectcoins, zmq, zrpc, zrpcunsafe (implies zrpc)"; // Don't translate these and qt below
|
||||
if (mode == HMM_BITCOIN_QT)
|
||||
debugCategories += ", qt";
|
||||
strUsage += HelpMessageOpt("-debug=<category>", strprintf(_("Output debugging information (default: %u, supplying <category> is optional)"), 0) + ". " +
|
||||
@@ -1143,6 +1167,14 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
|
||||
BOOST_FOREACH(string strDest, mapMultiArgs["-seednode"])
|
||||
AddOneShot(strDest);
|
||||
|
||||
#if ENABLE_ZMQ
|
||||
pzmqNotificationInterface = CZMQNotificationInterface::CreateWithArguments(mapArgs);
|
||||
|
||||
if (pzmqNotificationInterface) {
|
||||
RegisterValidationInterface(pzmqNotificationInterface);
|
||||
}
|
||||
#endif
|
||||
|
||||
// ********************************************************* Step 7: load block chain
|
||||
|
||||
fReindex = GetBoolArg("-reindex", false);
|
||||
|
||||
@@ -2707,6 +2707,7 @@ bool ActivateBestChain(CValidationState &state, CBlock *pblock) {
|
||||
pnode->PushInventory(CInv(MSG_BLOCK, hashNewTip));
|
||||
}
|
||||
// Notify external listeners about the new tip.
|
||||
GetMainSignals().UpdatedBlockTip(pindexNewTip);
|
||||
uiInterface.NotifyBlockTip(hashNewTip);
|
||||
}
|
||||
} while(pindexMostWork != chainActive.Tip());
|
||||
|
||||
@@ -13,6 +13,7 @@ CMainSignals& GetMainSignals()
|
||||
}
|
||||
|
||||
void RegisterValidationInterface(CValidationInterface* pwalletIn) {
|
||||
g_signals.UpdatedBlockTip.connect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1));
|
||||
g_signals.SyncTransaction.connect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2));
|
||||
g_signals.EraseTransaction.connect(boost::bind(&CValidationInterface::EraseFromWallet, pwalletIn, _1));
|
||||
g_signals.UpdatedTransaction.connect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
|
||||
@@ -32,6 +33,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
|
||||
g_signals.UpdatedTransaction.disconnect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
|
||||
g_signals.EraseTransaction.disconnect(boost::bind(&CValidationInterface::EraseFromWallet, pwalletIn, _1));
|
||||
g_signals.SyncTransaction.disconnect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2));
|
||||
g_signals.UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1));
|
||||
}
|
||||
|
||||
void UnregisterAllValidationInterfaces() {
|
||||
@@ -43,6 +45,7 @@ void UnregisterAllValidationInterfaces() {
|
||||
g_signals.UpdatedTransaction.disconnect_all_slots();
|
||||
g_signals.EraseTransaction.disconnect_all_slots();
|
||||
g_signals.SyncTransaction.disconnect_all_slots();
|
||||
g_signals.UpdatedBlockTip.disconnect_all_slots();
|
||||
}
|
||||
|
||||
void SyncWithWallets(const CTransaction &tx, const CBlock *pblock) {
|
||||
|
||||
@@ -31,6 +31,7 @@ void SyncWithWallets(const CTransaction& tx, const CBlock* pblock = NULL);
|
||||
|
||||
class CValidationInterface {
|
||||
protected:
|
||||
virtual void UpdatedBlockTip(const CBlockIndex *pindex) {}
|
||||
virtual void SyncTransaction(const CTransaction &tx, const CBlock *pblock) {}
|
||||
virtual void EraseFromWallet(const uint256 &hash) {}
|
||||
virtual void ChainTip(const CBlockIndex *pindex, const CBlock *pblock, ZCIncrementalMerkleTree tree, bool added) {}
|
||||
@@ -45,6 +46,8 @@ protected:
|
||||
};
|
||||
|
||||
struct CMainSignals {
|
||||
/** Notifies listeners of updated block chain tip */
|
||||
boost::signals2::signal<void (const CBlockIndex *)> UpdatedBlockTip;
|
||||
/** Notifies listeners of updated transaction data (transaction, and optionally the block it is found in. */
|
||||
boost::signals2::signal<void (const CTransaction &, const CBlock *)> SyncTransaction;
|
||||
/** Notifies listeners of an erased transaction (currently disabled, requires transaction replacement). */
|
||||
|
||||
22
src/zmq/zmqabstractnotifier.cpp
Normal file
22
src/zmq/zmqabstractnotifier.cpp
Normal file
@@ -0,0 +1,22 @@
|
||||
// Copyright (c) 2015 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include "zmqabstractnotifier.h"
|
||||
#include "util.h"
|
||||
|
||||
|
||||
CZMQAbstractNotifier::~CZMQAbstractNotifier()
|
||||
{
|
||||
assert(!psocket);
|
||||
}
|
||||
|
||||
bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
44
src/zmq/zmqabstractnotifier.h
Normal file
44
src/zmq/zmqabstractnotifier.h
Normal file
@@ -0,0 +1,44 @@
|
||||
// Copyright (c) 2015 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
|
||||
#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
|
||||
|
||||
#include "zmqconfig.h"
|
||||
|
||||
class CBlockIndex;
|
||||
class CZMQAbstractNotifier;
|
||||
|
||||
typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
|
||||
|
||||
class CZMQAbstractNotifier
|
||||
{
|
||||
public:
|
||||
CZMQAbstractNotifier() : psocket(0) { }
|
||||
virtual ~CZMQAbstractNotifier();
|
||||
|
||||
template <typename T>
|
||||
static CZMQAbstractNotifier* Create()
|
||||
{
|
||||
return new T();
|
||||
}
|
||||
|
||||
std::string GetType() const { return type; }
|
||||
void SetType(const std::string &t) { type = t; }
|
||||
std::string GetAddress() const { return address; }
|
||||
void SetAddress(const std::string &a) { address = a; }
|
||||
|
||||
virtual bool Initialize(void *pcontext) = 0;
|
||||
virtual void Shutdown() = 0;
|
||||
|
||||
virtual bool NotifyBlock(const CBlockIndex *pindex);
|
||||
virtual bool NotifyTransaction(const CTransaction &transaction);
|
||||
|
||||
protected:
|
||||
void *psocket;
|
||||
std::string type;
|
||||
std::string address;
|
||||
};
|
||||
|
||||
#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
|
||||
24
src/zmq/zmqconfig.h
Normal file
24
src/zmq/zmqconfig.h
Normal file
@@ -0,0 +1,24 @@
|
||||
// Copyright (c) 2015 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#ifndef BITCOIN_ZMQ_ZMQCONFIG_H
|
||||
#define BITCOIN_ZMQ_ZMQCONFIG_H
|
||||
|
||||
#if defined(HAVE_CONFIG_H)
|
||||
#include "config/bitcoin-config.h"
|
||||
#endif
|
||||
|
||||
#include <stdarg.h>
|
||||
#include <string>
|
||||
|
||||
#if ENABLE_ZMQ
|
||||
#include <zmq.h>
|
||||
#endif
|
||||
|
||||
#include "primitives/block.h"
|
||||
#include "primitives/transaction.h"
|
||||
|
||||
void zmqError(const char *str);
|
||||
|
||||
#endif // BITCOIN_ZMQ_ZMQCONFIG_H
|
||||
159
src/zmq/zmqnotificationinterface.cpp
Normal file
159
src/zmq/zmqnotificationinterface.cpp
Normal file
@@ -0,0 +1,159 @@
|
||||
// Copyright (c) 2015 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include "zmqnotificationinterface.h"
|
||||
#include "zmqpublishnotifier.h"
|
||||
|
||||
#include "version.h"
|
||||
#include "main.h"
|
||||
#include "streams.h"
|
||||
#include "util.h"
|
||||
|
||||
void zmqError(const char *str)
|
||||
{
|
||||
LogPrint("zmq", "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
|
||||
}
|
||||
|
||||
CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
CZMQNotificationInterface::~CZMQNotificationInterface()
|
||||
{
|
||||
Shutdown();
|
||||
|
||||
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
|
||||
{
|
||||
delete *i;
|
||||
}
|
||||
}
|
||||
|
||||
CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
|
||||
{
|
||||
CZMQNotificationInterface* notificationInterface = NULL;
|
||||
std::map<std::string, CZMQNotifierFactory> factories;
|
||||
std::list<CZMQAbstractNotifier*> notifiers;
|
||||
|
||||
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
|
||||
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
|
||||
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
|
||||
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
|
||||
|
||||
for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
|
||||
{
|
||||
std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
|
||||
if (j!=args.end())
|
||||
{
|
||||
CZMQNotifierFactory factory = i->second;
|
||||
std::string address = j->second;
|
||||
CZMQAbstractNotifier *notifier = factory();
|
||||
notifier->SetType(i->first);
|
||||
notifier->SetAddress(address);
|
||||
notifiers.push_back(notifier);
|
||||
}
|
||||
}
|
||||
|
||||
if (!notifiers.empty())
|
||||
{
|
||||
notificationInterface = new CZMQNotificationInterface();
|
||||
notificationInterface->notifiers = notifiers;
|
||||
|
||||
if (!notificationInterface->Initialize())
|
||||
{
|
||||
delete notificationInterface;
|
||||
notificationInterface = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return notificationInterface;
|
||||
}
|
||||
|
||||
// Called at startup to conditionally set up ZMQ socket(s)
|
||||
bool CZMQNotificationInterface::Initialize()
|
||||
{
|
||||
LogPrint("zmq", "zmq: Initialize notification interface\n");
|
||||
assert(!pcontext);
|
||||
|
||||
pcontext = zmq_init(1);
|
||||
|
||||
if (!pcontext)
|
||||
{
|
||||
zmqError("Unable to initialize context");
|
||||
return false;
|
||||
}
|
||||
|
||||
std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
|
||||
for (; i!=notifiers.end(); ++i)
|
||||
{
|
||||
CZMQAbstractNotifier *notifier = *i;
|
||||
if (notifier->Initialize(pcontext))
|
||||
{
|
||||
LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (i!=notifiers.end())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Called during shutdown sequence
|
||||
void CZMQNotificationInterface::Shutdown()
|
||||
{
|
||||
LogPrint("zmq", "zmq: Shutdown notification interface\n");
|
||||
if (pcontext)
|
||||
{
|
||||
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
|
||||
{
|
||||
CZMQAbstractNotifier *notifier = *i;
|
||||
LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
|
||||
notifier->Shutdown();
|
||||
}
|
||||
zmq_ctx_destroy(pcontext);
|
||||
|
||||
pcontext = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex)
|
||||
{
|
||||
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
|
||||
{
|
||||
CZMQAbstractNotifier *notifier = *i;
|
||||
if (notifier->NotifyBlock(pindex))
|
||||
{
|
||||
i++;
|
||||
}
|
||||
else
|
||||
{
|
||||
notifier->Shutdown();
|
||||
i = notifiers.erase(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
|
||||
{
|
||||
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
|
||||
{
|
||||
CZMQAbstractNotifier *notifier = *i;
|
||||
if (notifier->NotifyTransaction(tx))
|
||||
{
|
||||
i++;
|
||||
}
|
||||
else
|
||||
{
|
||||
notifier->Shutdown();
|
||||
i = notifiers.erase(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
37
src/zmq/zmqnotificationinterface.h
Normal file
37
src/zmq/zmqnotificationinterface.h
Normal file
@@ -0,0 +1,37 @@
|
||||
// Copyright (c) 2015 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
|
||||
#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
|
||||
|
||||
#include "validationinterface.h"
|
||||
#include <string>
|
||||
#include <map>
|
||||
|
||||
class CBlockIndex;
|
||||
class CZMQAbstractNotifier;
|
||||
|
||||
class CZMQNotificationInterface : public CValidationInterface
|
||||
{
|
||||
public:
|
||||
virtual ~CZMQNotificationInterface();
|
||||
|
||||
static CZMQNotificationInterface* CreateWithArguments(const std::map<std::string, std::string> &args);
|
||||
|
||||
protected:
|
||||
bool Initialize();
|
||||
void Shutdown();
|
||||
|
||||
// CValidationInterface
|
||||
void SyncTransaction(const CTransaction &tx, const CBlock *pblock);
|
||||
void UpdatedBlockTip(const CBlockIndex *pindex);
|
||||
|
||||
private:
|
||||
CZMQNotificationInterface();
|
||||
|
||||
void *pcontext;
|
||||
std::list<CZMQAbstractNotifier*> notifiers;
|
||||
};
|
||||
|
||||
#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
|
||||
189
src/zmq/zmqpublishnotifier.cpp
Normal file
189
src/zmq/zmqpublishnotifier.cpp
Normal file
@@ -0,0 +1,189 @@
|
||||
// Copyright (c) 2015 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include "zmqpublishnotifier.h"
|
||||
#include "main.h"
|
||||
#include "util.h"
|
||||
|
||||
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
|
||||
|
||||
static const char *MSG_HASHBLOCK = "hashblock";
|
||||
static const char *MSG_HASHTX = "hashtx";
|
||||
static const char *MSG_RAWBLOCK = "rawblock";
|
||||
static const char *MSG_RAWTX = "rawtx";
|
||||
|
||||
// Internal function to send multipart message
|
||||
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
|
||||
{
|
||||
va_list args;
|
||||
va_start(args, size);
|
||||
|
||||
while (1)
|
||||
{
|
||||
zmq_msg_t msg;
|
||||
|
||||
int rc = zmq_msg_init_size(&msg, size);
|
||||
if (rc != 0)
|
||||
{
|
||||
zmqError("Unable to initialize ZMQ msg");
|
||||
return -1;
|
||||
}
|
||||
|
||||
void *buf = zmq_msg_data(&msg);
|
||||
memcpy(buf, data, size);
|
||||
|
||||
data = va_arg(args, const void*);
|
||||
|
||||
rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
|
||||
if (rc == -1)
|
||||
{
|
||||
zmqError("Unable to send ZMQ msg");
|
||||
zmq_msg_close(&msg);
|
||||
return -1;
|
||||
}
|
||||
|
||||
zmq_msg_close(&msg);
|
||||
|
||||
if (!data)
|
||||
break;
|
||||
|
||||
size = va_arg(args, size_t);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
|
||||
{
|
||||
assert(!psocket);
|
||||
|
||||
// check if address is being used by other publish notifier
|
||||
std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
|
||||
|
||||
if (i==mapPublishNotifiers.end())
|
||||
{
|
||||
psocket = zmq_socket(pcontext, ZMQ_PUB);
|
||||
if (!psocket)
|
||||
{
|
||||
zmqError("Failed to create socket");
|
||||
return false;
|
||||
}
|
||||
|
||||
int rc = zmq_bind(psocket, address.c_str());
|
||||
if (rc!=0)
|
||||
{
|
||||
zmqError("Failed to bind address");
|
||||
zmq_close(psocket);
|
||||
return false;
|
||||
}
|
||||
|
||||
// register this notifier for the address, so it can be reused for other publish notifier
|
||||
mapPublishNotifiers.insert(std::make_pair(address, this));
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
LogPrint("zmq", "zmq: Reusing socket for address %s\n", address);
|
||||
|
||||
psocket = i->second->psocket;
|
||||
mapPublishNotifiers.insert(std::make_pair(address, this));
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void CZMQAbstractPublishNotifier::Shutdown()
|
||||
{
|
||||
assert(psocket);
|
||||
|
||||
int count = mapPublishNotifiers.count(address);
|
||||
|
||||
// remove this notifier from the list of publishers using this address
|
||||
typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
|
||||
std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
|
||||
|
||||
for (iterator it = iterpair.first; it != iterpair.second; ++it)
|
||||
{
|
||||
if (it->second==this)
|
||||
{
|
||||
mapPublishNotifiers.erase(it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (count == 1)
|
||||
{
|
||||
LogPrint("zmq", "Close socket at address %s\n", address);
|
||||
int linger = 0;
|
||||
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
|
||||
zmq_close(psocket);
|
||||
}
|
||||
|
||||
psocket = 0;
|
||||
}
|
||||
|
||||
bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
|
||||
{
|
||||
assert(psocket);
|
||||
|
||||
/* send three parts, command & data & a LE 4byte sequence number */
|
||||
unsigned char msgseq[sizeof(uint32_t)];
|
||||
WriteLE32(&msgseq[0], nSequence);
|
||||
int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
|
||||
if (rc == -1)
|
||||
return false;
|
||||
|
||||
/* increment memory only sequence number after sending */
|
||||
nSequence++;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
|
||||
{
|
||||
uint256 hash = pindex->GetBlockHash();
|
||||
LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex());
|
||||
char data[32];
|
||||
for (unsigned int i = 0; i < 32; i++)
|
||||
data[31 - i] = hash.begin()[i];
|
||||
return SendMessage(MSG_HASHBLOCK, data, 32);
|
||||
}
|
||||
|
||||
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
|
||||
{
|
||||
uint256 hash = transaction.GetHash();
|
||||
LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex());
|
||||
char data[32];
|
||||
for (unsigned int i = 0; i < 32; i++)
|
||||
data[31 - i] = hash.begin()[i];
|
||||
return SendMessage(MSG_HASHTX, data, 32);
|
||||
}
|
||||
|
||||
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
|
||||
{
|
||||
LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
|
||||
|
||||
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
|
||||
{
|
||||
LOCK(cs_main);
|
||||
CBlock block;
|
||||
if(!ReadBlockFromDisk(block, pindex))
|
||||
{
|
||||
zmqError("Can't read block from disk");
|
||||
return false;
|
||||
}
|
||||
|
||||
ss << block;
|
||||
}
|
||||
|
||||
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
|
||||
}
|
||||
|
||||
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
|
||||
{
|
||||
uint256 hash = transaction.GetHash();
|
||||
LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
|
||||
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
|
||||
ss << transaction;
|
||||
return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
|
||||
}
|
||||
55
src/zmq/zmqpublishnotifier.h
Normal file
55
src/zmq/zmqpublishnotifier.h
Normal file
@@ -0,0 +1,55 @@
|
||||
// Copyright (c) 2015 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
|
||||
#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
|
||||
|
||||
#include "zmqabstractnotifier.h"
|
||||
|
||||
class CBlockIndex;
|
||||
|
||||
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
|
||||
{
|
||||
private:
|
||||
uint32_t nSequence; //! upcounting per message sequence number
|
||||
|
||||
public:
|
||||
|
||||
/* send zmq multipart message
|
||||
parts:
|
||||
* command
|
||||
* data
|
||||
* message sequence number
|
||||
*/
|
||||
bool SendMessage(const char *command, const void* data, size_t size);
|
||||
|
||||
bool Initialize(void *pcontext);
|
||||
void Shutdown();
|
||||
};
|
||||
|
||||
class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
|
||||
{
|
||||
public:
|
||||
bool NotifyBlock(const CBlockIndex *pindex);
|
||||
};
|
||||
|
||||
class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
|
||||
{
|
||||
public:
|
||||
bool NotifyTransaction(const CTransaction &transaction);
|
||||
};
|
||||
|
||||
class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
|
||||
{
|
||||
public:
|
||||
bool NotifyBlock(const CBlockIndex *pindex);
|
||||
};
|
||||
|
||||
class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
|
||||
{
|
||||
public:
|
||||
bool NotifyTransaction(const CTransaction &transaction);
|
||||
};
|
||||
|
||||
#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
|
||||
Reference in New Issue
Block a user