forked from TrueCloudLab/frostfs-node
[#9999] metabase: Fix db engine to pebble in storage_id.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
4bac5644f1
commit
8bfa2fe106
1 changed files with 28 additions and 12 deletions
|
@ -8,7 +8,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
@ -57,8 +57,8 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
|
|||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
||||
res.id, err = db.storageID(tx, prm.addr)
|
||||
err = db.snapshot(func(s *pebble.Snapshot) error {
|
||||
res.id, err = db.storageID(s, prm.addr)
|
||||
|
||||
return err
|
||||
})
|
||||
|
@ -66,14 +66,11 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
|
|||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) {
|
||||
key := make([]byte, bucketKeySize)
|
||||
smallBucket := tx.Bucket(smallBucketName(addr.Container(), key))
|
||||
if smallBucket == nil {
|
||||
return nil, nil
|
||||
func (db *DB) storageID(r pebble.Reader, addr oid.Address) ([]byte, error) {
|
||||
storageID, err := valueSafe(r, smallKey(addr.Container(), addr.Object()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storageID := smallBucket.Get(objectKey(addr.Object(), key))
|
||||
if storageID == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -126,9 +123,28 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
|
|||
return res, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
err = db.database.Batch(func(tx *bbolt.Tx) error {
|
||||
return setStorageID(tx, prm.addr, prm.id, true)
|
||||
defer db.guard.LockContainerID(prm.addr.Container())()
|
||||
|
||||
err = db.batch(func(b *pebble.Batch) error {
|
||||
return setStorageID(b, prm.addr, prm.id, true)
|
||||
})
|
||||
success = err == nil
|
||||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// setStorageID for existing objects if they were moved from one
|
||||
// storage location to another.
|
||||
func setStorageID(b *pebble.Batch, addr oid.Address, id []byte, override bool) error {
|
||||
key := smallKey(addr.Container(), addr.Object())
|
||||
if override {
|
||||
return b.Set(key, id, pebble.Sync)
|
||||
}
|
||||
v, err := valueSafe(b, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if v == nil {
|
||||
return b.Set(key, id, pebble.Sync)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue