Drop frostfs_node_engine_container_size_bytes and ..._count_total metrics for removed containers #889

Merged
fyrchik merged 3 commits from dstepanov-yadro/frostfs-node:fix/drop_zero_metrics into master 2024-09-04 19:51:05 +00:00
18 changed files with 716 additions and 109 deletions


@ -45,6 +45,8 @@ func initContainerService(_ context.Context, c *cfg) {
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
containerGRPC.RegisterContainerServiceServer(s, server)
})
c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr)
}
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {


@ -557,4 +557,14 @@ const (
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache"
FailedToReportStatusToSystemd = "failed to report status to systemd"
ShardGCCollectingExpiredMetricsStarted = "collecting expired metrics started"
ShardGCCollectingExpiredMetricsCompleted = "collecting expired metrics completed"
ShardGCFailedToCollectZeroSizeContainers = "failed to collect zero-size containers"
ShardGCFailedToCollectZeroCountContainers = "failed to collect zero-count containers"
EngineFailedToCheckContainerAvailability = "failed to check container availability"
EngineFailedToGetContainerSize = "failed to get container size"
EngineFailedToDeleteContainerSize = "failed to delete container size"
EngineInterruptProcessingZeroSizeContainers = "interrupt processing zero-size containers"
EngineInterruptProcessingZeroCountContainers = "interrupt processing zero-count containers"
EngineFailedToGetContainerCounters = "failed to get container counters"
)


@ -7,12 +7,14 @@ import (
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
)
@ -218,14 +220,18 @@ type cfg struct {
lowMem bool
rebuildWorkersCount uint32
containerSource atomic.Pointer[containerSource]

Btw, why is it not `atomic.Pointer`?

old school, fixed.
}
func defaultCfg() *cfg {
return &cfg{
res := &cfg{
log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
rebuildWorkersCount: 100,
}
res.containerSource.Store(&containerSource{})
return res
}
// New creates, initializes and returns new StorageEngine instance.
@ -288,3 +294,30 @@ func WithRebuildWorkersCount(count uint32) Option {
c.rebuildWorkersCount = count
}
}
// SetContainerSource sets container source.
func (e *StorageEngine) SetContainerSource(cs container.Source) {
e.containerSource.Store(&containerSource{cs: cs})
}
type containerSource struct {
cs container.Source
}
elebedeva marked this conversation as resolved

Is it necessary to wrap `container.Source` in a new type?

Yes. `container.Source` is an interface, but `atomic.Value` doesn't support changing the stored type. From the docs: `All calls to CompareAndSwap for a given Value must use values of the same concrete type.`
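To illustrate the constraint, here is a minimal standalone sketch (the `Source` interface and both implementations are made up for the example, not project code): `atomic.Value` panics when successive `Store` calls use different concrete types, while wrapping the interface in a single concrete struct and storing it through `atomic.Pointer` keeps the stored type constant.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Source stands in for container.Source; both implementations are toy types.
type Source interface{ Name() string }

type morphSource struct{}

func (morphSource) Name() string { return "morph" }

type cachedSource struct{}

func (cachedSource) Name() string { return "cached" }

// containerSource wraps the interface in one concrete type, so the value
// stored atomically is always a *containerSource.
type containerSource struct{ cs Source }

func main() {
	var v atomic.Value
	v.Store(morphSource{})
	// v.Store(cachedSource{}) // would panic: store of inconsistently typed value

	var p atomic.Pointer[containerSource]
	p.Store(&containerSource{cs: morphSource{}})
	p.Store(&containerSource{cs: cachedSource{}}) // fine: always the same pointer type
	fmt.Println(p.Load().cs.Name())               // cached
}
```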
func (s *containerSource) IsContainerAvailable(ctx context.Context, id cid.ID) (bool, error) {
select {
fyrchik marked this conversation as resolved

`WasRemoved` arguably serves the same purpose, but with an inverted result. Isn't it enough? We _only_ want to remove the metric if `WasRemoved` returns `true, nil`.

Fixed
case <-ctx.Done():
return false, ctx.Err()

By default all containers are available
default:
}
if s == nil || s.cs == nil {
return true, nil
}
wasRemoved, err := container.WasRemoved(s.cs, id)
if err != nil {
return false, err
}
return !wasRemoved, nil
}
fyrchik marked this conversation as resolved

`Get` is not enough, there should be a `DeletionInfo` somewhere.

Fixed


@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
@ -259,3 +260,177 @@ func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.A
}
})
}
func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid.ID) {
if len(ids) == 0 {
return
}
idMap, err := e.selectNonExistentIDs(ctx, ids)
if err != nil {
return
}
if len(idMap) == 0 {

Is this check really necessary?

Yes: the container may be empty but still exist.
return
}
var failed bool
var prm shard.ContainerSizePrm
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
select {
case <-ctx.Done():
e.log.Info(logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err()))
failed = true
return true
default:
}
var drop []cid.ID
for id := range idMap {
prm.SetContainerID(id)
s, err := sh.ContainerSize(prm)
if err != nil {
e.log.Warn(logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true
return true
}
if s.Size() > 0 {
drop = append(drop, id)
}
}
for _, id := range drop {
delete(idMap, id)
}
return len(idMap) == 0
})
if failed || len(idMap) == 0 {
return
}
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
select {
case <-ctx.Done():
e.log.Info(logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err()))
failed = true
return true
default:
}
for id := range idMap {
if err := sh.DeleteContainerSize(ctx, id); err != nil {
e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true
return true
}
}
return false
})
if failed {
return
}
for id := range idMap {
e.metrics.DeleteContainerSize(id.EncodeToString())
}
}
func (e *StorageEngine) processZeroCountContainers(ctx context.Context, ids []cid.ID) {
if len(ids) == 0 {
return
}
idMap, err := e.selectNonExistentIDs(ctx, ids)
if err != nil {
return
}
if len(idMap) == 0 {
return
}
var failed bool
var prm shard.ContainerCountPrm
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
select {
case <-ctx.Done():
e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err()))
failed = true
return true
default:
}
var drop []cid.ID
for id := range idMap {
prm.ContainerID = id
s, err := sh.ContainerCount(ctx, prm)
if err != nil {
e.log.Warn(logs.EngineFailedToGetContainerCounters, zap.Stringer("container_id", id), zap.Error(err))
failed = true
return true
}
if s.User > 0 || s.Logic > 0 || s.Phy > 0 {
drop = append(drop, id)
}
}
for _, id := range drop {
delete(idMap, id)
}
return len(idMap) == 0
})
if failed || len(idMap) == 0 {
return
}
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
select {
case <-ctx.Done():
e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err()))
failed = true
return true
default:
}
for id := range idMap {
if err := sh.DeleteContainerSize(ctx, id); err != nil {
e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err))
failed = true
return true
}
}
return false
})
if failed {
return
}
for id := range idMap {
e.metrics.DeleteContainerCount(id.EncodeToString())
}
}
func (e *StorageEngine) selectNonExistentIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) {
fyrchik marked this conversation as resolved

`nonExistent`?

Fixed
cs := e.containerSource.Load()
idMap := make(map[cid.ID]struct{})
for _, id := range ids {
isAvailable, err := cs.IsContainerAvailable(ctx, id)
if err != nil {
e.log.Warn(logs.EngineFailedToCheckContainerAvailability, zap.Stringer("container_id", id), zap.Error(err))
return nil, err
}
if isAvailable {
continue
}
idMap[id] = struct{}{}
}
return idMap, nil
}


@ -16,6 +16,8 @@ type MetricRegister interface {
SetMode(shardID string, mode mode.Mode)
AddToContainerSize(cnrID string, size int64)
DeleteContainerSize(cnrID string)
DeleteContainerCount(cnrID string)
AddToPayloadCounter(shardID string, size int64)
IncErrorCounter(shardID string)
ClearErrorCounter(shardID string)


@ -119,6 +119,8 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
shard.WithDeletedLockCallback(e.processDeletedLocks),
shard.WithReportErrorFunc(e.reportShardErrorBackground),
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
shard.WithZeroSizeCallback(e.processZeroSizeContainers),
shard.WithZeroCountCallback(e.processZeroCountContainers),
)...)
if err := sh.UpdateID(ctx); err != nil {


@ -14,6 +14,8 @@ import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var (
@ -118,7 +120,9 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
default:
}
completed, err := db.containerCountersNextBatch(lastKey, &cc)
completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) {
cc.Counts[id] = entity
})
if err != nil {
return cc, err
}
@ -131,7 +135,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
return cc, nil
}
func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) {
func (db *DB) containerCountersNextBatch(lastKey []byte, f func(id cid.ID, entity ObjectCounters)) (bool, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
@ -163,7 +167,7 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
if err != nil {
return err
}
cc.Counts[cnrID] = ent
f(cnrID, ent)
counter++
if counter == batchSize {
@ -185,6 +189,43 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
return false, nil
}
func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("ContainerCount", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCount")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ObjectCounters{}, ErrDegradedMode
}
var result ObjectCounters
err := db.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerCounterBucketName)
key := make([]byte, cidSize)
id.Encode(key)
v := b.Get(key)
if v == nil {
return nil
}
var err error
result, err = parseContainerCounterValue(v)
return err
})
return result, metaerr.Wrap(err)
}
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
return fmt.Errorf("could not increase phy object counter: %w", err)
@ -239,7 +280,7 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
return b.Put(counterKey, newCounter)
}
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { // TODO #838
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
b := tx.Bucket(containerCounterBucketName)
if b == nil {
return nil
@ -268,9 +309,6 @@ func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCo
entity.Phy = nextValue(entity.Phy, delta.Phy, inc)
entity.Logic = nextValue(entity.Logic, delta.Logic, inc)
entity.User = nextValue(entity.User, delta.User, inc)
if entity.IsZero() {
return b.Delete(key)
}
value := containerCounterValue(entity)
return b.Put(key, value)
}
@ -480,3 +518,214 @@ func IsUserObject(obj *objectSDK.Object) bool {
(obj.SplitID() == nil ||
(hasParentID && len(obj.Children()) == 0))
}
// ZeroSizeContainers returns containers with size = 0.
func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("ZeroSizeContainers", time.Since(startedAt), success)
}()
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroSizeContainers")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
var result []cid.ID
lastKey := make([]byte, cidSize)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
completed, err := db.containerSizesNextBatch(lastKey, func(contID cid.ID, size uint64) {
if size == 0 {
result = append(result, contID)
}
})
if err != nil {
return nil, err
}
if completed {
break
}
}
success = true
return result, nil
}
func (db *DB) containerSizesNextBatch(lastKey []byte, f func(cid.ID, uint64)) (bool, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return false, ErrDegradedMode
}
counter := 0
const batchSize = 1000
err := db.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerVolumeBucketName)
c := b.Cursor()
var key, value []byte
for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() {
if bytes.Equal(lastKey, key) {
continue
}
copy(lastKey, key)
size := parseContainerSize(value)
var id cid.ID
if err := id.Decode(key); err != nil {
return err
}
f(id, size)
counter++
if counter == batchSize {
break
}
}
if counter < batchSize {
return ErrInterruptIterator
}
return nil
})
if err != nil {
if errors.Is(err, ErrInterruptIterator) {
return true, nil
}
return false, metaerr.Wrap(err)
}
return false, nil
}
func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("DeleteContainerSize", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerSize",
trace.WithAttributes(
attribute.Stringer("container_id", id),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
if db.mode.ReadOnly() {
return ErrReadOnlyMode
}
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerVolumeBucketName)
key := make([]byte, cidSize)
id.Encode(key)
return b.Delete(key)
})
success = err == nil
return metaerr.Wrap(err)
}
// ZeroCountContainers returns containers with objects count = 0 in metabase.
fyrchik marked this conversation as resolved

It is not obvious from the comment what `count` refers to.

Fixed
func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("ZeroCountContainers", time.Since(startedAt), success)
}()
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroCountContainers")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return nil, ErrDegradedMode
}
var result []cid.ID
lastKey := make([]byte, cidSize)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) {
if entity.IsZero() {
result = append(result, id)
}
})
if err != nil {
return nil, metaerr.Wrap(err)
}
if completed {
break
}
}
success = true
return result, nil
}
func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error {
fyrchik marked this conversation as resolved

Why are `DeleteContainerCount` and `DeleteContainerSize` separate methods? It seems they become 0 simultaneously and are removed together too.

No: the count may still have a non-zero `phy` counter while the size (which is the logical size) is already 0.
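A small self-contained sketch of that intermediate state (simplified types and made-up values, not the metabase code): once an object is inhumed but not yet physically collected, the container's logical size and logic/user counters are already zero while the physical counter is not, so the size metric becomes removable before the count metric does.

```go
package main

import "fmt"

// ObjectCounters mirrors the phy/logic/user split discussed above.
type ObjectCounters struct {
	Phy, Logic, User uint64
}

func (o ObjectCounters) IsZero() bool {
	return o.Phy == 0 && o.Logic == 0 && o.User == 0
}

func main() {
	// Object inhumed (tombstoned) but not yet removed by GC.
	cc := ObjectCounters{Phy: 1, Logic: 0, User: 0}
	containerSize := uint64(0) // logical size has already dropped to zero

	fmt.Println(containerSize == 0) // true  -> the size entry can be deleted
	fmt.Println(cc.IsZero())        // false -> the count entry must be kept for now
}
```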
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("DeleteContainerCount", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerCount",
trace.WithAttributes(
attribute.Stringer("container_id", id),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
if db.mode.ReadOnly() {
return ErrReadOnlyMode
}
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerCounterBucketName)
key := make([]byte, cidSize)
id.Encode(key)
return b.Delete(key)
})
success = err == nil
return metaerr.Wrap(err)
}


@ -91,7 +91,7 @@ func TestCounters(t *testing.T) {
res, err := db.Delete(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(1), res.AvailableObjectsRemoved())
require.Equal(t, uint64(1), res.LogicCount())
c, err := db.ObjectCounters()
require.NoError(t, err)
@ -105,12 +105,8 @@ func TestCounters(t *testing.T) {
v.Phy--
v.Logic--
v.User--
if v.IsZero() {
delete(exp, cnrID)
} else {
exp[cnrID] = v
}
}
cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err)
@ -161,7 +157,7 @@ func TestCounters(t *testing.T) {
res, err := db.Inhume(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed())
require.Equal(t, uint64(len(inhumedObjs)), res.LogicInhumed())
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
c, err := db.ObjectCounters()
@ -389,7 +385,7 @@ func TestCounters_Expired(t *testing.T) {
inhumeRes, err := db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
require.Equal(t, uint64(1), inhumeRes.AvailableInhumed())
require.Equal(t, uint64(1), inhumeRes.LogicInhumed())
require.Equal(t, uint64(1), inhumeRes.UserInhumed())
c, err = db.ObjectCounters()
@ -423,17 +419,13 @@ func TestCounters_Expired(t *testing.T) {
deleteRes, err := db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
require.Zero(t, deleteRes.AvailableObjectsRemoved())
require.Zero(t, deleteRes.UserObjectsRemoved())
require.Zero(t, deleteRes.LogicCount())
require.Zero(t, deleteRes.UserCount())
if v, ok := exp[oo[0].Container()]; ok {
v.Phy--
if v.IsZero() {
delete(exp, oo[0].Container())
} else {
exp[oo[0].Container()] = v
}
}
oo = oo[1:]
@ -456,19 +448,15 @@ func TestCounters_Expired(t *testing.T) {
deleteRes, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
require.Equal(t, uint64(1), deleteRes.UserObjectsRemoved())
require.Equal(t, uint64(1), deleteRes.LogicCount())
require.Equal(t, uint64(1), deleteRes.UserCount())
if v, ok := exp[oo[0].Container()]; ok {
v.Phy--
v.Logic--
v.User--
if v.IsZero() {
delete(exp, oo[0].Container())
} else {
exp[oo[0].Container()] = v
}
}
oo = oo[1:]


@ -27,22 +27,22 @@ type DeletePrm struct {
// DeleteRes groups the resulting values of Delete operation.
type DeleteRes struct {
rawRemoved uint64
availableRemoved uint64
userRemoved uint64
sizes []uint64
availableSizes []uint64
phyCount uint64
logicCount uint64
userCount uint64
phySize uint64
logicSize uint64
removedByCnrID map[cid.ID]ObjectCounters
}
// AvailableObjectsRemoved returns the number of removed available
// LogicCount returns the number of removed logic
// objects.
func (d DeleteRes) AvailableObjectsRemoved() uint64 {
return d.availableRemoved
func (d DeleteRes) LogicCount() uint64 {
return d.logicCount
}
func (d DeleteRes) UserObjectsRemoved() uint64 {
return d.userRemoved
func (d DeleteRes) UserCount() uint64 {
return d.userCount
}
// RemovedByCnrID returns the number of removed objects by container ID.
@ -50,19 +50,19 @@ func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
return d.removedByCnrID
}
// RawObjectsRemoved returns the number of removed raw objects.
func (d DeleteRes) RawObjectsRemoved() uint64 {
return d.rawRemoved
// PhyCount returns the number of removed physical objects.
func (d DeleteRes) PhyCount() uint64 {
return d.phyCount
}
// RemovedPhysicalObjectSizes returns the sizes of removed physical objects.
func (d DeleteRes) RemovedPhysicalObjectSizes() []uint64 {
return d.sizes
// PhySize returns the size of removed physical objects.
func (d DeleteRes) PhySize() uint64 {
return d.phySize
}
// RemovedLogicalObjectSizes returns the sizes of removed logical objects.
func (d DeleteRes) RemovedLogicalObjectSizes() []uint64 {
return d.availableSizes
// LogicSize returns the size of removed logical objects.
func (d DeleteRes) LogicSize() uint64 {
return d.logicSize
}
// SetAddresses is a Delete option to set the addresses of the objects to delete.
@ -129,8 +129,6 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
// references of the split objects.
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
res := DeleteRes{
sizes: make([]uint64, len(addrs)),
availableSizes: make([]uint64, len(addrs)),
removedByCnrID: make(map[cid.ID]ObjectCounters),
}
refCounter := make(referenceCounter, len(addrs))
@ -162,22 +160,22 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
}
func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
if res.rawRemoved > 0 {
err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
if res.phyCount > 0 {
err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
if err != nil {
return fmt.Errorf("could not decrease phy object counter: %w", err)
}
}
if res.availableRemoved > 0 {
err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
if res.logicCount > 0 {
err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
if err != nil {
return fmt.Errorf("could not decrease logical object counter: %w", err)
}
}
if res.userRemoved > 0 {
err := db.updateShardObjectCounter(tx, user, res.userRemoved, false)
if res.userCount > 0 {
err := db.updateShardObjectCounter(tx, user, res.userCount, false)
if err != nil {
return fmt.Errorf("could not decrease user object counter: %w", err)
}
@ -190,7 +188,7 @@ func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
}
func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) {
if r.Removed {
if r.Phy {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.Phy++
res.removedByCnrID[addrs[i].Container()] = v
@ -200,11 +198,11 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
}
}
res.rawRemoved++
res.sizes[i] = r.Size
res.phyCount++
res.phySize += r.Size
}
if r.Available {
if r.Logic {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.Logic++
res.removedByCnrID[addrs[i].Container()] = v
@ -214,8 +212,8 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
}
}
res.availableRemoved++
res.availableSizes[i] = r.Size
res.logicCount++
res.logicSize += r.Size
}
if r.User {
@ -228,13 +226,13 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
}
}
res.userRemoved++
res.userCount++
}
}
type deleteSingleResult struct {
Removed bool
Available bool
Phy bool
Logic bool
User bool
Size uint64
}
@ -302,8 +300,8 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
}
return deleteSingleResult{
Removed: true,
Available: removeAvailableObject,
Phy: true,
Logic: removeAvailableObject,
User: isUserObject && removeAvailableObject,
Size: obj.PayloadSize(),
}, nil


@ -38,16 +38,16 @@ type DeletionInfo struct {
// InhumeRes encapsulates results of Inhume operation.
type InhumeRes struct {
deletedLockObj []oid.Address
availableInhumed uint64
logicInhumed uint64
userInhumed uint64
inhumedByCnrID map[cid.ID]ObjectCounters
deletionDetails []DeletionInfo
}
// AvailableInhumed return number of available object
// LogicInhumed return number of logic object
// that have been inhumed.
func (i InhumeRes) AvailableInhumed() uint64 {
return i.availableInhumed
func (i InhumeRes) LogicInhumed() uint64 {
return i.logicInhumed
}
func (i InhumeRes) UserInhumed() uint64 {
@ -87,7 +87,7 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, is
CID: containerID,
IsUser: isUser,
})
i.availableInhumed++
i.logicInhumed++
if isUser {
i.userInhumed++
}
@ -265,7 +265,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
}
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
if err := db.updateShardObjectCounter(tx, logical, res.AvailableInhumed(), false); err != nil {
if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
return err
}
if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {


@ -1,9 +1,13 @@
package shard
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type ContainerSizePrm struct {
@ -39,3 +43,84 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
size: size,
}, nil
}
type ContainerCountPrm struct {
ContainerID cid.ID
}
type ContainerCountRes struct {
Phy uint64
Logic uint64
User uint64
}
func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (ContainerCountRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ContainerCount",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Stringer("container_id", prm.ContainerID),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return ContainerCountRes{}, ErrDegradedMode
}
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
if err != nil {
return ContainerCountRes{}, fmt.Errorf("could not get container counters: %w", err)
}
return ContainerCountRes{
Phy: counters.Phy,
Logic: counters.Logic,
User: counters.User,
}, nil
}
func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerSize",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Stringer("container_id", id),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.metaBase.DeleteContainerSize(ctx, id)
}
func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerCount",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Stringer("container_id", id),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.metaBase.DeleteContainerCount(ctx, id)
}


@ -155,6 +155,7 @@ func (s *Shard) Init(ctx context.Context) error {
s.collectExpiredLocks,
s.collectExpiredObjects,
s.collectExpiredTombstones,
s.collectExpiredMetrics,
},
},
},


@ -141,16 +141,12 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
if err != nil {
return err
}
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
s.decObjectCounterBy(user, res.UserObjectsRemoved())
s.decObjectCounterBy(physical, res.PhyCount())
s.decObjectCounterBy(logical, res.LogicCount())
s.decObjectCounterBy(user, res.UserCount())
s.decContainerObjectCounter(res.RemovedByCnrID())
removedPayload := res.RemovedPhysicalObjectSizes()[0]
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
if logicalRemovedPayload > 0 {
s.addToContainerSize(addr.Container().EncodeToString(), -int64(logicalRemovedPayload))
}
s.addToPayloadSize(-int64(removedPayload))
s.addToContainerSize(addr.Container().EncodeToString(), -int64(res.LogicSize()))
s.addToPayloadSize(-int64(res.PhySize()))
return nil
}


@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
@ -414,8 +415,8 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return
}
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeRegular)
s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
@ -629,8 +630,8 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
return
}
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeTombstone)
s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
@ -677,8 +678,8 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
return
}
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeLock)
s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
@ -726,3 +727,40 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
func (s *Shard) NotificationChannel() chan<- Event {
return s.gc.eventChan
}
func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics")
defer span.End()
epoch := e.(newEpoch).epoch
s.log.Debug(logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch))
defer s.log.Debug(logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch))
s.collectExpiredContainerSizeMetrics(ctx, epoch)
s.collectExpiredContainerCountMetrics(ctx, epoch)
}
func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) {
ids, err := s.metaBase.ZeroSizeContainers(ctx)
if err != nil {
s.log.Warn(logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
if len(ids) == 0 {
return
}
s.zeroSizeContainersCallback(ctx, ids)
}
func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) {
ids, err := s.metaBase.ZeroCountContainers(ctx)
if err != nil {
s.log.Warn(logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
return
}
if len(ids) == 0 {
return
}
s.zeroCountContainersCallback(ctx, ids)
}


@ -121,7 +121,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
s.m.RUnlock()
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())


@ -336,13 +336,9 @@ func TestCounters(t *testing.T) {
v.Logic--
v.Phy--
v.User--
if v.IsZero() {
delete(expected, cnr)
} else {
expected[cnr] = v
}
}
}
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize())


@ -53,6 +53,9 @@ type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address)
// DeletedLockCallback is a callback handling list of deleted LOCK objects.
type DeletedLockCallback func(context.Context, []oid.Address)
// EmptyContainersCallback is a callback handling a list of zero-size and zero-count containers.
type EmptyContainersCallback func(context.Context, []cid.ID)
// MetricsWriter is an interface that must store shard's metrics.
type MetricsWriter interface {
// SetObjectCounter must set object counter taking into account object type.
@ -118,6 +121,9 @@ type cfg struct {
deletedLockCallBack DeletedLockCallback
zeroSizeContainersCallback EmptyContainersCallback
zeroCountContainersCallback EmptyContainersCallback
tsSource TombstoneSource
metricsWriter MetricsWriter
@ -134,6 +140,8 @@ func defaultCfg() *cfg {
gcCfg: defaultGCCfg(),
reportErrorFunc: func(string, string, error) {},
rebuildLimiter: &noopRebuildLimiter{},
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
}
}
@ -363,6 +371,20 @@ func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
}
}
// WithZeroSizeCallback returns option to set zero-size containers callback.
func WithZeroSizeCallback(cb EmptyContainersCallback) Option {
return func(c *cfg) {
c.zeroSizeContainersCallback = cb
}
}
// WithZeroCountCallback returns option to set zero-count containers callback.
func WithZeroCountCallback(cb EmptyContainersCallback) Option {
return func(c *cfg) {
c.zeroCountContainersCallback = cb
}
}
func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()


@ -11,6 +11,8 @@ import (
type EngineMetrics interface {
AddMethodDuration(method string, d time.Duration)
AddToContainerSize(cnrID string, size int64)
DeleteContainerSize(cnrID string)
DeleteContainerCount(cnrID string)
IncErrorCounter(shardID string)
ClearErrorCounter(shardID string)
DeleteShardMetrics(shardID string)
@ -79,6 +81,14 @@ func (m *engineMetrics) AddToContainerSize(cnrID string, size int64) {
m.containerSize.With(prometheus.Labels{containerIDLabelKey: cnrID}).Add(float64(size))
}
func (m *engineMetrics) DeleteContainerSize(cnrID string) {
m.containerSize.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID})
}
func (m *engineMetrics) DeleteContainerCount(cnrID string) {
m.contObjCounter.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID})
}
func (m *engineMetrics) AddToPayloadCounter(shardID string, size int64) {
m.payloadSize.With(prometheus.Labels{shardIDLabel: shardID}).Add(float64(size))
}