WIP: Change metabase engine to pebble #1221
2 changed files with 114 additions and 41 deletions
|
@ -1,6 +1,9 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"slices"
|
||||
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
@ -22,3 +25,27 @@ func (c *concurrency) LockContainerID(id cid.ID) func() {
|
|||
c.containerLocks.Unlock(id)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *concurrency) LockContainerIDs(ids []cid.ID) func() {
|
||||
var containerIDs []cid.ID
|
||||
m := make(map[cid.ID]struct{})
|
||||
for _, id := range ids {
|
||||
m[id] = struct{}{}
|
||||
}
|
||||
for id := range m {
|
||||
containerIDs = append(containerIDs, id)
|
||||
}
|
||||
slices.SortFunc(containerIDs, func(lhs, rhs cid.ID) int {
|
||||
return bytes.Compare(lhs[:], rhs[:])
|
||||
})
|
||||
|
||||
for _, id := range containerIDs {
|
||||
c.containerLocks.Lock(id)
|
||||
}
|
||||
|
||||
return func() {
|
||||
for idx := range containerIDs {
|
||||
c.containerLocks.Unlock(containerIDs[len(containerIDs)-idx-1])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,8 +9,9 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"github.com/cockroachdb/pebble"
|
||||
)
|
||||
|
||||
// GarbageObject represents descriptor of the
|
||||
|
@ -80,8 +81,8 @@ func (db *DB) IterateOverGarbage(ctx context.Context, p GarbageIterationPrm) err
|
|||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
err := metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset)
|
||||
err := metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error {
|
||||
return db.iterateDeletedObj(ctx, s, gcHandler{p.h}, p.offset)
|
||||
}))
|
||||
success = err == nil
|
||||
return err
|
||||
|
@ -160,8 +161,8 @@ func (db *DB) IterateOverGraveyard(ctx context.Context, p GraveyardIterationPrm)
|
|||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateDeletedObj(tx, graveyardHandler{p.h}, p.offset)
|
||||
return metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error {
|
||||
return db.iterateDeletedObj(ctx, s, graveyardHandler{p.h}, p.offset)
|
||||
}))
|
||||
}
|
||||
|
||||
|
@ -195,49 +196,66 @@ func (g graveyardHandler) handleKV(k, v []byte) error {
|
|||
return g.h(o)
|
||||
}
|
||||
|
||||
func (db *DB) iterateDeletedObj(tx *bbolt.Tx, h kvHandler, offset *oid.Address) error {
|
||||
var bkt *bbolt.Bucket
|
||||
func (db *DB) iterateDeletedObj(ctx context.Context, r pebble.Reader, h kvHandler, offset *oid.Address) error {
|
||||
var prefix []byte
|
||||
switch t := h.(type) {
|
||||
case graveyardHandler:
|
||||
bkt = tx.Bucket(graveyardBucketName)
|
||||
prefix = []byte{graveyardPrefix}
|
||||
case gcHandler:
|
||||
bkt = tx.Bucket(garbageBucketName)
|
||||
prefix = []byte{garbagePrefix}
|
||||
default:
|
||||
panic(fmt.Sprintf("metabase: unknown iteration object hadler: %T", t))
|
||||
}
|
||||
|
||||
c := bkt.Cursor()
|
||||
var k, v []byte
|
||||
|
||||
if offset == nil {
|
||||
k, v = c.First()
|
||||
} else {
|
||||
rawAddr := addressKey(*offset, make([]byte, addressKeySize))
|
||||
|
||||
k, v = c.Seek(rawAddr)
|
||||
if bytes.Equal(k, rawAddr) {
|
||||
// offset was found, move
|
||||
// cursor to the next element
|
||||
k, v = c.Next()
|
||||
}
|
||||
var seekKey []byte
|
||||
if offset != nil {
|
||||
cidBytes := make([]byte, cidSize)
|
||||
offset.Container().Encode(cidBytes)
|
||||
oidBytes := make([]byte, objectKeySize)
|
||||
offset.Object().Encode(oidBytes)
|
||||
seekKey = append(prefix, cidBytes...)
|
||||
seekKey = append(seekKey, oidBytes...)
|
||||
}
|
||||
|
||||
for ; k != nil; k, v = c.Next() {
|
||||
err := h.handleKV(k, v)
|
||||
it, err := r.NewIter(&pebble.IterOptions{
|
||||
LowerBound: prefix,
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrInterruptIterator) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
var v bool
|
||||
if len(seekKey) > 0 {
|
||||
v = it.SeekGE(seekKey)
|
||||
} else {
|
||||
v = it.First()
|
||||
}
|
||||
|
||||
return nil
|
||||
for ; v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return errors.Join(ctx.Err(), it.Close())
|
||||
default:
|
||||
}
|
||||
|
||||
if bytes.Equal(it.Key(), seekKey) {
|
||||
continue
|
||||
}
|
||||
|
||||
key := bytes.Clone(it.Key())
|
||||
value := bytes.Clone(it.Value())
|
||||
if err = h.handleKV(key, value); err != nil {
|
||||
if errors.Is(err, ErrInterruptIterator) {
|
||||
return it.Close()
|
||||
}
|
||||
return errors.Join(err, it.Close())
|
||||
}
|
||||
}
|
||||
|
||||
return it.Close()
|
||||
}
|
||||
|
||||
func garbageFromKV(k []byte) (res GarbageObject, err error) {
|
||||
err = decodeAddressFromKey(&res.addr, k)
|
||||
res.addr, err = addressFromGarbageKey(k)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("could not parse address: %w", err)
|
||||
}
|
||||
|
@ -246,13 +264,42 @@ func garbageFromKV(k []byte) (res GarbageObject, err error) {
|
|||
}
|
||||
|
||||
func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
|
||||
if err = decodeAddressFromKey(&res.addr, k); err != nil {
|
||||
res.addr, err = addressFromGraveyardKey(k)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("decode tombstone target from key: %w", err)
|
||||
} else if err = decodeAddressFromKey(&res.tomb, v); err != nil {
|
||||
err = fmt.Errorf("decode tombstone address from value: %w", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
res.tomb, err = decodeAddressFromGrave(v)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("decode tombstone address from value: %w", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func encodeAddressToGrave(addr oid.Address) []byte {
|
||||
value := make([]byte, cidSize+objectKeySize)
|
||||
addr.Container().Encode(value)
|
||||
addr.Object().Encode(value[cidSize:])
|
||||
return value
|
||||
}
|
||||
|
||||
func decodeAddressFromGrave(v []byte) (oid.Address, error) {
|
||||
if len(v) != cidSize+objectKeySize {
|
||||
return oid.Address{}, errInvalidValueLenght
|
||||
}
|
||||
var cont cid.ID
|
||||
if err := cont.Decode(v[:cidSize]); err != nil {
|
||||
return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err)
|
||||
}
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(v[cidSize:]); err != nil {
|
||||
return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err)
|
||||
}
|
||||
var result oid.Address
|
||||
result.SetContainer(cont)
|
||||
result.SetObject(obj)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// DropGraves deletes tombstoned objects from the
|
||||
|
@ -280,16 +327,15 @@ func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error {
|
|||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
buf := make([]byte, addressKeySize)
|
||||
|
||||
return db.database.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(graveyardBucketName)
|
||||
if bkt == nil {
|
||||
return nil
|
||||
var contIDs []cid.ID
|
||||
for _, to := range tss {
|
||||
contIDs = append(contIDs, to.tomb.Container())
|
||||
}
|
||||
defer db.guard.LockContainerIDs(contIDs)()
|
||||
|
||||
return db.batch(func(b *pebble.Batch) error {
|
||||
for _, ts := range tss {
|
||||
err := bkt.Delete(addressKey(ts.Address(), buf))
|
||||
err := b.Delete(graveyardKey(ts.Address().Container(), ts.Address().Object()), pebble.Sync)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue