Merge branch 'beta' into mergemaster

# Conflicts:
#	src/main.cpp
This commit is contained in:
jl777
2018-04-16 10:16:01 +03:00
parent 9226f69ef1
commit e73b2055c6
910 changed files with 112009 additions and 11364 deletions

View File

@@ -0,0 +1,21 @@
// Copyright (c) 2017 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "amqpabstractnotifier.h"
#include "util.h"
AMQPAbstractNotifier::~AMQPAbstractNotifier()
{
}
bool AMQPAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}
bool AMQPAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
{
return true;
}

View File

@@ -0,0 +1,43 @@
// Copyright (c) 2017 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef ZCASH_AMQP_AMQPABSTRACTNOTIFIER_H
#define ZCASH_AMQP_AMQPABSTRACTNOTIFIER_H
#include "amqpconfig.h"
class CBlockIndex;
class AMQPAbstractNotifier;
typedef AMQPAbstractNotifier* (*AMQPNotifierFactory)();
class AMQPAbstractNotifier
{
public:
AMQPAbstractNotifier() { }
virtual ~AMQPAbstractNotifier();
template <typename T>
static AMQPAbstractNotifier* Create()
{
return new T();
}
std::string GetType() const { return type; }
void SetType(const std::string &t) { type = t; }
std::string GetAddress() const { return address; }
void SetAddress(const std::string &a) { address = a; }
virtual bool Initialize() = 0;
virtual void Shutdown() = 0;
virtual bool NotifyBlock(const CBlockIndex *pindex);
virtual bool NotifyTransaction(const CTransaction &transaction);
protected:
std::string type;
std::string address;
};
#endif // ZCASH_AMQP_AMQPABSTRACTNOTIFIER_H

33
src/amqp/amqpconfig.h Normal file
View File

@@ -0,0 +1,33 @@
// Copyright (c) 2017 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef ZCASH_AMQP_AMQPCONFIG_H
#define ZCASH_AMQP_AMQPCONFIG_H
#if defined(HAVE_CONFIG_H)
#include "config/bitcoin-config.h"
#endif
#include <stdarg.h>
#include <string>
#if ENABLE_PROTON
#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/default_container.hpp>
#include <proton/message.hpp>
#include <proton/message_id.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/transport.hpp>
#include <proton/types.hpp>
#include <proton/url.hpp>
#endif
#include "primitives/block.h"
#include "primitives/transaction.h"
#endif // ZCASH_AMQP_AMQPCONFIG_H

View File

@@ -0,0 +1,136 @@
// Copyright (c) 2017 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "amqpnotificationinterface.h"
#include "amqppublishnotifier.h"
#include "version.h"
#include "main.h"
#include "streams.h"
#include "util.h"
// AMQP 1.0 Support
//
// The boost::signals2 signals and slot system is thread safe, so CValidationInterface listeners
// can be invoked from any thread.
//
// Currently signals are fired from main.cpp so the callbacks should be invoked on the same thread.
// It should be safe to share objects responsible for sending, as they should not be run concurrently
// across different threads.
//
// Developers should be mindful of where notifications are fired to avoid potential race conditions.
// For example, different signals targeting the same address could be fired from different threads
// in different parts of the system around the same time.
//
// Like the ZMQ notification interface, if a notifier fails to send a message, the notifier is shut down.
//
AMQPNotificationInterface::AMQPNotificationInterface()
{
}
AMQPNotificationInterface::~AMQPNotificationInterface()
{
Shutdown();
for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ++i) {
delete *i;
}
}
AMQPNotificationInterface* AMQPNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
{
AMQPNotificationInterface* notificationInterface = nullptr;
std::map<std::string, AMQPNotifierFactory> factories;
std::list<AMQPAbstractNotifier*> notifiers;
factories["pubhashblock"] = AMQPAbstractNotifier::Create<AMQPPublishHashBlockNotifier>;
factories["pubhashtx"] = AMQPAbstractNotifier::Create<AMQPPublishHashTransactionNotifier>;
factories["pubrawblock"] = AMQPAbstractNotifier::Create<AMQPPublishRawBlockNotifier>;
factories["pubrawtx"] = AMQPAbstractNotifier::Create<AMQPPublishRawTransactionNotifier>;
for (std::map<std::string, AMQPNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i) {
std::map<std::string, std::string>::const_iterator j = args.find("-amqp" + i->first);
if (j!=args.end()) {
AMQPNotifierFactory factory = i->second;
std::string address = j->second;
AMQPAbstractNotifier *notifier = factory();
notifier->SetType(i->first);
notifier->SetAddress(address);
notifiers.push_back(notifier);
}
}
if (!notifiers.empty()) {
notificationInterface = new AMQPNotificationInterface();
notificationInterface->notifiers = notifiers;
if (!notificationInterface->Initialize()) {
delete notificationInterface;
notificationInterface = nullptr;
}
}
return notificationInterface;
}
// Called at startup to conditionally set up
bool AMQPNotificationInterface::Initialize()
{
LogPrint("amqp", "amqp: Initialize notification interface\n");
std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin();
for (; i != notifiers.end(); ++i) {
AMQPAbstractNotifier *notifier = *i;
if (notifier->Initialize()) {
LogPrint("amqp", "amqp: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
} else {
LogPrint("amqp", "amqp: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break;
}
}
if (i != notifiers.end()) {
return false;
}
return true;
}
// Called during shutdown sequence
void AMQPNotificationInterface::Shutdown()
{
LogPrint("amqp", "amqp: Shutdown notification interface\n");
for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ++i) {
AMQPAbstractNotifier *notifier = *i;
notifier->Shutdown();
}
}
void AMQPNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex)
{
for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ) {
AMQPAbstractNotifier *notifier = *i;
if (notifier->NotifyBlock(pindex)) {
i++;
} else {
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void AMQPNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
{
for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ) {
AMQPAbstractNotifier *notifier = *i;
if (notifier->NotifyTransaction(tx)) {
i++;
} else {
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}

View File

@@ -0,0 +1,36 @@
// Copyright (c) 2017 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef ZCASH_AMQP_AMQPNOTIFICATIONINTERFACE_H
#define ZCASH_AMQP_AMQPNOTIFICATIONINTERFACE_H
#include "validationinterface.h"
#include <string>
#include <map>
class CBlockIndex;
class AMQPAbstractNotifier;
class AMQPNotificationInterface : public CValidationInterface
{
public:
virtual ~AMQPNotificationInterface();
static AMQPNotificationInterface* CreateWithArguments(const std::map<std::string, std::string> &args);
protected:
bool Initialize();
void Shutdown();
// CValidationInterface
void SyncTransaction(const CTransaction &tx, const CBlock *pblock);
void UpdatedBlockTip(const CBlockIndex *pindex);
private:
AMQPNotificationInterface();
std::list<AMQPAbstractNotifier*> notifiers;
};
#endif // ZCASH_AMQP_AMQPNOTIFICATIONINTERFACE_H

View File

@@ -0,0 +1,177 @@
// Copyright (c) 2017 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "amqppublishnotifier.h"
#include "main.h"
#include "util.h"
#include "amqpsender.h"
#include <memory>
#include <thread>
static std::multimap<std::string, AMQPAbstractPublishNotifier*> mapPublishNotifiers;
static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
// Invoke this method from a new thread to run the proton container event loop.
void AMQPAbstractPublishNotifier::SpawnProtonContainer()
{
try {
proton::default_container(*handler_).run();
}
catch (const proton::error_condition &e) {
LogPrint("amqp", "amqp: container error: %s\n", e.what());
}
catch (const std::runtime_error &e) {
LogPrint("amqp", "amqp: runtime error: %s\n", e.what());
}
catch (const std::exception &e) {
LogPrint("amqp", "amqp: exception: %s\n", e.what());
}
catch (...) {
LogPrint("amqp", "amqp: unknown error\n");
}
handler_->terminate();
}
bool AMQPAbstractPublishNotifier::Initialize()
{
std::multimap<std::string, AMQPAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
if (i == mapPublishNotifiers.end()) {
try {
handler_ = std::make_shared<AMQPSender>(address);
thread_ = std::make_shared<std::thread>(&AMQPAbstractPublishNotifier::SpawnProtonContainer, this);
}
catch (std::exception &e) {
LogPrint("amqp", "amqp: initialization error: %s\n", e.what());
return false;
}
mapPublishNotifiers.insert(std::make_pair(address, this));
} else {
// copy the shared ptrs to the message handler and the thread where the proton container is running
handler_ = i->second->handler_;
thread_ = i->second->thread_;
mapPublishNotifiers.insert(std::make_pair(address, this));
}
return true;
}
void AMQPAbstractPublishNotifier::Shutdown()
{
LogPrint("amqp", "amqp: Shutdown notifier %s at %s\n", GetType(), GetAddress());
int count = mapPublishNotifiers.count(address);
// remove this notifier from the list of publishers using this address
typedef std::multimap<std::string, AMQPAbstractPublishNotifier*>::iterator iterator;
std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
for (iterator it = iterpair.first; it != iterpair.second; ++it) {
if (it->second == this) {
mapPublishNotifiers.erase(it);
break;
}
}
// terminate the connection if this is the last publisher using this address
if (count == 1) {
handler_->terminate();
if (thread_.get() != nullptr) {
if (thread_->joinable()) {
thread_->join();
}
}
}
}
bool AMQPAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
{
try {
proton::binary content;
const char *p = (const char *)data;
content.assign(p, p + size);
proton::message message(content);
message.subject(std::string(command));
proton::message::property_map & props = message.properties();
props.put("x-opt-sequence-number", sequence_);
handler_->publish(message);
} catch (proton::error_condition &e) {
LogPrint("amqp", "amqp: error : %s\n", e.what());
return false;
}
catch (const std::runtime_error &e) {
LogPrint("amqp", "amqp: runtime error: %s\n", e.what());
return false;
}
catch (const std::exception &e) {
LogPrint("amqp", "amqp: exception: %s\n", e.what());
return false;
}
catch (...) {
LogPrint("amqp", "amqp: unknown error\n");
return false;
}
sequence_++;
return true;
}
bool AMQPPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint("amqp", "amqp: Publish hashblock %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
return SendMessage(MSG_HASHBLOCK, data, 32);
}
bool AMQPPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint("amqp", "amqp: Publish hashtx %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
return SendMessage(MSG_HASHTX, data, 32);
}
bool AMQPPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
LogPrint("amqp", "amqp: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
{
LOCK(cs_main);
CBlock block;
if(!ReadBlockFromDisk(block, pindex)) {
LogPrint("amqp", "amqp: Can't read block from disk");
return false;
}
ss << block;
}
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}
bool AMQPPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint("amqp", "amqp: Publish rawtx %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}

View File

@@ -0,0 +1,56 @@
// Copyright (c) 2017 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef ZCASH_AMQP_AMQPPUBLISHNOTIFIER_H
#define ZCASH_AMQP_AMQPPUBLISHNOTIFIER_H
#include "amqpabstractnotifier.h"
#include "amqpconfig.h"
#include "amqpsender.h"
#include <memory>
#include <thread>
class CBlockIndex;
class AMQPAbstractPublishNotifier : public AMQPAbstractNotifier
{
private:
uint64_t sequence_; // memory only, per notifier instance: upcounting message sequence number
std::shared_ptr<std::thread> thread_; // proton container thread, may be shared between notifiers
std::shared_ptr<AMQPSender> handler_; // proton container message handler, may be shared between notifiers
public:
bool SendMessage(const char *command, const void* data, size_t size);
bool Initialize();
void Shutdown();
void SpawnProtonContainer();
};
class AMQPPublishHashBlockNotifier : public AMQPAbstractPublishNotifier
{
public:
bool NotifyBlock(const CBlockIndex *pindex);
};
class AMQPPublishHashTransactionNotifier : public AMQPAbstractPublishNotifier
{
public:
bool NotifyTransaction(const CTransaction &transaction);
};
class AMQPPublishRawBlockNotifier : public AMQPAbstractPublishNotifier
{
public:
bool NotifyBlock(const CBlockIndex *pindex);
};
class AMQPPublishRawTransactionNotifier : public AMQPAbstractPublishNotifier
{
public:
bool NotifyTransaction(const CTransaction &transaction);
};
#endif // ZCASH_AMQP_AMQPPUBLISHNOTIFIER_H

115
src/amqp/amqpsender.h Normal file
View File

@@ -0,0 +1,115 @@
// Copyright (c) 2017 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef ZCASH_AMQP_AMQPSENDER_H
#define ZCASH_AMQP_AMQPSENDER_H
#include "amqpconfig.h"
#include <deque>
#include <memory>
#include <future>
#include <iostream>
class AMQPSender : public proton::messaging_handler {
private:
std::deque<proton::message> messages_;
proton::url url_;
proton::connection conn_;
proton::sender sender_;
std::mutex lock_;
std::atomic<bool> terminated_ = {false};
public:
AMQPSender(const std::string& url) : url_(url) {}
// Callback to initialize the container when run() is invoked
void on_container_start(proton::container& c) override {
proton::duration t(10000); // milliseconds
proton::connection_options opts = proton::connection_options().idle_timeout(t);
conn_ = c.connect(url_, opts);
sender_ = conn_.open_sender(url_.path());
}
// Remote end signals when the local end can send (i.e. has credit)
void on_sendable(proton::sender &s) override {
dispatch();
}
// Publish message by adding to queue and trying to dispatch it
void publish(const proton::message &m) {
add_message(m);
dispatch();
}
// Add message to queue
void add_message(const proton::message &m) {
std::lock_guard<std::mutex> guard(lock_);
messages_.push_back(m);
}
// Send messages in queue
void dispatch() {
std::lock_guard<std::mutex> guard(lock_);
if (isTerminated()) {
throw std::runtime_error("amqp connection was terminated");
}
if (!conn_.active()) {
throw std::runtime_error("amqp connection is not active");
}
while (messages_.size() > 0) {
if (sender_.credit()) {
const proton::message& m = messages_.front();
sender_.send(m);
messages_.pop_front();
} else {
break;
}
}
}
// Close connection to remote end. Container event-loop, by default, will auto-stop.
void terminate() {
std::lock_guard<std::mutex> guard(lock_);
conn_.close();
terminated_.store(true);
}
bool isTerminated() const {
return terminated_.load();
}
void on_transport_error(proton::transport &t) override {
t.connection().close();
throw t.error();
}
void on_connection_error(proton::connection &c) override {
c.close();
throw c.error();
}
void on_session_error(proton::session &s) override {
s.connection().close();
throw s.error();
}
void on_receiver_error(proton::receiver &r) override {
r.connection().close();
throw r.error();
}
void on_sender_error(proton::sender &s) override {
s.connection().close();
throw s.error();
}
};
#endif //ZCASH_AMQP_AMQPSENDER_H