diff --git a/darkside-test-lib/src/androidTest/java/cash/z/ecc/android/sdk/darkside/test/DarksideTestCoordinator.kt b/darkside-test-lib/src/androidTest/java/cash/z/ecc/android/sdk/darkside/test/DarksideTestCoordinator.kt index 92bd4de..f6f6543 100644 --- a/darkside-test-lib/src/androidTest/java/cash/z/ecc/android/sdk/darkside/test/DarksideTestCoordinator.kt +++ b/darkside-test-lib/src/androidTest/java/cash/z/ecc/android/sdk/darkside/test/DarksideTestCoordinator.kt @@ -101,13 +101,13 @@ class DarksideTestCoordinator(val wallet: TestWallet) { twig("got processor status $it") if (it == Synchronizer.Status.DISCONNECTED) { twig("waiting a bit before giving up on connection...") - } else if (targetHeight != null && (synchronizer as SdkSynchronizer).processor.getLastScannedHeight() < targetHeight) { + } else if (targetHeight != null && ((synchronizer as SdkSynchronizer).processor.getLastScannedHeight() ?: synchronizer.network.saplingActivationHeight) < targetHeight) { twig("awaiting new blocks from server...") } }.map { // whenever we're waiting for a target height, for simplicity, if we're sleeping, // and in between polls, then consider it that we're not synced - if (targetHeight != null && (synchronizer as SdkSynchronizer).processor.getLastScannedHeight() < targetHeight) { + if (targetHeight != null && ((synchronizer as SdkSynchronizer).processor.getLastScannedHeight() ?: synchronizer.network.saplingActivationHeight) < targetHeight) { twig("switching status to DOWNLOADING because we're still waiting for height $targetHeight") Synchronizer.Status.DOWNLOADING } else { diff --git a/gradle.properties b/gradle.properties index 0fd71fa..d00a9a4 100644 --- a/gradle.properties +++ b/gradle.properties @@ -21,7 +21,7 @@ RELEASE_SIGNING_ENABLED=false # Required by the maven publishing plugin SONATYPE_HOST=DEFAULT -LIBRARY_VERSION=1.9.0-beta01 +LIBRARY_VERSION=1.9.1-beta01 # Kotlin compiler warnings can be considered errors, failing the build. 
# Currently set to false, because this project has a lot of warnings to fix first. diff --git a/sdk-lib/Cargo.lock b/sdk-lib/Cargo.lock index 67d7ec7..90e9cac 100644 --- a/sdk-lib/Cargo.lock +++ b/sdk-lib/Cargo.lock @@ -341,10 +341,10 @@ checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e" dependencies = [ "cfg-if 0.1.10", "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", + "crossbeam-deque 0.7.4", + "crossbeam-epoch 0.8.2", "crossbeam-queue", - "crossbeam-utils", + "crossbeam-utils 0.7.2", ] [[package]] @@ -353,7 +353,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -363,11 +363,21 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c20ff29ded3204c5106278a81a38f4b482636ed4fa1e6cfbeef193291beb29ed" dependencies = [ - "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-epoch 0.8.2", + "crossbeam-utils 0.7.2", "maybe-uninit", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch 0.9.18", + "crossbeam-utils 0.8.21", +] + [[package]] name = "crossbeam-epoch" version = "0.8.2" @@ -376,13 +386,22 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ "autocfg", "cfg-if 0.1.10", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "lazy_static", "maybe-uninit", "memoffset", "scopeguard", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils 0.8.21", +] + [[package]] name = 
"crossbeam-queue" version = "0.2.3" @@ -390,7 +409,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" dependencies = [ "cfg-if 0.1.10", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -405,6 +424,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crypto_api" version = "0.2.2" @@ -455,6 +480,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "env_logger" version = "0.7.1" @@ -468,7 +499,6 @@ dependencies = [ [[package]] name = "equihash" version = "0.1.0" -source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2" dependencies = [ "blake2b_simd", "byteorder", @@ -999,6 +1029,26 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rayon" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque 0.8.6", + "crossbeam-utils 0.8.21", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -1552,7 +1602,6 @@ dependencies = [ [[package]] name = "zcash_client_backend" 
version = "0.5.0" -source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2" dependencies = [ "base64", "bech32", @@ -1569,6 +1618,7 @@ dependencies = [ "protobuf", "protobuf-codegen-pure", "rand_core 0.5.1", + "rayon", "ripemd160", "secp256k1", "sha2", @@ -1581,7 +1631,6 @@ dependencies = [ [[package]] name = "zcash_client_sqlite" version = "0.3.0" -source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2" dependencies = [ "bech32", "bs58", @@ -1600,7 +1649,6 @@ dependencies = [ [[package]] name = "zcash_note_encryption" version = "0.0.0" -source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2" dependencies = [ "blake2b_simd", "byteorder", @@ -1614,7 +1662,6 @@ dependencies = [ [[package]] name = "zcash_primitives" version = "0.5.0" -source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2" dependencies = [ "aes", "bitvec 0.18.5", @@ -1644,7 +1691,6 @@ dependencies = [ [[package]] name = "zcash_proofs" version = "0.5.0" -source = "git+https://git.hush.is/fekt/librustzcash?branch=main#553c3f62b8481234addef7adb8dfa92ecf7934d2" dependencies = [ "bellman", "blake2b_simd", diff --git a/sdk-lib/Cargo.toml b/sdk-lib/Cargo.toml index 8335d94..6d29021 100644 --- a/sdk-lib/Cargo.toml +++ b/sdk-lib/Cargo.toml @@ -42,12 +42,12 @@ secp256k1 = "0.19" #zcash_primitives = { path = '../../clones/librustzcash/zcash_primitives' } #zcash_proofs = { path = '../../clones/librustzcash/zcash_proofs' } -## Uncomment this to test someone else's librustzcash changes in a branch +## Use local librustzcash with parallel scanning support [patch.crates-io] -zcash_client_backend = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" } -zcash_client_sqlite = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" } -zcash_primitives = { git = "https://git.hush.is/fekt/librustzcash", 
branch = "main" } -zcash_proofs = { git = "https://git.hush.is/fekt/librustzcash", branch = "main" } +zcash_client_backend = { path = "../../librustzcash/zcash_client_backend" } +zcash_client_sqlite = { path = "../../librustzcash/zcash_client_sqlite" } +zcash_primitives = { path = "../../librustzcash/zcash_primitives" } +zcash_proofs = { path = "../../librustzcash/zcash_proofs" } [features] mainnet = ["zcash_client_sqlite/mainnet"] diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/SdkSynchronizer.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/SdkSynchronizer.kt index b7b635f..f7f1d32 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/SdkSynchronizer.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/SdkSynchronizer.kt @@ -288,17 +288,23 @@ class SdkSynchronizer internal constructor( * will throw an exception if the synchronizer was never previously started. */ override fun stop() { - coroutineScope.launch { - // log everything to help troubleshoot shutdowns that aren't graceful - twig("Synchronizer::stop: STARTING") - twig("Synchronizer::stop: processor.stop()") - processor.stop() + twig("Synchronizer::stop: STARTING") + // Cancel the scope directly so all child coroutines (including the + // processor loop) are cancelled immediately. The previous implementation + // launched a *new* coroutine inside coroutineScope to call cancel(), which + // created a race: the cancel could kill the coroutine before it finished, + // and the caller (DependenciesHolder.resetSynchronizer) had no way to wait + // for shutdown, leaving the old processor running while a new one started. 
+ runCatching { twig("Synchronizer::stop: coroutineScope.cancel()") coroutineScope.cancel() + } + runCatching { twig("Synchronizer::stop: _status.cancel()") _status.cancel() - twig("Synchronizer::stop: COMPLETE") } + isStarted = false + twig("Synchronizer::stop: COMPLETE") } /** diff --git a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/block/CompactBlockProcessor.kt b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/block/CompactBlockProcessor.kt index 115d3d0..4c0672d 100644 --- a/sdk-lib/src/main/java/cash/z/ecc/android/sdk/block/CompactBlockProcessor.kt +++ b/sdk-lib/src/main/java/cash/z/ecc/android/sdk/block/CompactBlockProcessor.kt @@ -46,6 +46,7 @@ import cash.z.wallet.sdk.rpc.Service import io.grpc.StatusRuntimeException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers.IO +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ConflatedBroadcastChannel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow @@ -53,6 +54,7 @@ import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext @@ -119,6 +121,7 @@ class CompactBlockProcessor internal constructor( var onScanMetricCompleteListener: ((BatchMetrics, Boolean) -> Unit)? 
= null private val consecutiveChainErrors = AtomicInteger(0) + private val consecutiveReconnects = AtomicInteger(0) private val lowerBoundHeight: BlockHeight = BlockHeight( max( rustBackend.network.saplingActivationHeight.value, @@ -206,11 +209,20 @@ class CompactBlockProcessor internal constructor( // immediately process again after failures in order to download new blocks right away when (result) { BlockProcessingResult.Reconnecting -> { + val attempts = consecutiveReconnects.incrementAndGet() + if (attempts >= 3) { + consecutiveReconnects.set(0) + twig("Server unreachable after $attempts attempts, escalating to show server picker") + throw StatusRuntimeException(io.grpc.Status.UNAVAILABLE.withDescription( + "Server unreachable after $attempts reconnection attempts" + )) + } val napTime = calculatePollInterval(true) - twig("Unable to process new blocks because we are disconnected! Attempting to reconnect in ${napTime}ms") + twig("Unable to process new blocks because we are disconnected! Attempting to reconnect in ${napTime}ms (attempt $attempts/3)") delay(napTime) } BlockProcessingResult.NoBlocksToProcess, BlockProcessingResult.FailedEnhance -> { + consecutiveReconnects.set(0) val noWorkDone = currentInfo.lastDownloadRange?.isEmpty() ?: true && currentInfo.lastScanRange?.isEmpty() ?: true val summary = if (noWorkDone) { @@ -232,9 +244,17 @@ class CompactBlockProcessor internal constructor( handleChainError(result.failedAtHeight) } consecutiveChainErrors.getAndIncrement() + // Delay before retrying to prevent tight error loops + val napTime = calculatePollInterval() + twig("Chain error handled. Delaying ${napTime}ms before next attempt.") + delay(napTime) } is BlockProcessingResult.Success -> { - // Do nothing. We are done. + consecutiveChainErrors.set(0) + consecutiveReconnects.set(0) + val napTime = calculatePollInterval() + twig("Done processing blocks! 
Sleeping for ${napTime}ms (latest height: ${currentInfo.networkBlockHeight}).") + delay(napTime) } } } } @@ -274,8 +294,7 @@ class CompactBlockProcessor internal constructor( setState(Scanned(currentInfo.lastScanRange)) BlockProcessingResult.NoBlocksToProcess } else { - downloadNewBlocks(currentInfo.lastDownloadRange) - val error = validateAndScanNewBlocks(currentInfo.lastScanRange) + val error = downloadAndScanPipelined(currentInfo.lastDownloadRange, currentInfo.lastScanRange) if (error != BlockProcessingResult.Success) { error } else { @@ -370,6 +389,8 @@ class CompactBlockProcessor internal constructor( throw CompactBlockProcessorException.FailedScan() } else { setState(Scanned(lastScanRange)) + twig("Clearing block cache after successful scan") + downloader.rewindToHeight(BlockHeight(0)) } } @@ -589,6 +610,103 @@ class CompactBlockProcessor internal constructor( _progress.send(100) } + /** + * Pipelined download+scan: downloads a batch of blocks while the previous batch is being + * validated and scanned. This overlaps network I/O with CPU-intensive scanning. + */ + private suspend fun downloadAndScanPipelined( + downloadRange: ClosedRange<BlockHeight>?, + scanRange: ClosedRange<BlockHeight>?
+ ): BlockProcessingResult = withContext(IO) { + if (null == downloadRange || downloadRange.isEmpty()) { + // Nothing to download — scan whatever is cached, skipping validation + // to avoid stale cached blocks causing false chain errors + setState(Scanning) + val success = scanNewBlocks(scanRange) + if (!success) { + throw CompactBlockProcessorException.FailedScan() + } + setState(Scanned(scanRange)) + twig("Clearing block cache after successful scan") + downloader.rewindToHeight(BlockHeight(0)) + return@withContext BlockProcessingResult.Success + } + + setState(Downloading) + Twig.sprout("pipelined-sync") + twig("pipelined download+scan for range $downloadRange") + + val missingBlockCount = downloadRange.endInclusive.value - downloadRange.start.value + 1 + val batches = ( + missingBlockCount / DOWNLOAD_BATCH_SIZE + + (if (missingBlockCount.rem(DOWNLOAD_BATCH_SIZE) == 0L) 0 else 1) + ) + + // Channel with capacity 1: producer downloads batch N+1 while consumer scans batch N + val batchChannel = Channel<ClosedRange<BlockHeight>>(capacity = 1) + + // Producer: download batches and send ranges to channel + val downloadJob = launch(IO) { + var downloadedBlockHeight = downloadRange.start + for (i in 1..batches) { + retryUpTo(RETRIES, { CompactBlockProcessorException.FailedDownload(it) }) { + val end = BlockHeight.new( + network, + min( + (downloadRange.start.value + (i * DOWNLOAD_BATCH_SIZE)) - 1, + downloadRange.endInclusive.value + ) + ) + var count = 0 + twig("downloading $downloadedBlockHeight..$end (batch $i of $batches)") { + count = downloader.downloadBlockRange(downloadedBlockHeight..end) + } + twig("downloaded $count blocks!") + val progress = (i / batches.toFloat() * 100).roundToInt() + _progress.send(progress) + val lastDownloadedHeight = downloader.getLastDownloadedHeight() + updateProgress(lastDownloadedHeight = lastDownloadedHeight) + + // Send the downloaded range for scanning + batchChannel.send(downloadedBlockHeight..end) + downloadedBlockHeight = end + 1 + } + } + 
batchChannel.close() + } + + // Consumer: drain download batches (scanning is done after all downloads complete + // because Room and the Rust scanner use separate SQLite connections, and with + // TRUNCATE journal mode Room cannot reliably see Rust's committed scan progress; + // calling scanNewBlocks per-batch caused each iteration to re-scan ALL cached blocks) + var overallScanRange = scanRange + + for (batchRange in batchChannel) { + // Just consume batches — scanning happens after downloads finish + } + + // Wait for download job to finish (in case it's still running) + downloadJob.join() + + // Now scan all downloaded blocks in one pass + setState(Scanning) + val success = scanNewBlocks(overallScanRange) + if (!success) { + Twig.clip("pipelined-sync") + return@withContext BlockProcessingResult.Error(overallScanRange?.start ?: downloadRange.start) + } + + setState(Scanned(overallScanRange)) + // Clear the block cache after successful scan. Scanned blocks serve no + // further purpose and stale cached blocks can cause validateCombinedChain() + // to find hash mismatches on subsequent cycles, triggering repeated rewinds. + twig("Clearing block cache after successful scan") + downloader.rewindToHeight(BlockHeight(0)) + + Twig.clip("pipelined-sync") + BlockProcessingResult.Success + } + /** * Validate all blocks in the given range, ensuring that the blocks are in ascending order, with * no gaps and are also chain-sequential. 
This means every block's prevHash value matches the @@ -636,9 +754,19 @@ class CompactBlockProcessor internal constructor( var scannedNewBlocks = false metrics.beginBatch() result = rustBackend.scanBlocks(SCAN_BATCH_SIZE) + // Retry once after a delay on scan failure (handles transient + // SQLite lock contention between Rust and Room connections) + if (!result) { + twig("scanBlocks returned false, retrying after 1s delay...") + delay(1000) + result = rustBackend.scanBlocks(SCAN_BATCH_SIZE) + } metrics.endBatch() - val lastScannedHeight = - BlockHeight.new(network, range.start.value + metrics.cumulativeItems - 1) + // Use the database as source of truth for scanned height, since the Rust + // backend tracks its own scan position and metrics.cumulativeItems resets + // each time scanNewBlocks is called (breaking pipelined progress tracking). + val lastScannedHeight = getLastScannedHeight() + ?: BlockHeight.new(network, range.start.value + metrics.cumulativeItems - 1) val percentValue = (lastScannedHeight.value - range.start.value) / (range.endInclusive.value - range.start.value + 1).toFloat() * 100.0f val percent = "%.0f".format(percentValue.coerceAtMost(100f).coerceAtLeast(0f)) @@ -908,8 +1036,31 @@ class CompactBlockProcessor internal constructor( * * @return the last scanned height reported by the repository. */ - suspend fun getLastScannedHeight() = - repository.lastScannedHeight() + suspend fun getLastScannedHeight(): BlockHeight? { + // Query the data DB directly using a fresh SQLite connection to bypass + // Room's cached connection. The Rust scanner writes to this database + // via its own SQLite connection and Room (TRUNCATE journal mode) may + // not see those changes through its cached connection. + return withContext(IO) { + val dbPath = (rustBackend as RustBackend).pathDataDb + var height: Long? 
= null + val db = android.database.sqlite.SQLiteDatabase.openDatabase( + dbPath, + null, + android.database.sqlite.SQLiteDatabase.OPEN_READONLY + ) + try { + val cursor = db.rawQuery("SELECT MAX(height) FROM blocks", null) + if (cursor.moveToFirst() && !cursor.isNull(0)) { + height = cursor.getLong(0) + } + cursor.close() + } finally { + db.close() + } + height?.let { BlockHeight.new(network, it) } + } + } /** * Get address corresponding to the given account for this wallet.