[#1972] node: Do not save objects if node not in a container

Do not use node's local storage if it is clear that an object will be
removed anyway as a redundant. It requires moving the changing local storage
logic from the validation step to the local target implementation.
It allows performing any relations checks (e.g. object locking) only if a
node is considered as a valid container member and is expected to store
(stored previously) all the helper objects (e.g. `LOCK`, `TOMBSTONE`, etc).

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-11-01 20:32:43 +03:00 committed by fyrchik
parent 59eebc5eeb
commit 76b87c6d94
7 changed files with 158 additions and 117 deletions

View file

@ -97,20 +97,6 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe
return s.get.GetRangeHash(ctx, req)
}
type localObjectInhumer struct {
storage *engine.StorageEngine
log *logger.Logger
}
func (r *localObjectInhumer) DeleteObjects(ts oid.Address, addr ...oid.Address) error {
var prm engine.InhumePrm
prm.WithTarget(ts, addr...)
_, err := r.storage.Inhume(prm)
return err
}
type delNetInfo struct {
netmap.State
tsLifetime uint64
@ -202,11 +188,6 @@ func initObjectService(c *cfg) {
}
}
objInhumer := &localObjectInhumer{
storage: ls,
log: c.log,
}
c.replicator = replicator.New(
replicator.WithLogger(c.log),
replicator.WithPutTimeout(
@ -254,7 +235,7 @@ func initObjectService(c *cfg) {
c.workers = append(c.workers, pol)
var os putsvc.ObjectStorage = engineWithoutNotifications{
e: ls,
engine: ls,
}
if c.cfgNotifications.enabled {
@ -274,10 +255,6 @@ func initObjectService(c *cfg) {
putsvc.WithContainerSource(c.cfgObject.cnrSource),
putsvc.WithNetworkMapSource(c.netMapSource),
putsvc.WithNetmapKeys(c),
putsvc.WithFormatValidatorOpts(
objectCore.WithDeleteHandler(objInhumer),
objectCore.WithLocker(ls),
),
putsvc.WithNetworkState(c.cfgNetmap.state),
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote),
putsvc.WithLogger(c.log),
@ -561,6 +538,14 @@ type engineWithNotifications struct {
defaultTopic string
}
func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
return e.base.Delete(tombstone, toDelete)
}
func (e engineWithNotifications) Lock(locker oid.Address, toLock []oid.ID) error {
return e.base.Lock(locker, toLock)
}
func (e engineWithNotifications) Put(o *objectSDK.Object) error {
if err := e.base.Put(o); err != nil {
return err
@ -583,9 +568,28 @@ func (e engineWithNotifications) Put(o *objectSDK.Object) error {
}
type engineWithoutNotifications struct {
e *engine.StorageEngine
engine *engine.StorageEngine
}
func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
var prm engine.InhumePrm
addrs := make([]oid.Address, len(toDelete))
for i := range addrs {
addrs[i].SetContainer(tombstone.Container())
addrs[i].SetObject(toDelete[i])
}
prm.WithTarget(tombstone, addrs...)
_, err := e.engine.Inhume(prm)
return err
}
func (e engineWithoutNotifications) Lock(locker oid.Address, toLock []oid.ID) error {
return e.engine.Lock(locker.Container(), locker.Object(), toLock)
}
func (e engineWithoutNotifications) Put(o *objectSDK.Object) error {
return engine.Put(e.e, o)
return engine.Put(e.engine, o)
}

View file

@ -26,11 +26,7 @@ type FormatValidator struct {
type FormatValidatorOption func(*cfg)
type cfg struct {
deleteHandler DeleteHandler
netState netmap.State
locker Locker
}
// DeleteHandler is an interface of delete queue processor.
@ -173,130 +169,141 @@ func (v *FormatValidator) checkOwnerKey(id user.ID, key neofsecdsa.PublicKey) er
return nil
}
// ContentMeta describes NeoFS meta information that brings object's payload if the object
// is one of:
// - object.TypeTombstone;
// - object.TypeStorageGroup;
// - object.TypeLock.
type ContentMeta struct {
typ object.Type
objs []oid.ID
}
// Type returns object's type.
func (i ContentMeta) Type() object.Type {
return i.typ
}
// Objects returns objects that the original object's payload affects:
// - inhumed objects, if the original object is a Tombstone;
// - locked objects, if the original object is a Lock;
// - members of a storage group, if the original object is a Storage group;
// - nil, if the original object is a Regular object.
func (i ContentMeta) Objects() []oid.ID {
return i.objs
}
// ValidateContent validates payload content according to the object type.
func (v *FormatValidator) ValidateContent(o *object.Object) error {
func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error) {
meta := ContentMeta{
typ: o.Type(),
}
switch o.Type() {
case object.TypeRegular:
// ignore regular objects, they do not need payload formatting
case object.TypeTombstone:
if len(o.Payload()) == 0 {
return fmt.Errorf("(%T) empty payload in tombstone", v)
return ContentMeta{}, fmt.Errorf("(%T) empty payload in tombstone", v)
}
tombstone := object.NewTombstone()
if err := tombstone.Unmarshal(o.Payload()); err != nil {
return fmt.Errorf("(%T) could not unmarshal tombstone content: %w", v, err)
return ContentMeta{}, fmt.Errorf("(%T) could not unmarshal tombstone content: %w", v, err)
}
// check if the tombstone has the same expiration in the body and the header
exp, err := expirationEpochAttribute(o)
if err != nil {
return err
return ContentMeta{}, err
}
if exp != tombstone.ExpirationEpoch() {
return errTombstoneExpiration
return ContentMeta{}, errTombstoneExpiration
}
// mark all objects from the tombstone body as removed in the storage engine
cnr, ok := o.ContainerID()
_, ok := o.ContainerID()
if !ok {
return errors.New("missing container ID")
return ContentMeta{}, errors.New("missing container ID")
}
idList := tombstone.Members()
addrList := make([]oid.Address, len(idList))
for i := range idList {
addrList[i].SetContainer(cnr)
addrList[i].SetObject(idList[i])
}
if v.deleteHandler != nil {
err = v.deleteHandler.DeleteObjects(AddressOf(o), addrList...)
if err != nil {
return fmt.Errorf("delete objects from %s object content: %w", o.Type(), err)
}
}
meta.objs = idList
case object.TypeStorageGroup:
if len(o.Payload()) == 0 {
return fmt.Errorf("(%T) empty payload in SG", v)
return ContentMeta{}, fmt.Errorf("(%T) empty payload in SG", v)
}
var sg storagegroup.StorageGroup
if err := sg.Unmarshal(o.Payload()); err != nil {
return fmt.Errorf("(%T) could not unmarshal SG content: %w", v, err)
return ContentMeta{}, fmt.Errorf("(%T) could not unmarshal SG content: %w", v, err)
}
mm := sg.Members()
meta.objs = mm
lenMM := len(mm)
if lenMM == 0 {
return errEmptySGMembers
return ContentMeta{}, errEmptySGMembers
}
uniqueFilter := make(map[oid.ID]struct{}, lenMM)
for i := 0; i < lenMM; i++ {
if _, alreadySeen := uniqueFilter[mm[i]]; alreadySeen {
return fmt.Errorf("storage group contains non-unique member: %s", mm[i])
return ContentMeta{}, fmt.Errorf("storage group contains non-unique member: %s", mm[i])
}
uniqueFilter[mm[i]] = struct{}{}
}
case object.TypeLock:
if len(o.Payload()) == 0 {
return errors.New("empty payload in lock")
return ContentMeta{}, errors.New("empty payload in lock")
}
cnr, ok := o.ContainerID()
_, ok := o.ContainerID()
if !ok {
return errors.New("missing container")
return ContentMeta{}, errors.New("missing container")
}
id, ok := o.ID()
_, ok = o.ID()
if !ok {
return errors.New("missing ID")
return ContentMeta{}, errors.New("missing ID")
}
// check that LOCK object has correct expiration epoch
lockExp, err := expirationEpochAttribute(o)
if err != nil {
return fmt.Errorf("lock object expiration epoch: %w", err)
return ContentMeta{}, fmt.Errorf("lock object expiration epoch: %w", err)
}
if currEpoch := v.netState.CurrentEpoch(); lockExp < currEpoch {
return fmt.Errorf("lock object expiration: %d; current: %d", lockExp, currEpoch)
return ContentMeta{}, fmt.Errorf("lock object expiration: %d; current: %d", lockExp, currEpoch)
}
var lock object.Lock
err = lock.Unmarshal(o.Payload())
if err != nil {
return fmt.Errorf("decode lock payload: %w", err)
return ContentMeta{}, fmt.Errorf("decode lock payload: %w", err)
}
if v.locker != nil {
num := lock.NumberOfMembers()
if num == 0 {
return errors.New("missing locked members")
return ContentMeta{}, errors.New("missing locked members")
}
// mark all objects from lock list as locked in the storage engine
locklist := make([]oid.ID, num)
lock.ReadMembers(locklist)
err = v.locker.Lock(cnr, id, locklist)
if err != nil {
return fmt.Errorf("lock objects from %s object content: %w", o.Type(), err)
}
}
meta.objs = make([]oid.ID, num)
lock.ReadMembers(meta.objs)
default:
// ignore all other object types, they do not need payload formatting
}
return nil
return meta, nil
}
var errExpired = errors.New("object has expired")
@ -373,17 +380,3 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
c.netState = netState
}
}
// WithDeleteHandler returns an option to set delete queue processor.
func WithDeleteHandler(v DeleteHandler) FormatValidatorOption {
return func(c *cfg) {
c.deleteHandler = v
}
}
// WithLocker returns an option to set object lock storage.
func WithLocker(v Locker) FormatValidatorOption {
return func(c *cfg) {
c.locker = v
}
}

View file

@ -114,7 +114,8 @@ func TestFormatValidator_Validate(t *testing.T) {
obj.SetType(object.TypeTombstone)
obj.SetContainerID(cidtest.ID())
require.Error(t, v.ValidateContent(obj)) // no tombstone content
_, err := v.ValidateContent(obj)
require.Error(t, err) // no tombstone content
content := object.NewTombstone()
content.SetMembers([]oid.ID{oidtest.ID()})
@ -124,7 +125,8 @@ func TestFormatValidator_Validate(t *testing.T) {
obj.SetPayload(data)
require.Error(t, v.ValidateContent(obj)) // no members in tombstone
_, err = v.ValidateContent(obj)
require.Error(t, err) // no members in tombstone
content.SetMembers([]oid.ID{oidtest.ID()})
@ -133,7 +135,8 @@ func TestFormatValidator_Validate(t *testing.T) {
obj.SetPayload(data)
require.Error(t, v.ValidateContent(obj)) // no expiration epoch in tombstone
_, err = v.ValidateContent(obj)
require.Error(t, err) // no expiration epoch in tombstone
var expirationAttribute object.Attribute
expirationAttribute.SetKey(objectV2.SysAttributeExpEpoch)
@ -141,15 +144,23 @@ func TestFormatValidator_Validate(t *testing.T) {
obj.SetAttributes(expirationAttribute)
require.Error(t, v.ValidateContent(obj)) // different expiration values
_, err = v.ValidateContent(obj)
require.Error(t, err) // different expiration values
id := oidtest.ID()
content.SetExpirationEpoch(10)
content.SetMembers([]oid.ID{id})
data, err = content.Marshal()
require.NoError(t, err)
obj.SetPayload(data)
require.NoError(t, v.ValidateContent(obj)) // all good
contentGot, err := v.ValidateContent(obj)
require.NoError(t, err) // all good
require.EqualValues(t, []oid.ID{id}, contentGot.Objects())
require.Equal(t, object.TypeTombstone, contentGot.Type())
})
t.Run("storage group content", func(t *testing.T) {
@ -157,7 +168,8 @@ func TestFormatValidator_Validate(t *testing.T) {
obj.SetType(object.TypeStorageGroup)
t.Run("empty payload", func(t *testing.T) {
require.Error(t, v.ValidateContent(obj))
_, err := v.ValidateContent(obj)
require.Error(t, err)
})
var content storagegroup.StorageGroup
@ -168,7 +180,9 @@ func TestFormatValidator_Validate(t *testing.T) {
require.NoError(t, err)
obj.SetPayload(data)
require.ErrorIs(t, v.ValidateContent(obj), errEmptySGMembers)
_, err = v.ValidateContent(obj)
require.ErrorIs(t, err, errEmptySGMembers)
})
t.Run("non-unique members", func(t *testing.T) {
@ -180,17 +194,25 @@ func TestFormatValidator_Validate(t *testing.T) {
require.NoError(t, err)
obj.SetPayload(data)
require.Error(t, v.ValidateContent(obj))
_, err = v.ValidateContent(obj)
require.Error(t, err)
})
t.Run("correct SG", func(t *testing.T) {
content.SetMembers([]oid.ID{oidtest.ID(), oidtest.ID()})
ids := []oid.ID{oidtest.ID(), oidtest.ID()}
content.SetMembers(ids)
data, err := content.Marshal()
require.NoError(t, err)
obj.SetPayload(data)
require.NoError(t, v.ValidateContent(obj))
content, err := v.ValidateContent(obj)
require.NoError(t, err)
require.EqualValues(t, ids, content.Objects())
require.Equal(t, object.TypeStorageGroup, content.Type())
})
})

View file

@ -16,7 +16,7 @@ import (
)
type preparedObjectTarget interface {
WriteHeader(*objectSDK.Object) error
WriteObject(*objectSDK.Object, object.ContentMeta) error
Close() (*transformer.AccessIdentifiers, error)
}
@ -26,6 +26,7 @@ type distributedTarget struct {
remotePool, localPool util.WorkerPool
obj *objectSDK.Object
objMeta object.ContentMeta
payload []byte
@ -120,7 +121,9 @@ func (t *distributedTarget) Write(p []byte) (n int, err error) {
func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
t.obj.SetPayload(t.payload)
if err := t.fmt.ValidateContent(t.obj); err != nil {
var err error
if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil {
return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err)
}
@ -134,7 +137,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
target := t.nodeTargetInitializer(node)
if err := target.WriteHeader(t.obj); err != nil {
if err := target.WriteObject(t.obj, t.objMeta); err != nil {
return fmt.Errorf("could not write header: %w", err)
} else if _, err := target.Close(); err != nil {
return fmt.Errorf("could not close object stream: %w", err)

View file

@ -3,31 +3,55 @@ package putsvc
import (
"fmt"
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/nspcc-dev/neofs-sdk-go/object"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)
// ObjectStorage is an object storage interface.
type ObjectStorage interface {
// Put must save passed object
// and return any appeared error.
Put(o *objectSDK.Object) error
Put(*object.Object) error
// Delete must delete passed objects
// and return any appeared error.
Delete(tombstone oid.Address, toDelete []oid.ID) error
// Lock must lock passed objects
// and return any appeared error.
Lock(locker oid.Address, toLock []oid.ID) error
}
type localTarget struct {
storage ObjectStorage
obj *object.Object
meta objectCore.ContentMeta
}
func (t *localTarget) WriteHeader(obj *object.Object) error {
func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
t.obj = obj
t.meta = meta
return nil
}
func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) {
switch t.meta.Type() {
case object.TypeTombstone:
err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects())
if err != nil {
return nil, fmt.Errorf("could not delete objects from tombstone locally: %w", err)
}
case object.TypeLock:
err := t.storage.Lock(objectCore.AddressOf(t.obj), t.meta.Objects())
if err != nil {
return nil, fmt.Errorf("could not lock object from lock objects locally: %w", err)
}
default:
// objects that do not change meta storage
}
if err := t.storage.Put(t.obj); err != nil {
return nil, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
}

View file

@ -6,6 +6,7 @@ import (
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
@ -42,7 +43,7 @@ type RemotePutPrm struct {
obj *object.Object
}
func (t *remoteTarget) WriteHeader(obj *object.Object) error {
func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error {
t.obj = obj
return nil
@ -126,7 +127,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
return fmt.Errorf("parse client node info: %w", err)
}
if err := t.WriteHeader(p.obj); err != nil {
if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil {
return fmt.Errorf("(%T) could not send object header: %w", s, err)
} else if _, err := t.Close(); err != nil {
return fmt.Errorf("(%T) could not send object: %w", s, err)

View file

@ -128,12 +128,6 @@ func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
}
}
func WithFormatValidatorOpts(v ...object.FormatValidatorOption) Option {
return func(c *cfg) {
c.fmtValidatorOpts = v
}
}
func WithNetworkState(v netmap.State) Option {
return func(c *cfg) {
c.networkState = v