diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 56144df394..561b83f63c 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -6,13 +6,11 @@ import ( "math/rand" "os" "path/filepath" - "sync" "testing" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/test" "github.com/nspcc-dev/neofs-sdk-go/checksum" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -74,14 +72,7 @@ func benchmarkExists(b *testing.B, shardNum int) { } func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine { - engine := &StorageEngine{ - cfg: &cfg{ - log: zap.L(), - }, - mtx: new(sync.RWMutex), - shards: make(map[string]shardWrapper, len(shards)), - shardPools: make(map[string]util.WorkerPool, len(shards)), - } + engine := New() for _, s := range shards { pool, err := ants.NewPool(10, ants.WithNonblocking(true)) @@ -123,6 +114,32 @@ func testNewShard(t testing.TB, id int) *shard.Shard { return s } +func testEngineFromShardOpts(t *testing.T, num int, extraOpts func(int) []shard.Option) *StorageEngine { + engine := New() + for i := 0; i < num; i++ { + sid, err := generateShardID() + require.NoError(t, err) + + err = engine.addShard(sid, append([]shard.Option{ + shard.WithBlobStorOptions( + blobstor.WithRootPath(filepath.Join(t.Name(), fmt.Sprintf("%d.blobstor", sid))), + blobstor.WithBlobovniczaShallowWidth(1), + blobstor.WithBlobovniczaShallowDepth(1), + blobstor.WithRootPerm(0700), + ), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("%d.metabase", sid))), + meta.WithPermissions(0700), + )}, extraOpts(i)...)...) + require.NoError(t, err) + } + + require.NoError(t, engine.Open()) + require.NoError(t, engine.Init()) + + return engine +} + func testOID() *oidSDK.ID { cs := [sha256.Size]byte{} _, _ = rand.Read(cs[:]) diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 2bd171662b..94c68909d4 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -172,3 +172,17 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []*a } }) } + +func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []*addressSDK.Address) { + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + sh.HandleExpiredLocks(lockers) + + select { + case <-ctx.Done(): + e.log.Info("interrupt processing the expired locks by context") + return true + default: + return false + } + }) +} diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go new file mode 100644 index 0000000000..ee3b4da1ab --- /dev/null +++ b/pkg/local_object_storage/engine/lock_test.go @@ -0,0 +1,197 @@ +package engine + +import ( + "os" + "strconv" + "testing" + "time" + + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/nspcc-dev/neofs-sdk-go/object/address" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/address/test" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + "github.com/panjf2000/ants/v2" + "github.com/stretchr/testify/require" +) + +func TestLockUserScenario(t *testing.T) { + t.Skip("posted bug neofs-node#1227") + // Tested user actions: + // 1. stores some object + // 2. locks the object + // 3. tries to inhume the object with tombstone and expects failure + // 4. saves tombstone for LOCK-object and inhumes the LOCK-object using it + // 5. waits for an epoch after the tombstone expiration one + // 6. tries to inhume the object and expects success + chEvents := make([]chan shard.Event, 2) + + for i := range chEvents { + chEvents[i] = make(chan shard.Event, 1) + } + + e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option { + return []shard.Option{ + shard.WithGCEventChannelInitializer(func() <-chan shard.Event { + return chEvents[i] + }), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + require.NoError(t, err) + + return pool + }), + } + }) + + t.Cleanup(func() { + _ = e.Close() + _ = os.RemoveAll(t.Name()) + }) + + const lockerTombExpiresAfter = 13 + + lockerID := oidtest.ID() + tombForLockID := oidtest.ID() + tombID := oidtest.ID() + cnr := cidtest.ID() + var err error + + var objAddr address.Address + objAddr.SetContainerID(cnr) + + var tombAddr address.Address + tombAddr.SetContainerID(cnr) + tombAddr.SetObjectID(tombID) + + var lockerAddr address.Address + lockerAddr.SetContainerID(cnr) + lockerAddr.SetObjectID(lockerID) + + var tombForLockAddr address.Address + tombForLockAddr.SetContainerID(cnr) + tombForLockAddr.SetObjectID(tombForLockID) + + // 1. + obj := generateObjectWithCID(t, cnr) + + objAddr.SetObjectID(obj.ID()) + + err = Put(e, obj) + require.NoError(t, err) + + // 2. + err = e.Lock(*cnr, *lockerID, []oid.ID{*obj.ID()}) + require.NoError(t, err) + + // 3. + _, err = e.Inhume(new(InhumePrm).WithTarget(&tombAddr, &objAddr)) + require.ErrorAs(t, err, new(apistatus.ObjectLocked)) + + // 4. + var a object.Attribute + a.SetKey(objectV2.SysAttributeExpEpoch) + a.SetValue(strconv.Itoa(lockerTombExpiresAfter)) + + tombObj := generateObjectWithCID(t, cnr) + tombObj.SetType(object.TypeTombstone) + tombObj.SetID(tombForLockID) + tombObj.SetAttributes(&a) + + err = Put(e, tombObj) + require.NoError(t, err) + + _, err = e.Inhume(new(InhumePrm).WithTarget(&tombForLockAddr, &lockerAddr)) + require.NoError(t, err, new(apistatus.ObjectLocked)) + + // 5. + for i := range chEvents { + chEvents[i] <- shard.EventNewEpoch(lockerTombExpiresAfter + 1) + } + + // delay for GC + time.Sleep(time.Second) + + _, err = e.Inhume(new(InhumePrm).WithTarget(&tombAddr, &objAddr)) + require.NoError(t, err) +} + +func TestLockExpiration(t *testing.T) { + // Tested scenario: + // 1. some object is stored + // 2. lock object for it is stored, and the object is locked + // 3. lock expiration epoch is coming + // 4. after some delay the object is not locked anymore + chEvents := make([]chan shard.Event, 2) + + for i := range chEvents { + chEvents[i] = make(chan shard.Event, 1) + } + + e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option { + return []shard.Option{ + shard.WithGCEventChannelInitializer(func() <-chan shard.Event { + return chEvents[i] + }), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + require.NoError(t, err) + + return pool + }), + } + }) + + t.Cleanup(func() { + _ = e.Close() + _ = os.RemoveAll(t.Name()) + }) + + const lockerExpiresAfter = 13 + + cnr := cidtest.ID() + var err error + + // 1. + obj := generateObjectWithCID(t, cnr) + + err = Put(e, obj) + require.NoError(t, err) + + // 2. + var a object.Attribute + a.SetKey(objectV2.SysAttributeExpEpoch) + a.SetValue(strconv.Itoa(lockerExpiresAfter)) + + lock := generateObjectWithCID(t, cnr) + lock.SetType(object.TypeLock) + lock.SetAttributes(&a) + + err = Put(e, lock) + require.NoError(t, err) + + err = e.Lock(*cnr, *lock.ID(), []oid.ID{*obj.ID()}) + require.NoError(t, err) + + _, err = e.Inhume(new(InhumePrm).WithTarget(objecttest.Address(), objectcore.AddressOf(obj))) + require.ErrorAs(t, err, new(apistatus.ObjectLocked)) + + // 3. + for i := range chEvents { + chEvents[i] <- shard.EventNewEpoch(lockerExpiresAfter + 1) + } + + // delay for GC processing. It can't be estimated, but making it bigger + // will slow down test + time.Sleep(time.Second) + + // 4. + _, err = e.Inhume(new(InhumePrm).WithTarget(objecttest.Address(), objectcore.AddressOf(obj))) + require.NoError(t, err) +} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 86ff4d77c1..5494d23cc6 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -21,31 +21,36 @@ type hashedShard shardWrapper // Returns any error encountered that did not allow adding a shard. // Otherwise returns the ID of the added shard. func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { - e.mtx.Lock() - defer e.mtx.Unlock() - - pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) - if err != nil { - return nil, err - } - id, err := generateShardID() if err != nil { return nil, fmt.Errorf("could not generate shard ID: %w", err) } + return id, e.addShard(id, opts...) +} + +func (e *StorageEngine) addShard(id *shard.ID, opts ...shard.Option) error { + e.mtx.Lock() + defer e.mtx.Unlock() + + pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) + if err != nil { + return err + } + sh := shard.New(append(opts, shard.WithID(id), - shard.WithExpiredObjectsCallback(e.processExpiredTombstones), + shard.WithExpiredTombstonesCallback(e.processExpiredTombstones), + shard.WithExpiredLocksCallback(e.processExpiredLocks), )...) if err := sh.UpdateID(); err != nil { - return nil, fmt.Errorf("could not open shard: %w", err) + return fmt.Errorf("could not open shard: %w", err) } strID := sh.ID().String() if _, ok := e.shards[strID]; ok { - return nil, fmt.Errorf("shard with id %s was already added", strID) + return fmt.Errorf("shard with id %s was already added", strID) } e.shards[strID] = shardWrapper{ @@ -55,7 +60,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { e.shardPools[strID] = pool - return sh.ID(), nil + return nil } func generateShardID() (*shard.ID, error) { diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index b71eb7a113..e1ba45706a 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -7,6 +7,7 @@ import ( apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" + addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.etcd.io/bbolt" ) @@ -91,6 +92,22 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error { }) } +// FreeLockedBy unlocks all objects in DB which are locked by lockers. +func (db *DB) FreeLockedBy(lockers []*addressSDK.Address) error { + return db.boltDB.Update(func(tx *bbolt.Tx) error { + var err error + + for _, addr := range lockers { + err = freePotentialLocks(tx, *addr.ContainerID(), *addr.ObjectID()) + if err != nil { + return err + } + } + + return err + }) +} + // checks if specified object is locked in the specified container. func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool { bucketLocked := tx.Bucket(bucketNameLocked) diff --git a/pkg/local_object_storage/metabase/lock_test.go b/pkg/local_object_storage/metabase/lock_test.go index 8b972784a4..1412c3277f 100644 --- a/pkg/local_object_storage/metabase/lock_test.go +++ b/pkg/local_object_storage/metabase/lock_test.go @@ -3,10 +3,12 @@ package meta_test import ( "testing" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" @@ -48,4 +50,37 @@ func TestDB_Lock(t *testing.T) { } } }) + + t.Run("lock-unlock scenario", func(t *testing.T) { + cnr := cidtest.ID() + + obj := generateObjectWithCID(t, cnr) + + var err error + + err = putBig(db, obj) + require.NoError(t, err) + + tombID := *oidtest.ID() + + // lock the object + err = db.Lock(*cnr, tombID, []oid.ID{*obj.ID()}) + require.NoError(t, err) + + var tombAddr address.Address + tombAddr.SetContainerID(cnr) + tombAddr.SetObjectID(&tombID) + + // try to inhume locked object using tombstone + err = meta.Inhume(db, objectcore.AddressOf(obj), &tombAddr) + require.ErrorAs(t, err, new(apistatus.ObjectLocked)) + + // inhume the tombstone + _, err = db.Inhume(new(meta.InhumePrm).WithAddresses(&tombAddr).WithGCMark()) + require.NoError(t, err) + + // now we can inhume the object + err = meta.Inhume(db, objectcore.AddressOf(obj), &tombAddr) + require.NoError(t, err) + }) } diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 5d162caa8d..e6cfe4f35d 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -64,6 +64,7 @@ func (s *Shard) Init() error { handlers: []eventHandler{ s.collectExpiredObjects, s.collectExpiredTombstones, + s.collectExpiredLocks, }, }, }, diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 32586aa643..1e7dfce707 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -217,7 +217,9 @@ func (s *Shard) removeGarbage() { } func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { - expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, false) + expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool { + return typ != object.TypeTombstone && typ != object.TypeLock + }) if err != nil || len(expired) == 0 { if err != nil { s.log.Warn("iterator over expired objects failed", zap.String("error", err.Error())) @@ -240,10 +242,12 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { } func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { - expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, true) + expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool { + return typ == object.TypeTombstone + }) if err != nil || len(expired) == 0 { if err != nil { - s.log.Warn("iterator over expired tombstones failes", zap.String("error", err.Error())) + s.log.Warn("iterator over expired tombstones failed", zap.String("error", err.Error())) } return } @@ -251,7 +255,21 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { s.expiredTombstonesCallback(ctx, expired) } -func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, collectTombstones bool) ([]*addressSDK.Address, error) { +func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { + expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool { + return typ == object.TypeLock + }) + if err != nil || len(expired) == 0 { + if err != nil { + s.log.Warn("iterator over expired locks failed", zap.String("error", err.Error())) + } + return + } + + s.expiredLocksCallback(ctx, expired) +} + +func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond func(object.Type) bool) ([]*addressSDK.Address, error) { var expired []*addressSDK.Address err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error { @@ -259,7 +277,7 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, collectTomb case <-ctx.Done(): return meta.ErrInterruptIterator default: - if (expiredObject.Type() == object.TypeTombstone) == collectTombstones { + if typeCond(expiredObject.Type()) { expired = append(expired, expiredObject.Address()) } return nil @@ -333,3 +351,29 @@ func (s *Shard) HandleExpiredTombstones(tss map[string]*addressSDK.Address) { return } } + +// HandleExpiredLocks unlocks all objects which were locked by lockers. +// If successful, marks lockers themselves as garbage. +func (s *Shard) HandleExpiredLocks(lockers []*addressSDK.Address) { + err := s.metaBase.FreeLockedBy(lockers) + if err != nil { + s.log.Warn("failure to unlock objects", + zap.String("error", err.Error()), + ) + + return + } + + var pInhume meta.InhumePrm + pInhume.WithAddresses(lockers...) + pInhume.WithGCMark() + + _, err = s.metaBase.Inhume(&pInhume) + if err != nil { + s.log.Warn("failure to mark lockers as garbage", + zap.String("error", err.Error()), + ) + + return + } +} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 5d888cc402..dfe26d381b 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -55,6 +55,8 @@ type cfg struct { gcCfg *gcCfg expiredTombstonesCallback ExpiredObjectsCallback + + expiredLocksCallback ExpiredObjectsCallback } func defaultCfg() *cfg { @@ -181,14 +183,22 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option { } } -// WithExpiredObjectsCallback returns option to specify callback +// WithExpiredTombstonesCallback returns option to specify callback // of the expired tombstones handler. -func WithExpiredObjectsCallback(cb ExpiredObjectsCallback) Option { +func WithExpiredTombstonesCallback(cb ExpiredObjectsCallback) Option { return func(c *cfg) { c.expiredTombstonesCallback = cb } } +// WithExpiredLocksCallback returns option to specify callback +// of the expired LOCK objects handler. +func WithExpiredLocksCallback(cb ExpiredObjectsCallback) Option { + return func(c *cfg) { + c.expiredLocksCallback = cb + } +} + // WithRefillMetabase returns option to set flag to refill the Metabase on Shard's initialization step. func WithRefillMetabase(v bool) Option { return func(c *cfg) {