From 1ed7744d5aad068341892fde08e1573fcbb45817 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 1 Jul 2024 22:11:23 +0300 Subject: [PATCH] [#9999] metabase: Fix db engine to pebble in counter.go Signed-off-by: Dmitrii Stepanov --- .../metabase/containers.go | 11 +- pkg/local_object_storage/metabase/counter.go | 717 ++++++------------ pkg/local_object_storage/metabase/expired.go | 12 +- .../metabase/iterators.go | 229 ++---- 4 files changed, 316 insertions(+), 653 deletions(-) diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index c8a1318bc..70b156a37 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/binary" + "errors" "fmt" "time" @@ -67,7 +68,7 @@ func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) { } } - return result, nil + return result, it.Close() } func parseContainerIDWithIgnore(dst *cid.ID, name []byte, ignore map[string]struct{}) bool { @@ -185,23 +186,23 @@ func (db *DB) containerSizesInternal(ctx context.Context, id *cid.ID) (map[cid.I for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { select { case <-ctx.Done(): - return ctx.Err() + return errors.Join(ctx.Err(), it.Close()) default: } key := it.Key() var cnr cid.ID if err := cnr.Decode(key[1:containerSizePrefixSize]); err != nil { - return fmt.Errorf("invalid container size key: %w", err) + return errors.Join(fmt.Errorf("invalid container size key: %w", err), it.Close()) } value, ok := parseSize(it.Value()) if !ok { - return fmt.Errorf("invalid container size value for container %s", cnr) + return errors.Join(fmt.Errorf("invalid container size value for container %s", cnr), it.Close()) } result[cnr] += value } - return nil + return it.Close() }) if err != nil { return nil, metaerr.Wrap(err) diff --git a/pkg/local_object_storage/metabase/counter.go 
b/pkg/local_object_storage/metabase/counter.go index c007811d0..0d3c4f16a 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -13,32 +13,21 @@ import ( cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) -var ( - objectPhyCounterKey = []byte("phy_counter") - objectLogicCounterKey = []byte("logic_counter") - objectUserCounterKey = []byte("user_counter") -) - var ( errInvalidKeyLenght = errors.New("invalid key length") errInvalidKeyPrefix = errors.New("invalid key prefix") errInvalidValueLenght = errors.New("invalid value length") errInvalidContainerIDValue = errors.New("invalid container ID value") - errInvalidAttributeKey = errors.New("invalid userr attribute key") ) -type objectType uint8 - const ( - _ objectType = iota - phy - logical - user + containerObjectCountKeySize = 1 + cidSize + 2 + containerObjectCountPrefixSize = 1 + cidSize ) // ObjectCounters groups object counter @@ -53,12 +42,18 @@ func (o ObjectCounters) IsZero() bool { return o.Phy == 0 && o.Logic == 0 && o.User == 0 } +type objectCounterValue struct { + Logic int64 + Phy int64 + User int64 +} + // ObjectCounters returns object counters that metabase has // tracked since it was opened and initialized. // // Returns only the errors that do not allow reading counter -// in Bolt database. -func (db *DB) ObjectCounters() (cc ObjectCounters, err error) { +// in pebble database.
+func (db *DB) ObjectCounters(ctx context.Context) (ObjectCounters, error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -66,29 +61,22 @@ return ObjectCounters{}, ErrDegradedMode } - err = db.database.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(shardInfoBucket) - if b != nil { - data := b.Get(objectPhyCounterKey) - if len(data) == 8 { - cc.Phy = binary.LittleEndian.Uint64(data) - } - - data = b.Get(objectLogicCounterKey) - if len(data) == 8 { - cc.Logic = binary.LittleEndian.Uint64(data) - } - - data = b.Get(objectUserCounterKey) - if len(data) == 8 { - cc.User = binary.LittleEndian.Uint64(data) - } - } - - return nil + var cc map[cid.ID]ObjectCounters + err := db.snapshot(func(s *pebble.Snapshot) error { + var err error + cc, err = containerObjectCounters(ctx, s, nil) + return err }) - - return cc, metaerr.Wrap(err) + if err != nil { + return ObjectCounters{}, metaerr.Wrap(err) + } + var result ObjectCounters + for _, v := range cc { + result.Logic += v.Logic + result.Phy += v.Phy + result.User += v.User + } + return result, nil } type ContainerCounters struct { @@ -99,7 +87,7 @@ // that metabase has tracked since it was opened and initialized. // // Returns only the errors that do not allow reading counter -// in Bolt database. +// in pebble database. // // It is guaranteed that the ContainerCounters fields are not nil.
func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) { @@ -117,84 +105,16 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) cc := ContainerCounters{ Counts: make(map[cid.ID]ObjectCounters), } - - lastKey := make([]byte, cidSize) - - // there is no limit for containers count, so use batching with cancellation - for { - select { - case <-ctx.Done(): - return cc, ctx.Err() - default: - } - - completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) { - cc.Counts[id] = entity - }) - if err != nil { - return cc, err - } - if completed { - break - } - } - - success = true - return cc, nil -} - -func (db *DB) containerCountersNextBatch(lastKey []byte, f func(id cid.ID, entity ObjectCounters)) (bool, error) { - db.modeMtx.RLock() - defer db.modeMtx.RUnlock() - - if db.mode.NoMetabase() { - return false, ErrDegradedMode - } - - counter := 0 - const batchSize = 1000 - - err := db.database.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(containerCounterBucketName) - if b == nil { - return ErrInterruptIterator - } - c := b.Cursor() - var key, value []byte - for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() { - if bytes.Equal(lastKey, key) { - continue - } - copy(lastKey, key) - - cnrID, err := parseContainerCounterKey(key) - if err != nil { - return err - } - ent, err := parseContainerCounterValue(value) - if err != nil { - return err - } - f(cnrID, ent) - - counter++ - if counter == batchSize { - break - } - } - - if counter < batchSize { // last batch - return ErrInterruptIterator - } - return nil + err := db.snapshot(func(s *pebble.Snapshot) error { + var err error + cc.Counts, err = containerObjectCounters(ctx, s, nil) + return err }) if err != nil { - if errors.Is(err, ErrInterruptIterator) { - return true, nil - } - return false, metaerr.Wrap(err) + return ContainerCounters{}, metaerr.Wrap(err) } - return false, nil + success = true + return cc, nil } func 
(db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, error) { @@ -216,144 +136,65 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er return ObjectCounters{}, ErrDegradedMode } - var result ObjectCounters - - err := db.database.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(containerCounterBucketName) - key := make([]byte, cidSize) - id.Encode(key) - v := b.Get(key) - if v == nil { - return nil - } + var cc map[cid.ID]ObjectCounters + err := db.snapshot(func(s *pebble.Snapshot) error { var err error - result, err = parseContainerCounterValue(v) + cc, err = containerObjectCounters(ctx, s, &id) return err }) - - return result, metaerr.Wrap(err) + if err != nil { + return ObjectCounters{}, metaerr.Wrap(err) + } + return cc[id], nil } -func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { - b := tx.Bucket(shardInfoBucket) - if b == nil { - return db.incContainerObjectCounter(tx, cnrID, isUserObject) - } +func containerCounterKey(cnrID cid.ID, bucketID uint16) []byte { + result := make([]byte, containerObjectCountKeySize) + result[0] = containerCountersPrefix + cnrID.Encode(result[1:]) + binary.LittleEndian.PutUint16(result[containerObjectCountPrefixSize:], bucketID) + return result +} - if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil { - return fmt.Errorf("could not increase phy object counter: %w", err) - } - if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil { - return fmt.Errorf("could not increase logical object counter: %w", err) +func incCounters(b *pebble.Batch, cnrID cid.ID, isUserObject bool, bucketID uint16) error { + delta := objectCounterValue{ + Logic: 1, + Phy: 1, } if isUserObject { - if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil { - return fmt.Errorf("could not increase user object counter: %w", err) - } + delta.User = 1 } - return db.incContainerObjectCounter(tx, cnrID, isUserObject) + return 
editContainerCounterValue(b, cnrID, delta, bucketID) } -func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error { - b := tx.Bucket(shardInfoBucket) - if b == nil { - return nil - } - - return db.updateShardObjectCounterBucket(b, typ, delta, inc) -} - -func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error { - var counter uint64 - var counterKey []byte - - switch typ { - case phy: - counterKey = objectPhyCounterKey - case logical: - counterKey = objectLogicCounterKey - case user: - counterKey = objectUserCounterKey - default: - panic("unknown object type counter") - } - - data := b.Get(counterKey) - if len(data) == 8 { - counter = binary.LittleEndian.Uint64(data) - } - - if inc { - counter += delta - } else if counter <= delta { - counter = 0 - } else { - counter -= delta - } - - newCounter := make([]byte, 8) - binary.LittleEndian.PutUint64(newCounter, counter) - - return b.Put(counterKey, newCounter) -} - -func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { - b := tx.Bucket(containerCounterBucketName) - if b == nil { - return nil - } - - key := make([]byte, cidSize) +func updateContainerCounter(b *pebble.Batch, delta map[cid.ID]objectCounterValue, bucketIDs map[cid.ID]uint16) error { for cnrID, cnrDelta := range delta { - cnrID.Encode(key) - if err := db.editContainerCounterValue(b, key, cnrDelta, inc); err != nil { + bucketID, found := bucketIDs[cnrID] + if !found { + return fmt.Errorf("bucket ID not found for container %s", cnrID) + } + if err := editContainerCounterValue(b, cnrID, cnrDelta, bucketID); err != nil { return err } } return nil } -func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error { - var entity ObjectCounters - var err error - data := b.Get(key) - if len(data) > 0 { - entity, err = parseContainerCounterValue(data) +func editContainerCounterValue(b 
*pebble.Batch, cnrID cid.ID, delta objectCounterValue, bucketID uint16) error { + key := containerCounterKey(cnrID, bucketID) + val, err := valueSafe(b, key) + if err != nil { + return err + } + setValue := delta + if val != nil { + exited, err := parseContainerCounterValue(val) if err != nil { return err } + setValue = mergeObjectCounterValues(setValue, exited) } - entity.Phy = nextValue(entity.Phy, delta.Phy, inc) - entity.Logic = nextValue(entity.Logic, delta.Logic, inc) - entity.User = nextValue(entity.User, delta.User, inc) - value := containerCounterValue(entity) - return b.Put(key, value) -} - -func nextValue(existed, delta uint64, inc bool) uint64 { - if inc { - existed += delta - } else if existed <= delta { - existed = 0 - } else { - existed -= delta - } - return existed -} - -func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { - b := tx.Bucket(containerCounterBucketName) - if b == nil { - return nil - } - - key := make([]byte, cidSize) - cnrID.Encode(key) - c := ObjectCounters{Logic: 1, Phy: 1} - if isUserObject { - c.User = 1 - } - return db.editContainerCounterValue(b, key, c, true) + return b.Set(key, marshalContainerCounterValue(setValue), pebble.Sync) } // syncCounter updates object counters according to metabase state: @@ -362,34 +203,31 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject // // Does nothing if counters are not empty and force is false. If force is // true, updates the counters anyway. 
-func syncCounter(tx *bbolt.Tx, force bool) error { - shardInfoB, err := createBucketLikelyExists(tx, shardInfoBucket) - if err != nil { - return fmt.Errorf("could not get shard info bucket: %w", err) - } - shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && - len(shardInfoB.Get(objectLogicCounterKey)) == 8 && - len(shardInfoB.Get(objectUserCounterKey)) == 8 - containerObjectCounterInitialized := containerObjectCounterInitialized(tx) - if !force && shardObjectCounterInitialized && containerObjectCounterInitialized { - // the counters are already inited +func syncCounter(ctx context.Context, b *pebble.Batch, force bool) error { + if !force && containerObjectCounterInitialized(ctx, b) { return nil } - containerCounterB, err := createBucketLikelyExists(tx, containerCounterBucketName) + // drop existed counters + err := deleteByPrefix(ctx, b, []byte{containerCountersPrefix}) if err != nil { - return fmt.Errorf("could not get container counter bucket: %w", err) + return err } + counters, err := getActualObjectCounters(b) + if err != nil { + return err + } + + return setObjectCounters(b, counters) +} + +func getActualObjectCounters(r pebble.Reader) (map[cid.ID]ObjectCounters, error) { var addr oid.Address + var isAvailable bool counters := make(map[cid.ID]ObjectCounters) - graveyardBKT := tx.Bucket(graveyardBucketName) - garbageBKT := tx.Bucket(garbageBucketName) - key := make([]byte, addressKeySize) - var isAvailable bool - - err = iteratePhyObjects(tx, func(cnr cid.ID, objID oid.ID, obj *objectSDK.Object) error { + err := iteratePhyObjects(r, func(cnr cid.ID, objID oid.ID, obj *objectSDK.Object) error { if v, ok := counters[cnr]; ok { v.Phy++ counters[cnr] = v @@ -403,9 +241,12 @@ func syncCounter(tx *bbolt.Tx, force bool) error { addr.SetObject(objID) isAvailable = false - // check if an object is available: not with GCMark - // and not covered with a tombstone - if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 
{ + st, err := inGraveyardWithKey(r, addr) + if err != nil { + return err + } + + if st == 0 { if v, ok := counters[cnr]; ok { v.Logic++ counters[cnr] = v @@ -431,104 +272,27 @@ func syncCounter(tx *bbolt.Tx, force bool) error { return nil }) if err != nil { - return fmt.Errorf("could not iterate objects: %w", err) + return nil, fmt.Errorf("could not iterate objects: %w", err) } - - return setObjectCounters(counters, shardInfoB, containerCounterB) + return counters, nil } -func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error { - var phyTotal uint64 - var logicTotal uint64 - var userTotal uint64 - key := make([]byte, cidSize) +func setObjectCounters(b *pebble.Batch, counters map[cid.ID]ObjectCounters) error { for cnrID, count := range counters { - phyTotal += count.Phy - logicTotal += count.Logic - userTotal += count.User - - cnrID.Encode(key) - value := containerCounterValue(count) - err := containerCounterB.Put(key, value) - if err != nil { - return fmt.Errorf("could not update phy container object counter: %w", err) + delta := objectCounterValue{ + Logic: int64(count.Logic), + Phy: int64(count.Phy), + User: int64(count.User), + } + // this function called by init or refill, so no other updates should happen + // so here bucketID = 0 can be used + if err := editContainerCounterValue(b, cnrID, delta, 0); err != nil { + return err } } - phyData := make([]byte, 8) - binary.LittleEndian.PutUint64(phyData, phyTotal) - - err := shardInfoB.Put(objectPhyCounterKey, phyData) - if err != nil { - return fmt.Errorf("could not update phy object counter: %w", err) - } - - logData := make([]byte, 8) - binary.LittleEndian.PutUint64(logData, logicTotal) - - err = shardInfoB.Put(objectLogicCounterKey, logData) - if err != nil { - return fmt.Errorf("could not update logic object counter: %w", err) - } - - userData := make([]byte, 8) - binary.LittleEndian.PutUint64(userData, userTotal) - - err = 
shardInfoB.Put(objectUserCounterKey, userData) - if err != nil { - return fmt.Errorf("could not update user object counter: %w", err) - } - return nil } -func containerCounterValue(entity ObjectCounters) []byte { - res := make([]byte, 24) - binary.LittleEndian.PutUint64(res, entity.Phy) - binary.LittleEndian.PutUint64(res[8:], entity.Logic) - binary.LittleEndian.PutUint64(res[16:], entity.User) - return res -} - -func parseContainerCounterKey(buf []byte) (cid.ID, error) { - if len(buf) != cidSize { - return cid.ID{}, errInvalidKeyLenght - } - var cnrID cid.ID - if err := cnrID.Decode(buf); err != nil { - return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err) - } - return cnrID, nil -} - -// parseContainerCounterValue return phy, logic values. -func parseContainerCounterValue(buf []byte) (ObjectCounters, error) { - if len(buf) != 24 { - return ObjectCounters{}, errInvalidValueLenght - } - return ObjectCounters{ - Phy: binary.LittleEndian.Uint64(buf), - Logic: binary.LittleEndian.Uint64(buf[8:16]), - User: binary.LittleEndian.Uint64(buf[16:]), - }, nil -} - -func containerObjectCounterInitialized(tx *bbolt.Tx) bool { - b := tx.Bucket(containerCounterBucketName) - if b == nil { - return false - } - k, v := b.Cursor().First() - if k == nil && v == nil { - return true - } - _, err := parseContainerCounterKey(k) - if err != nil { - return false - } - _, err = parseContainerCounterValue(v) - return err == nil -} - func IsUserObject(obj *objectSDK.Object) bool { ech := obj.ECHeader() if ech == nil { @@ -540,134 +304,6 @@ func IsUserObject(obj *objectSDK.Object) bool { return ech.Index() == 0 && (ech.ParentSplitID() == nil || ech.ParentSplitParentID() != nil) } -// ZeroSizeContainers returns containers with size = 0. 
-func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) { - var ( - startedAt = time.Now() - success = false - ) - defer func() { - db.metrics.AddMethodDuration("ZeroSizeContainers", time.Since(startedAt), success) - }() - - ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroSizeContainers") - defer span.End() - - db.modeMtx.RLock() - defer db.modeMtx.RUnlock() - - var result []cid.ID - lastKey := make([]byte, cidSize) - - for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - completed, err := db.containerSizesNextBatch(lastKey, func(contID cid.ID, size uint64) { - if size == 0 { - result = append(result, contID) - } - }) - if err != nil { - return nil, err - } - if completed { - break - } - } - - success = true - return result, nil -} - -func (db *DB) containerSizesNextBatch(lastKey []byte, f func(cid.ID, uint64)) (bool, error) { - db.modeMtx.RLock() - defer db.modeMtx.RUnlock() - - if db.mode.NoMetabase() { - return false, ErrDegradedMode - } - - counter := 0 - const batchSize = 1000 - - err := db.database.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(containerVolumeBucketName) - c := b.Cursor() - var key, value []byte - for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() { - if bytes.Equal(lastKey, key) { - continue - } - copy(lastKey, key) - - size := parseContainerSize(value) - var id cid.ID - if err := id.Decode(key); err != nil { - return err - } - f(id, size) - - counter++ - if counter == batchSize { - break - } - } - - if counter < batchSize { - return ErrInterruptIterator - } - return nil - }) - if err != nil { - if errors.Is(err, ErrInterruptIterator) { - return true, nil - } - return false, metaerr.Wrap(err) - } - return false, nil -} - -func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error { - var ( - startedAt = time.Now() - success = false - ) - defer func() { - db.metrics.AddMethodDuration("DeleteContainerSize", time.Since(startedAt), success) - }() - - _, 
span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerSize", - trace.WithAttributes( - attribute.Stringer("container_id", id), - )) - defer span.End() - - db.modeMtx.RLock() - defer db.modeMtx.RUnlock() - - if db.mode.NoMetabase() { - return ErrDegradedMode - } - - if db.mode.ReadOnly() { - return ErrReadOnlyMode - } - - err := db.database.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(containerVolumeBucketName) - - key := make([]byte, cidSize) - id.Encode(key) - return b.Delete(key) - }) - success = err == nil - return metaerr.Wrap(err) -} - // ZeroCountContainers returns containers with objects count = 0 in metabase. func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) { var ( @@ -690,24 +326,18 @@ func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) { var result []cid.ID - lastKey := make([]byte, cidSize) - for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) { - if entity.IsZero() { - result = append(result, id) - } - }) - if err != nil { - return nil, metaerr.Wrap(err) - } - if completed { - break + var cc map[cid.ID]ObjectCounters + err := db.snapshot(func(s *pebble.Snapshot) error { + var err error + cc, err = containerObjectCounters(ctx, s, nil) + return err + }) + if err != nil { + return nil, metaerr.Wrap(err) + } + for cnrID, c := range cc { + if c.IsZero() { + result = append(result, cnrID) } } success = true @@ -740,13 +370,112 @@ func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error { return ErrReadOnlyMode } - err := db.database.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket(containerCounterBucketName) + prefix := make([]byte, containerObjectCountPrefixSize) + prefix[0] = containerCountersPrefix + id.Encode(prefix[1:]) - key := make([]byte, cidSize) - id.Encode(key) - return b.Delete(key) + err := db.batch(func(b *pebble.Batch) error { + return 
deleteByPrefix(ctx, b, prefix) }) - success = err == nil - return metaerr.Wrap(err) + if err != nil { + return metaerr.Wrap(err) + } + success = true + return nil +} + +func containerObjectCounterInitialized(ctx context.Context, r pebble.Reader) bool { + _, err := containerObjectCounters(ctx, r, nil) + return err == nil +} + +func containerObjectCounters(ctx context.Context, r pebble.Reader, cnrID *cid.ID) (map[cid.ID]ObjectCounters, error) { + prefix := []byte{containerCountersPrefix} + if cnrID != nil { + buf := make([]byte, cidSize) + cnrID.Encode(buf) + prefix = append(prefix, buf...) + } + it, err := r.NewIter(&pebble.IterOptions{ + LowerBound: prefix, + OnlyReadGuaranteedDurable: true, + }) + if err != nil { + return nil, err + } + + counters := make(map[cid.ID]objectCounterValue) + for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { + select { + case <-ctx.Done(): + return nil, errors.Join(ctx.Err(), it.Close()) + default: + } + + var cnrID cid.ID + if !parseContainerID(&cnrID, it.Key()) { + return nil, errors.Join(errInvalidContainerIDValue, it.Close()) + } + + oc, err := parseContainerCounterValue(it.Value()) + if err != nil { + return nil, errors.Join(err, it.Close()) + } + counters[cnrID] = mergeObjectCounterValues(counters[cnrID], oc) + } + + if err := it.Close(); err != nil { + return nil, err + } + + return normilizeObjectCounters(counters) +} + +// parseContainerCounterValue return phy, logic values. 
+func parseContainerCounterValue(buf []byte) (objectCounterValue, error) { + if len(buf) != 24 { + return objectCounterValue{}, errInvalidValueLenght + } + return objectCounterValue{ + Phy: int64(binary.LittleEndian.Uint64(buf[:8])), + Logic: int64(binary.LittleEndian.Uint64(buf[8:16])), + User: int64(binary.LittleEndian.Uint64(buf[16:])), + }, nil +} + +func marshalContainerCounterValue(v objectCounterValue) []byte { + buf := make([]byte, 24) + binary.LittleEndian.PutUint64(buf[:8], uint64(v.Phy)) + binary.LittleEndian.PutUint64(buf[8:16], uint64(v.Logic)) + binary.LittleEndian.PutUint64(buf[16:], uint64(v.User)) + return buf +} + +func mergeObjectCounterValues(lhs, rhs objectCounterValue) objectCounterValue { + lhs.Logic += rhs.Logic + lhs.Phy += rhs.Phy + lhs.User += rhs.User + return lhs +} + +func normilizeObjectCounters(values map[cid.ID]objectCounterValue) (map[cid.ID]ObjectCounters, error) { + result := make(map[cid.ID]ObjectCounters, len(values)) + for k, v := range values { + if v.Logic < 0 || v.Phy < 0 || v.User < 0 { + return nil, fmt.Errorf("invalid container object counter for container ID %s", k.EncodeToString()) + } + var oc ObjectCounters + oc.Logic = uint64(v.Logic) + oc.Phy = uint64(v.Phy) + oc.User = uint64(v.User) + result[k] = oc + } + return result, nil +} + +func parseContainerID(dst *cid.ID, name []byte) bool { + if len(name) < bucketKeySize { + return false + } + return dst.Decode(name[1:bucketKeySize]) == nil } diff --git a/pkg/local_object_storage/metabase/expired.go b/pkg/local_object_storage/metabase/expired.go index d6c455832..52ee3f571 100644 --- a/pkg/local_object_storage/metabase/expired.go +++ b/pkg/local_object_storage/metabase/expired.go @@ -181,26 +181,26 @@ func isExpired(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { select { case <-ctx.Done(): - return false, ctx.Err() + return false, errors.Join(ctx.Err(), it.Close()) default: } 
expEpoch, err := expirationEpochFromExpiredKey(it.Key()) if err != nil { - return false, err + return false, errors.Join(err, it.Close()) } if expEpoch >= currEpoch { - return false, nil // keys are ordered by epoch, so next items will be discarded anyway. + return false, it.Close() // keys are ordered by epoch, so next items will be discarded anyway. } curAddr, err := addressFromExpiredKey(it.Key()) if err != nil { - return false, err + return false, errors.Join(err, it.Close()) } if curAddr == addr { - return true, nil + return true, it.Close() } } - return false, nil + return false, it.Close() } diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index b259e0aa1..8386672d3 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -1,23 +1,19 @@ package meta import ( + "bytes" "context" "errors" - "fmt" - "strconv" "time" - objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "github.com/cockroachdb/pebble" + "github.com/dgraph-io/badger/v4" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) // ExpiredObject is a descriptor of expired object from DB. @@ -44,99 +40,7 @@ type ExpiredObjectHandler func(*ExpiredObject) error // as a "break" keyword. var ErrInterruptIterator = logicerr.New("iterator is interrupted") -// IterateExpired iterates over all objects in DB which are out of date -// relative to epoch. 
Locked objects are not included (do not confuse -// with objects of type LOCK). -// -// If h returns ErrInterruptIterator, nil returns immediately. -// Returns other errors of h directly. -func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectHandler) error { - var ( - startedAt = time.Now() - success = false - ) - defer func() { - db.metrics.AddMethodDuration("IterateExpired", time.Since(startedAt), success) - }() - _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateExpired", - trace.WithAttributes( - attribute.String("epoch", strconv.FormatUint(epoch, 10)), - )) - defer span.End() - - db.modeMtx.RLock() - defer db.modeMtx.RUnlock() - - if db.mode.NoMetabase() { - return ErrDegradedMode - } - - err := metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { - return db.iterateExpired(tx, epoch, h) - })) - success = err == nil - return err -} - -func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error { - err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error { - cidBytes := cidFromAttributeBucket(name, objectV2.SysAttributeExpEpoch) - if cidBytes == nil { - cidBytes = cidFromAttributeBucket(name, objectV2.SysAttributeExpEpochNeoFS) - if cidBytes == nil { - return nil - } - } - - var cnrID cid.ID - err := cnrID.Decode(cidBytes) - if err != nil { - return fmt.Errorf("could not parse container ID of expired bucket: %w", err) - } - - return b.ForEachBucket(func(expKey []byte) error { - bktExpired := b.Bucket(expKey) - expiresAfter, err := strconv.ParseUint(string(expKey), 10, 64) - if err != nil { - return fmt.Errorf("could not parse expiration epoch: %w", err) - } else if expiresAfter >= epoch { - return nil - } - - return bktExpired.ForEach(func(idKey, _ []byte) error { - var id oid.ID - - err = id.Decode(idKey) - if err != nil { - return fmt.Errorf("could not parse ID of expired object: %w", err) - } - - // Ignore locked objects. 
- // - // To slightly optimize performance we can check only REGULAR objects - // (only they can be locked), but it's more reliable. - if objectLocked(tx, cnrID, id) { - return nil - } - - var addr oid.Address - addr.SetContainer(cnrID) - addr.SetObject(id) - - return h(&ExpiredObject{ - typ: firstIrregularObjectType(tx, cnrID, idKey), - addr: addr, - }) - }) - }) - }) - - if errors.Is(err, ErrInterruptIterator) { - err = nil - } - - return err -} +var errInvalidAttributeKey = errors.New("invalid userr attribute key") // IterateCoveredByTombstones iterates over all objects in DB which are covered // by tombstone with string address from tss. Locked objects are not included @@ -164,69 +68,98 @@ func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid return ErrDegradedMode } - return db.database.View(func(tx *bbolt.Tx) error { - return db.iterateCoveredByTombstones(tx, tss, h) + return db.database.View(func(tx *badger.Txn) error { + return db.iterateCoveredByTombstones(ctx, tx, tss, h) }) } -func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Address, h func(oid.Address) error) error { - bktGraveyard := tx.Bucket(graveyardBucketName) +func (db *DB) iterateCoveredByTombstones(ctx context.Context, tx *badger.Txn, tss map[string]oid.Address, h func(oid.Address) error) error { + prefix := []byte{graveyardPrefix} + it := tx.NewIterator(badger.IteratorOptions{ + PrefetchSize: badger.DefaultIteratorOptions.PrefetchSize, + Prefix: prefix, + PrefetchValues: true, + }) + defer it.Close() - err := bktGraveyard.ForEach(func(k, v []byte) error { - var addr oid.Address - if err := decodeAddressFromKey(&addr, v); err != nil { + for it.Seek(nil); it.ValidForPrefix(prefix); it.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + var tombstoneAddress oid.Address + if err := it.Item().Value(func(val []byte) error { + var e error + tombstoneAddress, e = decodeAddressFromGrave(val) + return e + }); err != nil { 
return err } - if _, ok := tss[addr.EncodeToString()]; ok { - var addr oid.Address + if _, ok := tss[tombstoneAddress.EncodeToString()]; !ok { + continue + } - err := decodeAddressFromKey(&addr, k) - if err != nil { - return fmt.Errorf("could not parse address of the object under tombstone: %w", err) - } + var objectAddress oid.Address + var err error + objectAddress, err = addressFromGraveyardKey(it.Item().Key()) + if err != nil { + return err + } - if objectLocked(tx, addr.Container(), addr.Object()) { + isLocked, err := objectLocked(ctx, tx, objectAddress.Container(), objectAddress.Object()) + if err != nil { + return err + } + if isLocked { + continue + } + if err := h(objectAddress); err != nil { + if errors.Is(err, ErrInterruptIterator) { return nil } - - return h(addr) + return err } + } + return nil +} - return nil +func iteratePhyObjects(r pebble.Reader, f func(cid.ID, oid.ID, *objectSDK.Object) error) error { + if err := iteratePhyObjectsWithPrefix(r, primaryPrefix, f); err != nil { + return err + } + if err := iteratePhyObjectsWithPrefix(r, lockersPrefix, f); err != nil { + return err + } + if err := iteratePhyObjectsWithPrefix(r, tombstonePrefix, f); err != nil { + return err + } + return nil +} + +func iteratePhyObjectsWithPrefix(r pebble.Reader, typePrefix byte, f func(cid.ID, oid.ID, *objectSDK.Object) error) error { + prefix := []byte{typePrefix} + it, err := r.NewIter(&pebble.IterOptions{ + LowerBound: prefix, + OnlyReadGuaranteedDurable: true, }) - - if errors.Is(err, ErrInterruptIterator) { - err = nil + if err != nil { + return err } - return err -} - -func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error { - var cid cid.ID - var oid oid.ID - obj := objectSDK.New() - - return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { - b58CID, postfix := parseContainerIDWithPrefix(&cid, name) - if len(b58CID) == 0 { - return nil + for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { + 
addr, err := addressFromKey(typePrefix, it.Key()) + if err != nil { + return errors.Join(err, it.Close()) } - - switch postfix { - case primaryPrefix, - lockersPrefix, - tombstonePrefix: - default: - return nil + obj := objectSDK.New() + if err := obj.Unmarshal(it.Value()); err != nil { + return errors.Join(err, it.Close()) } - - return b.ForEach(func(k, v []byte) error { - if oid.Decode(k) == nil && obj.Unmarshal(v) == nil { - return f(cid, oid, obj) - } - - return nil - }) - }) + if err := f(addr.Container(), addr.Object(), obj); err != nil { + return errors.Join(err, it.Close()) + } + } + return it.Close() }