[#9999] metabase: Fix db engine to pebble in containers.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
aa425c1b0c
commit
2080f1b32d
2 changed files with 235 additions and 60 deletions
|
@ -1,14 +1,23 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"go.etcd.io/bbolt"
|
"github.com/cockroachdb/pebble"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
containerSizeKeySize = 1 + cidSize + 2
|
||||||
|
containerSizePrefixSize = 1 + cidSize
|
||||||
)
|
)
|
||||||
|
|
||||||
func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) {
|
func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) {
|
||||||
|
@ -30,8 +39,8 @@ func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) {
|
||||||
return nil, ErrDegradedMode
|
return nil, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
err = db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
list, err = db.containers(tx)
|
list, err = containers(ctx, s)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
@ -39,50 +48,30 @@ func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) {
|
||||||
return list, metaerr.Wrap(err)
|
return list, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) containers(tx *bbolt.Tx) ([]cid.ID, error) {
|
func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) {
|
||||||
result := make([]cid.ID, 0)
|
result := make([]cid.ID, 0)
|
||||||
unique := make(map[string]struct{})
|
unique := make(map[string]struct{})
|
||||||
var cnr cid.ID
|
var cnr cid.ID
|
||||||
|
|
||||||
err := tx.ForEach(func(name []byte, _ *bbolt.Bucket) error {
|
it, err := r.NewIter(&pebble.IterOptions{
|
||||||
if parseContainerID(&cnr, name, unique) {
|
OnlyReadGuaranteedDurable: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for v := it.First(); v; v = it.Next() {
|
||||||
|
if parseContainerIDWithIgnore(&cnr, it.Key(), unique) {
|
||||||
result = append(result, cnr)
|
result = append(result, cnr)
|
||||||
unique[string(name[1:bucketKeySize])] = struct{}{}
|
unique[string(it.Key()[1:containerSizePrefixSize])] = struct{}{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return result, nil
|
||||||
})
|
|
||||||
|
|
||||||
return result, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
|
func parseContainerIDWithIgnore(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {
|
||||||
db.modeMtx.RLock()
|
if len(name) < bucketKeySize {
|
||||||
defer db.modeMtx.RUnlock()
|
|
||||||
|
|
||||||
if db.mode.NoMetabase() {
|
|
||||||
return 0, ErrDegradedMode
|
|
||||||
}
|
|
||||||
|
|
||||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
|
||||||
size, err = db.containerSize(tx, id)
|
|
||||||
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
|
|
||||||
return size, metaerr.Wrap(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) (uint64, error) {
|
|
||||||
containerVolume := tx.Bucket(containerVolumeBucketName)
|
|
||||||
key := make([]byte, cidSize)
|
|
||||||
id.Encode(key)
|
|
||||||
|
|
||||||
return parseContainerSize(containerVolume.Get(key)), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {
|
|
||||||
if len(name) != bucketKeySize {
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if _, ok := ignore[string(name[1:bucketKeySize])]; ok {
|
if _, ok := ignore[string(name[1:bucketKeySize])]; ok {
|
||||||
|
@ -91,31 +80,194 @@ func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool
|
||||||
return dst.Decode(name[1:bucketKeySize]) == nil
|
return dst.Decode(name[1:bucketKeySize]) == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseContainerSize(v []byte) uint64 {
|
func (db *DB) ContainerSize(ctx context.Context, id cid.ID) (size uint64, err error) {
|
||||||
if len(v) == 0 {
|
db.modeMtx.RLock()
|
||||||
return 0
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return 0, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
return binary.LittleEndian.Uint64(v)
|
result, err := db.containerSizesInternal(ctx, &id)
|
||||||
|
if err != nil {
|
||||||
|
return 0, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
return result[id], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func changeContainerSize(tx *bbolt.Tx, id cid.ID, delta uint64, increase bool) error {
|
func (db *DB) ContainerSizes(ctx context.Context) (map[cid.ID]uint64, error) {
|
||||||
containerVolume := tx.Bucket(containerVolumeBucketName)
|
db.modeMtx.RLock()
|
||||||
key := make([]byte, cidSize)
|
defer db.modeMtx.RUnlock()
|
||||||
id.Encode(key)
|
|
||||||
|
|
||||||
size := parseContainerSize(containerVolume.Get(key))
|
if db.mode.NoMetabase() {
|
||||||
|
return nil, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
return db.containerSizesInternal(ctx, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ZeroSizeContainers returns containers with size = 0.
|
||||||
|
func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("ZeroSizeContainers", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroSizeContainers")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
sizes, err := db.containerSizesInternal(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var result []cid.ID
|
||||||
|
for id, size := range sizes {
|
||||||
|
if size == 0 {
|
||||||
|
result = append(result, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("DeleteContainerSize", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerSize",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Stringer("container_id", id),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
if db.mode.ReadOnly() {
|
||||||
|
return ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
|
||||||
|
return metaerr.Wrap(db.batch(
|
||||||
|
func(b *pebble.Batch) error {
|
||||||
|
return deleteByPrefix(ctx, b, containerSizeKeyPrefix(id))
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) containerSizesInternal(ctx context.Context, id *cid.ID) (map[cid.ID]uint64, error) {
|
||||||
|
prefix := []byte{containerSizePrefix}
|
||||||
|
if id != nil {
|
||||||
|
prefix = containerSizeKeyPrefix(*id)
|
||||||
|
}
|
||||||
|
result := make(map[cid.ID]int64)
|
||||||
|
err := db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
|
it, err := s.NewIter(&pebble.IterOptions{
|
||||||
|
LowerBound: prefix,
|
||||||
|
OnlyReadGuaranteedDurable: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
key := it.Key()
|
||||||
|
var cnr cid.ID
|
||||||
|
if err := cnr.Decode(key[1:containerSizePrefixSize]); err != nil {
|
||||||
|
return fmt.Errorf("invalid container size key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
value, ok := parseSize(it.Value())
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid container size value for container %s", cnr)
|
||||||
|
}
|
||||||
|
result[cnr] += value
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return normilizeContainerSizes(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func normilizeContainerSizes(sizes map[cid.ID]int64) (map[cid.ID]uint64, error) {
|
||||||
|
result := make(map[cid.ID]uint64, len(sizes))
|
||||||
|
for k, v := range sizes {
|
||||||
|
if v < 0 {
|
||||||
|
return nil, fmt.Errorf("invalid cumulative size for container %s", k)
|
||||||
|
}
|
||||||
|
result[k] = uint64(v)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func changeContainerSize(b *pebble.Batch, id cid.ID, delta int64, bucketID uint16) error {
|
||||||
|
key := containerSizeKey(id, bucketID)
|
||||||
|
|
||||||
|
v, err := valueSafe(b, key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
size, ok := parseSize(v)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid container size value for container %s", id)
|
||||||
|
}
|
||||||
|
|
||||||
if increase {
|
|
||||||
size += delta
|
size += delta
|
||||||
} else if size > delta {
|
value := marshalSize(size)
|
||||||
size -= delta
|
return b.Set(key, value, pebble.Sync)
|
||||||
} else {
|
|
||||||
size = 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, 8) // consider using sync.Pool to decrease allocations
|
// containerSizeKeyPrefix returns containerSizePrefix_CID key prefix.
|
||||||
binary.LittleEndian.PutUint64(buf, size)
|
func containerSizeKeyPrefix(cnr cid.ID) []byte {
|
||||||
|
result := make([]byte, containerSizePrefixSize)
|
||||||
return containerVolume.Put(key, buf)
|
result[0] = containerSizePrefix
|
||||||
|
cnr.Encode(result[1:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// containerSizeKey returns containerVolumePrefix_CID_bucketID key.
|
||||||
|
func containerSizeKey(cnr cid.ID, bucketID uint16) []byte {
|
||||||
|
result := make([]byte, containerSizeKeySize)
|
||||||
|
result[0] = containerSizePrefix
|
||||||
|
cnr.Encode(result[1:])
|
||||||
|
binary.LittleEndian.PutUint16(result[containerSizePrefixSize:], bucketID)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseSize(v []byte) (int64, bool) {
|
||||||
|
if len(v) == 0 {
|
||||||
|
return 0, true
|
||||||
|
}
|
||||||
|
if len(v) != 8 {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
return int64(binary.LittleEndian.Uint64(v)), true
|
||||||
|
}
|
||||||
|
|
||||||
|
func marshalSize(v int64) []byte {
|
||||||
|
buf := make([]byte, 8)
|
||||||
|
binary.LittleEndian.PutUint64(buf, uint64(v))
|
||||||
|
return buf
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,3 +67,26 @@ func selectByPrefixBatch(ctx context.Context, r pebble.Reader, prefix []byte, ba
|
||||||
}
|
}
|
||||||
return result, it.Close()
|
return result, it.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func deleteByPrefix(ctx context.Context, b *pebble.Batch, prefix []byte) error {
|
||||||
|
for {
|
||||||
|
batch, err := selectByPrefixBatch(ctx, b, prefix, batchSize)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, key := range batch {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.Delete(key, pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(batch) < batchSize {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue