Refactoring and small improvements to async rpc operations.
Added AsyncRPCQueue::closeAndWait() so rpcserver can block on worker threads when shutting down. AsyncRPCOperation is no longer copyable - copy constructor and assignment operators now private. Refactoring: renamed methods, renamed member variables Tidy up: comments, const, size_t, braces
This commit is contained in:
@@ -4,47 +4,41 @@
|
||||
|
||||
#include "asyncrpcqueue.h"
|
||||
|
||||
static std::atomic<int> workerCounter(0);
|
||||
static std::atomic<size_t> workerCounter(0);
|
||||
|
||||
AsyncRPCQueue::AsyncRPCQueue() : closed(false) {
|
||||
AsyncRPCQueue::AsyncRPCQueue() : closed_(false) {
|
||||
}
|
||||
|
||||
/*
|
||||
* Calling thread will join on all the worker threads
|
||||
*/
|
||||
AsyncRPCQueue::~AsyncRPCQueue() {
|
||||
this->closed = true; // set this in case close() was not invoked
|
||||
for (std::thread & t : this->workers) {
|
||||
t.join();
|
||||
}
|
||||
closeAndWait(); // join on all worker threads
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* A worker will execute this method on a new thread
|
||||
*/
|
||||
void AsyncRPCQueue::run(int workerId) {
|
||||
// std::cout << "Launched queue worker " << workerId << std::endl;
|
||||
void AsyncRPCQueue::run(size_t workerId) {
|
||||
|
||||
while (!isClosed()) {
|
||||
AsyncRPCOperationId key;
|
||||
std::shared_ptr<AsyncRPCOperation> operation;
|
||||
{
|
||||
std::unique_lock< std::mutex > guard(cs_lock);
|
||||
while (operationIdQueue.empty() && !isClosed()) {
|
||||
this->cs_condition.wait(guard);
|
||||
std::unique_lock< std::mutex > guard(lock_);
|
||||
while (operation_id_queue_.empty() && !isClosed()) {
|
||||
this->condition_.wait(guard);
|
||||
}
|
||||
|
||||
// Exit if the queue is closing.
|
||||
if (isClosed())
|
||||
if (isClosed()) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Get operation id
|
||||
key = operationIdQueue.front();
|
||||
operationIdQueue.pop();
|
||||
key = operation_id_queue_.front();
|
||||
operation_id_queue_.pop();
|
||||
|
||||
// Search operation map
|
||||
AsyncRPCOperationMap::const_iterator iter = operationMap.find(key);
|
||||
if (iter != operationMap.end()) {
|
||||
AsyncRPCOperationMap::const_iterator iter = operation_map_.find(key);
|
||||
if (iter != operation_map_.end()) {
|
||||
operation = iter->second;
|
||||
}
|
||||
}
|
||||
@@ -57,15 +51,14 @@ void AsyncRPCQueue::run(int workerId) {
|
||||
operation->main();
|
||||
}
|
||||
}
|
||||
// std::cout << "Terminating queue worker " << workerId << std::endl;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
/**
|
||||
* Add shared_ptr to operation.
|
||||
*
|
||||
* To retain polymorphic behaviour, i.e. main() method of derived classes is invoked,
|
||||
* caller should create the shared_ptr like thi:
|
||||
* caller should create the shared_ptr like this:
|
||||
*
|
||||
* std::shared_ptr<AsyncRPCOperation> ptr(new MyCustomAsyncRPCOperation(params));
|
||||
*
|
||||
@@ -74,84 +67,116 @@ void AsyncRPCQueue::run(int workerId) {
|
||||
void AsyncRPCQueue::addOperation(const std::shared_ptr<AsyncRPCOperation> &ptrOperation) {
|
||||
|
||||
// Don't add if queue is closed
|
||||
if (isClosed())
|
||||
if (isClosed()) {
|
||||
return;
|
||||
}
|
||||
|
||||
AsyncRPCOperationId id = ptrOperation->getId();
|
||||
{
|
||||
std::lock_guard< std::mutex > guard(cs_lock);
|
||||
operationMap.emplace(id, ptrOperation);
|
||||
operationIdQueue.push(id);
|
||||
this->cs_condition.notify_one();
|
||||
}
|
||||
std::lock_guard< std::mutex > guard(lock_);
|
||||
operation_map_.emplace(id, ptrOperation);
|
||||
operation_id_queue_.push(id);
|
||||
this->condition_.notify_one();
|
||||
}
|
||||
|
||||
|
||||
std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::getOperationForId(AsyncRPCOperationId id) {
|
||||
/**
|
||||
* Return the operation for a given operation id.
|
||||
*/
|
||||
std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::getOperationForId(AsyncRPCOperationId id) const {
|
||||
std::shared_ptr<AsyncRPCOperation> ptr;
|
||||
|
||||
std::lock_guard< std::mutex > guard(cs_lock);
|
||||
AsyncRPCOperationMap::const_iterator iter = operationMap.find(id);
|
||||
if (iter != operationMap.end()) {
|
||||
std::lock_guard< std::mutex > guard(lock_);
|
||||
AsyncRPCOperationMap::const_iterator iter = operation_map_.find(id);
|
||||
if (iter != operation_map_.end()) {
|
||||
ptr = iter->second;
|
||||
}
|
||||
return ptr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the operation for a given operation id and then remove the operation from internal storage.
|
||||
*/
|
||||
std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::popOperationForId(AsyncRPCOperationId id) {
|
||||
std::shared_ptr<AsyncRPCOperation> ptr = getOperationForId(id);
|
||||
if (ptr) {
|
||||
std::lock_guard< std::mutex > guard(cs_lock);
|
||||
std::lock_guard< std::mutex > guard(lock_);
|
||||
// Note: if the id still exists in the operationIdQueue, when it gets processed by a worker
|
||||
// there will no operation in the map to execute, so nothing will happen.
|
||||
operationMap.erase(id);
|
||||
operation_map_.erase(id);
|
||||
}
|
||||
return ptr;
|
||||
}
|
||||
|
||||
bool AsyncRPCQueue::isClosed() {
|
||||
return closed;
|
||||
/**
|
||||
* Return true if the queue is closed to new operations.
|
||||
*/
|
||||
bool AsyncRPCQueue::isClosed() const {
|
||||
return closed_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the queue and cancel all existing operations
|
||||
*/
|
||||
void AsyncRPCQueue::close() {
|
||||
this->closed = true;
|
||||
this->closed_ = true;
|
||||
cancelAllOperations();
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Call cancel() on all operations
|
||||
*/
|
||||
void AsyncRPCQueue::cancelAllOperations() {
|
||||
std::unique_lock< std::mutex > guard(cs_lock);
|
||||
for (auto key : operationMap) {
|
||||
std::unique_lock< std::mutex > guard(lock_);
|
||||
for (auto key : operation_map_) {
|
||||
key.second->cancel();
|
||||
}
|
||||
this->cs_condition.notify_all();
|
||||
this->condition_.notify_all();
|
||||
}
|
||||
|
||||
int AsyncRPCQueue::getOperationCount() {
|
||||
std::unique_lock< std::mutex > guard(cs_lock);
|
||||
return operationIdQueue.size();
|
||||
/**
|
||||
* Return the number of operations in the queue
|
||||
*/
|
||||
size_t AsyncRPCQueue::getOperationCount() const {
|
||||
std::unique_lock< std::mutex > guard(lock_);
|
||||
return operation_id_queue_.size();
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Spawn a worker thread
|
||||
*/
|
||||
void AsyncRPCQueue::addWorker() {
|
||||
std::unique_lock< std::mutex > guard(cs_lock); // Todo: could just have a lock on the vector
|
||||
workers.emplace_back( std::thread(&AsyncRPCQueue::run, this, ++workerCounter) );
|
||||
std::unique_lock< std::mutex > guard(lock_); // Todo: could just have a lock on the vector
|
||||
workers_.emplace_back( std::thread(&AsyncRPCQueue::run, this, ++workerCounter) );
|
||||
}
|
||||
|
||||
int AsyncRPCQueue::getNumberOfWorkers() {
|
||||
return workers.size();
|
||||
/**
|
||||
* Return the number of worker threads spawned by the queue
|
||||
*/
|
||||
size_t AsyncRPCQueue::getNumberOfWorkers() const {
|
||||
return workers_.size();
|
||||
}
|
||||
|
||||
|
||||
std::vector<AsyncRPCOperationId> AsyncRPCQueue::getAllOperationIds() {
|
||||
std::unique_lock< std::mutex > guard(cs_lock);
|
||||
/**
|
||||
* Return a list of all known operation ids found in internal storage.
|
||||
*/
|
||||
std::vector<AsyncRPCOperationId> AsyncRPCQueue::getAllOperationIds() const {
|
||||
std::unique_lock< std::mutex > guard(lock_);
|
||||
std::vector<AsyncRPCOperationId> v;
|
||||
for(auto & entry: operationMap)
|
||||
for(auto & entry: operation_map_) {
|
||||
v.push_back(entry.first);
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calling thread will close and wait for worker threads to join.
|
||||
*/
|
||||
void AsyncRPCQueue::closeAndWait() {
|
||||
if (!this->closed_) {
|
||||
close();
|
||||
}
|
||||
for (std::thread & t : this->workers_) {
|
||||
if (t.joinable()) {
|
||||
t.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user