frostfs-s3-lifecycler/internal/notificator/notificator.go
Denis Kirillov d8b5cd5fc2 [#3] Add job fetcher
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-07-22 13:32:04 +03:00

120 lines
2.9 KiB
Go

package notificator
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"go.uber.org/zap"
)
// NewEpochHandler is a callback that receives a NewEpochEvent together with
// a context.
type NewEpochHandler func(ctx context.Context, ee NewEpochEvent)
// NewEpochEvent is the event produced by the NewEpoch notification parser.
type NewEpochEvent struct {
// Epoch is the integer carried as the single item of the notification,
// converted to uint64.
Epoch uint64
}
// MorphEvent is an intentionally empty marker method; it lets NewEpochEvent
// be returned as an event.Event by the notification parser.
// NOTE(review): presumably event.Event requires exactly this method — confirm
// against the frostfs-node event package.
func (n NewEpochEvent) MorphEvent() {}
// Listener is the event source that Notificator.Start runs.
type Listener interface {
// Listen must start the event listener.
//
// It must listen to events using the parser it was configured with.
Listen(context.Context)
}
// ListenerConfig carries the notification parser and handler registrations
// that New passes to Config.NewListenerFn.
type ListenerConfig struct {
// Parser is the parser registration for the NewEpoch notification.
Parser event.NotificationParserInfo
// Handler is the handler registration for the NewEpoch notification.
Handler event.NotificationHandlerInfo
}
// Notificator connects NewEpoch notifications coming from a Listener to the
// handler supplied in Config. Instances are built with New.
type Notificator struct {
// logger is the logger taken from Config.Logger.
logger *zap.Logger
// listener is created by Config.NewListenerFn in New and run by Start.
listener Listener
// handler wraps Config.Handler; its Handler member is what New registers
// with the listener.
handler *handlerLimiter
}
// Config groups the dependencies New needs to build a Notificator.
type Config struct {
// Handler is the callback for NewEpochEvent; New wraps it with
// newHandlerLimiter.
Handler NewEpochHandler
// Logger is stored on the Notificator and also passed to the handler
// limiter.
Logger *zap.Logger
// NewListenerFn creates the Listener from the prepared parser and handler
// registrations. An error returned by it makes New fail.
NewListenerFn func(ListenerConfig) (Listener, error)
// NetmapContract is the script hash set on both the parser and the handler
// registrations.
NetmapContract util.Uint160
}
// newEpochEventType is the notification type that New registers for both the
// parser and the handler.
const newEpochEventType = event.Type("NewEpoch")
// New builds a Notificator from cfg.
//
// It wraps cfg.Handler into a handler limiter, prepares the parser and handler
// registrations for the NewEpoch notification of cfg.NetmapContract and asks
// cfg.NewListenerFn to create the listener from them. An error returned by
// cfg.NewListenerFn is wrapped and returned; no Notificator is returned then.
func New(ctx context.Context, cfg Config) (*Notificator, error) {
	limiter := newHandlerLimiter(ctx, cfg.Handler, cfg.Logger)

	// Parser registration: how NewEpoch notifications of the contract are decoded.
	var parserInfo event.NotificationParserInfo
	parserInfo.SetScriptHash(cfg.NetmapContract)
	parserInfo.SetType(newEpochEventType)
	parserInfo.SetParser(newEpochEventParser())

	// Handler registration: decoded events are delivered through the limiter.
	var handlerInfo event.NotificationHandlerInfo
	handlerInfo.SetType(newEpochEventType)
	handlerInfo.SetScriptHash(cfg.NetmapContract)
	handlerInfo.SetHandler(limiter.Handler)

	listener, err := cfg.NewListenerFn(ListenerConfig{
		Parser:  parserInfo,
		Handler: handlerInfo,
	})
	if err != nil {
		return nil, fmt.Errorf("create new listener: %w", err)
	}

	return &Notificator{
		logger:   cfg.Logger,
		listener: listener,
		handler:  limiter,
	}, nil
}
// Start runs the listener to process notifications by calling its Listen
// method with ctx.
//
// It MUST be invoked only after a successful initialization with New: a
// Notificator that was not built by New has a nil listener, and calling
// Listen on it panics.
// NOTE(review): the contract also says Start is invoked once; whether a second
// call is harmful depends on the Listener implementation — confirm.
func (n *Notificator) Start(ctx context.Context) {
n.listener.Listen(ctx)
}
// newEpochEventParser returns the parser for NewEpoch notifications.
//
// The returned parser expects the notification item to be an array (or
// struct) holding exactly one element, the epoch number. It returns a
// NewEpochEvent on success and an error when:
//   - the item is not an array-like stack item;
//   - the array does not contain exactly one element;
//   - the element cannot be converted to an integer;
//   - the integer is negative or does not fit into uint64.
func newEpochEventParser() event.NotificationParser {
	return func(ne *state.ContainedNotificationEvent) (event.Event, error) {
		arr, err := arrayFromStackItem(ne.Item)
		if err != nil {
			return nil, fmt.Errorf("notification event item is invalid: %w", err)
		}

		if len(arr) != 1 {
			return nil, fmt.Errorf("notification event item array has invalid length: %d", len(arr))
		}

		epoch, err := arr[0].TryInteger()
		if err != nil {
			return nil, fmt.Errorf("notification event epoch is not an integer: %w", err)
		}

		// The result of big.Int.Uint64 is undefined when the value cannot be
		// represented as uint64, so reject such values instead of silently
		// producing a garbage epoch.
		if !epoch.IsUint64() {
			return nil, fmt.Errorf("notification event epoch %s does not fit into uint64", epoch)
		}

		return NewEpochEvent{Epoch: epoch.Uint64()}, nil
	}
}
// arrayFromStackItem extracts the element slice from an array-like stack item.
//
// For an item of type stackitem.AnyT it returns (nil, nil). For
// stackitem.ArrayT and stackitem.StructT it returns the item's value as
// []stackitem.Item, or an error if the value has a different Go type. Any
// other item type results in an error.
func arrayFromStackItem(param stackitem.Item) ([]stackitem.Item, error) {
	kind := param.Type()

	if kind == stackitem.AnyT {
		return nil, nil
	}

	if kind != stackitem.ArrayT && kind != stackitem.StructT {
		return nil, fmt.Errorf("%s is not an array type", kind)
	}

	value := param.Value()
	if items, ok := value.([]stackitem.Item); ok {
		return items, nil
	}

	return nil, fmt.Errorf("can't convert %T to parameter slice", value)
}