diff --git a/pkg/local_object_storage/metabase/expired.go b/pkg/local_object_storage/metabase/expired.go index 52ee3f571..07b2e84e1 100644 --- a/pkg/local_object_storage/metabase/expired.go +++ b/pkg/local_object_storage/metabase/expired.go @@ -4,17 +4,13 @@ import ( "bytes" "context" "errors" - "fmt" "strconv" "time" - objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/cockroachdb/pebble" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -46,44 +42,10 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A } result := make([]oid.Address, 0, len(addresses)) - containerIDToObjectIDs := make(map[cid.ID][]oid.ID) - for _, addr := range addresses { - containerIDToObjectIDs[addr.Container()] = append(containerIDToObjectIDs[addr.Container()], addr.Object()) - } - - err := db.database.View(func(tx *bbolt.Tx) error { - for containerID, objectIDs := range containerIDToObjectIDs { - select { - case <-ctx.Done(): - return ErrInterruptIterator - default: - } - - expiredNeoFS, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpochNeoFS, epoch, containerID, objectIDs) - if err != nil { - return err - } - - expiredSys, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpoch, epoch, containerID, objectIDs) - if err != nil { - return err - } - - for _, o := range expiredNeoFS { - var a oid.Address - a.SetContainer(containerID) - a.SetObject(o) - result = append(result, a) - } - - for _, o := range expiredSys { - var a oid.Address - a.SetContainer(containerID) - a.SetObject(o) - result = append(result, a) - } - } - return nil + err := db.snapshot(func(s *pebble.Snapshot) error { + var e error + result, e = selectExpiredObjects(ctx, s, epoch, addresses) + return e }) if err != nil { return nil, metaerr.Wrap(err) @@ -92,84 +54,10 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A return result, nil } -func isExpiredWithAttribute(tx *bbolt.Tx, attr string, addr oid.Address, currEpoch uint64) bool { - // bucket with objects that have expiration attr - attrKey := make([]byte, bucketKeySize+len(attr)) - expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), attr, attrKey)) - if expirationBucket != nil { - // bucket that contains objects that expire in the current epoch - prevEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch-1, 10))) - if prevEpochBkt != nil { - rawOID := objectKey(addr.Object(), make([]byte, objectKeySize)) - if prevEpochBkt.Get(rawOID) != nil { - return true - } - } - } - - return false -} - -func selectExpiredObjectIDs(tx *bbolt.Tx, attr string, epoch uint64, containerID cid.ID, objectIDs []oid.ID) ([]oid.ID, error) { - result := make([]oid.ID, 0) - notResolved := make(map[oid.ID]struct{}) - for _, oid := range objectIDs { - notResolved[oid] = struct{}{} - } - - expiredBuffer := make([]oid.ID, 0) - objectKeyBuffer := make([]byte, objectKeySize) - - expirationBucketKey := make([]byte, bucketKeySize+len(attr)) - expirationBucket := tx.Bucket(attributeBucketName(containerID, attr, expirationBucketKey)) - if expirationBucket == nil { - return result, nil // all not expired - } - - err := expirationBucket.ForEach(func(epochExpBucketKey, _ []byte) error { - bucketExpiresAfter, err := strconv.ParseUint(string(epochExpBucketKey), 10, 64) - if err != nil { - return fmt.Errorf("could not parse expiration epoch: %w", err) - } else if bucketExpiresAfter >= epoch { - return nil - } - - epochExpirationBucket := expirationBucket.Bucket(epochExpBucketKey) - if epochExpirationBucket == nil { - return nil - } - - expiredBuffer = expiredBuffer[:0] - for oid := range notResolved { - key := objectKey(oid, objectKeyBuffer) - if epochExpirationBucket.Get(key) != nil { - expiredBuffer = append(expiredBuffer, oid) - } - } - - for _, oid := range expiredBuffer { - delete(notResolved, oid) - result = append(result, oid) - } - - if len(notResolved) == 0 { - return errBreakBucketForEach - } - - return nil - }) - - if err != nil && !errors.Is(err, errBreakBucketForEach) { - return nil, err - } - - return result, nil -} - func isExpired(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch uint64) (bool, error) { prefix := []byte{expiredPrefix} it, err := r.NewIter(&pebble.IterOptions{ - LowerBound: []byte{expiredPrefix}, + LowerBound: prefix, OnlyReadGuaranteedDurable: true, }) if err != nil { @@ -204,3 +92,144 @@ func isExpired(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch } return false, it.Close() } + +func selectExpiredObjects(ctx context.Context, r pebble.Reader, epoch uint64, objects []oid.Address) ([]oid.Address, error) { + result := make([]oid.Address, 0) + objMap := make(map[oid.Address]struct{}) + for _, obj := range objects { + objMap[obj] = struct{}{} + } + + prefix := []byte{expiredPrefix} + it, err := r.NewIter(&pebble.IterOptions{ + LowerBound: prefix, + OnlyReadGuaranteedDurable: true, + }) + if err != nil { + return nil, err + } + + // iteration does in ascending order by expiration epoch. + // gc does expired objects collect every epoch, so here should be not so much items. + for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { + select { + case <-ctx.Done(): + return nil, errors.Join(ctx.Err(), it.Close()) + default: + } + + expEpoch, err := expirationEpochFromExpiredKey(it.Key()) + if err != nil { + return nil, errors.Join(err, it.Close()) + } + + if expEpoch >= epoch { + return result, it.Close() // keys are ordered by epoch, so next items will be discarded anyway. + } + + addr, err := addressFromExpiredKey(it.Key()) + if err != nil { + return nil, errors.Join(err, it.Close()) + } + if _, ok := objMap[addr]; ok { + result = append(result, addr) + } + } + return result, it.Close() +} + +// IterateExpired iterates over all objects in DB which are out of date +// relative to epoch. Locked objects are not included (do not confuse +// with objects of type LOCK). +// +// If h returns ErrInterruptIterator, nil returns immediately. +// Returns other errors of h directly. +func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectHandler) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateExpired", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateExpired", + trace.WithAttributes( + attribute.String("epoch", strconv.FormatUint(epoch, 10)), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + err := metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error { + return iterateExpired(ctx, s, epoch, h) + })) + success = err == nil + return err +} + +func iterateExpired(ctx context.Context, r pebble.Reader, epoch uint64, h ExpiredObjectHandler) error { + prefix := []byte{expiredPrefix} + it, err := r.NewIter(&pebble.IterOptions{ + LowerBound: prefix, + OnlyReadGuaranteedDurable: true, + }) + if err != nil { + return err + } + + // iteration does in ascending order by expiration epoch. + for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { + select { + case <-ctx.Done(): + return errors.Join(ctx.Err(), it.Close()) + default: + } + + expEpoch, err := expirationEpochFromExpiredKey(it.Key()) + if err != nil { + return errors.Join(err, it.Close()) + } + + if expEpoch >= epoch { + return it.Close() // keys are ordered by epoch, so next items will be discarded anyway. + } + + addr, err := addressFromExpiredKey(it.Key()) + if err != nil { + return errors.Join(err, it.Close()) + } + + // Ignore locked objects. + // + // To slightly optimize performance we can check only REGULAR objects + // (only they can be locked), but it's more reliable. + isLocked, err := objectLocked(ctx, r, addr.Container(), addr.Object()) + if err != nil { + return errors.Join(err, it.Close()) + } + if isLocked { + continue + } + + objType, err := firstIrregularObjectType(r, addr.Container(), addr.Object()) + if err != nil { + return errors.Join(err, it.Close()) + } + + if err := h(&ExpiredObject{ + typ: objType, + addr: addr, + }); err != nil { + if errors.Is(err, ErrInterruptIterator) { + return it.Close() + } + return errors.Join(err, it.Close()) + } + } + return it.Close() +}