forked from TrueCloudLab/frostfs-node
[#907] container/put: Work with named containers
Add name and zone arguments to `Put` method of wrapper over the Container contract client. Pass result of `container.GetNativeNameWithZone` function to the method in `Put` helper function. Due to this, the storage node will call the method depending on the presence of the container name in the attributes. Make IR to listen `putNamed` notification event. The event is processed like `put` event, but with sanity check of the container attributes. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
7db47c88bf
commit
e0f0188466
6 changed files with 80 additions and 15 deletions
2
go.mod
2
go.mod
|
@ -11,7 +11,7 @@ require (
|
||||||
github.com/multiformats/go-multiaddr v0.4.0
|
github.com/multiformats/go-multiaddr v0.4.0
|
||||||
github.com/nspcc-dev/hrw v1.0.9
|
github.com/nspcc-dev/hrw v1.0.9
|
||||||
github.com/nspcc-dev/neo-go v0.97.3
|
github.com/nspcc-dev/neo-go v0.97.3
|
||||||
github.com/nspcc-dev/neofs-api-go v1.29.1-0.20210929152813-0117e90e9adb
|
github.com/nspcc-dev/neofs-api-go v1.29.1-0.20211014122040-db1ed764733b
|
||||||
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210520210714-9dee13f0d556
|
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210520210714-9dee13f0d556
|
||||||
github.com/nspcc-dev/tzhash v1.4.0
|
github.com/nspcc-dev/tzhash v1.4.0
|
||||||
github.com/panjf2000/ants/v2 v2.4.0
|
github.com/panjf2000/ants/v2 v2.4.0
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -397,8 +397,8 @@ github.com/nspcc-dev/neo-go v0.97.3/go.mod h1:31LelE8G5NZwGmePCykqui6BpPyEklTVbO
|
||||||
github.com/nspcc-dev/neofs-api-go v1.24.0/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8=
|
github.com/nspcc-dev/neofs-api-go v1.24.0/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8=
|
||||||
github.com/nspcc-dev/neofs-api-go v1.26.1/go.mod h1:SHuH1Ba3U/h3j+8HHbb3Cns1LfMlEb88guWog9Qi68Y=
|
github.com/nspcc-dev/neofs-api-go v1.26.1/go.mod h1:SHuH1Ba3U/h3j+8HHbb3Cns1LfMlEb88guWog9Qi68Y=
|
||||||
github.com/nspcc-dev/neofs-api-go v1.27.1/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE=
|
github.com/nspcc-dev/neofs-api-go v1.27.1/go.mod h1:i0Cwgvcu9A4M4e58pydbXFisUhSxpfljmuWFPIp2btE=
|
||||||
github.com/nspcc-dev/neofs-api-go v1.29.1-0.20210929152813-0117e90e9adb h1:634hGE+KGXakNRkgB83rUoJEB4MWkfsiKCD4LrS3XOc=
|
github.com/nspcc-dev/neofs-api-go v1.29.1-0.20211014122040-db1ed764733b h1:n5tIRk8WMwJJCjpBO6V2sJFqNDnJvYXH7lY5GdkQaAo=
|
||||||
github.com/nspcc-dev/neofs-api-go v1.29.1-0.20210929152813-0117e90e9adb/go.mod h1:KC8T91skIg8juvUh7lQabswQ9J6KmnXErpH8qwDitXA=
|
github.com/nspcc-dev/neofs-api-go v1.29.1-0.20211014122040-db1ed764733b/go.mod h1:KC8T91skIg8juvUh7lQabswQ9J6KmnXErpH8qwDitXA=
|
||||||
github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA=
|
github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA=
|
||||||
github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
|
github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
|
||||||
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=
|
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (cp *Processor) handlePut(ev event.Event) {
|
func (cp *Processor) handlePut(ev event.Event) {
|
||||||
put := ev.(containerEvent.Put)
|
put := ev.(putEvent)
|
||||||
|
|
||||||
id := sha256.Sum256(put.Container())
|
id := sha256.Sum256(put.Container())
|
||||||
cp.log.Info("notification",
|
cp.log.Info("notification",
|
||||||
|
@ -19,7 +19,7 @@ func (cp *Processor) handlePut(ev event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := cp.pool.Submit(func() { cp.processContainerPut(&put) })
|
err := cp.pool.Submit(func() { cp.processContainerPut(put) })
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
cp.log.Warn("container processor worker pool drained",
|
cp.log.Warn("container processor worker pool drained",
|
||||||
|
|
|
@ -7,24 +7,46 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
||||||
containerSDK "github.com/nspcc-dev/neofs-api-go/pkg/container"
|
containerSDK "github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||||
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/session"
|
"github.com/nspcc-dev/neofs-api-go/pkg/session"
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/refs"
|
"github.com/nspcc-dev/neofs-api-go/v2/refs"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
|
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// putEvent is a common interface of Put and PutNamed event.
|
||||||
|
type putEvent interface {
|
||||||
|
event.Event
|
||||||
|
Container() []byte
|
||||||
|
PublicKey() []byte
|
||||||
|
Signature() []byte
|
||||||
|
SessionToken() []byte
|
||||||
|
NotaryRequest() *payload.P2PNotaryRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
type putContainerContext struct {
|
||||||
|
e putEvent
|
||||||
|
|
||||||
|
name, zone string // from container structure
|
||||||
|
}
|
||||||
|
|
||||||
// Process new container from the user by checking container sanity
|
// Process new container from the user by checking container sanity
|
||||||
// and sending approve tx back to morph.
|
// and sending approve tx back to morph.
|
||||||
func (cp *Processor) processContainerPut(put *containerEvent.Put) {
|
func (cp *Processor) processContainerPut(put putEvent) {
|
||||||
if !cp.alphabetState.IsAlphabet() {
|
if !cp.alphabetState.IsAlphabet() {
|
||||||
cp.log.Info("non alphabet mode, ignore container put")
|
cp.log.Info("non alphabet mode, ignore container put")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := cp.checkPutContainer(put)
|
ctx := &putContainerContext{
|
||||||
|
e: put,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := cp.checkPutContainer(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cp.log.Error("put container check failed",
|
cp.log.Error("put container check failed",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -33,10 +55,12 @@ func (cp *Processor) processContainerPut(put *containerEvent.Put) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cp.approvePutContainer(put)
|
cp.approvePutContainer(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) checkPutContainer(e *containerEvent.Put) error {
|
func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
|
||||||
|
e := ctx.e
|
||||||
|
|
||||||
// verify signature
|
// verify signature
|
||||||
key, err := keys.NewPublicKeyFromBytes(e.PublicKey(), elliptic.P256())
|
key, err := keys.NewPublicKeyFromBytes(e.PublicKey(), elliptic.P256())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -58,6 +82,12 @@ func (cp *Processor) checkPutContainer(e *containerEvent.Put) error {
|
||||||
return fmt.Errorf("invalid binary container: %w", err)
|
return fmt.Errorf("invalid binary container: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check native name and zone
|
||||||
|
err = checkNNS(ctx, cnr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("NNS: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// perform format check
|
// perform format check
|
||||||
err = container.CheckFormat(cnr)
|
err = container.CheckFormat(cnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -85,7 +115,9 @@ func (cp *Processor) checkPutContainer(e *containerEvent.Put) error {
|
||||||
return cp.checkKeyOwnership(cnr, key)
|
return cp.checkKeyOwnership(cnr, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) approvePutContainer(e *containerEvent.Put) {
|
func (cp *Processor) approvePutContainer(ctx *putContainerContext) {
|
||||||
|
e := ctx.e
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if nr := e.NotaryRequest(); nr != nil {
|
if nr := e.NotaryRequest(); nr != nil {
|
||||||
|
@ -93,7 +125,7 @@ func (cp *Processor) approvePutContainer(e *containerEvent.Put) {
|
||||||
err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction)
|
err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction)
|
||||||
} else {
|
} else {
|
||||||
// put event was received via notification service
|
// put event was received via notification service
|
||||||
err = cp.cnrClient.Put(e.Container(), e.PublicKey(), e.Signature(), e.SessionToken())
|
err = cp.cnrClient.Put(e.Container(), e.PublicKey(), e.Signature(), e.SessionToken(), ctx.name, ctx.zone)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cp.log.Error("could not approve put container",
|
cp.log.Error("could not approve put container",
|
||||||
|
@ -202,3 +234,24 @@ func (cp *Processor) approveDeleteContainer(e *containerEvent.Delete) {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkNNS(ctx *putContainerContext, cnr *containerSDK.Container) error {
|
||||||
|
// fetch native name and zone
|
||||||
|
ctx.name, ctx.zone = containerSDK.GetNativeNameWithZone(cnr)
|
||||||
|
|
||||||
|
// if PutNamed event => check if values in container correspond to args
|
||||||
|
if named, ok := ctx.e.(interface {
|
||||||
|
Name() string
|
||||||
|
Zone() string
|
||||||
|
}); ok {
|
||||||
|
if name := named.Name(); name != ctx.name {
|
||||||
|
return fmt.Errorf("names differ %s/%s", name, ctx.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if zone := named.Zone(); zone != ctx.zone {
|
||||||
|
return fmt.Errorf("zones differ %s/%s", zone, ctx.zone)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -161,7 +161,7 @@ func (cp *Processor) ListenerNotaryParsers() []event.NotaryParserInfo {
|
||||||
var (
|
var (
|
||||||
p event.NotaryParserInfo
|
p event.NotaryParserInfo
|
||||||
|
|
||||||
pp = make([]event.NotaryParserInfo, 0, 3)
|
pp = make([]event.NotaryParserInfo, 0, 4)
|
||||||
)
|
)
|
||||||
|
|
||||||
p.SetMempoolType(mempoolevent.TransactionAdded)
|
p.SetMempoolType(mempoolevent.TransactionAdded)
|
||||||
|
@ -172,6 +172,11 @@ func (cp *Processor) ListenerNotaryParsers() []event.NotaryParserInfo {
|
||||||
p.SetParser(containerEvent.ParsePutNotary)
|
p.SetParser(containerEvent.ParsePutNotary)
|
||||||
pp = append(pp, p)
|
pp = append(pp, p)
|
||||||
|
|
||||||
|
// container named put
|
||||||
|
p.SetRequestType(containerEvent.PutNamedNotaryEvent)
|
||||||
|
p.SetParser(containerEvent.ParsePutNamedNotary)
|
||||||
|
pp = append(pp, p)
|
||||||
|
|
||||||
// container delete
|
// container delete
|
||||||
p.SetRequestType(containerEvent.DeleteNotaryEvent)
|
p.SetRequestType(containerEvent.DeleteNotaryEvent)
|
||||||
p.SetParser(containerEvent.ParseDeleteNotary)
|
p.SetParser(containerEvent.ParseDeleteNotary)
|
||||||
|
@ -190,7 +195,7 @@ func (cp *Processor) ListenerNotaryHandlers() []event.NotaryHandlerInfo {
|
||||||
var (
|
var (
|
||||||
h event.NotaryHandlerInfo
|
h event.NotaryHandlerInfo
|
||||||
|
|
||||||
hh = make([]event.NotaryHandlerInfo, 0, 3)
|
hh = make([]event.NotaryHandlerInfo, 0, 4)
|
||||||
)
|
)
|
||||||
|
|
||||||
h.SetScriptHash(cp.cnrClient.ContractAddress())
|
h.SetScriptHash(cp.cnrClient.ContractAddress())
|
||||||
|
@ -201,6 +206,10 @@ func (cp *Processor) ListenerNotaryHandlers() []event.NotaryHandlerInfo {
|
||||||
h.SetHandler(cp.handlePut)
|
h.SetHandler(cp.handlePut)
|
||||||
hh = append(hh, h)
|
hh = append(hh, h)
|
||||||
|
|
||||||
|
// container named put (same handler)
|
||||||
|
h.SetRequestType(containerEvent.PutNamedNotaryEvent)
|
||||||
|
hh = append(hh, h)
|
||||||
|
|
||||||
// container delete
|
// container delete
|
||||||
h.SetRequestType(containerEvent.DeleteNotaryEvent)
|
h.SetRequestType(containerEvent.DeleteNotaryEvent)
|
||||||
h.SetHandler(cp.handleDelete)
|
h.SetHandler(cp.handleDelete)
|
||||||
|
|
|
@ -41,7 +41,9 @@ func Put(w *Wrapper, cnr *container.Container) (*cid.ID, error) {
|
||||||
|
|
||||||
sig := cnr.Signature()
|
sig := cnr.Signature()
|
||||||
|
|
||||||
err = w.Put(data, sig.Key(), sig.Sign(), binToken)
|
name, zone := container.GetNativeNameWithZone(cnr)
|
||||||
|
|
||||||
|
err = w.Put(data, sig.Key(), sig.Sign(), binToken, name, zone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -59,7 +61,7 @@ func Put(w *Wrapper, cnr *container.Container) (*cid.ID, error) {
|
||||||
// encountered that caused the saving to interrupt.
|
// encountered that caused the saving to interrupt.
|
||||||
//
|
//
|
||||||
// If TryNotary is provided, calls notary contract.
|
// If TryNotary is provided, calls notary contract.
|
||||||
func (w *Wrapper) Put(cnr, key, sig, token []byte) error {
|
func (w *Wrapper) Put(cnr, key, sig, token []byte, name, zone string) error {
|
||||||
if len(sig) == 0 || len(key) == 0 {
|
if len(sig) == 0 || len(key) == 0 {
|
||||||
return errNilArgument
|
return errNilArgument
|
||||||
}
|
}
|
||||||
|
@ -70,6 +72,7 @@ func (w *Wrapper) Put(cnr, key, sig, token []byte) error {
|
||||||
args.SetSignature(sig)
|
args.SetSignature(sig)
|
||||||
args.SetPublicKey(key)
|
args.SetPublicKey(key)
|
||||||
args.SetSessionToken(token)
|
args.SetSessionToken(token)
|
||||||
|
args.SetNativeNameWithZone(name, zone)
|
||||||
|
|
||||||
err := w.client.Put(args)
|
err := w.client.Put(args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue