From c4cdfe3ec278513f049aa1f04c8783fe6bf51215 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 11 Aug 2020 15:10:01 +0300 Subject: [PATCH] [#7] Add container processor for inner ring Signed-off-by: Alex Vanin --- .../processors/container/handlers.go | 28 +++++ .../processors/container/process_container.go | 26 +++++ .../processors/container/processor.go | 100 ++++++++++++++++++ 3 files changed, 154 insertions(+) create mode 100644 pkg/innerring/processors/container/handlers.go create mode 100644 pkg/innerring/processors/container/process_container.go create mode 100644 pkg/innerring/processors/container/processor.go diff --git a/pkg/innerring/processors/container/handlers.go b/pkg/innerring/processors/container/handlers.go new file mode 100644 index 000000000..0808e168a --- /dev/null +++ b/pkg/innerring/processors/container/handlers.go @@ -0,0 +1,28 @@ +package container + +import ( + "crypto/sha256" + + "github.com/mr-tron/base58" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" + "go.uber.org/zap" +) + +func (cp *Processor) handlePut(ev event.Event) { + put := ev.(containerEvent.Put) // todo: check panic in production + + id := sha256.Sum256(put.Container()) + cp.log.Info("notification", + zap.String("type", "container put"), + zap.String("id", base58.Encode(id[:]))) + + // send event to the worker pool + + err := cp.pool.Submit(func() { cp.processContainerPut(&put) }) + if err != nil { + // todo: move into controlled degradation stage + cp.log.Warn("container processor worker pool drained", + zap.Int("capacity", cp.pool.Cap())) + } +} diff --git a/pkg/innerring/processors/container/process_container.go b/pkg/innerring/processors/container/process_container.go new file mode 100644 index 000000000..f235e34d9 --- /dev/null +++ b/pkg/innerring/processors/container/process_container.go @@ -0,0 +1,26 @@ +package container + +import ( + "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" + containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" + "go.uber.org/zap" +) + +// Process new container from the user by checking container sanity +// and sending approve tx back to morph. +func (cp *Processor) processContainerPut(put *containerEvent.Put) { + if !cp.activeState.IsActive() { + cp.log.Info("passive mode, ignore container put") + return + } + + err := invoke.RegisterContainer(cp.morphClient, cp.containerContract, + &invoke.ContainerParams{ + Key: put.PublicKey(), + Container: put.Container(), + Signature: put.Signature(), + }) + if err != nil { + cp.log.Error("can't invoke new container", zap.Error(err)) + } +} diff --git a/pkg/innerring/processors/container/processor.go b/pkg/innerring/processors/container/processor.go new file mode 100644 index 000000000..5cfe6e805 --- /dev/null +++ b/pkg/innerring/processors/container/processor.go @@ -0,0 +1,100 @@ +package container + +import ( + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/morph/client" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" + "github.com/panjf2000/ants/v2" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +type ( + // ActiveState is a callback interface for inner ring global state. + ActiveState interface { + IsActive() bool + } + + // Processor of events produced by container contract in morph chain. + Processor struct { + log *zap.Logger + pool *ants.Pool + containerContract util.Uint160 + morphClient *client.Client + activeState ActiveState + } + + // Params of the processor constructor. + Params struct { + Log *zap.Logger + PoolSize int + ContainerContract util.Uint160 + MorphClient *client.Client + ActiveState ActiveState + } +) + +const ( + putNotification = "containerPut" +) + +// New creates container contract processor instance. +func New(p *Params) (*Processor, error) { + switch { + case p.Log == nil: + return nil, errors.New("ir/container: logger is not set") + case p.MorphClient == nil: + return nil, errors.New("ir/container: neo:morph client is not set") + case p.ActiveState == nil: + return nil, errors.New("ir/container: global state is not set") + } + + p.Log.Debug("container worker pool", zap.Int("size", p.PoolSize)) + + pool, err := ants.NewPool(p.PoolSize, ants.WithNonblocking(true)) + if err != nil { + return nil, errors.Wrap(err, "ir/container: can't create worker pool") + } + + return &Processor{ + log: p.Log, + pool: pool, + containerContract: p.ContainerContract, + morphClient: p.MorphClient, + activeState: p.ActiveState, + }, nil +} + +// ListenerParsers for the 'event.Listener' event producer. +func (cp *Processor) ListenerParsers() []event.ParserInfo { + var parsers []event.ParserInfo + + // container put event + deposit := event.ParserInfo{} + deposit.SetType(putNotification) + deposit.SetScriptHash(cp.containerContract) + deposit.SetParser(containerEvent.ParsePut) + parsers = append(parsers, deposit) + + return parsers +} + +// ListenerHandlers for the 'event.Listener' event producer. +func (cp *Processor) ListenerHandlers() []event.HandlerInfo { + var handlers []event.HandlerInfo + + // container put handler + deposit := event.HandlerInfo{} + deposit.SetType(putNotification) + deposit.SetScriptHash(cp.containerContract) + deposit.SetHandler(cp.handlePut) + handlers = append(handlers, deposit) + + return handlers +} + +// TimersHandlers for the 'Timers' event producer. +func (cp *Processor) TimersHandlers() []event.HandlerInfo { + return nil +}