From b922924d140b858c2182855c6534619fad074b0c Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 3 Sep 2016 22:25:13 -0700 Subject: [PATCH] Add test for AsyncRPCQueue and AsyncRPCOperation. --- src/test/rpc_wallet_tests.cpp | 200 +++++++++++++++++++++++++++++++++- 1 file changed, 198 insertions(+), 2 deletions(-) diff --git a/src/test/rpc_wallet_tests.cpp b/src/test/rpc_wallet_tests.cpp index 355e11682..1d93ed822 100644 --- a/src/test/rpc_wallet_tests.cpp +++ b/src/test/rpc_wallet_tests.cpp @@ -13,6 +13,14 @@ #include "zcash/Address.hpp" +#include "rpcserver.h" +#include "asyncrpcqueue.h" +#include "asyncrpcoperation.h" +#include "wallet/asyncrpcoperation_sendmany.h" + +#include +#include + #include #include @@ -439,6 +447,194 @@ BOOST_AUTO_TEST_CASE(rpc_wallet_z_importexport) auto newAddr = pa.Get(); BOOST_CHECK(pwalletMain->HaveSpendingKey(newAddr)); } - -BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file + + +/** + * Testing Async RPC operations. + * + * Tip: Create mock operations by subclassing AsyncRPCOperation. + */ +class MockSleepOperation : public AsyncRPCOperation { +public: + std::chrono::milliseconds naptime; + MockSleepOperation(int t=1000) { + this->naptime = std::chrono::milliseconds(t); + } + virtual ~MockSleepOperation() { + } + virtual void main() { + set_state(OperationStatus::EXECUTING); + start_execution_clock(); + std::this_thread::sleep_for(std::chrono::milliseconds(naptime)); + stop_execution_clock(); + set_result(Value("done")); + set_state(OperationStatus::SUCCESS); + } +}; + + +/* + * Test Aysnc RPC queue and operations. + */ +BOOST_AUTO_TEST_CASE(rpc_wallet_async_operations) +{ + std::shared_ptr q = std::make_shared(); + BOOST_CHECK(q->getNumberOfWorkers() == 0); + std::vector ids = q->getAllOperationIds(); + BOOST_CHECK(ids.size()==0); + + std::shared_ptr op1 = std::make_shared(); + q->addOperation(op1); + BOOST_CHECK(q->getOperationCount() == 1); + + OperationStatus status = op1->getState(); + BOOST_CHECK(status == OperationStatus::READY); + + AsyncRPCOperationId id1 = op1->getId(); + int64_t creationTime1 = op1->getCreationTime(); + + q->addWorker(); + BOOST_CHECK(q->getNumberOfWorkers() == 1); + + // an AsyncRPCOperation doesn't do anything so will finish immediately + std::this_thread::sleep_for(std::chrono::seconds(1)); + BOOST_CHECK(q->getOperationCount() == 0); + + // operation should be a success + BOOST_CHECK_EQUAL(op1->isCancelled(), false); + BOOST_CHECK_EQUAL(op1->isExecuting(), false); + BOOST_CHECK_EQUAL(op1->isReady(), false); + BOOST_CHECK_EQUAL(op1->isFailed(), false); + BOOST_CHECK_EQUAL(op1->isSuccess(), true); + BOOST_CHECK(op1->getError() == Value::null ); + BOOST_CHECK(op1->getResult().is_null() == false ); + BOOST_CHECK_EQUAL(op1->getStateAsString(), "success"); + BOOST_CHECK_NE(op1->getStateAsString(), "executing"); + + // Create a second operation which just sleeps + std::shared_ptr op2(new MockSleepOperation(2500)); + AsyncRPCOperationId id2 = op2->getId(); + int64_t creationTime2 = op2->getCreationTime(); + + // it's different from the previous operation + BOOST_CHECK_NE(op1.get(), op2.get()); + BOOST_CHECK_NE(id1, id2); + BOOST_CHECK_NE(creationTime1, creationTime2); + + // Only the first operation has been added to the queue + std::vector v = q->getAllOperationIds(); + std::set opids(v.begin(), v.end()); + BOOST_CHECK(opids.size() == 1); + BOOST_CHECK(opids.count(id1)==1); + BOOST_CHECK(opids.count(id2)==0); + std::shared_ptr p1 = q->getOperationForId(id1); + BOOST_CHECK_EQUAL(p1.get(), op1.get()); + std::shared_ptr p2 = q->getOperationForId(id2); + BOOST_CHECK(!p2); // null ptr as not added to queue yet + + // Add operation 2 and 3 to the queue + q->addOperation(op2); + std::shared_ptr op3(new MockSleepOperation(1000)); + q->addOperation(op3); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + BOOST_CHECK_EQUAL(op2->isExecuting(), true); + op2->cancel(); // too late, already executing + op3->cancel(); + std::this_thread::sleep_for(std::chrono::milliseconds(3000)); + BOOST_CHECK_EQUAL(op2->isSuccess(), true); + BOOST_CHECK_EQUAL(op2->isCancelled(), false); + BOOST_CHECK_EQUAL(op3->isCancelled(), true); + + + v = q->getAllOperationIds(); + std::copy( v.begin(), v.end(), std::inserter( opids, opids.end() ) ); + BOOST_CHECK(opids.size() == 3); + BOOST_CHECK(opids.count(id1)==1); + BOOST_CHECK(opids.count(id2)==1); + BOOST_CHECK(opids.count(op3->getId())==1); + q->finishAndWait(); +} + + +// The CountOperation will increment this global +std::atomic gCounter(0); + +class CountOperation : public AsyncRPCOperation { +public: + CountOperation() {} + virtual ~CountOperation() {} + virtual void main() { + set_state(OperationStatus::EXECUTING); + gCounter++; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + set_state(OperationStatus::SUCCESS); + } +}; + +// This tests the queue waiting for multiple workers to finish +BOOST_AUTO_TEST_CASE(rpc_wallet_async_operations_parallel_wait) +{ + gCounter = 0; + + std::shared_ptr q = std::make_shared(); + q->addWorker(); + q->addWorker(); + q->addWorker(); + q->addWorker(); + BOOST_CHECK(q->getNumberOfWorkers() == 4); + + int64_t numOperations = 10; // 10 * 1000ms / 4 = 2.5 secs to finish + for (int i=0; i op(new CountOperation()); + q->addOperation(op); + } + + std::vector ids = q->getAllOperationIds(); + BOOST_CHECK(ids.size()==numOperations); + q->finishAndWait(); + BOOST_CHECK_EQUAL(q->isFinishing(), true); + BOOST_CHECK_EQUAL(numOperations, gCounter.load()); +} + +// This tests the queue shutting down immediately +BOOST_AUTO_TEST_CASE(rpc_wallet_async_operations_parallel_cancel) +{ + gCounter = 0; + + std::shared_ptr q = std::make_shared(); + q->addWorker(); + q->addWorker(); + BOOST_CHECK(q->getNumberOfWorkers() == 2); + + int numOperations = 10000; + for (int i=0; i op(new CountOperation()); + q->addOperation(op); + } + std::vector ids = q->getAllOperationIds(); + BOOST_CHECK(ids.size()==numOperations); + q->closeAndWait(); + + BOOST_CHECK_NE(numOperations, gCounter.load()); + + int numSuccess = 0; + int numCancelled = 0; + for (auto & id : ids) { + std::shared_ptr ptr = q->popOperationForId(id); + if (ptr->isCancelled()) { + numCancelled++; + } else if (ptr->isSuccess()) { + numSuccess++; + } + } + + BOOST_CHECK_EQUAL(numOperations, numSuccess+numCancelled); + BOOST_CHECK_EQUAL(gCounter.load(), numSuccess); + BOOST_CHECK(q->getOperationCount() == 0); + ids = q->getAllOperationIds(); + BOOST_CHECK(ids.size()==0); +} + + +BOOST_AUTO_TEST_SUITE_END()