prevent unwrap in get_block_range

This commit is contained in:
Deniod
2024-01-13 22:12:32 +01:00
parent feb26dbdfb
commit c8948af2fc
2 changed files with 45 additions and 32 deletions

View File

@@ -100,49 +100,63 @@ pub fn get_coinsupply(uri: http::Uri, no_cert: bool) -> Result<Coinsupply, Strin
// tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner)
}
async fn get_block_range<F : 'static + std::marker::Send>(uri: &http::Uri, start_height: u64, end_height: u64, no_cert: bool, pool: ThreadPool, c: F)
-> Result<(), Box<dyn std::error::Error>>
async fn get_block_range<F : 'static + std::marker::Send>(
uri: &http::Uri,
start_height: u64,
end_height: u64,
no_cert: bool,
pool: ThreadPool,
c: F
) -> Result<(), Box<dyn std::error::Error>>
where F : Fn(&[u8], u64) {
let mut client = get_client(uri, no_cert).await?;
let bs = BlockId{ height: start_height, hash: vec!()};
let be = BlockId{ height: end_height, hash: vec!()};
let bs = BlockId { height: start_height, hash: vec![] };
let be = BlockId { height: end_height, hash: vec![] };
let request = Request::new(BlockRange{ start: Some(bs), end: Some(be) });
let request = Request::new(BlockRange { start: Some(bs), end: Some(be) });
// Channel where the blocks are sent. A None signifies end of all blocks
let (tx, rx) = channel::<Option<CompactBlock>>();
let (tx, rx) = channel::<Option<CompactBlock>>();
let (ftx, frx) = channel();
// Channel that the processor signals it is done, so the method can return
let (ftx, frx) = channel();
// The processor runs on a different thread, so that the network calls don't
// block on this
pool.execute(move || {
while let Some(block) = rx.recv().unwrap() {
use prost::Message;
let mut encoded_buf = vec![];
block.encode(&mut encoded_buf).unwrap();
c(&encoded_buf, block.height);
}
ftx.send(Ok(())).unwrap();
});
pool.execute(move || {
while let Ok(Some(block)) = rx.recv() {
use prost::Message;
let mut encoded_buf = vec![];
match block.encode(&mut encoded_buf) {
Ok(_) => c(&encoded_buf, block.height),
Err(e) => {
eprintln!("Error encoding block: {:?}", e);
break;
}
}
}
if let Err(e) = ftx.send(Ok(())) {
eprintln!("Error sending completion signal: {:?}", e);
}
});
let mut response = client.get_block_range(request).await?.into_inner();
//println!("{:?}", response);
while let Some(block) = response.message().await? {
tx.send(Some(block)).unwrap();
}
tx.send(None).unwrap();
// Wait for the processor to exit
while let Some(block) = response.message().await? {
if let Err(e) = tx.send(Some(block)) {
eprintln!("Error sending block to channel: {:?}", e);
break;
}
}
if let Err(e) = tx.send(None) {
eprintln!("Error sending end signal to channel: {:?}", e);
}
frx.iter().take(1).collect::<Result<Vec<()>, String>>()?;
Ok(())
}
pub fn fetch_blocks<F : 'static + std::marker::Send>(uri: &http::Uri, start_height: u64, end_height: u64, no_cert: bool, pool: ThreadPool, c: F) -> Result<(), String>
where F : Fn(&[u8], u64) {

View File

@@ -1503,8 +1503,7 @@ pub fn start_mempool_monitor(lc: Arc<LightClient>) -> Result<(), String> {
let block_times = block_times.clone();
pool.execute(move || {
println!("Fetching transactions for address: {}", address_clone);
let r = fetch_transparent_txids(
&server_uri,
address,
@@ -1545,7 +1544,7 @@ pub fn start_mempool_monitor(lc: Arc<LightClient>) -> Result<(), String> {
);
match ctx.send(r) {
Ok(_) => println!("Successfully sent data for address: {}", address_clone),
Ok(_) => info!("Successfully sent data for address: {}", address_clone),
Err(e) => println!("Failed to send data for address: {}: {:?}", address_clone, e),
}
});