[#1445] metabase: Add IterateLocked method
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 2m52s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m3s
Tests and linters / gopls check (pull_request) Successful in 4m3s
Tests and linters / Run gofumpt (pull_request) Successful in 5m25s
Tests and linters / Tests (pull_request) Successful in 5m36s
DCO action / DCO (pull_request) Successful in 5m48s
Build / Build Components (pull_request) Successful in 6m39s
Tests and linters / Staticcheck (pull_request) Successful in 7m8s
Tests and linters / Tests with -race (pull_request) Successful in 7m14s
Tests and linters / Lint (pull_request) Successful in 7m38s

Added a new method for iterating locked objects. It's indended
to be used in the garbage collector for handling expired locks.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
This commit is contained in:
Aleksey Savchuk 2024-12-17 01:16:03 +03:00
parent a0acef639e
commit 163c3dba71
Signed by: a-savchuk
GPG key ID: 70C0A7FF6F9C4639
2 changed files with 209 additions and 0 deletions

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"errors" "errors"
"fmt"
"strconv" "strconv"
"time" "time"
@ -16,6 +17,7 @@ import (
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
) )
// ExpiredObject is a descriptor of expired object from DB. // ExpiredObject is a descriptor of expired object from DB.
@ -138,3 +140,125 @@ func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) e
}) })
}) })
} }
type LockedObject struct {
cntr cid.ID
obj oid.ID
locks []Lock
}
func (lo *LockedObject) Address() oid.Address {
var addr oid.Address
addr.SetContainer(lo.cntr)
addr.SetObject(lo.obj)
return addr
}
func (lo *LockedObject) Locks() []Lock {
return lo.locks
}
type LockedObjectHandler func(*LockedObject) error
func (db *DB) IterateLocked(ctx context.Context, offset *oid.Address, h LockedObjectHandler) error {
return db.boltDB.View(func(tx *bbolt.Tx) error {
err := db.iterateLocked(ctx, tx, h, offset)
if errors.Is(err, ErrInterruptIterator) {
return nil
}
return err
})
}
func (db *DB) iterateLocked(ctx context.Context, tx *bbolt.Tx, h LockedObjectHandler, offset *oid.Address) error {
bucketLocked := tx.Bucket(bucketNameLocked)
if bucketLocked == nil {
return nil
}
var k, v []byte
c := bucketLocked.Cursor()
if offset != nil {
cntrOffset := make([]byte, cidSize)
offset.Container().Encode(cntrOffset)
k, v = c.Seek(cntrOffset)
} else {
k, v = c.First()
}
var (
res LockedObject
err error
seenFirstBucket bool
)
for ; k != nil; k, v = c.Next() {
if v != nil {
continue
}
if err = res.cntr.Decode(k); err != nil {
return fmt.Errorf("decode CID: %w", err)
}
bucket := bucketLocked.Bucket(k)
if bucket == nil {
db.log.Warn(ctx, "locked bucket expected", zap.Stringer("container", res.cntr))
continue
}
if offset == nil || seenFirstBucket {
err = db.iterateLockedContainer(bucket, h, &res, nil)
} else {
objOffset := offset.Object()
seenFirstBucket = true
err = db.iterateLockedContainer(bucket, h, &res, &objOffset)
}
if err != nil {
return err
}
}
return nil
}
func (db *DB) iterateLockedContainer(b *bbolt.Bucket, h LockedObjectHandler, res *LockedObject, offset *oid.ID) error {
var k, v []byte
c := b.Cursor()
if offset != nil {
objOffset := make([]byte, objectKeySize)
offset.Encode(objOffset)
k, v = c.Seek(objOffset)
if bytes.Equal(k, objOffset) {
k, v = c.Next()
}
} else {
k, v = c.First()
}
for ; k != nil; k, v = c.Next() {
if err := res.obj.Decode(k); err != nil {
return fmt.Errorf("decode OID: %w", err)
}
locks, err := decodeLockWithExpEpochList(v)
if err != nil {
return fmt.Errorf("decode lock list: %w", err)
}
res.locks = make([]Lock, len(locks))
for i, lock := range locks {
if err := lock.decode(&res.locks[i].ID, &res.locks[i].ExpEpoch); err != nil {
return fmt.Errorf("decode lock: %w", err)
}
}
if err := h(res); err != nil {
return err
}
}
return nil
}

View file

@ -2,6 +2,7 @@ package meta_test
import ( import (
"context" "context"
"math/rand"
"strconv" "strconv"
"testing" "testing"
@ -9,10 +10,12 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
) )
func TestDB_IterateExpired(t *testing.T) { func TestDB_IterateExpired(t *testing.T) {
@ -67,3 +70,85 @@ func putWithExpiration(t *testing.T, db *meta.DB, typ objectSDK.Type, expiresAt
return object2.AddressOf(obj) return object2.AddressOf(obj)
} }
func TestIterateLocked(t *testing.T) {
t.Parallel()
const (
numObjects = 10
numLocksPerObject = 10
)
db := newDB(t)
defer func() { require.NoError(t, db.Close(context.Background())) }()
m := make(map[oid.Address][]meta.Lock)
// every two objects have same container
for range numObjects / 2 {
container := cidtest.ID()
object := testutil.GenerateObjectWithCID(container)
require.NoError(t, metaPut(db, object, nil))
addr := object2.AddressOf(object)
m[addr] = []meta.Lock{}
object = testutil.GenerateObjectWithCID(container)
require.NoError(t, metaPut(db, object, nil))
addr = object2.AddressOf(object)
m[addr] = []meta.Lock{}
}
for addr := range m {
// every object has locks of both old and new format
for range numLocksPerObject / 2 {
m[addr] = append(m[addr],
meta.Lock{
ID: oidtest.ID(),
ExpEpoch: rand.Uint64(),
}, meta.Lock{
ID: oidtest.ID(),
ExpEpoch: meta.NoExpirationEpoch,
},
)
}
for _, lock := range m[addr] {
require.NoError(t, db.Lock(
context.Background(),
addr.Container(),
lock.ID,
[]oid.ID{addr.Object()},
lock.ExpEpoch,
))
}
}
t.Run("iterate locked", func(t *testing.T) {
err := db.IterateLocked(context.Background(), nil, func(lo *meta.LockedObject) error {
locks, ok := m[lo.Address()]
require.True(t, ok)
require.ElementsMatch(t, locks, lo.Locks())
return nil
})
require.NoError(t, err)
})
t.Run("iterate with offset", func(t *testing.T) {
var foundObjects []oid.Address
var offset *oid.Address
for didStep := true; didStep; {
didStep = false
err := db.IterateLocked(context.Background(), offset, func(lo *meta.LockedObject) error {
addr := lo.Address()
foundObjects = append(foundObjects, addr)
offset = &addr
didStep = true
return meta.ErrInterruptIterator
})
require.NoError(t, err)
}
require.ElementsMatch(t, foundObjects, maps.Keys(m))
})
}