Multi Thread sync, ported from 5d2b85c03a

This commit is contained in:
DenioD
2020-07-29 15:23:54 +02:00
parent b23a311599
commit e6ad8d9617
5 changed files with 413 additions and 84 deletions

View File

@@ -2,17 +2,21 @@ use crate::lightwallet::LightWallet;
use rand::{rngs::OsRng, seq::SliceRandom};
use std::sync::{Arc, RwLock, Mutex};
use std::sync::atomic::{AtomicU64, AtomicI32, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, Mutex, mpsc::channel};
use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering};
use std::path::{Path, PathBuf};
use std::fs::File;
use std::collections::HashMap;
use std::cmp::{max, min};
use std::io;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter, Error, ErrorKind};
use protobuf::parse_from_bytes;
use threadpool::ThreadPool;
use json::{object, array, JsonValue};
use zcash_primitives::transaction::{TxId, Transaction};
use zcash_client_backend::{
@@ -30,7 +34,6 @@ use log4rs::append::rolling_file::policy::compound::{
roll::fixed_window::FixedWindowRoller,
};
use crate::grpc_client::{BlockId};
use crate::grpcconnector::{self, *};
use crate::SaplingParams;
@@ -991,15 +994,9 @@ impl LightClient {
let mut last_scanned_height = self.wallet.read().unwrap().last_scanned_height() as u64;
// This will hold the latest block fetched from the RPC
let latest_block_height = Arc::new(AtomicU64::new(0));
let lbh = latest_block_height.clone();
fetch_latest_block(&self.get_server_uri(), self.config.no_cert_verification,
move |block: BlockId| {
lbh.store(block.height, Ordering::SeqCst);
});
let latest_block = latest_block_height.load(Ordering::SeqCst);
let latest_block = fetch_latest_block(&self.get_server_uri(), self.config.no_cert_verification)?.height;
if latest_block < last_scanned_height {
let w = format!("Server's latest block({}) is behind ours({})", latest_block, last_scanned_height);
warn!("{}", w);
@@ -1035,6 +1032,9 @@ impl LightClient {
// belong to us.
let all_new_txs = Arc::new(RwLock::new(vec![]));
// Create a new threadpool (upto 8, atleast 2 threads) to scan with
let pool = ThreadPool::new(max(2, min(8, num_cpus::get())));
// Fetch CompactBlocks in increments
let mut pass = 0;
loop {
@@ -1070,7 +1070,8 @@ impl LightClient {
let last_invalid_height = Arc::new(AtomicI32::new(0));
let last_invalid_height_inner = last_invalid_height.clone();
fetch_blocks(&self.get_server_uri(), start_height, end_height, self.config.no_cert_verification,
let tpool = pool.clone();
fetch_blocks(&self.get_server_uri(), start_height, end_height, self.config.no_cert_verification, pool.clone(),
move |encoded_block: &[u8], height: u64| {
// Process the block only if there were no previous errors
if last_invalid_height_inner.load(Ordering::SeqCst) > 0 {
@@ -1088,7 +1089,7 @@ impl LightClient {
Err(_) => {}
}
match local_light_wallet.read().unwrap().scan_block(encoded_block) {
match local_light_wallet.read().unwrap().scan_block_with_pool(encoded_block, &tpool) {
Ok(block_txns) => {
// Add to global tx list
all_txs.write().unwrap().extend_from_slice(&block_txns.iter().map(|txid| (txid.clone(), height as i32)).collect::<Vec<_>>()[..]);
@@ -1102,6 +1103,16 @@ impl LightClient {
local_bytes_downloaded.fetch_add(encoded_block.len(), Ordering::SeqCst);
})?;
{
// println!("Total scan duration: {:?}", self.wallet.read().unwrap().total_scan_duration.read().unwrap().get(0).unwrap().as_millis());
let t = self.wallet.read().unwrap();
let mut d = t.total_scan_duration.write().unwrap();
d.clear();
d.push(std::time::Duration::new(0, 0));
}
// Check if there was any invalid block, which means we might have to do a reorg
let invalid_height = last_invalid_height.load(Ordering::SeqCst);
if invalid_height > 0 {
@@ -1136,11 +1147,16 @@ impl LightClient {
let addresses = self.wallet.read().unwrap()
.taddresses.read().unwrap().iter().map(|a| a.clone())
.collect::<Vec<String>>();
// Create a channel so the fetch_transparent_txids can send the results back
let (ctx, crx) = channel();
let num_addresses = addresses.len();
for address in addresses {
let wallet = self.wallet.clone();
let block_times_inner = block_times.clone();
// If this is the first pass after a retry, fetch older t address txids too, becuse
// If this is the first pass after a retry, fetch older t address txids too, becuse
// they might have been missed last time.
let transparent_start_height = if pass == 1 && retry_count > 0 {
start_height - scan_batch_size
@@ -1148,16 +1164,29 @@ impl LightClient {
start_height
};
fetch_transparent_txids(&self.get_server_uri(), address, transparent_start_height, end_height, self.config.no_cert_verification,
move |tx_bytes: &[u8], height: u64| {
let tx = Transaction::read(tx_bytes).unwrap();
let pool = pool.clone();
let server_uri = self.get_server_uri();
let ctx = ctx.clone();
let no_cert = self.config.no_cert_verification;
// Scan this Tx for transparent inputs and outputs
let datetime = block_times_inner.read().unwrap().get(&height).map(|v| *v).unwrap_or(0);
wallet.read().unwrap().scan_full_tx(&tx, height as i32, datetime as u64);
}
)?;
pool.execute(move || {
// Fetch the transparent transactions for this address, and send the results
// via the channel
let r = fetch_transparent_txids(&server_uri, address, transparent_start_height, end_height, no_cert,
move |tx_bytes: &[u8], height: u64| {
let tx = Transaction::read(tx_bytes).unwrap();
// Scan this Tx for transparent inputs and outputs
let datetime = block_times_inner.read().unwrap().get(&height).map(|v| *v).unwrap_or(0);
wallet.read().unwrap().scan_full_tx(&tx, height as i32, datetime as u64);
});
ctx.send(r).unwrap();
});
}
// Collect all results from the transparent fetches, and make sure everything was OK.
// If it was not, we return an error, which will go back to the retry
crx.iter().take(num_addresses).collect::<Result<Vec<()>, String>>()?;
}
// Do block height accounting
@@ -1200,24 +1229,44 @@ impl LightClient {
let mut rng = OsRng;
txids_to_fetch.shuffle(&mut rng);
let num_fetches = txids_to_fetch.len();
let (ctx, crx) = channel();
// And go and fetch the txids, getting the full transaction, so we can
// read the memos
for (txid, height) in txids_to_fetch {
let light_wallet_clone = self.wallet.clone();
info!("Fetching full Tx: {}", txid);
fetch_full_tx(&self.get_server_uri(), txid, self.config.no_cert_verification, move |tx_bytes: &[u8] | {
let tx = Transaction::read(tx_bytes).unwrap();
let pool = pool.clone();
let server_uri = self.get_server_uri();
let ctx = ctx.clone();
let no_cert = self.config.no_cert_verification;
pool.execute(move || {
info!("Fetching full Tx: {}", txid);
light_wallet_clone.read().unwrap().scan_full_tx(&tx, height, 0);
match fetch_full_tx(&server_uri, txid, no_cert) {
Ok(tx_bytes) => {
let tx = Transaction::read(&tx_bytes[..]).unwrap();
light_wallet_clone.read().unwrap().scan_full_tx(&tx, height, 0);
ctx.send(Ok(())).unwrap();
},
Err(e) => ctx.send(Err(e)).unwrap()
};
});
};
Ok(object!{
"result" => "success",
"latest_block" => latest_block,
"downloaded_bytes" => bytes_downloaded.load(Ordering::SeqCst)
})
// Wait for all the fetches to finish.
let result = crx.iter().take(num_fetches).collect::<Result<Vec<()>, String>>();
match result {
Ok(_) => Ok(object!{
"result" => "success",
"latest_block" => latest_block,
"downloaded_bytes" => bytes_downloaded.load(Ordering::SeqCst)
}),
Err(e) => Err(format!("Error fetching all txns for memos: {}", e))
}
}
pub fn do_send(&self, addrs: Vec<(&str, u64, Option<String>)>) -> Result<String, String> {