diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index 5fae1f72e..c8a1318bc 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -1,14 +1,23 @@ package meta import ( + "bytes" "context" "encoding/binary" + "fmt" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +const ( + containerSizeKeySize = 1 + cidSize + 2 + containerSizePrefixSize = 1 + cidSize ) func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) { @@ -30,8 +39,8 @@ func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) { return nil, ErrDegradedMode } - err = db.database.View(func(tx *bbolt.Tx) error { - list, err = db.containers(tx) + err = db.snapshot(func(s *pebble.Snapshot) error { + list, err = containers(ctx, s) return err }) @@ -39,50 +48,30 @@ func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) { return list, metaerr.Wrap(err) } -func (db *DB) containers(tx *bbolt.Tx) ([]cid.ID, error) { +func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) { result := make([]cid.ID, 0) unique := make(map[string]struct{}) var cnr cid.ID - err := tx.ForEach(func(name []byte, _ *bbolt.Bucket) error { - if parseContainerID(&cnr, name, unique) { - result = append(result, cnr) - unique[string(name[1:bucketKeySize])] = struct{}{} - } - - return nil + it, err := r.NewIter(&pebble.IterOptions{ + OnlyReadGuaranteedDurable: true, }) - - return result, err -} - -func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) { - db.modeMtx.RLock() - defer db.modeMtx.RUnlock() - - if db.mode.NoMetabase() { - return 0, ErrDegradedMode + if err != nil { + return nil, err } - err = db.database.View(func(tx *bbolt.Tx) error { - size, err = db.containerSize(tx, id) + for v := it.First(); v; v = it.Next() { + if parseContainerIDWithIgnore(&cnr, it.Key(), unique) { + result = append(result, cnr) + unique[string(it.Key()[1:containerSizePrefixSize])] = struct{}{} + } + } - return err - }) - - return size, metaerr.Wrap(err) + return result, nil } -func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) (uint64, error) { - containerVolume := tx.Bucket(containerVolumeBucketName) - key := make([]byte, cidSize) - id.Encode(key) - - return parseContainerSize(containerVolume.Get(key)), nil -} - -func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool { - if len(name) != bucketKeySize { +func parseContainerIDWithIgnore(dst *cid.ID, name []byte, ignore map[string]struct{}) bool { + if len(name) < bucketKeySize { return false } if _, ok := ignore[string(name[1:bucketKeySize])]; ok { @@ -91,31 +80,194 @@ func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool return dst.Decode(name[1:bucketKeySize]) == nil } -func parseContainerSize(v []byte) uint64 { +func (db *DB) ContainerSize(ctx context.Context, id cid.ID) (size uint64, err error) { + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return 0, ErrDegradedMode + } + + result, err := db.containerSizesInternal(ctx, &id) + if err != nil { + return 0, metaerr.Wrap(err) + } + return result[id], nil +} + +func (db *DB) ContainerSizes(ctx context.Context) (map[cid.ID]uint64, error) { + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return nil, ErrDegradedMode + } + + return db.containerSizesInternal(ctx, nil) +} + +// ZeroSizeContainers returns containers with size = 0. +func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ZeroSizeContainers", time.Since(startedAt), success) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroSizeContainers") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + sizes, err := db.containerSizesInternal(ctx, nil) + if err != nil { + return nil, err + } + var result []cid.ID + for id, size := range sizes { + if size == 0 { + result = append(result, id) + } + } + return result, nil +} + +func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("DeleteContainerSize", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerSize", + trace.WithAttributes( + attribute.Stringer("container_id", id), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + if db.mode.ReadOnly() { + return ErrReadOnlyMode + } + + return metaerr.Wrap(db.batch( + func(b *pebble.Batch) error { + return deleteByPrefix(ctx, b, containerSizeKeyPrefix(id)) + })) +} + +func (db *DB) containerSizesInternal(ctx context.Context, id *cid.ID) (map[cid.ID]uint64, error) { + prefix := []byte{containerSizePrefix} + if id != nil { + prefix = containerSizeKeyPrefix(*id) + } + result := make(map[cid.ID]int64) + err := db.snapshot(func(s *pebble.Snapshot) error { + it, err := s.NewIter(&pebble.IterOptions{ + LowerBound: prefix, + OnlyReadGuaranteedDurable: true, + }) + if err != nil { + return err + } + + for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + key := it.Key() + var cnr cid.ID + if err := cnr.Decode(key[1:containerSizePrefixSize]); err != nil { + return fmt.Errorf("invalid container size key: %w", err) + } + + value, ok := parseSize(it.Value()) + if !ok { + return fmt.Errorf("invalid container size value for container %s", cnr) + } + result[cnr] += value + } + return nil + }) + if err != nil { + return nil, metaerr.Wrap(err) + } + + return normilizeContainerSizes(result) +} + +func normilizeContainerSizes(sizes map[cid.ID]int64) (map[cid.ID]uint64, error) { + result := make(map[cid.ID]uint64, len(sizes)) + for k, v := range sizes { + if v < 0 { + return nil, fmt.Errorf("invalid cumulative size for container %s", k) + } + result[k] = uint64(v) + } + return result, nil +} + +func changeContainerSize(b *pebble.Batch, id cid.ID, delta int64, bucketID uint16) error { + key := containerSizeKey(id, bucketID) + + v, err := valueSafe(b, key) + if err != nil { + return err + } + + size, ok := parseSize(v) + if !ok { + return fmt.Errorf("invalid container size value for container %s", id) + } + + size += delta + value := marshalSize(size) + return b.Set(key, value, pebble.Sync) +} + +// containerSizeKeyPrefix returns containerSizePrefix_CID key prefix. +func containerSizeKeyPrefix(cnr cid.ID) []byte { + result := make([]byte, containerSizePrefixSize) + result[0] = containerSizePrefix + cnr.Encode(result[1:]) + return result +} + +// containerSizeKey returns containerVolumePrefix_CID_bucketID key. +func containerSizeKey(cnr cid.ID, bucketID uint16) []byte { + result := make([]byte, containerSizeKeySize) + result[0] = containerSizePrefix + cnr.Encode(result[1:]) + binary.LittleEndian.PutUint16(result[containerSizePrefixSize:], bucketID) + return result +} + +func parseSize(v []byte) (int64, bool) { if len(v) == 0 { - return 0 + return 0, true } - - return binary.LittleEndian.Uint64(v) + if len(v) != 8 { + return 0, false + } + return int64(binary.LittleEndian.Uint64(v)), true } -func changeContainerSize(tx *bbolt.Tx, id cid.ID, delta uint64, increase bool) error { - containerVolume := tx.Bucket(containerVolumeBucketName) - key := make([]byte, cidSize) - id.Encode(key) - - size := parseContainerSize(containerVolume.Get(key)) - - if increase { - size += delta - } else if size > delta { - size -= delta - } else { - size = 0 - } - - buf := make([]byte, 8) // consider using sync.Pool to decrease allocations - binary.LittleEndian.PutUint64(buf, size) - - return containerVolume.Put(key, buf) +func marshalSize(v int64) []byte { + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, uint64(v)) + return buf } diff --git a/pkg/local_object_storage/metabase/pebble.go b/pkg/local_object_storage/metabase/pebble.go index 9921ce119..3f94289c1 100644 --- a/pkg/local_object_storage/metabase/pebble.go +++ b/pkg/local_object_storage/metabase/pebble.go @@ -67,3 +67,26 @@ func selectByPrefixBatch(ctx context.Context, r pebble.Reader, prefix []byte, ba } return result, it.Close() } + +func deleteByPrefix(ctx context.Context, b *pebble.Batch, prefix []byte) error { + for { + batch, err := selectByPrefixBatch(ctx, b, prefix, batchSize) + if err != nil { + return err + } + for _, key := range batch { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err := b.Delete(key, pebble.Sync); err != nil { + return err + } + } + if len(batch) < batchSize { + return nil + } + } +}