[#2] Add FrostFS new epoch trigger

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-07-04 16:44:14 +03:00
parent 4e71fbeba6
commit d78861b148
13 changed files with 818 additions and 22 deletions

View file

@ -2,12 +2,21 @@ package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/notificator"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/resolver"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/spf13/viper"
"go.uber.org/zap"
)
@ -20,6 +29,7 @@ type (
done chan struct{}
appServices []*metrics.Service
appMetrics *metrics.AppMetrics
notificator *notificator.Notificator
}
)
@ -30,19 +40,94 @@ const (
HealthStatusShuttingDown int32 = 3
)
func newApp(cfg *viper.Viper, log *zap.Logger, level zap.AtomicLevel) *App {
func newApp(ctx context.Context, cfg *viper.Viper, log *Logger) *App {
a := &App{
log: log,
logLevel: level,
log: log.logger,
logLevel: log.lvl,
cfg: cfg,
done: make(chan struct{}),
appMetrics: metrics.NewAppMetrics(),
}
a.appMetrics.SetHealth(HealthStatusStarting)
a.init(ctx)
return a
}
func (a *App) init(ctx context.Context) {
key, err := fetchKey(a.cfg)
if err != nil {
a.log.Fatal(logs.FailedToLoadPrivateKey, zap.Error(err))
}
endpoints := fetchMorphEndpoints(a.cfg, a.log)
newListenerFunc := a.getNewListenerFunction(ctx, key, endpoints)
handler := a.getNewEpochHandler()
netmapContract, err := resolver.ResolveContractHash(a.cfg.GetString(cfgMorphContractNetmap), endpoints[0].Address)
if err != nil {
a.log.Fatal(logs.ResolveNetmapContract, zap.Error(err))
}
cfg := notificator.Config{
Handler: handler,
Logger: a.log,
NewListener: newListenerFunc,
NetmapContract: netmapContract,
ReconnectClientsInterval: 30 * time.Second,
}
if a.notificator, err = notificator.New(ctx, cfg); err != nil {
a.log.Fatal(logs.InitNotificator, zap.Error(err))
}
}
func (a *App) getNewListenerFunction(ctx context.Context, key *keys.PrivateKey, endpoints []client.Endpoint) notificator.ListenerCreationFunc {
morphLogger := &logger.Logger{Logger: a.log}
clientOptions := []client.Option{
client.WithLogger(morphLogger),
client.WithEndpoints(endpoints...),
}
return func(connectionLostCb func()) (event.Listener, error) {
options := append([]client.Option{client.WithConnLostCallback(connectionLostCb)}, clientOptions...)
cli, err := client.New(ctx, key, options...)
if err != nil {
return nil, fmt.Errorf("create new client: %w", err)
}
currentBlock, err := cli.BlockCount()
if err != nil {
return nil, fmt.Errorf("get block count: %w", err)
}
subs, err := subscriber.New(ctx, &subscriber.Params{
Log: morphLogger,
StartFromBlock: currentBlock,
Client: cli,
})
if err != nil {
return nil, fmt.Errorf("create subscriber: %w", err)
}
return event.NewListener(event.ListenerParams{
Logger: morphLogger,
Subscriber: subs,
WorkerPoolCapacity: 0, // 0 means "infinite"
})
}
}
func (a *App) getNewEpochHandler() notificator.NewEpochHandler {
return func(_ context.Context, ee notificator.NewEpochEvent) {
// todo (d.kirillov) use real job executor here TrueCloudLab/frostfs-s3-lifecycler#3
fmt.Println("start handler", ee.Epoch)
time.Sleep(30 * time.Second)
fmt.Println("end handler", ee.Epoch)
}
}
func (a *App) Wait() {
a.log.Info(logs.ApplicationStarted,
zap.String("app_name", "frostfs-s3-lifecycler"),
@ -62,6 +147,8 @@ func (a *App) Serve(ctx context.Context) {
a.startAppServices()
go a.notificator.Start(ctx)
loop:
for {
select {

View file

@ -15,7 +15,12 @@ const (
destinationJournald string = "journald"
)
func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
type Logger struct {
logger *zap.Logger
lvl zap.AtomicLevel
}
func pickLogger(v *viper.Viper) *Logger {
lvl, err := getLogLevel(v.GetString(cfgLoggerLevel))
if err != nil {
panic(err)
@ -34,7 +39,7 @@ func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
}
func newStdoutLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
func newStdoutLogger(lvl zapcore.Level) *Logger {
c := zap.NewProductionConfig()
c.Level = zap.NewAtomicLevelAt(lvl)
c.Encoding = "console"
@ -47,10 +52,10 @@ func newStdoutLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
panic(fmt.Sprintf("build zap logger instance: %v", err))
}
return l, c.Level
return &Logger{logger: l, lvl: c.Level}
}
func newJournaldLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
func newJournaldLogger(lvl zapcore.Level) *Logger {
c := zap.NewProductionConfig()
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
c.Level = zap.NewAtomicLevelAt(lvl)
@ -66,7 +71,7 @@ func newJournaldLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
zapjournald.SyslogPid(),
})
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
return l, c.Level
return &Logger{logger: l, lvl: c.Level}
}
func getLogLevel(lvlStr string) (zapcore.Level, error) {

View file

@ -11,9 +11,9 @@ func main() {
defer cancel()
cfg := settings()
log, level := pickLogger(cfg)
log := pickLogger(cfg)
app := newApp(cfg, log, level)
app := newApp(ctx, cfg, log)
go app.Serve(ctx)
app.Wait()
}

View file

@ -8,11 +8,25 @@ import (
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"go.uber.org/zap"
)
const (
// Wallet.
cfgWalletPath = "wallet.path"
cfgWalletAddress = "wallet.address"
cfgWalletPassphrase = "wallet.passphrase"
// Metrics.
cfgPrometheusEnabled = "prometheus.enabled"
cfgPrometheusAddress = "prometheus.address"
cfgPprofEnabled = "pprof.enabled"
@ -22,6 +36,16 @@ const (
cfgLoggerLevel = "logger.level"
cfgLoggerDestination = "logger.destination"
// Morph.
cfgMorphRPCEndpointPrefixTmpl = "morph.rpc_endpoint.%d."
cfgMorphRPCEndpointAddressTmpl = cfgMorphRPCEndpointPrefixTmpl + "address"
cfgMorphRPCEndpointPriorityTmpl = cfgMorphRPCEndpointPrefixTmpl + "priority"
cfgMorphRPCEndpointTrustedCAListTmpl = cfgMorphRPCEndpointPrefixTmpl + "trusted_ca_list"
cfgMorphRPCEndpointCertificateTmpl = cfgMorphRPCEndpointPrefixTmpl + "certificate"
cfgMorphRPCEndpointKeyTmpl = cfgMorphRPCEndpointPrefixTmpl + "key"
cfgMorphContractNetmap = "morph.contract.netmap"
cfgMorphReconnectClientInterval = "morph.reconnect_clients_interval"
// Command line args.
cmdHelp = "help"
cmdVersion = "version"
@ -32,6 +56,8 @@ const (
const (
defaultShutdownTimeout = 15 * time.Second
componentName = "frostfs-s3-lifecycler"
defaultMorphRPCEndpointPriority = 1
)
func settings() *viper.Viper {
@ -50,19 +76,23 @@ func settings() *viper.Viper {
help := flags.BoolP(cmdHelp, "h", false, "show help")
version := flags.BoolP(cmdVersion, "v", false, "show version")
flags.StringArray(cmdConfig, nil, "config paths")
flags.StringArrayP(cmdConfig, "c", nil, "config paths")
flags.String(cmdConfigDir, "", "config dir path")
// set defaults:
// logger:
v.SetDefault(cfgLoggerLevel, "debug")
v.SetDefault(cfgLoggerLevel, "info")
v.SetDefault(cfgLoggerDestination, "stdout")
// services:
v.SetDefault(cfgPrometheusEnabled, false)
v.SetDefault(cfgPprofEnabled, false)
// morph:
v.SetDefault(cfgMorphContractNetmap, "netmap.frostfs")
v.SetDefault(cfgMorphReconnectClientInterval, 30*time.Second)
// Bind flags with configuration values.
if err := v.BindPFlags(flags); err != nil {
panic(err)
@ -159,3 +189,87 @@ func mergeConfig(v *viper.Viper, fileName string) error {
func printVersion() {
fmt.Printf("%s\nVersion: %s\nGoVersion: %s\n", componentName, Version, runtime.Version())
}
func fetchKey(v *viper.Viper) (*keys.PrivateKey, error) {
var password *string
if v.IsSet(cfgWalletPassphrase) {
pwd := v.GetString(cfgWalletPassphrase)
password = &pwd
}
walletPath := v.GetString(cfgWalletPath)
if len(walletPath) == 0 {
return nil, fmt.Errorf("wallet path must not be empty")
}
w, err := wallet.NewWalletFromFile(walletPath)
if err != nil {
return nil, fmt.Errorf("parse wallet: %w", err)
}
walletAddress := v.GetString(cfgWalletAddress)
var addr util.Uint160
if len(walletAddress) == 0 {
addr = w.GetChangeAddress()
} else {
addr, err = flags.ParseAddress(walletAddress)
if err != nil {
return nil, fmt.Errorf("invalid address")
}
}
acc := w.GetAccount(addr)
if acc == nil {
return nil, fmt.Errorf("couldn't find wallet account for %s", walletAddress)
}
if password == nil {
pwd, err := input.ReadPassword(fmt.Sprintf("Enter password for %s > ", walletPath))
if err != nil {
return nil, fmt.Errorf("couldn't read password")
}
password = &pwd
}
if err = acc.Decrypt(*password, w.Scrypt); err != nil {
return nil, fmt.Errorf("couldn't decrypt account: %w", err)
}
return acc.PrivateKey(), nil
}
func fetchMorphEndpoints(v *viper.Viper, l *zap.Logger) []client.Endpoint {
var res []client.Endpoint
for i := 0; ; i++ {
addr := v.GetString(fmt.Sprintf(cfgMorphRPCEndpointAddressTmpl, i))
if addr == "" {
break
}
priority := v.GetInt(fmt.Sprintf(cfgMorphRPCEndpointPriorityTmpl, i))
if priority <= 0 {
priority = defaultMorphRPCEndpointPriority
}
var mtlsConfig *client.MTLSConfig
rootCAs := v.GetStringSlice(fmt.Sprintf(cfgMorphRPCEndpointTrustedCAListTmpl, i))
if len(rootCAs) != 0 {
mtlsConfig = &client.MTLSConfig{
TrustedCAList: rootCAs,
KeyFile: v.GetString(fmt.Sprintf(cfgMorphRPCEndpointKeyTmpl, i)),
CertFile: v.GetString(fmt.Sprintf(cfgMorphRPCEndpointCertificateTmpl, i)),
}
}
res = append(res, client.Endpoint{
Address: addr,
Priority: priority,
MTLSConfig: mtlsConfig,
})
}
if len(res) == 0 {
l.Fatal(logs.NoMorphRPCEndpoints)
}
return res
}

View file

@ -1,10 +1,29 @@
# Wallet
# Path to wallet
S3_LIFECYCLER_WALLET_PATH=/path/to/wallet.json
# Account address. If omitted default one will be used.
S3_LIFECYCLER_WALLET_ADDRESS=NfgHwwTi3wHAS8aFAN243C5vGbkYDpqLHP
# Passphrase to decrypt wallet.
S3_LIFECYCLER_WALLET_PASSPHRASE=pwd
# Logger
S3_GW_LOGGER_LEVEL=debug
S3_GW_LOGGER_DESTINATION=stdout
S3_LIFECYCLER_LOGGER_LEVEL=debug
S3_LIFECYCLER_LOGGER_DESTINATION=stdout
# Metrics
S3_GW_PPROF_ENABLED=false
S3_GW_PPROF_ADDRESS=localhost:8077
S3_LIFECYCLER_PPROF_ENABLED=false
S3_LIFECYCLER_PPROF_ADDRESS=localhost:8077
S3_GW_PROMETHEUS_ENABLED=false
S3_GW_PROMETHEUS_ADDRESS=localhost:8078
S3_LIFECYCLER_PROMETHEUS_ENABLED=false
S3_LIFECYCLER_PROMETHEUS_ADDRESS=localhost:8078
# Morph chain
S3_LIFECYCLER_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.frostfs.info:40341/ws"
S3_LIFECYCLER_MORPH_RPC_ENDPOINT_0_PRIORITY=0
S3_LIFECYCLER_MORPH_RPC_ENDPOINT_0_TRUSTED_CA_LIST="/path/to/ca.pem"
S3_LIFECYCLER_MORPH_RPC_ENDPOINT_0_CERTIFICATE="/path/to/cert"
S3_LIFECYCLER_MORPH_RPC_ENDPOINT_0_KEY="/path/to/key"
S3_LIFECYCLER_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.frostfs.info:40341/ws"
S3_LIFECYCLER_MORPH_RPC_ENDPOINT_1_PRIORITY=2
S3_LIFECYCLER_MORPH_RECONNECT_CLIENTS_INTERVAL=30s
S3_LIFECYCLER_MORPH_CONTRACT_NETMAP=netmap.frostfs

View file

@ -1,5 +1,11 @@
# Wallet address, path to the wallet must be set as cli parameter or environment variable
wallet:
path: /path/to/wallet.json # Path to wallet
address: NfgHwwTi3wHAS8aFAN243C5vGbkYDpqLHP # Account address. If omitted default one will be used.
passphrase: "" # Passphrase to decrypt wallet. If you're using a wallet without a password, place '' here.
logger:
level: debug # Log level.
level: info # Log level.
destination: stdout # Logging destination.
pprof:
@ -9,3 +15,17 @@ pprof:
prometheus:
enabled: false
address: localhost:8078 # Endpoint for service metrics
morph:
rpc_endpoint:
- address: wss://rpc1.morph.frostfs.info:40341/ws
priority: 1
trusted_ca_list:
- "/path/to/ca.pem"
certificate: "/path/to/cert"
key: "/path/to/key"
- address: wss://rpc2.morph.frostfs.info:40341/ws
priority: 2
reconnect_clients_interval: 30s
contract:
netmap: netmap.frostfs

View file

@ -6,10 +6,10 @@ This section contains detailed FrostFS S3 Lifecycler component configuration des
| Section | Description |
|--------------|-------------------------------------------------|
| no section | [General parameters](#general-section) |
| `logger` | [Logger configuration](#logger-section) |
| `pprof` | [Pprof configuration](#pprof-section) |
| `prometheus` | [Prometheus configuration](#prometheus-section) |
| `morph` | [Morph configuration](#morph-section) |
### Reload on SIGHUP
@ -64,3 +64,33 @@ prometheus:
|-----------|----------|---------------|---------------|----------------------------------------------------|
| `enabled` | `bool` | yes | `false` | Flag to enable prometheus service. |
| `address` | `string` | yes | | Address that prometheus service listener binds to. |
# `morph` section
Contains configuration for the `morph` chain.
```yaml
morph:
rpc_endpoint:
- address: wss://rpc1.morph.frostfs.info:40341/ws
priority: 0
trusted_ca_list:
- "/path/to/ca.pem"
certificate: "/path/to/cert"
key: "/path/to/key"
- address: wss://rpc2.morph.frostfs.info:40341/ws
priority: 2
reconnect_clients_interval: 30s
contract:
netmap: netmap.frostfs
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|--------------------------------|------------|---------------|------------------|---------------------------------------------------------------------------------------------------------|
| `rpc_endpoint.address` | `string` | no | | The address of the RPC host to connect. |
| `rpc_endpoint.priority` | `int` | no | | Priority of RPC endpoint. |
| `rpc_endpoint.trusted_ca_list` | `[]string` | no | | List of paths to CAs to use in mTLS configuration. |
| `rpc_endpoint.certificate` | `string` | no | | Path to certificate to use in mTLS configuration. |
| `rpc_endpoint.key` | `string` | no | | Path to key to use in mTLS configuration. |
| `reconnect_clients_interval` | `string` | no | `30s` | When all endpoints are failed. Overall connection be reinitialized. This value is time between retries. |
| `contract.netmap` | `string` | no | `netmap.frostfs` | Netmap contract hash (LE) or name in NNS. |

37
go.mod
View file

@ -3,7 +3,10 @@ module git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler
go 1.21
require (
git.frostfs.info/TrueCloudLab/frostfs-node v0.42.0-rc.5
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240617140730-1a5886e776de
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
github.com/nspcc-dev/neo-go v0.106.0
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/spf13/pflag v1.0.5
@ -13,28 +16,58 @@ require (
go.uber.org/zap v1.27.0
)
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240611123832-594f716b3d18
require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240530152826-2f6d3209e1d3 // indirect
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240409111539-e7a05a49ff45 // indirect
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65 // indirect
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
github.com/panjf2000/ants/v2 v2.9.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/urfave/cli v1.22.14 // indirect
go.etcd.io/bbolt v1.3.9 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

View file

@ -15,4 +15,11 @@ const (
FailedToReloadConfig = "failed to reload config"
LogLevelWontBeUpdated = "log level won't be updated"
SIGHUPConfigReloadCompleted = "SIGHUP config reload completed"
NotificatorStopped = "notificator stopped"
ResolveNetmapContract = "failed to resolve netmap contract"
NewEpochWasTriggered = "new epoch was triggered"
ListenerCouldntBeReinitialized = "listener couldn't be reinitialized"
InitNotificator = "init notificator"
NoMorphRPCEndpoints = "no morph RPC endpoints"
FailedToLoadPrivateKey = "failed to load private key"
)

View file

@ -0,0 +1,71 @@
package notificator
import (
"context"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"go.uber.org/zap"
)
// handlerLimiter is a limiter to make some works to be sequential
// and interrupt previous one if new one is submitted.
type handlerLimiter struct {
ctx context.Context
log *zap.Logger
handler NewEpochHandler
work chan func()
mu sync.Mutex
cancelCurrent context.CancelFunc
}
func newHandlerLimiter(ctx context.Context, handler NewEpochHandler, log *zap.Logger) *handlerLimiter {
hl := &handlerLimiter{
ctx: ctx,
log: log,
handler: handler,
work: make(chan func()),
cancelCurrent: func() {},
}
go hl.start(ctx)
return hl
}
func (h *handlerLimiter) start(ctx context.Context) {
for {
select {
case <-ctx.Done():
close(h.work)
return
case work := <-h.work:
work()
}
}
}
func (h *handlerLimiter) replaceCurrentWorkContext(ctx context.Context) (workCtx context.Context) {
h.mu.Lock()
defer h.mu.Unlock()
h.cancelCurrent()
workCtx, h.cancelCurrent = context.WithCancel(ctx)
return workCtx
}
func (h *handlerLimiter) Handler(e event.Event) {
ee, ok := e.(NewEpochEvent)
if !ok {
return
}
workCtx := h.replaceCurrentWorkContext(h.ctx)
h.log.Debug(logs.NewEpochWasTriggered, zap.Int64("epoch", ee.Epoch))
h.work <- func() {
h.handler(workCtx, ee)
}
}

View file

@ -0,0 +1,157 @@
package notificator
import (
"context"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"go.uber.org/zap"
)
type NewEpochHandler func(ctx context.Context, ee NewEpochEvent)
type NewEpochEvent struct {
Epoch int64
}
func (n NewEpochEvent) MorphEvent() {}
type ListenerCreationFunc func(connectionLostCallback func()) (event.Listener, error)
type Notificator struct {
logger *zap.Logger
listener event.Listener
handler *handlerLimiter
connLost chan struct{}
netmapContract util.Uint160
newListener ListenerCreationFunc
reconnectClientsInterval time.Duration
}
type Config struct {
Handler NewEpochHandler
Logger *zap.Logger
NewListener ListenerCreationFunc
NetmapContract util.Uint160
ReconnectClientsInterval time.Duration
}
const newEpochEventType = event.Type("NewEpoch")
func New(ctx context.Context, cfg Config) (*Notificator, error) {
notifier := &Notificator{
netmapContract: cfg.NetmapContract,
handler: newHandlerLimiter(ctx, cfg.Handler, cfg.Logger),
connLost: make(chan struct{}),
newListener: cfg.NewListener,
logger: cfg.Logger,
reconnectClientsInterval: cfg.ReconnectClientsInterval,
}
if err := notifier.initListener(); err != nil {
return nil, fmt.Errorf("init listener: %w", err)
}
return notifier, nil
}
func (n *Notificator) initListener() error {
listener, err := n.newListener(func() { n.connLost <- struct{}{} })
if err != nil {
return err
}
var npi event.NotificationParserInfo
npi.SetScriptHash(n.netmapContract)
npi.SetType(newEpochEventType)
npi.SetParser(newEpochEventParser())
listener.SetNotificationParser(npi)
var nhi event.NotificationHandlerInfo
nhi.SetType(newEpochEventType)
nhi.SetScriptHash(n.netmapContract)
nhi.SetHandler(n.handler.Handler)
listener.RegisterNotificationHandler(nhi)
n.listener = listener
return nil
}
// Start runs listener to process notifications.
// Method MUST be invoked once after successful initialization with New
// otherwise panic can happen.
func (n *Notificator) Start(ctx context.Context) {
go n.listener.Listen(ctx)
ticker := time.NewTicker(n.reconnectClientsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
n.logger.Info(logs.NotificatorStopped, zap.Error(ctx.Err()))
return
case <-n.connLost:
n.listener.Stop()
LOOP:
for {
select {
case <-ctx.Done():
n.logger.Info(logs.NotificatorStopped, zap.Error(ctx.Err()))
return
case <-ticker.C:
if err := n.initListener(); err != nil {
n.logger.Error(logs.ListenerCouldntBeReinitialized, zap.Error(err))
ticker.Reset(n.reconnectClientsInterval)
continue
}
go n.listener.Listen(ctx)
break LOOP
}
}
}
}
}
func newEpochEventParser() event.NotificationParser {
return func(ne *state.ContainedNotificationEvent) (event.Event, error) {
arr, err := arrayFromStackItem(ne.Item)
if err != nil {
return nil, fmt.Errorf("notification event item is invalid: %w", err)
}
if len(arr) != 1 {
return nil, fmt.Errorf("notification event item array has invalid length: %d", len(arr))
}
epoch, err := arr[0].TryInteger()
if err != nil {
return nil, err
}
return NewEpochEvent{Epoch: epoch.Int64()}, nil
}
}
// arrayFromStackItem returns the slice contract parameters from passed parameter.
// If passed parameter carries boolean false value, (nil, nil) returns.
func arrayFromStackItem(param stackitem.Item) ([]stackitem.Item, error) {
switch param.Type() {
case stackitem.AnyT:
return nil, nil
case stackitem.ArrayT, stackitem.StructT:
items, ok := param.Value().([]stackitem.Item)
if !ok {
return nil, fmt.Errorf("can't convert %T to parameter slice", param.Value())
}
return items, nil
default:
return nil, fmt.Errorf("%s is not an array type", param.Type())
}
}

View file

@ -0,0 +1,219 @@
package notificator
import (
"context"
"errors"
"sync"
"sync/atomic"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
type scriptHashWithType struct {
eventType event.Type
contractHash util.Uint160
}
type listenerMock struct {
scriptHashWithType
mu sync.Mutex
parsers map[scriptHashWithType]event.NotificationParserInfo
handlers map[scriptHashWithType][]event.NotificationHandlerInfo
started, stopped bool
lostConnectionCallback func()
}
func newListenerMock(hash util.Uint160) *listenerMock {
return &listenerMock{
scriptHashWithType: scriptHashWithType{
eventType: newEpochEventType,
contractHash: hash,
},
parsers: map[scriptHashWithType]event.NotificationParserInfo{},
handlers: map[scriptHashWithType][]event.NotificationHandlerInfo{},
started: false,
stopped: false,
}
}
func (l *listenerMock) sendNotification(epochEvent NewEpochEvent) error {
l.mu.Lock()
defer l.mu.Unlock()
if _, ok := l.parsers[l.scriptHashWithType]; !ok {
return errors.New("there is no appropriate parser")
}
handlers, ok := l.handlers[l.scriptHashWithType]
if !ok {
return errors.New("there is no appropriate handlers")
}
for _, handler := range handlers {
handler.Handler()(epochEvent)
}
return nil
}
func (l *listenerMock) refresh() {
l.mu.Lock()
defer l.mu.Unlock()
l.started = false
l.stopped = false
l.parsers = map[scriptHashWithType]event.NotificationParserInfo{}
l.handlers = map[scriptHashWithType][]event.NotificationHandlerInfo{}
}
func (l *listenerMock) Listen(context.Context) {
l.mu.Lock()
l.started = true
l.mu.Unlock()
}
func (l *listenerMock) ListenWithError(context.Context, chan<- error) {
panic("not implemented")
}
func (l *listenerMock) SetNotificationParser(info event.NotificationParserInfo) {
l.mu.Lock()
defer l.mu.Unlock()
l.parsers[scriptHashWithType{
eventType: info.GetType(),
contractHash: info.ScriptHash(),
}] = info
}
func (l *listenerMock) RegisterNotificationHandler(info event.NotificationHandlerInfo) {
l.mu.Lock()
defer l.mu.Unlock()
key := scriptHashWithType{
eventType: info.GetType(),
contractHash: info.ScriptHash(),
}
list := l.handlers[key]
l.handlers[key] = append(list, info)
}
func (l *listenerMock) EnableNotarySupport(util.Uint160, client.AlphabetKeys, event.BlockCounter) {
panic("not implemented")
}
func (l *listenerMock) SetNotaryParser(event.NotaryParserInfo) {
panic("not implemented")
}
func (l *listenerMock) RegisterNotaryHandler(event.NotaryHandlerInfo) {
panic("not implemented")
}
func (l *listenerMock) RegisterBlockHandler(event.BlockHandler) {
panic("not implemented")
}
func (l *listenerMock) Stop() {
l.mu.Lock()
l.stopped = true
l.mu.Unlock()
}
func TestNotificatorBase(t *testing.T) {
ctx := context.Background()
logger := zaptest.NewLogger(t)
contractHash, err := util.Uint160DecodeStringLE("a0520ef5e7b9dd89ba49cce9cac1a6332d3facc0")
require.NoError(t, err)
var sequentialHandlerFlag atomic.Bool
var gotEvent NewEpochEvent
var wg sync.WaitGroup
handler := func(_ context.Context, ee NewEpochEvent) {
require.False(t, sequentialHandlerFlag.Load())
sequentialHandlerFlag.Store(true)
gotEvent = ee
time.Sleep(time.Second)
sequentialHandlerFlag.Store(false)
wg.Done()
}
lnMock := newListenerMock(contractHash)
cfg := Config{
Handler: handler,
Logger: logger,
NewListener: func(cb func()) (event.Listener, error) {
lnMock.lostConnectionCallback = cb
lnMock.refresh()
return lnMock, nil
},
NetmapContract: contractHash,
ReconnectClientsInterval: 100 * time.Millisecond,
}
n, err := New(ctx, cfg)
require.NoError(t, err)
go n.Start(ctx)
ee := NewEpochEvent{Epoch: 1}
sendNotification(t, lnMock, ee, &wg)
require.Equal(t, ee.Epoch, gotEvent.Epoch)
ee = NewEpochEvent{Epoch: 2}
sendNotification(t, lnMock, ee, &wg)
require.Equal(t, ee.Epoch, gotEvent.Epoch)
lnMock.lostConnectionCallback()
ee = NewEpochEvent{Epoch: 3}
sendNotification(t, lnMock, ee, &wg)
require.Equal(t, ee.Epoch, gotEvent.Epoch)
}
func sendNotification(t *testing.T, lnMock *listenerMock, ee NewEpochEvent, wg *sync.WaitGroup) {
wg.Add(1)
err := lnMock.sendNotification(ee)
require.NoError(t, err)
wg.Wait()
}
func TestLimiter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
log := zaptest.NewLogger(t)
var (
interrupted atomic.Bool
wg sync.WaitGroup
)
wg.Add(1)
handler := NewEpochHandler(func(ctx context.Context, ee NewEpochEvent) {
defer wg.Done()
select {
case <-ctx.Done():
interrupted.Store(true)
case <-time.After(3 * time.Second):
log.Warn("handler executed successfully", zap.Any("event", ee))
}
})
hl := newHandlerLimiter(ctx, handler, log)
hl.Handler(NewEpochEvent{Epoch: 1})
cancel()
wg.Wait()
require.True(t, interrupted.Load())
}

View file

@ -0,0 +1,34 @@
package resolver
import (
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
"github.com/nspcc-dev/neo-go/pkg/util"
)
// ResolveContractHash determine contract hash by resolving NNS name.
func ResolveContractHash(contractHash, rpcAddress string) (util.Uint160, error) {
if hash, err := util.Uint160DecodeStringLE(contractHash); err == nil {
return hash, nil
}
splitName := strings.Split(contractHash, ".")
if len(splitName) != 2 {
return util.Uint160{}, fmt.Errorf("invalid contract name: '%s'", contractHash)
}
var domain container.Domain
domain.SetName(splitName[0])
domain.SetZone(splitName[1])
var nns ns.NNS
if err := nns.Dial(rpcAddress); err != nil {
return util.Uint160{}, fmt.Errorf("dial nns %s: %w", rpcAddress, err)
}
defer nns.Close()
return nns.ResolveContractHash(domain)
}