Aleksey Savchuk
8ad8e2783a
Added a new method for iterating locked objects. It's intended to be used in the garbage collector for handling expired locks. Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
283 lines
6.5 KiB
Go
283 lines
6.5 KiB
Go
package meta
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.etcd.io/bbolt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// ExpiredObject is a descriptor of expired object from DB.
|
|
type ExpiredObject struct {
|
|
typ objectSDK.Type
|
|
|
|
addr oid.Address
|
|
}
|
|
|
|
// Type returns type of the expired object.
|
|
func (e *ExpiredObject) Type() objectSDK.Type {
|
|
return e.typ
|
|
}
|
|
|
|
// Address returns address of the expired object.
|
|
func (e *ExpiredObject) Address() oid.Address {
|
|
return e.addr
|
|
}
|
|
|
|
// ExpiredObjectHandler is an ExpiredObject handling function.
|
|
type ExpiredObjectHandler func(*ExpiredObject) error
|
|
|
|
// ErrInterruptIterator is returned by iteration handlers
|
|
// as a "break" keyword.
|
|
var ErrInterruptIterator = logicerr.New("iterator is interrupted")
|
|
|
|
// IterateExpired iterates over all objects in DB which are out of date
|
|
// relative to epoch. Locked objects are not included (do not confuse
|
|
// with objects of type LOCK).
|
|
//
|
|
// If h returns ErrInterruptIterator, nil returns immediately.
|
|
// Returns other errors of h directly.
|
|
func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectHandler) error {
|
|
var (
|
|
startedAt = time.Now()
|
|
success = false
|
|
)
|
|
defer func() {
|
|
db.metrics.AddMethodDuration("IterateExpired", time.Since(startedAt), success)
|
|
}()
|
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateExpired",
|
|
trace.WithAttributes(
|
|
attribute.String("epoch", strconv.FormatUint(epoch, 10)),
|
|
))
|
|
defer span.End()
|
|
|
|
db.modeMtx.RLock()
|
|
defer db.modeMtx.RUnlock()
|
|
|
|
if db.mode.NoMetabase() {
|
|
return ErrDegradedMode
|
|
}
|
|
|
|
err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
|
return db.iterateExpired(tx, epoch, h)
|
|
}))
|
|
success = err == nil
|
|
return err
|
|
}
|
|
|
|
func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
|
|
b := tx.Bucket(expEpochToObjectBucketName)
|
|
c := b.Cursor()
|
|
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
|
expiresAfter, cnr, obj, err := parseExpirationEpochKey(k)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// bucket keys ordered by epoch, no need to continue lookup
|
|
if expiresAfter >= epoch {
|
|
return nil
|
|
}
|
|
if objectLocked(tx, cnr, obj) {
|
|
continue
|
|
}
|
|
var addr oid.Address
|
|
addr.SetContainer(cnr)
|
|
addr.SetObject(obj)
|
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
|
err = h(&ExpiredObject{
|
|
typ: firstIrregularObjectType(tx, cnr, objKey),
|
|
addr: addr,
|
|
})
|
|
if err == nil {
|
|
continue
|
|
}
|
|
if errors.Is(err, ErrInterruptIterator) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
|
|
var cid cid.ID
|
|
var oid oid.ID
|
|
obj := objectSDK.New()
|
|
|
|
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
|
b58CID, postfix := parseContainerIDWithPrefix(&cid, name)
|
|
if len(b58CID) == 0 {
|
|
return nil
|
|
}
|
|
|
|
switch postfix {
|
|
case primaryPrefix,
|
|
lockersPrefix,
|
|
tombstonePrefix:
|
|
default:
|
|
return nil
|
|
}
|
|
|
|
return b.ForEach(func(k, v []byte) error {
|
|
if oid.Decode(k) == nil && obj.Unmarshal(bytes.Clone(v)) == nil {
|
|
return f(cid, oid, obj)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
})
|
|
}
|
|
|
|
type LockedObject struct {
|
|
cntr cid.ID
|
|
obj oid.ID
|
|
locks []Lock
|
|
}
|
|
|
|
func (lo *LockedObject) Address() oid.Address {
|
|
var addr oid.Address
|
|
addr.SetContainer(lo.cntr)
|
|
addr.SetObject(lo.obj)
|
|
return addr
|
|
}
|
|
|
|
func (lo *LockedObject) Locks() []Lock {
|
|
return lo.locks
|
|
}
|
|
|
|
// LockedObjectHandler is a callback that processes a single LockedObject.
// A non-nil result stops the iteration that invoked the handler.
type LockedObjectHandler func(*LockedObject) error
|
|
|
|
func (db *DB) IterateLocked(ctx context.Context, offset *oid.Address, h LockedObjectHandler) error {
|
|
var (
|
|
startedAt = time.Now()
|
|
success = false
|
|
)
|
|
defer func() {
|
|
db.metrics.AddMethodDuration("IterateLocked", time.Since(startedAt), success)
|
|
}()
|
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateLocked")
|
|
defer span.End()
|
|
|
|
db.modeMtx.RLock()
|
|
defer db.modeMtx.RUnlock()
|
|
|
|
if db.mode.NoMetabase() {
|
|
return ErrDegradedMode
|
|
}
|
|
|
|
err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
|
err := db.iterateLocked(ctx, tx, h, offset)
|
|
if errors.Is(err, ErrInterruptIterator) {
|
|
return nil
|
|
}
|
|
return err
|
|
}))
|
|
success = err == nil
|
|
return err
|
|
}
|
|
|
|
func (db *DB) iterateLocked(ctx context.Context, tx *bbolt.Tx, h LockedObjectHandler, offset *oid.Address) error {
|
|
bucketLocked := tx.Bucket(bucketNameLocked)
|
|
if bucketLocked == nil {
|
|
return nil
|
|
}
|
|
|
|
var k, v []byte
|
|
|
|
c := bucketLocked.Cursor()
|
|
if offset != nil {
|
|
cntrOffset := make([]byte, cidSize)
|
|
offset.Container().Encode(cntrOffset)
|
|
|
|
k, v = c.Seek(cntrOffset)
|
|
} else {
|
|
k, v = c.First()
|
|
}
|
|
|
|
var (
|
|
res LockedObject
|
|
err error
|
|
seenFirstBucket bool
|
|
)
|
|
|
|
for ; k != nil; k, v = c.Next() {
|
|
if v != nil {
|
|
continue
|
|
}
|
|
|
|
if err = res.cntr.Decode(k); err != nil {
|
|
return fmt.Errorf("decode CID: %w", err)
|
|
}
|
|
|
|
bucket := bucketLocked.Bucket(k)
|
|
if bucket == nil {
|
|
db.log.Warn(ctx, "locked bucket expected", zap.Stringer("container", res.cntr))
|
|
continue
|
|
}
|
|
|
|
if offset == nil || seenFirstBucket {
|
|
err = db.iterateLockedContainer(bucket, h, &res, nil)
|
|
} else {
|
|
objOffset := offset.Object()
|
|
seenFirstBucket = true
|
|
err = db.iterateLockedContainer(bucket, h, &res, &objOffset)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) iterateLockedContainer(b *bbolt.Bucket, h LockedObjectHandler, res *LockedObject, offset *oid.ID) error {
|
|
var k, v []byte
|
|
|
|
c := b.Cursor()
|
|
if offset != nil {
|
|
objOffset := make([]byte, objectKeySize)
|
|
offset.Encode(objOffset)
|
|
|
|
k, v = c.Seek(objOffset)
|
|
if bytes.Equal(k, objOffset) {
|
|
k, v = c.Next()
|
|
}
|
|
} else {
|
|
k, v = c.First()
|
|
}
|
|
|
|
for ; k != nil; k, v = c.Next() {
|
|
if err := res.obj.Decode(k); err != nil {
|
|
return fmt.Errorf("decode OID: %w", err)
|
|
}
|
|
|
|
locks, err := decodeLockWithExpEpochList(v)
|
|
if err != nil {
|
|
return fmt.Errorf("decode lock list: %w", err)
|
|
}
|
|
|
|
res.locks = make([]Lock, len(locks))
|
|
for i, lock := range locks {
|
|
if err := lock.decode(&res.locks[i].ID, &res.locks[i].ExpEpoch); err != nil {
|
|
return fmt.Errorf("decode lock: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := h(res); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|