frostfs-node/pkg/local_object_storage/metabase/containers.go
Dmitrii Stepanov 956a64e06b [#9999] metabase: Fix db engine to pebble in delete.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-30 12:14:46 +03:00

277 lines
6.1 KiB
Go

package meta
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/cockroachdb/pebble"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// containerSizeKeySize is the length in bytes of a key built by
// containerSizeKey: one prefix byte followed by cidSize bytes of container ID.
const containerSizeKeySize = 1 + cidSize
// Containers returns the container IDs collected by the containers helper
// from a snapshot of the metabase.
//
// It returns ErrDegradedMode when the current mode has no metabase. Any other
// error is passed through metaerr.Wrap. The call duration and outcome are
// reported to db.metrics under the name "Containers".
func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) {
	begin := time.Now()
	ok := false
	defer func() {
		db.metrics.AddMethodDuration("Containers", time.Since(begin), ok)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.Containers")
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	// The closure fills the named result; the error reported to the caller is
	// always the one db.snapshot returns.
	err = db.snapshot(func(s *pebble.Snapshot) error {
		var scanErr error
		list, scanErr = containers(ctx, s)
		return scanErr
	})
	ok = err == nil
	return list, metaerr.Wrap(err)
}
// containers walks every key visible through r and returns the distinct
// container IDs that parseContainerIDWithIgnore manages to decode from them.
//
// The returned slice is never nil on success. If ctx is cancelled during the
// scan, the iterator is closed and ctx.Err() (joined with any close error) is
// returned together with a nil slice.
func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) {
	result := make([]cid.ID, 0)
	unique := make(map[string]struct{})
	var cnr cid.ID

	// OnlyReadGuaranteedDurable is intentionally not set: pebble documents the
	// option as supported only by the Reader implemented by DB, while this
	// function is called with a snapshot.
	it, err := r.NewIter(&pebble.IterOptions{})
	if err != nil {
		return nil, err
	}

	for v := it.First(); v; v = it.Next() {
		select {
		case <-ctx.Done():
			return nil, errors.Join(ctx.Err(), it.Close())
		default:
		}

		key := it.Key()
		if !parseContainerIDWithIgnore(&cnr, key, unique) {
			continue
		}
		result = append(result, cnr)
		// Record exactly the bytes parseContainerIDWithIgnore looks up, so the
		// dedupe set and the lookup always agree. The parser has already
		// verified len(key) >= bucketKeySize, so the slice is in range.
		unique[string(key[1:bucketKeySize])] = struct{}{}
	}
	return result, it.Close()
}
// parseContainerIDWithIgnore decodes a container ID from name[1:bucketKeySize]
// into dst and reports whether it succeeded.
//
// It returns false without decoding when name is shorter than bucketKeySize
// or when those bytes are already present in ignore.
func parseContainerIDWithIgnore(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {
	if len(name) >= bucketKeySize {
		raw := name[1:bucketKeySize]
		if _, seen := ignore[string(raw)]; !seen {
			return dst.Decode(raw) == nil
		}
	}
	return false
}
// ContainerSize returns the size stored under containerSizeKey(id).
//
// An empty stored value yields size 0 with no error. A value that parseSize
// rejects, or that is negative, is reported as an error. It returns
// ErrDegradedMode when the current mode has no metabase; other errors are
// passed through metaerr.Wrap.
func (db *DB) ContainerSize(ctx context.Context, id cid.ID) (size uint64, err error) {
	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return 0, ErrDegradedMode
	}

	readSize := func(s *pebble.Snapshot) error {
		raw, getErr := valueSafe(s, containerSizeKey(id))
		switch {
		case getErr != nil:
			return getErr
		case len(raw) == 0:
			// Nothing stored: leave size at zero.
			return nil
		}

		parsed, valid := parseSize(raw)
		if valid && parsed >= 0 {
			size = uint64(parsed)
			return nil
		}
		return fmt.Errorf("invalid container size value for container %s", id)
	}

	if err = db.snapshot(readSize); err != nil {
		return 0, metaerr.Wrap(err)
	}
	return size, nil
}
// ContainerSizes returns the per-container sizes computed by
// containerSizesInternal while holding the mode read lock.
//
// It returns ErrDegradedMode when the current mode has no metabase.
func (db *DB) ContainerSizes(ctx context.Context) (map[cid.ID]uint64, error) {
	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if !db.mode.NoMetabase() {
		return db.containerSizesInternal(ctx)
	}
	return nil, ErrDegradedMode
}
// ZeroSizeContainers returns containers with size = 0.
//
// Sizes are taken from containerSizesInternal; the order of the returned IDs
// follows map iteration and is therefore unspecified. It returns
// ErrDegradedMode when the current mode has no metabase. The call duration and
// outcome are reported to db.metrics under the name "ZeroSizeContainers".
func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("ZeroSizeContainers", time.Since(startedAt), success)
	}()

	ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroSizeContainers")
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	// Same guard as the other exported accessors in this file.
	if db.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	sizes, err := db.containerSizesInternal(ctx)
	if err != nil {
		return nil, err
	}

	var result []cid.ID
	for id, size := range sizes {
		if size == 0 {
			result = append(result, id)
		}
	}

	// Without this the deferred metric would record every call as failed.
	success = true
	return result, nil
}
// DeleteContainerSize removes the value stored under containerSizeKey(id).
//
// It returns ErrDegradedMode when the current mode has no metabase and
// ErrReadOnlyMode when the mode is read-only. The per-container guard lock is
// held for the duration of the batch. Errors from the batch are passed through
// metaerr.Wrap. The call duration and outcome are reported to db.metrics under
// the name "DeleteContainerSize".
func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("DeleteContainerSize", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerSize",
		trace.WithAttributes(
			attribute.Stringer("container_id", id),
		))
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return ErrDegradedMode
	}
	if db.mode.ReadOnly() {
		return ErrReadOnlyMode
	}

	// LockContainerID returns the unlock function, which is deferred here.
	defer db.guard.LockContainerID(id)()

	err := db.batch(func(b *pebble.Batch) error {
		return b.Delete(containerSizeKey(id), pebble.Sync)
	})

	// Record the real outcome; previously the metric always reported failure.
	success = err == nil
	return metaerr.Wrap(err)
}
// containerSizesInternal scans all keys starting with containerSizePrefix in a
// snapshot and returns the summed size per container.
//
// Values of keys that decode to the same container ID are added together; the
// totals are then converted by normilizeContainerSizes, which rejects negative
// sums. The scan stops with ctx.Err() if ctx is cancelled. Errors raised
// inside the snapshot are passed through metaerr.Wrap. It does not take
// db.modeMtx or check the mode itself.
func (db *DB) containerSizesInternal(ctx context.Context) (map[cid.ID]uint64, error) {
	prefix := []byte{containerSizePrefix}
	result := make(map[cid.ID]int64)
	err := db.snapshot(func(s *pebble.Snapshot) error {
		// OnlyReadGuaranteedDurable is intentionally not set: pebble documents
		// the option as supported only by the Reader implemented by DB, and
		// this iterator is created from a snapshot.
		it, err := s.NewIter(&pebble.IterOptions{
			LowerBound: prefix,
		})
		if err != nil {
			return err
		}

		// Only a lower bound is set, so the prefix check ends the loop once
		// the iterator moves past the container size keys.
		for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
			select {
			case <-ctx.Done():
				return errors.Join(ctx.Err(), it.Close())
			default:
			}

			key := it.Key()
			// Guard the slice below: a key shorter than expected must surface
			// as an error, not as an out-of-range slice.
			if len(key) < containerSizeKeySize {
				return errors.Join(fmt.Errorf("invalid container size key: unexpected length %d", len(key)), it.Close())
			}

			var cnr cid.ID
			if err := cnr.Decode(key[1:containerSizeKeySize]); err != nil {
				return errors.Join(fmt.Errorf("invalid container size key: %w", err), it.Close())
			}

			value, ok := parseSize(it.Value())
			if !ok {
				return errors.Join(fmt.Errorf("invalid container size value for container %s", cnr), it.Close())
			}
			result[cnr] += value
		}
		return it.Close()
	})
	if err != nil {
		return nil, metaerr.Wrap(err)
	}
	return normilizeContainerSizes(result)
}
// normilizeContainerSizes converts signed per-container totals to unsigned
// ones. It returns an error, and no map, as soon as it meets a negative total.
func normilizeContainerSizes(sizes map[cid.ID]int64) (map[cid.ID]uint64, error) {
	normalized := make(map[cid.ID]uint64, len(sizes))
	for cnr, total := range sizes {
		if total >= 0 {
			normalized[cnr] = uint64(total)
			continue
		}
		return nil, fmt.Errorf("invalid cumulative size for container %s", cnr)
	}
	return normalized, nil
}
// changeContainerSize adds delta to the size stored under containerSizeKey(id)
// and writes the sum back through b.
//
// The current value is read via valueSafe from the same batch. The sum is
// stored as is: this function does not check it for being negative.
func changeContainerSize(b *pebble.Batch, id cid.ID, delta int64) error {
	sizeKey := containerSizeKey(id)

	raw, err := valueSafe(b, sizeKey)
	if err != nil {
		return err
	}

	current, valid := parseSize(raw)
	if !valid {
		return fmt.Errorf("invalid container size value for container %s", id)
	}

	return b.Set(sizeKey, marshalSize(current+delta), pebble.Sync)
}
// containerSizeKey builds the key of a container size record: the
// containerSizePrefix byte followed by the encoded container ID, for a total
// of containerSizeKeySize bytes.
func containerSizeKey(cnr cid.ID) []byte {
	key := make([]byte, containerSizeKeySize)
	key[0] = containerSizePrefix
	// Encode writes the ID into the bytes after the prefix.
	cnr.Encode(key[1:])
	return key
}
// parseSize decodes a stored size value.
//
// An empty (or nil) slice means "no value" and decodes to 0. An 8-byte slice
// is read as a little-endian integer and reinterpreted as int64. Any other
// length is invalid and yields (0, false).
func parseSize(v []byte) (int64, bool) {
	switch len(v) {
	case 0:
		return 0, true
	case 8:
		return int64(binary.LittleEndian.Uint64(v)), true
	default:
		return 0, false
	}
}
// marshalSize encodes v as 8 little-endian bytes, the format parseSize reads.
func marshalSize(v int64) []byte {
	return binary.LittleEndian.AppendUint64(make([]byte, 0, 8), uint64(v))
}