Drop frostfs_node_engine_container_size_bytes and ..._count_total metric for removed containers #889

Merged
fyrchik merged 3 commits from dstepanov-yadro/frostfs-node:fix/drop_zero_metrics into master 2024-09-04 19:51:05 +00:00
18 changed files with 716 additions and 109 deletions

View file

@@ -45,6 +45,8 @@ func initContainerService(_ context.Context, c *cfg) {
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
containerGRPC.RegisterContainerServiceServer(s, server)
})
c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr)
}
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {

View file

@@ -557,4 +557,14 @@ const (
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache"
FailedToReportStatusToSystemd = "failed to report status to systemd"
ShardGCCollectingExpiredMetricsStarted = "collecting expired metrics started"
ShardGCCollectingExpiredMetricsCompleted = "collecting expired metrics completed"
ShardGCFailedToCollectZeroSizeContainers = "failed to collect zero-size containers"
ShardGCFailedToCollectZeroCountContainers = "failed to collect zero-count containers"
EngineFailedToCheckContainerAvailability = "failed to check container availability"
EngineFailedToGetContainerSize = "failed to get container size"
EngineFailedToDeleteContainerSize = "failed to delete container size"
EngineInterruptProcessingZeroSizeContainers = "interrupt processing zero-size containers"
EngineInterruptProcessingZeroCountContainers = "interrupt processing zero-count containers"
EngineFailedToGetContainerCounters = "failed to get container counters"
)

View file

@@ -7,12 +7,14 @@ import (
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
)
@@ -218,14 +220,18 @@ type cfg struct {
lowMem bool
rebuildWorkersCount uint32
containerSource atomic.Pointer[containerSource]
}
func defaultCfg() *cfg {
- return &cfg{
+ res := &cfg{
log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
rebuildWorkersCount: 100,
}
res.containerSource.Store(&containerSource{})
return res
}
// New creates, initializes and returns new StorageEngine instance.
@@ -288,3 +294,30 @@ func WithRebuildWorkersCount(count uint32) Option {
c.rebuildWorkersCount = count
}
}
// SetContainerSource sets container source.
func (e *StorageEngine) SetContainerSource(cs container.Source) {
e.containerSource.Store(&containerSource{cs: cs})
}
type containerSource struct {
cs container.Source
}
func (s *containerSource) IsContainerAvailable(ctx context.Context, id cid.ID) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
if s == nil || s.cs == nil {
return true, nil
}
wasRemoved, err := container.WasRemoved(s.cs, id)
if err != nil {
return false, err
}
return !wasRemoved, nil
}
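Note: the engine keeps its container source behind a `sync/atomic.Pointer`, so `defaultCfg` can install a permissive default (every container treated as available) and `SetContainerSource` can later swap in the morph-backed reader without extra locking. A minimal sketch of that swap pattern, using only the standard library and made-up names:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// containerChecker stands in for the engine's containerSource wrapper.
type containerChecker struct{ name string }

func main() {
	var src atomic.Pointer[containerChecker]

	// Like defaultCfg: store a non-nil default so readers never see nil
	// and treat every container as available.
	src.Store(&containerChecker{name: "default (always available)"})

	// Like SetContainerSource: swap in the real source once it is ready;
	// concurrent readers keep using whichever pointer they loaded.
	src.Store(&containerChecker{name: "morph-backed"})

	fmt.Println(src.Load().name)
}
```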

View file

@@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
@@ -259,3 +260,177 @@ func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.A
}
})
}
func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid.ID) {
if len(ids) == 0 {
return
}
idMap, err := e.selectNonExistentIDs(ctx, ids)
if err != nil {
return
}
if len(idMap) == 0 {
return
}
var failed bool
var prm shard.ContainerSizePrm
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
select {
case <-ctx.Done():
e.log.Info(logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err()))
failed = true
return true
default:
}
var drop []cid.ID
for id := range idMap {
prm.SetContainerID(id)
s, err := sh.ContainerSize(prm)
if err != nil {
e.log.Warn(logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true
return true
}
if s.Size() > 0 {
drop = append(drop, id)
}
}
for _, id := range drop {
delete(idMap, id)
}
return len(idMap) == 0
})
if failed || len(idMap) == 0 {
return
}
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
select {
case <-ctx.Done():
e.log.Info(logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err()))
failed = true
return true
default:
}
for id := range idMap {
if err := sh.DeleteContainerSize(ctx, id); err != nil {
e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true
return true
}
}
return false
})
if failed {
return
}
for id := range idMap {
e.metrics.DeleteContainerSize(id.EncodeToString())
}
}
func (e *StorageEngine) processZeroCountContainers(ctx context.Context, ids []cid.ID) {
if len(ids) == 0 {
return
}
idMap, err := e.selectNonExistentIDs(ctx, ids)
if err != nil {
return
}
if len(idMap) == 0 {
return
}
var failed bool
var prm shard.ContainerCountPrm
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
select {
case <-ctx.Done():
e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err()))
failed = true
return true
default:
}
var drop []cid.ID
for id := range idMap {
prm.ContainerID = id
s, err := sh.ContainerCount(ctx, prm)
if err != nil {
e.log.Warn(logs.EngineFailedToGetContainerCounters, zap.Stringer("container_id", id), zap.Error(err))
failed = true
return true
}
if s.User > 0 || s.Logic > 0 || s.Phy > 0 {
drop = append(drop, id)
}
}
for _, id := range drop {
delete(idMap, id)
}
return len(idMap) == 0
})
if failed || len(idMap) == 0 {
return
}
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
select {
case <-ctx.Done():
e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err()))
failed = true
return true
default:
}
for id := range idMap {
if err := sh.DeleteContainerSize(ctx, id); err != nil {
e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true
return true
}
}
return false
})
if failed {
return
}
for id := range idMap {
e.metrics.DeleteContainerCount(id.EncodeToString())
}
}
func (e *StorageEngine) selectNonExistentIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) {
cs := e.containerSource.Load()
idMap := make(map[cid.ID]struct{})
for _, id := range ids {
isAvailable, err := cs.IsContainerAvailable(ctx, id)
if err != nil {
e.log.Warn(logs.EngineFailedToCheckContainerAvailability, zap.Stringer("container_id", id), zap.Error(err))
return nil, err
}
if isAvailable {
continue
}
idMap[id] = struct{}{}
}
return idMap, nil
}
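Note: both processors are deliberately two-phase — a container remains a candidate only while every shard reports a zero value for it, the per-shard record is deleted next, and only then is the Prometheus series dropped, so an interrupted run never loses a non-zero counter. A toy illustration of that ordering (shard and container contents are invented):

```go
package main

import "fmt"

// shard maps container ID to its accumulated size on that shard.
type shard map[string]uint64

func main() {
	shards := []shard{
		{"cnr-1": 0, "cnr-2": 10},
		{"cnr-1": 0},
	}
	// Candidates come from the metabase (zero value on at least one shard)
	// minus containers that still exist in the sidechain.
	candidates := map[string]struct{}{"cnr-1": {}, "cnr-2": {}}

	// Phase 1: keep a candidate only if every shard reports zero for it.
	for _, sh := range shards {
		for id := range candidates {
			if sh[id] > 0 {
				delete(candidates, id)
			}
		}
	}

	// Phase 2: delete the per-shard records, then drop the metric series.
	for id := range candidates {
		for _, sh := range shards {
			delete(sh, id)
		}
		fmt.Println("metric series dropped for", id)
	}
}
```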

View file

@@ -16,6 +16,8 @@ type MetricRegister interface {
SetMode(shardID string, mode mode.Mode)
AddToContainerSize(cnrID string, size int64)
DeleteContainerSize(cnrID string)
DeleteContainerCount(cnrID string)
AddToPayloadCounter(shardID string, size int64)
IncErrorCounter(shardID string)
ClearErrorCounter(shardID string)

View file

@@ -119,6 +119,8 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
shard.WithDeletedLockCallback(e.processDeletedLocks),
shard.WithReportErrorFunc(e.reportShardErrorBackground),
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
shard.WithZeroSizeCallback(e.processZeroSizeContainers),
shard.WithZeroCountCallback(e.processZeroCountContainers),
)...)
if err := sh.UpdateID(ctx); err != nil {

View file

@@ -14,6 +14,8 @@ import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var (
@@ -118,7 +120,9 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
default:
}
- completed, err := db.containerCountersNextBatch(lastKey, &cc)
+ completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) {
+ cc.Counts[id] = entity
+ })
if err != nil {
return cc, err
}
@@ -131,7 +135,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
return cc, nil
}
- func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) {
+ func (db *DB) containerCountersNextBatch(lastKey []byte, f func(id cid.ID, entity ObjectCounters)) (bool, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
@@ -163,7 +167,7 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
if err != nil {
return err
}
- cc.Counts[cnrID] = ent
+ f(cnrID, ent)
counter++
if counter == batchSize {
@@ -185,6 +189,43 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
return false, nil
}
func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("ContainerCount", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCount")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ObjectCounters{}, ErrDegradedMode
}
var result ObjectCounters
err := db.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerCounterBucketName)
key := make([]byte, cidSize)
id.Encode(key)
v := b.Get(key)
if v == nil {
return nil
}
var err error
result, err = parseContainerCounterValue(v)
return err
})
return result, metaerr.Wrap(err)
}
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
return fmt.Errorf("could not increase phy object counter: %w", err)
@@ -239,7 +280,7 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
return b.Put(counterKey, newCounter)
}
- func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { // TODO #838
+ func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
b := tx.Bucket(containerCounterBucketName)
if b == nil {
return nil
@@ -268,9 +309,6 @@ func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCo
entity.Phy = nextValue(entity.Phy, delta.Phy, inc)
entity.Logic = nextValue(entity.Logic, delta.Logic, inc)
entity.User = nextValue(entity.User, delta.User, inc)
- if entity.IsZero() {
- return b.Delete(key)
- }
value := containerCounterValue(entity)
return b.Put(key, value)
}
// ZeroSizeContainers returns containers with size = 0.
func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("ZeroSizeContainers", time.Since(startedAt), success)
}()
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroSizeContainers")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
var result []cid.ID
lastKey := make([]byte, cidSize)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
completed, err := db.containerSizesNextBatch(lastKey, func(contID cid.ID, size uint64) {
if size == 0 {
result = append(result, contID)
}
})
if err != nil {
return nil, err
}
if completed {
break
}
}
success = true
return result, nil
}
func (db *DB) containerSizesNextBatch(lastKey []byte, f func(cid.ID, uint64)) (bool, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return false, ErrDegradedMode
}
counter := 0
const batchSize = 1000
err := db.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerVolumeBucketName)
c := b.Cursor()
var key, value []byte
for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() {
if bytes.Equal(lastKey, key) {
continue
}
copy(lastKey, key)
size := parseContainerSize(value)
var id cid.ID
if err := id.Decode(key); err != nil {
return err
}
f(id, size)
counter++
if counter == batchSize {
break
}
}
if counter < batchSize {
return ErrInterruptIterator
}
return nil
})
if err != nil {
if errors.Is(err, ErrInterruptIterator) {
return true, nil
}
return false, metaerr.Wrap(err)
}
return false, nil
}
func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("DeleteContainerSize", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerSize",
trace.WithAttributes(
attribute.Stringer("container_id", id),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
if db.mode.ReadOnly() {
return ErrReadOnlyMode
}
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerVolumeBucketName)
key := make([]byte, cidSize)
id.Encode(key)
return b.Delete(key)
})
success = err == nil
return metaerr.Wrap(err)
}
// ZeroCountContainers returns containers with objects count = 0 in metabase.
func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("ZeroCountContainers", time.Since(startedAt), success)
}()
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroCountContainers")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}
var result []cid.ID
lastKey := make([]byte, cidSize)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) {
if entity.IsZero() {
result = append(result, id)
}
})
if err != nil {
return nil, metaerr.Wrap(err)
}
if completed {
break
}
}
success = true
return result, nil
}
func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("DeleteContainerCount", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerCount",
trace.WithAttributes(
attribute.Stringer("container_id", id),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
if db.mode.ReadOnly() {
return ErrReadOnlyMode
}
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerCounterBucketName)
key := make([]byte, cidSize)
id.Encode(key)
return b.Delete(key)
})
success = err == nil
return metaerr.Wrap(err)
}
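Note: `containerSizesNextBatch` and `containerCountersNextBatch` share one resumable-scan contract: the caller passes the last visited key, at most `batchSize` entries are visited per call, and `ErrInterruptIterator` is used internally to signal that the bucket ended before the batch filled, i.e. the scan is complete. A self-contained sketch of that contract over a sorted slice (names here are illustrative, not the metabase API):

```go
package main

import (
	"errors"
	"fmt"
)

// errInterrupt plays the role of meta.ErrInterruptIterator: raised inside the
// scan when fewer than batchSize entries were left, i.e. iteration finished.
var errInterrupt = errors.New("interrupt iterator")

const batchSize = 2

// nextBatch visits at most batchSize keys strictly greater than *lastKey and
// reports whether the whole key space has been exhausted.
func nextBatch(keys []string, lastKey *string, f func(string)) (bool, error) {
	n := 0
	err := func() error {
		for _, k := range keys {
			if k <= *lastKey {
				continue // emulate Cursor.Seek plus skipping the last visited key
			}
			*lastKey = k
			f(k)
			n++
			if n == batchSize {
				return nil // batch filled, more keys may remain
			}
		}
		return errInterrupt // keys ended before the batch filled up
	}()
	if errors.Is(err, errInterrupt) {
		return true, nil // scan completed
	}
	return false, err // caller should request another batch
}

func main() {
	keys := []string{"cnr-a", "cnr-b", "cnr-c"} // already sorted, like bbolt keys
	last := ""
	for {
		done, err := nextBatch(keys, &last, func(k string) { fmt.Println("visit", k) })
		if err != nil || done {
			break
		}
	}
}
```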

View file

@@ -91,7 +91,7 @@ func TestCounters(t *testing.T) {
res, err := db.Delete(context.Background(), prm)
require.NoError(t, err)
- require.Equal(t, uint64(1), res.AvailableObjectsRemoved())
+ require.Equal(t, uint64(1), res.LogicCount())
c, err := db.ObjectCounters()
require.NoError(t, err)
@@ -105,11 +105,7 @@
v.Phy--
v.Logic--
v.User--
- if v.IsZero() {
- delete(exp, cnrID)
- } else {
- exp[cnrID] = v
- }
+ exp[cnrID] = v
}
cc, err := db.ContainerCounters(context.Background())
@@ -161,7 +157,7 @@
res, err := db.Inhume(context.Background(), prm)
require.NoError(t, err)
- require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed())
+ require.Equal(t, uint64(len(inhumedObjs)), res.LogicInhumed())
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
c, err := db.ObjectCounters()
@@ -389,7 +385,7 @@ func TestCounters_Expired(t *testing.T) {
inhumeRes, err := db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
- require.Equal(t, uint64(1), inhumeRes.AvailableInhumed())
+ require.Equal(t, uint64(1), inhumeRes.LogicInhumed())
require.Equal(t, uint64(1), inhumeRes.UserInhumed())
c, err = db.ObjectCounters()
@@ -423,16 +419,12 @@
deleteRes, err := db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
- require.Zero(t, deleteRes.AvailableObjectsRemoved())
- require.Zero(t, deleteRes.UserObjectsRemoved())
+ require.Zero(t, deleteRes.LogicCount())
+ require.Zero(t, deleteRes.UserCount())
if v, ok := exp[oo[0].Container()]; ok {
v.Phy--
- if v.IsZero() {
- delete(exp, oo[0].Container())
- } else {
- exp[oo[0].Container()] = v
- }
+ exp[oo[0].Container()] = v
}
oo = oo[1:]
@@ -456,18 +448,14 @@
deleteRes, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
- require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
- require.Equal(t, uint64(1), deleteRes.UserObjectsRemoved())
+ require.Equal(t, uint64(1), deleteRes.LogicCount())
+ require.Equal(t, uint64(1), deleteRes.UserCount())
if v, ok := exp[oo[0].Container()]; ok {
v.Phy--
v.Logic--
v.User--
- if v.IsZero() {
- delete(exp, oo[0].Container())
- } else {
- exp[oo[0].Container()] = v
- }
+ exp[oo[0].Container()] = v
}
oo = oo[1:]

View file

@@ -27,22 +27,22 @@ type DeletePrm struct {
// DeleteRes groups the resulting values of Delete operation.
type DeleteRes struct {
- rawRemoved uint64
- availableRemoved uint64
- userRemoved uint64
- sizes []uint64
- availableSizes []uint64
+ phyCount uint64
+ logicCount uint64
+ userCount uint64
+ phySize uint64
+ logicSize uint64
removedByCnrID map[cid.ID]ObjectCounters
}
- // AvailableObjectsRemoved returns the number of removed available
+ // LogicCount returns the number of removed logic
// objects.
- func (d DeleteRes) AvailableObjectsRemoved() uint64 {
- return d.availableRemoved
+ func (d DeleteRes) LogicCount() uint64 {
+ return d.logicCount
}
- func (d DeleteRes) UserObjectsRemoved() uint64 {
- return d.userRemoved
+ func (d DeleteRes) UserCount() uint64 {
+ return d.userCount
}
// RemovedByCnrID returns the number of removed objects by container ID.
@@ -50,19 +50,19 @@ func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
return d.removedByCnrID
}
- // RawObjectsRemoved returns the number of removed raw objects.
- func (d DeleteRes) RawObjectsRemoved() uint64 {
- return d.rawRemoved
+ // PhyCount returns the number of removed physical objects.
+ func (d DeleteRes) PhyCount() uint64 {
+ return d.phyCount
}
- // RemovedPhysicalObjectSizes returns the sizes of removed physical objects.
- func (d DeleteRes) RemovedPhysicalObjectSizes() []uint64 {
- return d.sizes
+ // PhySize returns the size of removed physical objects.
+ func (d DeleteRes) PhySize() uint64 {
+ return d.phySize
}
- // RemovedLogicalObjectSizes returns the sizes of removed logical objects.
- func (d DeleteRes) RemovedLogicalObjectSizes() []uint64 {
- return d.availableSizes
+ // LogicSize returns the size of removed logical objects.
+ func (d DeleteRes) LogicSize() uint64 {
+ return d.logicSize
}
// SetAddresses is a Delete option to set the addresses of the objects to delete.
@@ -129,8 +129,6 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
// references of the split objects.
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
res := DeleteRes{
- sizes: make([]uint64, len(addrs)),
- availableSizes: make([]uint64, len(addrs)),
removedByCnrID: make(map[cid.ID]ObjectCounters),
}
refCounter := make(referenceCounter, len(addrs))
@@ -162,22 +160,22 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
}
func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
- if res.rawRemoved > 0 {
- err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
+ if res.phyCount > 0 {
+ err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
if err != nil {
return fmt.Errorf("could not decrease phy object counter: %w", err)
}
}
- if res.availableRemoved > 0 {
- err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
+ if res.logicCount > 0 {
+ err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
if err != nil {
return fmt.Errorf("could not decrease logical object counter: %w", err)
}
}
- if res.userRemoved > 0 {
- err := db.updateShardObjectCounter(tx, user, res.userRemoved, false)
+ if res.userCount > 0 {
+ err := db.updateShardObjectCounter(tx, user, res.userCount, false)
if err != nil {
return fmt.Errorf("could not decrease user object counter: %w", err)
}
@@ -190,7 +188,7 @@ func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
}
func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) {
- if r.Removed {
+ if r.Phy {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.Phy++
res.removedByCnrID[addrs[i].Container()] = v
@@ -200,11 +198,11 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
}
}
- res.rawRemoved++
- res.sizes[i] = r.Size
+ res.phyCount++
+ res.phySize += r.Size
}
- if r.Available {
+ if r.Logic {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.Logic++
res.removedByCnrID[addrs[i].Container()] = v
@@ -214,8 +212,8 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
}
}
- res.availableRemoved++
- res.availableSizes[i] = r.Size
+ res.logicCount++
+ res.logicSize += r.Size
}
if r.User {
@@ -228,15 +226,15 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
}
}
- res.userRemoved++
+ res.userCount++
}
}
type deleteSingleResult struct {
- Removed bool
- Available bool
+ Phy bool
+ Logic bool
User bool
Size uint64
}
// delete removes object indexes from the metabase. Counts the references
@@ -302,10 +300,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
}
return deleteSingleResult{
- Removed: true,
- Available: removeAvailableObject,
+ Phy: true,
+ Logic: removeAvailableObject,
User: isUserObject && removeAvailableObject,
Size: obj.PayloadSize(),
}, nil
}

View file

@@ -37,17 +37,17 @@ type DeletionInfo struct {
// InhumeRes encapsulates results of Inhume operation.
type InhumeRes struct {
deletedLockObj []oid.Address
- availableInhumed uint64
+ logicInhumed uint64
userInhumed uint64
inhumedByCnrID map[cid.ID]ObjectCounters
deletionDetails []DeletionInfo
}
- // AvailableInhumed return number of available object
+ // LogicInhumed return number of logic object
// that have been inhumed.
- func (i InhumeRes) AvailableInhumed() uint64 {
- return i.availableInhumed
+ func (i InhumeRes) LogicInhumed() uint64 {
+ return i.logicInhumed
}
func (i InhumeRes) UserInhumed() uint64 {
@@ -87,7 +87,7 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, is
CID: containerID,
IsUser: isUser,
})
- i.availableInhumed++
+ i.logicInhumed++
if isUser {
i.userInhumed++
}
@@ -265,7 +265,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
}
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
- if err := db.updateShardObjectCounter(tx, logical, res.AvailableInhumed(), false); err != nil {
+ if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
return err
}
if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {

View file

@@ -1,9 +1,13 @@
package shard
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type ContainerSizePrm struct {
@@ -39,3 +43,84 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
size: size,
}, nil
}
type ContainerCountPrm struct {
ContainerID cid.ID
}
type ContainerCountRes struct {
Phy uint64
Logic uint64
User uint64
}
func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (ContainerCountRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ContainerCount",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Stringer("container_id", prm.ContainerID),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return ContainerCountRes{}, ErrDegradedMode
}
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
if err != nil {
return ContainerCountRes{}, fmt.Errorf("could not get container counters: %w", err)
}
return ContainerCountRes{
Phy: counters.Phy,
Logic: counters.Logic,
User: counters.User,
}, nil
}
func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerSize",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Stringer("container_id", id),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.metaBase.DeleteContainerSize(ctx, id)
}
func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerCount",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Stringer("container_id", id),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.metaBase.DeleteContainerCount(ctx, id)
}

View file

@@ -155,6 +155,7 @@ func (s *Shard) Init(ctx context.Context) error {
s.collectExpiredLocks,
s.collectExpiredObjects,
s.collectExpiredTombstones,
s.collectExpiredMetrics,
},
},
},

View file

@@ -141,16 +141,12 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
if err != nil {
return err
}
- s.decObjectCounterBy(physical, res.RawObjectsRemoved())
- s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
- s.decObjectCounterBy(user, res.UserObjectsRemoved())
+ s.decObjectCounterBy(physical, res.PhyCount())
+ s.decObjectCounterBy(logical, res.LogicCount())
+ s.decObjectCounterBy(user, res.UserCount())
s.decContainerObjectCounter(res.RemovedByCnrID())
- removedPayload := res.RemovedPhysicalObjectSizes()[0]
- logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
- if logicalRemovedPayload > 0 {
- s.addToContainerSize(addr.Container().EncodeToString(), -int64(logicalRemovedPayload))
- }
- s.addToPayloadSize(-int64(removedPayload))
+ s.addToContainerSize(addr.Container().EncodeToString(), -int64(res.LogicSize()))
+ s.addToPayloadSize(-int64(res.PhySize()))
return nil
}

View file

@@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
@@ -414,8 +415,8 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return
}
- s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
- s.decObjectCounterBy(logical, res.AvailableInhumed())
+ s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeRegular)
+ s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
@@ -629,8 +630,8 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
return
}
- s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
- s.decObjectCounterBy(logical, res.AvailableInhumed())
+ s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeTombstone)
+ s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
@@ -677,8 +678,8 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
return
}
- s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
- s.decObjectCounterBy(logical, res.AvailableInhumed())
+ s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeLock)
+ s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
@@ -726,3 +727,40 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
func (s *Shard) NotificationChannel() chan<- Event {
return s.gc.eventChan
}
func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics")
defer span.End()
epoch := e.(newEpoch).epoch
s.log.Debug(logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch))
defer s.log.Debug(logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch))
s.collectExpiredContainerSizeMetrics(ctx, epoch)
s.collectExpiredContainerCountMetrics(ctx, epoch)
}
func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) {
ids, err := s.metaBase.ZeroSizeContainers(ctx)
if err != nil {
s.log.Warn(logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
if len(ids) == 0 {
return
}
s.zeroSizeContainersCallback(ctx, ids)
}
func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) {
ids, err := s.metaBase.ZeroCountContainers(ctx)
if err != nil {
s.log.Warn(logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
if len(ids) == 0 {
return
}
s.zeroCountContainersCallback(ctx, ids)
}

View file

@@ -121,7 +121,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
s.m.RUnlock()
- s.decObjectCounterBy(logical, res.AvailableInhumed())
+ s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())

View file

@@ -336,11 +336,7 @@ func TestCounters(t *testing.T) {
v.Logic--
v.Phy--
v.User--
- if v.IsZero() {
- delete(expected, cnr)
- } else {
- expected[cnr] = v
- }
+ expected[cnr] = v
}
}
require.Equal(t, expectedLogicalSizes, mm.containerSizes())

View file

@@ -53,6 +53,9 @@ type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address)
// DeletedLockCallback is a callback handling list of deleted LOCK objects.
type DeletedLockCallback func(context.Context, []oid.Address)
// EmptyContainersCallback is a callback handling list of zero-size and zero-count containers.
type EmptyContainersCallback func(context.Context, []cid.ID)
// MetricsWriter is an interface that must store shard's metrics.
type MetricsWriter interface {
// SetObjectCounter must set object counter taking into account object type.
@@ -118,6 +121,9 @@ type cfg struct {
deletedLockCallBack DeletedLockCallback
zeroSizeContainersCallback EmptyContainersCallback
zeroCountContainersCallback EmptyContainersCallback
tsSource TombstoneSource
metricsWriter MetricsWriter
@@ -129,11 +135,13 @@
func defaultCfg() *cfg {
return &cfg{
rmBatchSize: 100,
log: &logger.Logger{Logger: zap.L()},
gcCfg: defaultGCCfg(),
reportErrorFunc: func(string, string, error) {},
rebuildLimiter: &noopRebuildLimiter{},
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
}
}
@@ -363,6 +371,20 @@ func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
}
}
// WithZeroSizeCallback returns option to set zero-size containers callback.
func WithZeroSizeCallback(cb EmptyContainersCallback) Option {
return func(c *cfg) {
c.zeroSizeContainersCallback = cb
}
}
// WithZeroCountCallback returns option to set zero-count containers callback.
func WithZeroCountCallback(cb EmptyContainersCallback) Option {
return func(c *cfg) {
c.zeroCountContainersCallback = cb
}
}
func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
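Note: the new callbacks follow the shard's existing functional-options style — `defaultCfg` installs no-op functions, so a shard built without `WithZeroSizeCallback`/`WithZeroCountCallback` never invokes a nil callback. A generic sketch of that default-plus-option pattern (identifiers here are illustrative, not the real shard API):

```go
package main

import "fmt"

type callback func(ids []string)

type cfg struct {
	onZeroSize  callback
	onZeroCount callback
}

type option func(*cfg)

// defaultCfg installs no-op callbacks so callers may skip the options safely.
func defaultCfg() *cfg {
	return &cfg{
		onZeroSize:  func([]string) {},
		onZeroCount: func([]string) {},
	}
}

// withZeroSizeCallback mirrors the shape of shard.WithZeroSizeCallback.
func withZeroSizeCallback(cb callback) option {
	return func(c *cfg) { c.onZeroSize = cb }
}

func newShard(opts ...option) *cfg {
	c := defaultCfg()
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	sh := newShard(withZeroSizeCallback(func(ids []string) {
		fmt.Println("zero-size containers:", ids)
	}))
	sh.onZeroSize([]string{"cnr-1"})
	sh.onZeroCount([]string{"cnr-2"}) // default no-op, safe to call
}
```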

View file

@@ -11,6 +11,8 @@ import (
type EngineMetrics interface {
AddMethodDuration(method string, d time.Duration)
AddToContainerSize(cnrID string, size int64)
DeleteContainerSize(cnrID string)
DeleteContainerCount(cnrID string)
IncErrorCounter(shardID string)
ClearErrorCounter(shardID string)
DeleteShardMetrics(shardID string)
@@ -79,6 +81,14 @@ func (m *engineMetrics) AddToContainerSize(cnrID string, size int64) {
m.containerSize.With(prometheus.Labels{containerIDLabelKey: cnrID}).Add(float64(size))
}
func (m *engineMetrics) DeleteContainerSize(cnrID string) {
m.containerSize.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID})
}
func (m *engineMetrics) DeleteContainerCount(cnrID string) {
m.contObjCounter.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID})
}
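Note: the series is removed via `DeletePartialMatch`, which drops every child whose label set contains the given labels, so a removed container stops being exported instead of lingering as a zero value. A small sketch of that behaviour, assuming a prometheus/client_golang version that provides `MetricVec.DeletePartialMatch` and using an invented metric name:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Gauge vector labelled by container ID, shaped like
	// frostfs_node_engine_container_size_bytes (the name here is made up).
	containerSize := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "example_container_size_bytes",
		Help: "Accumulated payload size per container (example).",
	}, []string{"cid"})

	containerSize.With(prometheus.Labels{"cid": "cnr-1"}).Add(100)
	containerSize.With(prometheus.Labels{"cid": "cnr-2"}).Add(0)

	// DeletePartialMatch removes every child whose labels contain the given
	// subset, so the series for the removed container disappears from the
	// exposition entirely instead of being reported as 0.
	removed := containerSize.DeletePartialMatch(prometheus.Labels{"cid": "cnr-2"})
	fmt.Println("series removed:", removed) // series removed: 1
}
```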
func (m *engineMetrics) AddToPayloadCounter(shardID string, size int64) {
m.payloadSize.With(prometheus.Labels{shardIDLabel: shardID}).Add(float64(size))
}