frostfs-node/pkg/morph/client/constructor.go

325 lines
8.4 KiB
Go

package client
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/zap"
)
// Option is a client configuration change function.
// New applies the supplied options, in argument order, on top of
// the configuration produced by defaultConfig.
type Option func(*cfg)
// Callback is a parameterless function that is going to be called
// on certain Client's state (see WithConnLostCallback).
type Callback func()
// cfg groups the client configuration values.
// Defaults are produced by defaultConfig; the remaining fields
// stay at their zero values unless an Option sets them.
type cfg struct {
dialTimeout time.Duration // dial timeout passed to the neo-go WS client in newCli
logger *logger.Logger // logging component
// metrics is the metrics register handed to the Client in New.
metrics morphmetrics.Register
// waitInterval defaults to defaultWaitInterval; its consumer is not
// visible in this part of the file.
waitInterval time.Duration
// signer supplies the scopes, allowed contracts and allowed groups
// that newActor copies into the actor's signer.
signer *transaction.Signer
// endpoints is the list handed to the Client's endpoint list in New;
// New fails if it is empty.
endpoints []Endpoint
singleCli *rpcclient.WSClient // neo-go client for single client mode
// inactiveModeCb is the callback set by WithConnLostCallback.
inactiveModeCb Callback
// switchInterval is the interval set by WithSwitchInterval.
switchInterval time.Duration
// morphCacheMetrics is passed to newClientCache in New.
morphCacheMetrics metrics.MorphCacheMetrics
// cmode is the compatibility-mode flag set by WithCompatibilityMode;
// it is nil unless that option is used.
cmode *atomic.Bool
}
// Default values used by defaultConfig when the corresponding
// option is not supplied.
const (
defaultDialTimeout = 5 * time.Second // default for cfg.dialTimeout
defaultWaitInterval = 500 * time.Millisecond // default for cfg.waitInterval
)
// ErrNoHealthyEndpoint is returned by New when an RPC client
// could not be created for any of the configured endpoints.
var ErrNoHealthyEndpoint = errors.New("no healthy endpoint")
// defaultConfig returns a fresh configuration pre-filled with the
// default dial timeout, wait interval, global zap logger, no-op
// metrics, no-op morph cache metrics and a signer with the global
// scope. All other fields keep their zero values.
func defaultConfig() *cfg {
	var c cfg

	c.dialTimeout = defaultDialTimeout
	c.waitInterval = defaultWaitInterval

	c.logger = &logger.Logger{Logger: zap.L()}
	c.metrics = morphmetrics.NoopRegister{}
	c.morphCacheMetrics = &morphmetrics.NoopMorphCacheMetrics{}

	c.signer = &transaction.Signer{Scopes: transaction.Global}

	return &c
}
// New creates, initializes and returns the Client instance.
// Notary support should be enabled with EnableNotarySupport client
// method separately.
//
// If private key is nil, it panics.
//
// Options are applied in argument order on top of the defaults, so
// for a single-valued setting the last supplied option wins
// (WithEndpoints accumulates instead). Defaults:
//   - dial timeout: 5s;
//   - wait interval: 500ms;
//   - signer with the global scope;
//   - logger: &logger.Logger{Logger: zap.L()};
//   - metrics: morphmetrics.NoopRegister;
//   - morph cache metrics: morphmetrics.NoopMorphCacheMetrics.
//
// At least one endpoint must be configured, otherwise an error is
// returned. When a single client is supplied via WithSingleClient it
// is used as is; otherwise the endpoints are tried one by one and the
// first one for which an RPC client can be created is used. If there
// is no such endpoint, ErrNoHealthyEndpoint is returned.
//
// On success the Client's closeWaiter is started in a separate
// goroutine with the given context.
func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, error) {
	if key == nil {
		panic("empty private key")
	}

	account := wallet.NewAccountFromPrivateKey(key)
	scriptHash := key.GetScriptHash()

	// defaults first, then every option in the order it was passed
	conf := defaultConfig()
	for i := range opts {
		opts[i](conf)
	}

	if len(conf.endpoints) == 0 {
		return nil, errors.New("no endpoints were provided")
	}

	c := &Client{
		cache:     newClientCache(conf.morphCacheMetrics),
		logger:    conf.logger,
		metrics:   conf.metrics,
		acc:       account,
		accAddr:   scriptHash,
		cfg:       *conf,
		closeChan: make(chan struct{}),
	}
	c.endpoints.init(conf.endpoints)

	var (
		rpcActor *actor.Actor
		err      error
	)

	if conf.singleCli != nil {
		// single RPC node mode: use the predefined WS client.
		//
		// if that web socket connection gets closed later, the
		// endpoints provided via options are used in the switch
		// process; without them inactive mode is enabled
		c.client = conf.singleCli

		rpcActor, err = newActor(conf.singleCli, account, *conf)
		if err != nil {
			return nil, fmt.Errorf("could not create RPC actor: %w", err)
		}
	} else {
		// walk the endpoint list, remembering the position of the
		// endpoint being tried, and stop at the first one that works
		for idx := range c.endpoints.list {
			c.endpoints.curr = idx
			addr := c.endpoints.list[idx].Address

			c.client, rpcActor, err = c.newCli(ctx, addr)
			if err == nil {
				c.logger.Info(logs.FrostFSIRCreatedRPCClientForEndpoint,
					zap.String("endpoint", addr))
				break
			}

			c.logger.Warn(logs.FrostFSIRCouldntCreateRPCClientForEndpoint,
				zap.Error(err), zap.String("endpoint", addr))
		}

		if c.client == nil {
			return nil, ErrNoHealthyEndpoint
		}
	}

	c.setActor(rpcActor)

	go c.closeWaiter(ctx)

	return c, nil
}
// newCli creates a neo-go WS client for the given endpoint using the
// configured dial timeout, initializes it and builds an actor on top
// of it with the Client's account and configuration.
//
// If initialization or actor creation fails, the freshly created
// WS client is closed before the wrapped error is returned.
func (c *Client) newCli(ctx context.Context, endpoint string) (*rpcclient.WSClient, *actor.Actor, error) {
	wsOpts := rpcclient.WSOptions{
		Options: rpcclient.Options{
			DialTimeout: c.cfg.dialTimeout,
		},
	}

	ws, err := rpcclient.NewWS(ctx, endpoint, wsOpts)
	if err != nil {
		return nil, nil, fmt.Errorf("WS client creation: %w", err)
	}

	if err = ws.Init(); err != nil {
		wrapped := fmt.Errorf("WS client initialization: %w", err)
		ws.Close()
		return nil, nil, wrapped
	}

	rpcActor, err := newActor(ws, c.acc, c.cfg)
	if err != nil {
		wrapped := fmt.Errorf("RPC actor creation: %w", err)
		ws.Close()
		return nil, nil, wrapped
	}

	return ws, rpcActor, nil
}
// newActor builds a neo-go actor over the given WS client with a
// single signer account: the signer's account is the script hash of
// acc's public key, while scopes, allowed contracts and allowed
// groups are taken from the configured signer.
func newActor(ws *rpcclient.WSClient, acc *wallet.Account, cfg cfg) (*actor.Actor, error) {
	signer := transaction.Signer{
		Account:          acc.PrivateKey().PublicKey().GetScriptHash(),
		Scopes:           cfg.signer.Scopes,
		AllowedContracts: cfg.signer.AllowedContracts,
		AllowedGroups:    cfg.signer.AllowedGroups,
	}

	signers := []actor.SignerAccount{
		{Signer: signer, Account: acc},
	}

	return actor.New(ws, signers)
}
// newClientCache returns a cache with a 100-entry LRU of transaction
// heights keyed by transaction hash and the given metrics component.
func newClientCache(morphCacheMetrics metrics.MorphCacheMetrics) cache {
	const txHeightsSize = 100

	// the constructor reports an error only for an invalid size,
	// which the constant above is not, so the error is dropped
	heights, _ := lru.New[util.Uint256, uint32](txHeightsSize)

	var cc cache
	cc.txHeights = heights
	cc.metrics = morphCacheMetrics

	return cc
}
// WithDialTimeout returns a client constructor option
// that specifies neo-go client dial timeout duration.
//
// Ignores non-positive value. The timeout is not used for the
// initial connection if WithSingleClient is provided.
//
// If option not provided, 5s timeout is used.
func WithDialTimeout(dur time.Duration) Option {
	if dur <= 0 {
		// invalid value: keep whatever is already configured
		return func(*cfg) {}
	}

	return func(c *cfg) { c.dialTimeout = dur }
}
// WithLogger returns a client constructor option
// that specifies the component for writing log messages.
//
// Ignores nil value.
//
// If option not provided, &logger.Logger{Logger: zap.L()} is used.
func WithLogger(logger *logger.Logger) Option {
	if logger == nil {
		// nothing to set: keep whatever is already configured
		return func(*cfg) {}
	}

	return func(c *cfg) { c.logger = logger }
}
// WithMetrics returns a client constructor option
// that specifies the component for reporting metrics.
//
// Ignores nil value.
//
// If option not provided, morphmetrics.NoopRegister is used.
func WithMetrics(metrics morphmetrics.Register) Option {
	if metrics == nil {
		// nothing to set: keep whatever is already configured
		return func(*cfg) {}
	}

	return func(c *cfg) { c.metrics = metrics }
}
// WithSigner returns a client constructor option
// that specifies the signer and the scope of the transaction.
//
// Ignores nil value.
//
// If option not provided, signer with global scope is used.
func WithSigner(signer *transaction.Signer) Option {
	if signer == nil {
		// nothing to set: keep whatever is already configured
		return func(*cfg) {}
	}

	return func(c *cfg) { c.signer = signer }
}
// WithEndpoints returns a client constructor option
// that specifies additional Neo RPC endpoints.
//
// The endpoints are appended to the ones already configured,
// so the option can be supplied several times.
func WithEndpoints(endpoints ...Endpoint) Option {
	appendEndpoints := func(c *cfg) {
		c.endpoints = append(c.endpoints, endpoints...)
	}

	return appendEndpoints
}
// WithSingleClient returns a client constructor option
// that specifies single neo-go client and forces Client
// to use it for requests.
//
// Passed client must already be initialized.
func WithSingleClient(cli *rpcclient.WSClient) Option {
	useSingle := func(c *cfg) { c.singleCli = cli }

	return useSingle
}
// WithConnLostCallback returns a client constructor option
// that specifies a callback that is called when Client
// unsuccessfully tried to connect to all the specified
// endpoints.
func WithConnLostCallback(cb Callback) Option {
	setCallback := func(c *cfg) { c.inactiveModeCb = cb }

	return setCallback
}
// WithSwitchInterval returns a client constructor option
// that specifies a wait interval between attempts to reconnect
// to an RPC node with the highest priority.
//
// The value is stored as is, without validation.
func WithSwitchInterval(i time.Duration) Option {
	setInterval := func(c *cfg) { c.switchInterval = i }

	return setInterval
}
// WithMorphCacheMetrics returns a client constructor option
// that specifies the component for reporting morph cache metrics.
//
// Ignores nil value, the same way WithLogger, WithMetrics and
// WithSigner do, so that the default component is never replaced
// by a nil one.
//
// If option not provided, morphmetrics.NoopMorphCacheMetrics is used.
func WithMorphCacheMetrics(morphCacheMetrics metrics.MorphCacheMetrics) Option {
	return func(c *cfg) {
		if morphCacheMetrics != nil {
			c.morphCacheMetrics = morphCacheMetrics
		}
	}
}
// WithCompatibilityMode returns a client constructor option that
// supplies the flag indicating whether Client works in compatibility
// mode. In this mode backward compatibility with services of the
// previous version has to be kept.
//
// The pointer is stored as is.
func WithCompatibilityMode(cmode *atomic.Bool) Option {
	setMode := func(c *cfg) { c.cmode = cmode }

	return setMode
}