From 352e92c18c644481909f47945a4eec013976cbb9 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 2 Jul 2024 00:44:27 +0300 Subject: [PATCH] [#9999] metabase: Fix db engine to pebble in delete.go Signed-off-by: Dmitrii Stepanov --- .../metabase/concurrency.go | 24 + .../metabase/containers.go | 53 ++- pkg/local_object_storage/metabase/counter.go | 77 ++-- pkg/local_object_storage/metabase/db.go | 3 + pkg/local_object_storage/metabase/delete.go | 432 +++++------------- pkg/local_object_storage/metabase/get.go | 4 +- pkg/local_object_storage/metabase/put.go | 77 ++-- 7 files changed, 249 insertions(+), 421 deletions(-) create mode 100644 pkg/local_object_storage/metabase/concurrency.go diff --git a/pkg/local_object_storage/metabase/concurrency.go b/pkg/local_object_storage/metabase/concurrency.go new file mode 100644 index 000000000..b9a71abaa --- /dev/null +++ b/pkg/local_object_storage/metabase/concurrency.go @@ -0,0 +1,24 @@ +package meta + +import ( + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" +) + +type concurrency struct { + containerLocks *utilSync.KeyLocker[cid.ID] +} + +func newConcurrency() *concurrency { + return &concurrency{ + containerLocks: utilSync.NewKeyLocker[cid.ID](), + } +} + +func (c *concurrency) LockContainerID(id cid.ID) func() { + c.containerLocks.Lock(id) + + return func() { + c.containerLocks.Unlock(id) + } +} diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index 70b156a37..198978892 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -17,8 +17,7 @@ import ( ) const ( - containerSizeKeySize = 1 + cidSize + 2 - containerSizePrefixSize = 1 + cidSize + containerSizeKeySize = 1 + cidSize ) func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) { @@ -64,7 +63,7 @@ func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) { for v := it.First(); v; v = it.Next() { if parseContainerIDWithIgnore(&cnr, it.Key(), unique) { result = append(result, cnr) - unique[string(it.Key()[1:containerSizePrefixSize])] = struct{}{} + unique[string(it.Key()[1:containerSizeKeySize])] = struct{}{} } } @@ -89,11 +88,25 @@ func (db *DB) ContainerSize(ctx context.Context, id cid.ID) (size uint64, err er return 0, ErrDegradedMode } - result, err := db.containerSizesInternal(ctx, &id) + err = db.snapshot(func(s *pebble.Snapshot) error { + val, err := valueSafe(s, containerSizeKey(id)) + if err != nil { + return err + } + if len(val) == 0 { + return nil + } + value, ok := parseSize(val) + if !ok || value < 0 { + return fmt.Errorf("invalid container size value for container %s", id) + } + size = uint64(value) + return nil + }) if err != nil { return 0, metaerr.Wrap(err) } - return result[id], nil + return size, nil } func (db *DB) ContainerSizes(ctx context.Context) (map[cid.ID]uint64, error) { @@ -104,7 +117,7 @@ func (db *DB) ContainerSizes(ctx context.Context) (map[cid.ID]uint64, error) { return nil, ErrDegradedMode } - return db.containerSizesInternal(ctx, nil) + return db.containerSizesInternal(ctx) } // ZeroSizeContainers returns containers with size = 0. @@ -123,7 +136,7 @@ func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() - sizes, err := db.containerSizesInternal(ctx, nil) + sizes, err := db.containerSizesInternal(ctx) if err != nil { return nil, err } @@ -162,17 +175,16 @@ func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error { return ErrReadOnlyMode } + defer db.guard.LockContainerID(id)() + return metaerr.Wrap(db.batch( func(b *pebble.Batch) error { - return deleteByPrefix(ctx, b, containerSizeKeyPrefix(id)) + return b.Delete(containerSizeKey(id), pebble.Sync) })) } -func (db *DB) containerSizesInternal(ctx context.Context, id *cid.ID) (map[cid.ID]uint64, error) { +func (db *DB) containerSizesInternal(ctx context.Context) (map[cid.ID]uint64, error) { prefix := []byte{containerSizePrefix} - if id != nil { - prefix = containerSizeKeyPrefix(*id) - } result := make(map[cid.ID]int64) err := db.snapshot(func(s *pebble.Snapshot) error { it, err := s.NewIter(&pebble.IterOptions{ @@ -192,7 +204,7 @@ func (db *DB) containerSizesInternal(ctx context.Context, id *cid.ID) (map[cid.I key := it.Key() var cnr cid.ID - if err := cnr.Decode(key[1:containerSizePrefixSize]); err != nil { + if err := cnr.Decode(key[1:containerSizeKeySize]); err != nil { return errors.Join(fmt.Errorf("invalid container size key: %w", err), it.Close()) } @@ -222,8 +234,8 @@ func normilizeContainerSizes(sizes map[cid.ID]int64) (map[cid.ID]uint64, error) return result, nil } -func changeContainerSize(b *pebble.Batch, id cid.ID, delta int64, bucketID uint16) error { - key := containerSizeKey(id, bucketID) +func changeContainerSize(b *pebble.Batch, id cid.ID, delta int64) error { + key := containerSizeKey(id) v, err := valueSafe(b, key) if err != nil { @@ -240,20 +252,11 @@ func changeContainerSize(b *pebble.Batch, id cid.ID, delta int64, bucketID uint1 return b.Set(key, value, pebble.Sync) } -// containerSizeKeyPrefix returns containerSizePrefix_CID key prefix. -func containerSizeKeyPrefix(cnr cid.ID) []byte { - result := make([]byte, containerSizePrefixSize) - result[0] = containerSizePrefix - cnr.Encode(result[1:]) - return result -} - // containerSizeKey returns containerVolumePrefix_CID_bucketID key. -func containerSizeKey(cnr cid.ID, bucketID uint16) []byte { +func containerSizeKey(cnr cid.ID) []byte { result := make([]byte, containerSizeKeySize) result[0] = containerSizePrefix cnr.Encode(result[1:]) - binary.LittleEndian.PutUint16(result[containerSizePrefixSize:], bucketID) return result } diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index 0d3c4f16a..9e3423c3e 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -26,8 +26,7 @@ var ( ) const ( - containerObjectCountKeySize = 1 + cidSize + 2 - containerObjectCountPrefixSize = 1 + cidSize + containerObjectCountKeySize = 1 + cidSize ) // ObjectCounters groups object counter @@ -64,7 +63,7 @@ func (db *DB) ObjectCounters(ctx context.Context) (ObjectCounters, error) { var cc map[cid.ID]ObjectCounters err := db.snapshot(func(s *pebble.Snapshot) error { var err error - cc, err = containerObjectCounters(ctx, s, nil) + cc, err = containerObjectCounters(ctx, s) return err }) if err != nil { @@ -107,7 +106,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) } err := db.snapshot(func(s *pebble.Snapshot) error { var err error - cc.Counts, err = containerObjectCounters(ctx, s, nil) + cc.Counts, err = containerObjectCounters(ctx, s) return err }) if err != nil { @@ -136,27 +135,44 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er return ObjectCounters{}, ErrDegradedMode } - var cc map[cid.ID]ObjectCounters + var res ObjectCounters err := db.snapshot(func(s *pebble.Snapshot) error { - var err error - cc, err = containerObjectCounters(ctx, s, &id) - return err + val, err := valueSafe(s, containerCounterKey(id)) + if err != nil { + return err + } + if len(val) == 0 { + return nil + } + oc, err := parseContainerCounterValue(val) + if err != nil { + return err + } + + if oc.Logic < 0 || oc.Phy < 0 || oc.User < 0 { + return fmt.Errorf("invalid container object counter for container ID %s", id.EncodeToString()) + } + + res.Logic = uint64(oc.Logic) + res.Phy = uint64(oc.Phy) + res.User = uint64(oc.User) + + return nil }) if err != nil { return ObjectCounters{}, metaerr.Wrap(err) } - return cc[id], nil + return res, nil } -func containerCounterKey(cnrID cid.ID, bucketID uint16) []byte { +func containerCounterKey(cnrID cid.ID) []byte { result := make([]byte, containerObjectCountKeySize) result[0] = containerCountersPrefix cnrID.Encode(result[1:]) - binary.LittleEndian.PutUint16(result[containerObjectCountPrefixSize:], bucketID) return result } -func incCounters(b *pebble.Batch, cnrID cid.ID, isUserObject bool, bucketID uint16) error { +func incCounters(b *pebble.Batch, cnrID cid.ID, isUserObject bool) error { delta := objectCounterValue{ Logic: 1, Phy: 1, @@ -164,24 +180,20 @@ func incCounters(b *pebble.Batch, cnrID cid.ID, isUserObject bool, bucketID uint if isUserObject { delta.User = 1 } - return editContainerCounterValue(b, cnrID, delta, bucketID) + return editContainerCounterValue(b, cnrID, delta) } -func updateContainerCounter(b *pebble.Batch, delta map[cid.ID]objectCounterValue, bucketIDs map[cid.ID]uint16) error { +func updateContainerCounter(b *pebble.Batch, delta map[cid.ID]objectCounterValue) error { for cnrID, cnrDelta := range delta { - bucketID, found := bucketIDs[cnrID] - if !found { - return fmt.Errorf("bucket ID not found for container %s", cnrID) - } - if err := editContainerCounterValue(b, cnrID, cnrDelta, bucketID); err != nil { + if err := editContainerCounterValue(b, cnrID, cnrDelta); err != nil { return err } } return nil } -func editContainerCounterValue(b *pebble.Batch, cnrID cid.ID, delta objectCounterValue, bucketID uint16) error { - key := containerCounterKey(cnrID, bucketID) +func editContainerCounterValue(b *pebble.Batch, cnrID cid.ID, delta objectCounterValue) error { + key := containerCounterKey(cnrID) val, err := valueSafe(b, key) if err != nil { return err @@ -284,9 +296,7 @@ func setObjectCounters(b *pebble.Batch, counters map[cid.ID]ObjectCounters) erro Phy: int64(count.Phy), User: int64(count.User), } - // this function called by init or refill, so no other updates should happen - // so here bucketID = 0 can be used - if err := editContainerCounterValue(b, cnrID, delta, 0); err != nil { + if err := editContainerCounterValue(b, cnrID, delta); err != nil { return err } } @@ -329,7 +339,7 @@ func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) { var cc map[cid.ID]ObjectCounters err := db.snapshot(func(s *pebble.Snapshot) error { var err error - cc, err = containerObjectCounters(ctx, s, nil) + cc, err = containerObjectCounters(ctx, s) return err }) if err != nil { @@ -370,12 +380,10 @@ func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error { return ErrReadOnlyMode } - prefix := make([]byte, containerObjectCountPrefixSize) - prefix[0] = containerCountersPrefix - id.Encode(prefix[1:]) + defer db.guard.LockContainerID(id)() err := db.batch(func(b *pebble.Batch) error { - return deleteByPrefix(ctx, b, prefix) + return b.Delete(containerCounterKey(id), pebble.Sync) }) if err != nil { return metaerr.Wrap(err) @@ -385,17 +393,12 @@ func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error { } func containerObjectCounterInitialized(ctx context.Context, r pebble.Reader) bool { - _, err := containerObjectCounters(ctx, r, nil) + _, err := containerObjectCounters(ctx, r) return err == nil } -func containerObjectCounters(ctx context.Context, r pebble.Reader, cnrID *cid.ID) (map[cid.ID]ObjectCounters, error) { +func containerObjectCounters(ctx context.Context, r pebble.Reader) (map[cid.ID]ObjectCounters, error) { prefix := []byte{containerCountersPrefix} - if cnrID != nil { - buf := make([]byte, cidSize) - cnrID.Encode(buf) - prefix = append(prefix, buf...) - } it, err := r.NewIter(&pebble.IterOptions{ LowerBound: prefix, OnlyReadGuaranteedDurable: true, @@ -421,7 +424,7 @@ func containerObjectCounters(ctx context.Context, r pebble.Reader, cnrID *cid.ID if err != nil { return nil, errors.Join(err, it.Close()) } - counters[cnrID] = mergeObjectCounterValues(counters[cnrID], oc) + counters[cnrID] = oc } if err := it.Close(); err != nil { diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 159913137..60a59540d 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -58,6 +58,8 @@ type cfg struct { epochState EpochState metrics Metrics + + guard *concurrency } func defaultCfg() *cfg { @@ -68,6 +70,7 @@ func defaultCfg() *cfg { log: &logger.Logger{Logger: zap.L()}, metrics: &noopMetrics{}, dbOptions: &pebble.Options{}, + guard: newConcurrency(), } } diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 8b2f20898..8fd3b66f3 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -1,7 +1,6 @@ package meta import ( - "bytes" "context" "errors" "fmt" @@ -15,63 +14,23 @@ import ( cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" + "github.com/cockroachdb/pebble" ) var errFailedToRemoveUniqueIndexes = errors.New("can't remove unique indexes") // DeletePrm groups the parameters of Delete operation. type DeletePrm struct { - addrs []oid.Address + Address oid.Address } // DeleteRes groups the resulting values of Delete operation. type DeleteRes struct { - phyCount uint64 - logicCount uint64 - userCount uint64 - phySize uint64 - logicSize uint64 - removedByCnrID map[cid.ID]ObjectCounters -} - -// LogicCount returns the number of removed logic -// objects. -func (d DeleteRes) LogicCount() uint64 { - return d.logicCount -} - -func (d DeleteRes) UserCount() uint64 { - return d.userCount -} - -// RemovedByCnrID returns the number of removed objects by container ID. -func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters { - return d.removedByCnrID -} - -// PhyCount returns the number of removed physical objects. -func (d DeleteRes) PhyCount() uint64 { - return d.phyCount -} - -// PhySize returns the size of removed physical objects. -func (d DeleteRes) PhySize() uint64 { - return d.phySize -} - -// LogicSize returns the size of removed logical objects. -func (d DeleteRes) LogicSize() uint64 { - return d.logicSize -} - -// SetAddresses is a Delete option to set the addresses of the objects to delete. -// -// Option is required. -func (p *DeletePrm) SetAddresses(addrs ...oid.Address) { - p.addrs = addrs + PhyCount uint64 + LogicCount uint64 + UserCount uint64 + PhySize uint64 + LogicSize uint64 } type referenceNumber struct { @@ -82,8 +41,6 @@ type referenceNumber struct { obj *objectSDK.Object } -type referenceCounter map[string]*referenceNumber - // Delete removed object records from metabase indexes. func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { var ( @@ -94,10 +51,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { db.metrics.AddMethodDuration("Delete", time.Since(startedAt), deleted) }() - _, span := tracing.StartSpanFromContext(ctx, "metabase.Delete", - trace.WithAttributes( - attribute.Int("addr_count", len(prm.addrs)), - )) + _, span := tracing.StartSpanFromContext(ctx, "metabase.Delete") defer span.End() db.modeMtx.RLock() @@ -109,370 +63,228 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { return DeleteRes{}, ErrReadOnlyMode } + defer db.guard.LockContainerID(prm.Address.Container())() + var err error var res DeleteRes - err = db.database.Update(func(tx *bbolt.Tx) error { - res, err = db.deleteGroup(tx, prm.addrs) + err = db.batch(func(b *pebble.Batch) error { + res, err = db.deleteByAddress(ctx, b, prm.Address) return err }) if err == nil { deleted = true - for i := range prm.addrs { - storagelog.Write(db.log, - storagelog.AddressField(prm.addrs[i]), - storagelog.OpField("metabase DELETE")) - } + storagelog.Write(db.log, + storagelog.AddressField(prm.Address), + storagelog.OpField("metabase DELETE")) } return res, metaerr.Wrap(err) } // deleteGroup deletes object from the metabase. Handles removal of the // references of the split objects. -func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) { - res := DeleteRes{ - removedByCnrID: make(map[cid.ID]ObjectCounters), - } - refCounter := make(referenceCounter, len(addrs)) +func (db *DB) deleteByAddress(ctx context.Context, b *pebble.Batch, addr oid.Address) (DeleteRes, error) { + refCounter := &referenceNumber{} currEpoch := db.epochState.CurrentEpoch() - - for i := range addrs { - r, err := db.delete(tx, addrs[i], refCounter, currEpoch) - if err != nil { - return DeleteRes{}, err - } - - applyDeleteSingleResult(r, &res, addrs, i) - } - - if err := db.updateCountersDelete(tx, res); err != nil { + res, err := db.delete(ctx, b, addr, refCounter, currEpoch) + if err != nil { return DeleteRes{}, err } - for _, refNum := range refCounter { - if refNum.cur == refNum.all { - err := db.deleteObject(tx, refNum.obj, true) - if err != nil { - return DeleteRes{}, err - } - } + if err := db.updateCountersDelete(b, addr.Container(), res); err != nil { + return DeleteRes{}, err } + if refCounter.cur == refCounter.all { + err := db.deleteObject(ctx, b, refCounter.obj, true) + if err != nil { + return DeleteRes{}, err + } + } return res, nil } -func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error { - if res.phyCount > 0 { - err := db.updateShardObjectCounter(tx, phy, res.phyCount, false) - if err != nil { - return fmt.Errorf("could not decrease phy object counter: %w", err) - } - } - - if res.logicCount > 0 { - err := db.updateShardObjectCounter(tx, logical, res.logicCount, false) - if err != nil { - return fmt.Errorf("could not decrease logical object counter: %w", err) - } - } - - if res.userCount > 0 { - err := db.updateShardObjectCounter(tx, user, res.userCount, false) - if err != nil { - return fmt.Errorf("could not decrease user object counter: %w", err) - } - } - - if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil { +func (db *DB) updateCountersDelete(b *pebble.Batch, cnrID cid.ID, res DeleteRes) error { + if err := editContainerCounterValue(b, cnrID, objectCounterValue{ + Logic: -1 * int64(res.LogicCount), + Phy: -1 * int64(res.PhyCount), + User: -1 * int64(res.UserCount), + }); err != nil { return fmt.Errorf("could not decrease container object counter: %w", err) } return nil } -func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) { - if r.Phy { - if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { - v.Phy++ - res.removedByCnrID[addrs[i].Container()] = v - } else { - res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ - Phy: 1, - } - } - - res.phyCount++ - res.phySize += r.Size - } - - if r.Logic { - if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { - v.Logic++ - res.removedByCnrID[addrs[i].Container()] = v - } else { - res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ - Logic: 1, - } - } - - res.logicCount++ - res.logicSize += r.Size - } - - if r.User { - if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { - v.User++ - res.removedByCnrID[addrs[i].Container()] = v - } else { - res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ - User: 1, - } - } - - res.userCount++ - } -} - -type deleteSingleResult struct { - Phy bool - Logic bool - User bool - Size uint64 -} - // delete removes object indexes from the metabase. Counts the references // of the object that is being removed. // The first return value indicates if an object has been removed. (removing a // non-exist object is error-free). The second return value indicates if an // object was available before the removal (for calculating the logical object // counter). The third return value The fourth return value is removed object payload size. -func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) { - key := make([]byte, addressKeySize) - addrKey := addressKey(addr, key) - garbageBKT := tx.Bucket(garbageBucketName) - graveyardBKT := tx.Bucket(graveyardBucketName) - - removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0 +func (db *DB) delete(ctx context.Context, b *pebble.Batch, addr oid.Address, refCounter *referenceNumber, currEpoch uint64) (DeleteRes, error) { + status, err := inGraveyardWithKey(b, addr) + if err != nil { + return DeleteRes{}, err + } + removeAvailableObject := status == 0 // unmarshal object, work only with physically stored (raw == true) objects - obj, err := db.get(tx, addr, key, false, true, currEpoch) + obj, err := get(ctx, b, addr, false, true, currEpoch) if err != nil { if client.IsErrObjectNotFound(err) { - addrKey = addressKey(addr, key) - if garbageBKT != nil { - err := garbageBKT.Delete(addrKey) - if err != nil { - return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err) - } + err := deleteFromGarbage(b, addr) + if err != nil { + return DeleteRes{}, fmt.Errorf("could not remove from garbage bucket: %w", err) } - return deleteSingleResult{}, nil + return DeleteRes{}, nil } var siErr *objectSDK.SplitInfoError var ecErr *objectSDK.ECInfoError if errors.As(err, &siErr) || errors.As(err, &ecErr) { // if object is virtual (parent) then do nothing, it will be deleted with last child // if object is erasure-coded it will be deleted with the last chunk presented on the shard - return deleteSingleResult{}, nil + return DeleteRes{}, nil } - return deleteSingleResult{}, err + return DeleteRes{}, err } - addrKey = addressKey(addr, key) // remove record from the garbage bucket - if garbageBKT != nil { - err := garbageBKT.Delete(addrKey) - if err != nil { - return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err) - } + err = deleteFromGarbage(b, addr) + if err != nil { + return DeleteRes{}, fmt.Errorf("could not remove from garbage bucket: %w", err) } // if object is an only link to a parent, then remove parent if parent := obj.Parent(); parent != nil { parAddr := object.AddressOf(parent) - sParAddr := addressKey(parAddr, key) - k := string(sParAddr) - - nRef, ok := refCounter[k] - if !ok { - nRef = &referenceNumber{ - all: parentLength(tx, parAddr), - addr: parAddr, - obj: parent, - } - - refCounter[k] = nRef + parentLen, err := parentLength(ctx, b, parAddr) + if err != nil { + return DeleteRes{}, fmt.Errorf("failed to get parent count for object %s: %w", parAddr, err) } - - nRef.cur++ + refCounter.addr = parAddr + refCounter.all = parentLen + refCounter.obj = parent + refCounter.cur = 1 } isUserObject := IsUserObject(obj) // remove object - err = db.deleteObject(tx, obj, false) + err = db.deleteObject(ctx, b, obj, false) if err != nil { - return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err) + return DeleteRes{}, fmt.Errorf("could not remove object: %w", err) } - return deleteSingleResult{ - Phy: true, - Logic: removeAvailableObject, - User: isUserObject && removeAvailableObject, - Size: obj.PayloadSize(), - }, nil + var result DeleteRes + result.PhyCount = 1 + result.PhySize = obj.PayloadSize() + if removeAvailableObject { + result.LogicCount = 1 + result.LogicSize = obj.PayloadSize() + } + if removeAvailableObject && isUserObject { + result.UserCount = 1 + } + + return result, nil } func (db *DB) deleteObject( - tx *bbolt.Tx, + ctx context.Context, + b *pebble.Batch, obj *objectSDK.Object, isParent bool, ) error { - err := delUniqueIndexes(tx, obj, isParent) + err := delUniqueIndexes(ctx, b, obj, isParent) if err != nil { return errFailedToRemoveUniqueIndexes } - err = updateListIndexes(tx, obj, delListIndexItem) + err = updateListIndexes(b, obj, deleteByKey) if err != nil { return fmt.Errorf("can't remove list indexes: %w", err) } - err = updateFKBTIndexes(tx, obj, delFKBTIndexItem) + err = updateFKBTIndexes(b, obj, deleteByKey) if err != nil { return fmt.Errorf("can't remove fake bucket tree indexes: %w", err) } if isParent { // remove record from the garbage bucket, because regular object deletion does nothing for virtual object - garbageBKT := tx.Bucket(garbageBucketName) - if garbageBKT != nil { - key := make([]byte, addressKeySize) - addrKey := addressKey(object.AddressOf(obj), key) - err := garbageBKT.Delete(addrKey) - if err != nil { - return fmt.Errorf("could not remove from garbage bucket: %w", err) - } + err := deleteFromGarbage(b, object.AddressOf(obj)) + if err != nil { + return fmt.Errorf("could not remove from garbage bucket: %w", err) } } return nil } +func deleteByKey(b *pebble.Batch, key []byte) error { + return b.Delete(key, pebble.Sync) +} + +func deleteFromGarbage(b *pebble.Batch, addr oid.Address) error { + return b.Delete(garbageKey(addr.Container(), addr.Object()), pebble.Sync) +} + // parentLength returns amount of available children from parentid index. -func parentLength(tx *bbolt.Tx, addr oid.Address) int { - bucketName := make([]byte, bucketKeySize) - - bkt := tx.Bucket(parentBucketName(addr.Container(), bucketName[:])) - if bkt == nil { - return 0 - } - - lst, err := decodeList(bkt.Get(objectKey(addr.Object(), bucketName[:]))) - if err != nil { - return 0 - } - - return len(lst) -} - -func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) { - bkt := tx.Bucket(item.name) - if bkt != nil { - _ = bkt.Delete(item.key) // ignore error, best effort there - } -} - -func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error { - bkt := tx.Bucket(item.name) - if bkt == nil { - return nil - } - - fkbtRoot := bkt.Bucket(item.key) - if fkbtRoot == nil { - return nil - } - - _ = fkbtRoot.Delete(item.val) // ignore error, best effort there - return nil -} - -func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error { - bkt := tx.Bucket(item.name) - if bkt == nil { - return nil - } - - lst, err := decodeList(bkt.Get(item.key)) - if err != nil || len(lst) == 0 { - return nil - } - - // remove element from the list - for i := range lst { - if bytes.Equal(item.val, lst[i]) { - copy(lst[i:], lst[i+1:]) - lst = lst[:len(lst)-1] +func parentLength(ctx context.Context, r pebble.Reader, addr oid.Address) (int, error) { + var result int + prefix := parentKeyLongPrefix(addr.Container(), addr.Object()) + for { + ids, err := selectByPrefixBatch(ctx, r, prefix, batchSize) + if err != nil { + return 0, err + } + result += len(ids) + if len(ids) < batchSize { break } } - - // if list empty, remove the key from bucket - if len(lst) == 0 { - _ = bkt.Delete(item.key) // ignore error, best effort there - - return nil - } - - // if list is not empty, then update it - encodedLst, err := encodeList(lst) - if err != nil { - return nil // ignore error, best effort there - } - - _ = bkt.Put(item.key, encodedLst) // ignore error, best effort there - return nil + return result, nil } -func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error { - addr := object.AddressOf(obj) +func delParent(ctx context.Context, b *pebble.Batch, addr oid.Address) error { + prefix := parentKeyLongPrefix(addr.Container(), addr.Object()) + return deleteByPrefix(ctx, b, prefix) +} - objKey := objectKey(addr.Object(), make([]byte, objectKeySize)) - cnr := addr.Container() - bucketName := make([]byte, bucketKeySize) +func delUniqueIndexes(ctx context.Context, b *pebble.Batch, obj *objectSDK.Object, isParent bool) error { + addr := object.AddressOf(obj) // add value to primary unique bucket if !isParent { + var key []byte switch obj.Type() { case objectSDK.TypeRegular: - bucketName = primaryBucketName(cnr, bucketName) + key = primaryKey(addr.Container(), addr.Object()) case objectSDK.TypeTombstone: - bucketName = tombstoneBucketName(cnr, bucketName) + key = tombstoneKey(addr.Container(), addr.Object()) case objectSDK.TypeLock: - bucketName = bucketNameLockers(cnr, bucketName) + key = lockersKey(addr.Container(), addr.Object()) default: return ErrUnknownObjectType } - delUniqueIndexItem(tx, namedBucketItem{ - name: bucketName, - key: objKey, - }) + if err := b.Delete(key, pebble.Sync); err != nil { + return err + } } else { - delUniqueIndexItem(tx, namedBucketItem{ - name: parentBucketName(cnr, bucketName), - key: objKey, - }) + if err := delParent(ctx, b, addr); err != nil { + return err + } } - delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index - name: smallBucketName(cnr, bucketName), - key: objKey, - }) - delUniqueIndexItem(tx, namedBucketItem{ // remove from root index - name: rootBucketName(cnr, bucketName), - key: objKey, - }) - - return nil + if err := b.Delete(smallKey(addr.Container(), addr.Object()), pebble.Sync); err != nil { + return err + } + if ecHead := obj.ECHeader(); ecHead != nil { + if err := b.Delete(ecInfoKey(addr.Container(), ecHead.Parent(), addr.Object()), pebble.Sync); err != nil { + return err + } + } + return b.Delete(rootKey(addr.Container(), addr.Object()), pebble.Sync) } diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index 7a8dd006d..4535cad3f 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -80,7 +80,7 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { err = db.database.View(func(tx *bbolt.Tx) error { key := make([]byte, addressKeySize) - res.hdr, err = db.get(tx, prm.addr, key, true, prm.raw, currEpoch) + res.hdr, err = get(tx, prm.addr, key, true, prm.raw, currEpoch) return err }) @@ -88,7 +88,7 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { return res, metaerr.Wrap(err) } -func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { +func get(ctx context.Context, r pebble.Reader, addr oid.Address, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { if checkStatus { switch objectStatus(tx, addr, currEpoch) { case 1: diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index d9af2a681..5b33a1918 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -6,8 +6,10 @@ import ( "errors" "fmt" gio "io" + "strconv" "time" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" @@ -55,6 +57,8 @@ var ( ErrUnknownObjectType = errors.New("unknown object type") ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it") ErrIncorrectRootObject = errors.New("invalid root object") + + errInvalidUserAttributeKeyFormat = errors.New("invalid user attribute key format") ) // Put saves object header in metabase. Object payload expected to be cut. @@ -290,22 +294,15 @@ func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName [] }) } -type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error +type updateIndexItemFunc = func(b *pebble.Batch, key []byte) error -func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error { +func updateListIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItemFunc) error { idObj, _ := obj.ID() cnr, _ := obj.ContainerID() - objKey := objectKey(idObj, make([]byte, objectKeySize)) - bucketName := make([]byte, bucketKeySize) - cs, _ := obj.PayloadChecksum() // index payload hashes - err := f(tx, namedBucketItem{ - name: payloadHashBucketName(cnr, bucketName), - key: cs.Value(), - val: objKey, - }) + err := f(b, payloadHashKey(cnr, idObj, cs.Value())) if err != nil { return err } @@ -314,11 +311,7 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun // index parent ids if ok { - err := f(tx, namedBucketItem{ - name: parentBucketName(cnr, bucketName), - key: objectKey(idParent, make([]byte, objectKeySize)), - val: objKey, - }) + err := f(b, parentKey(cnr, idParent, idObj)) if err != nil { return err } @@ -326,33 +319,35 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun // index split ids if obj.SplitID() != nil { - err := f(tx, namedBucketItem{ - name: splitBucketName(cnr, bucketName), - key: obj.SplitID().ToV2(), - val: objKey, - }) + err := f(b, splitKey(cnr, idObj, obj.SplitID().ToV2())) if err != nil { return err } } + for _, attr := range obj.Attributes() { + if attr.Key() != objectV2.SysAttributeExpEpochNeoFS && attr.Key() != objectV2.SysAttributeExpEpoch { + continue + } + expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64) + if err != nil { + return errInvalidUserAttributeKeyFormat + } + err = f(b, expiredKey(cnr, idObj, expEpoch)) + if err != nil { + return err + } + break + } + if ech := obj.ECHeader(); ech != nil { - err := f(tx, namedBucketItem{ - name: ecInfoBucketName(cnr, bucketName), - key: objectKey(ech.Parent(), make([]byte, objectKeySize)), - val: objKey, - }) + err := f(b, ecInfoKey(cnr, ech.Parent(), idObj)) if err != nil { return err } if ech.ParentSplitID() != nil { - objKey := objectKey(ech.Parent(), make([]byte, objectKeySize)) - err := f(tx, namedBucketItem{ - name: splitBucketName(cnr, bucketName), - key: ech.ParentSplitID().ToV2(), - val: objKey, - }) + err := f(b, splitKey(cnr, ech.Parent(), ech.ParentSplitID().ToV2())) if err != nil { return err } @@ -362,17 +357,10 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun return nil } -func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error { +func updateFKBTIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItemFunc) error { id, _ := obj.ID() cnr, _ := obj.ContainerID() - objKey := objectKey(id, make([]byte, objectKeySize)) - - key := make([]byte, bucketKeySize) - err := f(tx, namedBucketItem{ - name: ownerBucketName(cnr, key), - key: []byte(obj.OwnerID().EncodeToString()), - val: objKey, - }) + err := f(b, ownerKey(cnr, id, []byte(obj.OwnerID().EncodeToString()))) if err != nil { return err } @@ -380,19 +368,14 @@ func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun var attrs []objectSDK.Attribute if obj.ECHeader() != nil { attrs = obj.ECHeader().ParentAttributes() - objKey = objectKey(obj.ECHeader().Parent(), make([]byte, objectKeySize)) + id = obj.ECHeader().Parent() } else { attrs = obj.Attributes() } // user specified attributes for i := range attrs { - key = attributeBucketName(cnr, attrs[i].Key(), key) - err := f(tx, namedBucketItem{ - name: key, - key: []byte(attrs[i].Value()), - val: objKey, - }) + err := f(b, attributeKey(cnr, id, attrs[i].Key(), attrs[i].Value())) if err != nil { return err }