From 7df4e3607cb90c0b0af8a948ce419cfd050cd40b Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 2 Jul 2024 13:05:38 +0300 Subject: [PATCH] [#9999] metabase: Fix db engine to pebble in storage_id.go Signed-off-by: Dmitrii Stepanov --- .../metabase/storage_id.go | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/pkg/local_object_storage/metabase/storage_id.go b/pkg/local_object_storage/metabase/storage_id.go index 4ccb21eba..85c57fc21 100644 --- a/pkg/local_object_storage/metabase/storage_id.go +++ b/pkg/local_object_storage/metabase/storage_id.go @@ -8,7 +8,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -57,8 +57,8 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes return res, ErrDegradedMode } - err = db.database.View(func(tx *bbolt.Tx) error { - res.id, err = db.storageID(tx, prm.addr) + err = db.snapshot(func(s *pebble.Snapshot) error { + res.id, err = db.storageID(s, prm.addr) return err }) @@ -66,14 +66,11 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes return res, metaerr.Wrap(err) } -func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) { - key := make([]byte, bucketKeySize) - smallBucket := tx.Bucket(smallBucketName(addr.Container(), key)) - if smallBucket == nil { - return nil, nil +func (db *DB) storageID(r pebble.Reader, addr oid.Address) ([]byte, error) { + storageID, err := valueSafe(r, smallKey(addr.Container(), addr.Object())) + if err != nil { + return nil, err } - - storageID := smallBucket.Get(objectKey(addr.Object(), key)) if storageID == nil { return nil, nil } @@ -126,9 +123,28 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res return res, ErrReadOnlyMode } - err = db.database.Batch(func(tx *bbolt.Tx) error { - return setStorageID(tx, prm.addr, prm.id, true) + defer db.guard.LockContainerID(prm.addr.Container())() + + err = db.batch(func(b *pebble.Batch) error { + return setStorageID(b, prm.addr, prm.id, true) }) success = err == nil return res, metaerr.Wrap(err) } + +// setStorageID for existing objects if they were moved from one +// storage location to another. +func setStorageID(b *pebble.Batch, addr oid.Address, id []byte, override bool) error { + key := smallKey(addr.Container(), addr.Object()) + if override { + return b.Set(key, id, pebble.Sync) + } + v, err := valueSafe(b, key) + if err != nil { + return err + } + if v == nil { + return b.Set(key, id, pebble.Sync) + } + return nil +}