v1.9.1-beta01: Fix synchronizer stop, scan loop, and SQLite isolation

- Fix SdkSynchronizer.stop() race condition: use synchronous scope.cancel()
  instead of async self-cancellation that never completed
- Fix scan loop: scan once after all downloads complete instead of per-batch
- Use fresh SQLiteDatabase connection for getLastScannedHeight() to avoid
  reading stale Room-cached values
- Bump version to 1.9.1-beta01
- Update Cargo dependencies
This commit is contained in:
2026-03-24 16:50:15 -05:00
parent e8a2d3ebc9
commit 939477a7a3
6 changed files with 239 additions and 36 deletions

View File

@@ -288,17 +288,23 @@ class SdkSynchronizer internal constructor(
* will throw an exception if the synchronizer was never previously started.
*/
override fun stop() {
coroutineScope.launch {
// log everything to help troubleshoot shutdowns that aren't graceful
twig("Synchronizer::stop: STARTING")
twig("Synchronizer::stop: processor.stop()")
processor.stop()
twig("Synchronizer::stop: STARTING")
// Cancel the scope directly so all child coroutines (including the
// processor loop) are cancelled immediately. The previous implementation
// launched a *new* coroutine inside coroutineScope to call cancel(), which
// created a race: the cancel could kill the coroutine before it finished,
// and the caller (DependenciesHolder.resetSynchronizer) had no way to wait
// for shutdown, leaving the old processor running while a new one started.
runCatching {
twig("Synchronizer::stop: coroutineScope.cancel()")
coroutineScope.cancel()
}
runCatching {
twig("Synchronizer::stop: _status.cancel()")
_status.cancel()
twig("Synchronizer::stop: COMPLETE")
}
isStarted = false
twig("Synchronizer::stop: COMPLETE")
}
/**

View File

@@ -46,6 +46,7 @@ import cash.z.wallet.sdk.rpc.Service
import io.grpc.StatusRuntimeException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
@@ -53,6 +54,7 @@ import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
@@ -119,6 +121,7 @@ class CompactBlockProcessor internal constructor(
var onScanMetricCompleteListener: ((BatchMetrics, Boolean) -> Unit)? = null
private val consecutiveChainErrors = AtomicInteger(0)
private val consecutiveReconnects = AtomicInteger(0)
private val lowerBoundHeight: BlockHeight = BlockHeight(
max(
rustBackend.network.saplingActivationHeight.value,
@@ -206,11 +209,20 @@ class CompactBlockProcessor internal constructor(
// immediately process again after failures in order to download new blocks right away
when (result) {
BlockProcessingResult.Reconnecting -> {
val attempts = consecutiveReconnects.incrementAndGet()
if (attempts >= 3) {
consecutiveReconnects.set(0)
twig("Server unreachable after $attempts attempts, escalating to show server picker")
throw StatusRuntimeException(io.grpc.Status.UNAVAILABLE.withDescription(
"Server unreachable after $attempts reconnection attempts"
))
}
val napTime = calculatePollInterval(true)
twig("Unable to process new blocks because we are disconnected! Attempting to reconnect in ${napTime}ms")
twig("Unable to process new blocks because we are disconnected! Attempting to reconnect in ${napTime}ms (attempt $attempts/3)")
delay(napTime)
}
BlockProcessingResult.NoBlocksToProcess, BlockProcessingResult.FailedEnhance -> {
consecutiveReconnects.set(0)
val noWorkDone =
currentInfo.lastDownloadRange?.isEmpty() ?: true && currentInfo.lastScanRange?.isEmpty() ?: true
val summary = if (noWorkDone) {
@@ -232,9 +244,17 @@ class CompactBlockProcessor internal constructor(
handleChainError(result.failedAtHeight)
}
consecutiveChainErrors.getAndIncrement()
// Delay before retrying to prevent tight error loops
val napTime = calculatePollInterval()
twig("Chain error handled. Delaying ${napTime}ms before next attempt.")
delay(napTime)
}
is BlockProcessingResult.Success -> {
// Do nothing. We are done.
consecutiveChainErrors.set(0)
consecutiveReconnects.set(0)
val napTime = calculatePollInterval()
twig("Done processing blocks! Sleeping for ${napTime}ms (latest height: ${currentInfo.networkBlockHeight}).")
delay(napTime)
}
}
}
@@ -274,8 +294,7 @@ class CompactBlockProcessor internal constructor(
setState(Scanned(currentInfo.lastScanRange))
BlockProcessingResult.NoBlocksToProcess
} else {
downloadNewBlocks(currentInfo.lastDownloadRange)
val error = validateAndScanNewBlocks(currentInfo.lastScanRange)
val error = downloadAndScanPipelined(currentInfo.lastDownloadRange, currentInfo.lastScanRange)
if (error != BlockProcessingResult.Success) {
error
} else {
@@ -370,6 +389,8 @@ class CompactBlockProcessor internal constructor(
throw CompactBlockProcessorException.FailedScan()
} else {
setState(Scanned(lastScanRange))
twig("Clearing block cache after successful scan")
downloader.rewindToHeight(BlockHeight(0))
}
}
@@ -589,6 +610,103 @@ class CompactBlockProcessor internal constructor(
_progress.send(100)
}
/**
 * Download all missing blocks in [downloadRange] batch-by-batch, then validation-free scan
 * [scanRange] in a single pass once every batch has landed.
 *
 * NOTE: despite the historical name, this is no longer pipelined. Per-batch scanning was
 * removed because Room and the Rust scanner use separate SQLite connections, and with
 * TRUNCATE journal mode Room cannot reliably see Rust's committed scan progress; calling
 * scanNewBlocks per batch caused each iteration to re-scan ALL cached blocks. The old
 * producer/consumer Channel scaffolding (whose consumer merely drained the channel) was
 * dead code and has been removed — downloads now run as a plain sequential loop.
 *
 * @param downloadRange blocks to download, or null/empty when nothing is missing.
 * @param scanRange blocks to scan after all downloads complete.
 * @return [BlockProcessingResult.Success] when scanning succeeds; [BlockProcessingResult.Error]
 * with the first unscanned height when the scan fails after a download.
 * @throws CompactBlockProcessorException.FailedScan when the nothing-to-download scan path fails.
 * @throws CompactBlockProcessorException.FailedDownload when a batch exhausts its retries.
 */
private suspend fun downloadAndScanPipelined(
    downloadRange: ClosedRange<BlockHeight>?,
    scanRange: ClosedRange<BlockHeight>?
): BlockProcessingResult = withContext(IO) {
    if (null == downloadRange || downloadRange.isEmpty()) {
        // Nothing to download — scan whatever is cached, skipping validation
        // to avoid stale cached blocks causing false chain errors
        setState(Scanning)
        val success = scanNewBlocks(scanRange)
        if (!success) {
            throw CompactBlockProcessorException.FailedScan()
        }
        setState(Scanned(scanRange))
        twig("Clearing block cache after successful scan")
        downloader.rewindToHeight(BlockHeight(0))
        return@withContext BlockProcessingResult.Success
    }
    setState(Downloading)
    Twig.sprout("pipelined-sync")
    twig("pipelined download+scan for range $downloadRange")
    val missingBlockCount = downloadRange.endInclusive.value - downloadRange.start.value + 1
    // Round up so a partial final batch is still downloaded.
    val batches = (
        missingBlockCount / DOWNLOAD_BATCH_SIZE +
            (if (missingBlockCount.rem(DOWNLOAD_BATCH_SIZE) == 0L) 0 else 1)
        )
    // Download every batch sequentially. A failed batch exhausting its retries throws
    // FailedDownload, exactly as the old producer coroutine did via structured concurrency.
    var downloadedBlockHeight = downloadRange.start
    for (i in 1..batches) {
        retryUpTo(RETRIES, { CompactBlockProcessorException.FailedDownload(it) }) {
            val end = BlockHeight.new(
                network,
                min(
                    (downloadRange.start.value + (i * DOWNLOAD_BATCH_SIZE)) - 1,
                    downloadRange.endInclusive.value
                )
            )
            var count = 0
            twig("downloading $downloadedBlockHeight..$end (batch $i of $batches)") {
                count = downloader.downloadBlockRange(downloadedBlockHeight..end)
            }
            twig("downloaded $count blocks!")
            val progress = (i / batches.toFloat() * 100).roundToInt()
            _progress.send(progress)
            val lastDownloadedHeight = downloader.getLastDownloadedHeight()
            updateProgress(lastDownloadedHeight = lastDownloadedHeight)
            downloadedBlockHeight = end + 1
        }
    }
    // Now scan all downloaded blocks in one pass.
    setState(Scanning)
    val success = scanNewBlocks(scanRange)
    if (!success) {
        Twig.clip("pipelined-sync")
        return@withContext BlockProcessingResult.Error(scanRange?.start ?: downloadRange.start)
    }
    setState(Scanned(scanRange))
    // Clear the block cache after successful scan. Scanned blocks serve no
    // further purpose and stale cached blocks can cause validateCombinedChain()
    // to find hash mismatches on subsequent cycles, triggering repeated rewinds.
    twig("Clearing block cache after successful scan")
    downloader.rewindToHeight(BlockHeight(0))
    Twig.clip("pipelined-sync")
    BlockProcessingResult.Success
}
/**
* Validate all blocks in the given range, ensuring that the blocks are in ascending order, with
* no gaps and are also chain-sequential. This means every block's prevHash value matches the
@@ -636,9 +754,19 @@ class CompactBlockProcessor internal constructor(
var scannedNewBlocks = false
metrics.beginBatch()
result = rustBackend.scanBlocks(SCAN_BATCH_SIZE)
// Retry once after a delay on scan failure (handles transient
// SQLite lock contention between Rust and Room connections)
if (!result) {
twig("scanBlocks returned false, retrying after 1s delay...")
delay(1000)
result = rustBackend.scanBlocks(SCAN_BATCH_SIZE)
}
metrics.endBatch()
val lastScannedHeight =
BlockHeight.new(network, range.start.value + metrics.cumulativeItems - 1)
// Use the database as source of truth for scanned height, since the Rust
// backend tracks its own scan position and metrics.cumulativeItems resets
// each time scanNewBlocks is called (breaking pipelined progress tracking).
val lastScannedHeight = getLastScannedHeight()
?: BlockHeight.new(network, range.start.value + metrics.cumulativeItems - 1)
val percentValue =
(lastScannedHeight.value - range.start.value) / (range.endInclusive.value - range.start.value + 1).toFloat() * 100.0f
val percent = "%.0f".format(percentValue.coerceAtMost(100f).coerceAtLeast(0f))
@@ -908,8 +1036,31 @@ class CompactBlockProcessor internal constructor(
 *
 * @return the last scanned height read directly from the data database via a fresh SQLite
 * connection (bypassing Room's cached connection), or null when no blocks are recorded.
 */
suspend fun getLastScannedHeight() =
repository.lastScannedHeight()
/**
 * Returns the height of the highest block recorded in the data database, or null when the
 * blocks table is empty.
 *
 * Queries the data DB directly using a fresh read-only SQLite connection to bypass Room's
 * cached connection. The Rust scanner writes to this database via its own SQLite connection
 * and Room (TRUNCATE journal mode) may not see those changes through its cached connection.
 */
suspend fun getLastScannedHeight(): BlockHeight? {
    return withContext(IO) {
        val dbPath = (rustBackend as RustBackend).pathDataDb
        val db = android.database.sqlite.SQLiteDatabase.openDatabase(
            dbPath,
            null,
            android.database.sqlite.SQLiteDatabase.OPEN_READONLY
        )
        try {
            // use {} guarantees the Cursor is closed even if reading throws; the previous
            // manual cursor.close() leaked the cursor on any exception between rawQuery()
            // and close().
            db.rawQuery("SELECT MAX(height) FROM blocks", null).use { cursor ->
                if (cursor.moveToFirst() && !cursor.isNull(0)) {
                    BlockHeight.new(network, cursor.getLong(0))
                } else {
                    null
                }
            }
        } finally {
            db.close()
        }
    }
}
/**
* Get address corresponding to the given account for this wallet.