From 7bf20c9f1f06975a53184c0803dbc7bc5ca4eaf5 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 20 Aug 2024 11:59:42 +0300 Subject: [PATCH] [#1323] metabase: Add expiration epoch buckets Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/metabase/control.go | 1 + pkg/local_object_storage/metabase/delete.go | 11 ++ .../metabase/delete_ec_test.go | 3 +- pkg/local_object_storage/metabase/exists.go | 26 ++-- pkg/local_object_storage/metabase/expired.go | 118 +++++------------- pkg/local_object_storage/metabase/get.go | 6 +- .../metabase/iterators.go | 81 ++++-------- pkg/local_object_storage/metabase/put.go | 41 ++++++ pkg/local_object_storage/metabase/select.go | 7 +- pkg/local_object_storage/metabase/util.go | 53 ++++++-- pkg/local_object_storage/shard/gc_test.go | 4 +- 11 files changed, 181 insertions(+), 170 deletions(-) diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 891a1e9b2..d6546d922 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -29,6 +29,7 @@ var ( string(garbageBucketName): {}, string(shardInfoBucket): {}, string(bucketNameLocked): {}, + string(expEpochToObjectBucketName): {}, } // deprecatedBuckets buckets that are not used anymore. diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index ae10564a8..683bd445f 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -478,6 +478,17 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error key: objKey, }) + if expEpoch, ok := hasExpirationEpoch(obj); ok { + delUniqueIndexItem(tx, namedBucketItem{ + name: expEpochToObjectBucketName, + key: expirationEpochKey(expEpoch, cnr, addr.Object()), + }) + delUniqueIndexItem(tx, namedBucketItem{ + name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)), + key: objKey, + }) + } + return nil } diff --git a/pkg/local_object_storage/metabase/delete_ec_test.go b/pkg/local_object_storage/metabase/delete_ec_test.go index 0e627f095..66c79ecd7 100644 --- a/pkg/local_object_storage/metabase/delete_ec_test.go +++ b/pkg/local_object_storage/metabase/delete_ec_test.go @@ -418,7 +418,8 @@ func testVerifyNoObjectDataLeft(tx *bbolt.Tx) error { return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { if bytes.Equal(name, shardInfoBucket) || bytes.Equal(name, containerCounterBucketName) || - bytes.Equal(name, containerVolumeBucketName) { + bytes.Equal(name, containerVolumeBucketName) || + bytes.Equal(name, expEpochToObjectBucketName) { return nil } return testBucketEmpty(name, b) diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index 153d92110..2e1b1dce8 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" @@ -96,7 +95,11 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpo locked = objectLocked(tx, parent.Container(), parent.Object()) } // check graveyard and object expiration first - switch objectStatus(tx, addr, currEpoch) { + st, err := objectStatus(tx, addr, currEpoch) + if err != nil { + return false, false, err + } + switch st { case 1: return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound)) case 2: @@ -138,30 +141,25 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpo // - 1 if object with GC mark; // - 2 if object is covered with tombstone; // - 3 if object is expired. -func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 { +func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) { // locked object could not be removed/marked with GC/expired if objectLocked(tx, addr.Container(), addr.Object()) { - return 0 + return 0, nil } - // we check only if the object is expired in the current - // epoch since it is considered the only corner case: the - // GC is expected to collect all the objects that have - // expired previously for less than the one epoch duration - - expired := isExpiredWithAttribute(tx, objectV2.SysAttributeExpEpoch, addr, currEpoch) - if !expired { - expired = isExpiredWithAttribute(tx, objectV2.SysAttributeExpEpochNeoFS, addr, currEpoch) + expired, err := isExpired(tx, addr, currEpoch) + if err != nil { + return 0, err } if expired { - return 3 + return 3, nil } graveyardBkt := tx.Bucket(graveyardBucketName) garbageBkt := tx.Bucket(garbageBucketName) addrKey := addressKey(addr, make([]byte, addressKeySize)) - return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt) + return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt), nil } func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 { diff --git a/pkg/local_object_storage/metabase/expired.go b/pkg/local_object_storage/metabase/expired.go index aa2cb6f20..68144d8b1 100644 --- a/pkg/local_object_storage/metabase/expired.go +++ b/pkg/local_object_storage/metabase/expired.go @@ -2,12 +2,11 @@ package meta import ( "context" + "encoding/binary" "errors" - "fmt" "strconv" "time" - objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" @@ -17,6 +16,8 @@ import ( "go.opentelemetry.io/otel/trace" ) +var errInvalidEpochValueLength = errors.New("could not parse expiration epoch: invalid data length") + // FilterExpired return expired items from addresses. // Address considered expired if metabase does contain information about expiration and // expiration epoch is less than epoch. @@ -57,29 +58,11 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A default: } - expiredNeoFS, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpochNeoFS, epoch, containerID, objectIDs) + expired, err := selectExpiredObjects(tx, epoch, containerID, objectIDs) if err != nil { return err } - - expiredSys, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpoch, epoch, containerID, objectIDs) - if err != nil { - return err - } - - for _, o := range expiredNeoFS { - var a oid.Address - a.SetContainer(containerID) - a.SetObject(o) - result = append(result, a) - } - - for _, o := range expiredSys { - var a oid.Address - a.SetContainer(containerID) - a.SetObject(o) - result = append(result, a) - } + result = append(result, expired...) } return nil }) @@ -90,76 +73,39 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A return result, nil } -func isExpiredWithAttribute(tx *bbolt.Tx, attr string, addr oid.Address, currEpoch uint64) bool { - // bucket with objects that have expiration attr - attrKey := make([]byte, bucketKeySize+len(attr)) - expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), attr, attrKey)) - if expirationBucket != nil { - // bucket that contains objects that expire in the current epoch - prevEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch-1, 10))) - if prevEpochBkt != nil { - rawOID := objectKey(addr.Object(), make([]byte, objectKeySize)) - if prevEpochBkt.Get(rawOID) != nil { - return true - } - } +func isExpired(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (bool, error) { + bucketName := make([]byte, bucketKeySize) + bucketName = objectToExpirationEpochBucketName(addr.Container(), bucketName) + b := tx.Bucket(bucketName) + if b == nil { + return false, nil } - - return false + key := make([]byte, objectKeySize) + addr.Object().Encode(key) + val := b.Get(key) + if len(val) == 0 { + return false, nil + } + if len(val) != epochSize { + return false, errInvalidEpochValueLength + } + expEpoch := binary.LittleEndian.Uint64(val) + return expEpoch < currEpoch, nil } -func selectExpiredObjectIDs(tx *bbolt.Tx, attr string, epoch uint64, containerID cid.ID, objectIDs []oid.ID) ([]oid.ID, error) { - result := make([]oid.ID, 0) - notResolved := make(map[oid.ID]struct{}) - for _, oid := range objectIDs { - notResolved[oid] = struct{}{} - } - - expiredBuffer := make([]oid.ID, 0) - objectKeyBuffer := make([]byte, objectKeySize) - - expirationBucketKey := make([]byte, bucketKeySize+len(attr)) - expirationBucket := tx.Bucket(attributeBucketName(containerID, attr, expirationBucketKey)) - if expirationBucket == nil { - return result, nil // all not expired - } - - err := expirationBucket.ForEach(func(epochExpBucketKey, _ []byte) error { - bucketExpiresAfter, err := strconv.ParseUint(string(epochExpBucketKey), 10, 64) +func selectExpiredObjects(tx *bbolt.Tx, epoch uint64, containerID cid.ID, objectIDs []oid.ID) ([]oid.Address, error) { + result := make([]oid.Address, 0) + var addr oid.Address + addr.SetContainer(containerID) + for _, objID := range objectIDs { + addr.SetObject(objID) + expired, err := isExpired(tx, addr, epoch) if err != nil { - return fmt.Errorf("could not parse expiration epoch: %w", err) - } else if bucketExpiresAfter >= epoch { - return nil + return nil, err } - - epochExpirationBucket := expirationBucket.Bucket(epochExpBucketKey) - if epochExpirationBucket == nil { - return nil + if expired { + result = append(result, addr) } - - expiredBuffer = expiredBuffer[:0] - for oid := range notResolved { - key := objectKey(oid, objectKeyBuffer) - if epochExpirationBucket.Get(key) != nil { - expiredBuffer = append(expiredBuffer, oid) - } - } - - for _, oid := range expiredBuffer { - delete(notResolved, oid) - result = append(result, oid) - } - - if len(notResolved) == 0 { - return errBreakBucketForEach - } - - return nil - }) - - if err != nil && !errors.Is(err, errBreakBucketForEach) { - return nil, err } - return result, nil } diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index d979b4f0f..776f5d27c 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -90,7 +90,11 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { if checkStatus { - switch objectStatus(tx, addr, currEpoch) { + st, err := objectStatus(tx, addr, currEpoch) + if err != nil { + return nil, err + } + switch st { case 1: return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) case 2: diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index 7b60b7d50..d44c51fb2 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -7,7 +7,6 @@ import ( "strconv" "time" - objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" @@ -79,63 +78,37 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH } func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error { - err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error { - cidBytes := cidFromAttributeBucket(name, objectV2.SysAttributeExpEpoch) - if cidBytes == nil { - cidBytes = cidFromAttributeBucket(name, objectV2.SysAttributeExpEpochNeoFS) - if cidBytes == nil { - return nil - } - } - - var cnrID cid.ID - err := cnrID.Decode(cidBytes) + b := tx.Bucket(expEpochToObjectBucketName) + c := b.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + expiresAfter, cnr, obj, err := parseExpirationEpochKey(k) if err != nil { - return fmt.Errorf("could not parse container ID of expired bucket: %w", err) + return err } - - return b.ForEachBucket(func(expKey []byte) error { - bktExpired := b.Bucket(expKey) - expiresAfter, err := strconv.ParseUint(string(expKey), 10, 64) - if err != nil { - return fmt.Errorf("could not parse expiration epoch: %w", err) - } else if expiresAfter >= epoch { - return nil - } - - return bktExpired.ForEach(func(idKey, _ []byte) error { - var id oid.ID - - err = id.Decode(idKey) - if err != nil { - return fmt.Errorf("could not parse ID of expired object: %w", err) - } - - // Ignore locked objects. - // - // To slightly optimize performance we can check only REGULAR objects - // (only they can be locked), but it's more reliable. - if objectLocked(tx, cnrID, id) { - return nil - } - - var addr oid.Address - addr.SetContainer(cnrID) - addr.SetObject(id) - - return h(&ExpiredObject{ - typ: firstIrregularObjectType(tx, cnrID, idKey), - addr: addr, - }) - }) + // bucket keys ordered by epoch, no need to continue lookup + if expiresAfter >= epoch { + return nil + } + if objectLocked(tx, cnr, obj) { + continue + } + var addr oid.Address + addr.SetContainer(cnr) + addr.SetObject(obj) + objKey := objectKey(addr.Object(), make([]byte, objectKeySize)) + err = h(&ExpiredObject{ + typ: firstIrregularObjectType(tx, cnr, objKey), + addr: addr, }) - }) - - if errors.Is(err, ErrInterruptIterator) { - err = nil + if err == nil { + continue + } + if errors.Is(err, ErrInterruptIterator) { + return nil + } + return err } - - return err + return nil } // IterateCoveredByTombstones iterates over all objects in DB which are covered diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 3fa79f1e2..d1706a7ab 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -6,8 +6,10 @@ import ( "errors" "fmt" gio "io" + "strconv" "time" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" @@ -242,6 +244,27 @@ func putUniqueIndexes( } } + if expEpoch, ok := hasExpirationEpoch(obj); ok { + err := putUniqueIndexItem(tx, namedBucketItem{ + name: expEpochToObjectBucketName, + key: expirationEpochKey(expEpoch, cnr, addr.Object()), + val: zeroValue, + }) + if err != nil { + return err + } + val := make([]byte, epochSize) + binary.LittleEndian.PutUint64(val, expEpoch) + err = putUniqueIndexItem(tx, namedBucketItem{ + name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)), + key: objKey, + val: val, + }) + if err != nil { + return err + } + } + // index root object if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() { if ecHead := obj.ECHeader(); ecHead != nil { @@ -361,6 +384,24 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun return nil } +func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) { + attributes := obj.Attributes() + if ech := obj.ECHeader(); ech != nil { + attributes = ech.ParentAttributes() + } + for _, attr := range attributes { + if attr.Key() == objectV2.SysAttributeExpEpochNeoFS { + expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64) + return expEpoch, err == nil + } + if attr.Key() == objectV2.SysAttributeExpEpoch { + expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64) + return expEpoch, err == nil + } + } + return 0, false +} + func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error { id, _ := obj.ID() cnr, _ := obj.ContainerID() diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 720b7b5b9..42737a41a 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -142,8 +142,11 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters var addr oid.Address addr.SetContainer(cnr) addr.SetObject(id) - - if objectStatus(tx, addr, currEpoch) > 0 { + st, err := objectStatus(tx, addr, currEpoch) + if err != nil { + return nil, err + } + if st > 0 { continue // ignore removed objects } diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index ebf1713d0..012c0dcc8 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -1,9 +1,10 @@ package meta import ( - "bytes" "crypto/sha256" + "encoding/binary" "errors" + "fmt" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -23,6 +24,7 @@ var ( toMoveItBucketName = []byte{toMoveItPrefix} containerVolumeBucketName = []byte{containerVolumePrefix} containerCounterBucketName = []byte{containerCountersPrefix} + expEpochToObjectBucketName = []byte{expirationEpochToObjectPrefix} zeroValue = []byte{0xFF} @@ -124,6 +126,16 @@ const ( // Key: container ID + type // Value: Object id ecInfoPrefix + + // expirationEpochToObjectPrefix is used for storing relation between expiration epoch and object id. + // Key: expiration epoch + object address + // Value: zero + expirationEpochToObjectPrefix + + // objectToExpirationEpochPrefix is used for storing relation between expiration epoch and object id. + // Key: object address + // Value: expiration epoch + objectToExpirationEpochPrefix ) const ( @@ -131,6 +143,7 @@ const ( bucketKeySize = 1 + cidSize objectKeySize = sha256.Size addressKeySize = cidSize + objectKeySize + epochSize = 8 ) func bucketName(cnr cid.ID, prefix byte, key []byte) []byte { @@ -161,15 +174,6 @@ func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte { return append(key[:bucketKeySize], attributeKey...) } -// returns from attributeBucketName result, nil otherwise. -func cidFromAttributeBucket(val []byte, attributeKey string) []byte { - if len(val) < bucketKeySize || val[0] != userAttributePrefix || !bytes.Equal(val[bucketKeySize:], []byte(attributeKey)) { - return nil - } - - return val[1:bucketKeySize] -} - // rootBucketName returns _root. func rootBucketName(cnr cid.ID, key []byte) []byte { return bucketName(cnr, rootPrefix, key) @@ -190,6 +194,35 @@ func ecInfoBucketName(cnr cid.ID, key []byte) []byte { return bucketName(cnr, ecInfoPrefix, key) } +// objectToExpirationEpochBucketName returns objectToExpirationEpochPrefix_. +func objectToExpirationEpochBucketName(cnr cid.ID, key []byte) []byte { + return bucketName(cnr, objectToExpirationEpochPrefix, key) +} + +func expirationEpochKey(epoch uint64, cnr cid.ID, obj oid.ID) []byte { + result := make([]byte, epochSize+addressKeySize) + binary.BigEndian.PutUint64(result, epoch) + cnr.Encode(result[epochSize:]) + obj.Encode(result[epochSize+cidSize:]) + return result +} + +func parseExpirationEpochKey(key []byte) (uint64, cid.ID, oid.ID, error) { + if len(key) != epochSize+addressKeySize { + return 0, cid.ID{}, oid.ID{}, fmt.Errorf("unexpected expiration epoch to object key length: %d", len(key)) + } + epoch := binary.BigEndian.Uint64(key) + var cnr cid.ID + if err := cnr.Decode(key[epochSize : epochSize+cidSize]); err != nil { + return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (container ID): %w", err) + } + var obj oid.ID + if err := obj.Decode(key[epochSize+cidSize:]); err != nil { + return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (object ID): %w", err) + } + return epoch, cnr, obj, nil +} + // addressKey returns key for K-V tables when key is a whole address. func addressKey(addr oid.Address, key []byte) []byte { addr.Container().Encode(key) diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index a438b5def..90958cd35 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -74,7 +74,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) { var getPrm GetPrm getPrm.SetAddress(objectCore.AddressOf(obj)) _, err = sh.Get(context.Background(), getPrm) - require.True(t, client.IsErrObjectNotFound(err), "expired object must be deleted") + require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired object must be deleted") } func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { @@ -168,7 +168,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value)) _, err = sh.Get(context.Background(), getPrm) - require.True(t, client.IsErrObjectNotFound(err), "expired complex object must be deleted on epoch after lock expires") + require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires") } func TestGCDropsObjectInhumedFromWritecache(t *testing.T) {