forked from TrueCloudLab/frostfs-node
[#1175] object/fmt: Handle LOCK
objects
Make `FormatValidator.ValidateContent` to verify payload of `LOCK` objects. Pass locked objects to `Locker` interface. Require from `Locker.Lock` to return `apistatus.IrregularObjectLock` error on a corresponding condition. Also add error return to `DeleteHandler.DeleteObjects` method. Require from method to return `apistatus.ObjectLocked` error on a corresponding condition. Adopt implementations. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
d163008b63
commit
ced854bc2e
5 changed files with 63 additions and 18 deletions
|
@ -101,19 +101,12 @@ type localObjectInhumer struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *localObjectInhumer) DeleteObjects(ts *addressSDK.Address, addr ...*addressSDK.Address) {
|
func (r *localObjectInhumer) DeleteObjects(ts *addressSDK.Address, addr ...*addressSDK.Address) error {
|
||||||
prm := new(engine.InhumePrm)
|
prm := new(engine.InhumePrm)
|
||||||
|
prm.WithTarget(ts, addr...)
|
||||||
|
|
||||||
for _, a := range addr {
|
_, err := r.storage.Inhume(prm)
|
||||||
prm.WithTarget(ts, a)
|
return err
|
||||||
|
|
||||||
if _, err := r.storage.Inhume(prm); err != nil {
|
|
||||||
r.log.Error("could not delete object",
|
|
||||||
zap.Stringer("address", a),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type delNetInfo struct {
|
type delNetInfo struct {
|
||||||
|
@ -267,6 +260,7 @@ func initObjectService(c *cfg) {
|
||||||
putsvc.WithNetmapKeys(c),
|
putsvc.WithNetmapKeys(c),
|
||||||
putsvc.WithFormatValidatorOpts(
|
putsvc.WithFormatValidatorOpts(
|
||||||
objectCore.WithDeleteHandler(objInhumer),
|
objectCore.WithDeleteHandler(objInhumer),
|
||||||
|
objectCore.WithLocker(ls),
|
||||||
),
|
),
|
||||||
putsvc.WithNetworkState(c.cfgNetmap.state),
|
putsvc.WithNetworkState(c.cfgNetmap.state),
|
||||||
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote),
|
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote),
|
||||||
|
|
|
@ -11,8 +11,10 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||||
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/owner"
|
"github.com/nspcc-dev/neofs-sdk-go/owner"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/storagegroup"
|
"github.com/nspcc-dev/neofs-sdk-go/storagegroup"
|
||||||
)
|
)
|
||||||
|
@ -29,11 +31,26 @@ type cfg struct {
|
||||||
deleteHandler DeleteHandler
|
deleteHandler DeleteHandler
|
||||||
|
|
||||||
netState netmap.State
|
netState netmap.State
|
||||||
|
|
||||||
|
locker Locker
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteHandler is an interface of delete queue processor.
|
// DeleteHandler is an interface of delete queue processor.
|
||||||
type DeleteHandler interface {
|
type DeleteHandler interface {
|
||||||
DeleteObjects(*addressSDK.Address, ...*addressSDK.Address)
|
// DeleteObjects objects places objects to removal queue.
|
||||||
|
//
|
||||||
|
// Returns apistatus.IrregularObjectLock if at least one object
|
||||||
|
// is locked.
|
||||||
|
DeleteObjects(*addressSDK.Address, ...*addressSDK.Address) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Locker is an object lock storage interface.
|
||||||
|
type Locker interface {
|
||||||
|
// Lock list of objects as locked by locker in the specified container.
|
||||||
|
//
|
||||||
|
// Returns apistatus.IrregularObjectLock if at least object in locked
|
||||||
|
// list is irregular (not type of REGULAR).
|
||||||
|
Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error
|
||||||
}
|
}
|
||||||
|
|
||||||
var errNilObject = errors.New("object is nil")
|
var errNilObject = errors.New("object is nil")
|
||||||
|
@ -135,6 +152,8 @@ func (v *FormatValidator) checkOwnerKey(id *owner.ID, key []byte) error {
|
||||||
// ValidateContent validates payload content according to object type.
|
// ValidateContent validates payload content according to object type.
|
||||||
func (v *FormatValidator) ValidateContent(o *object.Object) error {
|
func (v *FormatValidator) ValidateContent(o *object.Object) error {
|
||||||
switch o.Type() {
|
switch o.Type() {
|
||||||
|
case object.TypeRegular:
|
||||||
|
// ignore regular objects, they do not need payload formatting
|
||||||
case object.TypeTombstone:
|
case object.TypeTombstone:
|
||||||
if len(o.Payload()) == 0 {
|
if len(o.Payload()) == 0 {
|
||||||
return fmt.Errorf("(%T) empty payload in tombstone", v)
|
return fmt.Errorf("(%T) empty payload in tombstone", v)
|
||||||
|
@ -174,7 +193,10 @@ func (v *FormatValidator) ValidateContent(o *object.Object) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if v.deleteHandler != nil {
|
if v.deleteHandler != nil {
|
||||||
v.deleteHandler.DeleteObjects(AddressOf(o), addrList...)
|
err = v.deleteHandler.DeleteObjects(AddressOf(o), addrList...)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("delete objects from %s object content: %w", o.Type(), err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case object.TypeStorageGroup:
|
case object.TypeStorageGroup:
|
||||||
if len(o.Payload()) == 0 {
|
if len(o.Payload()) == 0 {
|
||||||
|
@ -192,6 +214,28 @@ func (v *FormatValidator) ValidateContent(o *object.Object) error {
|
||||||
return fmt.Errorf("(%T) empty member in SG", v)
|
return fmt.Errorf("(%T) empty member in SG", v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case object.TypeLock:
|
||||||
|
if len(o.Payload()) == 0 {
|
||||||
|
return errors.New("empty payload in lock")
|
||||||
|
}
|
||||||
|
|
||||||
|
var lock object.Lock
|
||||||
|
|
||||||
|
err := lock.Unmarshal(o.Payload())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("decode lock payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.locker != nil {
|
||||||
|
// mark all objects from lock list as locked in storage engine
|
||||||
|
locklist := make([]oid.ID, lock.NumberOfMembers())
|
||||||
|
lock.ReadMembers(locklist)
|
||||||
|
|
||||||
|
err = v.locker.Lock(*o.ContainerID(), *o.ID(), locklist)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("lock objects from %s object content: %w", o.Type(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
// ignore all other object types, they do not need payload formatting
|
// ignore all other object types, they do not need payload formatting
|
||||||
}
|
}
|
||||||
|
@ -281,3 +325,10 @@ func WithDeleteHandler(v DeleteHandler) FormatValidatorOption {
|
||||||
c.deleteHandler = v
|
c.deleteHandler = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLocker returns option to set object lock storage.
|
||||||
|
func WithLocker(v Locker) FormatValidatorOption {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.locker = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -32,11 +32,11 @@ func (e *StorageEngine) lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error
|
||||||
for i := range locked {
|
for i := range locked {
|
||||||
switch e.lockSingle(idCnr, locker, locked[i], true) {
|
switch e.lockSingle(idCnr, locker, locked[i], true) {
|
||||||
case 1:
|
case 1:
|
||||||
return apistatus.IrregularObjectLock{}
|
return apistatus.LockNonRegularObject{}
|
||||||
case 0:
|
case 0:
|
||||||
switch e.lockSingle(idCnr, locker, locked[i], false) {
|
switch e.lockSingle(idCnr, locker, locked[i], false) {
|
||||||
case 1:
|
case 1:
|
||||||
return apistatus.IrregularObjectLock{}
|
return apistatus.LockNonRegularObject{}
|
||||||
case 0:
|
case 0:
|
||||||
return errLockFailed
|
return errLockFailed
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ func (e *StorageEngine) lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error
|
||||||
func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
|
func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) {
|
||||||
// code is pretty similar to inhumeAddr, maybe unify?
|
// code is pretty similar to inhumeAddr, maybe unify?
|
||||||
root := false
|
root := false
|
||||||
var errIrregular apistatus.IrregularObjectLock
|
var errIrregular apistatus.LockNonRegularObject
|
||||||
|
|
||||||
var addrLocked address.Address
|
var addrLocked address.Address
|
||||||
addrLocked.SetContainerID(&idCnr)
|
addrLocked.SetContainerID(&idCnr)
|
||||||
|
|
|
@ -42,7 +42,7 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != object.TypeRegular {
|
if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != object.TypeRegular {
|
||||||
return apistatus.IrregularObjectLock{}
|
return apistatus.LockNonRegularObject{}
|
||||||
}
|
}
|
||||||
|
|
||||||
bucketLocked, err := tx.CreateBucketIfNotExists(bucketNameLocked)
|
bucketLocked, err := tx.CreateBucketIfNotExists(bucketNameLocked)
|
||||||
|
|
|
@ -37,7 +37,7 @@ func TestDB_Lock(t *testing.T) {
|
||||||
err := meta.Put(db, obj, nil)
|
err := meta.Put(db, obj, nil)
|
||||||
require.NoError(t, err, typ)
|
require.NoError(t, err, typ)
|
||||||
|
|
||||||
var e apistatus.IrregularObjectLock
|
var e apistatus.LockNonRegularObject
|
||||||
|
|
||||||
// try to lock it
|
// try to lock it
|
||||||
err = db.Lock(cnr, *oidtest.ID(), []oid.ID{*obj.ID()})
|
err = db.Lock(cnr, *oidtest.ID(), []oid.ID{*obj.ID()})
|
||||||
|
|
Loading…
Reference in a new issue