[#1461] node: Unlock locked object on its lock removal
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
72c044e2eb
commit
fed9e6679d
8 changed files with 93 additions and 14 deletions
|
@ -176,3 +176,17 @@ func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.A
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.Address) {
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
sh.HandleDeletedLocks(lockers)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
e.log.Info("interrupt processing the deleted locks by context")
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
|||
shard.WithID(id),
|
||||
shard.WithExpiredTombstonesCallback(e.processExpiredTombstones),
|
||||
shard.WithExpiredLocksCallback(e.processExpiredLocks),
|
||||
shard.WitDeletedLockCallback(e.processDeletedLocks),
|
||||
)...)
|
||||
|
||||
if err := sh.UpdateID(); err != nil {
|
||||
|
|
|
@ -16,10 +16,21 @@ type InhumePrm struct {
|
|||
tomb *oid.Address
|
||||
|
||||
target []oid.Address
|
||||
|
||||
lockObjectHandling bool
|
||||
}
|
||||
|
||||
// InhumeRes encapsulates results of Inhume operation.
|
||||
type InhumeRes struct{}
|
||||
type InhumeRes struct {
|
||||
deletedLockObj []oid.Address
|
||||
}
|
||||
|
||||
// DeletedLockObjects returns deleted object of LOCK
|
||||
// type. Returns always nil if WithoutLockObjectHandling
|
||||
// was provided to the InhumePrm.
|
||||
func (i InhumeRes) DeletedLockObjects() []oid.Address {
|
||||
return i.deletedLockObj
|
||||
}
|
||||
|
||||
// WithAddresses sets a list of object addresses that should be inhumed.
|
||||
func (p *InhumePrm) WithAddresses(addrs ...oid.Address) {
|
||||
|
@ -47,6 +58,14 @@ func (p *InhumePrm) WithGCMark() {
|
|||
}
|
||||
}
|
||||
|
||||
// WithLockObjectHandling checks if there were
|
||||
// any LOCK object among the targets set via WithAddresses.
|
||||
func (p *InhumePrm) WithLockObjectHandling() {
|
||||
if p != nil {
|
||||
p.lockObjectHandling = true
|
||||
}
|
||||
}
|
||||
|
||||
// Inhume inhumes the object by specified address.
|
||||
//
|
||||
// tomb should not be nil.
|
||||
|
@ -156,13 +175,6 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// garbage object can probably lock some objects, so they should become
|
||||
// unlocked after its decay
|
||||
err = freePotentialLocks(tx, cnr, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("free potential locks: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// consider checking if target is already in graveyard?
|
||||
|
@ -170,6 +182,12 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if prm.lockObjectHandling {
|
||||
if isLockObject(tx, cnr, id) {
|
||||
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -62,32 +62,42 @@ func TestDB_Lock(t *testing.T) {
|
|||
err = putBig(db, obj)
|
||||
require.NoError(t, err)
|
||||
|
||||
tombID := oidtest.ID()
|
||||
lockID := oidtest.ID()
|
||||
|
||||
id, _ := obj.ID()
|
||||
|
||||
// lock the object
|
||||
err = db.Lock(cnr, tombID, []oid.ID{id})
|
||||
err = db.Lock(cnr, lockID, []oid.ID{id})
|
||||
require.NoError(t, err)
|
||||
|
||||
var tombAddr oid.Address
|
||||
tombAddr.SetContainer(cnr)
|
||||
tombAddr.SetObject(tombID)
|
||||
tombAddr.SetObject(lockID)
|
||||
|
||||
// try to inhume locked object using tombstone
|
||||
err = meta.Inhume(db, objectcore.AddressOf(obj), tombAddr)
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
|
||||
|
||||
// free locked object
|
||||
var inhumePrm meta.InhumePrm
|
||||
inhumePrm.WithAddresses(tombAddr)
|
||||
inhumePrm.WithGCMark()
|
||||
|
||||
// inhume the tombstone
|
||||
_, err = db.Inhume(inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
var lockAddr oid.Address
|
||||
lockAddr.SetObject(lockID)
|
||||
lockAddr.SetContainer(cnr)
|
||||
|
||||
err = db.FreeLockedBy([]oid.Address{lockAddr})
|
||||
require.NoError(t, err)
|
||||
|
||||
inhumePrm.WithAddresses(tombAddr)
|
||||
inhumePrm.WithGCMark()
|
||||
|
||||
// now we can inhume the object
|
||||
err = meta.Inhume(db, objectcore.AddressOf(obj), tombAddr)
|
||||
_, err = db.Inhume(inhumePrm)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -171,3 +171,8 @@ func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) object
|
|||
|
||||
return object.TypeRegular
|
||||
}
|
||||
|
||||
// return true if provided object is of LOCK type.
|
||||
func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
|
||||
return inBucket(tx, bucketNameLockers(idCnr), objectKey(obj))
|
||||
}
|
||||
|
|
|
@ -395,3 +395,15 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {
|
|||
return
|
||||
}
|
||||
}
|
||||
|
||||
// HandleDeletedLocks unlocks all objects which were locked by lockers.
|
||||
func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
|
||||
err := s.metaBase.FreeLockedBy(lockers)
|
||||
if err != nil {
|
||||
s.log.Warn("failure to unlock objects",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
|
@ -59,6 +60,7 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
|||
|
||||
var metaPrm meta.InhumePrm
|
||||
metaPrm.WithAddresses(prm.target...)
|
||||
metaPrm.WithLockObjectHandling()
|
||||
|
||||
if prm.tombstone != nil {
|
||||
metaPrm.WithTombstoneAddress(*prm.tombstone)
|
||||
|
@ -66,7 +68,7 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
|||
metaPrm.WithGCMark()
|
||||
}
|
||||
|
||||
_, err := s.metaBase.Inhume(metaPrm)
|
||||
res, err := s.metaBase.Inhume(metaPrm)
|
||||
if err != nil {
|
||||
s.log.Debug("could not mark object to delete in metabase",
|
||||
zap.String("error", err.Error()),
|
||||
|
@ -75,5 +77,9 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
|||
return InhumeRes{}, fmt.Errorf("metabase inhume: %w", err)
|
||||
}
|
||||
|
||||
if deletedLockObjs := res.DeletedLockObjects(); len(deletedLockObjs) != 0 {
|
||||
s.deletedLockCallBack(context.Background(), deletedLockObjs)
|
||||
}
|
||||
|
||||
return InhumeRes{}, nil
|
||||
}
|
||||
|
|
|
@ -38,6 +38,9 @@ type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
|
|||
// ExpiredObjectsCallback is a callback handling list of expired objects.
|
||||
type ExpiredObjectsCallback func(context.Context, []oid.Address)
|
||||
|
||||
// DeletedLockCallback is a callback handling list of deleted LOCK objects.
|
||||
type DeletedLockCallback func(context.Context, []oid.Address)
|
||||
|
||||
type cfg struct {
|
||||
m sync.RWMutex
|
||||
|
||||
|
@ -63,6 +66,8 @@ type cfg struct {
|
|||
|
||||
expiredLocksCallback ExpiredObjectsCallback
|
||||
|
||||
deletedLockCallBack DeletedLockCallback
|
||||
|
||||
tsSource TombstoneSource
|
||||
}
|
||||
|
||||
|
@ -229,6 +234,14 @@ func WithTombstoneSource(v TombstoneSource) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WitDeletedLockCallback returns option to specify callback
|
||||
// of the deleted LOCK objects handler.
|
||||
func WitDeletedLockCallback(v DeletedLockCallback) Option {
|
||||
return func(c *cfg) {
|
||||
c.deletedLockCallBack = v
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Shard) fillInfo() {
|
||||
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
||||
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
||||
|
|
Loading…
Reference in a new issue