monza/main.go
Alex Vanin 4fc7478e1e [#1] Add '-f' flag to overwrite local cache
Sometimes multiple environments have blockchains with
the same magic number. In this case, user should not
reuse cached chain, because cache contains magic number
to identify different chains. With new '-f' flag user
will be able to repopulate cache with new data for the
chain with the same magic number.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-19 17:20:45 +03:00

274 lines
5.6 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"path"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/monza/internal/chain"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/schollz/progressbar/v3"
"github.com/urfave/cli/v2"
)
// main wires up the "run", "stutter" and "explore" sub-commands and hands the
// process arguments to the CLI framework. Any error returned by the selected
// command is logged and terminates the process via log.Fatal.
func main() {
	// "run": scan a block range and print matching notifications.
	runCmd := &cli.Command{
		Name:      "run",
		Usage:     "look up over subset of blocks to find notifications",
		UsageText: "monza run -r [endpoint] --from 101000 --to p1000 -n \"Transfer:gas\" -n \"newEpoch:*\"",
		Action:    monza,
		Flags: []cli.Flag{
			endpointFlag,
			fromFlag,
			toFlag,
			notificationFlag,
			cacheFlag,
			workersFlag,
			disableProgressBarFlag,
			forceCacheRewriteFlag,
		},
	}

	// "stutter": report stuttered blocks in a block range.
	stutterCmd := &cli.Command{
		Name:      "stutter",
		Usage:     "find stuttered blocks in subset",
		UsageText: "monza stutter -r [endpoint] --from 101000 --to p1000 --threshold 20s",
		Action:    stutter,
		Flags: []cli.Flag{
			endpointFlag,
			fromFlag,
			toFlag,
			stutterThresholdFlag,
			cacheFlag,
			workersFlag,
			disableProgressBarFlag,
		},
	}

	// "explore": only needs an endpoint and a cache location.
	exploreCmd := &cli.Command{
		Name:      "explore",
		Usage:     "explore stuttered blocks in subset",
		UsageText: "monza explore -r [endpoint]",
		Action:    explorer,
		Flags: []cli.Flag{
			endpointFlag,
			cacheFlag,
		},
	}

	app := &cli.App{
		Name:     "monza",
		Usage:    "monitor notification events in N3 compatible chains",
		Commands: []*cli.Command{runCmd, stutterCmd, exploreCmd},
	}

	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
// monza is the action of the "run" command. It opens the (cached) chain for
// the configured endpoint, resolves the requested block interval and
// notification filters, and then hands everything over to run.
//
// The context passed down is cancelled when the process receives os.Interrupt.
// An error is returned if the cache directory, the chain client, the interval
// or the notification filters cannot be set up, or if run itself fails.
func monza(c *cli.Context) (err error) {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	// Registered immediately so the signal handler is released on every return
	// path, including the early error returns below.
	defer cancel()

	// parse blockchain info
	cacheDir := c.String(cacheFlagKey)
	if len(cacheDir) == 0 {
		cacheDir, err = defaultConfigDir()
		if err != nil {
			return err
		}
	}

	blockchain, err := chain.Open(ctx, cacheDir, c.String(endpointFlagKey), c.Bool(forceCacheRewriteKey))
	if err != nil {
		return fmt.Errorf("cannot initialize remote blockchain client: %w", err)
	}
	// Deferred after cancel, hence executed before it: the chain is closed
	// first and only then is the context cancelled.
	defer blockchain.Close()

	// parse block indices
	from, to, err := parseInterval(c.String(fromFlagKey), c.String(toFlagKey), blockchain.Client)
	if err != nil {
		return err
	}

	// parse notifications
	notifications, err := parseNotifications(c.StringSlice(notificationFlagKey), blockchain.Client)
	if err != nil {
		return err
	}

	// start monza
	return run(ctx, &params{
		from:          from,
		to:            to,
		blockchain:    blockchain,
		notifications: notifications,
		workers:       int(c.Uint64(workersFlagKey)),
		disableBar:    c.Bool(disableProgressBarFlagKey),
	})
}
// params bundles everything run and cacheBlocks need to process a block range.
type params struct {
	// from is the first block index to process (inclusive).
	from uint32
	// to is the block index at which processing stops (exclusive).
	to uint32
	// blockchain is the chain handle used to fetch blocks and their notifications.
	blockchain *chain.Chain
	// notifications maps an event name to the contract it must originate from;
	// a nil value accepts the event from any contract.
	notifications map[string]*util.Uint160
	// workers is the number of concurrent fetch goroutines; must be positive.
	workers int
	// disableBar suppresses the progress bar when set.
	disableBar bool
}
// run first pre-fetches the whole [p.from, p.to) range via cacheBlocks and then
// walks the range in order, printing every notification that matches the
// filters in p.notifications.
//
// A notification matches when its name is a key of p.notifications and the
// associated contract hash is either nil or equal to the notification's script
// hash. Well-known event names get a dedicated printer; everything else goes
// through PrintEvent. The first fetch error aborts the walk.
func run(ctx context.Context, p *params) error {
	if err := cacheBlocks(ctx, p); err != nil {
		return err
	}

	for height := p.from; height < p.to; height++ {
		block, err := p.blockchain.Block(height)
		if err != nil {
			return fmt.Errorf("cannot fetch block %d: %w", height, err)
		}

		events, err := p.blockchain.AllNotifications(block)
		if err != nil {
			return fmt.Errorf("cannot fetch notifications from block %d: %w", height, err)
		}

		for _, ev := range events {
			wantedContract, subscribed := p.notifications[ev.Name]
			// Skip events nobody asked for, and events emitted by a contract
			// other than the one the filter is pinned to.
			if !subscribed || (wantedContract != nil && !wantedContract.Equals(ev.ScriptHash)) {
				continue
			}

			switch name := ev.Name; name {
			case "Transfer":
				PrintTransfer(block, ev)
			case "NewEpoch":
				PrintNewEpoch(block, ev)
			case "AddPeerSuccess":
				PrintAddPeerSuccess(block, ev)
			case "UpdateState":
				PrintUpdateState(block, ev)
			default:
				PrintEvent(block, ev, "")
			}
		}
	}

	return nil
}
// cacheBlocks fetches every block in [p.from, p.to) together with its
// notifications, using p.workers concurrent goroutines, and reports progress
// on stderr unless p.disableBar is set. The fetched values are discarded here;
// presumably the chain handle caches them for the sequential pass in run.
//
// It returns an error when p.workers is not positive, when ctx is cancelled
// ("interrupted"), or when any worker fails to fetch a block or its
// notifications (the first error observed is returned). It returns nil only
// after every worker has finished all the jobs handed to it.
func cacheBlocks(ctx context.Context, p *params) error {
	if p.workers <= 0 {
		return fmt.Errorf("invalid amount of workers %d", p.workers)
	}

	var bar *progressbar.ProgressBar
	if !p.disableBar {
		bar = progressbar.NewOptions(int(p.to-p.from),
			progressbar.OptionSetDescription("syncing"),
			progressbar.OptionSetWriter(os.Stderr),
			progressbar.OptionThrottle(65*time.Millisecond),
			progressbar.OptionShowCount(),
			progressbar.OptionShowIts(),
			progressbar.OptionSetItsString("blocks"),
			progressbar.OptionOnCompletion(func() {
				_, _ = fmt.Fprint(os.Stderr, "\n")
			}),
			progressbar.OptionSpinnerType(14),
			progressbar.OptionSetWidth(50),
			progressbar.OptionSetTheme(progressbar.Theme{
				Saucer:        "#",
				SaucerHead:    "#",
				SaucerPadding: " ",
				BarStart:      "[",
				BarEnd:        "]",
			}),
		)
	}

	jobCh := make(chan uint32)
	// One slot per worker: a worker reports at most one error and then exits,
	// so its send can never block, even if this function has already returned.
	errCh := make(chan error, p.workers)
	wg := new(sync.WaitGroup)

	// The WaitGroup tracks workers, not jobs, and is incremented before any
	// goroutine starts so that Wait cannot observe a zero counter too early.
	wg.Add(p.workers)
	for i := 0; i < p.workers; i++ {
		go func(ch <-chan uint32, out chan<- error) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case block, ok := <-ch:
					if !ok {
						// No more jobs will arrive.
						return
					}
					b, err := p.blockchain.Block(block)
					if err != nil {
						out <- err
						return
					}
					if _, err = p.blockchain.AllNotifications(b); err != nil {
						out <- err
						return
					}
					if bar != nil {
						_ = bar.Add(1)
					}
				}
			}
		}(jobCh, errCh)
	}

	// dispatch hands every block index to the workers. Closing jobCh on all
	// exit paths lets idle workers terminate instead of waiting forever.
	dispatch := func() error {
		defer close(jobCh)
		for i := p.from; i < p.to; i++ {
			select {
			case <-ctx.Done():
				return errors.New("interrupted")
			case err := <-errCh:
				return err
			case jobCh <- i:
			}
		}
		return nil
	}
	if err := dispatch(); err != nil {
		return err
	}

	wgCh := make(chan struct{})
	go func() {
		wg.Wait()
		close(wgCh)
	}()

	select {
	case <-ctx.Done():
		return errors.New("interrupted")
	case err := <-errCh:
		return err
	case <-wgCh:
		// All workers are done. A failing worker buffers its error right
		// before exiting, so check once more to avoid masking it.
		select {
		case err := <-errCh:
			return err
		default:
			return nil
		}
	}
}
// defaultConfigDir returns the default cache location, "<home>/.config/monza",
// creating the directory (and any missing parents) if necessary.
//
// An error is returned when the user's home directory cannot be determined or
// when the directory cannot be created. In the latter case the computed path
// is still returned alongside the error.
func defaultConfigDir() (string, error) {
	home, err := os.UserHomeDir()
	if err != nil {
		// Report the failure to the caller instead of terminating the process:
		// the signature already promises an error and callers handle it.
		return "", fmt.Errorf("cannot determine home dir for default config path: %w", err)
	}

	p := path.Join(home, ".config", "monza")

	return p, os.MkdirAll(p, os.ModePerm)
}