WIP: Change metabase engine to pebble #1221
4 changed files with 316 additions and 653 deletions
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -67,7 +68,7 @@ func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return result, it.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseContainerIDWithIgnore(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {
|
func parseContainerIDWithIgnore(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {
|
||||||
|
@ -185,23 +186,23 @@ func (db *DB) containerSizesInternal(ctx context.Context, id *cid.ID) (map[cid.I
|
||||||
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return errors.Join(ctx.Err(), it.Close())
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
key := it.Key()
|
key := it.Key()
|
||||||
var cnr cid.ID
|
var cnr cid.ID
|
||||||
if err := cnr.Decode(key[1:containerSizePrefixSize]); err != nil {
|
if err := cnr.Decode(key[1:containerSizePrefixSize]); err != nil {
|
||||||
return fmt.Errorf("invalid container size key: %w", err)
|
return errors.Join(fmt.Errorf("invalid container size key: %w", err), it.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
value, ok := parseSize(it.Value())
|
value, ok := parseSize(it.Value())
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("invalid container size value for container %s", cnr)
|
return errors.Join(fmt.Errorf("invalid container size value for container %s", cnr), it.Close())
|
||||||
}
|
}
|
||||||
result[cnr] += value
|
result[cnr] += value
|
||||||
}
|
}
|
||||||
return nil
|
return it.Close()
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, metaerr.Wrap(err)
|
return nil, metaerr.Wrap(err)
|
||||||
|
|
|
@ -13,32 +13,21 @@ import (
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"github.com/cockroachdb/pebble"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
objectPhyCounterKey = []byte("phy_counter")
|
|
||||||
objectLogicCounterKey = []byte("logic_counter")
|
|
||||||
objectUserCounterKey = []byte("user_counter")
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errInvalidKeyLenght = errors.New("invalid key length")
|
errInvalidKeyLenght = errors.New("invalid key length")
|
||||||
errInvalidKeyPrefix = errors.New("invalid key prefix")
|
errInvalidKeyPrefix = errors.New("invalid key prefix")
|
||||||
errInvalidValueLenght = errors.New("invalid value length")
|
errInvalidValueLenght = errors.New("invalid value length")
|
||||||
errInvalidContainerIDValue = errors.New("invalid container ID value")
|
errInvalidContainerIDValue = errors.New("invalid container ID value")
|
||||||
errInvalidAttributeKey = errors.New("invalid userr attribute key")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type objectType uint8
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
_ objectType = iota
|
containerObjectCountKeySize = 1 + cidSize + 2
|
||||||
phy
|
containerObjectCountPrefixSize = 1 + cidSize
|
||||||
logical
|
|
||||||
user
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ObjectCounters groups object counter
|
// ObjectCounters groups object counter
|
||||||
|
@ -53,12 +42,18 @@ func (o ObjectCounters) IsZero() bool {
|
||||||
return o.Phy == 0 && o.Logic == 0 && o.User == 0
|
return o.Phy == 0 && o.Logic == 0 && o.User == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type objectCounterValue struct {
|
||||||
|
Logic int64
|
||||||
|
Phy int64
|
||||||
|
User int64
|
||||||
|
}
|
||||||
|
|
||||||
// ObjectCounters returns object counters that metabase has
|
// ObjectCounters returns object counters that metabase has
|
||||||
// tracked since it was opened and initialized.
|
// tracked since it was opened and initialized.
|
||||||
//
|
//
|
||||||
// Returns only the errors that do not allow reading counter
|
// Returns only the errors that do not allow reading counter
|
||||||
// in Bolt database.
|
// in badger database.
|
||||||
func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
|
func (db *DB) ObjectCounters(ctx context.Context) (ObjectCounters, error) {
|
||||||
db.modeMtx.RLock()
|
db.modeMtx.RLock()
|
||||||
defer db.modeMtx.RUnlock()
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
@ -66,29 +61,22 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
|
||||||
return ObjectCounters{}, ErrDegradedMode
|
return ObjectCounters{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
var cc map[cid.ID]ObjectCounters
|
||||||
b := tx.Bucket(shardInfoBucket)
|
err := db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
if b != nil {
|
var err error
|
||||||
data := b.Get(objectPhyCounterKey)
|
cc, err = containerObjectCounters(ctx, s, nil)
|
||||||
if len(data) == 8 {
|
return err
|
||||||
cc.Phy = binary.LittleEndian.Uint64(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
data = b.Get(objectLogicCounterKey)
|
|
||||||
if len(data) == 8 {
|
|
||||||
cc.Logic = binary.LittleEndian.Uint64(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
data = b.Get(objectUserCounterKey)
|
|
||||||
if len(data) == 8 {
|
|
||||||
cc.User = binary.LittleEndian.Uint64(data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
return cc, metaerr.Wrap(err)
|
return ObjectCounters{}, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
var result ObjectCounters
|
||||||
|
for _, v := range cc {
|
||||||
|
result.Logic += v.Logic
|
||||||
|
result.Phy += v.Phy
|
||||||
|
result.User += v.User
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type ContainerCounters struct {
|
type ContainerCounters struct {
|
||||||
|
@ -99,7 +87,7 @@ type ContainerCounters struct {
|
||||||
// that metabase has tracked since it was opened and initialized.
|
// that metabase has tracked since it was opened and initialized.
|
||||||
//
|
//
|
||||||
// Returns only the errors that do not allow reading counter
|
// Returns only the errors that do not allow reading counter
|
||||||
// in Bolt database.
|
// in badger database.
|
||||||
//
|
//
|
||||||
// It is guaranteed that the ContainerCounters fields are not nil.
|
// It is guaranteed that the ContainerCounters fields are not nil.
|
||||||
func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) {
|
func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) {
|
||||||
|
@ -117,84 +105,16 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
|
||||||
cc := ContainerCounters{
|
cc := ContainerCounters{
|
||||||
Counts: make(map[cid.ID]ObjectCounters),
|
Counts: make(map[cid.ID]ObjectCounters),
|
||||||
}
|
}
|
||||||
|
err := db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
lastKey := make([]byte, cidSize)
|
var err error
|
||||||
|
cc.Counts, err = containerObjectCounters(ctx, s, nil)
|
||||||
// there is no limit for containers count, so use batching with cancellation
|
return err
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return cc, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) {
|
|
||||||
cc.Counts[id] = entity
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return cc, err
|
|
||||||
}
|
|
||||||
if completed {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
success = true
|
|
||||||
return cc, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) containerCountersNextBatch(lastKey []byte, f func(id cid.ID, entity ObjectCounters)) (bool, error) {
|
|
||||||
db.modeMtx.RLock()
|
|
||||||
defer db.modeMtx.RUnlock()
|
|
||||||
|
|
||||||
if db.mode.NoMetabase() {
|
|
||||||
return false, ErrDegradedMode
|
|
||||||
}
|
|
||||||
|
|
||||||
counter := 0
|
|
||||||
const batchSize = 1000
|
|
||||||
|
|
||||||
err := db.database.View(func(tx *bbolt.Tx) error {
|
|
||||||
b := tx.Bucket(containerCounterBucketName)
|
|
||||||
if b == nil {
|
|
||||||
return ErrInterruptIterator
|
|
||||||
}
|
|
||||||
c := b.Cursor()
|
|
||||||
var key, value []byte
|
|
||||||
for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() {
|
|
||||||
if bytes.Equal(lastKey, key) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
copy(lastKey, key)
|
|
||||||
|
|
||||||
cnrID, err := parseContainerCounterKey(key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
ent, err := parseContainerCounterValue(value)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
f(cnrID, ent)
|
|
||||||
|
|
||||||
counter++
|
|
||||||
if counter == batchSize {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if counter < batchSize { // last batch
|
|
||||||
return ErrInterruptIterator
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, ErrInterruptIterator) {
|
return ContainerCounters{}, metaerr.Wrap(err)
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
return false, metaerr.Wrap(err)
|
|
||||||
}
|
}
|
||||||
return false, nil
|
success = true
|
||||||
|
return cc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, error) {
|
func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, error) {
|
||||||
|
@ -216,144 +136,65 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er
|
||||||
return ObjectCounters{}, ErrDegradedMode
|
return ObjectCounters{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
var result ObjectCounters
|
var cc map[cid.ID]ObjectCounters
|
||||||
|
err := db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
err := db.database.View(func(tx *bbolt.Tx) error {
|
|
||||||
b := tx.Bucket(containerCounterBucketName)
|
|
||||||
key := make([]byte, cidSize)
|
|
||||||
id.Encode(key)
|
|
||||||
v := b.Get(key)
|
|
||||||
if v == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
var err error
|
var err error
|
||||||
result, err = parseContainerCounterValue(v)
|
cc, err = containerObjectCounters(ctx, s, &id)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
return result, metaerr.Wrap(err)
|
return ObjectCounters{}, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
return cc[id], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
func containerCounterKey(cnrID cid.ID, bucketID uint16) []byte {
|
||||||
b := tx.Bucket(shardInfoBucket)
|
result := make([]byte, containerObjectCountKeySize)
|
||||||
if b == nil {
|
result[0] = containerCountersPrefix
|
||||||
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
cnrID.Encode(result[1:])
|
||||||
}
|
binary.LittleEndian.PutUint16(result[containerObjectCountPrefixSize:], bucketID)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
|
func incCounters(b *pebble.Batch, cnrID cid.ID, isUserObject bool, bucketID uint16) error {
|
||||||
return fmt.Errorf("could not increase phy object counter: %w", err)
|
delta := objectCounterValue{
|
||||||
}
|
Logic: 1,
|
||||||
if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil {
|
Phy: 1,
|
||||||
return fmt.Errorf("could not increase logical object counter: %w", err)
|
|
||||||
}
|
}
|
||||||
if isUserObject {
|
if isUserObject {
|
||||||
if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil {
|
delta.User = 1
|
||||||
return fmt.Errorf("could not increase user object counter: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
return editContainerCounterValue(b, cnrID, delta, bucketID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
|
func updateContainerCounter(b *pebble.Batch, delta map[cid.ID]objectCounterValue, bucketIDs map[cid.ID]uint16) error {
|
||||||
b := tx.Bucket(shardInfoBucket)
|
|
||||||
if b == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return db.updateShardObjectCounterBucket(b, typ, delta, inc)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error {
|
|
||||||
var counter uint64
|
|
||||||
var counterKey []byte
|
|
||||||
|
|
||||||
switch typ {
|
|
||||||
case phy:
|
|
||||||
counterKey = objectPhyCounterKey
|
|
||||||
case logical:
|
|
||||||
counterKey = objectLogicCounterKey
|
|
||||||
case user:
|
|
||||||
counterKey = objectUserCounterKey
|
|
||||||
default:
|
|
||||||
panic("unknown object type counter")
|
|
||||||
}
|
|
||||||
|
|
||||||
data := b.Get(counterKey)
|
|
||||||
if len(data) == 8 {
|
|
||||||
counter = binary.LittleEndian.Uint64(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
if inc {
|
|
||||||
counter += delta
|
|
||||||
} else if counter <= delta {
|
|
||||||
counter = 0
|
|
||||||
} else {
|
|
||||||
counter -= delta
|
|
||||||
}
|
|
||||||
|
|
||||||
newCounter := make([]byte, 8)
|
|
||||||
binary.LittleEndian.PutUint64(newCounter, counter)
|
|
||||||
|
|
||||||
return b.Put(counterKey, newCounter)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
|
|
||||||
b := tx.Bucket(containerCounterBucketName)
|
|
||||||
if b == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
key := make([]byte, cidSize)
|
|
||||||
for cnrID, cnrDelta := range delta {
|
for cnrID, cnrDelta := range delta {
|
||||||
cnrID.Encode(key)
|
bucketID, found := bucketIDs[cnrID]
|
||||||
if err := db.editContainerCounterValue(b, key, cnrDelta, inc); err != nil {
|
if !found {
|
||||||
|
return fmt.Errorf("bucket ID not found for container %s", cnrID)
|
||||||
|
}
|
||||||
|
if err := editContainerCounterValue(b, cnrID, cnrDelta, bucketID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error {
|
func editContainerCounterValue(b *pebble.Batch, cnrID cid.ID, delta objectCounterValue, bucketID uint16) error {
|
||||||
var entity ObjectCounters
|
key := containerCounterKey(cnrID, bucketID)
|
||||||
var err error
|
val, err := valueSafe(b, key)
|
||||||
data := b.Get(key)
|
if err != nil {
|
||||||
if len(data) > 0 {
|
return err
|
||||||
entity, err = parseContainerCounterValue(data)
|
}
|
||||||
|
setValue := delta
|
||||||
|
if val != nil {
|
||||||
|
exited, err := parseContainerCounterValue(val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
setValue = mergeObjectCounterValues(setValue, exited)
|
||||||
}
|
}
|
||||||
entity.Phy = nextValue(entity.Phy, delta.Phy, inc)
|
return b.Set(key, marshalContainerCounterValue(setValue), pebble.Sync)
|
||||||
entity.Logic = nextValue(entity.Logic, delta.Logic, inc)
|
|
||||||
entity.User = nextValue(entity.User, delta.User, inc)
|
|
||||||
value := containerCounterValue(entity)
|
|
||||||
return b.Put(key, value)
|
|
||||||
}
|
|
||||||
|
|
||||||
func nextValue(existed, delta uint64, inc bool) uint64 {
|
|
||||||
if inc {
|
|
||||||
existed += delta
|
|
||||||
} else if existed <= delta {
|
|
||||||
existed = 0
|
|
||||||
} else {
|
|
||||||
existed -= delta
|
|
||||||
}
|
|
||||||
return existed
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
|
||||||
b := tx.Bucket(containerCounterBucketName)
|
|
||||||
if b == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
key := make([]byte, cidSize)
|
|
||||||
cnrID.Encode(key)
|
|
||||||
c := ObjectCounters{Logic: 1, Phy: 1}
|
|
||||||
if isUserObject {
|
|
||||||
c.User = 1
|
|
||||||
}
|
|
||||||
return db.editContainerCounterValue(b, key, c, true)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncCounter updates object counters according to metabase state:
|
// syncCounter updates object counters according to metabase state:
|
||||||
|
@ -362,34 +203,31 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject
|
||||||
//
|
//
|
||||||
// Does nothing if counters are not empty and force is false. If force is
|
// Does nothing if counters are not empty and force is false. If force is
|
||||||
// true, updates the counters anyway.
|
// true, updates the counters anyway.
|
||||||
func syncCounter(tx *bbolt.Tx, force bool) error {
|
func syncCounter(ctx context.Context, b *pebble.Batch, force bool) error {
|
||||||
shardInfoB, err := createBucketLikelyExists(tx, shardInfoBucket)
|
if !force && containerObjectCounterInitialized(ctx, b) {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not get shard info bucket: %w", err)
|
|
||||||
}
|
|
||||||
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 &&
|
|
||||||
len(shardInfoB.Get(objectLogicCounterKey)) == 8 &&
|
|
||||||
len(shardInfoB.Get(objectUserCounterKey)) == 8
|
|
||||||
containerObjectCounterInitialized := containerObjectCounterInitialized(tx)
|
|
||||||
if !force && shardObjectCounterInitialized && containerObjectCounterInitialized {
|
|
||||||
// the counters are already inited
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
containerCounterB, err := createBucketLikelyExists(tx, containerCounterBucketName)
|
// drop existed counters
|
||||||
|
err := deleteByPrefix(ctx, b, []byte{containerCountersPrefix})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get container counter bucket: %w", err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
counters, err := getActualObjectCounters(b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return setObjectCounters(b, counters)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getActualObjectCounters(r pebble.Reader) (map[cid.ID]ObjectCounters, error) {
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
|
var isAvailable bool
|
||||||
counters := make(map[cid.ID]ObjectCounters)
|
counters := make(map[cid.ID]ObjectCounters)
|
||||||
|
|
||||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
err := iteratePhyObjects(r, func(cnr cid.ID, objID oid.ID, obj *objectSDK.Object) error {
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
|
||||||
key := make([]byte, addressKeySize)
|
|
||||||
var isAvailable bool
|
|
||||||
|
|
||||||
err = iteratePhyObjects(tx, func(cnr cid.ID, objID oid.ID, obj *objectSDK.Object) error {
|
|
||||||
if v, ok := counters[cnr]; ok {
|
if v, ok := counters[cnr]; ok {
|
||||||
v.Phy++
|
v.Phy++
|
||||||
counters[cnr] = v
|
counters[cnr] = v
|
||||||
|
@ -403,9 +241,12 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
addr.SetObject(objID)
|
addr.SetObject(objID)
|
||||||
isAvailable = false
|
isAvailable = false
|
||||||
|
|
||||||
// check if an object is available: not with GCMark
|
st, err := inGraveyardWithKey(r, addr)
|
||||||
// and not covered with a tombstone
|
if err != nil {
|
||||||
if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if st == 0 {
|
||||||
if v, ok := counters[cnr]; ok {
|
if v, ok := counters[cnr]; ok {
|
||||||
v.Logic++
|
v.Logic++
|
||||||
counters[cnr] = v
|
counters[cnr] = v
|
||||||
|
@ -431,104 +272,27 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not iterate objects: %w", err)
|
return nil, fmt.Errorf("could not iterate objects: %w", err)
|
||||||
}
|
}
|
||||||
|
return counters, nil
|
||||||
return setObjectCounters(counters, shardInfoB, containerCounterB)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error {
|
func setObjectCounters(b *pebble.Batch, counters map[cid.ID]ObjectCounters) error {
|
||||||
var phyTotal uint64
|
|
||||||
var logicTotal uint64
|
|
||||||
var userTotal uint64
|
|
||||||
key := make([]byte, cidSize)
|
|
||||||
for cnrID, count := range counters {
|
for cnrID, count := range counters {
|
||||||
phyTotal += count.Phy
|
delta := objectCounterValue{
|
||||||
logicTotal += count.Logic
|
Logic: int64(count.Logic),
|
||||||
userTotal += count.User
|
Phy: int64(count.Phy),
|
||||||
|
User: int64(count.User),
|
||||||
cnrID.Encode(key)
|
}
|
||||||
value := containerCounterValue(count)
|
// this function called by init or refill, so no other updates should happen
|
||||||
err := containerCounterB.Put(key, value)
|
// so here bucketID = 0 can be used
|
||||||
if err != nil {
|
if err := editContainerCounterValue(b, cnrID, delta, 0); err != nil {
|
||||||
return fmt.Errorf("could not update phy container object counter: %w", err)
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
phyData := make([]byte, 8)
|
|
||||||
binary.LittleEndian.PutUint64(phyData, phyTotal)
|
|
||||||
|
|
||||||
err := shardInfoB.Put(objectPhyCounterKey, phyData)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not update phy object counter: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
logData := make([]byte, 8)
|
|
||||||
binary.LittleEndian.PutUint64(logData, logicTotal)
|
|
||||||
|
|
||||||
err = shardInfoB.Put(objectLogicCounterKey, logData)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not update logic object counter: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
userData := make([]byte, 8)
|
|
||||||
binary.LittleEndian.PutUint64(userData, userTotal)
|
|
||||||
|
|
||||||
err = shardInfoB.Put(objectUserCounterKey, userData)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not update user object counter: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func containerCounterValue(entity ObjectCounters) []byte {
|
|
||||||
res := make([]byte, 24)
|
|
||||||
binary.LittleEndian.PutUint64(res, entity.Phy)
|
|
||||||
binary.LittleEndian.PutUint64(res[8:], entity.Logic)
|
|
||||||
binary.LittleEndian.PutUint64(res[16:], entity.User)
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseContainerCounterKey(buf []byte) (cid.ID, error) {
|
|
||||||
if len(buf) != cidSize {
|
|
||||||
return cid.ID{}, errInvalidKeyLenght
|
|
||||||
}
|
|
||||||
var cnrID cid.ID
|
|
||||||
if err := cnrID.Decode(buf); err != nil {
|
|
||||||
return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err)
|
|
||||||
}
|
|
||||||
return cnrID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// parseContainerCounterValue return phy, logic values.
|
|
||||||
func parseContainerCounterValue(buf []byte) (ObjectCounters, error) {
|
|
||||||
if len(buf) != 24 {
|
|
||||||
return ObjectCounters{}, errInvalidValueLenght
|
|
||||||
}
|
|
||||||
return ObjectCounters{
|
|
||||||
Phy: binary.LittleEndian.Uint64(buf),
|
|
||||||
Logic: binary.LittleEndian.Uint64(buf[8:16]),
|
|
||||||
User: binary.LittleEndian.Uint64(buf[16:]),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func containerObjectCounterInitialized(tx *bbolt.Tx) bool {
|
|
||||||
b := tx.Bucket(containerCounterBucketName)
|
|
||||||
if b == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
k, v := b.Cursor().First()
|
|
||||||
if k == nil && v == nil {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
_, err := parseContainerCounterKey(k)
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
_, err = parseContainerCounterValue(v)
|
|
||||||
return err == nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func IsUserObject(obj *objectSDK.Object) bool {
|
func IsUserObject(obj *objectSDK.Object) bool {
|
||||||
ech := obj.ECHeader()
|
ech := obj.ECHeader()
|
||||||
if ech == nil {
|
if ech == nil {
|
||||||
|
@ -540,134 +304,6 @@ func IsUserObject(obj *objectSDK.Object) bool {
|
||||||
return ech.Index() == 0 && (ech.ParentSplitID() == nil || ech.ParentSplitParentID() != nil)
|
return ech.Index() == 0 && (ech.ParentSplitID() == nil || ech.ParentSplitParentID() != nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ZeroSizeContainers returns containers with size = 0.
|
|
||||||
func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) {
|
|
||||||
var (
|
|
||||||
startedAt = time.Now()
|
|
||||||
success = false
|
|
||||||
)
|
|
||||||
defer func() {
|
|
||||||
db.metrics.AddMethodDuration("ZeroSizeContainers", time.Since(startedAt), success)
|
|
||||||
}()
|
|
||||||
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroSizeContainers")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
db.modeMtx.RLock()
|
|
||||||
defer db.modeMtx.RUnlock()
|
|
||||||
|
|
||||||
var result []cid.ID
|
|
||||||
lastKey := make([]byte, cidSize)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
completed, err := db.containerSizesNextBatch(lastKey, func(contID cid.ID, size uint64) {
|
|
||||||
if size == 0 {
|
|
||||||
result = append(result, contID)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if completed {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
success = true
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) containerSizesNextBatch(lastKey []byte, f func(cid.ID, uint64)) (bool, error) {
|
|
||||||
db.modeMtx.RLock()
|
|
||||||
defer db.modeMtx.RUnlock()
|
|
||||||
|
|
||||||
if db.mode.NoMetabase() {
|
|
||||||
return false, ErrDegradedMode
|
|
||||||
}
|
|
||||||
|
|
||||||
counter := 0
|
|
||||||
const batchSize = 1000
|
|
||||||
|
|
||||||
err := db.database.View(func(tx *bbolt.Tx) error {
|
|
||||||
b := tx.Bucket(containerVolumeBucketName)
|
|
||||||
c := b.Cursor()
|
|
||||||
var key, value []byte
|
|
||||||
for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() {
|
|
||||||
if bytes.Equal(lastKey, key) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
copy(lastKey, key)
|
|
||||||
|
|
||||||
size := parseContainerSize(value)
|
|
||||||
var id cid.ID
|
|
||||||
if err := id.Decode(key); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
f(id, size)
|
|
||||||
|
|
||||||
counter++
|
|
||||||
if counter == batchSize {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if counter < batchSize {
|
|
||||||
return ErrInterruptIterator
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, ErrInterruptIterator) {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
return false, metaerr.Wrap(err)
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
|
|
||||||
var (
|
|
||||||
startedAt = time.Now()
|
|
||||||
success = false
|
|
||||||
)
|
|
||||||
defer func() {
|
|
||||||
db.metrics.AddMethodDuration("DeleteContainerSize", time.Since(startedAt), success)
|
|
||||||
}()
|
|
||||||
|
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerSize",
|
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.Stringer("container_id", id),
|
|
||||||
))
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
db.modeMtx.RLock()
|
|
||||||
defer db.modeMtx.RUnlock()
|
|
||||||
|
|
||||||
if db.mode.NoMetabase() {
|
|
||||||
return ErrDegradedMode
|
|
||||||
}
|
|
||||||
|
|
||||||
if db.mode.ReadOnly() {
|
|
||||||
return ErrReadOnlyMode
|
|
||||||
}
|
|
||||||
|
|
||||||
err := db.database.Update(func(tx *bbolt.Tx) error {
|
|
||||||
b := tx.Bucket(containerVolumeBucketName)
|
|
||||||
|
|
||||||
key := make([]byte, cidSize)
|
|
||||||
id.Encode(key)
|
|
||||||
return b.Delete(key)
|
|
||||||
})
|
|
||||||
success = err == nil
|
|
||||||
return metaerr.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ZeroCountContainers returns containers with objects count = 0 in metabase.
|
// ZeroCountContainers returns containers with objects count = 0 in metabase.
|
||||||
func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) {
|
func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) {
|
||||||
var (
|
var (
|
||||||
|
@ -690,24 +326,18 @@ func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) {
|
||||||
|
|
||||||
var result []cid.ID
|
var result []cid.ID
|
||||||
|
|
||||||
lastKey := make([]byte, cidSize)
|
var cc map[cid.ID]ObjectCounters
|
||||||
for {
|
err := db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
select {
|
var err error
|
||||||
case <-ctx.Done():
|
cc, err = containerObjectCounters(ctx, s, nil)
|
||||||
return nil, ctx.Err()
|
return err
|
||||||
default:
|
})
|
||||||
}
|
if err != nil {
|
||||||
|
return nil, metaerr.Wrap(err)
|
||||||
completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) {
|
}
|
||||||
if entity.IsZero() {
|
for cnrID, c := range cc {
|
||||||
result = append(result, id)
|
if c.IsZero() {
|
||||||
}
|
result = append(result, cnrID)
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, metaerr.Wrap(err)
|
|
||||||
}
|
|
||||||
if completed {
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
success = true
|
success = true
|
||||||
|
@ -740,13 +370,112 @@ func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error {
|
||||||
return ErrReadOnlyMode
|
return ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
err := db.database.Update(func(tx *bbolt.Tx) error {
|
prefix := make([]byte, containerObjectCountPrefixSize)
|
||||||
b := tx.Bucket(containerCounterBucketName)
|
prefix[0] = containerCountersPrefix
|
||||||
|
id.Encode(prefix[1:])
|
||||||
|
|
||||||
key := make([]byte, cidSize)
|
err := db.batch(func(b *pebble.Batch) error {
|
||||||
id.Encode(key)
|
return deleteByPrefix(ctx, b, prefix)
|
||||||
return b.Delete(key)
|
|
||||||
})
|
})
|
||||||
success = err == nil
|
if err != nil {
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
success = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func containerObjectCounterInitialized(ctx context.Context, r pebble.Reader) bool {
|
||||||
|
_, err := containerObjectCounters(ctx, r, nil)
|
||||||
|
return err == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func containerObjectCounters(ctx context.Context, r pebble.Reader, cnrID *cid.ID) (map[cid.ID]ObjectCounters, error) {
|
||||||
|
prefix := []byte{containerCountersPrefix}
|
||||||
|
if cnrID != nil {
|
||||||
|
buf := make([]byte, cidSize)
|
||||||
|
cnrID.Encode(buf)
|
||||||
|
prefix = append(prefix, buf...)
|
||||||
|
}
|
||||||
|
it, err := r.NewIter(&pebble.IterOptions{
|
||||||
|
LowerBound: prefix,
|
||||||
|
OnlyReadGuaranteedDurable: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
counters := make(map[cid.ID]objectCounterValue)
|
||||||
|
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, errors.Join(ctx.Err(), it.Close())
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
var cnrID cid.ID
|
||||||
|
if !parseContainerID(&cnrID, it.Key()) {
|
||||||
|
return nil, errors.Join(errInvalidContainerIDValue, it.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
oc, err := parseContainerCounterValue(it.Value())
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Join(err, it.Close())
|
||||||
|
}
|
||||||
|
counters[cnrID] = mergeObjectCounterValues(counters[cnrID], oc)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := it.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return normilizeObjectCounters(counters)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseContainerCounterValue return phy, logic values.
|
||||||
|
func parseContainerCounterValue(buf []byte) (objectCounterValue, error) {
|
||||||
|
if len(buf) != 24 {
|
||||||
|
return objectCounterValue{}, errInvalidValueLenght
|
||||||
|
}
|
||||||
|
return objectCounterValue{
|
||||||
|
Phy: int64(binary.LittleEndian.Uint64(buf[:8])),
|
||||||
|
Logic: int64(binary.LittleEndian.Uint64(buf[8:16])),
|
||||||
|
User: int64(binary.LittleEndian.Uint64(buf[16:])),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func marshalContainerCounterValue(v objectCounterValue) []byte {
|
||||||
|
buf := make([]byte, 24)
|
||||||
|
binary.LittleEndian.PutUint64(buf[:8], uint64(v.Phy))
|
||||||
|
binary.LittleEndian.PutUint64(buf[8:16], uint64(v.Logic))
|
||||||
|
binary.LittleEndian.PutUint64(buf[16:], uint64(v.User))
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
func mergeObjectCounterValues(lhs, rhs objectCounterValue) objectCounterValue {
|
||||||
|
lhs.Logic += rhs.Logic
|
||||||
|
lhs.Phy += rhs.Phy
|
||||||
|
lhs.User += rhs.User
|
||||||
|
return lhs
|
||||||
|
}
|
||||||
|
|
||||||
|
func normilizeObjectCounters(values map[cid.ID]objectCounterValue) (map[cid.ID]ObjectCounters, error) {
|
||||||
|
result := make(map[cid.ID]ObjectCounters, len(values))
|
||||||
|
for k, v := range values {
|
||||||
|
if v.Logic < 0 || v.Phy < 0 || v.User < 0 {
|
||||||
|
return nil, fmt.Errorf("invalid container object counter for container ID %s", k.EncodeToString())
|
||||||
|
}
|
||||||
|
var oc ObjectCounters
|
||||||
|
oc.Logic = uint64(v.Logic)
|
||||||
|
oc.Phy = uint64(v.Phy)
|
||||||
|
oc.User = uint64(v.User)
|
||||||
|
result[k] = oc
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseContainerID(dst *cid.ID, name []byte) bool {
|
||||||
|
if len(name) < bucketKeySize {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return dst.Decode(name[1:bucketKeySize]) == nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -181,26 +181,26 @@ func isExpired(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch
|
||||||
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return false, ctx.Err()
|
return false, errors.Join(ctx.Err(), it.Close())
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
expEpoch, err := expirationEpochFromExpiredKey(it.Key())
|
expEpoch, err := expirationEpochFromExpiredKey(it.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, errors.Join(err, it.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
if expEpoch >= currEpoch {
|
if expEpoch >= currEpoch {
|
||||||
return false, nil // keys are ordered by epoch, so next items will be discarded anyway.
|
return false, it.Close() // keys are ordered by epoch, so next items will be discarded anyway.
|
||||||
}
|
}
|
||||||
|
|
||||||
curAddr, err := addressFromExpiredKey(it.Key())
|
curAddr, err := addressFromExpiredKey(it.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, errors.Join(err, it.Close())
|
||||||
}
|
}
|
||||||
if curAddr == addr {
|
if curAddr == addr {
|
||||||
return true, nil
|
return true, it.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false, nil
|
return false, it.Close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,23 +1,19 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"github.com/cockroachdb/pebble"
|
||||||
|
"github.com/dgraph-io/badger/v4"
|
||||||
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExpiredObject is a descriptor of expired object from DB.
|
// ExpiredObject is a descriptor of expired object from DB.
|
||||||
|
@ -44,99 +40,7 @@ type ExpiredObjectHandler func(*ExpiredObject) error
|
||||||
// as a "break" keyword.
|
// as a "break" keyword.
|
||||||
var ErrInterruptIterator = logicerr.New("iterator is interrupted")
|
var ErrInterruptIterator = logicerr.New("iterator is interrupted")
|
||||||
|
|
||||||
// IterateExpired iterates over all objects in DB which are out of date
|
var errInvalidAttributeKey = errors.New("invalid userr attribute key")
|
||||||
// relative to epoch. Locked objects are not included (do not confuse
|
|
||||||
// with objects of type LOCK).
|
|
||||||
//
|
|
||||||
// If h returns ErrInterruptIterator, nil returns immediately.
|
|
||||||
// Returns other errors of h directly.
|
|
||||||
func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectHandler) error {
|
|
||||||
var (
|
|
||||||
startedAt = time.Now()
|
|
||||||
success = false
|
|
||||||
)
|
|
||||||
defer func() {
|
|
||||||
db.metrics.AddMethodDuration("IterateExpired", time.Since(startedAt), success)
|
|
||||||
}()
|
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateExpired",
|
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("epoch", strconv.FormatUint(epoch, 10)),
|
|
||||||
))
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
db.modeMtx.RLock()
|
|
||||||
defer db.modeMtx.RUnlock()
|
|
||||||
|
|
||||||
if db.mode.NoMetabase() {
|
|
||||||
return ErrDegradedMode
|
|
||||||
}
|
|
||||||
|
|
||||||
err := metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
|
||||||
return db.iterateExpired(tx, epoch, h)
|
|
||||||
}))
|
|
||||||
success = err == nil
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
|
|
||||||
err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
|
||||||
cidBytes := cidFromAttributeBucket(name, objectV2.SysAttributeExpEpoch)
|
|
||||||
if cidBytes == nil {
|
|
||||||
cidBytes = cidFromAttributeBucket(name, objectV2.SysAttributeExpEpochNeoFS)
|
|
||||||
if cidBytes == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var cnrID cid.ID
|
|
||||||
err := cnrID.Decode(cidBytes)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not parse container ID of expired bucket: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return b.ForEachBucket(func(expKey []byte) error {
|
|
||||||
bktExpired := b.Bucket(expKey)
|
|
||||||
expiresAfter, err := strconv.ParseUint(string(expKey), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not parse expiration epoch: %w", err)
|
|
||||||
} else if expiresAfter >= epoch {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return bktExpired.ForEach(func(idKey, _ []byte) error {
|
|
||||||
var id oid.ID
|
|
||||||
|
|
||||||
err = id.Decode(idKey)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not parse ID of expired object: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ignore locked objects.
|
|
||||||
//
|
|
||||||
// To slightly optimize performance we can check only REGULAR objects
|
|
||||||
// (only they can be locked), but it's more reliable.
|
|
||||||
if objectLocked(tx, cnrID, id) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var addr oid.Address
|
|
||||||
addr.SetContainer(cnrID)
|
|
||||||
addr.SetObject(id)
|
|
||||||
|
|
||||||
return h(&ExpiredObject{
|
|
||||||
typ: firstIrregularObjectType(tx, cnrID, idKey),
|
|
||||||
addr: addr,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
if errors.Is(err, ErrInterruptIterator) {
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// IterateCoveredByTombstones iterates over all objects in DB which are covered
|
// IterateCoveredByTombstones iterates over all objects in DB which are covered
|
||||||
// by tombstone with string address from tss. Locked objects are not included
|
// by tombstone with string address from tss. Locked objects are not included
|
||||||
|
@ -164,69 +68,98 @@ func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid
|
||||||
return ErrDegradedMode
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.database.View(func(tx *bbolt.Tx) error {
|
return db.database.View(func(tx *badger.Txn) error {
|
||||||
return db.iterateCoveredByTombstones(tx, tss, h)
|
return db.iterateCoveredByTombstones(ctx, tx, tss, h)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Address, h func(oid.Address) error) error {
|
func (db *DB) iterateCoveredByTombstones(ctx context.Context, tx *badger.Txn, tss map[string]oid.Address, h func(oid.Address) error) error {
|
||||||
bktGraveyard := tx.Bucket(graveyardBucketName)
|
prefix := []byte{graveyardPrefix}
|
||||||
|
it := tx.NewIterator(badger.IteratorOptions{
|
||||||
|
PrefetchSize: badger.DefaultIteratorOptions.PrefetchSize,
|
||||||
|
Prefix: prefix,
|
||||||
|
PrefetchValues: true,
|
||||||
|
})
|
||||||
|
defer it.Close()
|
||||||
|
|
||||||
err := bktGraveyard.ForEach(func(k, v []byte) error {
|
for it.Seek(nil); it.ValidForPrefix(prefix); it.Next() {
|
||||||
var addr oid.Address
|
select {
|
||||||
if err := decodeAddressFromKey(&addr, v); err != nil {
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
var tombstoneAddress oid.Address
|
||||||
|
if err := it.Item().Value(func(val []byte) error {
|
||||||
|
var e error
|
||||||
|
tombstoneAddress, e = decodeAddressFromGrave(val)
|
||||||
|
return e
|
||||||
|
}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, ok := tss[addr.EncodeToString()]; ok {
|
if _, ok := tss[tombstoneAddress.EncodeToString()]; !ok {
|
||||||
var addr oid.Address
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
err := decodeAddressFromKey(&addr, k)
|
var objectAddress oid.Address
|
||||||
if err != nil {
|
var err error
|
||||||
return fmt.Errorf("could not parse address of the object under tombstone: %w", err)
|
objectAddress, err = addressFromGraveyardKey(it.Item().Key())
|
||||||
}
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if objectLocked(tx, addr.Container(), addr.Object()) {
|
isLocked, err := objectLocked(ctx, tx, objectAddress.Container(), objectAddress.Object())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if isLocked {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := h(objectAddress); err != nil {
|
||||||
|
if errors.Is(err, ErrInterruptIterator) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
return h(addr)
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
func iteratePhyObjects(r pebble.Reader, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
|
||||||
|
if err := iteratePhyObjectsWithPrefix(r, primaryPrefix, f); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := iteratePhyObjectsWithPrefix(r, lockersPrefix, f); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := iteratePhyObjectsWithPrefix(r, tombstonePrefix, f); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func iteratePhyObjectsWithPrefix(r pebble.Reader, typePrefix byte, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
|
||||||
|
prefix := []byte{typePrefix}
|
||||||
|
it, err := r.NewIter(&pebble.IterOptions{
|
||||||
|
LowerBound: prefix,
|
||||||
|
OnlyReadGuaranteedDurable: true,
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
if errors.Is(err, ErrInterruptIterator) {
|
return err
|
||||||
err = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||||
}
|
addr, err := addressFromKey(typePrefix, it.Key())
|
||||||
|
if err != nil {
|
||||||
func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
|
return errors.Join(err, it.Close())
|
||||||
var cid cid.ID
|
|
||||||
var oid oid.ID
|
|
||||||
obj := objectSDK.New()
|
|
||||||
|
|
||||||
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
|
||||||
b58CID, postfix := parseContainerIDWithPrefix(&cid, name)
|
|
||||||
if len(b58CID) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
obj := objectSDK.New()
|
||||||
switch postfix {
|
if err := obj.Unmarshal(it.Value()); err != nil {
|
||||||
case primaryPrefix,
|
return errors.Join(err, it.Close())
|
||||||
lockersPrefix,
|
|
||||||
tombstonePrefix:
|
|
||||||
default:
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
if err := f(addr.Container(), addr.Object(), obj); err != nil {
|
||||||
return b.ForEach(func(k, v []byte) error {
|
return errors.Join(err, it.Close())
|
||||||
if oid.Decode(k) == nil && obj.Unmarshal(v) == nil {
|
}
|
||||||
return f(cid, oid, obj)
|
}
|
||||||
}
|
return it.Close()
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue