From aab398f4f557b6649682c1563d1d61dde443b2d4 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 1 Nov 2022 20:32:43 +0300 Subject: [PATCH] [#1972] node: Do not save objects if node not in a container Do not use node's local storage if it is clear that an object will be removed anyway as a redundant. It requires moving the changing local storage logic from the validation step to the local target implementation. It allows performing any relations checks (e.g. object locking) only if a node is considered as a valid container member and is expected to store (stored previously) all the helper objects (e.g. `LOCK`, `TOMBSTONE`, etc). Signed-off-by: Pavel Karpy --- cmd/neofs-node/object.go | 56 +++++------ pkg/core/object/fmt.go | 123 ++++++++++++------------- pkg/core/object/fmt_test.go | 42 +++++++-- pkg/services/object/put/distributed.go | 11 ++- pkg/services/object/put/local.go | 32 ++++++- pkg/services/object/put/remote.go | 5 +- pkg/services/object/put/service.go | 6 -- 7 files changed, 158 insertions(+), 117 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index b20d44908..c1f35ebfa 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -97,20 +97,6 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe return s.get.GetRangeHash(ctx, req) } -type localObjectInhumer struct { - storage *engine.StorageEngine - - log *logger.Logger -} - -func (r *localObjectInhumer) DeleteObjects(ts oid.Address, addr ...oid.Address) error { - var prm engine.InhumePrm - prm.WithTarget(ts, addr...) - - _, err := r.storage.Inhume(prm) - return err -} - type delNetInfo struct { netmap.State tsLifetime uint64 @@ -202,11 +188,6 @@ func initObjectService(c *cfg) { } } - objInhumer := &localObjectInhumer{ - storage: ls, - log: c.log, - } - c.replicator = replicator.New( replicator.WithLogger(c.log), replicator.WithPutTimeout( @@ -254,7 +235,7 @@ func initObjectService(c *cfg) { c.workers = append(c.workers, pol) var os putsvc.ObjectStorage = engineWithoutNotifications{ - e: ls, + engine: ls, } if c.cfgNotifications.enabled { @@ -274,10 +255,6 @@ func initObjectService(c *cfg) { putsvc.WithContainerSource(c.cfgObject.cnrSource), putsvc.WithNetworkMapSource(c.netMapSource), putsvc.WithNetmapKeys(c), - putsvc.WithFormatValidatorOpts( - objectCore.WithDeleteHandler(objInhumer), - objectCore.WithLocker(ls), - ), putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithWorkerPools(c.cfgObject.pool.putRemote), putsvc.WithLogger(c.log), @@ -561,6 +538,14 @@ type engineWithNotifications struct { defaultTopic string } +func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error { + return e.base.Delete(tombstone, toDelete) +} + +func (e engineWithNotifications) Lock(locker oid.Address, toLock []oid.ID) error { + return e.base.Lock(locker, toLock) +} + func (e engineWithNotifications) Put(o *objectSDK.Object) error { if err := e.base.Put(o); err != nil { return err @@ -583,9 +568,28 @@ func (e engineWithNotifications) Put(o *objectSDK.Object) error { } type engineWithoutNotifications struct { - e *engine.StorageEngine + engine *engine.StorageEngine +} + +func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error { + var prm engine.InhumePrm + + addrs := make([]oid.Address, len(toDelete)) + for i := range addrs { + addrs[i].SetContainer(tombstone.Container()) + addrs[i].SetObject(toDelete[i]) + } + + prm.WithTarget(tombstone, addrs...) + + _, err := e.engine.Inhume(prm) + return err +} + +func (e engineWithoutNotifications) Lock(locker oid.Address, toLock []oid.ID) error { + return e.engine.Lock(locker.Container(), locker.Object(), toLock) } func (e engineWithoutNotifications) Put(o *objectSDK.Object) error { - return engine.Put(e.e, o) + return engine.Put(e.engine, o) } diff --git a/pkg/core/object/fmt.go b/pkg/core/object/fmt.go index ac4e089e5..4fcf1ee15 100644 --- a/pkg/core/object/fmt.go +++ b/pkg/core/object/fmt.go @@ -26,11 +26,7 @@ type FormatValidator struct { type FormatValidatorOption func(*cfg) type cfg struct { - deleteHandler DeleteHandler - netState netmap.State - - locker Locker } // DeleteHandler is an interface of delete queue processor. @@ -173,130 +169,141 @@ func (v *FormatValidator) checkOwnerKey(id user.ID, key neofsecdsa.PublicKey) er return nil } +// ContentMeta describes NeoFS meta information that brings object's payload if the object +// is one of: +// - object.TypeTombstone; +// - object.TypeStorageGroup; +// - object.TypeLock. +type ContentMeta struct { + typ object.Type + + objs []oid.ID +} + +// Type returns object's type. +func (i ContentMeta) Type() object.Type { + return i.typ +} + +// Objects returns objects that the original object's payload affects: +// - inhumed objects, if the original object is a Tombstone; +// - locked objects, if the original object is a Lock; +// - members of a storage group, if the original object is a Storage group; +// - nil, if the original object is a Regular object. +func (i ContentMeta) Objects() []oid.ID { + return i.objs +} + // ValidateContent validates payload content according to the object type. -func (v *FormatValidator) ValidateContent(o *object.Object) error { +func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error) { + meta := ContentMeta{ + typ: o.Type(), + } + switch o.Type() { case object.TypeRegular: // ignore regular objects, they do not need payload formatting case object.TypeTombstone: if len(o.Payload()) == 0 { - return fmt.Errorf("(%T) empty payload in tombstone", v) + return ContentMeta{}, fmt.Errorf("(%T) empty payload in tombstone", v) } tombstone := object.NewTombstone() if err := tombstone.Unmarshal(o.Payload()); err != nil { - return fmt.Errorf("(%T) could not unmarshal tombstone content: %w", v, err) + return ContentMeta{}, fmt.Errorf("(%T) could not unmarshal tombstone content: %w", v, err) } // check if the tombstone has the same expiration in the body and the header exp, err := expirationEpochAttribute(o) if err != nil { - return err + return ContentMeta{}, err } if exp != tombstone.ExpirationEpoch() { - return errTombstoneExpiration + return ContentMeta{}, errTombstoneExpiration } // mark all objects from the tombstone body as removed in the storage engine - cnr, ok := o.ContainerID() + _, ok := o.ContainerID() if !ok { - return errors.New("missing container ID") + return ContentMeta{}, errors.New("missing container ID") } idList := tombstone.Members() - addrList := make([]oid.Address, len(idList)) - - for i := range idList { - addrList[i].SetContainer(cnr) - addrList[i].SetObject(idList[i]) - } - - if v.deleteHandler != nil { - err = v.deleteHandler.DeleteObjects(AddressOf(o), addrList...) - if err != nil { - return fmt.Errorf("delete objects from %s object content: %w", o.Type(), err) - } - } + meta.objs = idList case object.TypeStorageGroup: if len(o.Payload()) == 0 { - return fmt.Errorf("(%T) empty payload in SG", v) + return ContentMeta{}, fmt.Errorf("(%T) empty payload in SG", v) } var sg storagegroup.StorageGroup if err := sg.Unmarshal(o.Payload()); err != nil { - return fmt.Errorf("(%T) could not unmarshal SG content: %w", v, err) + return ContentMeta{}, fmt.Errorf("(%T) could not unmarshal SG content: %w", v, err) } mm := sg.Members() + meta.objs = mm + lenMM := len(mm) if lenMM == 0 { - return errEmptySGMembers + return ContentMeta{}, errEmptySGMembers } uniqueFilter := make(map[oid.ID]struct{}, lenMM) for i := 0; i < lenMM; i++ { if _, alreadySeen := uniqueFilter[mm[i]]; alreadySeen { - return fmt.Errorf("storage group contains non-unique member: %s", mm[i]) + return ContentMeta{}, fmt.Errorf("storage group contains non-unique member: %s", mm[i]) } uniqueFilter[mm[i]] = struct{}{} } case object.TypeLock: if len(o.Payload()) == 0 { - return errors.New("empty payload in lock") + return ContentMeta{}, errors.New("empty payload in lock") } - cnr, ok := o.ContainerID() + _, ok := o.ContainerID() if !ok { - return errors.New("missing container") + return ContentMeta{}, errors.New("missing container") } - id, ok := o.ID() + _, ok = o.ID() if !ok { - return errors.New("missing ID") + return ContentMeta{}, errors.New("missing ID") } // check that LOCK object has correct expiration epoch lockExp, err := expirationEpochAttribute(o) if err != nil { - return fmt.Errorf("lock object expiration epoch: %w", err) + return ContentMeta{}, fmt.Errorf("lock object expiration epoch: %w", err) } if currEpoch := v.netState.CurrentEpoch(); lockExp < currEpoch { - return fmt.Errorf("lock object expiration: %d; current: %d", lockExp, currEpoch) + return ContentMeta{}, fmt.Errorf("lock object expiration: %d; current: %d", lockExp, currEpoch) } var lock object.Lock err = lock.Unmarshal(o.Payload()) if err != nil { - return fmt.Errorf("decode lock payload: %w", err) + return ContentMeta{}, fmt.Errorf("decode lock payload: %w", err) } - if v.locker != nil { - num := lock.NumberOfMembers() - if num == 0 { - return errors.New("missing locked members") - } - - // mark all objects from lock list as locked in the storage engine - locklist := make([]oid.ID, num) - lock.ReadMembers(locklist) - - err = v.locker.Lock(cnr, id, locklist) - if err != nil { - return fmt.Errorf("lock objects from %s object content: %w", o.Type(), err) - } + num := lock.NumberOfMembers() + if num == 0 { + return ContentMeta{}, errors.New("missing locked members") } + + meta.objs = make([]oid.ID, num) + lock.ReadMembers(meta.objs) default: // ignore all other object types, they do not need payload formatting } - return nil + return meta, nil } var errExpired = errors.New("object has expired") @@ -373,17 +380,3 @@ func WithNetState(netState netmap.State) FormatValidatorOption { c.netState = netState } } - -// WithDeleteHandler returns an option to set delete queue processor. -func WithDeleteHandler(v DeleteHandler) FormatValidatorOption { - return func(c *cfg) { - c.deleteHandler = v - } -} - -// WithLocker returns an option to set object lock storage. -func WithLocker(v Locker) FormatValidatorOption { - return func(c *cfg) { - c.locker = v - } -} diff --git a/pkg/core/object/fmt_test.go b/pkg/core/object/fmt_test.go index 3bc9eed05..a5df3f365 100644 --- a/pkg/core/object/fmt_test.go +++ b/pkg/core/object/fmt_test.go @@ -114,7 +114,8 @@ func TestFormatValidator_Validate(t *testing.T) { obj.SetType(object.TypeTombstone) obj.SetContainerID(cidtest.ID()) - require.Error(t, v.ValidateContent(obj)) // no tombstone content + _, err := v.ValidateContent(obj) + require.Error(t, err) // no tombstone content content := object.NewTombstone() content.SetMembers([]oid.ID{oidtest.ID()}) @@ -124,7 +125,8 @@ func TestFormatValidator_Validate(t *testing.T) { obj.SetPayload(data) - require.Error(t, v.ValidateContent(obj)) // no members in tombstone + _, err = v.ValidateContent(obj) + require.Error(t, err) // no members in tombstone content.SetMembers([]oid.ID{oidtest.ID()}) @@ -133,7 +135,8 @@ func TestFormatValidator_Validate(t *testing.T) { obj.SetPayload(data) - require.Error(t, v.ValidateContent(obj)) // no expiration epoch in tombstone + _, err = v.ValidateContent(obj) + require.Error(t, err) // no expiration epoch in tombstone var expirationAttribute object.Attribute expirationAttribute.SetKey(objectV2.SysAttributeExpEpoch) @@ -141,15 +144,23 @@ func TestFormatValidator_Validate(t *testing.T) { obj.SetAttributes(expirationAttribute) - require.Error(t, v.ValidateContent(obj)) // different expiration values + _, err = v.ValidateContent(obj) + require.Error(t, err) // different expiration values + + id := oidtest.ID() content.SetExpirationEpoch(10) + content.SetMembers([]oid.ID{id}) data, err = content.Marshal() require.NoError(t, err) obj.SetPayload(data) - require.NoError(t, v.ValidateContent(obj)) // all good + contentGot, err := v.ValidateContent(obj) + require.NoError(t, err) // all good + + require.EqualValues(t, []oid.ID{id}, contentGot.Objects()) + require.Equal(t, object.TypeTombstone, contentGot.Type()) }) t.Run("storage group content", func(t *testing.T) { @@ -157,7 +168,8 @@ func TestFormatValidator_Validate(t *testing.T) { obj.SetType(object.TypeStorageGroup) t.Run("empty payload", func(t *testing.T) { - require.Error(t, v.ValidateContent(obj)) + _, err := v.ValidateContent(obj) + require.Error(t, err) }) var content storagegroup.StorageGroup @@ -168,7 +180,9 @@ func TestFormatValidator_Validate(t *testing.T) { require.NoError(t, err) obj.SetPayload(data) - require.ErrorIs(t, v.ValidateContent(obj), errEmptySGMembers) + + _, err = v.ValidateContent(obj) + require.ErrorIs(t, err, errEmptySGMembers) }) t.Run("non-unique members", func(t *testing.T) { @@ -180,17 +194,25 @@ func TestFormatValidator_Validate(t *testing.T) { require.NoError(t, err) obj.SetPayload(data) - require.Error(t, v.ValidateContent(obj)) + + _, err = v.ValidateContent(obj) + require.Error(t, err) }) t.Run("correct SG", func(t *testing.T) { - content.SetMembers([]oid.ID{oidtest.ID(), oidtest.ID()}) + ids := []oid.ID{oidtest.ID(), oidtest.ID()} + content.SetMembers(ids) data, err := content.Marshal() require.NoError(t, err) obj.SetPayload(data) - require.NoError(t, v.ValidateContent(obj)) + + content, err := v.ValidateContent(obj) + require.NoError(t, err) + + require.EqualValues(t, ids, content.Objects()) + require.Equal(t, object.TypeStorageGroup, content.Type()) }) }) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 5e0e8c119..640e37049 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -16,7 +16,7 @@ import ( ) type preparedObjectTarget interface { - WriteHeader(*objectSDK.Object) error + WriteObject(*objectSDK.Object, object.ContentMeta) error Close() (*transformer.AccessIdentifiers, error) } @@ -25,7 +25,8 @@ type distributedTarget struct { remotePool, localPool util.WorkerPool - obj *objectSDK.Object + obj *objectSDK.Object + objMeta object.ContentMeta payload []byte @@ -120,7 +121,9 @@ func (t *distributedTarget) Write(p []byte) (n int, err error) { func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { t.obj.SetPayload(t.payload) - if err := t.fmt.ValidateContent(t.obj); err != nil { + var err error + + if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil { return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err) } @@ -134,7 +137,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { target := t.nodeTargetInitializer(node) - if err := target.WriteHeader(t.obj); err != nil { + if err := target.WriteObject(t.obj, t.objMeta); err != nil { return fmt.Errorf("could not write header: %w", err) } else if _, err := target.Close(); err != nil { return fmt.Errorf("could not close object stream: %w", err) diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index e90e0ec02..da96fd58b 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -3,31 +3,55 @@ package putsvc import ( "fmt" + objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" "github.com/nspcc-dev/neofs-sdk-go/object" - objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) // ObjectStorage is an object storage interface. type ObjectStorage interface { // Put must save passed object // and return any appeared error. - Put(o *objectSDK.Object) error + Put(*object.Object) error + // Delete must delete passed objects + // and return any appeared error. + Delete(tombstone oid.Address, toDelete []oid.ID) error + // Lock must lock passed objects + // and return any appeared error. + Lock(locker oid.Address, toLock []oid.ID) error } type localTarget struct { storage ObjectStorage - obj *object.Object + obj *object.Object + meta objectCore.ContentMeta } -func (t *localTarget) WriteHeader(obj *object.Object) error { +func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error { t.obj = obj + t.meta = meta return nil } func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) { + switch t.meta.Type() { + case object.TypeTombstone: + err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects()) + if err != nil { + return nil, fmt.Errorf("could not delete objects from tombstone locally: %w", err) + } + case object.TypeLock: + err := t.storage.Lock(objectCore.AddressOf(t.obj), t.meta.Objects()) + if err != nil { + return nil, fmt.Errorf("could not lock object from lock objects locally: %w", err) + } + default: + // objects that do not change meta storage + } + if err := t.storage.Put(t.obj); err != nil { return nil, fmt.Errorf("(%T) could not put object to local storage: %w", t, err) } diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index 5a6273f68..95d040947 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -6,6 +6,7 @@ import ( clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" @@ -42,7 +43,7 @@ type RemotePutPrm struct { obj *object.Object } -func (t *remoteTarget) WriteHeader(obj *object.Object) error { +func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error { t.obj = obj return nil @@ -126,7 +127,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { return fmt.Errorf("parse client node info: %w", err) } - if err := t.WriteHeader(p.obj); err != nil { + if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil { return fmt.Errorf("(%T) could not send object header: %w", s, err) } else if _, err := t.Close(); err != nil { return fmt.Errorf("(%T) could not send object: %w", s, err) diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 5dcfd5684..594c2db0b 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -128,12 +128,6 @@ func WithNetmapKeys(v netmap.AnnouncedKeys) Option { } } -func WithFormatValidatorOpts(v ...object.FormatValidatorOption) Option { - return func(c *cfg) { - c.fmtValidatorOpts = v - } -} - func WithNetworkState(v netmap.State) Option { return func(c *cfg) { c.networkState = v