From ced854bc2eeeba6c973ff36d390e17abf2a600ce Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 16 Feb 2022 18:49:19 +0300 Subject: [PATCH] [#1175] object/fmt: Handle `LOCK` objects Make `FormatValidator.ValidateContent` to verify payload of `LOCK` objects. Pass locked objects to `Locker` interface. Require from `Locker.Lock` to return `apistatus.IrregularObjectLock` error on a corresponding condition. Also add error return to `DeleteHandler.DeleteObjects` method. Require from method to return `apistatus.ObjectLocked` error on a corresponding condition. Adopt implementations. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 16 ++---- pkg/core/object/fmt.go | 55 ++++++++++++++++++- pkg/local_object_storage/engine/lock.go | 6 +- pkg/local_object_storage/metabase/lock.go | 2 +- .../metabase/lock_test.go | 2 +- 5 files changed, 63 insertions(+), 18 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index f5d54124e..ea587099c 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -101,19 +101,12 @@ type localObjectInhumer struct { log *logger.Logger } -func (r *localObjectInhumer) DeleteObjects(ts *addressSDK.Address, addr ...*addressSDK.Address) { +func (r *localObjectInhumer) DeleteObjects(ts *addressSDK.Address, addr ...*addressSDK.Address) error { prm := new(engine.InhumePrm) + prm.WithTarget(ts, addr...) - for _, a := range addr { - prm.WithTarget(ts, a) - - if _, err := r.storage.Inhume(prm); err != nil { - r.log.Error("could not delete object", - zap.Stringer("address", a), - zap.String("error", err.Error()), - ) - } - } + _, err := r.storage.Inhume(prm) + return err } type delNetInfo struct { @@ -267,6 +260,7 @@ func initObjectService(c *cfg) { putsvc.WithNetmapKeys(c), putsvc.WithFormatValidatorOpts( objectCore.WithDeleteHandler(objInhumer), + objectCore.WithLocker(ls), ), putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithWorkerPools(c.cfgObject.pool.putRemote), diff --git a/pkg/core/object/fmt.go b/pkg/core/object/fmt.go index 9e7c5cda0..48db014c5 100644 --- a/pkg/core/object/fmt.go +++ b/pkg/core/object/fmt.go @@ -11,8 +11,10 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/storagegroup" ) @@ -29,11 +31,26 @@ type cfg struct { deleteHandler DeleteHandler netState netmap.State + + locker Locker } // DeleteHandler is an interface of delete queue processor. type DeleteHandler interface { - DeleteObjects(*addressSDK.Address, ...*addressSDK.Address) + // DeleteObjects objects places objects to removal queue. + // + // Returns apistatus.IrregularObjectLock if at least one object + // is locked. + DeleteObjects(*addressSDK.Address, ...*addressSDK.Address) error +} + +// Locker is an object lock storage interface. +type Locker interface { + // Lock list of objects as locked by locker in the specified container. + // + // Returns apistatus.IrregularObjectLock if at least object in locked + // list is irregular (not type of REGULAR). + Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error } var errNilObject = errors.New("object is nil") @@ -135,6 +152,8 @@ func (v *FormatValidator) checkOwnerKey(id *owner.ID, key []byte) error { // ValidateContent validates payload content according to object type. func (v *FormatValidator) ValidateContent(o *object.Object) error { switch o.Type() { + case object.TypeRegular: + // ignore regular objects, they do not need payload formatting case object.TypeTombstone: if len(o.Payload()) == 0 { return fmt.Errorf("(%T) empty payload in tombstone", v) @@ -174,7 +193,10 @@ func (v *FormatValidator) ValidateContent(o *object.Object) error { } if v.deleteHandler != nil { - v.deleteHandler.DeleteObjects(AddressOf(o), addrList...) + err = v.deleteHandler.DeleteObjects(AddressOf(o), addrList...) + if err != nil { + return fmt.Errorf("delete objects from %s object content: %w", o.Type(), err) + } } case object.TypeStorageGroup: if len(o.Payload()) == 0 { @@ -192,6 +214,28 @@ func (v *FormatValidator) ValidateContent(o *object.Object) error { return fmt.Errorf("(%T) empty member in SG", v) } } + case object.TypeLock: + if len(o.Payload()) == 0 { + return errors.New("empty payload in lock") + } + + var lock object.Lock + + err := lock.Unmarshal(o.Payload()) + if err != nil { + return fmt.Errorf("decode lock payload: %w", err) + } + + if v.locker != nil { + // mark all objects from lock list as locked in storage engine + locklist := make([]oid.ID, lock.NumberOfMembers()) + lock.ReadMembers(locklist) + + err = v.locker.Lock(*o.ContainerID(), *o.ID(), locklist) + if err != nil { + return fmt.Errorf("lock objects from %s object content: %w", o.Type(), err) + } + } default: // ignore all other object types, they do not need payload formatting } @@ -281,3 +325,10 @@ func WithDeleteHandler(v DeleteHandler) FormatValidatorOption { c.deleteHandler = v } } + +// WithLocker returns option to set object lock storage. +func WithLocker(v Locker) FormatValidatorOption { + return func(c *cfg) { + c.locker = v + } +} diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index 8329c408c..8d38db99d 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -32,11 +32,11 @@ func (e *StorageEngine) lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error for i := range locked { switch e.lockSingle(idCnr, locker, locked[i], true) { case 1: - return apistatus.IrregularObjectLock{} + return apistatus.LockNonRegularObject{} case 0: switch e.lockSingle(idCnr, locker, locked[i], false) { case 1: - return apistatus.IrregularObjectLock{} + return apistatus.LockNonRegularObject{} case 0: return errLockFailed } @@ -53,7 +53,7 @@ func (e *StorageEngine) lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) { // code is pretty similar to inhumeAddr, maybe unify? root := false - var errIrregular apistatus.IrregularObjectLock + var errIrregular apistatus.LockNonRegularObject var addrLocked address.Address addrLocked.SetContainerID(&idCnr) diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index 60a957882..565ce8b04 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -42,7 +42,7 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error { } if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != object.TypeRegular { - return apistatus.IrregularObjectLock{} + return apistatus.LockNonRegularObject{} } bucketLocked, err := tx.CreateBucketIfNotExists(bucketNameLocked) diff --git a/pkg/local_object_storage/metabase/lock_test.go b/pkg/local_object_storage/metabase/lock_test.go index f787f6d07..9ca04b748 100644 --- a/pkg/local_object_storage/metabase/lock_test.go +++ b/pkg/local_object_storage/metabase/lock_test.go @@ -37,7 +37,7 @@ func TestDB_Lock(t *testing.T) { err := meta.Put(db, obj, nil) require.NoError(t, err, typ) - var e apistatus.IrregularObjectLock + var e apistatus.LockNonRegularObject // try to lock it err = db.Lock(cnr, *oidtest.ID(), []oid.ID{*obj.ID()})