Thread safe cache with separate ingestor

This commit is contained in:
Aditya Kulkarni
2019-09-25 21:15:32 -07:00
parent fbb75e8f20
commit a8cc2424a2
6 changed files with 193 additions and 131 deletions

View File

@@ -2,6 +2,7 @@ package common
import (
"bytes"
"sync"
"github.com/adityapk00/lightwalletd/walletrpc"
"github.com/pkg/errors"
@@ -14,9 +15,11 @@ type BlockCache struct {
LastBlock int
m map[int]*walletrpc.CompactBlock
mutex sync.RWMutex
}
func New(maxEntries int) *BlockCache {
func NewBlockCache(maxEntries int) *BlockCache {
return &BlockCache{
MaxEntries: maxEntries,
FirstBlock: -1,
@@ -26,25 +29,18 @@ func New(maxEntries int) *BlockCache {
}
func (c *BlockCache) Add(height int, block *walletrpc.CompactBlock) error {
c.mutex.Lock()
defer c.mutex.Unlock()
//println("Cache add", height)
if c.FirstBlock == -1 && c.LastBlock == -1 {
// If this is the first block, prep the data structure
c.FirstBlock = height
c.LastBlock = height - 1
} else if height >= c.FirstBlock && height <= c.LastBlock {
// Overwriting an existing entry. If so, then remove all
// subsequent blocks, since this might be a reorg
for i := height; i <= c.LastBlock; i++ {
//println("Deleteing at height", i)
delete(c.m, i)
}
c.LastBlock = height - 1
}
if height != c.LastBlock+1 {
return errors.New("Blocks need to be added sequentially")
}
// Don't allow out-of-order blocks. This is more of a sanity check than anything
// If there is a reorg, then the ingestor needs to handle it.
if c.m[height-1] != nil && !bytes.Equal(block.PrevHash, c.m[height-1].Hash) {
return errors.New("Prev hash of the block didn't match")
}
@@ -66,6 +62,9 @@ func (c *BlockCache) Add(height int, block *walletrpc.CompactBlock) error {
}
func (c *BlockCache) Get(height int) *walletrpc.CompactBlock {
c.mutex.RLock()
defer c.mutex.RUnlock()
//println("Cache get", height)
if c.LastBlock == -1 || c.FirstBlock == -1 {
return nil
@@ -79,3 +78,10 @@ func (c *BlockCache) Get(height int) *walletrpc.CompactBlock {
//println("Cache returned")
return c.m[height]
}
func (c *BlockCache) GetLatestBlock() int {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.LastBlock
}

View File

@@ -1,19 +1,21 @@
package common
import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/adityapk00/lightwalletd/parser"
"github.com/adityapk00/lightwalletd/walletrpc"
"github.com/btcsuite/btcd/rpcclient"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
func GetSaplingInfo(rpcClient *rpcclient.Client) (int, string, string, error) {
func GetSaplingInfo(rpcClient *rpcclient.Client) (int, int, string, string, error) {
result, rpcErr := rpcClient.RawRequest("getblockchaininfo", make([]json.RawMessage, 0))
var err error
@@ -25,15 +27,15 @@ func GetSaplingInfo(rpcClient *rpcclient.Client) (int, string, string, error) {
errCode, err = strconv.ParseInt(errParts[0], 10, 32)
//Check to see if we are requesting a height the zcashd doesn't have yet
if err == nil && errCode == -8 {
return -1, "", "", nil
return -1, -1, "", "", nil
}
return -1, "", "", errors.Wrap(rpcErr, "error requesting block")
return -1, -1, "", "", errors.Wrap(rpcErr, "error requesting block")
}
var f interface{}
err = json.Unmarshal(result, &f)
if err != nil {
return -1, "", "", errors.Wrap(err, "error reading JSON response")
return -1, -1, "", "", errors.Wrap(err, "error reading JSON response")
}
chainName := f.(map[string]interface{})["chain"].(string)
@@ -42,10 +44,12 @@ func GetSaplingInfo(rpcClient *rpcclient.Client) (int, string, string, error) {
saplingJSON := upgradeJSON.(map[string]interface{})["76b809bb"] // Sapling ID
saplingHeight := saplingJSON.(map[string]interface{})["activationheight"].(float64)
blockHeight := f.(map[string]interface{})["headers"].(float64)
consensus := f.(map[string]interface{})["consensus"]
branchID := consensus.(map[string]interface{})["nextblock"].(string)
return int(saplingHeight), chainName, branchID, nil
return int(saplingHeight), int(blockHeight), chainName, branchID, nil
}
func getBlockFromRPC(rpcClient *rpcclient.Client, height int) (*walletrpc.CompactBlock, error) {
@@ -91,6 +95,78 @@ func getBlockFromRPC(rpcClient *rpcclient.Client, height int) (*walletrpc.Compac
return block.ToCompact(), nil
}
func BlockIngestor(rpcClient *rpcclient.Client, cache *BlockCache, log *logrus.Entry,
stopChan chan bool, startHeight int) {
reorgCount := -1
height := startHeight
timeoutCount := 0
hash := ""
phash := ""
// Start listening for new blocks
for {
select {
case <-stopChan:
break
case <-time.After(15 * time.Second):
for {
if reorgCount > 0 {
reorgCount = -1
height -= 10
}
block, err := getBlockFromRPC(rpcClient, height)
if err != nil {
log.WithFields(logrus.Fields{
"height": height,
"error": err,
}).Warn("error with getblock")
timeoutCount++
if timeoutCount == 3 {
log.WithFields(logrus.Fields{
"timeouts": timeoutCount,
}).Warn("unable to issue RPC call to zcashd node 3 times")
break
}
}
if block != nil {
log.Info("Ingestor adding block to cache: ", height)
cache.Add(height, block)
if timeoutCount > 0 {
timeoutCount--
}
phash = hex.EncodeToString(block.PrevHash)
//check for reorgs once we have inital block hash from startup
if hash != phash && reorgCount != -1 {
reorgCount++
log.WithFields(logrus.Fields{
"height": height,
"hash(reversed)": hash,
"phash(reversed)": phash,
"reorg": reorgCount,
}).Warn("REORG")
} else {
hash = hex.EncodeToString(block.Hash)
}
if reorgCount == -1 {
hash = hex.EncodeToString(block.Hash)
reorgCount = 0
}
height++
} else {
break
}
}
}
}
}
func GetBlock(rpcClient *rpcclient.Client, cache *BlockCache, height int) (*walletrpc.CompactBlock, error) {
// First, check the cache to see if we have the block
block := cache.Get(height)
@@ -98,51 +174,19 @@ func GetBlock(rpcClient *rpcclient.Client, cache *BlockCache, height int) (*wall
return block, nil
}
// If a block was not found, make sure user is requesting a historical block
if height > cache.GetLatestBlock() {
return nil, errors.New(
fmt.Sprintf(
"Block requested is newer than latest block. Requested: %d Latest: %d",
height, cache.GetLatestBlock()))
}
block, err := getBlockFromRPC(rpcClient, height)
if err != nil {
return nil, err
}
// Store the block in the cache, but test for reorg first
prevBlock := cache.Get(height - 1)
if prevBlock != nil {
if !bytes.Equal(prevBlock.Hash, block.PrevHash) {
// Reorg!
reorgCount := 0
cacheBlock := cache.Get(height - reorgCount)
rpcBlocks := []*walletrpc.CompactBlock{}
for ; reorgCount <= 100 &&
cacheBlock != nil &&
!bytes.Equal(block.PrevHash, cacheBlock.Hash); reorgCount++ {
block, err = getBlockFromRPC(rpcClient, height-reorgCount-1)
if err != nil {
return nil, err
}
_ = append(rpcBlocks, block)
cacheBlock = cache.Get(height - reorgCount - 2)
}
if reorgCount == 100 {
return nil, errors.New("Max reorg depth exceeded")
}
// At this point, the block.prevHash == cache.hash
// Store all blocks starting with 'block'
for i := len(rpcBlocks) - 1; i >= 0; i-- {
cache.Add(int(rpcBlocks[i].Height), rpcBlocks[i])
}
}
}
cache.Add(height, block)
return block, nil
}