[#1181] local storage: Process expired locks similar to tombstones

There is a need to process expired `LOCK` objects similar to `TOMBSTONE`
ones: we collect them on `Shard`, notify all other shards about
expiration so they could unlock the objects, and only after that mark
lockers as garbage.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2022-03-10 20:58:58 +03:00 committed by LeL
parent ebd84f6dd4
commit 9dff07200c
9 changed files with 369 additions and 29 deletions

View file

@ -6,13 +6,11 @@ import (
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/test"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
@ -74,14 +72,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
}
func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine {
engine := &StorageEngine{
cfg: &cfg{
log: zap.L(),
},
mtx: new(sync.RWMutex),
shards: make(map[string]shardWrapper, len(shards)),
shardPools: make(map[string]util.WorkerPool, len(shards)),
}
engine := New()
for _, s := range shards {
pool, err := ants.NewPool(10, ants.WithNonblocking(true))
@ -123,6 +114,32 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
return s
}
func testEngineFromShardOpts(t *testing.T, num int, extraOpts func(int) []shard.Option) *StorageEngine {
engine := New()
for i := 0; i < num; i++ {
sid, err := generateShardID()
require.NoError(t, err)
err = engine.addShard(sid, append([]shard.Option{
shard.WithBlobStorOptions(
blobstor.WithRootPath(filepath.Join(t.Name(), fmt.Sprintf("%d.blobstor", sid))),
blobstor.WithBlobovniczaShallowWidth(1),
blobstor.WithBlobovniczaShallowDepth(1),
blobstor.WithRootPerm(0700),
),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("%d.metabase", sid))),
meta.WithPermissions(0700),
)}, extraOpts(i)...)...)
require.NoError(t, err)
}
require.NoError(t, engine.Open())
require.NoError(t, engine.Init())
return engine
}
func testOID() *oidSDK.ID {
cs := [sha256.Size]byte{}
_, _ = rand.Read(cs[:])

View file

@ -172,3 +172,17 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []*a
}
})
}
func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []*addressSDK.Address) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredLocks(lockers)
select {
case <-ctx.Done():
e.log.Info("interrupt processing the expired locks by context")
return true
default:
return false
}
})
}

View file

