package meta import ( "context" "errors" "fmt" "strconv" "time" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) // ExpiredObject is a descriptor of expired object from DB. type ExpiredObject struct { typ objectSDK.Type addr oid.Address } // Type returns type of the expired object. func (e *ExpiredObject) Type() objectSDK.Type { return e.typ } // Address returns address of the expired object. func (e *ExpiredObject) Address() oid.Address { return e.addr } // ExpiredObjectHandler is an ExpiredObject handling function. type ExpiredObjectHandler func(*ExpiredObject) error // ErrInterruptIterator is returned by iteration handlers // as a "break" keyword. var ErrInterruptIterator = logicerr.New("iterator is interrupted") // IterateExpired iterates over all objects in DB which are out of date // relative to epoch. Locked objects are not included (do not confuse // with objects of type LOCK). // // If h returns ErrInterruptIterator, nil returns immediately. // Returns other errors of h directly. func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectHandler) error { var ( startedAt = time.Now() success = false ) defer func() { db.metrics.AddMethodDuration("IterateExpired", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateExpired", trace.WithAttributes( attribute.String("epoch", strconv.FormatUint(epoch, 10)), )) defer span.End() db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { return ErrDegradedMode } err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { return db.iterateExpired(tx, epoch, h) })) success = err == nil return err } func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error { err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error { cidBytes := cidFromAttributeBucket(name, objectV2.SysAttributeExpEpoch) if cidBytes == nil { cidBytes = cidFromAttributeBucket(name, objectV2.SysAttributeExpEpochNeoFS) if cidBytes == nil { return nil } } var cnrID cid.ID err := cnrID.Decode(cidBytes) if err != nil { return fmt.Errorf("could not parse container ID of expired bucket: %w", err) } return b.ForEachBucket(func(expKey []byte) error { bktExpired := b.Bucket(expKey) expiresAfter, err := strconv.ParseUint(string(expKey), 10, 64) if err != nil { return fmt.Errorf("could not parse expiration epoch: %w", err) } else if expiresAfter >= epoch { return nil } return bktExpired.ForEach(func(idKey, _ []byte) error { var id oid.ID err = id.Decode(idKey) if err != nil { return fmt.Errorf("could not parse ID of expired object: %w", err) } // Ignore locked objects. // // To slightly optimize performance we can check only REGULAR objects // (only they can be locked), but it's more reliable. if objectLocked(tx, cnrID, id) { return nil } var addr oid.Address addr.SetContainer(cnrID) addr.SetObject(id) return h(&ExpiredObject{ typ: firstIrregularObjectType(tx, cnrID, idKey), addr: addr, }) }) }) }) if errors.Is(err, ErrInterruptIterator) { err = nil } return err } // IterateCoveredByTombstones iterates over all objects in DB which are covered // by tombstone with string address from tss. Locked objects are not included // (do not confuse with objects of type LOCK). // // If h returns ErrInterruptIterator, nil returns immediately. // Returns other errors of h directly. // // Does not modify tss. func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid.Address, h func(oid.Address) error) error { var ( startedAt = time.Now() success = false ) defer func() { db.metrics.AddMethodDuration("IterateCoveredByTombstones", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateCoveredByTombstones") defer span.End() db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { return ErrDegradedMode } return db.boltDB.View(func(tx *bbolt.Tx) error { return db.iterateCoveredByTombstones(tx, tss, h) }) } func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Address, h func(oid.Address) error) error { bktGraveyard := tx.Bucket(graveyardBucketName) err := bktGraveyard.ForEach(func(k, v []byte) error { var addr oid.Address if err := decodeAddressFromKey(&addr, v); err != nil { return err } if _, ok := tss[addr.EncodeToString()]; ok { var addr oid.Address err := decodeAddressFromKey(&addr, k) if err != nil { return fmt.Errorf("could not parse address of the object under tombstone: %w", err) } if objectLocked(tx, addr.Container(), addr.Object()) { return nil } return h(addr) } return nil }) if errors.Is(err, ErrInterruptIterator) { err = nil } return err } func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error { var cid cid.ID var oid oid.ID obj := objectSDK.New() return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { b58CID, postfix := parseContainerIDWithPrefix(&cid, name) if len(b58CID) == 0 { return nil } switch postfix { case primaryPrefix, lockersPrefix, tombstonePrefix: default: return nil } return b.ForEach(func(k, v []byte) error { if oid.Decode(k) == nil && obj.Unmarshal(v) == nil { return f(cid, oid, obj) } return nil }) }) }