[#183] gc: Fix drop expired locked simple objects

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-03-28 11:17:15 +03:00
parent 341fe1688f
commit 9f0bce5c15
10 changed files with 306 additions and 49 deletions

View file

@ -216,9 +216,9 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []me
})
}
func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.Address) {
func (e *StorageEngine) processExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredLocks(lockers)
sh.HandleExpiredLocks(ctx, epoch, lockers)
select {
case <-ctx.Done():

View file

@ -2,7 +2,6 @@ package meta
import (
"fmt"
"strconv"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -125,24 +124,6 @@ func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 {
return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt)
}
func isExpiredWithAttribute(tx *bbolt.Tx, attr string, addr oid.Address, currEpoch uint64) bool {
// bucket with objects that have expiration attr
attrKey := make([]byte, bucketKeySize+len(attr))
expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), attr, attrKey))
if expirationBucket != nil {
// bucket that contains objects that expire in the current epoch
prevEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch-1, 10)))
if prevEpochBkt != nil {
rawOID := objectKey(addr.Object(), make([]byte, objectKeySize))
if prevEpochBkt.Get(rawOID) != nil {
return true
}
}
}
return false
}
func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 {
if graveyard == nil {
// incorrect metabase state, does not make

View file

@ -0,0 +1,145 @@
package meta
import (
"context"
"errors"
"fmt"
"strconv"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
// FilterExpired return expired items from addresses.
// Address considered expired if metabase does contain information about expiration and
// expiration epoch is less than epoch.
func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.Address) ([]oid.Address, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}
result := make([]oid.Address, 0, len(addresses))
containerIDToObjectIDs := make(map[cid.ID][]oid.ID)
for _, addr := range addresses {
containerIDToObjectIDs[addr.Container()] = append(containerIDToObjectIDs[addr.Container()], addr.Object())
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
for containerID, objectIDs := range containerIDToObjectIDs {
select {
case <-ctx.Done():
return ErrInterruptIterator
default:
}
expiredNeoFS, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpochNeoFS, epoch, containerID, objectIDs)
if err != nil {
return err
}
expiredSys, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpoch, epoch, containerID, objectIDs)
if err != nil {
return err
}
for _, o := range expiredNeoFS {
var a oid.Address
a.SetContainer(containerID)
a.SetObject(o)
result = append(result, a)
}
for _, o := range expiredSys {
var a oid.Address
a.SetContainer(containerID)
a.SetObject(o)
result = append(result, a)
}
}
return nil
})
if err != nil {
return nil, err
}
return result, nil
}
func isExpiredWithAttribute(tx *bbolt.Tx, attr string, addr oid.Address, currEpoch uint64) bool {
// bucket with objects that have expiration attr
attrKey := make([]byte, bucketKeySize+len(attr))
expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), attr, attrKey))
if expirationBucket != nil {
// bucket that contains objects that expire in the current epoch
prevEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch-1, 10)))
if prevEpochBkt != nil {
rawOID := objectKey(addr.Object(), make([]byte, objectKeySize))
if prevEpochBkt.Get(rawOID) != nil {
return true
}
}
}
return false
}
func selectExpiredObjectIDs(tx *bbolt.Tx, attr string, epoch uint64, containerID cid.ID, objectIDs []oid.ID) ([]oid.ID, error) {
result := make([]oid.ID, 0)
notResolved := make(map[oid.ID]struct{})
for _, oid := range objectIDs {
notResolved[oid] = struct{}{}
}
expiredBuffer := make([]oid.ID, 0)
objectKeyBuffer := make([]byte, objectKeySize)
expirationBucketKey := make([]byte, bucketKeySize+len(attr))
expirationBucket := tx.Bucket(attributeBucketName(containerID, attr, expirationBucketKey))
if expirationBucket == nil {
return result, nil // all not expired
}
err := expirationBucket.ForEach(func(epochExpBucketKey, _ []byte) error {
bucketExpiresAfter, err := strconv.ParseUint(string(epochExpBucketKey), 10, 64)
if err != nil {
return fmt.Errorf("could not parse expiration epoch: %w", err)
} else if bucketExpiresAfter >= epoch {
return nil
}
epochExpirationBucket := expirationBucket.Bucket(epochExpBucketKey)
if epochExpirationBucket == nil {
return nil
}
expiredBuffer = expiredBuffer[:0]
for oid := range notResolved {
key := objectKey(oid, objectKeyBuffer)
if epochExpirationBucket.Get(key) != nil {
expiredBuffer = append(expiredBuffer, oid)
}
}
for _, oid := range expiredBuffer {
delete(notResolved, oid)
result = append(result, oid)
}
if len(notResolved) == 0 {
return errBreakBucketForEach
}
return nil
})
if err != nil && !errors.Is(err, errBreakBucketForEach) {
return nil, err
}
return result, nil
}

