diff --git a/src/grpcconnector.rs b/src/grpcconnector.rs new file mode 100644 index 0000000..073cda4 --- /dev/null +++ b/src/grpcconnector.rs @@ -0,0 +1,288 @@ + +use log::{error}; + +use std::sync::{Arc}; +use std::net::ToSocketAddrs; +use std::net::SocketAddr; + +use futures::{Future}; +use futures::stream::Stream; + +use tower_h2; +use tower_util::MakeService; +use tower_grpc::Request; + +use tokio_rustls::client::TlsStream; +use tokio_rustls::{rustls::ClientConfig, TlsConnector}; + +use tokio::executor::DefaultExecutor; +use tokio::net::tcp::TcpStream; + +use zcash_primitives::transaction::{TxId}; + +use crate::grpc_client::{ChainSpec, BlockId, BlockRange, RawTransaction, + TransparentAddressBlockFilter, TxFilter, Empty, LightdInfo}; +use crate::grpc_client::client::CompactTxStreamer; + + +/// A Secure (https) grpc destination. +struct Dst { + addr: SocketAddr, + host: String, +} + +impl tower_service::Service<()> for Dst { + type Response = TlsStream; + type Error = ::std::io::Error; + type Future = Box, Error = ::std::io::Error> + Send>; + + fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _: ()) -> Self::Future { + let mut config = ClientConfig::new(); + + config.alpn_protocols.push(b"h2".to_vec()); + config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + + let config = Arc::new(config); + let tls_connector = TlsConnector::from(config); + + let addr_string_local = self.host.clone(); + + let domain = webpki::DNSNameRef::try_from_ascii_str(&addr_string_local).unwrap(); + let domain_local = domain.to_owned(); + + let stream = TcpStream::connect(&self.addr).and_then(move |sock| { + sock.set_nodelay(true).unwrap(); + tls_connector.connect(domain_local.as_ref(), sock) + }) + .map(move |tcp| tcp); + + Box::new(stream) + } +} + +// Same implementation but without TLS. Should make it straightforward to run without TLS +// when testing on local machine +// +// impl tower_service::Service<()> for Dst { +// type Response = TcpStream; +// type Error = ::std::io::Error; +// type Future = Box + Send>; +// +// fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { +// Ok(().into()) +// } +// +// fn call(&mut self, _: ()) -> Self::Future { +// let mut config = ClientConfig::new(); +// config.alpn_protocols.push(b"h2".to_vec()); +// config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); +// +// let stream = TcpStream::connect(&self.addr) +// .and_then(move |sock| { +// sock.set_nodelay(true).unwrap(); +// Ok(sock) +// }); +// Box::new(stream) +// } +// } + + +macro_rules! make_grpc_client { + ($protocol:expr, $host:expr, $port:expr) => {{ + let uri: http::Uri = format!("{}://{}", $protocol, $host).parse().unwrap(); + + let addr = format!("{}:{}", $host, $port) + .to_socket_addrs() + .unwrap() + .next() + .unwrap(); + + let h2_settings = Default::default(); + let mut make_client = tower_h2::client::Connect::new(Dst {addr, host: $host.to_string()}, h2_settings, DefaultExecutor::current()); + + make_client + .make_service(()) + .map_err(|e| { format!("HTTP/2 connection failed; err={:?}", e) }) + .and_then(move |conn| { + let conn = tower_request_modifier::Builder::new() + .set_origin(uri) + .build(conn) + .unwrap(); + + CompactTxStreamer::new(conn) + // Wait until the client is ready... + .ready() + .map_err(|e| { format!("client closed: {:?}", e) }) + }) + }}; +} + + +// ============== +// GRPC code +// ============== + +pub fn get_info(uri: http::Uri) -> Result { + let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) + .and_then(move |mut client| { + client.get_lightd_info(Request::new(Empty{})) + .map_err(|e| { + format!("ERR = {:?}", e) + }) + .and_then(move |response| { + Ok(response.into_inner()) + }) + .map_err(|e| { + format!("ERR = {:?}", e) + }) + }); + + tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) +} + + +pub fn fetch_blocks(uri: &http::Uri, start_height: u64, end_height: u64, c: F) + where F : Fn(&[u8]) { + let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) + .and_then(move |mut client| { + let bs = BlockId{ height: start_height, hash: vec!()}; + let be = BlockId{ height: end_height, hash: vec!()}; + + let br = Request::new(BlockRange{ start: Some(bs), end: Some(be)}); + client + .get_block_range(br) + .map_err(|e| { + format!("RouteChat request failed; err={:?}", e) + }) + .and_then(move |response| { + let inbound = response.into_inner(); + inbound.for_each(move |b| { + use prost::Message; + let mut encoded_buf = vec![]; + + b.encode(&mut encoded_buf).unwrap(); + c(&encoded_buf); + + Ok(()) + }) + .map_err(|e| format!("gRPC inbound stream error: {:?}", e)) + }) + }); + + match tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) { + Ok(_) => {}, // The result is processed in callbacks, so nothing to do here + Err(e) => { + error!("Error while executing fetch_blocks: {}", e); + eprintln!("{}", e); + } + }; +} + +pub fn fetch_transparent_txids(uri: &http::Uri, address: String, + start_height: u64, end_height: u64,c: F) + where F : Fn(&[u8], u64) { + let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) + .and_then(move |mut client| { + let start = Some(BlockId{ height: start_height, hash: vec!()}); + let end = Some(BlockId{ height: end_height, hash: vec!()}); + + let br = Request::new(TransparentAddressBlockFilter{ address, range: Some(BlockRange{start, end}) }); + + client + .get_address_txids(br) + .map_err(|e| { + format!("RouteChat request failed; err={:?}", e) + }) + .and_then(move |response| { + let inbound = response.into_inner(); + inbound.for_each(move |tx| { + //let tx = Transaction::read(&tx.into_inner().data[..]).unwrap(); + c(&tx.data, tx.height); + + Ok(()) + }) + .map_err(|e| format!("gRPC inbound stream error: {:?}", e)) + }) + }); + + match tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) { + Ok(_) => {}, // The result is processed in callbacks, so nothing to do here + Err(e) => { + error!("Error while executing fetch_transparent_txids: {}", e); + eprintln!("{}", e); + } + }; +} + +pub fn fetch_full_tx(uri: &http::Uri, txid: TxId, c: F) + where F : Fn(&[u8]) { + let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) + .and_then(move |mut client| { + let txfilter = TxFilter { block: None, index: 0, hash: txid.0.to_vec() }; + client.get_transaction(Request::new(txfilter)) + .map_err(|e| { + format!("RouteChat request failed; err={:?}", e) + }) + .and_then(move |response| { + c(&response.into_inner().data); + + Ok(()) + }) + .map_err(|e| { format!("ERR = {:?}", e) }) + }); + + match tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) { + Ok(_) => {}, // The result is processed in callbacks, so nothing to do here + Err(e) => { + error!("Error while executing fetch_full_tx: {}", e); + eprintln!("{}", e); + } + }; +} + +pub fn broadcast_raw_tx(uri: &http::Uri, tx_bytes: Box<[u8]>) -> Result { + let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) + .and_then(move |mut client| { + client.send_transaction(Request::new(RawTransaction {data: tx_bytes.to_vec(), height: 0})) + .map_err(|e| { + format!("ERR = {:?}", e) + }) + .and_then(move |response| { + let sendresponse = response.into_inner(); + if sendresponse.error_code == 0 { + Ok(format!("Successfully broadcast Tx: {}", sendresponse.error_message)) + } else { + Err(format!("Error: {:?}", sendresponse)) + } + }) + .map_err(|e| { format!("ERR = {:?}", e) }) + }); + + tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) +} + +pub fn fetch_latest_block(uri: &http::Uri, mut c : F) + where F : FnMut(BlockId) { + let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) + .and_then(|mut client| { + client.get_latest_block(Request::new(ChainSpec {})) + .map_err(|e| { format!("ERR = {:?}", e) }) + .and_then(move |response| { + c(response.into_inner()); + Ok(()) + }) + .map_err(|e| { format!("ERR = {:?}", e) }) + }); + + match tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) { + Ok(_) => {}, // The result is processed in callbacks, so nothing to do here + Err(e) => { + error!("Error while executing fetch_latest_block: {}", e); + eprintln!("{}", e); + } + }; +} diff --git a/src/lightclient.rs b/src/lightclient.rs index 198b33b..1032889 100644 --- a/src/lightclient.rs +++ b/src/lightclient.rs @@ -4,37 +4,20 @@ use log::{info, warn, error}; use std::sync::{Arc}; use std::sync::atomic::{AtomicU64, AtomicI32, AtomicUsize, Ordering}; -use std::net::ToSocketAddrs; use std::path::Path; use std::fs::File; use std::io; use std::io::prelude::*; use std::io::{BufReader, BufWriter, Error, ErrorKind}; -use std::net::SocketAddr; use json::{object, JsonValue}; - -use futures::{Future}; -use futures::stream::Stream; - -use tower_h2; -use tower_util::MakeService; -use tower_grpc::Request; - -use tokio_rustls::client::TlsStream; -use tokio_rustls::{rustls::ClientConfig, TlsConnector}; - -use tokio::executor::DefaultExecutor; -use tokio::net::tcp::TcpStream; - use zcash_primitives::transaction::{TxId, Transaction}; use zcash_client_backend::{ constants::testnet, constants::mainnet, constants::regtest, encoding::encode_payment_address, }; -use crate::grpc_client::{ChainSpec, BlockId, BlockRange, RawTransaction, - TransparentAddressBlockFilter, TxFilter, Empty, LightdInfo}; -use crate::grpc_client::client::CompactTxStreamer; +use crate::grpc_client::{BlockId}; +use crate::grpcconnector::*; use crate::SaplingParams; @@ -43,101 +26,6 @@ pub const WALLET_NAME: &str = "zecwallet-light-wallet.dat"; pub const LOGFILE_NAME: &str = "zecwallet-light-wallet.debug.log"; -/// A Secure (https) grpc destination. -struct Dst { - addr: SocketAddr, - host: String, -} - -impl tower_service::Service<()> for Dst { - type Response = TlsStream; - type Error = ::std::io::Error; - type Future = Box, Error = ::std::io::Error> + Send>; - - fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { - Ok(().into()) - } - - fn call(&mut self, _: ()) -> Self::Future { - let mut config = ClientConfig::new(); - - config.alpn_protocols.push(b"h2".to_vec()); - config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - - let config = Arc::new(config); - let tls_connector = TlsConnector::from(config); - - let addr_string_local = self.host.clone(); - - let domain = webpki::DNSNameRef::try_from_ascii_str(&addr_string_local).unwrap(); - let domain_local = domain.to_owned(); - - let stream = TcpStream::connect(&self.addr).and_then(move |sock| { - sock.set_nodelay(true).unwrap(); - tls_connector.connect(domain_local.as_ref(), sock) - }) - .map(move |tcp| tcp); - - Box::new(stream) - } -} - -// Same implementation but without TLS. Should make it straightforward to run without TLS -// when testing on local machine -// -// impl tower_service::Service<()> for Dst { -// type Response = TcpStream; -// type Error = ::std::io::Error; -// type Future = Box + Send>; -// -// fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { -// Ok(().into()) -// } -// -// fn call(&mut self, _: ()) -> Self::Future { -// let mut config = ClientConfig::new(); -// config.alpn_protocols.push(b"h2".to_vec()); -// config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); -// -// let stream = TcpStream::connect(&self.addr) -// .and_then(move |sock| { -// sock.set_nodelay(true).unwrap(); -// Ok(sock) -// }); -// Box::new(stream) -// } -// } - -macro_rules! make_grpc_client { - ($protocol:expr, $host:expr, $port:expr) => {{ - let uri: http::Uri = format!("{}://{}", $protocol, $host).parse().unwrap(); - - let addr = format!("{}:{}", $host, $port) - .to_socket_addrs() - .unwrap() - .next() - .unwrap(); - - let h2_settings = Default::default(); - let mut make_client = tower_h2::client::Connect::new(Dst {addr, host: $host.to_string()}, h2_settings, DefaultExecutor::current()); - - make_client - .make_service(()) - .map_err(|e| { format!("HTTP/2 connection failed; err={:?}", e) }) - .and_then(move |conn| { - let conn = tower_request_modifier::Builder::new() - .set_origin(uri) - .build(conn) - .unwrap(); - - CompactTxStreamer::new(conn) - // Wait until the client is ready... - .ready() - .map_err(|e| { format!("client closed: {:?}", e) }) - }) - }}; -} - #[derive(Clone, Debug)] pub struct LightClientConfig { pub server : http::Uri, @@ -436,26 +324,8 @@ impl LightClient { self.config.server.clone() } - pub fn get_info(uri: http::Uri) -> Result { - let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) - .and_then(move |mut client| { - client.get_lightd_info(Request::new(Empty{})) - .map_err(|e| { - format!("ERR = {:?}", e) - }) - .and_then(move |response| { - Ok(response.into_inner()) - }) - .map_err(|e| { - format!("ERR = {:?}", e) - }) - }); - - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) - } - pub fn do_info(uri: http::Uri) -> String { - format!("{:?}", LightClient::get_info(uri)) + format!("{:?}", get_info(uri)) } pub fn do_seed_phrase(&self) -> JsonValue { @@ -672,7 +542,7 @@ impl LightClient { // This will hold the latest block fetched from the RPC let latest_block_height = Arc::new(AtomicU64::new(0)); let lbh = latest_block_height.clone(); - self.fetch_latest_block(move |block: BlockId| { + fetch_latest_block(&self.get_server_uri(), move |block: BlockId| { lbh.store(block.height, Ordering::SeqCst); }); let latest_block = latest_block_height.load(Ordering::SeqCst); @@ -712,7 +582,7 @@ impl LightClient { let last_invalid_height = Arc::new(AtomicI32::new(0)); let last_invalid_height_inner = last_invalid_height.clone(); - self.fetch_blocks(start_height, end_height, + fetch_blocks(&self.get_server_uri(), start_height, end_height, move |encoded_block: &[u8]| { // Process the block only if there were no previous errors if last_invalid_height_inner.load(Ordering::SeqCst) > 0 { @@ -762,7 +632,7 @@ impl LightClient { // TODO: Use for all t addresses let address = self.wallet.address_from_sk(&self.wallet.tkeys[0]); let wallet = self.wallet.clone(); - self.fetch_transparent_txids(address, start_height, end_height, + fetch_transparent_txids(&self.get_server_uri(), address, start_height, end_height, move |tx_bytes: &[u8], height: u64 | { let tx = Transaction::read(tx_bytes).unwrap(); @@ -807,7 +677,7 @@ impl LightClient { info!("Fetching full Tx: {}", txid); responses.push(format!("Fetching full Tx: {}", txid)); - self.fetch_full_tx(txid, move |tx_bytes: &[u8] | { + fetch_full_tx(&self.get_server_uri(), txid, move |tx_bytes: &[u8] | { let tx = Transaction::read(tx_bytes).unwrap(); light_wallet_clone.scan_full_tx(&tx, height); @@ -826,163 +696,11 @@ impl LightClient { ); match rawtx { - Some(txbytes) => match self.broadcast_raw_tx(txbytes) { + Some(txbytes) => match broadcast_raw_tx(&self.get_server_uri(), txbytes) { Ok(k) => k, Err(e) => e, }, None => format!("No Tx to broadcast") } } - - // ============== - // GRPC code - // ============== - - pub fn fetch_blocks(&self, start_height: u64, end_height: u64, c: F) - where F : Fn(&[u8]) { - let uri = self.get_server_uri(); - let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) - .and_then(move |mut client| { - let bs = BlockId{ height: start_height, hash: vec!()}; - let be = BlockId{ height: end_height, hash: vec!()}; - - let br = Request::new(BlockRange{ start: Some(bs), end: Some(be)}); - client - .get_block_range(br) - .map_err(|e| { - format!("RouteChat request failed; err={:?}", e) - }) - .and_then(move |response| { - let inbound = response.into_inner(); - inbound.for_each(move |b| { - use prost::Message; - let mut encoded_buf = vec![]; - - b.encode(&mut encoded_buf).unwrap(); - c(&encoded_buf); - - Ok(()) - }) - .map_err(|e| format!("gRPC inbound stream error: {:?}", e)) - }) - }); - - match tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) { - Ok(_) => {}, // The result is processed in callbacks, so nothing to do here - Err(e) => { - error!("Error while executing fetch_blocks: {}", e); - eprintln!("{}", e); - } - }; - } - - pub fn fetch_transparent_txids(&self, address: String, - start_height: u64, end_height: u64,c: F) - where F : Fn(&[u8], u64) { - let uri = self.get_server_uri(); - let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) - .and_then(move |mut client| { - let start = Some(BlockId{ height: start_height, hash: vec!()}); - let end = Some(BlockId{ height: end_height, hash: vec!()}); - - let br = Request::new(TransparentAddressBlockFilter{ address, range: Some(BlockRange{start, end}) }); - - client - .get_address_txids(br) - .map_err(|e| { - format!("RouteChat request failed; err={:?}", e) - }) - .and_then(move |response| { - let inbound = response.into_inner(); - inbound.for_each(move |tx| { - //let tx = Transaction::read(&tx.into_inner().data[..]).unwrap(); - c(&tx.data, tx.height); - - Ok(()) - }) - .map_err(|e| format!("gRPC inbound stream error: {:?}", e)) - }) - }); - - match tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) { - Ok(_) => {}, // The result is processed in callbacks, so nothing to do here - Err(e) => { - error!("Error while executing fetch_transparent_txids: {}", e); - eprintln!("{}", e); - } - }; - } - - pub fn fetch_full_tx(&self, txid: TxId, c: F) - where F : Fn(&[u8]) { - let uri = self.get_server_uri(); - let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) - .and_then(move |mut client| { - let txfilter = TxFilter { block: None, index: 0, hash: txid.0.to_vec() }; - client.get_transaction(Request::new(txfilter)) - .map_err(|e| { - format!("RouteChat request failed; err={:?}", e) - }) - .and_then(move |response| { - c(&response.into_inner().data); - - Ok(()) - }) - .map_err(|e| { format!("ERR = {:?}", e) }) - }); - - match tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) { - Ok(_) => {}, // The result is processed in callbacks, so nothing to do here - Err(e) => { - error!("Error while executing fetch_full_tx: {}", e); - eprintln!("{}", e); - } - }; - } - - pub fn broadcast_raw_tx(&self, tx_bytes: Box<[u8]>) -> Result { - let uri = self.get_server_uri(); - let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) - .and_then(move |mut client| { - client.send_transaction(Request::new(RawTransaction {data: tx_bytes.to_vec(), height: 0})) - .map_err(|e| { - format!("ERR = {:?}", e) - }) - .and_then(move |response| { - let sendresponse = response.into_inner(); - if sendresponse.error_code == 0 { - Ok(format!("Successfully broadcast Tx: {}", sendresponse.error_message)) - } else { - Err(format!("Error: {:?}", sendresponse)) - } - }) - .map_err(|e| { format!("ERR = {:?}", e) }) - }); - - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) - } - - pub fn fetch_latest_block(&self, mut c : F) - where F : FnMut(BlockId) { - let uri = self.get_server_uri(); - let runner = make_grpc_client!(uri.scheme_str().unwrap(), uri.host().unwrap(), uri.port_part().unwrap()) - .and_then(|mut client| { - client.get_latest_block(Request::new(ChainSpec {})) - .map_err(|e| { format!("ERR = {:?}", e) }) - .and_then(move |response| { - c(response.into_inner()); - Ok(()) - }) - .map_err(|e| { format!("ERR = {:?}", e) }) - }); - - match tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner) { - Ok(_) => {}, // The result is processed in callbacks, so nothing to do here - Err(e) => { - error!("Error while executing fetch_latest_block: {}", e); - eprintln!("{}", e); - } - }; - } - } diff --git a/src/lightwallet/mod.rs b/src/lightwallet.rs similarity index 100% rename from src/lightwallet/mod.rs rename to src/lightwallet.rs diff --git a/src/main.rs b/src/main.rs index 423119c..af78def 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ extern crate rust_embed; mod lightclient; +mod grpcconnector; mod lightwallet; mod address; mod prover; @@ -101,7 +102,7 @@ pub fn main() { } // Do a getinfo first, before opening the wallet - let info = match LightClient::get_info(server.clone()) { + let info = match grpcconnector::get_info(server.clone()) { Ok(ld) => ld, Err(e) => { eprintln!("Error:\n{}\nCouldn't get server info, quitting!", e);