@ -0,0 +1,197 @@
package engine
import (
"os"
"strconv"
"testing"
"time"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/util"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/nspcc-dev/neofs-sdk-go/object/address"
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/address/test"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
func TestLockUserScenario(t *testing.T) {
t.Skip("posted bug neofs-node#1227")
// Tested user actions:
// 1. stores some object
// 2. locks the object
// 3. tries to inhume the object with tombstone and expects failure
// 4. saves tombstone for LOCK-object and inhumes the LOCK-object using it
// 5. waits for an epoch after the tombstone expiration one
// 6. tries to inhume the object and expects success
chEvents := make([]chan shard.Event, 2)
for i := range chEvents {
chEvents[i] = make(chan shard.Event, 1)
}
e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option {
return []shard.Option{
shard.WithGCEventChannelInitializer(func() <-chan shard.Event {
return chEvents[i]
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
}
})
t.Cleanup(func() {
_ = e.Close()
_ = os.RemoveAll(t.Name())
})
const lockerTombExpiresAfter = 13
lockerID := oidtest.ID()
tombForLockID := oidtest.ID()
tombID := oidtest.ID()
cnr := cidtest.ID()
var err error
var objAddr address.Address
objAddr.SetContainerID(cnr)
var tombAddr address.Address
tombAddr.SetContainerID(cnr)
tombAddr.SetObjectID(tombID)
var lockerAddr address.Address
lockerAddr.SetContainerID(cnr)
lockerAddr.SetObjectID(lockerID)
var tombForLockAddr address.Address
tombForLockAddr.SetContainerID(cnr)
tombForLockAddr.SetObjectID(tombForLockID)
// 1.
obj := generateObjectWithCID(t, cnr)
objAddr.SetObjectID(obj.ID())
err = Put(e, obj)
require.NoError(t, err)
// 2.
err = e.Lock(*cnr, *lockerID, []oid.ID{*obj.ID()})
require.NoError(t, err)
// 3.
_, err = e.Inhume(new(InhumePrm).WithTarget(&tombAddr, &objAddr))
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
// 4.
var a object.Attribute
a.SetKey(objectV2.SysAttributeExpEpoch)
a.SetValue(strconv.Itoa(lockerTombExpiresAfter))
tombObj := generateObjectWithCID(t, cnr)
tombObj.SetType(object.TypeTombstone)
tombObj.SetID(tombForLockID)
tombObj.SetAttributes(&a)
err = Put(e, tombObj)
require.NoError(t, err)
_, err = e.Inhume(new(InhumePrm).WithTarget(&tombForLockAddr, &lockerAddr))
require.NoError(t, err, new(apistatus.ObjectLocked))
// 5.
for i := range chEvents {
chEvents[i] <- shard.EventNewEpoch(lockerTombExpiresAfter + 1)
}
// delay for GC
time.Sleep(time.Second)
_, err = e.Inhume(new(InhumePrm).WithTarget(&tombAddr, &objAddr))
require.NoError(t, err)
}
func TestLockExpiration(t *testing.T) {
// Tested scenario:
// 1. some object is stored
// 2. lock object for it is stored, and the object is locked
// 3. lock expiration epoch is coming
// 4. after some delay the object is not locked anymore
chEvents := make([]chan shard.Event, 2)
for i := range chEvents {
chEvents[i] = make(chan shard.Event, 1)
}
e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option {
return []shard.Option{
shard.WithGCEventChannelInitializer(func() <-chan shard.Event {
return chEvents[i]
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool
}),
}
})
t.Cleanup(func() {
_ = e.Close()
_ = os.RemoveAll(t.Name())
})
const lockerExpiresAfter = 13
cnr := cidtest.ID()
var err error
// 1.
obj := generateObjectWithCID(t, cnr)
err = Put(e, obj)
require.NoError(t, err)
// 2.
var a object.Attribute
a.SetKey(objectV2.SysAttributeExpEpoch)
a.SetValue(strconv.Itoa(lockerExpiresAfter))
lock := generateObjectWithCID(t, cnr)
lock.SetType(object.TypeLock)
lock.SetAttributes(&a)
err = Put(e, lock)
require.NoError(t, err)
err = e.Lock(*cnr, *lock.ID(), []oid.ID{*obj.ID()})
require.NoError(t, err)
_, err = e.Inhume(new(InhumePrm).WithTarget(objecttest.Address(), objectcore.AddressOf(obj)))
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
// 3.
for i := range chEvents {
chEvents[i] <- shard.EventNewEpoch(lockerExpiresAfter + 1)
}
// delay for GC processing. It can't be estimated, but making it bigger
// will slow down test
time.Sleep(time.Second)
// 4.
_, err = e.Inhume(new(InhumePrm).WithTarget(objecttest.Address(), objectcore.AddressOf(obj)))
require.NoError(t, err)
}

View file

@ -21,31 +21,36 @@ type hashedShard shardWrapper
// Returns any error encountered that did not allow adding a shard.
// Otherwise returns the ID of the added shard.
func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
e.mtx.Lock()
defer e.mtx.Unlock()
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
if err != nil {
return nil, err
}
id, err := generateShardID()
if err != nil {
return nil, fmt.Errorf("could not generate shard ID: %w", err)
}
return id, e.addShard(id, opts...)
}
func (e *StorageEngine) addShard(id *shard.ID, opts ...shard.Option) error {
e.mtx.Lock()
defer e.mtx.Unlock()
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
if err != nil {
return err
}
sh := shard.New(append(opts,
shard.WithID(id),
shard.WithExpiredObjectsCallback(e.processExpiredTombstones),
shard.WithExpiredTombstonesCallback(e.processExpiredTombstones),
shard.WithExpiredLocksCallback(e.processExpiredLocks),
)...)
if err := sh.UpdateID(); err != nil {
return nil, fmt.Errorf("could not open shard: %w", err)
return fmt.Errorf("could not open shard: %w", err)
}
strID := sh.ID().String()
if _, ok := e.shards[strID]; ok {
return nil, fmt.Errorf("shard with id %s was already added", strID)
return fmt.Errorf("shard with id %s was already added", strID)
}
e.shards[strID] = shardWrapper{
@ -55,7 +60,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
e.shardPools[strID] = pool
return sh.ID(), nil
return nil
}
func generateShardID() (*shard.ID, error) {

View file

@ -7,6 +7,7 @@ import (
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
@ -91,6 +92,22 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
})
}
// FreeLockedBy unlocks all objects in DB which are locked by lockers.
func (db *DB) FreeLockedBy(lockers []*addressSDK.Address) error {
return db.boltDB.Update(func(tx *bbolt.Tx) error {
var err error
for _, addr := range lockers {
err = freePotentialLocks(tx, *addr.ContainerID(), *addr.ObjectID())
if err != nil {
return err
}
}
return err
})
}
// checks if specified object is locked in the specified container.
func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
bucketLocked := tx.Bucket(bucketNameLocked)

View file

