frostfs-node/pkg/innerring/processors/container/process_container.go
Dmitrii Stepanov f07d4158f5 [] node: Drop subnet from IR and morph
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-17 09:46:02 +03:00

228 lines
5.7 KiB
Go

package container
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"go.uber.org/zap"
)
// putEvent is a common interface of Put and PutNamed event.
//
// It gives the processor uniform access to the payload of a container
// creation request regardless of the concrete event type.
type putEvent interface {
event.Event
// Container returns the binary container; it is unmarshaled during the
// check and is also the data the signature is verified against.
Container() []byte
// PublicKey returns the binary public key passed to signature verification.
PublicKey() []byte
// Signature returns the signature of the binary container.
Signature() []byte
// SessionToken returns the binary session token passed to signature
// verification.
SessionToken() []byte
// NotaryRequest returns a non-nil request when the event was received via
// the Notary service, and nil when it came via the notification service.
NotaryRequest() *payload.P2PNotaryRequest
}
// putContainerContext carries the state of a single container creation
// request between the check and approve stages.
type putContainerContext struct {
// e is the put event being processed.
e putEvent
// d is the domain read from the container by checkNNS; its name and zone
// are forwarded to the Put parameters on approval.
d containerSDK.Domain
}
// processContainerPut handles a container creation request from the user:
// it checks the container carried by the event and, when the check passes,
// sends the approval transaction back to the morph.
//
// The event is skipped with an info log entry when the node is not in
// alphabet mode. A failed check is logged as an error and nothing is approved.
func (cp *Processor) processContainerPut(put putEvent) {
	if isAlphabet := cp.alphabetState.IsAlphabet(); !isAlphabet {
		cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut)
		return
	}

	putCtx := &putContainerContext{e: put}

	if checkErr := cp.checkPutContainer(putCtx); checkErr != nil {
		cp.log.Error(logs.ContainerPutContainerCheckFailed,
			zap.String("error", checkErr.Error()),
		)
		return
	}

	cp.approvePutContainer(putCtx)
}
// checkPutContainer validates the container creation request held in ctx.
//
// The stages run in order and the first failure is returned, wrapped with a
// prefix naming the stage:
//   - the binary container from the event must unmarshal;
//   - verifySignature must accept the request for the container owner and
//     the container PUT verb;
//   - checkHomomorphicHashing must accept the container against the network state;
//   - checkNNS must succeed (it also stores the container's domain in ctx.d).
//
// It returns nil when every stage passes.
func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
	rawCnr := ctx.e.Container()

	var cnr containerSDK.Container
	if err := cnr.Unmarshal(rawCnr); err != nil {
		return fmt.Errorf("invalid binary container: %w", err)
	}

	sigData := signatureVerificationData{
		ownerContainer:  cnr.Owner(),
		verb:            session.VerbContainerPut,
		binTokenSession: ctx.e.SessionToken(),
		binPublicKey:    ctx.e.PublicKey(),
		signature:       ctx.e.Signature(),
		signedData:      rawCnr,
	}
	if err := cp.verifySignature(sigData); err != nil {
		return fmt.Errorf("auth container creation: %w", err)
	}

	// the container's homomorphic hashing flag is checked against the network state
	if err := checkHomomorphicHashing(cp.netState, cnr); err != nil {
		return fmt.Errorf("incorrect homomorphic hashing setting: %w", err)
	}

	// native name and zone check
	if err := checkNNS(ctx, cnr); err != nil {
		return fmt.Errorf("NNS: %w", err)
	}

	return nil
}
// approvePutContainer sends the approval of a checked container creation
// request.
//
// When the event carries a notary request, its main transaction is signed and
// invoked via the morph client; otherwise the container client's Put is called
// with parameters assembled from the event and from the domain stored in ctx.
// A failure on either path is only logged.
func (cp *Processor) approvePutContainer(ctx *putContainerContext) {
	ev := ctx.e

	var putPrm cntClient.PutPrm
	putPrm.SetContainer(ev.Container())
	putPrm.SetKey(ev.PublicKey())
	putPrm.SetSignature(ev.Signature())
	putPrm.SetToken(ev.SessionToken())
	putPrm.SetName(ctx.d.Name())
	putPrm.SetZone(ctx.d.Zone())

	var err error
	if notaryReq := ev.NotaryRequest(); notaryReq == nil {
		// the put event arrived via the notification service
		err = cp.cnrClient.Put(putPrm)
	} else {
		// the put event arrived via the Notary service
		err = cp.cnrClient.Morph().NotarySignAndInvokeTX(notaryReq.MainTransaction)
	}

	if err == nil {
		return
	}

	cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
		zap.String("error", err.Error()),
	)
}
// processContainerDelete handles a container removal request from the user:
// it checks the request and, when the check passes, sends the approval
// transaction back to the morph.
//
// The event is skipped with an info log entry when the node is not in
// alphabet mode. A failed check is logged as an error and nothing is approved.
func (cp *Processor) processContainerDelete(e *containerEvent.Delete) {
	if isAlphabet := cp.alphabetState.IsAlphabet(); !isAlphabet {
		cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete)
		return
	}

	if checkErr := cp.checkDeleteContainer(e); checkErr != nil {
		cp.log.Error(logs.ContainerDeleteContainerCheckFailed,
			zap.String("error", checkErr.Error()),
		)
		return
	}

	cp.approveDeleteContainer(e)
}
// checkDeleteContainer validates a container removal request.
//
// The binary container ID from the event must decode, the container must be
// retrievable through the container client, and verifySignature must accept
// the request for the owner of the retrieved container and the container
// DELETE verb. The first failure is returned, wrapped with a prefix naming
// the failed stage; nil is returned when all stages pass.
func (cp *Processor) checkDeleteContainer(e *containerEvent.Delete) error {
	rawID := e.ContainerID()

	var cnrID cid.ID
	if err := cnrID.Decode(rawID); err != nil {
		return fmt.Errorf("invalid container ID: %w", err)
	}

	// the stored container is needed to learn its owner
	stored, err := cp.cnrClient.Get(rawID)
	if err != nil {
		return fmt.Errorf("could not receive the container: %w", err)
	}

	sigData := signatureVerificationData{
		ownerContainer:  stored.Value.Owner(),
		verb:            session.VerbContainerDelete,
		idContainerSet:  true,
		idContainer:     cnrID,
		binTokenSession: e.SessionToken(),
		signature:       e.Signature(),
		signedData:      rawID,
	}
	if err = cp.verifySignature(sigData); err != nil {
		return fmt.Errorf("auth container removal: %w", err)
	}

	return nil
}
// approveDeleteContainer sends the approval of a checked container removal
// request.
//
// When the event carries a notary request, its main transaction is signed and
// invoked via the morph client; otherwise the container client's Delete is
// called with parameters assembled from the event. A failure on either path
// is only logged.
func (cp *Processor) approveDeleteContainer(e *containerEvent.Delete) {
	var delPrm cntClient.DeletePrm
	delPrm.SetCID(e.ContainerID())
	delPrm.SetSignature(e.Signature())
	delPrm.SetToken(e.SessionToken())

	var err error
	if notaryReq := e.NotaryRequest(); notaryReq == nil {
		// the delete event arrived via the notification service
		err = cp.cnrClient.Delete(delPrm)
	} else {
		// the delete event arrived via the Notary service
		err = cp.cnrClient.Morph().NotarySignAndInvokeTX(notaryReq.MainTransaction)
	}

	if err == nil {
		return
	}

	cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
		zap.String("error", err.Error()),
	)
}
// checkNNS reads the domain from cnr into ctx.d and, when the event in ctx
// additionally exposes Name and Zone (the PutNamed case), verifies that those
// values match the domain read from the container.
//
// It returns an error describing the first mismatch (name is compared before
// zone), or nil when the event is not named or both values match.
func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error {
	// namedEvent is the extra method set exposed by a named put event.
	type namedEvent interface {
		Name() string
		Zone() string
	}

	// the domain is stored unconditionally: approval reads it from the context
	ctx.d = containerSDK.ReadDomain(cnr)

	named, isNamed := ctx.e.(namedEvent)
	if !isNamed {
		// plain Put event: there are no arguments to compare against
		return nil
	}

	if got, want := named.Name(), ctx.d.Name(); got != want {
		return fmt.Errorf("names differ %s/%s", got, want)
	}

	if got, want := named.Zone(), ctx.d.Zone(); got != want {
		return fmt.Errorf("zones differ %s/%s", got, want)
	}

	return nil
}
// checkHomomorphicHashing verifies that the container's "homomorphic hashing
// disabled" flag equals the value reported by the network state.
//
// It returns an error when the network setting cannot be obtained or when the
// two flags differ, and nil when they are equal.
func checkHomomorphicHashing(ns NetworkState, cnr containerSDK.Container) error {
	disabledInNet, err := ns.HomomorphicHashDisabled()
	if err != nil {
		return fmt.Errorf("could not get setting in contract: %w", err)
	}

	disabledInCnr := containerSDK.IsHomomorphicHashingDisabled(cnr)
	if disabledInNet == disabledInCnr {
		return nil
	}

	return fmt.Errorf("network setting: %t, container setting: %t", disabledInNet, disabledInCnr)
}