From 11ccf9fec977e35b2adbd9120249a0b2688014cb Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 7 Mar 2024 17:03:15 +0300 Subject: [PATCH] [#9999] billing: Implement list containers. Signed-off-by: Dmitrii Stepanov --- .../engine/container_stat.go | 167 ++++++++++++ .../engine/container_stat_test.go | 221 ++++++++++++++++ .../metabase/container_stat.go | 237 ++++++++++++++++++ .../metabase/containers.go | 9 +- pkg/local_object_storage/metabase/counter.go | 23 +- .../shard/container_stat.go | 63 +++++ .../billing/server/list_containers.go | 129 +++++++++- 7 files changed, 834 insertions(+), 15 deletions(-) create mode 100644 pkg/local_object_storage/engine/container_stat.go create mode 100644 pkg/local_object_storage/engine/container_stat_test.go create mode 100644 pkg/local_object_storage/metabase/container_stat.go create mode 100644 pkg/local_object_storage/shard/container_stat.go diff --git a/pkg/local_object_storage/engine/container_stat.go b/pkg/local_object_storage/engine/container_stat.go new file mode 100644 index 000000000..f1e419e09 --- /dev/null +++ b/pkg/local_object_storage/engine/container_stat.go @@ -0,0 +1,167 @@ +package engine + +import ( + "bytes" + "context" + "errors" + "sort" + "sync" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" +) + +type ContainerStatPrm struct { + ContainerID []cid.ID + + Limit uint32 + StartFromContainerID *cid.ID +} + +type ContainerStatRes struct { + ContainerStats []ContainerStat + Partial bool +} + +type ContainerStat struct { + ContainerID cid.ID + SizeLogic uint64 + CountPhy, CountLogic, CountUser uint64 +} + +var errInvalidLimit = errors.New("limit must be greater than zero") + +func (e *StorageEngine) ContainerStat(ctx context.Context, prm ContainerStatPrm) (*ContainerStatRes, error) { + if e.metrics != nil { + defer elapsed("ContainerStat", e.metrics.AddMethodDuration)() + } + + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.ContainerStat", + trace.WithAttributes( + attribute.Int("container_ids", len(prm.ContainerID)), + attribute.Int64("limit", int64(prm.Limit)), + attribute.Bool("start_from_container_id", prm.StartFromContainerID != nil), + )) + defer span.End() + + if len(prm.ContainerID) == 0 && prm.Limit == 0 { + return nil, errInvalidLimit + } + + var result *ContainerStatRes + err := e.execIfNotBlocked(func() error { + var sErr error + result, sErr = e.containerStat(ctx, prm) + return sErr + }) + return result, err +} + +func (e *StorageEngine) containerStat(ctx context.Context, prm ContainerStatPrm) (*ContainerStatRes, error) { + e.mtx.RLock() + defer e.mtx.RUnlock() + + if len(prm.ContainerID) > 0 { + sort.Slice(prm.ContainerID, func(i, j int) bool { + return bytes.Compare(prm.ContainerID[i][:], prm.ContainerID[j][:]) < 0 + }) + } + + shardResults, partial, err := e.collectShardContainerStats(ctx, prm) + if err != nil { + return nil, err + } + + return &ContainerStatRes{ + ContainerStats: e.mergeShardContainerStats(shardResults, int(prm.Limit)), + Partial: partial, + }, nil +} + +func (e *StorageEngine) collectShardContainerStats(ctx context.Context, prm ContainerStatPrm) ([][]shard.ContainerStat, bool, error) { + if len(prm.ContainerID) > 0 { + sort.Slice(prm.ContainerID, func(i, j int) bool { + return bytes.Compare(prm.ContainerID[i][:], prm.ContainerID[j][:]) < 0 + }) + } + + var shardResults [][]shard.ContainerStat + var shardErrors []error + var resultsGuard sync.Mutex + + shPrm := shard.ContainerStatPrm{ + ContainerID: prm.ContainerID, + Limit: prm.Limit, + StartFromContainerID: prm.StartFromContainerID, + } + + eg, egCtx := errgroup.WithContext(ctx) + var shardsCount int + e.iterateOverUnsortedShards(func(hs hashedShard) (stop bool) { + shardsCount++ + eg.Go(func() error { + s, err := hs.ContainerStat(egCtx, shPrm) + resultsGuard.Lock() + defer resultsGuard.Unlock() + + if err != nil { + shardErrors = append(shardErrors, err) + return nil + } + + if len(s) > 0 { + shardResults = append(shardResults, s) + } + + return nil + }) + return false + }) + if err := eg.Wait(); err != nil { + return nil, false, err + } + if shardsCount == len(shardErrors) { + return nil, false, errors.Join(shardErrors...) + } + + return shardResults, len(shardErrors) > 0, nil +} + +func (e *StorageEngine) mergeShardContainerStats(shardResults [][]shard.ContainerStat, limit int) []ContainerStat { + var stats []ContainerStat + for len(stats) <= limit && len(shardResults) > 0 { + // shard results are sorted by container ID + sort.Slice(shardResults, func(i, j int) bool { + return bytes.Compare(shardResults[i][0].ContainerID[:], shardResults[j][0].ContainerID[:]) < 0 + }) + + if len(stats) > 0 && stats[len(stats)-1].ContainerID == shardResults[0][0].ContainerID { + stats[len(stats)-1].SizeLogic += shardResults[0][0].SizeLogic + stats[len(stats)-1].CountPhy += shardResults[0][0].CountPhy + stats[len(stats)-1].CountLogic += shardResults[0][0].CountLogic + stats[len(stats)-1].CountUser += shardResults[0][0].CountUser + } else { + stats = append(stats, ContainerStat{ + ContainerID: shardResults[0][0].ContainerID, + SizeLogic: shardResults[0][0].SizeLogic, + CountPhy: shardResults[0][0].CountPhy, + CountLogic: shardResults[0][0].CountLogic, + CountUser: shardResults[0][0].CountUser, + }) + } + + if len(shardResults[0]) == 1 { // last item for shard + shardResults = shardResults[1:] + } else { + shardResults[0] = shardResults[0][1:] + } + } + if len(stats) > limit { + stats = stats[:limit] + } + return stats +} diff --git a/pkg/local_object_storage/engine/container_stat_test.go b/pkg/local_object_storage/engine/container_stat_test.go new file mode 100644 index 000000000..264f6d4b6 --- /dev/null +++ b/pkg/local_object_storage/engine/container_stat_test.go @@ -0,0 +1,221 @@ +package engine + +import ( + "context" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +func TestContainerList(t *testing.T) { + t.Parallel() + + s1 := testNewShard(t) + s2 := testNewShard(t) + s3 := testNewShard(t) + + e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine + e.log = test.NewLogger(t) + defer e.Close(context.Background()) + + const containerCount = 10 + expStat := testPutComplexObject(t, []*shard.Shard{s1, s2, s3}, containerCount, nil) + expStat = testPutSimpleObject(t, []*shard.Shard{s1, s2, s3}, containerCount, expStat) + expStat = testPutSimpleObject(t, []*shard.Shard{s1, s2}, containerCount, expStat) + expStat = testPutSimpleObject(t, []*shard.Shard{s2, s3}, containerCount, expStat) + expStat = testPutSimpleObject(t, []*shard.Shard{s1}, containerCount, expStat) + expStat = testPutSimpleObject(t, []*shard.Shard{s2}, containerCount, expStat) + expStat = testPutSimpleObject(t, []*shard.Shard{s3}, containerCount, expStat) + + t.Run("with default limit", func(t *testing.T) { + var prm ContainerStatPrm + prm.Limit = 10_000 + res, err := e.ContainerStat(context.Background(), prm) + require.NoError(t, err) + require.NotNil(t, res) + require.ElementsMatch(t, expStat, res.ContainerStats) + require.False(t, res.Partial) + }) + + t.Run("with limit, batched", func(t *testing.T) { + var prm ContainerStatPrm + prm.Limit = 1 + var stats []ContainerStat + + for { + res, err := e.ContainerStat(context.Background(), prm) + require.NoError(t, err) + require.NotNil(t, res) + require.False(t, res.Partial) + if len(res.ContainerStats) == 0 { + break + } + stats = append(stats, res.ContainerStats...) + last := res.ContainerStats[len(res.ContainerStats)-1].ContainerID + prm.StartFromContainerID = &last + prm.Limit += 1 + } + + require.ElementsMatch(t, expStat, stats) + }) + + t.Run("by container id", func(t *testing.T) { + for _, cc := range []int{1, 2, 3, 4, 5} { + var prm ContainerStatPrm + for idx := 0; idx+cc < len(expStat); idx += cc { + prm.ContainerID = nil + for i := 0; i < cc; i++ { + prm.ContainerID = append(prm.ContainerID, expStat[idx+i].ContainerID) + } + prm.Limit = uint32(len(prm.ContainerID)) + res, err := e.ContainerStat(context.Background(), prm) + require.NoError(t, err) + require.NotNil(t, res) + require.False(t, res.Partial) + require.ElementsMatch(t, expStat[idx:idx+cc], res.ContainerStats) + } + } + }) + + t.Run("unknown container id", func(t *testing.T) { + var prm ContainerStatPrm + prm.ContainerID = append(prm.ContainerID, cidtest.ID()) + prm.Limit = uint32(len(prm.ContainerID)) + res, err := e.ContainerStat(context.Background(), prm) + require.NoError(t, err) + require.NotNil(t, res) + require.False(t, res.Partial) + require.ElementsMatch(t, []ContainerStat{{ContainerID: prm.ContainerID[0]}}, res.ContainerStats) + }) + + t.Run("degraded shard", func(t *testing.T) { + s1.SetMode(mode.Degraded) + var prm ContainerStatPrm + prm.Limit = 10_000 + res, err := e.ContainerStat(context.Background(), prm) + require.NoError(t, err) + require.NotNil(t, res) + require.True(t, res.Partial) + }) +} + +func testPutComplexObject(t *testing.T, shards []*shard.Shard, count int, stats []ContainerStat) []ContainerStat { + const payloadSize = 10 * 1024 + for count > 0 { + var cnr cid.ID + var stat *ContainerStat + var newStat bool + if len(stats) == 0 || count%2 == 0 { + cnr = cidtest.ID() + stat = &ContainerStat{ContainerID: cnr} + newStat = true + } else { + cnr = stats[count%len(stats)].ContainerID + stat = &stats[count%len(stats)] + } + + parentID := oidtest.ID() + splitID := objectSDK.NewSplitID() + + parent := testutil.GenerateObjectWithCID(cnr) + parent.SetID(parentID) + parent.SetPayload(nil) + + const childCount = 10 + children := make([]*objectSDK.Object, childCount) + childIDs := make([]oid.ID, childCount) + for i := range children { + children[i] = testutil.GenerateObjectWithCID(cnr) + if i != 0 { + children[i].SetPreviousID(childIDs[i-1]) + } + if i == len(children)-1 { + children[i].SetParent(parent) + } + children[i].SetSplitID(splitID) + children[i].SetPayload(make([]byte, payloadSize)) + children[i].SetPayloadSize(payloadSize) + childIDs[i], _ = children[i].ID() + + stat.SizeLogic += payloadSize + stat.CountLogic += 1 + stat.CountPhy += 1 + } + stat.CountUser += 1 + + link := testutil.GenerateObjectWithCID(cnr) + link.SetParent(parent) + link.SetParentID(parentID) + link.SetSplitID(splitID) + link.SetChildren(childIDs...) + + stat.CountLogic += 1 + stat.CountPhy += 1 + stat.SizeLogic += link.PayloadSize() + + for i := range children { + sh := shards[i%len(shards)] + var putPrm shard.PutPrm + putPrm.SetObject(children[i]) + _, err := sh.Put(context.Background(), putPrm) + require.NoError(t, err) + } + sh := shards[count%len(shards)] + var putPrm shard.PutPrm + putPrm.SetObject(link) + _, err := sh.Put(context.Background(), putPrm) + require.NoError(t, err) + + if newStat { + stats = append(stats, *stat) + } + count-- + } + return stats +} + +func testPutSimpleObject(t *testing.T, shards []*shard.Shard, count int, stats []ContainerStat) []ContainerStat { + const payloadSize = 7 * 1024 + for count > 0 { + var cnr cid.ID + var stat *ContainerStat + var newStat bool + if len(stats) == 0 || count%2 == 0 { + cnr = cidtest.ID() + stat = &ContainerStat{ContainerID: cnr} + newStat = true + } else { + cnr = stats[count%len(stats)].ContainerID + stat = &stats[count%len(stats)] + } + + obj := testutil.GenerateObjectWithCID(cnr) + obj.SetPayload(make([]byte, payloadSize)) + obj.SetPayloadSize(payloadSize) + stat.SizeLogic += payloadSize + stat.CountLogic += 1 + stat.CountPhy += 1 + stat.CountUser += 1 + + sh := shards[count%len(shards)] + var putPrm shard.PutPrm + putPrm.SetObject(obj) + _, err := sh.Put(context.Background(), putPrm) + require.NoError(t, err) + + if newStat { + stats = append(stats, *stat) + } + count-- + } + return stats +} diff --git a/pkg/local_object_storage/metabase/container_stat.go b/pkg/local_object_storage/metabase/container_stat.go new file mode 100644 index 000000000..db4c1ee8d --- /dev/null +++ b/pkg/local_object_storage/metabase/container_stat.go @@ -0,0 +1,237 @@ +package meta + +import ( + "bytes" + "context" + "crypto/sha256" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "go.etcd.io/bbolt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +type ContainerStatPrm struct { + ContainerID []cid.ID + + Limit uint32 + StartFromContainerID *cid.ID +} + +type ContainerStat struct { + ContainerID cid.ID + SizeLogic uint64 + CountPhy, CountLogic, CountUser uint64 +} + +// ContainerStat returns object count and size for containers. +// If len(prm.ContainerID) > 0, then result slice contains records in the same order as prm.ContainerID. +// Otherwise result slice sorted by ContainerID. +func (db *DB) ContainerStat(ctx context.Context, prm ContainerStatPrm) ([]ContainerStat, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ContainerStat", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerStat", + trace.WithAttributes( + attribute.Int("container_ids", len(prm.ContainerID)), + attribute.Int64("limit", int64(prm.Limit)), + attribute.Bool("start_from_container_id", prm.StartFromContainerID != nil), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return nil, ErrDegradedMode + } + + if len(prm.ContainerID) > 0 { + return db.containerStatByContainerID(prm.ContainerID) + } + return db.containerStatByLimit(prm.StartFromContainerID, prm.Limit) +} + +func (db *DB) containerStatByContainerID(containerID []cid.ID) ([]ContainerStat, error) { + var result []ContainerStat + err := db.boltDB.View(func(tx *bbolt.Tx) error { + for _, contID := range containerID { + var stat ContainerStat + stat.ContainerID = contID + stat.SizeLogic = db.containerSize(tx, contID) + + counters, err := db.containerCounters(tx, contID) + if err != nil { + return err + } + stat.CountPhy = counters.Phy + stat.CountLogic = counters.Logic + stat.CountUser = counters.User + result = append(result, stat) + } + return nil + }) + if err != nil { + return nil, metaerr.Wrap(err) + } + return result, nil +} + +func (db *DB) containerStatByLimit(startFrom *cid.ID, limit uint32) ([]ContainerStat, error) { + var result []ContainerStat + + var lastKey []byte + if startFrom != nil { + lastKey = make([]byte, sha256.Size) + startFrom.Encode(lastKey) + } + + var counts []containerIDObjectCounters + var sizes []containerIDSize + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + var e error + counts, e = getContainerCountersBatch(tx, lastKey, limit) + if e != nil { + return e + } + + sizes, e = getContainerSizesBatch(tx, lastKey, limit) + return e + }) + if err != nil { + return nil, metaerr.Wrap(err) + } + + result = mergeSizeAndCounts(counts, sizes) + if len(result) > int(limit) { + result = result[:limit] + } + return result, nil +} + +type containerIDObjectCounters struct { + ContainerID cid.ID + ObjectCounters +} + +func getContainerCountersBatch(tx *bbolt.Tx, lastKey []byte, limit uint32) ([]containerIDObjectCounters, error) { + var result []containerIDObjectCounters + b := tx.Bucket(containerCounterBucketName) + if b == nil { + return result, nil + } + c := b.Cursor() + var key, value []byte + for key, value = c.Seek(lastKey); key != nil && uint32(len(result)) < limit; key, value = c.Next() { + if bytes.Equal(lastKey, key) { + continue + } + + cnrID, err := parseContainerCounterKey(key) + if err != nil { + return nil, err + } + ent, err := parseContainerCounterValue(value) + if err != nil { + return nil, err + } + result = append(result, containerIDObjectCounters{ + ContainerID: cnrID, + ObjectCounters: ent, + }) + } + return result, nil +} + +type containerIDSize struct { + ContainerID cid.ID + Size uint64 +} + +func getContainerSizesBatch(tx *bbolt.Tx, lastKey []byte, limit uint32) ([]containerIDSize, error) { + var result []containerIDSize + b := tx.Bucket(containerVolumeBucketName) + c := b.Cursor() + var key, value []byte + for key, value = c.Seek(lastKey); key != nil && uint32(len(result)) < limit; key, value = c.Next() { + if bytes.Equal(lastKey, key) { + continue + } + + var r containerIDSize + r.Size = parseContainerSize(value) + if err := r.ContainerID.Decode(key); err != nil { + return nil, err + } + result = append(result, r) + } + return result, nil +} + +// mergeSizeAndCounts merges sizes and counts. +// As records are deleted in background, it can happen that metabase contains size record for container, +// but doesn't contain record for count. +func mergeSizeAndCounts(counts []containerIDObjectCounters, sizes []containerIDSize) []ContainerStat { + var result []ContainerStat + + for len(counts) > 0 || len(sizes) > 0 { + if len(counts) == 0 { + result = append(result, ContainerStat{ + ContainerID: sizes[0].ContainerID, + SizeLogic: sizes[0].Size, + }) + sizes = sizes[1:] + continue + } + + if len(sizes) == 0 { + result = append(result, ContainerStat{ + ContainerID: counts[0].ContainerID, + CountPhy: counts[0].Phy, + CountLogic: counts[0].Logic, + CountUser: counts[0].User, + }) + counts = counts[1:] + continue + } + + v := bytes.Compare(sizes[0].ContainerID[:], counts[0].ContainerID[:]) + + if v == 0 { // equal + result = append(result, ContainerStat{ + ContainerID: counts[0].ContainerID, + CountPhy: counts[0].Phy, + CountLogic: counts[0].Logic, + CountUser: counts[0].User, + SizeLogic: sizes[0].Size, + }) + counts = counts[1:] + sizes = sizes[1:] + } else if v < 0 { // from sizes + result = append(result, ContainerStat{ + ContainerID: sizes[0].ContainerID, + SizeLogic: sizes[0].Size, + }) + sizes = sizes[1:] + } else { // from counts + result = append(result, ContainerStat{ + ContainerID: counts[0].ContainerID, + CountPhy: counts[0].Phy, + CountLogic: counts[0].Logic, + CountUser: counts[0].User, + }) + counts = counts[1:] + } + } + + return result +} diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index 472b2affc..24a515486 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -65,20 +65,19 @@ func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) { } err = db.boltDB.View(func(tx *bbolt.Tx) error { - size, err = db.containerSize(tx, id) - - return err + size = db.containerSize(tx, id) + return nil }) return size, metaerr.Wrap(err) } -func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) (uint64, error) { +func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) uint64 { containerVolume := tx.Bucket(containerVolumeBucketName) key := make([]byte, cidSize) id.Encode(key) - return parseContainerSize(containerVolume.Get(key)), nil + return parseContainerSize(containerVolume.Get(key)) } func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool { diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index 275099ff2..d11710fd9 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -216,21 +216,28 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er var result ObjectCounters err := db.boltDB.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(containerCounterBucketName) - key := make([]byte, cidSize) - id.Encode(key) - v := b.Get(key) - if v == nil { - return nil - } var err error - result, err = parseContainerCounterValue(v) + result, err = db.containerCounters(tx, id) return err }) return result, metaerr.Wrap(err) } +func (*DB) containerCounters(tx *bbolt.Tx, id cid.ID) (ObjectCounters, error) { + b := tx.Bucket(containerCounterBucketName) + if b == nil { + return ObjectCounters{}, nil + } + key := make([]byte, cidSize) + id.Encode(key) + v := b.Get(key) + if v == nil { + return ObjectCounters{}, nil + } + return parseContainerCounterValue(v) +} + func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { b := tx.Bucket(shardInfoBucket) if b == nil { diff --git a/pkg/local_object_storage/shard/container_stat.go b/pkg/local_object_storage/shard/container_stat.go new file mode 100644 index 000000000..c213aa7fc --- /dev/null +++ b/pkg/local_object_storage/shard/container_stat.go @@ -0,0 +1,63 @@ +package shard + +import ( + "context" + + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +type ContainerStatPrm struct { + ContainerID []cid.ID + + Limit uint32 + StartFromContainerID *cid.ID +} + +type ContainerStat struct { + ContainerID cid.ID + SizeLogic uint64 + CountPhy, CountLogic, CountUser uint64 +} + +// ContainerStat returns object count and size for containers from metabase. +func (s *Shard) ContainerStat(ctx context.Context, prm ContainerStatPrm) ([]ContainerStat, error) { + _, span := tracing.StartSpanFromContext(ctx, "Shard.ContainerStat", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.Int("container_ids", len(prm.ContainerID)), + attribute.Int64("limit", int64(prm.Limit)), + attribute.Bool("start_from_container_id", prm.StartFromContainerID != nil), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.GetMode().NoMetabase() { + return nil, ErrDegradedMode + } + + res, err := s.metaBase.ContainerStat(ctx, meta.ContainerStatPrm{ + ContainerID: prm.ContainerID, + Limit: prm.Limit, + StartFromContainerID: prm.StartFromContainerID, + }) + if err != nil { + return nil, err + } + result := make([]ContainerStat, 0, len(res)) + for _, r := range res { + result = append(result, ContainerStat{ + ContainerID: r.ContainerID, + SizeLogic: r.SizeLogic, + CountPhy: r.CountPhy, + CountLogic: r.CountLogic, + CountUser: r.CountUser, + }) + } + return result, nil +} diff --git a/pkg/services/billing/server/list_containers.go b/pkg/services/billing/server/list_containers.go index e121e37f7..3882c879f 100644 --- a/pkg/services/billing/server/list_containers.go +++ b/pkg/services/billing/server/list_containers.go @@ -3,10 +3,135 @@ package server import ( "context" "errors" + "fmt" + containerApi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" + containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/billing" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) -func (s *Server) ListContainers(context.Context, *billing.ListContainersRequest) (*billing.ListContainersResponse, error) { - return nil, errors.New("not implemented") +const ( + defaultLimit = 1000 + maxLimit = 10000 +) + +var ( + errInvalidContainerIDLenght = errors.New("count of container ID array must be lower or equal 10 000 items") + errInvalidLimit = errors.New("limit value must be lower or equal 10 000") +) + +func (s *Server) ListContainers(ctx context.Context, req *billing.ListContainersRequest) (*billing.ListContainersResponse, error) { + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + enginePrm, err := convertToEngineContainerStatPrm(req) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + res, err := s.se.ContainerStat(ctx, enginePrm) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + resp := &billing.ListContainersResponse{ + Body: &billing.ListContainersResponse_Body{ + Result: s.addContainerInfo(res.ContainerStats), + NextPageToken: containerListNextPageToken(res.ContainerStats, enginePrm.Limit), + Partial: res.Partial, + }, + } + + if err = SignMessage(s.key, resp); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return resp, nil +} + +func (s *Server) addContainerInfo(engineStats []engine.ContainerStat) []*billing.ListContainersResponse_Body_ContainerInfo { + var result []*billing.ListContainersResponse_Body_ContainerInfo + + for _, engineStat := range engineStats { + containerInfo := &billing.ListContainersResponse_Body_ContainerInfo{ + ContainerId: engineStat.ContainerID[:], + Count: &billing.ListContainersResponse_Body_ContainerInfo_Count{ + Phy: engineStat.CountPhy, + Logic: engineStat.CountLogic, + User: engineStat.CountUser, + }, + Size: &billing.ListContainersResponse_Body_ContainerInfo_Size{ + Logic: engineStat.SizeLogic, + }, + ContainerStatus: billing.ListContainersResponse_Body_ContainerInfo_UNDEFINED, + } + + cnr, err := s.cnrSrc.Get(engineStat.ContainerID) + if err != nil { + if client.IsErrContainerNotFound(err) { + existed, errWasRemoved := containercore.WasRemoved(s.cnrSrc, engineStat.ContainerID) + if errWasRemoved == nil && existed { + containerInfo.ContainerStatus = billing.ListContainersResponse_Body_ContainerInfo_DELETED + } + } + } else { + containerInfo.ContainerStatus = billing.ListContainersResponse_Body_ContainerInfo_AVAILABLE + containerInfo.Attributes = &billing.ListContainersResponse_Body_ContainerInfo_Attributes{ + OwnerWallet: cnr.Value.Owner().WalletBytes(), + Zone: cnr.Value.Attribute(containerApi.SysAttributeZone), + } + } + result = append(result, containerInfo) + } + + return result +} + +func containerListNextPageToken(engineStats []engine.ContainerStat, limit uint32) []byte { + if uint32(len(engineStats)) <= limit { + return nil + } + return engineStats[len(engineStats)-1].ContainerID[:] +} + +func convertToEngineContainerStatPrm(req *billing.ListContainersRequest) (engine.ContainerStatPrm, error) { + var result engine.ContainerStatPrm + + if len(req.GetBody().GetContainerId()) > 10000 { + return result, errInvalidContainerIDLenght + } + if len(req.GetBody().GetContainerId()) > 0 { + for idx, contIDBytes := range req.GetBody().GetContainerId() { + var contID cid.ID + if err := contID.Decode(contIDBytes); err != nil { + return result, fmt.Errorf("failed to decode container ID at index %d: %w", idx, err) + } + result.ContainerID = append(result.ContainerID, contID) + } + return result, nil + } + + result.Limit = defaultLimit + if req.GetBody().GetLimit() > maxLimit { + return result, errInvalidLimit + } + if req.GetBody().GetLimit() > 0 { + result.Limit = req.GetBody().GetLimit() + } + + if len(req.GetBody().GetNextPageToken()) > 0 { + var contID cid.ID + if err := contID.Decode(req.GetBody().GetNextPageToken()); err != nil { + return result, fmt.Errorf("invalid next page token: %w", err) + } + result.StartFromContainerID = &contID + } + + return result, nil }