From 6bb7330a4697c72244540c147ea491deb6317696 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 2 Jul 2024 10:18:54 +0300 Subject: [PATCH] [#9999] metabase: Fix db engine to pebble in graveyard.go Signed-off-by: Dmitrii Stepanov --- .../metabase/concurrency.go | 27 ++++ .../metabase/graveyard.go | 128 ++++++++++++------ 2 files changed, 114 insertions(+), 41 deletions(-) diff --git a/pkg/local_object_storage/metabase/concurrency.go b/pkg/local_object_storage/metabase/concurrency.go index b9a71abaa..79523146d 100644 --- a/pkg/local_object_storage/metabase/concurrency.go +++ b/pkg/local_object_storage/metabase/concurrency.go @@ -1,6 +1,9 @@ package meta import ( + "bytes" + "slices" + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" ) @@ -22,3 +25,27 @@ func (c *concurrency) LockContainerID(id cid.ID) func() { c.containerLocks.Unlock(id) } } + +func (c *concurrency) LockContainerIDs(ids []cid.ID) func() { + var containerIDs []cid.ID + m := make(map[cid.ID]struct{}) + for _, id := range ids { + m[id] = struct{}{} + } + for id := range m { + containerIDs = append(containerIDs, id) + } + slices.SortFunc(containerIDs, func(lhs, rhs cid.ID) int { + return bytes.Compare(lhs[:], rhs[:]) + }) + + for _, id := range containerIDs { + c.containerLocks.Lock(id) + } + + return func() { + for idx := range containerIDs { + c.containerLocks.Unlock(containerIDs[len(containerIDs)-idx-1]) + } + } +} diff --git a/pkg/local_object_storage/metabase/graveyard.go b/pkg/local_object_storage/metabase/graveyard.go index e68960ea4..cd371df64 100644 --- a/pkg/local_object_storage/metabase/graveyard.go +++ b/pkg/local_object_storage/metabase/graveyard.go @@ -9,8 +9,9 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" ) // GarbageObject represents descriptor of the @@ -80,8 +81,8 @@ func (db *DB) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) err return ErrDegradedMode } - err := metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { - return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset) + err := metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error { + return db.iterateDeletedObj(ctx, s, gcHandler{p.h}, p.offset) })) success = err == nil return err @@ -160,8 +161,8 @@ func (db *DB) IterateOverGraveyard(ctx context.Context, p GraveyardIterationPrm) return ErrDegradedMode } - return metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { - return db.iterateDeletedObj(tx, graveyardHandler{p.h}, p.offset) + return metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error { + return db.iterateDeletedObj(ctx, s, graveyardHandler{p.h}, p.offset) })) } @@ -195,49 +196,66 @@ func (g graveyardHandler) handleKV(k, v []byte) error { return g.h(o) } -func (db *DB) iterateDeletedObj(tx *bbolt.Tx, h kvHandler, offset *oid.Address) error { - var bkt *bbolt.Bucket +func (db *DB) iterateDeletedObj(ctx context.Context, r pebble.Reader, h kvHandler, offset *oid.Address) error { + var prefix []byte switch t := h.(type) { case graveyardHandler: - bkt = tx.Bucket(graveyardBucketName) + prefix = []byte{graveyardPrefix} case gcHandler: - bkt = tx.Bucket(garbageBucketName) + prefix = []byte{garbagePrefix} default: panic(fmt.Sprintf("metabase: unknown iteration object hadler: %T", t)) } + var seekKey []byte + if offset != nil { + cidBytes := make([]byte, cidSize) + offset.Container().Encode(cidBytes) + oidBytes := make([]byte, objectKeySize) + offset.Object().Encode(oidBytes) + seekKey = append(prefix, cidBytes...) + seekKey = append(seekKey, oidBytes...) + } - c := bkt.Cursor() - var k, v []byte + it, err := r.NewIter(&pebble.IterOptions{ + LowerBound: prefix, + }) + if err != nil { + return err + } - if offset == nil { - k, v = c.First() + var v bool + if len(seekKey) > 0 { + v = it.SeekGE(seekKey) } else { - rawAddr := addressKey(*offset, make([]byte, addressKeySize)) - - k, v = c.Seek(rawAddr) - if bytes.Equal(k, rawAddr) { - // offset was found, move - // cursor to the next element - k, v = c.Next() - } + v = it.First() } - for ; k != nil; k, v = c.Next() { - err := h.handleKV(k, v) - if err != nil { + for ; v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { + select { + case <-ctx.Done(): + return errors.Join(ctx.Err(), it.Close()) + default: + } + + if bytes.Equal(it.Key(), seekKey) { + continue + } + + key := bytes.Clone(it.Key()) + value := bytes.Clone(it.Value()) + if err = h.handleKV(key, value); err != nil { if errors.Is(err, ErrInterruptIterator) { - return nil + return it.Close() } - - return err + return errors.Join(err, it.Close()) } } - return nil + return it.Close() } func garbageFromKV(k []byte) (res GarbageObject, err error) { - err = decodeAddressFromKey(&res.addr, k) + res.addr, err = addressFromGarbageKey(k) if err != nil { err = fmt.Errorf("could not parse address: %w", err) } @@ -246,15 +264,44 @@ func garbageFromKV(k []byte) (res GarbageObject, err error) { } func graveFromKV(k, v []byte) (res TombstonedObject, err error) { - if err = decodeAddressFromKey(&res.addr, k); err != nil { + res.addr, err = addressFromGraveyardKey(k) + if err != nil { err = fmt.Errorf("decode tombstone target from key: %w", err) - } else if err = decodeAddressFromKey(&res.tomb, v); err != nil { - err = fmt.Errorf("decode tombstone address from value: %w", err) + return + } + res.tomb, err = decodeAddressFromGrave(v) + if err != nil { + err = fmt.Errorf("decode tombstone address from value: %w", err) + return } - return } +func encodeAddressToGrave(addr oid.Address) []byte { + value := make([]byte, cidSize+objectKeySize) + addr.Container().Encode(value) + addr.Object().Encode(value[cidSize:]) + return value +} + +func decodeAddressFromGrave(v []byte) (oid.Address, error) { + if len(v) != cidSize+objectKeySize { + return oid.Address{}, errInvalidValueLenght + } + var cont cid.ID + if err := cont.Decode(v[:cidSize]); err != nil { + return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err) + } + var obj oid.ID + if err := obj.Decode(v[cidSize:]); err != nil { + return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err) + } + var result oid.Address + result.SetContainer(cont) + result.SetObject(obj) + return result, nil +} + // DropGraves deletes tombstoned objects from the // graveyard bucket. // @@ -280,16 +327,15 @@ func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error { return ErrReadOnlyMode } - buf := make([]byte, addressKeySize) - - return db.database.Update(func(tx *bbolt.Tx) error { - bkt := tx.Bucket(graveyardBucketName) - if bkt == nil { - return nil - } + var contIDs []cid.ID + for _, to := range tss { + contIDs = append(contIDs, to.tomb.Container()) + } + defer db.guard.LockContainerIDs(contIDs)() + return db.batch(func(b *pebble.Batch) error { for _, ts := range tss { - err := bkt.Delete(addressKey(ts.Address(), buf)) + err := b.Delete(graveyardKey(ts.Address().Container(), ts.Address().Object()), pebble.Sync) if err != nil { return err }