diff --git a/lib/src/grpcconnector.rs b/lib/src/grpcconnector.rs index 4419c0a..061b4d2 100644 --- a/lib/src/grpcconnector.rs +++ b/lib/src/grpcconnector.rs @@ -100,49 +100,63 @@ pub fn get_coinsupply(uri: http::Uri, no_cert: bool) -> Result(uri: &http::Uri, start_height: u64, end_height: u64, no_cert: bool, pool: ThreadPool, c: F) - -> Result<(), Box> +async fn get_block_range( + uri: &http::Uri, + start_height: u64, + end_height: u64, + no_cert: bool, + pool: ThreadPool, + c: F +) -> Result<(), Box> where F : Fn(&[u8], u64) { let mut client = get_client(uri, no_cert).await?; - let bs = BlockId{ height: start_height, hash: vec!()}; - let be = BlockId{ height: end_height, hash: vec!()}; + let bs = BlockId { height: start_height, hash: vec![] }; + let be = BlockId { height: end_height, hash: vec![] }; - let request = Request::new(BlockRange{ start: Some(bs), end: Some(be) }); + let request = Request::new(BlockRange { start: Some(bs), end: Some(be) }); - // Channel where the blocks are sent. A None signifies end of all blocks - let (tx, rx) = channel::>(); + let (tx, rx) = channel::>(); + let (ftx, frx) = channel(); - // Channel that the processor signals it is done, so the method can return - let (ftx, frx) = channel(); - - // The processor runs on a different thread, so that the network calls don't - // block on this - pool.execute(move || { - while let Some(block) = rx.recv().unwrap() { - use prost::Message; - let mut encoded_buf = vec![]; - - block.encode(&mut encoded_buf).unwrap(); - c(&encoded_buf, block.height); - } - - ftx.send(Ok(())).unwrap(); - }); + pool.execute(move || { + while let Ok(Some(block)) = rx.recv() { + use prost::Message; + let mut encoded_buf = vec![]; + + match block.encode(&mut encoded_buf) { + Ok(_) => c(&encoded_buf, block.height), + Err(e) => { + eprintln!("Error encoding block: {:?}", e); + break; + } + } + } + + if let Err(e) = ftx.send(Ok(())) { + eprintln!("Error sending completion signal: {:?}", e); + } + }); let mut response = client.get_block_range(request).await?.into_inner(); - //println!("{:?}", response); - while let Some(block) = response.message().await? { - tx.send(Some(block)).unwrap(); - } - tx.send(None).unwrap(); - // Wait for the processor to exit + while let Some(block) = response.message().await? { + if let Err(e) = tx.send(Some(block)) { + eprintln!("Error sending block to channel: {:?}", e); + break; + } + } + + if let Err(e) = tx.send(None) { + eprintln!("Error sending end signal to channel: {:?}", e); + } + frx.iter().take(1).collect::, String>>()?; Ok(()) } + pub fn fetch_blocks(uri: &http::Uri, start_height: u64, end_height: u64, no_cert: bool, pool: ThreadPool, c: F) -> Result<(), String> where F : Fn(&[u8], u64) { diff --git a/lib/src/lightclient.rs b/lib/src/lightclient.rs index 09d8882..c3c6de0 100644 --- a/lib/src/lightclient.rs +++ b/lib/src/lightclient.rs @@ -1503,8 +1503,7 @@ pub fn start_mempool_monitor(lc: Arc) -> Result<(), String> { let block_times = block_times.clone(); pool.execute(move || { - println!("Fetching transactions for address: {}", address_clone); - + let r = fetch_transparent_txids( &server_uri, address, @@ -1545,7 +1544,7 @@ pub fn start_mempool_monitor(lc: Arc) -> Result<(), String> { ); match ctx.send(r) { - Ok(_) => println!("Successfully sent data for address: {}", address_clone), + Ok(_) => info!("Successfully sent data for address: {}", address_clone), Err(e) => println!("Failed to send data for address: {}: {:?}", address_clone, e), } });