monza/main.go

275 lines
5.6 KiB
Go
Raw Permalink Normal View History

package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"path"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/monza/internal/chain"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/schollz/progressbar/v3"
"github.com/urfave/cli/v2"
)
func main() {
app := &cli.App{
Name: "monza",
Usage: "monitor notification events in N3 compatible chains",
Commands: []*cli.Command{
{
Name: "run",
Usage: "look up over subset of blocks to find notifications",
UsageText: "monza run -r [endpoint] --from 101000 --to p1000 -n \"Transfer:gas\" -n \"newEpoch:*\"",
Action: monza,
Flags: []cli.Flag{
endpointFlag,
fromFlag,
toFlag,
notificationFlag,
cacheFlag,
workersFlag,
disableProgressBarFlag,
forceCacheRewriteFlag,
},
},
{
Name: "stutter",
Usage: "find stuttered blocks in subset",
UsageText: "monza stutter -r [endpoint] --from 101000 --to p1000 --threshold 20s",
Action: stutter,
Flags: []cli.Flag{
endpointFlag,
fromFlag,
toFlag,
stutterThresholdFlag,
cacheFlag,
workersFlag,
disableProgressBarFlag,
},
},
{
Name: "explore",
Usage: "explore stuttered blocks in subset",
UsageText: "monza explore -r [endpoint]",
Action: explorer,
Flags: []cli.Flag{
endpointFlag,
cacheFlag,
},
},
},
}
err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
}
func monza(c *cli.Context) (err error) {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
// parse blockchain info
cacheDir := c.String(cacheFlagKey)
if len(cacheDir) == 0 {
cacheDir, err = defaultConfigDir()
if err != nil {
return err
}
}
blockchain, err := chain.Open(ctx, cacheDir, c.String(endpointFlagKey), c.Bool(forceCacheRewriteKey))
if err != nil {
return fmt.Errorf("cannot initialize remote blockchain client: %w", err)
}
defer func() {
blockchain.Close()
cancel()
}()
// parse block indices
from, to, err := parseInterval(c.String(fromFlagKey), c.String(toFlagKey), blockchain.Client)
if err != nil {
return err
}
// parse notifications
notifications, err := parseNotifications(c.StringSlice(notificationFlagKey), blockchain.Client)
if err != nil {
return err
}
// start monza
return run(ctx, &params{
from: from,
to: to,
blockchain: blockchain,
notifications: notifications,
workers: int(c.Uint64(workersFlagKey)),
disableBar: c.Bool(disableProgressBarFlagKey),
})
}
type params struct {
from, to uint32
blockchain *chain.Chain
notifications map[string]*util.Uint160
workers int
disableBar bool
}
func run(ctx context.Context, p *params) error {
err := cacheBlocks(ctx, p)
if err != nil {
return err
}
for i := p.from; i < p.to; i++ {
b, err := p.blockchain.Block(i)
if err != nil {
return fmt.Errorf("cannot fetch block %d: %w", i, err)
}
notifications, err := p.blockchain.AllNotifications(b)
if err != nil {
return fmt.Errorf("cannot fetch notifications from block %d: %w", i, err)
}
for _, ev := range notifications {
contract, ok := p.notifications[ev.Name]
if !ok {
continue
}
if contract != nil && !contract.Equals(ev.ScriptHash) {
continue
}
switch ev.Name {
case "Transfer":
PrintTransfer(b, ev)
case "NewEpoch":
PrintNewEpoch(b, ev)
case "AddPeerSuccess":
PrintAddPeerSuccess(b, ev)
case "UpdateState":
PrintUpdateState(b, ev)
default:
PrintEvent(b, ev, "")
}
}
}
return nil
}
func cacheBlocks(ctx context.Context, p *params) error {
if p.workers <= 0 {
return fmt.Errorf("invalid amount of workers %d", p.workers)
}
var bar *progressbar.ProgressBar
if !p.disableBar {
bar = progressbar.NewOptions(int(p.to-p.from),
progressbar.OptionSetDescription("syncing"),
progressbar.OptionSetWriter(os.Stderr),
progressbar.OptionSetWidth(10),
progressbar.OptionThrottle(65*time.Millisecond),
progressbar.OptionShowCount(),
progressbar.OptionShowIts(),
progressbar.OptionSetItsString("blocks"),
progressbar.OptionOnCompletion(func() {
_, _ = fmt.Fprint(os.Stderr, "\n")
}),
progressbar.OptionSpinnerType(14),
progressbar.OptionSetWidth(50),
progressbar.OptionSetTheme(progressbar.Theme{
Saucer: "#",
SaucerHead: "#",
SaucerPadding: " ",
BarStart: "[",
BarEnd: "]",
}),
)
}
jobCh := make(chan uint32)
errCh := make(chan error)
wgCh := make(chan struct{})
wg := new(sync.WaitGroup)
for i := 0; i < p.workers; i++ {
go func(ctx context.Context, ch <-chan uint32, out chan<- error) {
for {
select {
case <-ctx.Done():
return
case block, ok := <-ch:
wg.Add(1)
if !ok {
return
}
b, err := p.blockchain.Block(block)
if err != nil {
out <- err
return
}
_, err = p.blockchain.AllNotifications(b)
if err != nil {
out <- err
return
}
if bar != nil {
_ = bar.Add(1)
}
wg.Done()
}
}
}(ctx, jobCh, errCh)
}
for i := p.from; i < p.to; i++ {
select {
case <-ctx.Done():
return errors.New("interrupted")
case err := <-errCh:
return err
case jobCh <- i:
}
}
go func() {
wg.Wait()
close(wgCh)
}()
select {
case <-ctx.Done():
return errors.New("interrupted")
case err := <-errCh:
return err
case <-wgCh:
return nil
}
}
func defaultConfigDir() (string, error) {
home, err := os.UserHomeDir()
if err != nil {
log.Fatalf("cannot determine home dir for default config path: %s", err)
}
p := path.Join(home, ".config")
p = path.Join(p, "monza")
return p, os.MkdirAll(p, os.ModePerm)
}