@ -3,10 +3,12 @@ package meta_test
import (
"testing"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/nspcc-dev/neofs-sdk-go/object/address"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test"
@ -48,4 +50,37 @@ func TestDB_Lock(t *testing.T) {
}
}
})
t.Run("lock-unlock scenario", func(t *testing.T) {
cnr := cidtest.ID()
obj := generateObjectWithCID(t, cnr)
var err error
err = putBig(db, obj)
require.NoError(t, err)
tombID := *oidtest.ID()
// lock the object
err = db.Lock(*cnr, tombID, []oid.ID{*obj.ID()})
require.NoError(t, err)
var tombAddr address.Address
tombAddr.SetContainerID(cnr)
tombAddr.SetObjectID(&tombID)
// try to inhume locked object using tombstone
err = meta.Inhume(db, objectcore.AddressOf(obj), &tombAddr)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
// inhume the tombstone
_, err = db.Inhume(new(meta.InhumePrm).WithAddresses(&tombAddr).WithGCMark())
require.NoError(t, err)
// now we can inhume the object
err = meta.Inhume(db, objectcore.AddressOf(obj), &tombAddr)
require.NoError(t, err)
})
}

View file

@ -64,6 +64,7 @@ func (s *Shard) Init() error {
handlers: []eventHandler{
s.collectExpiredObjects,
s.collectExpiredTombstones,
s.collectExpiredLocks,
},
},
},

View file

@ -217,7 +217,9 @@ func (s *Shard) removeGarbage() {
}
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, false)
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
return typ != object.TypeTombstone && typ != object.TypeLock
})
if err != nil || len(expired) == 0 {
if err != nil {
s.log.Warn("iterator over expired objects failed", zap.String("error", err.Error()))
@ -240,10 +242,12 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
}
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, true)
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
return typ == object.TypeTombstone
})
if err != nil || len(expired) == 0 {
if err != nil {
s.log.Warn("iterator over expired tombstones failes", zap.String("error", err.Error()))
s.log.Warn("iterator over expired tombstones failed", zap.String("error", err.Error()))
}
return
}
@ -251,7 +255,21 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
s.expiredTombstonesCallback(ctx, expired)
}
func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, collectTombstones bool) ([]*addressSDK.Address, error) {
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
return typ == object.TypeLock
})
if err != nil || len(expired) == 0 {
if err != nil {
s.log.Warn("iterator over expired locks failed", zap.String("error", err.Error()))
}
return
}
s.expiredLocksCallback(ctx, expired)
}
func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond func(object.Type) bool) ([]*addressSDK.Address, error) {
var expired []*addressSDK.Address
err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error {
@ -259,7 +277,7 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, collectTomb
case <-ctx.Done():
return meta.ErrInterruptIterator
default:
if (expiredObject.Type() == object.TypeTombstone) == collectTombstones {
if typeCond(expiredObject.Type()) {
expired = append(expired, expiredObject.Address())
}
return nil
@ -333,3 +351,29 @@ func (s *Shard) HandleExpiredTombstones(tss map[string]*addressSDK.Address) {
return
}
}
// HandleExpiredLocks unlocks all objects which were locked by lockers.
// If successful, marks lockers themselves as garbage.
func (s *Shard) HandleExpiredLocks(lockers []*addressSDK.Address) {
err := s.metaBase.FreeLockedBy(lockers)
if err != nil {
s.log.Warn("failure to unlock objects",
zap.String("error", err.Error()),
)
return
}
var pInhume meta.InhumePrm
pInhume.WithAddresses(lockers...)
pInhume.WithGCMark()
_, err = s.metaBase.Inhume(&pInhume)
if err != nil {
s.log.Warn("failure to mark lockers as garbage",
zap.String("error", err.Error()),
)
return
}
}

View file

@ -55,6 +55,8 @@ type cfg struct {
gcCfg *gcCfg
expiredTombstonesCallback ExpiredObjectsCallback
expiredLocksCallback ExpiredObjectsCallback
}
func defaultCfg() *cfg {
@ -181,14 +183,22 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option {
}
}
// WithExpiredObjectsCallback returns option to specify callback
// WithExpiredTombstonesCallback returns option to specify callback
// of the expired tombstones handler.
func WithExpiredObjectsCallback(cb ExpiredObjectsCallback) Option {
func WithExpiredTombstonesCallback(cb ExpiredObjectsCallback) Option {
return func(c *cfg) {
c.expiredTombstonesCallback = cb
}
}
// WithExpiredLocksCallback returns option to specify callback
// of the expired LOCK objects handler.
func WithExpiredLocksCallback(cb ExpiredObjectsCallback) Option {
return func(c *cfg) {
c.expiredLocksCallback = cb
}
}
// WithRefillMetabase returns option to set flag to refill the Metabase on Shard's initialization step.
func WithRefillMetabase(v bool) Option {
return func(c *cfg) {