View file

@ -0,0 +1,93 @@
package meta_test
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
func TestDB_SelectExpired(t *testing.T) {
db := newDB(t)
containerID1 := cidtest.ID()
expiredObj11 := testutil.GenerateObject()
expiredObj11.SetContainerID(containerID1)
setExpiration(expiredObj11, 10)
err := putBig(db, expiredObj11)
require.NoError(t, err)
expiredObj12 := testutil.GenerateObject()
expiredObj12.SetContainerID(containerID1)
setExpiration(expiredObj12, 12)
err = putBig(db, expiredObj12)
require.NoError(t, err)
notExpiredObj11 := testutil.GenerateObject()
notExpiredObj11.SetContainerID(containerID1)
setExpiration(notExpiredObj11, 20)
err = putBig(db, notExpiredObj11)
require.NoError(t, err)
regularObj11 := testutil.GenerateObject()
regularObj11.SetContainerID(containerID1)
err = putBig(db, regularObj11)
require.NoError(t, err)
containerID2 := cidtest.ID()
expiredObj21 := testutil.GenerateObject()
expiredObj21.SetContainerID(containerID2)
setExpiration(expiredObj21, 10)
err = putBig(db, expiredObj21)
require.NoError(t, err)
expiredObj22 := testutil.GenerateObject()
expiredObj22.SetContainerID(containerID2)
setExpiration(expiredObj22, 12)
err = putBig(db, expiredObj22)
require.NoError(t, err)
notExpiredObj21 := testutil.GenerateObject()
notExpiredObj21.SetContainerID(containerID2)
setExpiration(notExpiredObj21, 20)
err = putBig(db, notExpiredObj21)
require.NoError(t, err)
regularObj21 := testutil.GenerateObject()
regularObj21.SetContainerID(containerID2)
err = putBig(db, regularObj21)
require.NoError(t, err)
expired, err := db.FilterExpired(context.Background(), 15,
[]oid.Address{
getAddressSafe(t, expiredObj11), getAddressSafe(t, expiredObj12), getAddressSafe(t, notExpiredObj11), getAddressSafe(t, regularObj11),
getAddressSafe(t, expiredObj21), getAddressSafe(t, expiredObj22), getAddressSafe(t, notExpiredObj21), getAddressSafe(t, regularObj21),
})
require.NoError(t, err)
require.Equal(t, 4, len(expired), "invalid expired count")
require.Contains(t, expired, getAddressSafe(t, expiredObj11))
require.Contains(t, expired, getAddressSafe(t, expiredObj12))
require.Contains(t, expired, getAddressSafe(t, expiredObj21))
require.Contains(t, expired, getAddressSafe(t, expiredObj22))
}
func getAddressSafe(t *testing.T, o *object.Object) oid.Address {
cid, set := o.ContainerID()
if !set {
t.Fatalf("container id required")
}
id, set := o.ID()
if !set {
t.Fatalf("object id required")
}
var addr oid.Address
addr.SetContainer(cid)
addr.SetObject(id)
return addr
}

View file

@ -96,25 +96,27 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
}
// FreeLockedBy unlocks all objects in DB which are locked by lockers.
func (db *DB) FreeLockedBy(lockers []oid.Address) error {
// Returns slice of unlocked object ID's or an error.
func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
return nil, ErrDegradedMode
}
return db.boltDB.Update(func(tx *bbolt.Tx) error {
var err error
var unlockedObjects []oid.Address
return unlockedObjects, db.boltDB.Update(func(tx *bbolt.Tx) error {
for i := range lockers {
err = freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
if err != nil {
return err
}
unlockedObjects = append(unlockedObjects, unlocked...)
}
return err
return nil
})
}
@ -134,14 +136,16 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
}
// releases all records about the objects locked by the locker.
// Returns slice of unlocked object ID's or an error.
//
// Operation is very resource-intensive, which is caused by the admissibility
// of multiple locks. Also, if we knew what objects are locked, it would be
// possible to speed up the execution.
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
var unlockedObjects []oid.Address
bucketLocked := tx.Bucket(bucketNameLocked)
if bucketLocked == nil {
return nil
return unlockedObjects, nil
}
key := make([]byte, cidSize)
@ -149,11 +153,11 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
bucketLockedContainer := bucketLocked.Bucket(key)
if bucketLockedContainer == nil {
return nil
return unlockedObjects, nil
}
keyLocker := objectKey(locker, key)
return bucketLockedContainer.ForEach(func(k, v []byte) error {
return unlockedObjects, bucketLockedContainer.ForEach(func(k, v []byte) error {
keyLockers, err := decodeList(v)
if err != nil {
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
@ -167,6 +171,18 @@ func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
if err != nil {
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
}
var id oid.ID
err = id.Decode(k)
if err != nil {
return fmt.Errorf("decode unlocked object id error: %w", err)
}
var addr oid.Address
addr.SetContainer(idCnr)
addr.SetObject(id)
unlockedObjects = append(unlockedObjects, addr)
} else {
// exclude locker
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)

View file

@ -110,7 +110,7 @@ func TestDB_Lock(t *testing.T) {
require.Len(t, res.DeletedLockObjects(), 1)
require.Equal(t, objectcore.AddressOf(lockObj), res.DeletedLockObjects()[0])
err = db.FreeLockedBy([]oid.Address{lockAddr})
_, err = db.FreeLockedBy([]oid.Address{lockAddr})
require.NoError(t, err)
inhumePrm.SetAddresses(objAddr)
@ -141,7 +141,7 @@ func TestDB_Lock(t *testing.T) {
// unlock just objects that were locked by
// just removed locker
err = db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]})
_, err = db.FreeLockedBy([]oid.Address{res.DeletedLockObjects()[0]})
require.NoError(t, err)
// removing objects after unlock

View file

@ -143,7 +143,8 @@ func (s *Shard) Init() error {
eventNewEpoch: {
cancelFunc: func() {},
handlers: []eventHandler{
s.collectExpiredLocksAndObjects,
s.collectExpiredLocks,
s.collectExpiredObjects,
s.collectExpiredTombstones,
},
},

View file

@ -243,11 +243,6 @@ func (s *Shard) removeGarbage() {
}
}
func (s *Shard) collectExpiredLocksAndObjects(ctx context.Context, e Event) {
s.collectExpiredLocks(ctx, e)
s.collectExpiredObjects(ctx, e)
}
func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
workersCount = minExpiredWorkers
batchSize = minExpiredBatchSize
@ -424,7 +419,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
if len(batch) == batchSize {
expired := batch
errGroup.Go(func() error {
s.expiredLocksCallback(egCtx, expired)
s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired)
return egCtx.Err()
})
batch = make([]oid.Address, 0, batchSize)
@ -438,7 +433,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
if len(batch) > 0 {
expired := batch
errGroup.Go(func() error {
s.expiredLocksCallback(egCtx, expired)
s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired)
return egCtx.Err()
})
}
@ -474,6 +469,17 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFo
return ctx.Err()
}
func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid.Address) ([]oid.Address, error) {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.metaBase.FilterExpired(ctx, epoch, addresses)
}
// HandleExpiredTombstones marks tombstones themselves as garbage
// and clears up corresponding graveyard records.
//
@ -523,12 +529,11 @@ func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) {
// HandleExpiredLocks unlocks all objects which were locked by lockers.
// If successful, marks lockers themselves as garbage.
func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {
func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) {
if s.GetMode().NoMetabase() {
return
}
err := s.metaBase.FreeLockedBy(lockers)
unlocked, err := s.metaBase.FreeLockedBy(lockers)
if err != nil {
s.log.Warn("failure to unlock objects",
zap.String("error", err.Error()),
@ -539,7 +544,7 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {
var pInhume meta.InhumePrm
pInhume.SetAddresses(lockers...)
pInhume.SetGCMark()
pInhume.SetForceGCMark()
res, err := s.metaBase.Inhume(pInhume)
if err != nil {
@ -558,6 +563,22 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {
s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
i++
}
s.inhumeUnlockedIfExpired(ctx, epoch, unlocked)
}
func (s *Shard) inhumeUnlockedIfExpired(ctx context.Context, epoch uint64, unlocked []oid.Address) {
expiredUnlocked, err := s.selectExpired(ctx, epoch, unlocked)
if err != nil {
s.log.Warn("failure to get expired unlocked objects", zap.Error(err))
return
}
if len(expiredUnlocked) == 0 {
return
}
s.handleExpiredObjects(ctx, expiredUnlocked)
}
// HandleDeletedLocks unlocks all objects which were locked by lockers.
@ -566,7 +587,7 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
return
}
err := s.metaBase.FreeLockedBy(lockers)
_, err := s.metaBase.FreeLockedBy(lockers)
if err != nil {
s.log.Warn("failure to unlock objects",
zap.String("error", err.Error()),

View file

@ -59,8 +59,8 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) {
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
shard.WithExpiredLocksCallback(func(_ context.Context, a []oid.Address) {
sh.HandleExpiredLocks(a)
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)

View file

@ -40,7 +40,7 @@ type Option func(*cfg)
type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
// ExpiredObjectsCallback is a callback handling list of expired objects.
type ExpiredObjectsCallback func(context.Context, []oid.Address)
type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address)
// DeletedLockCallback is a callback handling list of deleted LOCK objects.
type DeletedLockCallback func(context.Context, []oid.Address)