From 35ce90eec969732839b2a20a564c617b7608a31e Mon Sep 17 00:00:00 2001 From: Deniod Date: Fri, 1 Dec 2023 08:57:19 +0100 Subject: [PATCH 01/10] prevent unwrap(), poisoned data --- lib/src/lightwallet.rs | 45 +++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/lib/src/lightwallet.rs b/lib/src/lightwallet.rs index 3271a95..76d7f17 100644 --- a/lib/src/lightwallet.rs +++ b/lib/src/lightwallet.rs @@ -1470,26 +1470,43 @@ pub fn scan_full_tx(&self, tx: &Transaction, height: i32, datetime: u64) { // Do it in a short scope because of the write lock. { info!("A sapling output was sent in {}", tx.txid()); - let mut txs = self.txs.write().unwrap(); - if txs.get(&tx.txid()).unwrap().outgoing_metadata.iter() - .find(|om| om.address == address && om.value == note.value && om.memo == memo) - .is_some() { - warn!("Duplicate outgoing metadata"); - continue; - } + match self.txs.write() { + Ok(mut txs) => { + match txs.get(&tx.txid()) { + Some(wtx) => { + if wtx.outgoing_metadata.iter() + .any(|om| om.address == address && om.value == note.value && om.memo == memo) + { + warn!("Duplicate outgoing metadata"); + continue; + } - // Write the outgoing metadata - txs.get_mut(&tx.txid()).unwrap() - .outgoing_metadata - .push(OutgoingTxMetadata{ - address, value: note.value, memo, - }); + // Write the outgoing metadata + txs.get_mut(&tx.txid()).unwrap() + .outgoing_metadata + .push(OutgoingTxMetadata { + address, + value: note.value, + memo, + }); + }, + None => { + error!("Can not find any entry for txid : {}", tx.txid()); + continue; + } + } + }, + Err(poisoned) => { + error!("Lock is poisoned: {}", poisoned); + return; + } + } } }, None => {} }; } - } + } // Mark this Tx as scanned { let mut txs = self.txs.write().unwrap(); From feb26dbdfb778cd8e2a1d1a51bae45a8a0f5e747 Mon Sep 17 00:00:00 2001 From: Deniod Date: Sat, 13 Jan 2024 21:55:47 +0100 Subject: [PATCH 02/10] prevent unwrap in scan_taddress_txids --- lib/src/lightclient.rs | 93 +++++++++++++++++++++++++++++------------- 1 file changed, 65 insertions(+), 28 deletions(-) diff --git a/lib/src/lightclient.rs b/lib/src/lightclient.rs index 4f53169..09d8882 100644 --- a/lib/src/lightclient.rs +++ b/lib/src/lightclient.rs @@ -1477,47 +1477,84 @@ pub fn start_mempool_monitor(lc: Arc) -> Result<(), String> { } } } - - fn scan_taddress_txids(&self, pool: &ThreadPool, block_times: Arc>>, start_height: u64, end_height: u64, no_cert: bool) -> Result, String> { - // Copy over addresses so as to not lock up the wallet, which we'll use inside the callback below. - let addresses = self.wallet.read().unwrap() + fn scan_taddress_txids( + &self, + pool: &ThreadPool, + block_times: Arc>>, + start_height: u64, + end_height: u64, + no_cert: bool + ) -> Result, String> { + let addresses = self.wallet.read() + .map_err(|e| format!("Failed to read wallet: {:?}", e))? .get_all_taddresses().iter() - .map(|a| a.clone()) + .cloned() .collect::>(); - - // Create a channel so the fetch_transparent_txids can send the results back + let (ctx, crx) = channel(); let num_addresses = addresses.len(); - + for address in addresses { + let address_clone = address.clone(); let wallet = self.wallet.clone(); - let pool = pool.clone(); let server_uri = self.get_server_uri(); let ctx = ctx.clone(); - let block_times = block_times.clone(); - + pool.execute(move || { - // Fetch the transparent transactions for this address, and send the results - // via the channel - let r = fetch_transparent_txids(&server_uri, address, start_height, end_height,no_cert, - move |tx_bytes: &[u8], height: u64| { - let tx = Transaction::read(tx_bytes).unwrap(); - - // Scan this Tx for transparent inputs and outputs - let datetime = block_times.read().unwrap().get(&height).map(|v| *v).unwrap_or(0); - wallet.read().unwrap().scan_full_tx(&tx, height as i32, datetime as u64); - }); - ctx.send(r).unwrap(); + println!("Fetching transactions for address: {}", address_clone); + + let r = fetch_transparent_txids( + &server_uri, + address, + start_height, + end_height, + no_cert, + move |tx_bytes: &[u8], height: u64| { + let tx_result = Transaction::read(tx_bytes) + .map_err(|e| format!("Failed to read transaction: {:?}", e)); + + match tx_result { + Ok(tx) => { + let datetime_result = block_times.read() + .map_err(|e| format!("Failed to read block times: {:?}", e)) + .and_then(|bt| bt.get(&height).cloned().ok_or_else(|| format!("No datetime for height: {}", height))); + + match datetime_result { + Ok(datetime) => { + match wallet.read().map_err(|e| format!("Failed to read wallet: {:?}", e)) { + Ok(w) => { + w.scan_full_tx(&tx, height as i32, datetime as u64); + }, + Err(e) => { + println!("Error reading wallet: {}", e); + }, + } + }, + Err(e) => { + println!("Error processing transaction: {}", e); + }, + } + }, + Err(e) => { + println!("Error reading transaction: {}", e); + }, + } + } + ); + + match ctx.send(r) { + Ok(_) => println!("Successfully sent data for address: {}", address_clone), + Err(e) => println!("Failed to send data for address: {}: {:?}", address_clone, e), + } }); } - - // Collect all results from the transparent fetches, and make sure everything was OK. - // If it was not, we return an error, which will go back to the retry - crx.iter().take(num_addresses).collect::, String>>() - } - + + crx.iter().take(num_addresses).collect() + } + + fn scan_fill_fulltxs(&self, pool: &ThreadPool, decoy_txids: Vec<(TxId, i32)>) -> Result, String> { // We need to first copy over the Txids from the wallet struct, because From c8948af2fcb161d6a11017432f9ced39052e3d60 Mon Sep 17 00:00:00 2001 From: Deniod Date: Sat, 13 Jan 2024 22:12:32 +0100 Subject: [PATCH 03/10] prevent unwrap in get_block_range --- lib/src/grpcconnector.rs | 72 ++++++++++++++++++++++++---------------- lib/src/lightclient.rs | 5 ++- 2 files changed, 45 insertions(+), 32 deletions(-) diff --git a/lib/src/grpcconnector.rs b/lib/src/grpcconnector.rs index 4419c0a..061b4d2 100644 --- a/lib/src/grpcconnector.rs +++ b/lib/src/grpcconnector.rs @@ -100,49 +100,63 @@ pub fn get_coinsupply(uri: http::Uri, no_cert: bool) -> Result(uri: &http::Uri, start_height: u64, end_height: u64, no_cert: bool, pool: ThreadPool, c: F) - -> Result<(), Box> +async fn get_block_range( + uri: &http::Uri, + start_height: u64, + end_height: u64, + no_cert: bool, + pool: ThreadPool, + c: F +) -> Result<(), Box> where F : Fn(&[u8], u64) { let mut client = get_client(uri, no_cert).await?; - let bs = BlockId{ height: start_height, hash: vec!()}; - let be = BlockId{ height: end_height, hash: vec!()}; + let bs = BlockId { height: start_height, hash: vec![] }; + let be = BlockId { height: end_height, hash: vec![] }; - let request = Request::new(BlockRange{ start: Some(bs), end: Some(be) }); + let request = Request::new(BlockRange { start: Some(bs), end: Some(be) }); - // Channel where the blocks are sent. A None signifies end of all blocks - let (tx, rx) = channel::>(); + let (tx, rx) = channel::>(); + let (ftx, frx) = channel(); - // Channel that the processor signals it is done, so the method can return - let (ftx, frx) = channel(); - - // The processor runs on a different thread, so that the network calls don't - // block on this - pool.execute(move || { - while let Some(block) = rx.recv().unwrap() { - use prost::Message; - let mut encoded_buf = vec![]; - - block.encode(&mut encoded_buf).unwrap(); - c(&encoded_buf, block.height); - } - - ftx.send(Ok(())).unwrap(); - }); + pool.execute(move || { + while let Ok(Some(block)) = rx.recv() { + use prost::Message; + let mut encoded_buf = vec![]; + + match block.encode(&mut encoded_buf) { + Ok(_) => c(&encoded_buf, block.height), + Err(e) => { + eprintln!("Error encoding block: {:?}", e); + break; + } + } + } + + if let Err(e) = ftx.send(Ok(())) { + eprintln!("Error sending completion signal: {:?}", e); + } + }); let mut response = client.get_block_range(request).await?.into_inner(); - //println!("{:?}", response); - while let Some(block) = response.message().await? { - tx.send(Some(block)).unwrap(); - } - tx.send(None).unwrap(); - // Wait for the processor to exit + while let Some(block) = response.message().await? { + if let Err(e) = tx.send(Some(block)) { + eprintln!("Error sending block to channel: {:?}", e); + break; + } + } + + if let Err(e) = tx.send(None) { + eprintln!("Error sending end signal to channel: {:?}", e); + } + frx.iter().take(1).collect::, String>>()?; Ok(()) } + pub fn fetch_blocks(uri: &http::Uri, start_height: u64, end_height: u64, no_cert: bool, pool: ThreadPool, c: F) -> Result<(), String> where F : Fn(&[u8], u64) { diff --git a/lib/src/lightclient.rs b/lib/src/lightclient.rs index 09d8882..c3c6de0 100644 --- a/lib/src/lightclient.rs +++ b/lib/src/lightclient.rs @@ -1503,8 +1503,7 @@ pub fn start_mempool_monitor(lc: Arc) -> Result<(), String> { let block_times = block_times.clone(); pool.execute(move || { - println!("Fetching transactions for address: {}", address_clone); - + let r = fetch_transparent_txids( &server_uri, address, @@ -1545,7 +1544,7 @@ pub fn start_mempool_monitor(lc: Arc) -> Result<(), String> { ); match ctx.send(r) { - Ok(_) => println!("Successfully sent data for address: {}", address_clone), + Ok(_) => info!("Successfully sent data for address: {}", address_clone), Err(e) => println!("Failed to send data for address: {}: {:?}", address_clone, e), } }); From 60550982e0dd686b8ec940524091eb67667eff14 Mon Sep 17 00:00:00 2001 From: Deniod Date: Sat, 13 Jan 2024 22:15:31 +0100 Subject: [PATCH 04/10] less debug --- lib/src/grpcconnector.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/lib/src/grpcconnector.rs b/lib/src/grpcconnector.rs index 061b4d2..63db47a 100644 --- a/lib/src/grpcconnector.rs +++ b/lib/src/grpcconnector.rs @@ -97,7 +97,6 @@ pub fn get_coinsupply(uri: http::Uri, no_cert: bool) -> Result( @@ -165,7 +164,6 @@ pub fn fetch_blocks(uri: &http::Uri, start_heig Err(e) => { let es = format!("Error creating runtime {:?}", e); error!("{}", es); - eprintln!("{}", e); return Err(es); } }; @@ -175,7 +173,6 @@ pub fn fetch_blocks(uri: &http::Uri, start_heig Err(e) => { let e = format!("Error fetching blocks {:?}", e); error!("{}", e); - eprintln!("{}", e); Err(e) } } @@ -246,7 +243,6 @@ pub fn fetch_transparent_txids(uri: &http::Uri, Err(e) => { let e = format!("Error creating runtime {:?}", e); error!("{}", e); - eprintln!("{}", e); return Err(e); } }; @@ -256,13 +252,11 @@ pub fn fetch_transparent_txids(uri: &http::Uri, Err(e) => { let e = format!("Error with get_address_txids runtime {:?}", e); error!("{}", e); - eprintln!("{}", e); Err(e) } } } - // get_transaction GRPC call async fn get_transaction(uri: &http::Uri, txid: TxId, no_cert: bool) -> Result> { @@ -280,7 +274,6 @@ pub fn fetch_full_tx(uri: &http::Uri, txid: TxId, no_cert: bool) -> Result { let errstr = format!("Error creating runtime {}", e.to_string()); error!("{}", errstr); - eprintln!("{}", errstr); return Err(errstr); } }; @@ -290,7 +283,6 @@ pub fn fetch_full_tx(uri: &http::Uri, txid: TxId, no_cert: bool) -> Result { let errstr = format!("Error in get_transaction runtime {}", e.to_string()); error!("{}", errstr); - eprintln!("{}", errstr); Err(errstr) } } From a307f828c98ae7515e65b2d2ea7d58ffcac8f693 Mon Sep 17 00:00:00 2001 From: Deniod Date: Sat, 13 Jan 2024 22:41:02 +0100 Subject: [PATCH 05/10] less debug, handle error if channel is closed --- lib/src/grpcconnector.rs | 70 +++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 23 deletions(-) diff --git a/lib/src/grpcconnector.rs b/lib/src/grpcconnector.rs index 63db47a..f7d9894 100644 --- a/lib/src/grpcconnector.rs +++ b/lib/src/grpcconnector.rs @@ -122,21 +122,20 @@ where F : Fn(&[u8], u64) { while let Ok(Some(block)) = rx.recv() { use prost::Message; let mut encoded_buf = vec![]; - - match block.encode(&mut encoded_buf) { - Ok(_) => c(&encoded_buf, block.height), - Err(e) => { - eprintln!("Error encoding block: {:?}", e); - break; - } + + if let Err(e) = block.encode(&mut encoded_buf) { + eprintln!("Error encoding block: {:?}", e); + break; } + + c(&encoded_buf, block.height); } - + if let Err(e) = ftx.send(Ok(())) { eprintln!("Error sending completion signal: {:?}", e); } }); - + let mut response = client.get_block_range(request).await?.into_inner(); while let Some(block) = response.message().await? { @@ -178,25 +177,44 @@ pub fn fetch_blocks(uri: &http::Uri, start_heig } } - // get_address_txids GRPC call -async fn get_address_txids(uri: &http::Uri, address: String, - start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), Box> - where F : Fn(&[u8], u64) { +async fn get_address_txids( + uri: &http::Uri, + address: String, + start_height: u64, + end_height: u64, + no_cert: bool, + c: F +) -> Result<(), Box> +where F : Fn(&[u8], u64) { + + let mut client = match get_client(uri, no_cert).await { + Ok(client) => client, + Err(e) => { + eprintln!("Error creating client: {:?}", e); + return Err(e.into()); + } + }; - let mut client = get_client(uri, no_cert).await?; let start = Some(BlockId{ height: start_height, hash: vec!()}); - let end = Some(BlockId{ height: end_height, hash: vec!()}); + let end = Some(BlockId{ height: end_height, hash: vec!()}); - let request = Request::new(TransparentAddressBlockFilter{ address, range: Some(BlockRange{start, end}) }); + let request = Request::new(TransparentAddressBlockFilter{ address, range: Some(BlockRange{ start, end }) }); + + let maybe_response = match client.get_address_txids(request).await { + Ok(response) => response, + Err(e) => { + eprintln!("Error getting address txids: {:?}", e); + return Err(e.into()); + } + }; - let maybe_response = client.get_address_txids(request).await?; let mut response = maybe_response.into_inner(); while let Some(tx) = response.message().await? { c(&tx.data, tx.height); } - + Ok(()) } @@ -234,10 +252,16 @@ where Ok(()) } -pub fn fetch_transparent_txids(uri: &http::Uri, address: String, - start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), String> - where F : Fn(&[u8], u64) { - +pub fn fetch_transparent_txids( + uri: &http::Uri, + address: String, + start_height: u64, + end_height: u64, + no_cert: bool, + c: F +) -> Result<(), String> +where F : Fn(&[u8], u64) { + let mut rt = match tokio::runtime::Runtime::new() { Ok(r) => r, Err(e) => { @@ -252,7 +276,7 @@ pub fn fetch_transparent_txids(uri: &http::Uri, Err(e) => { let e = format!("Error with get_address_txids runtime {:?}", e); error!("{}", e); - Err(e) + return Err(e) } } } From 1f184f55d474b93464fdba0ee29bf54817e9d389 Mon Sep 17 00:00:00 2001 From: Deniod Date: Sat, 13 Jan 2024 22:43:49 +0100 Subject: [PATCH 06/10] less debug --- lib/src/lightclient.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/lightclient.rs b/lib/src/lightclient.rs index c3c6de0..e698c95 100644 --- a/lib/src/lightclient.rs +++ b/lib/src/lightclient.rs @@ -1233,7 +1233,7 @@ pub fn start_mempool_monitor(lc: Arc) -> Result<(), String> { } // Sleep exponentially backing off std::thread::sleep(std::time::Duration::from_secs((2 as u64).pow(retry_count))); - println!("Sync error {}\nRetry count {}", e, retry_count); + // println!("Sync error {}\nRetry count {}", e, retry_count); } } } From 8076ee1426aa5f0f95100a85404ad6f507a764fd Mon Sep 17 00:00:00 2001 From: lucretius Date: Sat, 20 Jan 2024 15:43:43 +0100 Subject: [PATCH 07/10] add check for wallet version --- lib/src/lightwallet/data.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/src/lightwallet/data.rs b/lib/src/lightwallet/data.rs index eb57b13..696c5f4 100644 --- a/lib/src/lightwallet/data.rs +++ b/lib/src/lightwallet/data.rs @@ -484,11 +484,11 @@ impl WalletTx { // Outgoing metadata was only added in version 2 let outgoing_metadata = Vector::read(&mut reader, |r| OutgoingTxMetadata::read(r))?; - let incoming_metadata = Vector::read(&mut reader, |r| IncomingTxMetadata::read(r))?; + //let incoming_metadata = Vector::read(&mut reader, |r| IncomingTxMetadata::read(r))?; let full_tx_scanned = reader.read_u8()? > 0; - Ok(WalletTx{ + let mut wallet_tx = WalletTx { block, datetime, txid, @@ -497,9 +497,15 @@ impl WalletTx { total_shielded_value_spent, total_transparent_value_spent, outgoing_metadata, - incoming_metadata, + incoming_metadata: vec![], full_tx_scanned - }) + }; + if version >= 5 { + wallet_tx.incoming_metadata = Vector::read(&mut reader, |r| IncomingTxMetadata::read(r))?; + } + + Ok(wallet_tx) + } pub fn write(&self, mut writer: W) -> io::Result<()> { From e4658582cbe610cb6f7f5d62d2ea9444bd38f4ca Mon Sep 17 00:00:00 2001 From: lucretius Date: Sat, 20 Jan 2024 15:59:30 +0100 Subject: [PATCH 08/10] check for version 5 --- lib/src/lightwallet/data.rs | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/lib/src/lightwallet/data.rs b/lib/src/lightwallet/data.rs index 696c5f4..b8abcd0 100644 --- a/lib/src/lightwallet/data.rs +++ b/lib/src/lightwallet/data.rs @@ -440,7 +440,7 @@ pub struct WalletTx { impl WalletTx { pub fn serialized_version() -> u64 { - return 4; + return 5; } pub fn new(height: i32, datetime: u64, txid: &TxId) -> Self { @@ -460,10 +460,10 @@ impl WalletTx { pub fn read(mut reader: R) -> io::Result { let version = reader.read_u64::()?; + println!("wallet Version : {}", version); assert!(version <= WalletTx::serialized_version()); let block = reader.read_i32::()?; - let datetime = if version >= 4 { reader.read_u64::()? } else { @@ -472,23 +472,24 @@ impl WalletTx { let mut txid_bytes = [0u8; 32]; reader.read_exact(&mut txid_bytes)?; - let txid = TxId{0: txid_bytes}; let notes = Vector::read(&mut reader, |r| SaplingNoteData::read(r))?; let utxos = Vector::read(&mut reader, |r| Utxo::read(r))?; - let total_shielded_value_spent = reader.read_u64::()?; let total_transparent_value_spent = reader.read_u64::()?; - - // Outgoing metadata was only added in version 2 let outgoing_metadata = Vector::read(&mut reader, |r| OutgoingTxMetadata::read(r))?; - //let incoming_metadata = Vector::read(&mut reader, |r| IncomingTxMetadata::read(r))?; + // Read incoming_metadata only if version is 5 or higher + let incoming_metadata = if version >= 5 { + Vector::read(&mut reader, |r| IncomingTxMetadata::read(r))? + } else { + vec![] + }; let full_tx_scanned = reader.read_u8()? > 0; - - let mut wallet_tx = WalletTx { + + Ok(WalletTx { block, datetime, txid, @@ -497,17 +498,12 @@ impl WalletTx { total_shielded_value_spent, total_transparent_value_spent, outgoing_metadata, - incoming_metadata: vec![], + incoming_metadata, full_tx_scanned - }; - if version >= 5 { - wallet_tx.incoming_metadata = Vector::read(&mut reader, |r| IncomingTxMetadata::read(r))?; - } - - Ok(wallet_tx) - + }) } + pub fn write(&self, mut writer: W) -> io::Result<()> { writer.write_u64::(WalletTx::serialized_version())?; From b3e4162f8a1b5161d02e56199dab95fc1cf93e20 Mon Sep 17 00:00:00 2001 From: lucretius Date: Sat, 20 Jan 2024 16:05:35 +0100 Subject: [PATCH 09/10] add error warning --- lib/src/lightwallet/data.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/lightwallet/data.rs b/lib/src/lightwallet/data.rs index b8abcd0..dcc0007 100644 --- a/lib/src/lightwallet/data.rs +++ b/lib/src/lightwallet/data.rs @@ -461,7 +461,7 @@ impl WalletTx { pub fn read(mut reader: R) -> io::Result { let version = reader.read_u64::()?; println!("wallet Version : {}", version); - assert!(version <= WalletTx::serialized_version()); + assert!(version <= WalletTx::serialized_version(), "Version mismatch. Please restore your Seed"); let block = reader.read_i32::()?; let datetime = if version >= 4 { From d8994eec8e285c70f0f26045a377eccc8265c2c4 Mon Sep 17 00:00:00 2001 From: lucretius Date: Sat, 20 Jan 2024 19:47:40 +0100 Subject: [PATCH 10/10] less debug --- lib/src/lightwallet/data.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/src/lightwallet/data.rs b/lib/src/lightwallet/data.rs index dcc0007..657f342 100644 --- a/lib/src/lightwallet/data.rs +++ b/lib/src/lightwallet/data.rs @@ -460,8 +460,7 @@ impl WalletTx { pub fn read(mut reader: R) -> io::Result { let version = reader.read_u64::()?; - println!("wallet Version : {}", version); - assert!(version <= WalletTx::serialized_version(), "Version mismatch. Please restore your Seed"); + assert!(version <= WalletTx::serialized_version(), "Version mismatch. Please restore with your Seed"); let block = reader.read_i32::()?; let datetime = if version >= 4 {