From dfd62ca6b1e20ee3a091da36fd77f7c262d6af5d Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 25 Dec 2023 10:15:26 +0300 Subject: [PATCH 1/3] [#864] metabase: Refactor delete/inhume Available -> Logic, Raw -> Phy for delete/inhume results. Use single counter instead of vectors. Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/metabase/counter.go | 2 +- .../metabase/counter_test.go | 14 ++-- pkg/local_object_storage/metabase/delete.go | 84 +++++++++---------- pkg/local_object_storage/metabase/inhume.go | 20 ++--- pkg/local_object_storage/shard/delete.go | 14 ++-- pkg/local_object_storage/shard/gc.go | 12 +-- pkg/local_object_storage/shard/inhume.go | 2 +- 7 files changed, 71 insertions(+), 77 deletions(-) diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index 96d80077d..581df7a59 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -239,7 +239,7 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6 return b.Put(counterKey, newCounter) } -func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { // TODO #838 +func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { b := tx.Bucket(containerCounterBucketName) if b == nil { return nil diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index 9241c97a0..2f40daaae 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -91,7 +91,7 @@ func TestCounters(t *testing.T) { res, err := db.Delete(context.Background(), prm) require.NoError(t, err) - require.Equal(t, uint64(1), res.AvailableObjectsRemoved()) + require.Equal(t, uint64(1), res.LogicCount()) c, err := db.ObjectCounters() require.NoError(t, err) @@ -161,7 +161,7 @@ func TestCounters(t *testing.T) { res, err := db.Inhume(context.Background(), prm) require.NoError(t, err) - require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed()) + require.Equal(t, uint64(len(inhumedObjs)), res.LogicInhumed()) require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed()) c, err := db.ObjectCounters() @@ -389,7 +389,7 @@ func TestCounters_Expired(t *testing.T) { inhumeRes, err := db.Inhume(context.Background(), inhumePrm) require.NoError(t, err) - require.Equal(t, uint64(1), inhumeRes.AvailableInhumed()) + require.Equal(t, uint64(1), inhumeRes.LogicInhumed()) require.Equal(t, uint64(1), inhumeRes.UserInhumed()) c, err = db.ObjectCounters() @@ -423,8 +423,8 @@ func TestCounters_Expired(t *testing.T) { deleteRes, err := db.Delete(context.Background(), deletePrm) require.NoError(t, err) - require.Zero(t, deleteRes.AvailableObjectsRemoved()) - require.Zero(t, deleteRes.UserObjectsRemoved()) + require.Zero(t, deleteRes.LogicCount()) + require.Zero(t, deleteRes.UserCount()) if v, ok := exp[oo[0].Container()]; ok { v.Phy-- @@ -456,8 +456,8 @@ func TestCounters_Expired(t *testing.T) { deleteRes, err = db.Delete(context.Background(), deletePrm) require.NoError(t, err) - require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved()) - require.Equal(t, uint64(1), deleteRes.UserObjectsRemoved()) + require.Equal(t, uint64(1), deleteRes.LogicCount()) + require.Equal(t, uint64(1), deleteRes.UserCount()) if v, ok := exp[oo[0].Container()]; ok { v.Phy-- diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 6a5661ed1..b2210b55c 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -27,22 +27,22 @@ type DeletePrm struct { // DeleteRes groups the resulting values of Delete operation. type DeleteRes struct { - rawRemoved uint64 - availableRemoved uint64 - userRemoved uint64 - sizes []uint64 - availableSizes []uint64 - removedByCnrID map[cid.ID]ObjectCounters + phyCount uint64 + logicCount uint64 + userCount uint64 + phySize uint64 + logicSize uint64 + removedByCnrID map[cid.ID]ObjectCounters } -// AvailableObjectsRemoved returns the number of removed available +// LogicCount returns the number of removed logic // objects. -func (d DeleteRes) AvailableObjectsRemoved() uint64 { - return d.availableRemoved +func (d DeleteRes) LogicCount() uint64 { + return d.logicCount } -func (d DeleteRes) UserObjectsRemoved() uint64 { - return d.userRemoved +func (d DeleteRes) UserCount() uint64 { + return d.userCount } // RemovedByCnrID returns the number of removed objects by container ID. @@ -50,19 +50,19 @@ func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters { return d.removedByCnrID } -// RawObjectsRemoved returns the number of removed raw objects. -func (d DeleteRes) RawObjectsRemoved() uint64 { - return d.rawRemoved +// PhyCount returns the number of removed physical objects. +func (d DeleteRes) PhyCount() uint64 { + return d.phyCount } -// RemovedPhysicalObjectSizes returns the sizes of removed physical objects. -func (d DeleteRes) RemovedPhysicalObjectSizes() []uint64 { - return d.sizes +// PhySize returns the size of removed physical objects. +func (d DeleteRes) PhySize() uint64 { + return d.phySize } -// RemovedLogicalObjectSizes returns the sizes of removed logical objects. -func (d DeleteRes) RemovedLogicalObjectSizes() []uint64 { - return d.availableSizes +// LogicSize returns the size of removed logical objects. +func (d DeleteRes) LogicSize() uint64 { + return d.logicSize } // SetAddresses is a Delete option to set the addresses of the objects to delete. @@ -129,8 +129,6 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { // references of the split objects. func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) { res := DeleteRes{ - sizes: make([]uint64, len(addrs)), - availableSizes: make([]uint64, len(addrs)), removedByCnrID: make(map[cid.ID]ObjectCounters), } refCounter := make(referenceCounter, len(addrs)) @@ -162,22 +160,22 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) } func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error { - if res.rawRemoved > 0 { - err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false) + if res.phyCount > 0 { + err := db.updateShardObjectCounter(tx, phy, res.phyCount, false) if err != nil { return fmt.Errorf("could not decrease phy object counter: %w", err) } } - if res.availableRemoved > 0 { - err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false) + if res.logicCount > 0 { + err := db.updateShardObjectCounter(tx, logical, res.logicCount, false) if err != nil { return fmt.Errorf("could not decrease logical object counter: %w", err) } } - if res.userRemoved > 0 { - err := db.updateShardObjectCounter(tx, user, res.userRemoved, false) + if res.userCount > 0 { + err := db.updateShardObjectCounter(tx, user, res.userCount, false) if err != nil { return fmt.Errorf("could not decrease user object counter: %w", err) } @@ -190,7 +188,7 @@ func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error { } func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) { - if r.Removed { + if r.Phy { if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { v.Phy++ res.removedByCnrID[addrs[i].Container()] = v @@ -200,11 +198,11 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A } } - res.rawRemoved++ - res.sizes[i] = r.Size + res.phyCount++ + res.phySize += r.Size } - if r.Available { + if r.Logic { if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { v.Logic++ res.removedByCnrID[addrs[i].Container()] = v @@ -214,8 +212,8 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A } } - res.availableRemoved++ - res.availableSizes[i] = r.Size + res.logicCount++ + res.logicSize += r.Size } if r.User { @@ -228,15 +226,15 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A } } - res.userRemoved++ + res.userCount++ } } type deleteSingleResult struct { - Removed bool - Available bool - User bool - Size uint64 + Phy bool + Logic bool + User bool + Size uint64 } // delete removes object indexes from the metabase. Counts the references @@ -302,10 +300,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter } return deleteSingleResult{ - Removed: true, - Available: removeAvailableObject, - User: isUserObject && removeAvailableObject, - Size: obj.PayloadSize(), + Phy: true, + Logic: removeAvailableObject, + User: isUserObject && removeAvailableObject, + Size: obj.PayloadSize(), }, nil } diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 21db9a629..3b2f01673 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -37,17 +37,17 @@ type DeletionInfo struct { // InhumeRes encapsulates results of Inhume operation. type InhumeRes struct { - deletedLockObj []oid.Address - availableInhumed uint64 - userInhumed uint64 - inhumedByCnrID map[cid.ID]ObjectCounters - deletionDetails []DeletionInfo + deletedLockObj []oid.Address + logicInhumed uint64 + userInhumed uint64 + inhumedByCnrID map[cid.ID]ObjectCounters + deletionDetails []DeletionInfo } -// AvailableInhumed return number of available object +// LogicInhumed return number of logic object // that have been inhumed. -func (i InhumeRes) AvailableInhumed() uint64 { - return i.availableInhumed +func (i InhumeRes) LogicInhumed() uint64 { + return i.logicInhumed } func (i InhumeRes) UserInhumed() uint64 { @@ -87,7 +87,7 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, is CID: containerID, IsUser: isUser, }) - i.availableInhumed++ + i.logicInhumed++ if isUser { i.userInhumed++ } @@ -265,7 +265,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes } func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { - if err := db.updateShardObjectCounter(tx, logical, res.AvailableInhumed(), false); err != nil { + if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil { return err } if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil { diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index 1c510bdb7..44f6c6b48 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -141,16 +141,12 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error if err != nil { return err } - s.decObjectCounterBy(physical, res.RawObjectsRemoved()) - s.decObjectCounterBy(logical, res.AvailableObjectsRemoved()) - s.decObjectCounterBy(user, res.UserObjectsRemoved()) + s.decObjectCounterBy(physical, res.PhyCount()) + s.decObjectCounterBy(logical, res.LogicCount()) + s.decObjectCounterBy(user, res.UserCount()) s.decContainerObjectCounter(res.RemovedByCnrID()) - removedPayload := res.RemovedPhysicalObjectSizes()[0] - logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0] - if logicalRemovedPayload > 0 { - s.addToContainerSize(addr.Container().EncodeToString(), -int64(logicalRemovedPayload)) - } - s.addToPayloadSize(-int64(removedPayload)) + s.addToContainerSize(addr.Container().EncodeToString(), -int64(res.LogicSize())) + s.addToPayloadSize(-int64(res.PhySize())) return nil } diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 0f978f26e..d82db82b9 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -414,8 +414,8 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) return } - s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular) - s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeRegular) + s.decObjectCounterBy(logical, res.LogicInhumed()) s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) @@ -629,8 +629,8 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston return } - s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone) - s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeTombstone) + s.decObjectCounterBy(logical, res.LogicInhumed()) s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) @@ -677,8 +677,8 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers [] return } - s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock) - s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeLock) + s.decObjectCounterBy(logical, res.LogicInhumed()) s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index 48cfaa98e..746177c3a 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -121,7 +121,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { s.m.RUnlock() - s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decObjectCounterBy(logical, res.LogicInhumed()) s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) -- 2.45.2 From d75e7e9a21bf883870d4c39f8abf325976a32b5f Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 27 Dec 2023 18:58:36 +0300 Subject: [PATCH 2/3] [#864] engine: Drop container size metric if container deleted Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/container.go | 2 + internal/logs/logs.go | 7 + pkg/local_object_storage/engine/engine.go | 35 ++++- pkg/local_object_storage/engine/inhume.go | 97 ++++++++++++++ pkg/local_object_storage/engine/metrics.go | 1 + pkg/local_object_storage/engine/shards.go | 1 + pkg/local_object_storage/metabase/counter.go | 130 +++++++++++++++++++ pkg/local_object_storage/shard/container.go | 26 ++++ pkg/local_object_storage/shard/control.go | 1 + pkg/local_object_storage/shard/gc.go | 25 ++++ pkg/local_object_storage/shard/shard.go | 23 +++- pkg/metrics/engine.go | 5 + 12 files changed, 347 insertions(+), 6 deletions(-) diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index 2b9bdd71b..898a4eef9 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -45,6 +45,8 @@ func initContainerService(_ context.Context, c *cfg) { c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { containerGRPC.RegisterContainerServiceServer(s, server) }) + + c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr) } func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) { diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6ead83700..3075e66e9 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -557,4 +557,11 @@ const ( BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed" ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache" FailedToReportStatusToSystemd = "failed to report status to systemd" + ShardGCCollectingExpiredMetricsStarted = "collecting expired metrics started" + ShardGCCollectingExpiredMetricsCompleted = "collecting expired metrics completed" + ShardGCFailedToCollectZeroSizeContainers = "failed to collect zero-size containers" + EngineFailedToCheckContainerAvailability = "failed to check container availability" + EngineFailedToGetContainerSize = "failed to get container size" + EngineFailedToDeleteContainerSize = "failed to delete container size" + EngineInterruptProcessingZeroSizeContainers = "interrupt processing zero-size containers" ) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 026a7c336..e03a08abc 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -7,12 +7,14 @@ import ( "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.uber.org/zap" ) @@ -218,14 +220,18 @@ type cfg struct { lowMem bool rebuildWorkersCount uint32 + + containerSource atomic.Pointer[containerSource] } func defaultCfg() *cfg { - return &cfg{ + res := &cfg{ log: &logger.Logger{Logger: zap.L()}, shardPoolSize: 20, rebuildWorkersCount: 100, } + res.containerSource.Store(&containerSource{}) + return res } // New creates, initializes and returns new StorageEngine instance. @@ -288,3 +294,30 @@ func WithRebuildWorkersCount(count uint32) Option { c.rebuildWorkersCount = count } } + +// SetContainerSource sets container source. +func (e *StorageEngine) SetContainerSource(cs container.Source) { + e.containerSource.Store(&containerSource{cs: cs}) +} + +type containerSource struct { + cs container.Source +} + +func (s *containerSource) IsContainerAvailable(ctx context.Context, id cid.ID) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + + if s == nil || s.cs == nil { + return true, nil + } + + wasRemoved, err := container.WasRemoved(s.cs, id) + if err != nil { + return false, err + } + return !wasRemoved, nil +} diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 293746f70..6750adec2 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" @@ -259,3 +260,99 @@ func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.A } }) } + +func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid.ID) { + if len(ids) == 0 { + return + } + + idMap, err := e.selectNonExistedIDs(ctx, ids) + if err != nil { + return + } + + if len(idMap) == 0 { + return + } + + var failed bool + var prm shard.ContainerSizePrm + e.iterateOverUnsortedShards(func(sh hashedShard) bool { + select { + case <-ctx.Done(): + e.log.Info(logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err())) + failed = true + return true + default: + } + + var drop []cid.ID + for id := range idMap { + prm.SetContainerID(id) + s, err := sh.ContainerSize(prm) + if err != nil { + e.log.Warn(logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err)) + failed = true + return true + } + if s.Size() > 0 { + drop = append(drop, id) + } + } + for _, id := range drop { + delete(idMap, id) + } + + return len(idMap) == 0 + }) + + if failed || len(idMap) == 0 { + return + } + + e.iterateOverUnsortedShards(func(sh hashedShard) bool { + select { + case <-ctx.Done(): + e.log.Info(logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err())) + failed = true + return true + default: + } + + for id := range idMap { + if err := sh.DeleteContainerSize(ctx, id); err != nil { + e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err)) + failed = true + return true + } + } + + return false + }) + + if failed { + return + } + + for id := range idMap { + e.metrics.DeleteContainerSize(id.EncodeToString()) + } +} + +func (e *StorageEngine) selectNonExistedIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) { + cs := e.containerSource.Load() + + idMap := make(map[cid.ID]struct{}) + for _, id := range ids { + isAvailable, err := cs.IsContainerAvailable(ctx, id) + if err != nil { + e.log.Warn(logs.EngineFailedToCheckContainerAvailability, zap.Stringer("container_id", id), zap.Error(err)) + return nil, err + } + if isAvailable { + continue + } + idMap[id] = struct{}{} + } + return idMap, nil +} diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index 363d098f6..e4cc504fa 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -16,6 +16,7 @@ type MetricRegister interface { SetMode(shardID string, mode mode.Mode) AddToContainerSize(cnrID string, size int64) + DeleteContainerSize(cnrID string) AddToPayloadCounter(shardID string, size int64) IncErrorCounter(shardID string) ClearErrorCounter(shardID string) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 7e3e85e81..cb3a4eca5 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -119,6 +119,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (* shard.WithDeletedLockCallback(e.processDeletedLocks), shard.WithReportErrorFunc(e.reportShardErrorBackground), shard.WithRebuildWorkerLimiter(e.rebuildLimiter), + shard.WithZeroSizeCallback(e.processZeroSizeContainers), )...) if err := sh.UpdateID(ctx); err != nil { diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index 581df7a59..271e78fe4 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -14,6 +14,8 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var ( @@ -480,3 +482,131 @@ func IsUserObject(obj *objectSDK.Object) bool { (obj.SplitID() == nil || (hasParentID && len(obj.Children()) == 0)) } + +// ZeroSizeContainers returns containers with size = 0. +func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ZeroSizeContainers", time.Since(startedAt), success) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroSizeContainers") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + var result []cid.ID + lastKey := make([]byte, cidSize) + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + completed, err := db.containerSizesNextBatch(lastKey, func(contID cid.ID, size uint64) { + if size == 0 { + result = append(result, contID) + } + }) + if err != nil { + return nil, err + } + if completed { + break + } + } + + success = true + return result, nil +} + +func (db *DB) containerSizesNextBatch(lastKey []byte, f func(cid.ID, uint64)) (bool, error) { + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return false, ErrDegradedMode + } + + counter := 0 + const batchSize = 1000 + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(containerVolumeBucketName) + c := b.Cursor() + var key, value []byte + for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() { + if bytes.Equal(lastKey, key) { + continue + } + copy(lastKey, key) + + size := parseContainerSize(value) + var id cid.ID + if err := id.Decode(key); err != nil { + return err + } + f(id, size) + + counter++ + if counter == batchSize { + break + } + } + + if counter < batchSize { + return ErrInterruptIterator + } + return nil + }) + if err != nil { + if errors.Is(err, ErrInterruptIterator) { + return true, nil + } + return false, metaerr.Wrap(err) + } + return false, nil +} + +func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("DeleteContainerSize", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerSize", + trace.WithAttributes( + attribute.Stringer("container_id", id), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + if db.mode.ReadOnly() { + return ErrReadOnlyMode + } + + err := db.boltDB.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(containerVolumeBucketName) + + key := make([]byte, cidSize) + id.Encode(key) + return b.Delete(key) + }) + success = err == nil + return metaerr.Wrap(err) +} diff --git a/pkg/local_object_storage/shard/container.go b/pkg/local_object_storage/shard/container.go index 24090e8d8..856885486 100644 --- a/pkg/local_object_storage/shard/container.go +++ b/pkg/local_object_storage/shard/container.go @@ -1,9 +1,13 @@ package shard import ( + "context" "fmt" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type ContainerSizePrm struct { @@ -39,3 +43,25 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) { size: size, }, nil } + +func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerSize", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.Stringer("container_id", id), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.ReadOnly() { + return ErrReadOnlyMode + } + + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + + return s.metaBase.DeleteContainerSize(ctx, id) +} diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index f103ebc2b..574bc1430 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -155,6 +155,7 @@ func (s *Shard) Init(ctx context.Context) error { s.collectExpiredLocks, s.collectExpiredObjects, s.collectExpiredTombstones, + s.collectExpiredMetrics, }, }, }, diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index d82db82b9..3017dce1f 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -10,6 +10,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" @@ -726,3 +727,27 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) { func (s *Shard) NotificationChannel() chan<- Event { return s.gc.eventChan } + +func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) { + ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics") + defer span.End() + + epoch := e.(newEpoch).epoch + + s.log.Debug(logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch)) + defer s.log.Debug(logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch)) + + s.collectExpiredContainerSizeMetrics(ctx, epoch) +} + +func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) { + ids, err := s.metaBase.ZeroSizeContainers(ctx) + if err != nil { + s.log.Warn(logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err)) + return + } + if len(ids) == 0 { + return + } + s.zeroSizeContainersCallback(ctx, ids) +} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 641b487ed..a83445cfe 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -53,6 +53,9 @@ type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address) // DeletedLockCallback is a callback handling list of deleted LOCK objects. type DeletedLockCallback func(context.Context, []oid.Address) +// ZeroSizeContainersCallback is a callback hanfling list of zero-size containers. +type ZeroSizeContainersCallback func(context.Context, []cid.ID) + // MetricsWriter is an interface that must store shard's metrics. type MetricsWriter interface { // SetObjectCounter must set object counter taking into account object type. @@ -118,6 +121,8 @@ type cfg struct { deletedLockCallBack DeletedLockCallback + zeroSizeContainersCallback ZeroSizeContainersCallback + tsSource TombstoneSource metricsWriter MetricsWriter @@ -129,11 +134,12 @@ type cfg struct { func defaultCfg() *cfg { return &cfg{ - rmBatchSize: 100, - log: &logger.Logger{Logger: zap.L()}, - gcCfg: defaultGCCfg(), - reportErrorFunc: func(string, string, error) {}, - rebuildLimiter: &noopRebuildLimiter{}, + rmBatchSize: 100, + log: &logger.Logger{Logger: zap.L()}, + gcCfg: defaultGCCfg(), + reportErrorFunc: func(string, string, error) {}, + rebuildLimiter: &noopRebuildLimiter{}, + zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, } } @@ -363,6 +369,13 @@ func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option { } } +// WithZeroSizeCallback returns option to set zero-size containers callback. +func WithZeroSizeCallback(cb ZeroSizeContainersCallback) Option { + return func(c *cfg) { + c.zeroSizeContainersCallback = cb + } +} + func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 609c30f97..80e912048 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -11,6 +11,7 @@ import ( type EngineMetrics interface { AddMethodDuration(method string, d time.Duration) AddToContainerSize(cnrID string, size int64) + DeleteContainerSize(cnrID string) IncErrorCounter(shardID string) ClearErrorCounter(shardID string) DeleteShardMetrics(shardID string) @@ -79,6 +80,10 @@ func (m *engineMetrics) AddToContainerSize(cnrID string, size int64) { m.containerSize.With(prometheus.Labels{containerIDLabelKey: cnrID}).Add(float64(size)) } +func (m *engineMetrics) DeleteContainerSize(cnrID string) { + m.containerSize.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID}) +} + func (m *engineMetrics) AddToPayloadCounter(shardID string, size int64) { m.payloadSize.With(prometheus.Labels{shardIDLabel: shardID}).Add(float64(size)) } -- 2.45.2 From 4b8b4da6818ecce7700dd432f8a6ed5d8be86106 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 27 Dec 2023 18:23:12 +0300 Subject: [PATCH 3/3] [#864] engine: Drop container count metric if container removed Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 3 + pkg/local_object_storage/engine/inhume.go | 82 ++++++++++- pkg/local_object_storage/engine/metrics.go | 1 + pkg/local_object_storage/engine/shards.go | 1 + pkg/local_object_storage/metabase/counter.go | 131 +++++++++++++++++- .../metabase/counter_test.go | 18 +-- pkg/local_object_storage/shard/container.go | 59 ++++++++ pkg/local_object_storage/shard/gc.go | 13 ++ .../shard/metrics_test.go | 6 +- pkg/local_object_storage/shard/shard.go | 29 ++-- pkg/metrics/engine.go | 5 + 11 files changed, 310 insertions(+), 38 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 3075e66e9..420c6612a 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -560,8 +560,11 @@ const ( ShardGCCollectingExpiredMetricsStarted = "collecting expired metrics started" ShardGCCollectingExpiredMetricsCompleted = "collecting expired metrics completed" ShardGCFailedToCollectZeroSizeContainers = "failed to collect zero-size containers" + ShardGCFailedToCollectZeroCountContainers = "failed to collect zero-count containers" EngineFailedToCheckContainerAvailability = "failed to check container availability" EngineFailedToGetContainerSize = "failed to get container size" EngineFailedToDeleteContainerSize = "failed to delete container size" EngineInterruptProcessingZeroSizeContainers = "interrupt processing zero-size containers" + EngineInterruptProcessingZeroCountContainers = "interrupt processing zero-count containers" + EngineFailedToGetContainerCounters = "failed to get container counters" ) diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 6750adec2..fc6ee2430 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -266,7 +266,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid return } - idMap, err := e.selectNonExistedIDs(ctx, ids) + idMap, err := e.selectNonExistentIDs(ctx, ids) if err != nil { return } @@ -339,7 +339,85 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid } } -func (e *StorageEngine) selectNonExistedIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) { +func (e *StorageEngine) processZeroCountContainers(ctx context.Context, ids []cid.ID) { + if len(ids) == 0 { + return + } + + idMap, err := e.selectNonExistentIDs(ctx, ids) + if err != nil { + return + } + + if len(idMap) == 0 { + return + } + + var failed bool + var prm shard.ContainerCountPrm + e.iterateOverUnsortedShards(func(sh hashedShard) bool { + select { + case <-ctx.Done(): + e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err())) + failed = true + return true + default: + } + + var drop []cid.ID + for id := range idMap { + prm.ContainerID = id + s, err := sh.ContainerCount(ctx, prm) + if err != nil { + e.log.Warn(logs.EngineFailedToGetContainerCounters, zap.Stringer("container_id", id), zap.Error(err)) + failed = true + return true + } + if s.User > 0 || s.Logic > 0 || s.Phy > 0 { + drop = append(drop, id) + } + } + for _, id := range drop { + delete(idMap, id) + } + + return len(idMap) == 0 + }) + + if failed || len(idMap) == 0 { + return + } + + e.iterateOverUnsortedShards(func(sh hashedShard) bool { + select { + case <-ctx.Done(): + e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err())) + failed = true + return true + default: + } + + for id := range idMap { + if err := sh.DeleteContainerSize(ctx, id); err != nil { + e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err)) + failed = true + return true + } + } + + return false + }) + + if failed { + return + } + + for id := range idMap { + e.metrics.DeleteContainerCount(id.EncodeToString()) + } +} + +func (e *StorageEngine) selectNonExistentIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) { cs := e.containerSource.Load() idMap := make(map[cid.ID]struct{}) diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index e4cc504fa..e4117bb1a 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -17,6 +17,7 @@ type MetricRegister interface { AddToContainerSize(cnrID string, size int64) DeleteContainerSize(cnrID string) + DeleteContainerCount(cnrID string) AddToPayloadCounter(shardID string, size int64) IncErrorCounter(shardID string) ClearErrorCounter(shardID string) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index cb3a4eca5..0455471e2 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -120,6 +120,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (* shard.WithReportErrorFunc(e.reportShardErrorBackground), shard.WithRebuildWorkerLimiter(e.rebuildLimiter), shard.WithZeroSizeCallback(e.processZeroSizeContainers), + shard.WithZeroCountCallback(e.processZeroCountContainers), )...) if err := sh.UpdateID(ctx); err != nil { diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index 271e78fe4..915fbd957 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -120,7 +120,9 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) default: } - completed, err := db.containerCountersNextBatch(lastKey, &cc) + completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) { + cc.Counts[id] = entity + }) if err != nil { return cc, err } @@ -133,7 +135,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) return cc, nil } -func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) { +func (db *DB) containerCountersNextBatch(lastKey []byte, f func(id cid.ID, entity ObjectCounters)) (bool, error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -165,7 +167,7 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) if err != nil { return err } - cc.Counts[cnrID] = ent + f(cnrID, ent) counter++ if counter == batchSize { @@ -187,6 +189,43 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) return false, nil } +func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ContainerCount", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCount") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ObjectCounters{}, ErrDegradedMode + } + + var result ObjectCounters + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(containerCounterBucketName) + key := make([]byte, cidSize) + id.Encode(key) + v := b.Get(key) + if v == nil { + return nil + } + var err error + result, err = parseContainerCounterValue(v) + return err + }) + + return result, metaerr.Wrap(err) +} + func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil { return fmt.Errorf("could not increase phy object counter: %w", err) @@ -270,9 +309,6 @@ func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCo entity.Phy = nextValue(entity.Phy, delta.Phy, inc) entity.Logic = nextValue(entity.Logic, delta.Logic, inc) entity.User = nextValue(entity.User, delta.User, inc) - if entity.IsZero() { - return b.Delete(key) - } value := containerCounterValue(entity) return b.Put(key, value) } @@ -610,3 +646,86 @@ func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error { success = err == nil return metaerr.Wrap(err) } + +// ZeroCountContainers returns containers with objects count = 0 in metabase. +func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ZeroCountContainers", time.Since(startedAt), success) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroCountContainers") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return nil, ErrDegradedMode + } + + var result []cid.ID + + lastKey := make([]byte, cidSize) + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) { + if entity.IsZero() { + result = append(result, id) + } + }) + if err != nil { + return nil, metaerr.Wrap(err) + } + if completed { + break + } + } + success = true + return result, nil +} + +func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("DeleteContainerCount", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerCount", + trace.WithAttributes( + attribute.Stringer("container_id", id), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + if db.mode.ReadOnly() { + return ErrReadOnlyMode + } + + err := db.boltDB.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(containerCounterBucketName) + + key := make([]byte, cidSize) + id.Encode(key) + return b.Delete(key) + }) + success = err == nil + return metaerr.Wrap(err) +} diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index 2f40daaae..4b7b565b3 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -105,11 +105,7 @@ func TestCounters(t *testing.T) { v.Phy-- v.Logic-- v.User-- - if v.IsZero() { - delete(exp, cnrID) - } else { - exp[cnrID] = v - } + exp[cnrID] = v } cc, err := db.ContainerCounters(context.Background()) @@ -428,11 +424,7 @@ func TestCounters_Expired(t *testing.T) { if v, ok := exp[oo[0].Container()]; ok { v.Phy-- - if v.IsZero() { - delete(exp, oo[0].Container()) - } else { - exp[oo[0].Container()] = v - } + exp[oo[0].Container()] = v } oo = oo[1:] @@ -463,11 +455,7 @@ func TestCounters_Expired(t *testing.T) { v.Phy-- v.Logic-- v.User-- - if v.IsZero() { - delete(exp, oo[0].Container()) - } else { - exp[oo[0].Container()] = v - } + exp[oo[0].Container()] = v } oo = oo[1:] diff --git a/pkg/local_object_storage/shard/container.go b/pkg/local_object_storage/shard/container.go index 856885486..364649b50 100644 --- a/pkg/local_object_storage/shard/container.go +++ b/pkg/local_object_storage/shard/container.go @@ -44,6 +44,43 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) { }, nil } +type ContainerCountPrm struct { + ContainerID cid.ID +} + +type ContainerCountRes struct { + Phy uint64 + Logic uint64 + User uint64 +} + +func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (ContainerCountRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ContainerCount", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.Stringer("container_id", prm.ContainerID), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.NoMetabase() { + return ContainerCountRes{}, ErrDegradedMode + } + + counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID) + if err != nil { + return ContainerCountRes{}, fmt.Errorf("could not get container counters: %w", err) + } + + return ContainerCountRes{ + Phy: counters.Phy, + Logic: counters.Logic, + User: counters.User, + }, nil +} + func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerSize", trace.WithAttributes( @@ -65,3 +102,25 @@ func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error { return s.metaBase.DeleteContainerSize(ctx, id) } + +func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerCount", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.Stringer("container_id", id), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.ReadOnly() { + return ErrReadOnlyMode + } + + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + + return s.metaBase.DeleteContainerCount(ctx, id) +} diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 3017dce1f..bfd1f1b11 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -738,6 +738,7 @@ func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) { defer s.log.Debug(logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch)) s.collectExpiredContainerSizeMetrics(ctx, epoch) + s.collectExpiredContainerCountMetrics(ctx, epoch) } func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) { @@ -751,3 +752,15 @@ func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch ui } s.zeroSizeContainersCallback(ctx, ids) } + +func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) { + ids, err := s.metaBase.ZeroCountContainers(ctx) + if err != nil { + s.log.Warn(logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err)) + return + } + if len(ids) == 0 { + return + } + s.zeroCountContainersCallback(ctx, ids) +} diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index f59565cb3..9c81c747d 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -336,11 +336,7 @@ func TestCounters(t *testing.T) { v.Logic-- v.Phy-- v.User-- - if v.IsZero() { - delete(expected, cnr) - } else { - expected[cnr] = v - } + expected[cnr] = v } } require.Equal(t, expectedLogicalSizes, mm.containerSizes()) diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index a83445cfe..13f7f689b 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -53,8 +53,8 @@ type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address) // DeletedLockCallback is a callback handling list of deleted LOCK objects. type DeletedLockCallback func(context.Context, []oid.Address) -// ZeroSizeContainersCallback is a callback hanfling list of zero-size containers. -type ZeroSizeContainersCallback func(context.Context, []cid.ID) +// EmptyContainersCallback is a callback hanfling list of zero-size and zero-count containers. +type EmptyContainersCallback func(context.Context, []cid.ID) // MetricsWriter is an interface that must store shard's metrics. type MetricsWriter interface { @@ -121,7 +121,8 @@ type cfg struct { deletedLockCallBack DeletedLockCallback - zeroSizeContainersCallback ZeroSizeContainersCallback + zeroSizeContainersCallback EmptyContainersCallback + zeroCountContainersCallback EmptyContainersCallback tsSource TombstoneSource @@ -134,12 +135,13 @@ type cfg struct { func defaultCfg() *cfg { return &cfg{ - rmBatchSize: 100, - log: &logger.Logger{Logger: zap.L()}, - gcCfg: defaultGCCfg(), - reportErrorFunc: func(string, string, error) {}, - rebuildLimiter: &noopRebuildLimiter{}, - zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, + rmBatchSize: 100, + log: &logger.Logger{Logger: zap.L()}, + gcCfg: defaultGCCfg(), + reportErrorFunc: func(string, string, error) {}, + rebuildLimiter: &noopRebuildLimiter{}, + zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, + zeroCountContainersCallback: func(context.Context, []cid.ID) {}, } } @@ -370,12 +372,19 @@ func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option { } // WithZeroSizeCallback returns option to set zero-size containers callback. -func WithZeroSizeCallback(cb ZeroSizeContainersCallback) Option { +func WithZeroSizeCallback(cb EmptyContainersCallback) Option { return func(c *cfg) { c.zeroSizeContainersCallback = cb } } +// WithZeroCountCallback returns option to set zero-count containers callback. +func WithZeroCountCallback(cb EmptyContainersCallback) Option { + return func(c *cfg) { + c.zeroCountContainersCallback = cb + } +} + func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 80e912048..4cc542470 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -12,6 +12,7 @@ type EngineMetrics interface { AddMethodDuration(method string, d time.Duration) AddToContainerSize(cnrID string, size int64) DeleteContainerSize(cnrID string) + DeleteContainerCount(cnrID string) IncErrorCounter(shardID string) ClearErrorCounter(shardID string) DeleteShardMetrics(shardID string) @@ -84,6 +85,10 @@ func (m *engineMetrics) DeleteContainerSize(cnrID string) { m.containerSize.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID}) } +func (m *engineMetrics) DeleteContainerCount(cnrID string) { + m.contObjCounter.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID}) +} + func (m *engineMetrics) AddToPayloadCounter(shardID string, size int64) { m.payloadSize.With(prometheus.Labels{shardIDLabel: shardID}).Add(float64(size)) } -- 2.45.2