frostfs-node/pkg/local_object_storage/metabase/iterators.go

233 lines
5.9 KiB
Go
Raw Normal View History

package meta
import (
"context"
"errors"
"fmt"
"strconv"
"time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// ExpiredObject is a descriptor of an expired object from DB.
//
// It carries the object's type and address; values of this type are passed
// to an ExpiredObjectHandler while the metabase is being iterated.
type ExpiredObject struct {
// typ is the object type returned by Type.
typ objectSDK.Type
// addr is the object address (container ID and object ID) returned by Address.
addr oid.Address
}
// Type returns the type of the expired object as stored in the descriptor.
func (e *ExpiredObject) Type() objectSDK.Type {
return e.typ
}
// Address returns the address of the expired object as stored in the
// descriptor.
func (e *ExpiredObject) Address() oid.Address {
return e.addr
}
// ExpiredObjectHandler is a callback invoked for each ExpiredObject found
// during iteration. A non-nil return value stops the iteration;
// ErrInterruptIterator may be returned to stop it without reporting a failure.
type ExpiredObjectHandler func(*ExpiredObject) error
// ErrInterruptIterator is returned by iteration handlers
// as a "break" keyword: the iterators in this file detect it with errors.Is
// and turn it into a nil result instead of propagating it to the caller.
var ErrInterruptIterator = logicerr.New("iterator is interrupted")
// IterateExpired walks over all objects in DB whose expiration epoch is
// strictly less than epoch and hands each one to h. Locked objects are
// skipped (this refers to the lock status of an object, not to objects of
// type LOCK).
//
// If h returns ErrInterruptIterator, the walk stops and nil is returned.
// Any other error aborts the walk and is returned after being passed through
// metaerr.Wrap.
//
// ErrDegradedMode is returned when the current mode has no metabase.
// The call duration and its outcome are reported to the DB metrics.
func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectHandler) error {
	began := time.Now()
	ok := false
	defer func() {
		db.metrics.AddMethodDuration("IterateExpired", time.Since(began), ok)
	}()

	epochAttr := attribute.String("epoch", strconv.FormatUint(epoch, 10))
	_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateExpired",
		trace.WithAttributes(epochAttr))
	defer span.End()

	// The read lock is held for the whole iteration.
	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return ErrDegradedMode
	}

	visit := func(tx *bbolt.Tx) error {
		return db.iterateExpired(tx, epoch, h)
	}
	if err := metaerr.Wrap(db.boltDB.View(visit)); err != nil {
		return err
	}

	ok = true
	return nil
}
// iterateExpired is the transaction-level body of IterateExpired.
//
// It scans every top-level bucket of tx and keeps only those for which
// cidFromAttributeBucket yields container ID bytes for one of the expiration
// attributes (objectV2.SysAttributeExpEpoch first, then
// objectV2.SysAttributeExpEpochNeoFS). Nested buckets of such a bucket are
// named with the decimal expiration epoch; their keys are object IDs.
//
// h is called for every object that is not locked and whose expiration epoch
// is strictly less than epoch. If the scan ends with ErrInterruptIterator,
// nil is returned; other errors are returned as is.
func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
	scanErr := tx.ForEach(func(bktName []byte, attrBkt *bbolt.Bucket) error {
		rawCnr := cidFromAttributeBucket(bktName, objectV2.SysAttributeExpEpoch)
		if rawCnr == nil {
			rawCnr = cidFromAttributeBucket(bktName, objectV2.SysAttributeExpEpochNeoFS)
		}
		if rawCnr == nil {
			// Not an expiration attribute bucket.
			return nil
		}

		var cnr cid.ID
		if err := cnr.Decode(rawCnr); err != nil {
			return fmt.Errorf("could not parse container ID of expired bucket: %w", err)
		}

		return attrBkt.ForEachBucket(func(epochKey []byte) error {
			epochBkt := attrBkt.Bucket(epochKey)

			expEpoch, err := strconv.ParseUint(string(epochKey), 10, 64)
			if err != nil {
				return fmt.Errorf("could not parse expiration epoch: %w", err)
			}
			if expEpoch >= epoch {
				// Still valid at the requested epoch.
				return nil
			}

			return epochBkt.ForEach(func(rawID, _ []byte) error {
				var objID oid.ID
				if err := objID.Decode(rawID); err != nil {
					return fmt.Errorf("could not parse ID of expired object: %w", err)
				}

				// Locked objects are never reported.
				//
				// Checking only REGULAR objects (the only ones that can be
				// locked) would be slightly faster, but checking everything
				// is more reliable.
				if objectLocked(tx, cnr, objID) {
					return nil
				}

				var addr oid.Address
				addr.SetContainer(cnr)
				addr.SetObject(objID)

				expired := ExpiredObject{
					typ:  firstIrregularObjectType(tx, cnr, rawID),
					addr: addr,
				}
				return h(&expired)
			})
		})
	})

	// The handler asked for a "break": this is not a failure.
	if errors.Is(scanErr, ErrInterruptIterator) {
		return nil
	}
	return scanErr
}
// IterateCoveredByTombstones iterates over all objects in DB which are covered
// by a tombstone whose string address is a key of tss. Locked objects are not
// included (do not confuse with objects of type LOCK).
//
// If h returns ErrInterruptIterator, the iteration stops and nil is returned.
// Other errors of h are returned directly.
//
// ErrDegradedMode is returned when the current mode has no metabase.
// The call duration and its outcome are reported to the DB metrics.
//
// Does not modify tss.
func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid.Address, h func(oid.Address) error) error {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("IterateCoveredByTombstones", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateCoveredByTombstones")
	defer span.End()

	// The read lock is held for the whole iteration.
	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return ErrDegradedMode
	}

	// NOTE(review): unlike IterateExpired, the result is not passed through
	// metaerr.Wrap; kept as is so callers observe the same error values.
	err := db.boltDB.View(func(tx *bbolt.Tx) error {
		return db.iterateCoveredByTombstones(tx, tss, h)
	})
	// Record the real outcome; previously the metric always reported failure
	// because success was never updated.
	success = err == nil
	return err
}
// iterateCoveredByTombstones is the transaction-level body of
// IterateCoveredByTombstones.
//
// It walks the graveyard bucket, where each key decodes to the address of a
// buried object and each value decodes to the address of its tombstone.
// For every record whose tombstone address (in string form) is a key of tss,
// h is called with the buried object's address unless that object is locked.
//
// A missing graveyard bucket is treated as an empty one. If the walk ends
// with ErrInterruptIterator, nil is returned; other errors are returned as is.
func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Address, h func(oid.Address) error) error {
	bktGraveyard := tx.Bucket(graveyardBucketName)
	if bktGraveyard == nil {
		// Nothing is buried; calling ForEach on a nil bucket would panic.
		return nil
	}

	err := bktGraveyard.ForEach(func(k, v []byte) error {
		var tombAddr oid.Address
		if err := decodeAddressFromKey(&tombAddr, v); err != nil {
			return err
		}

		if _, ok := tss[tombAddr.EncodeToString()]; !ok {
			// Covered by a tombstone the caller is not interested in.
			return nil
		}

		var objAddr oid.Address
		if err := decodeAddressFromKey(&objAddr, k); err != nil {
			return fmt.Errorf("could not parse address of the object under tombstone: %w", err)
		}

		// Locked objects are never reported.
		if objectLocked(tx, objAddr.Container(), objAddr.Object()) {
			return nil
		}

		return h(objAddr)
	})

	// The handler asked for a "break": this is not a failure.
	if errors.Is(err, ErrInterruptIterator) {
		err = nil
	}

	return err
}
// iteratePhyObjects calls f for every object stored in the buckets of tx
// whose name is recognised by parseContainerIDWithPrefix and whose prefix is
// one of primaryPrefix, lockersPrefix or tombstonePrefix.
//
// Records whose key does not decode as an object ID, or whose value does not
// unmarshal as an object, are skipped silently. A non-nil error from f stops
// the iteration and is returned.
//
// The same *objectSDK.Object instance is unmarshalled into and passed to f on
// every call, so f presumably must not retain it between calls (verify
// against callers).
func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
	var (
		cnrID cid.ID
		objID oid.ID
	)
	obj := objectSDK.New()

	return tx.ForEach(func(bktName []byte, bkt *bbolt.Bucket) error {
		parsedCID, postfix := parseContainerIDWithPrefix(&cnrID, bktName)
		if len(parsedCID) == 0 {
			return nil
		}

		// Only these bucket kinds are visited.
		if postfix != primaryPrefix && postfix != lockersPrefix && postfix != tombstonePrefix {
			return nil
		}

		return bkt.ForEach(func(k, v []byte) error {
			if objID.Decode(k) != nil || obj.Unmarshal(v) != nil {
				// Not a decodable object record; skip it.
				return nil
			}
			return f(cnrID, objID, obj)
		})
	})
}