package client import ( "context" "errors" "fmt" "sync/atomic" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" lru "github.com/hashicorp/golang-lru/v2" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/rpcclient" "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "go.uber.org/zap" ) // Option is a client configuration change function. type Option func(*cfg) // Callback is a function that is going to be called // on certain Client's state. type Callback func() // groups the configurations with default values. type cfg struct { dialTimeout time.Duration // client dial timeout logger *logger.Logger // logging component metrics morphmetrics.Register waitInterval time.Duration signer *transaction.Signer endpoints []Endpoint singleCli *rpcclient.WSClient // neo-go client for single client mode inactiveModeCb Callback switchInterval time.Duration morphCacheMetrics metrics.MorphCacheMetrics cmode *atomic.Bool } const ( defaultDialTimeout = 5 * time.Second defaultWaitInterval = 500 * time.Millisecond ) var ErrNoHealthyEndpoint = errors.New("no healthy endpoint") func defaultConfig() *cfg { return &cfg{ dialTimeout: defaultDialTimeout, logger: &logger.Logger{Logger: zap.L()}, metrics: morphmetrics.NoopRegister{}, waitInterval: defaultWaitInterval, signer: &transaction.Signer{ Scopes: transaction.Global, }, morphCacheMetrics: &morphmetrics.NoopMorphCacheMetrics{}, } } // New creates, initializes and returns the Client instance. // Notary support should be enabled with EnableNotarySupport client // method separately. // // If private key is nil, it panics. // // Other values are set according to provided options, or by default: // - client context: Background; // - dial timeout: 5s; // - blockchain network type: netmode.PrivNet; // - signer with the global scope; // - wait interval: 500ms; // - logger: &logger.Logger{Logger: zap.L()}. // - metrics: metrics.NoopRegister // // If desired option satisfies the default value, it can be omitted. // If multiple options of the same config value are supplied, // the option with the highest index in the arguments will be used. // If the list of endpoints provided - uses first alive. // If there are no healthy endpoint - returns ErrNoHealthyEndpoint. func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, error) { if key == nil { panic("empty private key") } acc := wallet.NewAccountFromPrivateKey(key) accAddr := key.GetScriptHash() // build default configuration cfg := defaultConfig() // apply options for _, opt := range opts { opt(cfg) } if len(cfg.endpoints) == 0 { return nil, errors.New("no endpoints were provided") } cli := &Client{ cache: newClientCache(cfg.morphCacheMetrics), logger: cfg.logger, metrics: cfg.metrics, acc: acc, accAddr: accAddr, cfg: *cfg, closeChan: make(chan struct{}), } cli.endpoints.init(cfg.endpoints) var err error var act *actor.Actor if cfg.singleCli != nil { // return client in single RPC node mode that uses // predefined WS client // // in case of the closing web socket connection: // if extra endpoints were provided via options, // they will be used in switch process, otherwise // inactive mode will be enabled cli.client = cfg.singleCli act, err = newActor(cfg.singleCli, acc, *cfg) if err != nil { return nil, fmt.Errorf("could not create RPC actor: %w", err) } } else { var endpoint Endpoint for cli.endpoints.curr, endpoint = range cli.endpoints.list { cli.client, act, err = cli.newCli(ctx, endpoint.Address) if err != nil { cli.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint, zap.Error(err), zap.String("endpoint", endpoint.Address)) } else { cli.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint, zap.String("endpoint", endpoint.Address)) break } } if cli.client == nil { return nil, ErrNoHealthyEndpoint } } cli.setActor(act) go cli.closeWaiter(ctx) return cli, nil } func (c *Client) newCli(ctx context.Context, endpoint string) (*rpcclient.WSClient, *actor.Actor, error) { cli, err := rpcclient.NewWS(ctx, endpoint, rpcclient.WSOptions{ Options: rpcclient.Options{ DialTimeout: c.cfg.dialTimeout, }, }) if err != nil { return nil, nil, fmt.Errorf("WS client creation: %w", err) } defer func() { if err != nil { cli.Close() } }() err = cli.Init() if err != nil { return nil, nil, fmt.Errorf("WS client initialization: %w", err) } act, err := newActor(cli, c.acc, c.cfg) if err != nil { return nil, nil, fmt.Errorf("RPC actor creation: %w", err) } return cli, act, nil } func newActor(ws *rpcclient.WSClient, acc *wallet.Account, cfg cfg) (*actor.Actor, error) { return actor.New(ws, []actor.SignerAccount{{ Signer: transaction.Signer{ Account: acc.PrivateKey().PublicKey().GetScriptHash(), Scopes: cfg.signer.Scopes, AllowedContracts: cfg.signer.AllowedContracts, AllowedGroups: cfg.signer.AllowedGroups, }, Account: acc, }}) } func newClientCache(morphCacheMetrics metrics.MorphCacheMetrics) cache { c, _ := lru.New[util.Uint256, uint32](100) // returns error only if size is negative return cache{ txHeights: c, metrics: morphCacheMetrics, } } // WithDialTimeout returns a client constructor option // that specifies neo-go client dial timeout duration. // // Ignores non-positive value. Has no effect if WithSingleClient // is provided. // // If option not provided, 5s timeout is used. func WithDialTimeout(dur time.Duration) Option { return func(c *cfg) { if dur > 0 { c.dialTimeout = dur } } } // WithLogger returns a client constructor option // that specifies the component for writing log messages. // // Ignores nil value. // // If option not provided, &logger.Logger{Logger: zap.L()} is used. func WithLogger(logger *logger.Logger) Option { return func(c *cfg) { if logger != nil { c.logger = logger } } } // WithMetrics returns a client constructor option // that specifies the component for reporting metrics. // // Ignores nil value. // // If option not provided, NoopMetrics is used. func WithMetrics(metrics morphmetrics.Register) Option { return func(c *cfg) { if metrics != nil { c.metrics = metrics } } } // WithSigner returns a client constructor option // that specifies the signer and the scope of the transaction. // // Ignores nil value. // // If option not provided, signer with global scope is used. func WithSigner(signer *transaction.Signer) Option { return func(c *cfg) { if signer != nil { c.signer = signer } } } // WithEndpoints returns a client constructor option // that specifies additional Neo rpc endpoints. func WithEndpoints(endpoints ...Endpoint) Option { return func(c *cfg) { c.endpoints = append(c.endpoints, endpoints...) } } // WithSingleClient returns a client constructor option // that specifies single neo-go client and forces Client // to use it for requests. // // Passed client must already be initialized. func WithSingleClient(cli *rpcclient.WSClient) Option { return func(c *cfg) { c.singleCli = cli } } // WithConnLostCallback return a client constructor option // that specifies a callback that is called when Client // unsuccessfully tried to connect to all the specified // endpoints. func WithConnLostCallback(cb Callback) Option { return func(c *cfg) { c.inactiveModeCb = cb } } // WithSwitchInterval returns a client constructor option // that specifies a wait interval b/w attempts to reconnect // to an RPC node with the highest priority. func WithSwitchInterval(i time.Duration) Option { return func(c *cfg) { c.switchInterval = i } } func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option { return func(c *cfg) { c.morphCacheMetrics = morphCacheMetrics } } // WithCompatibilityMode indicates that Client is working in compatibility mode // in this mode we need to keep backward compatibility with services with previous version. func WithCompatibilityMode(cmode *atomic.Bool) Option { return func(c *cfg) { c.cmode = cmode } }