monza/internal/chain/chain.go
Alex Vanin 4fc7478e1e [#1] Add '-f' flag to overwrite local cache
Sometimes multiple environments have blockchains with
the same magic number. In this case, the user should not
reuse the cached chain, because the cache uses only the magic
number to tell different chains apart. With the new '-f' flag,
the user can repopulate the cache with fresh data for a
chain that has the same magic number.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-19 17:20:45 +03:00

243 lines
5.2 KiB
Go

package chain
import (
"context"
"encoding/binary"
"fmt"
"os"
"path"
"strconv"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.etcd.io/bbolt"
)
// Chain gives access to blocks and application logs of one network. Data is
// fetched through the RPC client and cached in a local bbolt database.
type Chain struct {
// db is the bbolt database that stores cached blocks and application logs.
db *bbolt.DB
// stateRoot holds the StateRootInHeader protocol setting reported by the RPC
// node; it is passed to block.New when cached blocks are decoded.
stateRoot bool
// Client is the RPC client used to fetch data that is not in the cache.
Client *rpcclient.Client
}
// blocksBucket is the name of the bbolt bucket that holds cached blocks.
var blocksBucket = []byte("blocks")

// logsBucket is the name of the bbolt bucket that holds cached application logs.
var logsBucket = []byte("logs")
// Open connects to the RPC node at endpoint, initializes the client, asks the
// node for its version and opens (creating it if necessary) the cache database
// for that network inside dir.
//
// The database file is named "<network magic>.db". When rewrite is true, an
// existing file with that name is removed first so the cache is repopulated
// from scratch; a missing file is not an error, but any other removal failure
// is returned instead of silently reusing the old cache.
//
// If Open fails after the RPC client has been created, the client is closed
// before returning.
func Open(ctx context.Context, dir, endpoint string, rewrite bool) (*Chain, error) {
	cli, err := rpcclient.New(ctx, endpoint, rpcclient.Options{})
	if err != nil {
		return nil, fmt.Errorf("rpc connection: %w", err)
	}

	// Release the client on every error path below; on success its ownership
	// moves to the returned Chain.
	opened := false
	defer func() {
		if !opened {
			cli.Close()
		}
	}()

	if err = cli.Init(); err != nil {
		return nil, fmt.Errorf("rpc client initialization: %w", err)
	}

	v, err := cli.GetVersion()
	if err != nil {
		return nil, fmt.Errorf("rpc get version: %w", err)
	}

	// Format the magic as an unsigned number: converting it to int first can
	// yield a negative value on platforms where int is 32 bits wide.
	fileName := strconv.FormatUint(uint64(v.Protocol.Network), 10) + ".db"
	// NOTE(review): path.Join always uses forward slashes; path/filepath would
	// be the OS-aware choice but needs an extra import in this file.
	dbPath := path.Join(dir, fileName)

	if rewrite {
		// Only "file does not exist" is harmless here; any other failure would
		// leave the stale cache in place, defeating the purpose of rewrite.
		if err = os.Remove(dbPath); err != nil && !os.IsNotExist(err) {
			return nil, fmt.Errorf("database [%s] remove: %w", dbPath, err)
		}
	}

	db, err := bbolt.Open(dbPath, 0600, nil)
	if err != nil {
		return nil, fmt.Errorf("database [%s] init: %w", dbPath, err)
	}

	opened = true
	return &Chain{
		db:        db,
		stateRoot: v.Protocol.StateRootInHeader,
		Client:    cli,
	}, nil
}
// Block returns the block with index i.
//
// The local cache is consulted first. On a cache miss the block is requested
// from the RPC node and then written to the cache; in that case the fetched
// block is returned together with whatever error the cache write produced.
func (d *Chain) Block(i uint32) (res *block.Block, err error) {
	res, err = d.block(i)
	switch {
	case err != nil:
		return nil, err
	case res != nil:
		// Cache hit.
		return res, nil
	}

	fetched, err := d.Client.GetBlockByIndexVerbose(i)
	if err != nil {
		return nil, fmt.Errorf("block %d fetch: %w", i, err)
	}

	res = &fetched.Block
	return res, d.addBlock(res)
}
// BlockByHash requests a block from the RPC node by hash and stores it in the
// cache. No cache lookup is done here.
//
// The hash is reversed before it is handed to the RPC client, while the error
// message prints h as given, in little-endian hex. The fetched block is
// returned together with whatever error the cache write produced.
func (d *Chain) BlockByHash(h util.Uint256) (res *block.Block, err error) {
	fetched, err := d.Client.GetBlockByHashVerbose(h.Reverse())
	if err != nil {
		return nil, fmt.Errorf("block %s fetch: %w", h.StringLE(), err)
	}

	res = &fetched.Block
	return res, d.addBlock(res)
}
// block looks up the block with index i in the cache.
//
// It returns (nil, nil) when the blocks bucket does not exist or holds no data
// for i. Blocks are keyed by their index encoded as 4 little-endian bytes.
func (d *Chain) block(i uint32) (res *block.Block, err error) {
	var key [4]byte
	binary.LittleEndian.PutUint32(key[:], i)

	err = d.db.View(func(tx *bbolt.Tx) error {
		bucket := tx.Bucket(blocksBucket)
		if bucket == nil {
			return nil
		}

		raw := bucket.Get(key[:])
		if len(raw) == 0 {
			return nil
		}

		reader := io.NewBinReaderFromBuf(raw)
		res = block.New(d.stateRoot)
		res.DecodeBinary(reader)
		// Decoding problems are reported through the reader's Err field.
		return reader.Err
	})
	if err != nil {
		return nil, fmt.Errorf("cannot read block %d from cache: %w", i, err)
	}

	return res, nil
}
// addBlock stores b in the blocks bucket of the cache, creating the bucket if
// it does not exist yet. The key is the block index encoded as 4 little-endian
// bytes and the value is the binary encoding of the block.
func (d *Chain) addBlock(b *block.Block) error {
	// Serialize once, outside the transaction: bbolt may invoke a Batch
	// function several times, and the function runs inside the write
	// transaction, so it should do only the database work.
	w := io.NewBufBinWriter()
	b.EncodeBinary(w.BinWriter)
	if w.Err != nil {
		return fmt.Errorf("cannot add block %d to cache: %w", b.Index, w.Err)
	}
	data := w.Bytes()

	key := make([]byte, 4)
	binary.LittleEndian.PutUint32(key, b.Index)

	err := d.db.Batch(func(tx *bbolt.Tx) error {
		bkt, err := tx.CreateBucketIfNotExists(blocksBucket)
		if err != nil {
			return err
		}
		return bkt.Put(key, data)
	})
	if err != nil {
		return fmt.Errorf("cannot add block %d to cache: %w", b.Index, err)
	}
	return nil
}
// ApplicationLog returns the application log stored under txHash.
//
// The local cache is consulted first. On a cache miss the log is requested
// from the RPC node and then written to the cache; in that case the fetched
// log is returned together with whatever error the cache write produced.
func (d *Chain) ApplicationLog(txHash util.Uint256) (*result.ApplicationLog, error) {
	fromCache, err := d.applicationLog(txHash)
	switch {
	case err != nil:
		return nil, err
	case fromCache != nil:
		// Cache hit.
		return fromCache, nil
	}

	fetched, err := d.Client.GetApplicationLog(txHash, nil)
	if err != nil {
		return nil, fmt.Errorf("app log of tx %s fetch: %w", txHash.StringLE(), err)
	}

	return fetched, d.addApplicationLog(txHash, fetched)
}
// applicationLog looks up the application log for txHash in the cache.
//
// It returns (nil, nil) when the logs bucket does not exist or holds no data
// for txHash. Logs are keyed by the little-endian bytes of the hash and stored
// as JSON.
func (d *Chain) applicationLog(txHash util.Uint256) (res *result.ApplicationLog, err error) {
	key := txHash.BytesLE()

	err = d.db.View(func(tx *bbolt.Tx) error {
		bkt := tx.Bucket(logsBucket)
		if bkt == nil {
			return nil
		}
		data := bkt.Get(key)
		if len(data) == 0 {
			return nil
		}
		res = new(result.ApplicationLog)
		// Decode the bytes that were already fetched instead of looking the
		// key up a second time.
		return res.UnmarshalJSON(data)
	})
	if err != nil {
		return nil, fmt.Errorf("cannot read tx %s from cache: %w", txHash.StringLE(), err)
	}
	return res, nil
}
// addApplicationLog stores appLog in the logs bucket of the cache, creating
// the bucket if it does not exist yet. The key is the little-endian bytes of
// txHash and the value is the JSON encoding of appLog.
func (d *Chain) addApplicationLog(txHash util.Uint256, appLog *result.ApplicationLog) error {
	// Marshal once, outside the transaction: bbolt may invoke a Batch function
	// several times, and the function runs inside the write transaction, so it
	// should do only the database work.
	val, err := appLog.MarshalJSON()
	if err != nil {
		return fmt.Errorf("cannot add tx %s to cache: %w", txHash.StringLE(), err)
	}
	key := txHash.BytesLE()

	err = d.db.Batch(func(tx *bbolt.Tx) error {
		bkt, err := tx.CreateBucketIfNotExists(logsBucket)
		if err != nil {
			return err
		}
		return bkt.Put(key, val)
	})
	if err != nil {
		return fmt.Errorf("cannot add tx %s to cache: %w", txHash.StringLE(), err)
	}
	return nil
}
// Notifications returns the events of every execution found in the
// application log for txHash, in execution order. The returned slice is
// non-nil even when there are no events.
func (d *Chain) Notifications(txHash util.Uint256) ([]state.NotificationEvent, error) {
	appLog, err := d.ApplicationLog(txHash)
	if err != nil {
		return nil, err
	}

	events := []state.NotificationEvent{}
	for idx := range appLog.Executions {
		events = append(events, appLog.Executions[idx].Events...)
	}
	return events, nil
}
// AllNotifications collects the events from the application log stored under
// the hash of b, followed by the events of each transaction of b in
// transaction order. The returned slice is non-nil even when there are no
// events; the first failing lookup aborts the collection.
func (d *Chain) AllNotifications(b *block.Block) ([]state.NotificationEvent, error) {
	all := []state.NotificationEvent{}

	// Events recorded under the block's own hash come first.
	blockEvents, err := d.Notifications(b.Hash())
	if err != nil {
		return nil, err
	}
	all = append(all, blockEvents...)

	for _, tx := range b.Transactions {
		txEvents, err := d.Notifications(tx.Hash())
		if err != nil {
			return nil, err
		}
		all = append(all, txEvents...)
	}
	return all, nil
}
// Close releases the resources held by the Chain: the cache database and the
// RPC client. The Chain and its Client must not be used afterwards.
func (d *Chain) Close() {
	// Close has no error result, so a database close failure cannot be
	// reported to the caller.
	_ = d.db.Close()

	// The client is created in Open and owned by the Chain, so release it too.
	if d.Client != nil {
		d.Client.Close()
	}
}