[#1445] metabase: Add IterateLocked
method
Added a new method for iterating locked objects. It's indended to be used in the garbage collector for handling expired locks. Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
parent
a0acef639e
commit
8ad8e2783a
2 changed files with 228 additions and 0 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
|
@ -16,6 +17,7 @@ import (
|
|||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ExpiredObject is a descriptor of expired object from DB.
|
||||
|
@ -138,3 +140,144 @@ func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) e
|
|||
})
|
||||
})
|
||||
}
|
||||
|
||||
type LockedObject struct {
|
||||
cntr cid.ID
|
||||
obj oid.ID
|
||||
locks []Lock
|
||||
}
|
||||
|
||||
func (lo *LockedObject) Address() oid.Address {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(lo.cntr)
|
||||
addr.SetObject(lo.obj)
|
||||
return addr
|
||||
}
|
||||
|
||||
func (lo *LockedObject) Locks() []Lock {
|
||||
return lo.locks
|
||||
}
|
||||
|
||||
type LockedObjectHandler func(*LockedObject) error
|
||||
|
||||
func (db *DB) IterateLocked(ctx context.Context, offset *oid.Address, h LockedObjectHandler) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("IterateLocked", time.Since(startedAt), success)
|
||||
}()
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateLocked")
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
err := db.iterateLocked(ctx, tx, h, offset)
|
||||
if errors.Is(err, ErrInterruptIterator) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}))
|
||||
success = err == nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *DB) iterateLocked(ctx context.Context, tx *bbolt.Tx, h LockedObjectHandler, offset *oid.Address) error {
|
||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
||||
if bucketLocked == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var k, v []byte
|
||||
|
||||
c := bucketLocked.Cursor()
|
||||
if offset != nil {
|
||||
cntrOffset := make([]byte, cidSize)
|
||||
offset.Container().Encode(cntrOffset)
|
||||
|
||||
k, v = c.Seek(cntrOffset)
|
||||
} else {
|
||||
k, v = c.First()
|
||||
}
|
||||
|
||||
var (
|
||||
res LockedObject
|
||||
err error
|
||||
seenFirstBucket bool
|
||||
)
|
||||
|
||||
for ; k != nil; k, v = c.Next() {
|
||||
if v != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if err = res.cntr.Decode(k); err != nil {
|
||||
return fmt.Errorf("decode CID: %w", err)
|
||||
}
|
||||
|
||||
bucket := bucketLocked.Bucket(k)
|
||||
if bucket == nil {
|
||||
db.log.Warn(ctx, "locked bucket expected", zap.Stringer("container", res.cntr))
|
||||
continue
|
||||
}
|
||||
|
||||
if offset == nil || seenFirstBucket {
|
||||
err = db.iterateLockedContainer(bucket, h, &res, nil)
|
||||
} else {
|
||||
objOffset := offset.Object()
|
||||
seenFirstBucket = true
|
||||
err = db.iterateLockedContainer(bucket, h, &res, &objOffset)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) iterateLockedContainer(b *bbolt.Bucket, h LockedObjectHandler, res *LockedObject, offset *oid.ID) error {
|
||||
var k, v []byte
|
||||
|
||||
c := b.Cursor()
|
||||
if offset != nil {
|
||||
objOffset := make([]byte, objectKeySize)
|
||||
offset.Encode(objOffset)
|
||||
|
||||
k, v = c.Seek(objOffset)
|
||||
if bytes.Equal(k, objOffset) {
|
||||
k, v = c.Next()
|
||||
}
|
||||
} else {
|
||||
k, v = c.First()
|
||||
}
|
||||
|
||||
for ; k != nil; k, v = c.Next() {
|
||||
if err := res.obj.Decode(k); err != nil {
|
||||
return fmt.Errorf("decode OID: %w", err)
|
||||
}
|
||||
|
||||
locks, err := decodeLockWithExpEpochList(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode lock list: %w", err)
|
||||
}
|
||||
|
||||
res.locks = make([]Lock, len(locks))
|
||||
for i, lock := range locks {
|
||||
if err := lock.decode(&res.locks[i].ID, &res.locks[i].ExpEpoch); err != nil {
|
||||
return fmt.Errorf("decode lock: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := h(res); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package meta_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
|
@ -9,10 +10,12 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
func TestDB_IterateExpired(t *testing.T) {
|
||||
|
@ -67,3 +70,85 @@ func putWithExpiration(t *testing.T, db *meta.DB, typ objectSDK.Type, expiresAt
|
|||
|
||||
return object2.AddressOf(obj)
|
||||
}
|
||||
|
||||
func TestIterateLocked(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const (
|
||||
numObjects = 10
|
||||
numLocksPerObject = 10
|
||||
)
|
||||
|
||||
db := newDB(t)
|
||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||
|
||||
m := make(map[oid.Address][]meta.Lock)
|
||||
|
||||
// every two objects have same container
|
||||
for range numObjects / 2 {
|
||||
container := cidtest.ID()
|
||||
|
||||
object := testutil.GenerateObjectWithCID(container)
|
||||
require.NoError(t, metaPut(db, object, nil))
|
||||
addr := object2.AddressOf(object)
|
||||
m[addr] = []meta.Lock{}
|
||||
|
||||
object = testutil.GenerateObjectWithCID(container)
|
||||
require.NoError(t, metaPut(db, object, nil))
|
||||
addr = object2.AddressOf(object)
|
||||
m[addr] = []meta.Lock{}
|
||||
}
|
||||
|
||||
for addr := range m {
|
||||
// every object has locks of both old and new formats
|
||||
for range numLocksPerObject / 2 {
|
||||
m[addr] = append(m[addr],
|
||||
meta.Lock{
|
||||
ID: oidtest.ID(),
|
||||
ExpEpoch: rand.Uint64(),
|
||||
}, meta.Lock{
|
||||
ID: oidtest.ID(),
|
||||
ExpEpoch: meta.NoExpirationEpoch,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
for _, lock := range m[addr] {
|
||||
require.NoError(t, db.Lock(
|
||||
context.Background(),
|
||||
addr.Container(),
|
||||
lock.ID,
|
||||
[]oid.ID{addr.Object()},
|
||||
lock.ExpEpoch,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("just iterate", func(t *testing.T) {
|
||||
err := db.IterateLocked(context.Background(), nil, func(lo *meta.LockedObject) error {
|
||||
locks, ok := m[lo.Address()]
|
||||
require.True(t, ok)
|
||||
require.ElementsMatch(t, locks, lo.Locks())
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("iterate with offset", func(t *testing.T) {
|
||||
var foundObjects []oid.Address
|
||||
var offset *oid.Address
|
||||
|
||||
for didStep := true; didStep; {
|
||||
didStep = false
|
||||
err := db.IterateLocked(context.Background(), offset, func(lo *meta.LockedObject) error {
|
||||
addr := lo.Address()
|
||||
foundObjects = append(foundObjects, addr)
|
||||
offset = &addr
|
||||
didStep = true
|
||||
return meta.ErrInterruptIterator
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.ElementsMatch(t, foundObjects, maps.Keys(m))
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue