[#9999] metabase: Fix db engine to pebble in storage_id.go

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-07-02 13:05:38 +03:00
parent 7d1e719e1f
commit 7df4e3607c

View file

@ -8,7 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt" "github.com/cockroachdb/pebble"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
@ -57,8 +57,8 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
return res, ErrDegradedMode return res, ErrDegradedMode
} }
err = db.database.View(func(tx *bbolt.Tx) error { err = db.snapshot(func(s *pebble.Snapshot) error {
res.id, err = db.storageID(tx, prm.addr) res.id, err = db.storageID(s, prm.addr)
return err return err
}) })
@ -66,14 +66,11 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
return res, metaerr.Wrap(err) return res, metaerr.Wrap(err)
} }
func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) { func (db *DB) storageID(r pebble.Reader, addr oid.Address) ([]byte, error) {
key := make([]byte, bucketKeySize) storageID, err := valueSafe(r, smallKey(addr.Container(), addr.Object()))
smallBucket := tx.Bucket(smallBucketName(addr.Container(), key)) if err != nil {
if smallBucket == nil { return nil, err
return nil, nil
} }
storageID := smallBucket.Get(objectKey(addr.Object(), key))
if storageID == nil { if storageID == nil {
return nil, nil return nil, nil
} }
@ -126,9 +123,28 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
return res, ErrReadOnlyMode return res, ErrReadOnlyMode
} }
err = db.database.Batch(func(tx *bbolt.Tx) error { defer db.guard.LockContainerID(prm.addr.Container())()
return setStorageID(tx, prm.addr, prm.id, true)
err = db.batch(func(b *pebble.Batch) error {
return setStorageID(b, prm.addr, prm.id, true)
}) })
success = err == nil success = err == nil
return res, metaerr.Wrap(err) return res, metaerr.Wrap(err)
} }
// setStorageID for existing objects if they were moved from one
// storage location to another.
func setStorageID(b *pebble.Batch, addr oid.Address, id []byte, override bool) error {
key := smallKey(addr.Container(), addr.Object())
if override {
return b.Set(key, id, pebble.Sync)
}
v, err := valueSafe(b, key)
if err != nil {
return err
}
if v == nil {
return b.Set(key, id, pebble.Sync)
}
return nil
}