Alex Vanin
4fc7478e1e
Sometimes multiple environments have blockchains with the same magic number. In this case, user should not reuse cached chain, because cache contains magic number to identify different chains. With new '-f' flag user will be able to repopulate cache with new data for the chain with the same magic number. Signed-off-by: Alex Vanin <a.vanin@yadro.com>
274 lines
5.6 KiB
Go
274 lines
5.6 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"path"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/monza/internal/chain"
|
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
|
"github.com/schollz/progressbar/v3"
|
|
"github.com/urfave/cli/v2"
|
|
)
|
|
|
|
func main() {
|
|
app := &cli.App{
|
|
Name: "monza",
|
|
Usage: "monitor notification events in N3 compatible chains",
|
|
Commands: []*cli.Command{
|
|
{
|
|
Name: "run",
|
|
Usage: "look up over subset of blocks to find notifications",
|
|
UsageText: "monza run -r [endpoint] --from 101000 --to p1000 -n \"Transfer:gas\" -n \"newEpoch:*\"",
|
|
Action: monza,
|
|
Flags: []cli.Flag{
|
|
endpointFlag,
|
|
fromFlag,
|
|
toFlag,
|
|
notificationFlag,
|
|
cacheFlag,
|
|
workersFlag,
|
|
disableProgressBarFlag,
|
|
forceCacheRewriteFlag,
|
|
},
|
|
},
|
|
{
|
|
Name: "stutter",
|
|
Usage: "find stuttered blocks in subset",
|
|
UsageText: "monza stutter -r [endpoint] --from 101000 --to p1000 --threshold 20s",
|
|
Action: stutter,
|
|
Flags: []cli.Flag{
|
|
endpointFlag,
|
|
fromFlag,
|
|
toFlag,
|
|
stutterThresholdFlag,
|
|
cacheFlag,
|
|
workersFlag,
|
|
disableProgressBarFlag,
|
|
},
|
|
},
|
|
{
|
|
Name: "explore",
|
|
Usage: "explore stuttered blocks in subset",
|
|
UsageText: "monza explore -r [endpoint]",
|
|
Action: explorer,
|
|
Flags: []cli.Flag{
|
|
endpointFlag,
|
|
cacheFlag,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
err := app.Run(os.Args)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
}
|
|
|
|
func monza(c *cli.Context) (err error) {
|
|
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
|
|
|
|
// parse blockchain info
|
|
cacheDir := c.String(cacheFlagKey)
|
|
if len(cacheDir) == 0 {
|
|
cacheDir, err = defaultConfigDir()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
blockchain, err := chain.Open(ctx, cacheDir, c.String(endpointFlagKey), c.Bool(forceCacheRewriteKey))
|
|
if err != nil {
|
|
return fmt.Errorf("cannot initialize remote blockchain client: %w", err)
|
|
}
|
|
defer func() {
|
|
blockchain.Close()
|
|
cancel()
|
|
}()
|
|
|
|
// parse block indices
|
|
from, to, err := parseInterval(c.String(fromFlagKey), c.String(toFlagKey), blockchain.Client)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// parse notifications
|
|
notifications, err := parseNotifications(c.StringSlice(notificationFlagKey), blockchain.Client)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// start monza
|
|
return run(ctx, ¶ms{
|
|
from: from,
|
|
to: to,
|
|
blockchain: blockchain,
|
|
notifications: notifications,
|
|
workers: int(c.Uint64(workersFlagKey)),
|
|
disableBar: c.Bool(disableProgressBarFlagKey),
|
|
})
|
|
}
|
|
|
|
type params struct {
|
|
from, to uint32
|
|
blockchain *chain.Chain
|
|
notifications map[string]*util.Uint160
|
|
workers int
|
|
disableBar bool
|
|
}
|
|
|
|
func run(ctx context.Context, p *params) error {
|
|
err := cacheBlocks(ctx, p)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for i := p.from; i < p.to; i++ {
|
|
b, err := p.blockchain.Block(i)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot fetch block %d: %w", i, err)
|
|
}
|
|
|
|
notifications, err := p.blockchain.AllNotifications(b)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot fetch notifications from block %d: %w", i, err)
|
|
}
|
|
|
|
for _, ev := range notifications {
|
|
contract, ok := p.notifications[ev.Name]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if contract != nil && !contract.Equals(ev.ScriptHash) {
|
|
continue
|
|
}
|
|
|
|
switch ev.Name {
|
|
case "Transfer":
|
|
PrintTransfer(b, ev)
|
|
case "NewEpoch":
|
|
PrintNewEpoch(b, ev)
|
|
case "AddPeerSuccess":
|
|
PrintAddPeerSuccess(b, ev)
|
|
case "UpdateState":
|
|
PrintUpdateState(b, ev)
|
|
default:
|
|
PrintEvent(b, ev, "")
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func cacheBlocks(ctx context.Context, p *params) error {
|
|
if p.workers <= 0 {
|
|
return fmt.Errorf("invalid amount of workers %d", p.workers)
|
|
}
|
|
|
|
var bar *progressbar.ProgressBar
|
|
if !p.disableBar {
|
|
bar = progressbar.NewOptions(int(p.to-p.from),
|
|
progressbar.OptionSetDescription("syncing"),
|
|
progressbar.OptionSetWriter(os.Stderr),
|
|
progressbar.OptionSetWidth(10),
|
|
progressbar.OptionThrottle(65*time.Millisecond),
|
|
progressbar.OptionShowCount(),
|
|
progressbar.OptionShowIts(),
|
|
progressbar.OptionSetItsString("blocks"),
|
|
progressbar.OptionOnCompletion(func() {
|
|
_, _ = fmt.Fprint(os.Stderr, "\n")
|
|
}),
|
|
progressbar.OptionSpinnerType(14),
|
|
progressbar.OptionSetWidth(50),
|
|
progressbar.OptionSetTheme(progressbar.Theme{
|
|
Saucer: "#",
|
|
SaucerHead: "#",
|
|
SaucerPadding: " ",
|
|
BarStart: "[",
|
|
BarEnd: "]",
|
|
}),
|
|
)
|
|
}
|
|
|
|
jobCh := make(chan uint32)
|
|
errCh := make(chan error)
|
|
wgCh := make(chan struct{})
|
|
|
|
wg := new(sync.WaitGroup)
|
|
|
|
for i := 0; i < p.workers; i++ {
|
|
go func(ctx context.Context, ch <-chan uint32, out chan<- error) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case block, ok := <-ch:
|
|
wg.Add(1)
|
|
if !ok {
|
|
return
|
|
}
|
|
b, err := p.blockchain.Block(block)
|
|
if err != nil {
|
|
out <- err
|
|
return
|
|
}
|
|
_, err = p.blockchain.AllNotifications(b)
|
|
if err != nil {
|
|
out <- err
|
|
return
|
|
}
|
|
if bar != nil {
|
|
_ = bar.Add(1)
|
|
}
|
|
wg.Done()
|
|
}
|
|
}
|
|
}(ctx, jobCh, errCh)
|
|
}
|
|
|
|
for i := p.from; i < p.to; i++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
return errors.New("interrupted")
|
|
case err := <-errCh:
|
|
return err
|
|
case jobCh <- i:
|
|
}
|
|
}
|
|
|
|
go func() {
|
|
wg.Wait()
|
|
close(wgCh)
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return errors.New("interrupted")
|
|
case err := <-errCh:
|
|
return err
|
|
case <-wgCh:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func defaultConfigDir() (string, error) {
|
|
home, err := os.UserHomeDir()
|
|
if err != nil {
|
|
log.Fatalf("cannot determine home dir for default config path: %s", err)
|
|
}
|
|
|
|
p := path.Join(home, ".config")
|
|
p = path.Join(p, "monza")
|
|
|
|
return p, os.MkdirAll(p, os.ModePerm)
|
|
}
|