[#9999] metabase: Fix db engine to pebble in graveyard.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
d3898ebd2c
commit
168fc7832c
2 changed files with 114 additions and 41 deletions
|
@ -1,6 +1,9 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"slices"
|
||||||
|
|
||||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
)
|
)
|
||||||
|
@ -22,3 +25,27 @@ func (c *concurrency) LockContainerID(id cid.ID) func() {
|
||||||
c.containerLocks.Unlock(id)
|
c.containerLocks.Unlock(id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *concurrency) LockContainerIDs(ids []cid.ID) func() {
|
||||||
|
var containerIDs []cid.ID
|
||||||
|
m := make(map[cid.ID]struct{})
|
||||||
|
for _, id := range ids {
|
||||||
|
m[id] = struct{}{}
|
||||||
|
}
|
||||||
|
for id := range m {
|
||||||
|
containerIDs = append(containerIDs, id)
|
||||||
|
}
|
||||||
|
slices.SortFunc(containerIDs, func(lhs, rhs cid.ID) int {
|
||||||
|
return bytes.Compare(lhs[:], rhs[:])
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, id := range containerIDs {
|
||||||
|
c.containerLocks.Lock(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return func() {
|
||||||
|
for idx := range containerIDs {
|
||||||
|
c.containerLocks.Unlock(containerIDs[len(containerIDs)-idx-1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -9,8 +9,9 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"github.com/cockroachdb/pebble"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GarbageObject represents descriptor of the
|
// GarbageObject represents descriptor of the
|
||||||
|
@ -80,8 +81,8 @@ func (db *DB) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) err
|
||||||
return ErrDegradedMode
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
err := metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
err := metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset)
|
return db.iterateDeletedObj(ctx, s, gcHandler{p.h}, p.offset)
|
||||||
}))
|
}))
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return err
|
return err
|
||||||
|
@ -160,8 +161,8 @@ func (db *DB) IterateOverGraveyard(ctx context.Context, p GraveyardIterationPrm)
|
||||||
return ErrDegradedMode
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
return metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
return metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
return db.iterateDeletedObj(tx, graveyardHandler{p.h}, p.offset)
|
return db.iterateDeletedObj(ctx, s, graveyardHandler{p.h}, p.offset)
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,49 +196,66 @@ func (g graveyardHandler) handleKV(k, v []byte) error {
|
||||||
return g.h(o)
|
return g.h(o)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) iterateDeletedObj(tx *bbolt.Tx, h kvHandler, offset *oid.Address) error {
|
func (db *DB) iterateDeletedObj(ctx context.Context, r pebble.Reader, h kvHandler, offset *oid.Address) error {
|
||||||
var bkt *bbolt.Bucket
|
var prefix []byte
|
||||||
switch t := h.(type) {
|
switch t := h.(type) {
|
||||||
case graveyardHandler:
|
case graveyardHandler:
|
||||||
bkt = tx.Bucket(graveyardBucketName)
|
prefix = []byte{graveyardPrefix}
|
||||||
case gcHandler:
|
case gcHandler:
|
||||||
bkt = tx.Bucket(garbageBucketName)
|
prefix = []byte{garbagePrefix}
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("metabase: unknown iteration object hadler: %T", t))
|
panic(fmt.Sprintf("metabase: unknown iteration object hadler: %T", t))
|
||||||
}
|
}
|
||||||
|
var seekKey []byte
|
||||||
|
if offset != nil {
|
||||||
|
cidBytes := make([]byte, cidSize)
|
||||||
|
offset.Container().Encode(cidBytes)
|
||||||
|
oidBytes := make([]byte, objectKeySize)
|
||||||
|
offset.Object().Encode(oidBytes)
|
||||||
|
seekKey = append(prefix, cidBytes...)
|
||||||
|
seekKey = append(seekKey, oidBytes...)
|
||||||
|
}
|
||||||
|
|
||||||
c := bkt.Cursor()
|
it, err := r.NewIter(&pebble.IterOptions{
|
||||||
var k, v []byte
|
LowerBound: prefix,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if offset == nil {
|
var v bool
|
||||||
k, v = c.First()
|
if len(seekKey) > 0 {
|
||||||
|
v = it.SeekGE(seekKey)
|
||||||
} else {
|
} else {
|
||||||
rawAddr := addressKey(*offset, make([]byte, addressKeySize))
|
v = it.First()
|
||||||
|
|
||||||
k, v = c.Seek(rawAddr)
|
|
||||||
if bytes.Equal(k, rawAddr) {
|
|
||||||
// offset was found, move
|
|
||||||
// cursor to the next element
|
|
||||||
k, v = c.Next()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for ; k != nil; k, v = c.Next() {
|
for ; v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||||
err := h.handleKV(k, v)
|
select {
|
||||||
if err != nil {
|
case <-ctx.Done():
|
||||||
|
return errors.Join(ctx.Err(), it.Close())
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if bytes.Equal(it.Key(), seekKey) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
key := bytes.Clone(it.Key())
|
||||||
|
value := bytes.Clone(it.Value())
|
||||||
|
if err = h.handleKV(key, value); err != nil {
|
||||||
if errors.Is(err, ErrInterruptIterator) {
|
if errors.Is(err, ErrInterruptIterator) {
|
||||||
return nil
|
return it.Close()
|
||||||
}
|
}
|
||||||
|
return errors.Join(err, it.Close())
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return it.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func garbageFromKV(k []byte) (res GarbageObject, err error) {
|
func garbageFromKV(k []byte) (res GarbageObject, err error) {
|
||||||
err = decodeAddressFromKey(&res.addr, k)
|
res.addr, err = addressFromGarbageKey(k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("could not parse address: %w", err)
|
err = fmt.Errorf("could not parse address: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -246,15 +264,44 @@ func garbageFromKV(k []byte) (res GarbageObject, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
|
func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
|
||||||
if err = decodeAddressFromKey(&res.addr, k); err != nil {
|
res.addr, err = addressFromGraveyardKey(k)
|
||||||
|
if err != nil {
|
||||||
err = fmt.Errorf("decode tombstone target from key: %w", err)
|
err = fmt.Errorf("decode tombstone target from key: %w", err)
|
||||||
} else if err = decodeAddressFromKey(&res.tomb, v); err != nil {
|
return
|
||||||
err = fmt.Errorf("decode tombstone address from value: %w", err)
|
}
|
||||||
|
res.tomb, err = decodeAddressFromGrave(v)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("decode tombstone address from value: %w", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func encodeAddressToGrave(addr oid.Address) []byte {
|
||||||
|
value := make([]byte, cidSize+objectKeySize)
|
||||||
|
addr.Container().Encode(value)
|
||||||
|
addr.Object().Encode(value[cidSize:])
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeAddressFromGrave(v []byte) (oid.Address, error) {
|
||||||
|
if len(v) != cidSize+objectKeySize {
|
||||||
|
return oid.Address{}, errInvalidValueLenght
|
||||||
|
}
|
||||||
|
var cont cid.ID
|
||||||
|
if err := cont.Decode(v[:cidSize]); err != nil {
|
||||||
|
return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err)
|
||||||
|
}
|
||||||
|
var obj oid.ID
|
||||||
|
if err := obj.Decode(v[cidSize:]); err != nil {
|
||||||
|
return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err)
|
||||||
|
}
|
||||||
|
var result oid.Address
|
||||||
|
result.SetContainer(cont)
|
||||||
|
result.SetObject(obj)
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// DropGraves deletes tombstoned objects from the
|
// DropGraves deletes tombstoned objects from the
|
||||||
// graveyard bucket.
|
// graveyard bucket.
|
||||||
//
|
//
|
||||||
|
@ -280,16 +327,15 @@ func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error {
|
||||||
return ErrReadOnlyMode
|
return ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, addressKeySize)
|
var contIDs []cid.ID
|
||||||
|
for _, to := range tss {
|
||||||
return db.database.Update(func(tx *bbolt.Tx) error {
|
contIDs = append(contIDs, to.tomb.Container())
|
||||||
bkt := tx.Bucket(graveyardBucketName)
|
}
|
||||||
if bkt == nil {
|
defer db.guard.LockContainerIDs(contIDs)()
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
|
return db.batch(func(b *pebble.Batch) error {
|
||||||
for _, ts := range tss {
|
for _, ts := range tss {
|
||||||
err := bkt.Delete(addressKey(ts.Address(), buf))
|
err := b.Delete(graveyardKey(ts.Address().Container(), ts.Address().Object()), pebble.Sync)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue