forked from TrueCloudLab/frostfs-node
[#9999] billing: Implement list containers.
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
parent 020281a0e9
commit 11ccf9fec9
7 changed files with 834 additions and 15 deletions
167  pkg/local_object_storage/engine/container_stat.go  Normal file
@@ -0,0 +1,167 @@
package engine

import (
    "bytes"
    "context"
    "errors"
    "sort"
    "sync"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
    "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
    "golang.org/x/sync/errgroup"
)

type ContainerStatPrm struct {
    ContainerID []cid.ID

    Limit                uint32
    StartFromContainerID *cid.ID
}

type ContainerStatRes struct {
    ContainerStats []ContainerStat
    Partial        bool
}

type ContainerStat struct {
    ContainerID                     cid.ID
    SizeLogic                       uint64
    CountPhy, CountLogic, CountUser uint64
}

var errInvalidLimit = errors.New("limit must be greater than zero")

func (e *StorageEngine) ContainerStat(ctx context.Context, prm ContainerStatPrm) (*ContainerStatRes, error) {
    if e.metrics != nil {
        defer elapsed("ContainerStat", e.metrics.AddMethodDuration)()
    }

    ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.ContainerStat",
        trace.WithAttributes(
            attribute.Int("container_ids", len(prm.ContainerID)),
            attribute.Int64("limit", int64(prm.Limit)),
            attribute.Bool("start_from_container_id", prm.StartFromContainerID != nil),
        ))
    defer span.End()

    if len(prm.ContainerID) == 0 && prm.Limit == 0 {
        return nil, errInvalidLimit
    }

    var result *ContainerStatRes
    err := e.execIfNotBlocked(func() error {
        var sErr error
        result, sErr = e.containerStat(ctx, prm)
        return sErr
    })
    return result, err
}

func (e *StorageEngine) containerStat(ctx context.Context, prm ContainerStatPrm) (*ContainerStatRes, error) {
    e.mtx.RLock()
    defer e.mtx.RUnlock()

    if len(prm.ContainerID) > 0 {
        sort.Slice(prm.ContainerID, func(i, j int) bool {
            return bytes.Compare(prm.ContainerID[i][:], prm.ContainerID[j][:]) < 0
        })
    }

    shardResults, partial, err := e.collectShardContainerStats(ctx, prm)
    if err != nil {
        return nil, err
    }

    return &ContainerStatRes{
        ContainerStats: e.mergeShardContainerStats(shardResults, int(prm.Limit)),
        Partial:        partial,
    }, nil
}

func (e *StorageEngine) collectShardContainerStats(ctx context.Context, prm ContainerStatPrm) ([][]shard.ContainerStat, bool, error) {
    if len(prm.ContainerID) > 0 {
        sort.Slice(prm.ContainerID, func(i, j int) bool {
            return bytes.Compare(prm.ContainerID[i][:], prm.ContainerID[j][:]) < 0
        })
    }

    var shardResults [][]shard.ContainerStat
    var shardErrors []error
    var resultsGuard sync.Mutex

    shPrm := shard.ContainerStatPrm{
        ContainerID:          prm.ContainerID,
        Limit:                prm.Limit,
        StartFromContainerID: prm.StartFromContainerID,
    }

    eg, egCtx := errgroup.WithContext(ctx)
    var shardsCount int
    e.iterateOverUnsortedShards(func(hs hashedShard) (stop bool) {
        shardsCount++
        eg.Go(func() error {
            s, err := hs.ContainerStat(egCtx, shPrm)
            resultsGuard.Lock()
            defer resultsGuard.Unlock()

            if err != nil {
                shardErrors = append(shardErrors, err)
                return nil
            }

            if len(s) > 0 {
                shardResults = append(shardResults, s)
            }

            return nil
        })
        return false
    })
    if err := eg.Wait(); err != nil {
        return nil, false, err
    }
    if shardsCount == len(shardErrors) {
        return nil, false, errors.Join(shardErrors...)
    }

    return shardResults, len(shardErrors) > 0, nil
}

func (e *StorageEngine) mergeShardContainerStats(shardResults [][]shard.ContainerStat, limit int) []ContainerStat {
    var stats []ContainerStat
    for len(stats) <= limit && len(shardResults) > 0 {
        // shard results are sorted by container ID
        sort.Slice(shardResults, func(i, j int) bool {
            return bytes.Compare(shardResults[i][0].ContainerID[:], shardResults[j][0].ContainerID[:]) < 0
        })

        if len(stats) > 0 && stats[len(stats)-1].ContainerID == shardResults[0][0].ContainerID {
            stats[len(stats)-1].SizeLogic += shardResults[0][0].SizeLogic
            stats[len(stats)-1].CountPhy += shardResults[0][0].CountPhy
            stats[len(stats)-1].CountLogic += shardResults[0][0].CountLogic
            stats[len(stats)-1].CountUser += shardResults[0][0].CountUser
        } else {
            stats = append(stats, ContainerStat{
                ContainerID: shardResults[0][0].ContainerID,
                SizeLogic:   shardResults[0][0].SizeLogic,
                CountPhy:    shardResults[0][0].CountPhy,
                CountLogic:  shardResults[0][0].CountLogic,
                CountUser:   shardResults[0][0].CountUser,
            })
        }

        if len(shardResults[0]) == 1 { // last item for shard
            shardResults = shardResults[1:]
        } else {
            shardResults[0] = shardResults[0][1:]
        }
    }
    if len(stats) > limit {
        stats = stats[:limit]
    }
    return stats
}
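A minimal caller-side sketch of the pagination contract above (illustrative only, not part of the diff; the caller-provided engine, context, and page consumer are assumptions):

package example

import (
    "context"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)

// listAllContainerStats pages through all containers known to the engine,
// resuming each call strictly after the last container ID of the previous page
// (the same pattern as the batched test below).
func listAllContainerStats(ctx context.Context, eng *engine.StorageEngine, handlePage func([]engine.ContainerStat)) error {
    var prm engine.ContainerStatPrm
    prm.Limit = 1000 // page size; must be > 0 when ContainerID is empty
    for {
        res, err := eng.ContainerStat(ctx, prm)
        if err != nil {
            return err
        }
        if len(res.ContainerStats) == 0 {
            return nil // no more containers
        }
        handlePage(res.ContainerStats)
        last := res.ContainerStats[len(res.ContainerStats)-1].ContainerID
        prm.StartFromContainerID = &last // next page starts strictly after this ID
    }
}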
221  pkg/local_object_storage/engine/container_stat_test.go  Normal file
@@ -0,0 +1,221 @@
package engine

import (
    "context"
    "testing"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
    oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
    "github.com/stretchr/testify/require"
)

func TestContainerList(t *testing.T) {
    t.Parallel()

    s1 := testNewShard(t)
    s2 := testNewShard(t)
    s3 := testNewShard(t)

    e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
    e.log = test.NewLogger(t)
    defer e.Close(context.Background())

    const containerCount = 10
    expStat := testPutComplexObject(t, []*shard.Shard{s1, s2, s3}, containerCount, nil)
    expStat = testPutSimpleObject(t, []*shard.Shard{s1, s2, s3}, containerCount, expStat)
    expStat = testPutSimpleObject(t, []*shard.Shard{s1, s2}, containerCount, expStat)
    expStat = testPutSimpleObject(t, []*shard.Shard{s2, s3}, containerCount, expStat)
    expStat = testPutSimpleObject(t, []*shard.Shard{s1}, containerCount, expStat)
    expStat = testPutSimpleObject(t, []*shard.Shard{s2}, containerCount, expStat)
    expStat = testPutSimpleObject(t, []*shard.Shard{s3}, containerCount, expStat)

    t.Run("with default limit", func(t *testing.T) {
        var prm ContainerStatPrm
        prm.Limit = 10_000
        res, err := e.ContainerStat(context.Background(), prm)
        require.NoError(t, err)
        require.NotNil(t, res)
        require.ElementsMatch(t, expStat, res.ContainerStats)
        require.False(t, res.Partial)
    })

    t.Run("with limit, batched", func(t *testing.T) {
        var prm ContainerStatPrm
        prm.Limit = 1
        var stats []ContainerStat

        for {
            res, err := e.ContainerStat(context.Background(), prm)
            require.NoError(t, err)
            require.NotNil(t, res)
            require.False(t, res.Partial)
            if len(res.ContainerStats) == 0 {
                break
            }
            stats = append(stats, res.ContainerStats...)
            last := res.ContainerStats[len(res.ContainerStats)-1].ContainerID
            prm.StartFromContainerID = &last
            prm.Limit += 1
        }

        require.ElementsMatch(t, expStat, stats)
    })

    t.Run("by container id", func(t *testing.T) {
        for _, cc := range []int{1, 2, 3, 4, 5} {
            var prm ContainerStatPrm
            for idx := 0; idx+cc < len(expStat); idx += cc {
                prm.ContainerID = nil
                for i := 0; i < cc; i++ {
                    prm.ContainerID = append(prm.ContainerID, expStat[idx+i].ContainerID)
                }
                prm.Limit = uint32(len(prm.ContainerID))
                res, err := e.ContainerStat(context.Background(), prm)
                require.NoError(t, err)
                require.NotNil(t, res)
                require.False(t, res.Partial)
                require.ElementsMatch(t, expStat[idx:idx+cc], res.ContainerStats)
            }
        }
    })

    t.Run("unknown container id", func(t *testing.T) {
        var prm ContainerStatPrm
        prm.ContainerID = append(prm.ContainerID, cidtest.ID())
        prm.Limit = uint32(len(prm.ContainerID))
        res, err := e.ContainerStat(context.Background(), prm)
        require.NoError(t, err)
        require.NotNil(t, res)
        require.False(t, res.Partial)
        require.ElementsMatch(t, []ContainerStat{{ContainerID: prm.ContainerID[0]}}, res.ContainerStats)
    })

    t.Run("degraded shard", func(t *testing.T) {
        s1.SetMode(mode.Degraded)
        var prm ContainerStatPrm
        prm.Limit = 10_000
        res, err := e.ContainerStat(context.Background(), prm)
        require.NoError(t, err)
        require.NotNil(t, res)
        require.True(t, res.Partial)
    })
}

func testPutComplexObject(t *testing.T, shards []*shard.Shard, count int, stats []ContainerStat) []ContainerStat {
    const payloadSize = 10 * 1024
    for count > 0 {
        var cnr cid.ID
        var stat *ContainerStat
        var newStat bool
        if len(stats) == 0 || count%2 == 0 {
            cnr = cidtest.ID()
            stat = &ContainerStat{ContainerID: cnr}
            newStat = true
        } else {
            cnr = stats[count%len(stats)].ContainerID
            stat = &stats[count%len(stats)]
        }

        parentID := oidtest.ID()
        splitID := objectSDK.NewSplitID()

        parent := testutil.GenerateObjectWithCID(cnr)
        parent.SetID(parentID)
        parent.SetPayload(nil)

        const childCount = 10
        children := make([]*objectSDK.Object, childCount)
        childIDs := make([]oid.ID, childCount)
        for i := range children {
            children[i] = testutil.GenerateObjectWithCID(cnr)
            if i != 0 {
                children[i].SetPreviousID(childIDs[i-1])
            }
            if i == len(children)-1 {
                children[i].SetParent(parent)
            }
            children[i].SetSplitID(splitID)
            children[i].SetPayload(make([]byte, payloadSize))
            children[i].SetPayloadSize(payloadSize)
            childIDs[i], _ = children[i].ID()

            stat.SizeLogic += payloadSize
            stat.CountLogic += 1
            stat.CountPhy += 1
        }
        stat.CountUser += 1

        link := testutil.GenerateObjectWithCID(cnr)
        link.SetParent(parent)
        link.SetParentID(parentID)
        link.SetSplitID(splitID)
        link.SetChildren(childIDs...)

        stat.CountLogic += 1
        stat.CountPhy += 1
        stat.SizeLogic += link.PayloadSize()

        for i := range children {
            sh := shards[i%len(shards)]
            var putPrm shard.PutPrm
            putPrm.SetObject(children[i])
            _, err := sh.Put(context.Background(), putPrm)
            require.NoError(t, err)
        }
        sh := shards[count%len(shards)]
        var putPrm shard.PutPrm
        putPrm.SetObject(link)
        _, err := sh.Put(context.Background(), putPrm)
        require.NoError(t, err)

        if newStat {
            stats = append(stats, *stat)
        }
        count--
    }
    return stats
}

func testPutSimpleObject(t *testing.T, shards []*shard.Shard, count int, stats []ContainerStat) []ContainerStat {
    const payloadSize = 7 * 1024
    for count > 0 {
        var cnr cid.ID
        var stat *ContainerStat
        var newStat bool
        if len(stats) == 0 || count%2 == 0 {
            cnr = cidtest.ID()
            stat = &ContainerStat{ContainerID: cnr}
            newStat = true
        } else {
            cnr = stats[count%len(stats)].ContainerID
            stat = &stats[count%len(stats)]
        }

        obj := testutil.GenerateObjectWithCID(cnr)
        obj.SetPayload(make([]byte, payloadSize))
        obj.SetPayloadSize(payloadSize)
        stat.SizeLogic += payloadSize
        stat.CountLogic += 1
        stat.CountPhy += 1
        stat.CountUser += 1

        sh := shards[count%len(shards)]
        var putPrm shard.PutPrm
        putPrm.SetObject(obj)
        _, err := sh.Put(context.Background(), putPrm)
        require.NoError(t, err)

        if newStat {
            stats = append(stats, *stat)
        }
        count--
    }
    return stats
}
237  pkg/local_object_storage/metabase/container_stat.go  Normal file
@@ -0,0 +1,237 @@
package meta

import (
    "bytes"
    "context"
    "crypto/sha256"
    "time"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
    "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    "go.etcd.io/bbolt"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

type ContainerStatPrm struct {
    ContainerID []cid.ID

    Limit                uint32
    StartFromContainerID *cid.ID
}

type ContainerStat struct {
    ContainerID                     cid.ID
    SizeLogic                       uint64
    CountPhy, CountLogic, CountUser uint64
}

// ContainerStat returns object count and size for containers.
// If len(prm.ContainerID) > 0, the result slice contains records in the same order as prm.ContainerID.
// Otherwise the result slice is sorted by container ID.
func (db *DB) ContainerStat(ctx context.Context, prm ContainerStatPrm) ([]ContainerStat, error) {
    var (
        startedAt = time.Now()
        success   = false
    )
    defer func() {
        db.metrics.AddMethodDuration("ContainerStat", time.Since(startedAt), success)
    }()

    _, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerStat",
        trace.WithAttributes(
            attribute.Int("container_ids", len(prm.ContainerID)),
            attribute.Int64("limit", int64(prm.Limit)),
            attribute.Bool("start_from_container_id", prm.StartFromContainerID != nil),
        ))
    defer span.End()

    db.modeMtx.RLock()
    defer db.modeMtx.RUnlock()

    if db.mode.NoMetabase() {
        return nil, ErrDegradedMode
    }

    if len(prm.ContainerID) > 0 {
        return db.containerStatByContainerID(prm.ContainerID)
    }
    return db.containerStatByLimit(prm.StartFromContainerID, prm.Limit)
}

func (db *DB) containerStatByContainerID(containerID []cid.ID) ([]ContainerStat, error) {
    var result []ContainerStat
    err := db.boltDB.View(func(tx *bbolt.Tx) error {
        for _, contID := range containerID {
            var stat ContainerStat
            stat.ContainerID = contID
            stat.SizeLogic = db.containerSize(tx, contID)

            counters, err := db.containerCounters(tx, contID)
            if err != nil {
                return err
            }
            stat.CountPhy = counters.Phy
            stat.CountLogic = counters.Logic
            stat.CountUser = counters.User
            result = append(result, stat)
        }
        return nil
    })
    if err != nil {
        return nil, metaerr.Wrap(err)
    }
    return result, nil
}

func (db *DB) containerStatByLimit(startFrom *cid.ID, limit uint32) ([]ContainerStat, error) {
    var result []ContainerStat

    var lastKey []byte
    if startFrom != nil {
        lastKey = make([]byte, sha256.Size)
        startFrom.Encode(lastKey)
    }

    var counts []containerIDObjectCounters
    var sizes []containerIDSize

    err := db.boltDB.View(func(tx *bbolt.Tx) error {
        var e error
        counts, e = getContainerCountersBatch(tx, lastKey, limit)
        if e != nil {
            return e
        }

        sizes, e = getContainerSizesBatch(tx, lastKey, limit)
        return e
    })
    if err != nil {
        return nil, metaerr.Wrap(err)
    }

    result = mergeSizeAndCounts(counts, sizes)
    if len(result) > int(limit) {
        result = result[:limit]
    }
    return result, nil
}

type containerIDObjectCounters struct {
    ContainerID cid.ID
    ObjectCounters
}

func getContainerCountersBatch(tx *bbolt.Tx, lastKey []byte, limit uint32) ([]containerIDObjectCounters, error) {
    var result []containerIDObjectCounters
    b := tx.Bucket(containerCounterBucketName)
    if b == nil {
        return result, nil
    }
    c := b.Cursor()
    var key, value []byte
    for key, value = c.Seek(lastKey); key != nil && uint32(len(result)) < limit; key, value = c.Next() {
        if bytes.Equal(lastKey, key) {
            continue
        }

        cnrID, err := parseContainerCounterKey(key)
        if err != nil {
            return nil, err
        }
        ent, err := parseContainerCounterValue(value)
        if err != nil {
            return nil, err
        }
        result = append(result, containerIDObjectCounters{
            ContainerID:    cnrID,
            ObjectCounters: ent,
        })
    }
    return result, nil
}

type containerIDSize struct {
    ContainerID cid.ID
    Size        uint64
}

func getContainerSizesBatch(tx *bbolt.Tx, lastKey []byte, limit uint32) ([]containerIDSize, error) {
    var result []containerIDSize
    b := tx.Bucket(containerVolumeBucketName)
    c := b.Cursor()
    var key, value []byte
    for key, value = c.Seek(lastKey); key != nil && uint32(len(result)) < limit; key, value = c.Next() {
        if bytes.Equal(lastKey, key) {
            continue
        }

        var r containerIDSize
        r.Size = parseContainerSize(value)
        if err := r.ContainerID.Decode(key); err != nil {
            return nil, err
        }
        result = append(result, r)
    }
    return result, nil
}

// mergeSizeAndCounts merges sizes and counts.
// As records are deleted in the background, the metabase may contain a size record
// for a container but no count record, or vice versa.
func mergeSizeAndCounts(counts []containerIDObjectCounters, sizes []containerIDSize) []ContainerStat {
    var result []ContainerStat

    for len(counts) > 0 || len(sizes) > 0 {
        if len(counts) == 0 {
            result = append(result, ContainerStat{
                ContainerID: sizes[0].ContainerID,
                SizeLogic:   sizes[0].Size,
            })
            sizes = sizes[1:]
            continue
        }

        if len(sizes) == 0 {
            result = append(result, ContainerStat{
                ContainerID: counts[0].ContainerID,
                CountPhy:    counts[0].Phy,
                CountLogic:  counts[0].Logic,
                CountUser:   counts[0].User,
            })
            counts = counts[1:]
            continue
        }

        v := bytes.Compare(sizes[0].ContainerID[:], counts[0].ContainerID[:])

        if v == 0 { // equal
            result = append(result, ContainerStat{
                ContainerID: counts[0].ContainerID,
                CountPhy:    counts[0].Phy,
                CountLogic:  counts[0].Logic,
                CountUser:   counts[0].User,
                SizeLogic:   sizes[0].Size,
            })
            counts = counts[1:]
            sizes = sizes[1:]
        } else if v < 0 { // from sizes
            result = append(result, ContainerStat{
                ContainerID: sizes[0].ContainerID,
                SizeLogic:   sizes[0].Size,
            })
            sizes = sizes[1:]
        } else { // from counts
            result = append(result, ContainerStat{
                ContainerID: counts[0].ContainerID,
                CountPhy:    counts[0].Phy,
                CountLogic:  counts[0].Logic,
                CountUser:   counts[0].User,
            })
            counts = counts[1:]
        }
    }

    return result
}
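The batch readers above page with a bbolt cursor: Seek to the last seen key, then skip it when it matches, so each page starts strictly after the previous one. A standalone sketch of that pattern (the bucket and key layout here are placeholders, not the metabase schema):

package example

import (
    "bytes"

    "go.etcd.io/bbolt"
)

// readKeysPage returns up to limit keys from bucket, starting strictly after lastKey.
// With lastKey == nil the cursor starts at the first key of the bucket.
func readKeysPage(tx *bbolt.Tx, bucket, lastKey []byte, limit int) [][]byte {
    var page [][]byte
    b := tx.Bucket(bucket)
    if b == nil {
        return page
    }
    c := b.Cursor()
    for k, _ := c.Seek(lastKey); k != nil && len(page) < limit; k, _ = c.Next() {
        if bytes.Equal(k, lastKey) {
            continue // Seek is inclusive, so drop the key the previous page ended on
        }
        page = append(page, append([]byte(nil), k...)) // copy: keys are only valid inside the transaction
    }
    return page
}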
@@ -65,20 +65,19 @@ func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
 	}
 
 	err = db.boltDB.View(func(tx *bbolt.Tx) error {
-		size, err = db.containerSize(tx, id)
-
-		return err
+		size = db.containerSize(tx, id)
+		return nil
 	})
 
 	return size, metaerr.Wrap(err)
 }
 
-func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) (uint64, error) {
+func (db *DB) containerSize(tx *bbolt.Tx, id cid.ID) uint64 {
 	containerVolume := tx.Bucket(containerVolumeBucketName)
 	key := make([]byte, cidSize)
 	id.Encode(key)
 
-	return parseContainerSize(containerVolume.Get(key)), nil
+	return parseContainerSize(containerVolume.Get(key))
 }
 
 func parseContainerID(dst *cid.ID, name []byte, ignore map[string]struct{}) bool {

@@ -216,21 +216,28 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, error) {
 	var result ObjectCounters
 
 	err := db.boltDB.View(func(tx *bbolt.Tx) error {
-		b := tx.Bucket(containerCounterBucketName)
-		key := make([]byte, cidSize)
-		id.Encode(key)
-		v := b.Get(key)
-		if v == nil {
-			return nil
-		}
 		var err error
-		result, err = parseContainerCounterValue(v)
+		result, err = db.containerCounters(tx, id)
 		return err
 	})
 
 	return result, metaerr.Wrap(err)
 }
 
+func (*DB) containerCounters(tx *bbolt.Tx, id cid.ID) (ObjectCounters, error) {
+	b := tx.Bucket(containerCounterBucketName)
+	if b == nil {
+		return ObjectCounters{}, nil
+	}
+	key := make([]byte, cidSize)
+	id.Encode(key)
+	v := b.Get(key)
+	if v == nil {
+		return ObjectCounters{}, nil
+	}
+	return parseContainerCounterValue(v)
+}
+
 func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
 	b := tx.Bucket(shardInfoBucket)
 	if b == nil {
63  pkg/local_object_storage/shard/container_stat.go  Normal file
@@ -0,0 +1,63 @@
package shard

import (
    "context"

    meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
    "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

type ContainerStatPrm struct {
    ContainerID []cid.ID

    Limit                uint32
    StartFromContainerID *cid.ID
}

type ContainerStat struct {
    ContainerID                     cid.ID
    SizeLogic                       uint64
    CountPhy, CountLogic, CountUser uint64
}

// ContainerStat returns object count and size for containers from metabase.
func (s *Shard) ContainerStat(ctx context.Context, prm ContainerStatPrm) ([]ContainerStat, error) {
    _, span := tracing.StartSpanFromContext(ctx, "Shard.ContainerStat",
        trace.WithAttributes(
            attribute.String("shard_id", s.ID().String()),
            attribute.Int("container_ids", len(prm.ContainerID)),
            attribute.Int64("limit", int64(prm.Limit)),
            attribute.Bool("start_from_container_id", prm.StartFromContainerID != nil),
        ))
    defer span.End()

    s.m.RLock()
    defer s.m.RUnlock()

    if s.GetMode().NoMetabase() {
        return nil, ErrDegradedMode
    }

    res, err := s.metaBase.ContainerStat(ctx, meta.ContainerStatPrm{
        ContainerID:          prm.ContainerID,
        Limit:                prm.Limit,
        StartFromContainerID: prm.StartFromContainerID,
    })
    if err != nil {
        return nil, err
    }
    result := make([]ContainerStat, 0, len(res))
    for _, r := range res {
        result = append(result, ContainerStat{
            ContainerID: r.ContainerID,
            SizeLogic:   r.SizeLogic,
            CountPhy:    r.CountPhy,
            CountLogic:  r.CountLogic,
            CountUser:   r.CountUser,
        })
    }
    return result, nil
}
@@ -3,10 +3,135 @@ package server
 import (
 	"context"
 	"errors"
+	"fmt"
 
+	containerApi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
+	containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/billing"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
+	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
-func (s *Server) ListContainers(context.Context, *billing.ListContainersRequest) (*billing.ListContainersResponse, error) {
-	return nil, errors.New("not implemented")
+const (
+	defaultLimit = 1000
+	maxLimit     = 10000
+)
+
+var (
+	errInvalidContainerIDLength = errors.New("number of container IDs must not exceed 10000")
+	errInvalidLimit             = errors.New("limit value must not exceed 10000")
+)
+
+func (s *Server) ListContainers(ctx context.Context, req *billing.ListContainersRequest) (*billing.ListContainersResponse, error) {
+	err := s.isValidRequest(req)
+	if err != nil {
+		return nil, status.Error(codes.PermissionDenied, err.Error())
+	}
+	enginePrm, err := convertToEngineContainerStatPrm(req)
+	if err != nil {
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+
+	res, err := s.se.ContainerStat(ctx, enginePrm)
+	if err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	resp := &billing.ListContainersResponse{
+		Body: &billing.ListContainersResponse_Body{
+			Result:        s.addContainerInfo(res.ContainerStats),
+			NextPageToken: containerListNextPageToken(res.ContainerStats, enginePrm.Limit),
+			Partial:       res.Partial,
+		},
+	}
+
+	if err = SignMessage(s.key, resp); err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	return resp, nil
 }
+
+func (s *Server) addContainerInfo(engineStats []engine.ContainerStat) []*billing.ListContainersResponse_Body_ContainerInfo {
+	var result []*billing.ListContainersResponse_Body_ContainerInfo
+
+	for _, engineStat := range engineStats {
+		containerInfo := &billing.ListContainersResponse_Body_ContainerInfo{
+			ContainerId: engineStat.ContainerID[:],
+			Count: &billing.ListContainersResponse_Body_ContainerInfo_Count{
+				Phy:   engineStat.CountPhy,
+				Logic: engineStat.CountLogic,
+				User:  engineStat.CountUser,
+			},
+			Size: &billing.ListContainersResponse_Body_ContainerInfo_Size{
+				Logic: engineStat.SizeLogic,
+			},
+			ContainerStatus: billing.ListContainersResponse_Body_ContainerInfo_UNDEFINED,
+		}
+
+		cnr, err := s.cnrSrc.Get(engineStat.ContainerID)
+		if err != nil {
+			if client.IsErrContainerNotFound(err) {
+				existed, errWasRemoved := containercore.WasRemoved(s.cnrSrc, engineStat.ContainerID)
+				if errWasRemoved == nil && existed {
+					containerInfo.ContainerStatus = billing.ListContainersResponse_Body_ContainerInfo_DELETED
+				}
+			}
+		} else {
+			containerInfo.ContainerStatus = billing.ListContainersResponse_Body_ContainerInfo_AVAILABLE
+			containerInfo.Attributes = &billing.ListContainersResponse_Body_ContainerInfo_Attributes{
+				OwnerWallet: cnr.Value.Owner().WalletBytes(),
+				Zone:        cnr.Value.Attribute(containerApi.SysAttributeZone),
+			}
+		}
+		result = append(result, containerInfo)
+	}
+
+	return result
+}
+
+func containerListNextPageToken(engineStats []engine.ContainerStat, limit uint32) []byte {
+	if uint32(len(engineStats)) <= limit {
+		return nil
+	}
+	return engineStats[len(engineStats)-1].ContainerID[:]
+}
+
+func convertToEngineContainerStatPrm(req *billing.ListContainersRequest) (engine.ContainerStatPrm, error) {
+	var result engine.ContainerStatPrm
+
+	if len(req.GetBody().GetContainerId()) > 10000 {
+		return result, errInvalidContainerIDLength
+	}
+	if len(req.GetBody().GetContainerId()) > 0 {
+		for idx, contIDBytes := range req.GetBody().GetContainerId() {
+			var contID cid.ID
+			if err := contID.Decode(contIDBytes); err != nil {
+				return result, fmt.Errorf("failed to decode container ID at index %d: %w", idx, err)
+			}
+			result.ContainerID = append(result.ContainerID, contID)
+		}
+		return result, nil
+	}
+
+	result.Limit = defaultLimit
+	if req.GetBody().GetLimit() > maxLimit {
+		return result, errInvalidLimit
+	}
+	if req.GetBody().GetLimit() > 0 {
+		result.Limit = req.GetBody().GetLimit()
+	}
+
+	if len(req.GetBody().GetNextPageToken()) > 0 {
+		var contID cid.ID
+		if err := contID.Decode(req.GetBody().GetNextPageToken()); err != nil {
+			return result, fmt.Errorf("invalid next page token: %w", err)
+		}
+		result.StartFromContainerID = &contID
+	}
+
+	return result, nil
+}
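For completeness, a client-side sketch of the paging contract exposed by ListContainers. The body field names mirror the getters used above (GetLimit, GetNextPageToken, GetResult); the generated client interface name and the struct-literal construction of the request are assumptions, not part of this change:

package example

import (
    "context"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/billing"
)

// listAllContainers keeps requesting pages until the server returns an empty
// next page token, which signals that the listing is complete.
func listAllContainers(ctx context.Context, cli billing.BillingServiceClient) ([]*billing.ListContainersResponse_Body_ContainerInfo, error) {
    var (
        infos []*billing.ListContainersResponse_Body_ContainerInfo
        token []byte
    )
    for {
        req := &billing.ListContainersRequest{
            Body: &billing.ListContainersRequest_Body{
                Limit:         1000, // must not exceed maxLimit (10000)
                NextPageToken: token,
            },
        }
        // Request signing, if the service requires it, is omitted in this sketch.
        resp, err := cli.ListContainers(ctx, req)
        if err != nil {
            return nil, err
        }
        infos = append(infos, resp.GetBody().GetResult()...)
        token = resp.GetBody().GetNextPageToken()
        if len(token) == 0 {
            return infos, nil
        }
    }
}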