Files
dragonx/util/test_diff1_block.py
2026-03-10 17:07:16 -05:00

211 lines
7.7 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Test script to verify that DragonX rejects a block with diff=1 (trivially easy nBits).

This script:
  1. Talks to the local DragonX node through dragonx-cli (RPC)
  2. Fetches the current tip block in raw hex
  3. Deserializes the block header
  4. Tampers with nBits to set difficulty=1 (0x200f0f0f)
  5. Reserializes and submits via submitblock
  6. Verifies the node rejects it (submitblock reply, new debug.log
     entries, and an unchanged chain tip)

Usage:
    python3 test_diff1_block.py
"""
import json
import struct
import subprocess
import sys
import os
import time
# Directory holding this script; dragonx-cli is resolved relative to it.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Path to the dragonx-cli binary: <script dir>/../src/dragonx-cli
CLI = os.path.join(_SCRIPT_DIR, os.pardir, "src", "dragonx-cli")

# Node debug log that is scanned for rejection messages after submitblock.
DEBUG_LOG = os.path.expanduser("~/.hush/DRAGONX/debug.log")
def rpc(method, *args):
    """Run ``dragonx-cli <method> <args...>`` and return its stripped text output.

    Every argument is converted with ``str`` before being passed on the
    command line.

    Returns:
        Stripped stdout when the command exits with status 0. When it exits
        non-zero, the stripped stdout if that is non-empty, otherwise the
        stripped stderr if that is non-empty.

    Raises:
        subprocess.CalledProcessError: the command exited non-zero and wrote
            nothing but whitespace to both stdout and stderr.
    """
    argv = [CLI, method, *map(str, args)]
    proc = subprocess.run(argv, capture_output=True, text=True)
    if proc.returncode == 0:
        return proc.stdout.strip()

    # Some RPC calls exit non-zero while still reporting a message (for
    # example a rejection); hand that message back instead of raising.
    for stream in (proc.stdout, proc.stderr):
        message = (stream or "").strip()
        if message:
            return message

    # Non-zero exit with no output at all: surface it as an error.
    proc.check_returncode()
def rpc_json(method, *args):
    """Invoke an RPC method through ``rpc`` and decode its output as JSON.

    Raises:
        json.JSONDecodeError: the text returned by ``rpc`` is not valid JSON.
    """
    return json.loads(rpc(method, *args))
def read_uint32(data, offset: int) -> tuple:
    """Decode an unsigned little-endian 32-bit integer from *data* at *offset*.

    Returns:
        ``(value, next_offset)`` where ``next_offset`` is ``offset + 4``.

    Raises:
        struct.error: fewer than 4 bytes are available at *offset*.
    """
    fmt = '<I'
    (value,) = struct.unpack_from(fmt, data, offset)
    return value, offset + struct.calcsize(fmt)
def read_int32(data, offset: int) -> tuple:
    """Decode a signed little-endian 32-bit integer from *data* at *offset*.

    Returns:
        ``(value, next_offset)`` where ``next_offset`` is ``offset + 4``.

    Raises:
        struct.error: fewer than 4 bytes are available at *offset*.
    """
    fmt = '<i'
    (value,) = struct.unpack_from(fmt, data, offset)
    return value, offset + struct.calcsize(fmt)
def read_uint256(data, offset: int) -> tuple:
    """Read a raw 32-byte field (uint256) from *data* at *offset*.

    The bytes are returned exactly as stored; no byte-order conversion is
    applied.

    Returns:
        ``(raw, next_offset)`` where ``raw`` is a 32-byte ``bytes`` object and
        ``next_offset`` is ``offset + 32``.

    Raises:
        struct.error: fewer than 32 bytes are available at *offset*.
    """
    # A plain slice would silently return a short value on truncated input;
    # the '32s' format makes struct enforce the length, so a truncated buffer
    # fails the same way it does in the integer readers.
    (raw,) = struct.unpack_from('<32s', data, offset)
    return raw, offset + 32
def read_compactsize(data, offset: int) -> tuple:
    """Decode a variable-length "compact size" integer from *data* at *offset*.

    The first byte selects the encoding: a value below 253 is the integer
    itself; 253, 254 and 255 announce a little-endian 2-, 4- or 8-byte integer
    that follows the marker byte.

    Returns:
        ``(value, next_offset)`` with ``next_offset`` pointing just past the
        encoded integer.

    Raises:
        IndexError: *offset* is outside *data*.
        struct.error: the bytes announced by the marker are not all present.
    """
    marker = data[offset]
    if marker < 253:
        return marker, offset + 1

    # marker -> (struct format of the payload, total encoded width incl. marker)
    wide_forms = {253: ('<H', 3), 254: ('<I', 5)}
    fmt, width = wide_forms.get(marker, ('<Q', 9))
    (value,) = struct.unpack_from(fmt, data, offset + 1)
    return value, offset + width
def write_uint32(val: int) -> bytes:
    """Encode *val* as a 4-byte unsigned little-endian integer.

    Raises:
        struct.error: *val* does not fit in an unsigned 32-bit integer.
    """
    encoder = struct.Struct('<I')
    return encoder.pack(val)
def write_int32(val: int) -> bytes:
    """Encode *val* as a 4-byte signed little-endian integer.

    Raises:
        struct.error: *val* does not fit in a signed 32-bit integer.
    """
    encoder = struct.Struct('<i')
    return encoder.pack(val)
# Case-insensitive substrings that mark a debug.log line as rejection-related.
_REJECTION_KEYWORDS = (
    "failed", "error", "reject", "invalid",
    "high-hash", "bad-diff", "mismatch",
    "checkblock", "checkproof", "randomx",
)


def _read_log_tail(path, start):
    """Return the text appended to *path* after byte offset *start*.

    Returns None when the file does not exist.
    """
    try:
        # Binary mode: *start* is a byte size, which is only a well-defined
        # seek target for a binary file object.
        with open(path, "rb") as log:
            end = log.seek(0, os.SEEK_END)
            if start > end:
                # The log shrank (truncated or rotated) since the offset was
                # recorded, so everything in it is new.
                start = 0
            log.seek(start)
            return log.read().decode("utf-8", errors="replace")
    except FileNotFoundError:
        return None


def _find_rejection_lines(log_text):
    """Return the stripped lines of *log_text* that contain a rejection keyword."""
    return [
        line.strip()
        for line in log_text.splitlines()
        if any(kw in line.lower() for kw in _REJECTION_KEYWORDS)
    ]


def main():
    """Submit a tip block whose nBits was rewritten to diff=1 and check rejection.

    Fetches the current tip block, overwrites its nBits field with 0x200f0f0f,
    submits the tampered block with ``submitblock``, looks for evidence of
    rejection in the RPC reply and in newly written debug.log lines, and
    finally checks that the best block hash is unchanged.

    Returns:
        0 if the block was judged rejected and the chain tip did not change,
        1 otherwise (inconclusive reply, duplicate, or changed tip).
    """
    print("=" * 60)
    print("DragonX Diff=1 Block Rejection Test")
    print("=" * 60)

    # Step 1: Get current chain info
    print("\n[1] Fetching chain info...")
    info = rpc_json("getblockchaininfo")
    height = info["blocks"]
    best_hash = info["bestblockhash"]
    print(f" Chain height: {height}")
    print(f" Best block: {best_hash}")

    # Step 2: Get the tip block header details
    print("\n[2] Fetching tip block details...")
    block_info = rpc_json("getblock", best_hash)
    current_bits = block_info["bits"]
    current_difficulty = block_info["difficulty"]
    print(f" Current nBits: {current_bits}")
    print(f" Current difficulty: {current_difficulty}")

    # Step 3: Get the raw block hex (verbosity "0" requests the serialized block)
    print("\n[3] Fetching raw block hex...")
    block_hex = rpc("getblock", best_hash, "0")
    block_data = bytes.fromhex(block_hex)
    print(f" Raw block size: {len(block_data)} bytes")

    # Step 4: Parse the block header to find the nBits offset
    # Header format:
    #   nVersion:             4 bytes (int32)
    #   hashPrevBlock:        32 bytes (uint256)
    #   hashMerkleRoot:       32 bytes (uint256)
    #   hashFinalSaplingRoot: 32 bytes (uint256)
    #   nTime:                4 bytes (uint32)
    #   nBits:                4 bytes (uint32)  <-- this is what we tamper
    #   nNonce:               32 bytes (uint256)
    #   nSolution:            compactsize + data
    offset = 0
    nVersion, offset = read_int32(block_data, offset)
    _, offset = read_uint256(block_data, offset)  # hashPrevBlock
    _, offset = read_uint256(block_data, offset)  # hashMerkleRoot
    _, offset = read_uint256(block_data, offset)  # hashFinalSaplingRoot
    nTime, offset = read_uint32(block_data, offset)
    nbits_offset = offset  # 4 + 3 * 32 + 4 = byte 104 of the header
    nBits, offset = read_uint32(block_data, offset)
    _, offset = read_uint256(block_data, offset)  # nNonce
    sol_len, offset = read_compactsize(block_data, offset)
    print("\n[4] Parsed block header:")
    print(f" nVersion: {nVersion}")
    print(f" nTime: {nTime}")
    print(f" nBits: 0x{nBits:08x} (offset {nbits_offset})")
    print(f" nSolution: {sol_len} bytes")

    # Step 5: Tamper nBits to diff=1
    # 0x200f0f0f is the powLimit for DragonX (minimum difficulty / diff=1)
    DIFF1_NBITS = 0x200f0f0f
    print(f"\n[5] Tampering nBits from 0x{nBits:08x} -> 0x{DIFF1_NBITS:08x} (diff=1)...")
    tampered_data = bytearray(block_data)
    struct.pack_into('<I', tampered_data, nbits_offset, DIFF1_NBITS)
    tampered_hex = tampered_data.hex()

    # Verify the tamper worked (explicit raise so the check survives python -O)
    check_nbits = struct.unpack_from('<I', tampered_data, nbits_offset)[0]
    if check_nbits != DIFF1_NBITS:
        raise RuntimeError("nBits tamper failed!")
    print(f" Verified tampered nBits: 0x{check_nbits:08x}")

    # Step 6: Record log position before submitting, so that only lines
    # written after the submission are inspected.
    try:
        log_size_before = os.path.getsize(DEBUG_LOG)
    except OSError:
        log_size_before = 0

    # Step 7: Submit the tampered block
    print("\n[6] Submitting tampered block via submitblock...")
    result = rpc("submitblock", tampered_hex)
    print(f" submitblock result: {repr(result)}")
    # Note: Bitcoin-derived RPC returns empty string when a block is processed,
    # even if it fails internal validation. This is normal behavior.
    # NOTE(review): in BIP 22 a null submitblock reply signals acceptance, so
    # an empty reply on its own is weak evidence -- confirm against the node's
    # submitblock implementation. The log scan and tip check are stronger.

    # Step 8: Check debug.log for the actual rejection reason
    print("\n[7] Checking debug.log for rejection details...")
    rejection_lines = []
    log_tail = _read_log_tail(DEBUG_LOG, log_size_before)
    if log_tail is None:
        print(f" debug.log not found at {DEBUG_LOG}")
    else:
        rejection_lines = _find_rejection_lines(log_tail)
        if rejection_lines:
            print(" Rejection log entries:")
            for line in rejection_lines[-10:]:
                print(f" {line}")
        else:
            print(" No rejection entries found in new log output.")

    # Step 9: Evaluate result
    print("\n" + "=" * 60)
    rejected_by_rpc = result.lower() in ("rejected", "invalid")
    rejected_by_log = any("FAILED" in l or "MISMATCH" in l for l in rejection_lines)
    verdict_ok = False
    if rejected_by_rpc or rejected_by_log or result == "":
        verdict_ok = True
        print("PASS: Block with diff=1 was correctly REJECTED!")
        if result:
            print(f" RPC result: {result}")
        else:
            print(" RPC returned empty (block processed but failed validation)")
    elif "duplicate" in result.lower():
        print(f"NOTE: Block was seen as duplicate. Result: {result}")
    else:
        print(f"RESULT: {result}")
        print(" Check debug.log for rejection details.")

    # Step 10: Verify chain tip didn't change.
    # A block found by the network while the test runs also moves the tip, so
    # a failure here on a live chain deserves a second look.
    print("\n[8] Verifying chain tip unchanged...")
    new_hash = rpc("getbestblockhash")
    tip_unchanged = new_hash == best_hash
    if tip_unchanged:
        print(f" Chain tip unchanged: {new_hash}")
        print(" CONFIRMED: Bad block did not affect the chain.")
    else:
        print(f" WARNING: Chain tip changed! {best_hash} -> {new_hash}")
        print(" This should NOT happen!")

    print("\n" + "=" * 60)
    print("Test complete.")
    return 0 if verdict_ok and tip_unchanged else 1


if __name__ == "__main__":
    # Propagate the verdict so shells and CI can detect a failed run.
    sys.exit(main())