Compare commits

2 Commits

Author SHA1 Message Date
939477a7a3 v1.9.1-beta01: Fix synchronizer stop, scan loop, and SQLite isolation
- Fix SdkSynchronizer.stop() race condition: use synchronous scope.cancel()
  instead of async self-cancellation that never completed
- Fix scan loop: scan once after all downloads complete instead of per-batch
- Use fresh SQLiteDatabase connection for getLastScannedHeight() to avoid
  reading stale Room-cached values
- Bump version to 1.9.1-beta01
- Update Cargo dependencies
2026-03-24 16:50:15 -05:00
e8a2d3ebc9 Bundle sapling params in APK instead of downloading from remote servers
- Add sapling-spend.params and sapling-output.params to sdk-lib assets
- Rewrite SaplingParamTool to copy params from bundled assets instead of HTTP download
- Remove OkHttp dependency from SaplingParamTool
- Initialize SaplingParamTool with app context from Initializer
2026-03-22 09:13:50 -05:00
10 changed files with 271 additions and 92 deletions

View File

@@ -101,13 +101,13 @@ class DarksideTestCoordinator(val wallet: TestWallet) {
twig("got processor status $it") twig("got processor status $it")
if (it == Synchronizer.Status.DISCONNECTED) { if (it == Synchronizer.Status.DISCONNECTED) {
twig("waiting a bit before giving up on connection...") twig("waiting a bit before giving up on connection...")
} else if (targetHeight != null && (synchronizer as SdkSynchronizer).processor.getLastScannedHeight() < targetHeight) { } else if (targetHeight != null && ((synchronizer as SdkSynchronizer).processor.getLastScannedHeight() ?: synchronizer.network.saplingActivationHeight) < targetHeight) {
twig("awaiting new blocks from server...") twig("awaiting new blocks from server...")
} }
}.map { }.map {
// whenever we're waiting for a target height, for simplicity, if we're sleeping, // whenever we're waiting for a target height, for simplicity, if we're sleeping,
// and in between polls, then consider it that we're not synced // and in between polls, then consider it that we're not synced
if (targetHeight != null && (synchronizer as SdkSynchronizer).processor.getLastScannedHeight() < targetHeight) { if (targetHeight != null && ((synchronizer as SdkSynchronizer).processor.getLastScannedHeight() ?: synchronizer.network.saplingActivationHeight) < targetHeight) {
twig("switching status to DOWNLOADING because we're still waiting for height $targetHeight") twig("switching status to DOWNLOADING because we're still waiting for height $targetHeight")
Synchronizer.Status.DOWNLOADING Synchronizer.Status.DOWNLOADING
} else { } else {

View File

@@ -21,7 +21,7 @@ RELEASE_SIGNING_ENABLED=false
# Required by the maven publishing plugin # Required by the maven publishing plugin
SONATYPE_HOST=DEFAULT SONATYPE_HOST=DEFAULT
LIBRARY_VERSION=1.9.0-beta01 LIBRARY_VERSION=1.9.1-beta01
# Kotlin compiler warnings can be considered errors, failing the build. # Kotlin compiler warnings can be considered errors, failing the build.
# Currently set to false, because this project has a lot of warnings to fix first. # Currently set to false, because this project has a lot of warnings to fix first.

74
sdk-lib/Cargo.lock generated
View File

@@ -341,10 +341,10 @@ checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e"
dependencies = [ dependencies = [
"cfg-if 0.1.10", "cfg-if 0.1.10",
"crossbeam-channel", "crossbeam-channel",
"crossbeam-deque", "crossbeam-deque 0.7.4",
"crossbeam-epoch", "crossbeam-epoch 0.8.2",
"crossbeam-queue", "crossbeam-queue",
"crossbeam-utils", "crossbeam-utils 0.7.2",
] ]
[[package]] [[package]]
@@ -353,7 +353,7 @@ version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87"
dependencies = [ dependencies = [
"crossbeam-utils", "crossbeam-utils 0.7.2",
"maybe-uninit", "maybe-uninit",
] ]
@@ -363,11 +363,21 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c20ff29ded3204c5106278a81a38f4b482636ed4fa1e6cfbeef193291beb29ed" checksum = "c20ff29ded3204c5106278a81a38f4b482636ed4fa1e6cfbeef193291beb29ed"
dependencies = [ dependencies = [
"crossbeam-epoch", "crossbeam-epoch 0.8.2",
"crossbeam-utils", "crossbeam-utils 0.7.2",
"maybe-uninit", "maybe-uninit",
] ]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
dependencies = [
"crossbeam-epoch 0.9.18",
"crossbeam-utils 0.8.21",
]
[[package]] [[package]]
name = "crossbeam-epoch" name = "crossbeam-epoch"
version = "0.8.2" version = "0.8.2"
@@ -376,13 +386,22 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"cfg-if 0.1.10", "cfg-if 0.1.10",
"crossbeam-utils", "crossbeam-utils 0.7.2",
"lazy_static", "lazy_static",
"maybe-uninit", "maybe-uninit",
"memoffset", "memoffset",
"scopeguard", "scopeguard",
] ]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils 0.8.21",
]
[[package]] [[package]]
name = "crossbeam-queue" name = "crossbeam-queue"
version = "0.2.3" version = "0.2.3"
@@ -390,7 +409,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570"
dependencies = [ dependencies = [
"cfg-if 0.1.10", "cfg-if 0.1.10",
"crossbeam-utils", "crossbeam-utils 0.7.2",
"maybe-uninit", "maybe-uninit",
] ]
@@ -405,6 +424,12 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]] [[package]]
name = "crypto_api" name = "crypto_api"
version = "0.2.2" version = "0.2.2"
@@ -455,6 +480,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
[[package]]
name = "either"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]] [[package]]
name = "env_logger" name = "env_logger"
version = "0.7.1" version = "0.7.1"
@@ -468,7 +499,6 @@ dependencies = [
[[package]] [[package]]
name = "equihash" name = "equihash"
version = "0.1.0" version = "0.1.0"
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
dependencies = [ dependencies = [
"blake2b_simd", "blake2b_simd",
"byteorder", "byteorder",
@@ -999,6 +1029,26 @@ dependencies = [
"rand_core 0.5.1", "rand_core 0.5.1",
] ]
[[package]]
name = "rayon"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-deque 0.8.6",
"crossbeam-utils 0.8.21",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.16" version = "0.2.16"
@@ -1552,7 +1602,6 @@ dependencies = [
[[package]] [[package]]
name = "zcash_client_backend" name = "zcash_client_backend"
version = "0.5.0" version = "0.5.0"
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
dependencies = [ dependencies = [
"base64", "base64",
"bech32", "bech32",
@@ -1569,6 +1618,7 @@ dependencies = [
"protobuf", "protobuf",
"protobuf-codegen-pure", "protobuf-codegen-pure",
"rand_core 0.5.1", "rand_core 0.5.1",
"rayon",
"ripemd160", "ripemd160",
"secp256k1", "secp256k1",
"sha2", "sha2",
@@ -1581,7 +1631,6 @@ dependencies = [
[[package]] [[package]]
name = "zcash_client_sqlite" name = "zcash_client_sqlite"
version = "0.3.0" version = "0.3.0"
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
dependencies = [ dependencies = [
"bech32", "bech32",
"bs58", "bs58",
@@ -1600,7 +1649,6 @@ dependencies = [
[[package]] [[package]]
name = "zcash_note_encryption" name = "zcash_note_encryption"
version = "0.0.0" version = "0.0.0"
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
dependencies = [ dependencies = [
"blake2b_simd", "blake2b_simd",
"byteorder", "byteorder",
@@ -1614,7 +1662,6 @@ dependencies = [
[[package]] [[package]]
name = "zcash_primitives" name = "zcash_primitives"
version = "0.5.0" version = "0.5.0"
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
dependencies = [ dependencies = [
"aes", "aes",
"bitvec 0.18.5", "bitvec 0.18.5",
@@ -1644,7 +1691,6 @@ dependencies = [
[[package]] [[package]]
name = "zcash_proofs" name = "zcash_proofs"
version = "0.5.0" version = "0.5.0"
source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2"
dependencies = [ dependencies = [
"bellman", "bellman",
"blake2b_simd", "blake2b_simd",

View File

@@ -42,12 +42,12 @@ secp256k1 = "0.19"
#zcash_primitives = { path = '../../clones/librustzcash/zcash_primitives' } #zcash_primitives = { path = '../../clones/librustzcash/zcash_primitives' }
#zcash_proofs = { path = '../../clones/librustzcash/zcash_proofs' } #zcash_proofs = { path = '../../clones/librustzcash/zcash_proofs' }
## Uncomment this to test someone else's librustzcash changes in a branch ## Use local librustzcash with parallel scanning support
[patch.crates-io] [patch.crates-io]
zcash_client_backend = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" } zcash_client_backend = { path = "../../librustzcash/zcash_client_backend" }
zcash_client_sqlite = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" } zcash_client_sqlite = { path = "../../librustzcash/zcash_client_sqlite" }
zcash_primitives = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" } zcash_primitives = { path = "../../librustzcash/zcash_primitives" }
zcash_proofs = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" } zcash_proofs = { path = "../../librustzcash/zcash_proofs" }
[features] [features]
mainnet = ["zcash_client_sqlite/mainnet"] mainnet = ["zcash_client_sqlite/mainnet"]

Binary file not shown.

Binary file not shown.

View File

@@ -3,6 +3,7 @@ package cash.z.ecc.android.sdk
import android.content.Context import android.content.Context
import cash.z.ecc.android.sdk.exception.InitializerException import cash.z.ecc.android.sdk.exception.InitializerException
import cash.z.ecc.android.sdk.ext.ZcashSdk import cash.z.ecc.android.sdk.ext.ZcashSdk
import cash.z.ecc.android.sdk.internal.SaplingParamTool
import cash.z.ecc.android.sdk.internal.SdkDispatchers import cash.z.ecc.android.sdk.internal.SdkDispatchers
import cash.z.ecc.android.sdk.internal.ext.getCacheDirSuspend import cash.z.ecc.android.sdk.internal.ext.getCacheDirSuspend
import cash.z.ecc.android.sdk.internal.ext.getDatabasePathSuspend import cash.z.ecc.android.sdk.internal.ext.getDatabasePathSuspend
@@ -319,6 +320,7 @@ class Initializer private constructor(
config: Config config: Config
): Initializer { ): Initializer {
config.validate() config.validate()
SaplingParamTool.init(context)
val loadedCheckpoint = run { val loadedCheckpoint = run {
val height = config.birthdayHeight val height = config.birthdayHeight

View File

@@ -288,17 +288,23 @@ class SdkSynchronizer internal constructor(
* will throw an exception if the synchronizer was never previously started. * will throw an exception if the synchronizer was never previously started.
*/ */
override fun stop() { override fun stop() {
coroutineScope.launch { twig("Synchronizer::stop: STARTING")
// log everything to help troubleshoot shutdowns that aren't graceful // Cancel the scope directly so all child coroutines (including the
twig("Synchronizer::stop: STARTING") // processor loop) are cancelled immediately. The previous implementation
twig("Synchronizer::stop: processor.stop()") // launched a *new* coroutine inside coroutineScope to call cancel(), which
processor.stop() // created a race: the cancel could kill the coroutine before it finished,
// and the caller (DependenciesHolder.resetSynchronizer) had no way to wait
// for shutdown, leaving the old processor running while a new one started.
runCatching {
twig("Synchronizer::stop: coroutineScope.cancel()") twig("Synchronizer::stop: coroutineScope.cancel()")
coroutineScope.cancel() coroutineScope.cancel()
}
runCatching {
twig("Synchronizer::stop: _status.cancel()") twig("Synchronizer::stop: _status.cancel()")
_status.cancel() _status.cancel()
twig("Synchronizer::stop: COMPLETE")
} }
isStarted = false
twig("Synchronizer::stop: COMPLETE")
} }
/** /**

View File

@@ -46,6 +46,7 @@ import cash.z.wallet.sdk.rpc.Service
import io.grpc.StatusRuntimeException import io.grpc.StatusRuntimeException
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Dispatchers.IO import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
@@ -53,6 +54,7 @@ import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.first
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
@@ -119,6 +121,7 @@ class CompactBlockProcessor internal constructor(
var onScanMetricCompleteListener: ((BatchMetrics, Boolean) -> Unit)? = null var onScanMetricCompleteListener: ((BatchMetrics, Boolean) -> Unit)? = null
private val consecutiveChainErrors = AtomicInteger(0) private val consecutiveChainErrors = AtomicInteger(0)
private val consecutiveReconnects = AtomicInteger(0)
private val lowerBoundHeight: BlockHeight = BlockHeight( private val lowerBoundHeight: BlockHeight = BlockHeight(
max( max(
rustBackend.network.saplingActivationHeight.value, rustBackend.network.saplingActivationHeight.value,
@@ -206,11 +209,20 @@ class CompactBlockProcessor internal constructor(
// immediately process again after failures in order to download new blocks right away // immediately process again after failures in order to download new blocks right away
when (result) { when (result) {
BlockProcessingResult.Reconnecting -> { BlockProcessingResult.Reconnecting -> {
val attempts = consecutiveReconnects.incrementAndGet()
if (attempts >= 3) {
consecutiveReconnects.set(0)
twig("Server unreachable after $attempts attempts, escalating to show server picker")
throw StatusRuntimeException(io.grpc.Status.UNAVAILABLE.withDescription(
"Server unreachable after $attempts reconnection attempts"
))
}
val napTime = calculatePollInterval(true) val napTime = calculatePollInterval(true)
twig("Unable to process new blocks because we are disconnected! Attempting to reconnect in ${napTime}ms") twig("Unable to process new blocks because we are disconnected! Attempting to reconnect in ${napTime}ms (attempt $attempts/3)")
delay(napTime) delay(napTime)
} }
BlockProcessingResult.NoBlocksToProcess, BlockProcessingResult.FailedEnhance -> { BlockProcessingResult.NoBlocksToProcess, BlockProcessingResult.FailedEnhance -> {
consecutiveReconnects.set(0)
val noWorkDone = val noWorkDone =
currentInfo.lastDownloadRange?.isEmpty() ?: true && currentInfo.lastScanRange?.isEmpty() ?: true currentInfo.lastDownloadRange?.isEmpty() ?: true && currentInfo.lastScanRange?.isEmpty() ?: true
val summary = if (noWorkDone) { val summary = if (noWorkDone) {
@@ -232,9 +244,17 @@ class CompactBlockProcessor internal constructor(
handleChainError(result.failedAtHeight) handleChainError(result.failedAtHeight)
} }
consecutiveChainErrors.getAndIncrement() consecutiveChainErrors.getAndIncrement()
// Delay before retrying to prevent tight error loops
val napTime = calculatePollInterval()
twig("Chain error handled. Delaying ${napTime}ms before next attempt.")
delay(napTime)
} }
is BlockProcessingResult.Success -> { is BlockProcessingResult.Success -> {
// Do nothing. We are done. consecutiveChainErrors.set(0)
consecutiveReconnects.set(0)
val napTime = calculatePollInterval()
twig("Done processing blocks! Sleeping for ${napTime}ms (latest height: ${currentInfo.networkBlockHeight}).")
delay(napTime)
} }
} }
} }
@@ -274,8 +294,7 @@ class CompactBlockProcessor internal constructor(
setState(Scanned(currentInfo.lastScanRange)) setState(Scanned(currentInfo.lastScanRange))
BlockProcessingResult.NoBlocksToProcess BlockProcessingResult.NoBlocksToProcess
} else { } else {
downloadNewBlocks(currentInfo.lastDownloadRange) val error = downloadAndScanPipelined(currentInfo.lastDownloadRange, currentInfo.lastScanRange)
val error = validateAndScanNewBlocks(currentInfo.lastScanRange)
if (error != BlockProcessingResult.Success) { if (error != BlockProcessingResult.Success) {
error error
} else { } else {
@@ -370,6 +389,8 @@ class CompactBlockProcessor internal constructor(
throw CompactBlockProcessorException.FailedScan() throw CompactBlockProcessorException.FailedScan()
} else { } else {
setState(Scanned(lastScanRange)) setState(Scanned(lastScanRange))
twig("Clearing block cache after successful scan")
downloader.rewindToHeight(BlockHeight(0))
} }
} }
@@ -589,6 +610,103 @@ class CompactBlockProcessor internal constructor(
_progress.send(100) _progress.send(100)
} }
/**
* Pipelined download+scan: downloads a batch of blocks while the previous batch is being
* validated and scanned. This overlaps network I/O with CPU-intensive scanning.
*/
private suspend fun downloadAndScanPipelined(
downloadRange: ClosedRange<BlockHeight>?,
scanRange: ClosedRange<BlockHeight>?
): BlockProcessingResult = withContext(IO) {
if (null == downloadRange || downloadRange.isEmpty()) {
// Nothing to download — scan whatever is cached, skipping validation
// to avoid stale cached blocks causing false chain errors
setState(Scanning)
val success = scanNewBlocks(scanRange)
if (!success) {
throw CompactBlockProcessorException.FailedScan()
}
setState(Scanned(scanRange))
twig("Clearing block cache after successful scan")
downloader.rewindToHeight(BlockHeight(0))
return@withContext BlockProcessingResult.Success
}
setState(Downloading)
Twig.sprout("pipelined-sync")
twig("pipelined download+scan for range $downloadRange")
val missingBlockCount = downloadRange.endInclusive.value - downloadRange.start.value + 1
val batches = (
missingBlockCount / DOWNLOAD_BATCH_SIZE +
(if (missingBlockCount.rem(DOWNLOAD_BATCH_SIZE) == 0L) 0 else 1)
)
// Channel with capacity 1: producer downloads batch N+1 while consumer scans batch N
val batchChannel = Channel<ClosedRange<BlockHeight>>(capacity = 1)
// Producer: download batches and send ranges to channel
val downloadJob = launch(IO) {
var downloadedBlockHeight = downloadRange.start
for (i in 1..batches) {
retryUpTo(RETRIES, { CompactBlockProcessorException.FailedDownload(it) }) {
val end = BlockHeight.new(
network,
min(
(downloadRange.start.value + (i * DOWNLOAD_BATCH_SIZE)) - 1,
downloadRange.endInclusive.value
)
)
var count = 0
twig("downloading $downloadedBlockHeight..$end (batch $i of $batches)") {
count = downloader.downloadBlockRange(downloadedBlockHeight..end)
}
twig("downloaded $count blocks!")
val progress = (i / batches.toFloat() * 100).roundToInt()
_progress.send(progress)
val lastDownloadedHeight = downloader.getLastDownloadedHeight()
updateProgress(lastDownloadedHeight = lastDownloadedHeight)
// Send the downloaded range for scanning
batchChannel.send(downloadedBlockHeight..end)
downloadedBlockHeight = end + 1
}
}
batchChannel.close()
}
// Consumer: drain download batches (scanning is done after all downloads complete
// because Room and the Rust scanner use separate SQLite connections, and with
// TRUNCATE journal mode Room cannot reliably see Rust's committed scan progress;
// calling scanNewBlocks per-batch caused each iteration to re-scan ALL cached blocks)
var overallScanRange = scanRange
for (batchRange in batchChannel) {
// Just consume batches — scanning happens after downloads finish
}
// Wait for download job to finish (in case it's still running)
downloadJob.join()
// Now scan all downloaded blocks in one pass
setState(Scanning)
val success = scanNewBlocks(overallScanRange)
if (!success) {
Twig.clip("pipelined-sync")
return@withContext BlockProcessingResult.Error(overallScanRange?.start ?: downloadRange.start)
}
setState(Scanned(overallScanRange))
// Clear the block cache after successful scan. Scanned blocks serve no
// further purpose and stale cached blocks can cause validateCombinedChain()
// to find hash mismatches on subsequent cycles, triggering repeated rewinds.
twig("Clearing block cache after successful scan")
downloader.rewindToHeight(BlockHeight(0))
Twig.clip("pipelined-sync")
BlockProcessingResult.Success
}
/** /**
* Validate all blocks in the given range, ensuring that the blocks are in ascending order, with * Validate all blocks in the given range, ensuring that the blocks are in ascending order, with
* no gaps and are also chain-sequential. This means every block's prevHash value matches the * no gaps and are also chain-sequential. This means every block's prevHash value matches the
@@ -636,9 +754,19 @@ class CompactBlockProcessor internal constructor(
var scannedNewBlocks = false var scannedNewBlocks = false
metrics.beginBatch() metrics.beginBatch()
result = rustBackend.scanBlocks(SCAN_BATCH_SIZE) result = rustBackend.scanBlocks(SCAN_BATCH_SIZE)
// Retry once after a delay on scan failure (handles transient
// SQLite lock contention between Rust and Room connections)
if (!result) {
twig("scanBlocks returned false, retrying after 1s delay...")
delay(1000)
result = rustBackend.scanBlocks(SCAN_BATCH_SIZE)
}
metrics.endBatch() metrics.endBatch()
val lastScannedHeight = // Use the database as source of truth for scanned height, since the Rust
BlockHeight.new(network, range.start.value + metrics.cumulativeItems - 1) // backend tracks its own scan position and metrics.cumulativeItems resets
// each time scanNewBlocks is called (breaking pipelined progress tracking).
val lastScannedHeight = getLastScannedHeight()
?: BlockHeight.new(network, range.start.value + metrics.cumulativeItems - 1)
val percentValue = val percentValue =
(lastScannedHeight.value - range.start.value) / (range.endInclusive.value - range.start.value + 1).toFloat() * 100.0f (lastScannedHeight.value - range.start.value) / (range.endInclusive.value - range.start.value + 1).toFloat() * 100.0f
val percent = "%.0f".format(percentValue.coerceAtMost(100f).coerceAtLeast(0f)) val percent = "%.0f".format(percentValue.coerceAtMost(100f).coerceAtLeast(0f))
@@ -908,8 +1036,31 @@ class CompactBlockProcessor internal constructor(
* *
* @return the last scanned height reported by the repository. * @return the last scanned height reported by the repository.
*/ */
suspend fun getLastScannedHeight() = suspend fun getLastScannedHeight(): BlockHeight? {
repository.lastScannedHeight() // Query the data DB directly using a fresh SQLite connection to bypass
// Room's cached connection. The Rust scanner writes to this database
// via its own SQLite connection and Room (TRUNCATE journal mode) may
// not see those changes through its cached connection.
return withContext(IO) {
val dbPath = (rustBackend as RustBackend).pathDataDb
var height: Long? = null
val db = android.database.sqlite.SQLiteDatabase.openDatabase(
dbPath,
null,
android.database.sqlite.SQLiteDatabase.OPEN_READONLY
)
try {
val cursor = db.rawQuery("SELECT MAX(height) FROM blocks", null)
if (cursor.moveToFirst() && !cursor.isNull(0)) {
height = cursor.getLong(0)
}
cursor.close()
} finally {
db.close()
}
height?.let { BlockHeight.new(network, it) }
}
}
/** /**
* Get address corresponding to the given account for this wallet. * Get address corresponding to the given account for this wallet.

View File

@@ -1,21 +1,28 @@
package cash.z.ecc.android.sdk.internal package cash.z.ecc.android.sdk.internal
import android.content.Context
import cash.z.ecc.android.sdk.exception.TransactionEncoderException import cash.z.ecc.android.sdk.exception.TransactionEncoderException
import cash.z.ecc.android.sdk.ext.ZcashSdk import cash.z.ecc.android.sdk.ext.ZcashSdk
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient
import okhttp3.Request
import okio.buffer
import okio.sink
import java.io.File import java.io.File
class SaplingParamTool { class SaplingParamTool {
companion object { companion object {
private var appContext: Context? = null
/** /**
* Checks the given directory for the output and spending params and calls [fetchParams] if * Initialize with application context so params can be copied from bundled assets.
* they're missing. */
fun init(context: Context) {
appContext = context.applicationContext
}
/**
* Checks the given directory for the output and spending params and copies them from
* bundled assets if they're missing.
* *
* @param destinationDir the directory where the params should be stored. * @param destinationDir the directory where the params should be stored.
*/ */
@@ -32,64 +39,44 @@ class SaplingParamTool {
} }
if (hadError) { if (hadError) {
try { try {
Bush.trunk.twigTask("attempting to download missing params") { Bush.trunk.twigTask("copying bundled sapling params") {
fetchParams(destinationDir) copyBundledParams(destinationDir)
} }
} catch (e: Throwable) { } catch (e: Throwable) {
twig("failed to fetch params due to: $e") twig("failed to copy bundled params due to: $e")
throw TransactionEncoderException.MissingParamsException throw TransactionEncoderException.MissingParamsException
} }
} }
} }
/** /**
* Download and store the params into the given directory. * Copy the sapling params from bundled assets into the given directory.
* *
* @param destinationDir the directory where the params will be stored. It's assumed that we * @param destinationDir the directory where the params will be stored.
* have write access to this directory. Typically, this should be the app's cache directory
* because it is not harmful if these files are cleared by the user since they are downloaded
* on-demand.
*/ */
suspend fun fetchParams(destinationDir: String) { private suspend fun copyBundledParams(destinationDir: String) {
val client = createHttpClient() val context = appContext
var failureMessage = "" ?: throw IllegalStateException("SaplingParamTool not initialized. Call init(context) first.")
arrayOf( arrayOf(
ZcashSdk.SPEND_PARAM_FILE_NAME, ZcashSdk.SPEND_PARAM_FILE_NAME,
ZcashSdk.OUTPUT_PARAM_FILE_NAME ZcashSdk.OUTPUT_PARAM_FILE_NAME
).forEach { paramFileName -> ).forEach { paramFileName ->
val url = "${ZcashSdk.CLOUD_PARAM_DIR_URL.random()}/$paramFileName" val destFile = File(destinationDir, paramFileName)
twig("Downloading Sapling params from ${url}...") if (!destFile.existsSuspend()) {
val request = Request.Builder().url(url).build() if (destFile.parentFile?.existsSuspend() != true) {
val response = withContext(Dispatchers.IO) { client.newCall(request).execute() } destFile.parentFile?.mkdirsSuspend()
if (response.isSuccessful) {
twig("fetch succeeded", -1)
val file = File(destinationDir, paramFileName)
if (file.parentFile?.existsSuspend() == true) {
twig("directory exists!", -1)
} else {
twig("directory did not exist attempting to make it")
file.parentFile?.mkdirsSuspend()
} }
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
response.body?.let { body -> context.assets.open("params/$paramFileName").use { input ->
body.source().use { source -> destFile.outputStream().use { output ->
file.sink().buffer().use { sink -> twig("copying bundled $paramFileName to $destFile")
twig("writing to $file") input.copyTo(output)
sink.writeAll(source)
}
} }
} }
} }
} else {
failureMessage += "Error while fetching $paramFileName : $response\n"
twig(failureMessage)
} }
twig("fetch succeeded, done writing $paramFileName")
} }
if (failureMessage.isNotEmpty()) throw TransactionEncoderException.FetchParamsException(
failureMessage
)
} }
suspend fun clear(destinationDir: String) { suspend fun clear(destinationDir: String) {
@@ -119,19 +106,6 @@ class SaplingParamTool {
} }
} }
//
// Helpers
//
/**
* Http client is only used for downloading sapling spend and output params data, which are
* necessary for the wallet to scan blocks.
*
* @return an http client suitable for downloading params data.
*/
private fun createHttpClient(): OkHttpClient {
// TODO: add logging and timeouts
return OkHttpClient()
}
} }
} }