Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 939477a7a3 | |||
| e8a2d3ebc9 |
@@ -101,13 +101,13 @@ class DarksideTestCoordinator(val wallet: TestWallet) {
|
||||
twig("got processor status $it")
|
||||
if (it == Synchronizer.Status.DISCONNECTED) {
|
||||
twig("waiting a bit before giving up on connection...")
|
||||
} else if (targetHeight != null && (synchronizer as SdkSynchronizer).processor.getLastScannedHeight() < targetHeight) {
|
||||
} else if (targetHeight != null && ((synchronizer as SdkSynchronizer).processor.getLastScannedHeight() ?: synchronizer.network.saplingActivationHeight) < targetHeight) {
|
||||
twig("awaiting new blocks from server...")
|
||||
}
|
||||
}.map {
|
||||
// whenever we're waiting for a target height, for simplicity, if we're sleeping,
|
||||
// and in between polls, then consider it that we're not synced
|
||||
if (targetHeight != null && (synchronizer as SdkSynchronizer).processor.getLastScannedHeight() < targetHeight) {
|
||||
if (targetHeight != null && ((synchronizer as SdkSynchronizer).processor.getLastScannedHeight() ?: synchronizer.network.saplingActivationHeight) < targetHeight) {
|
||||
twig("switching status to DOWNLOADING because we're still waiting for height $targetHeight")
|
||||
Synchronizer.Status.DOWNLOADING
|
||||
} else {
|
||||
|
||||
@@ -21,7 +21,7 @@ RELEASE_SIGNING_ENABLED=false
|
||||
# Required by the maven publishing plugin
|
||||
SONATYPE_HOST=DEFAULT
|
||||
|
||||
LIBRARY_VERSION=1.9.0-beta01
|
||||
LIBRARY_VERSION=1.9.1-beta01
|
||||
|
||||
# Kotlin compiler warnings can be considered errors, failing the build.
|
||||
# Currently set to false, because this project has a lot of warnings to fix first.
|
||||
|
||||
74
sdk-lib/Cargo.lock
generated
74
sdk-lib/Cargo.lock
generated
@@ -341,10 +341,10 @@ checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e"
|
||||
dependencies = [
|
||||
"cfg-if 0.1.10",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-deque",
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-deque 0.7.4",
|
||||
"crossbeam-epoch 0.8.2",
|
||||
"crossbeam-queue",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -353,7 +353,7 @@ version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"maybe-uninit",
|
||||
]
|
||||
|
||||
@@ -363,11 +363,21 @@ version = "0.7.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c20ff29ded3204c5106278a81a38f4b482636ed4fa1e6cfbeef193291beb29ed"
|
||||
dependencies = [
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-epoch 0.8.2",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"maybe-uninit",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-deque"
|
||||
version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
|
||||
dependencies = [
|
||||
"crossbeam-epoch 0.9.18",
|
||||
"crossbeam-utils 0.8.21",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.8.2"
|
||||
@@ -376,13 +386,22 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"cfg-if 0.1.10",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"lazy_static",
|
||||
"maybe-uninit",
|
||||
"memoffset",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.9.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
|
||||
dependencies = [
|
||||
"crossbeam-utils 0.8.21",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-queue"
|
||||
version = "0.2.3"
|
||||
@@ -390,7 +409,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570"
|
||||
dependencies = [
|
||||
"cfg-if 0.1.10",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.7.2",
|
||||
"maybe-uninit",
|
||||
]
|
||||
|
||||
@@ -405,6 +424,12 @@ dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
|
||||
|
||||
[[package]]
|
||||
name = "crypto_api"
|
||||
version = "0.2.2"
|
||||
@@ -455,6 +480,12 @@ version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.7.1"
|
||||
@@ -468,7 +499,6 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "equihash"
|
||||
version = "0.1.0"
|
||||
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
|
||||
dependencies = [
|
||||
"blake2b_simd",
|
||||
"byteorder",
|
||||
@@ -999,6 +1029,26 @@ dependencies = [
|
||||
"rand_core 0.5.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1"
|
||||
dependencies = [
|
||||
"either",
|
||||
"rayon-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon-core"
|
||||
version = "1.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
|
||||
dependencies = [
|
||||
"crossbeam-deque 0.8.6",
|
||||
"crossbeam-utils 0.8.21",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.2.16"
|
||||
@@ -1552,7 +1602,6 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "zcash_client_backend"
|
||||
version = "0.5.0"
|
||||
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"bech32",
|
||||
@@ -1569,6 +1618,7 @@ dependencies = [
|
||||
"protobuf",
|
||||
"protobuf-codegen-pure",
|
||||
"rand_core 0.5.1",
|
||||
"rayon",
|
||||
"ripemd160",
|
||||
"secp256k1",
|
||||
"sha2",
|
||||
@@ -1581,7 +1631,6 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "zcash_client_sqlite"
|
||||
version = "0.3.0"
|
||||
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
|
||||
dependencies = [
|
||||
"bech32",
|
||||
"bs58",
|
||||
@@ -1600,7 +1649,6 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "zcash_note_encryption"
|
||||
version = "0.0.0"
|
||||
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
|
||||
dependencies = [
|
||||
"blake2b_simd",
|
||||
"byteorder",
|
||||
@@ -1614,7 +1662,6 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "zcash_primitives"
|
||||
version = "0.5.0"
|
||||
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
|
||||
dependencies = [
|
||||
"aes",
|
||||
"bitvec 0.18.5",
|
||||
@@ -1644,7 +1691,6 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "zcash_proofs"
|
||||
version = "0.5.0"
|
||||
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
|
||||
dependencies = [
|
||||
"bellman",
|
||||
"blake2b_simd",
|
||||
|
||||
@@ -42,12 +42,12 @@ secp256k1 = "0.19"
|
||||
#zcash_primitives = { path = '../../clones/librustzcash/zcash_primitives' }
|
||||
#zcash_proofs = { path = '../../clones/librustzcash/zcash_proofs' }
|
||||
|
||||
## Uncomment this to test someone else's librustzcash changes in a branch
|
||||
## Use local librustzcash with parallel scanning support
|
||||
[patch.crates-io]
|
||||
zcash_client_backend = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" }
|
||||
zcash_client_sqlite = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" }
|
||||
zcash_primitives = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" }
|
||||
zcash_proofs = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" }
|
||||
zcash_client_backend = { path = "../../librustzcash/zcash_client_backend" }
|
||||
zcash_client_sqlite = { path = "../../librustzcash/zcash_client_sqlite" }
|
||||
zcash_primitives = { path = "../../librustzcash/zcash_primitives" }
|
||||
zcash_proofs = { path = "../../librustzcash/zcash_proofs" }
|
||||
|
||||
[features]
|
||||
mainnet = ["zcash_client_sqlite/mainnet"]
|
||||
|
||||
BIN
sdk-lib/src/main/assets/params/sapling-output.params
Normal file
BIN
sdk-lib/src/main/assets/params/sapling-output.params
Normal file
Binary file not shown.
BIN
sdk-lib/src/main/assets/params/sapling-spend.params
Normal file
BIN
sdk-lib/src/main/assets/params/sapling-spend.params
Normal file
Binary file not shown.
@@ -3,6 +3,7 @@ package cash.z.ecc.android.sdk
|
||||
import android.content.Context
|
||||
import cash.z.ecc.android.sdk.exception.InitializerException
|
||||
import cash.z.ecc.android.sdk.ext.ZcashSdk
|
||||
import cash.z.ecc.android.sdk.internal.SaplingParamTool
|
||||
import cash.z.ecc.android.sdk.internal.SdkDispatchers
|
||||
import cash.z.ecc.android.sdk.internal.ext.getCacheDirSuspend
|
||||
import cash.z.ecc.android.sdk.internal.ext.getDatabasePathSuspend
|
||||
@@ -319,6 +320,7 @@ class Initializer private constructor(
|
||||
config: Config
|
||||
): Initializer {
|
||||
config.validate()
|
||||
SaplingParamTool.init(context)
|
||||
|
||||
val loadedCheckpoint = run {
|
||||
val height = config.birthdayHeight
|
||||
|
||||
@@ -288,17 +288,23 @@ class SdkSynchronizer internal constructor(
|
||||
* will throw an exception if the synchronizer was never previously started.
|
||||
*/
|
||||
override fun stop() {
|
||||
coroutineScope.launch {
|
||||
// log everything to help troubleshoot shutdowns that aren't graceful
|
||||
twig("Synchronizer::stop: STARTING")
|
||||
twig("Synchronizer::stop: processor.stop()")
|
||||
processor.stop()
|
||||
twig("Synchronizer::stop: STARTING")
|
||||
// Cancel the scope directly so all child coroutines (including the
|
||||
// processor loop) are cancelled immediately. The previous implementation
|
||||
// launched a *new* coroutine inside coroutineScope to call cancel(), which
|
||||
// created a race: the cancel could kill the coroutine before it finished,
|
||||
// and the caller (DependenciesHolder.resetSynchronizer) had no way to wait
|
||||
// for shutdown, leaving the old processor running while a new one started.
|
||||
runCatching {
|
||||
twig("Synchronizer::stop: coroutineScope.cancel()")
|
||||
coroutineScope.cancel()
|
||||
}
|
||||
runCatching {
|
||||
twig("Synchronizer::stop: _status.cancel()")
|
||||
_status.cancel()
|
||||
twig("Synchronizer::stop: COMPLETE")
|
||||
}
|
||||
isStarted = false
|
||||
twig("Synchronizer::stop: COMPLETE")
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -46,6 +46,7 @@ import cash.z.wallet.sdk.rpc.Service
|
||||
import io.grpc.StatusRuntimeException
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Dispatchers.IO
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
@@ -53,6 +54,7 @@ import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
@@ -119,6 +121,7 @@ class CompactBlockProcessor internal constructor(
|
||||
var onScanMetricCompleteListener: ((BatchMetrics, Boolean) -> Unit)? = null
|
||||
|
||||
private val consecutiveChainErrors = AtomicInteger(0)
|
||||
private val consecutiveReconnects = AtomicInteger(0)
|
||||
private val lowerBoundHeight: BlockHeight = BlockHeight(
|
||||
max(
|
||||
rustBackend.network.saplingActivationHeight.value,
|
||||
@@ -206,11 +209,20 @@ class CompactBlockProcessor internal constructor(
|
||||
// immediately process again after failures in order to download new blocks right away
|
||||
when (result) {
|
||||
BlockProcessingResult.Reconnecting -> {
|
||||
val attempts = consecutiveReconnects.incrementAndGet()
|
||||
if (attempts >= 3) {
|
||||
consecutiveReconnects.set(0)
|
||||
twig("Server unreachable after $attempts attempts, escalating to show server picker")
|
||||
throw StatusRuntimeException(io.grpc.Status.UNAVAILABLE.withDescription(
|
||||
"Server unreachable after $attempts reconnection attempts"
|
||||
))
|
||||
}
|
||||
val napTime = calculatePollInterval(true)
|
||||
twig("Unable to process new blocks because we are disconnected! Attempting to reconnect in ${napTime}ms")
|
||||
twig("Unable to process new blocks because we are disconnected! Attempting to reconnect in ${napTime}ms (attempt $attempts/3)")
|
||||
delay(napTime)
|
||||
}
|
||||
BlockProcessingResult.NoBlocksToProcess, BlockProcessingResult.FailedEnhance -> {
|
||||
consecutiveReconnects.set(0)
|
||||
val noWorkDone =
|
||||
currentInfo.lastDownloadRange?.isEmpty() ?: true && currentInfo.lastScanRange?.isEmpty() ?: true
|
||||
val summary = if (noWorkDone) {
|
||||
@@ -232,9 +244,17 @@ class CompactBlockProcessor internal constructor(
|
||||
handleChainError(result.failedAtHeight)
|
||||
}
|
||||
consecutiveChainErrors.getAndIncrement()
|
||||
// Delay before retrying to prevent tight error loops
|
||||
val napTime = calculatePollInterval()
|
||||
twig("Chain error handled. Delaying ${napTime}ms before next attempt.")
|
||||
delay(napTime)
|
||||
}
|
||||
is BlockProcessingResult.Success -> {
|
||||
// Do nothing. We are done.
|
||||
consecutiveChainErrors.set(0)
|
||||
consecutiveReconnects.set(0)
|
||||
val napTime = calculatePollInterval()
|
||||
twig("Done processing blocks! Sleeping for ${napTime}ms (latest height: ${currentInfo.networkBlockHeight}).")
|
||||
delay(napTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -274,8 +294,7 @@ class CompactBlockProcessor internal constructor(
|
||||
setState(Scanned(currentInfo.lastScanRange))
|
||||
BlockProcessingResult.NoBlocksToProcess
|
||||
} else {
|
||||
downloadNewBlocks(currentInfo.lastDownloadRange)
|
||||
val error = validateAndScanNewBlocks(currentInfo.lastScanRange)
|
||||
val error = downloadAndScanPipelined(currentInfo.lastDownloadRange, currentInfo.lastScanRange)
|
||||
if (error != BlockProcessingResult.Success) {
|
||||
error
|
||||
} else {
|
||||
@@ -370,6 +389,8 @@ class CompactBlockProcessor internal constructor(
|
||||
throw CompactBlockProcessorException.FailedScan()
|
||||
} else {
|
||||
setState(Scanned(lastScanRange))
|
||||
twig("Clearing block cache after successful scan")
|
||||
downloader.rewindToHeight(BlockHeight(0))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -589,6 +610,103 @@ class CompactBlockProcessor internal constructor(
|
||||
_progress.send(100)
|
||||
}
|
||||
|
||||
/**
|
||||
* Pipelined download+scan: downloads a batch of blocks while the previous batch is being
|
||||
* validated and scanned. This overlaps network I/O with CPU-intensive scanning.
|
||||
*/
|
||||
private suspend fun downloadAndScanPipelined(
|
||||
downloadRange: ClosedRange<BlockHeight>?,
|
||||
scanRange: ClosedRange<BlockHeight>?
|
||||
): BlockProcessingResult = withContext(IO) {
|
||||
if (null == downloadRange || downloadRange.isEmpty()) {
|
||||
// Nothing to download — scan whatever is cached, skipping validation
|
||||
// to avoid stale cached blocks causing false chain errors
|
||||
setState(Scanning)
|
||||
val success = scanNewBlocks(scanRange)
|
||||
if (!success) {
|
||||
throw CompactBlockProcessorException.FailedScan()
|
||||
}
|
||||
setState(Scanned(scanRange))
|
||||
twig("Clearing block cache after successful scan")
|
||||
downloader.rewindToHeight(BlockHeight(0))
|
||||
return@withContext BlockProcessingResult.Success
|
||||
}
|
||||
|
||||
setState(Downloading)
|
||||
Twig.sprout("pipelined-sync")
|
||||
twig("pipelined download+scan for range $downloadRange")
|
||||
|
||||
val missingBlockCount = downloadRange.endInclusive.value - downloadRange.start.value + 1
|
||||
val batches = (
|
||||
missingBlockCount / DOWNLOAD_BATCH_SIZE +
|
||||
(if (missingBlockCount.rem(DOWNLOAD_BATCH_SIZE) == 0L) 0 else 1)
|
||||
)
|
||||
|
||||
// Channel with capacity 1: producer downloads batch N+1 while consumer scans batch N
|
||||
val batchChannel = Channel<ClosedRange<BlockHeight>>(capacity = 1)
|
||||
|
||||
// Producer: download batches and send ranges to channel
|
||||
val downloadJob = launch(IO) {
|
||||
var downloadedBlockHeight = downloadRange.start
|
||||
for (i in 1..batches) {
|
||||
retryUpTo(RETRIES, { CompactBlockProcessorException.FailedDownload(it) }) {
|
||||
val end = BlockHeight.new(
|
||||
network,
|
||||
min(
|
||||
(downloadRange.start.value + (i * DOWNLOAD_BATCH_SIZE)) - 1,
|
||||
downloadRange.endInclusive.value
|
||||
)
|
||||
)
|
||||
var count = 0
|
||||
twig("downloading $downloadedBlockHeight..$end (batch $i of $batches)") {
|
||||
count = downloader.downloadBlockRange(downloadedBlockHeight..end)
|
||||
}
|
||||
twig("downloaded $count blocks!")
|
||||
val progress = (i / batches.toFloat() * 100).roundToInt()
|
||||
_progress.send(progress)
|
||||
val lastDownloadedHeight = downloader.getLastDownloadedHeight()
|
||||
updateProgress(lastDownloadedHeight = lastDownloadedHeight)
|
||||
|
||||
// Send the downloaded range for scanning
|
||||
batchChannel.send(downloadedBlockHeight..end)
|
||||
downloadedBlockHeight = end + 1
|
||||
}
|
||||
}
|
||||
batchChannel.close()
|
||||
}
|
||||
|
||||
// Consumer: drain download batches (scanning is done after all downloads complete
|
||||
// because Room and the Rust scanner use separate SQLite connections, and with
|
||||
// TRUNCATE journal mode Room cannot reliably see Rust's committed scan progress;
|
||||
// calling scanNewBlocks per-batch caused each iteration to re-scan ALL cached blocks)
|
||||
var overallScanRange = scanRange
|
||||
|
||||
for (batchRange in batchChannel) {
|
||||
// Just consume batches — scanning happens after downloads finish
|
||||
}
|
||||
|
||||
// Wait for download job to finish (in case it's still running)
|
||||
downloadJob.join()
|
||||
|
||||
// Now scan all downloaded blocks in one pass
|
||||
setState(Scanning)
|
||||
val success = scanNewBlocks(overallScanRange)
|
||||
if (!success) {
|
||||
Twig.clip("pipelined-sync")
|
||||
return@withContext BlockProcessingResult.Error(overallScanRange?.start ?: downloadRange.start)
|
||||
}
|
||||
|
||||
setState(Scanned(overallScanRange))
|
||||
// Clear the block cache after successful scan. Scanned blocks serve no
|
||||
// further purpose and stale cached blocks can cause validateCombinedChain()
|
||||
// to find hash mismatches on subsequent cycles, triggering repeated rewinds.
|
||||
twig("Clearing block cache after successful scan")
|
||||
downloader.rewindToHeight(BlockHeight(0))
|
||||
|
||||
Twig.clip("pipelined-sync")
|
||||
BlockProcessingResult.Success
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate all blocks in the given range, ensuring that the blocks are in ascending order, with
|
||||
* no gaps and are also chain-sequential. This means every block's prevHash value matches the
|
||||
@@ -636,9 +754,19 @@ class CompactBlockProcessor internal constructor(
|
||||
var scannedNewBlocks = false
|
||||
metrics.beginBatch()
|
||||
result = rustBackend.scanBlocks(SCAN_BATCH_SIZE)
|
||||
// Retry once after a delay on scan failure (handles transient
|
||||
// SQLite lock contention between Rust and Room connections)
|
||||
if (!result) {
|
||||
twig("scanBlocks returned false, retrying after 1s delay...")
|
||||
delay(1000)
|
||||
result = rustBackend.scanBlocks(SCAN_BATCH_SIZE)
|
||||
}
|
||||
metrics.endBatch()
|
||||
val lastScannedHeight =
|
||||
BlockHeight.new(network, range.start.value + metrics.cumulativeItems - 1)
|
||||
// Use the database as source of truth for scanned height, since the Rust
|
||||
// backend tracks its own scan position and metrics.cumulativeItems resets
|
||||
// each time scanNewBlocks is called (breaking pipelined progress tracking).
|
||||
val lastScannedHeight = getLastScannedHeight()
|
||||
?: BlockHeight.new(network, range.start.value + metrics.cumulativeItems - 1)
|
||||
val percentValue =
|
||||
(lastScannedHeight.value - range.start.value) / (range.endInclusive.value - range.start.value + 1).toFloat() * 100.0f
|
||||
val percent = "%.0f".format(percentValue.coerceAtMost(100f).coerceAtLeast(0f))
|
||||
@@ -908,8 +1036,31 @@ class CompactBlockProcessor internal constructor(
|
||||
*
|
||||
* @return the last scanned height reported by the repository.
|
||||
*/
|
||||
suspend fun getLastScannedHeight() =
|
||||
repository.lastScannedHeight()
|
||||
suspend fun getLastScannedHeight(): BlockHeight? {
|
||||
// Query the data DB directly using a fresh SQLite connection to bypass
|
||||
// Room's cached connection. The Rust scanner writes to this database
|
||||
// via its own SQLite connection and Room (TRUNCATE journal mode) may
|
||||
// not see those changes through its cached connection.
|
||||
return withContext(IO) {
|
||||
val dbPath = (rustBackend as RustBackend).pathDataDb
|
||||
var height: Long? = null
|
||||
val db = android.database.sqlite.SQLiteDatabase.openDatabase(
|
||||
dbPath,
|
||||
null,
|
||||
android.database.sqlite.SQLiteDatabase.OPEN_READONLY
|
||||
)
|
||||
try {
|
||||
val cursor = db.rawQuery("SELECT MAX(height) FROM blocks", null)
|
||||
if (cursor.moveToFirst() && !cursor.isNull(0)) {
|
||||
height = cursor.getLong(0)
|
||||
}
|
||||
cursor.close()
|
||||
} finally {
|
||||
db.close()
|
||||
}
|
||||
height?.let { BlockHeight.new(network, it) }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get address corresponding to the given account for this wallet.
|
||||
|
||||
@@ -1,21 +1,28 @@
|
||||
package cash.z.ecc.android.sdk.internal
|
||||
|
||||
import android.content.Context
|
||||
import cash.z.ecc.android.sdk.exception.TransactionEncoderException
|
||||
import cash.z.ecc.android.sdk.ext.ZcashSdk
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okio.buffer
|
||||
import okio.sink
|
||||
import java.io.File
|
||||
|
||||
class SaplingParamTool {
|
||||
|
||||
companion object {
|
||||
|
||||
private var appContext: Context? = null
|
||||
|
||||
/**
|
||||
* Checks the given directory for the output and spending params and calls [fetchParams] if
|
||||
* they're missing.
|
||||
* Initialize with application context so params can be copied from bundled assets.
|
||||
*/
|
||||
fun init(context: Context) {
|
||||
appContext = context.applicationContext
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks the given directory for the output and spending params and copies them from
|
||||
* bundled assets if they're missing.
|
||||
*
|
||||
* @param destinationDir the directory where the params should be stored.
|
||||
*/
|
||||
@@ -32,64 +39,44 @@ class SaplingParamTool {
|
||||
}
|
||||
if (hadError) {
|
||||
try {
|
||||
Bush.trunk.twigTask("attempting to download missing params") {
|
||||
fetchParams(destinationDir)
|
||||
Bush.trunk.twigTask("copying bundled sapling params") {
|
||||
copyBundledParams(destinationDir)
|
||||
}
|
||||
} catch (e: Throwable) {
|
||||
twig("failed to fetch params due to: $e")
|
||||
twig("failed to copy bundled params due to: $e")
|
||||
throw TransactionEncoderException.MissingParamsException
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Download and store the params into the given directory.
|
||||
* Copy the sapling params from bundled assets into the given directory.
|
||||
*
|
||||
* @param destinationDir the directory where the params will be stored. It's assumed that we
|
||||
* have write access to this directory. Typically, this should be the app's cache directory
|
||||
* because it is not harmful if these files are cleared by the user since they are downloaded
|
||||
* on-demand.
|
||||
* @param destinationDir the directory where the params will be stored.
|
||||
*/
|
||||
suspend fun fetchParams(destinationDir: String) {
|
||||
val client = createHttpClient()
|
||||
var failureMessage = ""
|
||||
private suspend fun copyBundledParams(destinationDir: String) {
|
||||
val context = appContext
|
||||
?: throw IllegalStateException("SaplingParamTool not initialized. Call init(context) first.")
|
||||
|
||||
arrayOf(
|
||||
ZcashSdk.SPEND_PARAM_FILE_NAME,
|
||||
ZcashSdk.OUTPUT_PARAM_FILE_NAME
|
||||
).forEach { paramFileName ->
|
||||
val url = "${ZcashSdk.CLOUD_PARAM_DIR_URL.random()}/$paramFileName"
|
||||
twig("Downloading Sapling params from ${url}...")
|
||||
val request = Request.Builder().url(url).build()
|
||||
val response = withContext(Dispatchers.IO) { client.newCall(request).execute() }
|
||||
if (response.isSuccessful) {
|
||||
twig("fetch succeeded", -1)
|
||||
val file = File(destinationDir, paramFileName)
|
||||
if (file.parentFile?.existsSuspend() == true) {
|
||||
twig("directory exists!", -1)
|
||||
} else {
|
||||
twig("directory did not exist attempting to make it")
|
||||
file.parentFile?.mkdirsSuspend()
|
||||
val destFile = File(destinationDir, paramFileName)
|
||||
if (!destFile.existsSuspend()) {
|
||||
if (destFile.parentFile?.existsSuspend() != true) {
|
||||
destFile.parentFile?.mkdirsSuspend()
|
||||
}
|
||||
withContext(Dispatchers.IO) {
|
||||
response.body?.let { body ->
|
||||
body.source().use { source ->
|
||||
file.sink().buffer().use { sink ->
|
||||
twig("writing to $file")
|
||||
sink.writeAll(source)
|
||||
}
|
||||
context.assets.open("params/$paramFileName").use { input ->
|
||||
destFile.outputStream().use { output ->
|
||||
twig("copying bundled $paramFileName to $destFile")
|
||||
input.copyTo(output)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
failureMessage += "Error while fetching $paramFileName : $response\n"
|
||||
twig(failureMessage)
|
||||
}
|
||||
|
||||
twig("fetch succeeded, done writing $paramFileName")
|
||||
}
|
||||
if (failureMessage.isNotEmpty()) throw TransactionEncoderException.FetchParamsException(
|
||||
failureMessage
|
||||
)
|
||||
}
|
||||
|
||||
suspend fun clear(destinationDir: String) {
|
||||
@@ -119,19 +106,6 @@ class SaplingParamTool {
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Helpers
|
||||
//
|
||||
/**
|
||||
* Http client is only used for downloading sapling spend and output params data, which are
|
||||
* necessary for the wallet to scan blocks.
|
||||
*
|
||||
* @return an http client suitable for downloading params data.
|
||||
*/
|
||||
private fun createHttpClient(): OkHttpClient {
|
||||
// TODO: add logging and timeouts
|
||||
return OkHttpClient()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user