diff --git a/CHANGELOG.md b/CHANGELOG.md index 2967f8b8..4a2525dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Changelog for FrostFS Node ### Added - Separate batching for replicated operations over the same container in pilorama (#1621) - Doc for extended headers (#2128) +- New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116) ### Changed - `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962) diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index f2f16c63..e430707a 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -21,6 +21,8 @@ type MetricRegister interface { AddToObjectCounter(shardID, objectType string, delta int) SetReadonly(shardID string, readonly bool) + + AddToContainerSize(cnrID string, size int64) } func elapsed(addFunc func(d time.Duration)) func() { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index b6bb4201..b0cad18d 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -49,6 +49,10 @@ func (m *metricsWithID) SetReadonly(readonly bool) { m.mw.SetReadonly(m.id, readonly) } +func (m *metricsWithID) AddToContainerSize(cnr string, size int64) { + m.mw.AddToContainerSize(cnr, size) +} + // AddShard adds a new shard to the storage engine. // // Returns any error encountered that did not allow adding a shard. diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 2e6ac2a6..0204e98f 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -22,6 +22,7 @@ type DeletePrm struct { type DeleteRes struct { rawRemoved uint64 availableRemoved uint64 + sizes []uint64 } // AvailableObjectsRemoved returns the number of removed available @@ -35,6 +36,11 @@ func (d DeleteRes) RawObjectsRemoved() uint64 { return d.rawRemoved } +// RemovedObjectSizes returns the sizes of removed objects. +func (d DeleteRes) RemovedObjectSizes() []uint64 { + return d.sizes +} + // SetAddresses is a Delete option to set the addresses of the objects to delete. // // Option is required. @@ -66,9 +72,11 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { var rawRemoved uint64 var availableRemoved uint64 var err error + var sizes = make([]uint64, len(prm.addrs)) err = db.boltDB.Update(func(tx *bbolt.Tx) error { - rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs) + // We need to clear slice because tx can try to execute multiple times. + rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs, sizes) return err }) if err == nil { @@ -81,6 +89,7 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { return DeleteRes{ rawRemoved: rawRemoved, availableRemoved: availableRemoved, + sizes: sizes, }, err } @@ -90,7 +99,7 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { // objects that were stored. The second return value is a logical objects // removed number: objects that were available (without Tombstones, GCMarks // non-expired, etc.) -func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, error) { +func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (uint64, uint64, error) { refCounter := make(referenceCounter, len(addrs)) currEpoch := db.epochState.CurrentEpoch() @@ -98,13 +107,14 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, er var availableDeleted uint64 for i := range addrs { - removed, available, err := db.delete(tx, addrs[i], refCounter, currEpoch) + removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch) if err != nil { return 0, 0, err // maybe log and continue? } if removed { rawDeleted++ + sizes[i] = size } if available { @@ -143,8 +153,8 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, er // The first return value indicates if an object has been removed. (removing a // non-exist object is error-free). The second return value indicates if an // object was available before the removal (for calculating the logical object -// counter). -func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, error) { +// counter). The third return value is removed object payload size. +func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) { key := make([]byte, addressKeySize) addrKey := addressKey(addr, key) garbageBKT := tx.Bucket(garbageBucketName) @@ -156,7 +166,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter if garbageBKT != nil { err := garbageBKT.Delete(addrKey) if err != nil { - return false, false, fmt.Errorf("could not remove from garbage bucket: %w", err) + return false, false, 0, fmt.Errorf("could not remove from garbage bucket: %w", err) } } @@ -167,10 +177,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter var notFoundErr apistatus.ObjectNotFound if errors.As(err, ¬FoundErr) || errors.As(err, &siErr) { - return false, false, nil + return false, false, 0, nil } - return false, false, err + return false, false, 0, err } // if object is an only link to a parent, then remove parent @@ -196,10 +206,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter // remove object err = db.deleteObject(tx, obj, false) if err != nil { - return false, false, fmt.Errorf("could not remove object: %w", err) + return false, false, 0, fmt.Errorf("could not remove object: %w", err) } - return true, removeAvailableObject, nil + return true, removeAvailableObject, obj.PayloadSize(), nil } func (db *DB) deleteObject( diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 0a2506a1..c96b267e 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -132,7 +132,7 @@ func (s *Shard) Init() error { } } - s.updateObjectCounter() + s.updateMetrics() s.gc = &gc{ gcCfg: &s.gcCfg, diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index 77d1c977..044b2f55 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -80,6 +80,9 @@ func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) { s.decObjectCounterBy(physical, res.RawObjectsRemoved()) s.decObjectCounterBy(logical, res.AvailableObjectsRemoved()) + for i := range prm.addr { + s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(res.RemovedObjectSizes()[i])) + } for i := range prm.addr { var delPrm common.DeletePrm diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index 9500da5f..da73cca1 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -17,26 +17,28 @@ import ( ) type metricsStore struct { - s map[string]uint64 + objCounters map[string]uint64 + cnrSize map[string]int64 + readOnly bool } func (m metricsStore) SetShardID(_ string) {} func (m metricsStore) SetObjectCounter(objectType string, v uint64) { - m.s[objectType] = v + m.objCounters[objectType] = v } func (m metricsStore) AddToObjectCounter(objectType string, delta int) { switch { case delta > 0: - m.s[objectType] += uint64(delta) + m.objCounters[objectType] += uint64(delta) case delta < 0: uDelta := uint64(-delta) - if m.s[objectType] >= uDelta { - m.s[objectType] -= uDelta + if m.objCounters[objectType] >= uDelta { + m.objCounters[objectType] -= uDelta } else { - m.s[objectType] = 0 + m.objCounters[objectType] = 0 } case delta == 0: return @@ -44,19 +46,19 @@ func (m metricsStore) AddToObjectCounter(objectType string, delta int) { } func (m metricsStore) IncObjectCounter(objectType string) { - m.s[objectType] += 1 + m.objCounters[objectType] += 1 } func (m metricsStore) DecObjectCounter(objectType string) { m.AddToObjectCounter(objectType, -1) } -func (m metricsStore) SetReadonly(r bool) { - if r { - m.s[readonly] = 1 - } else { - m.s[readonly] = 0 - } +func (m *metricsStore) SetReadonly(r bool) { + m.readOnly = r +} + +func (m metricsStore) AddToContainerSize(cnr string, size int64) { + m.cnrSize[cnr] += size } const physical = "phy" @@ -68,9 +70,9 @@ func TestCounters(t *testing.T) { sh, mm := shardWithMetrics(t, dir) sh.SetMode(mode.ReadOnly) - require.Equal(t, mm.s[readonly], uint64(1)) + require.True(t, mm.readOnly) sh.SetMode(mode.ReadWrite) - require.Equal(t, mm.s[readonly], uint64(0)) + require.False(t, mm.readOnly) const objNumber = 10 oo := make([]*object.Object, objNumber) @@ -79,10 +81,17 @@ func TestCounters(t *testing.T) { } t.Run("defaults", func(t *testing.T) { - require.Zero(t, mm.s[physical]) - require.Zero(t, mm.s[logical]) + require.Zero(t, mm.objCounters[physical]) + require.Zero(t, mm.objCounters[logical]) + require.Empty(t, mm.cnrSize) }) + expectedSizes := make(map[string]int64) + for i := range oo { + cnr, _ := oo[i].ContainerID() + expectedSizes[cnr.EncodeToString()] += int64(oo[i].PayloadSize()) + } + t.Run("put", func(t *testing.T) { var prm shard.PutPrm @@ -93,8 +102,9 @@ func TestCounters(t *testing.T) { require.NoError(t, err) } - require.Equal(t, uint64(objNumber), mm.s[physical]) - require.Equal(t, uint64(objNumber), mm.s[logical]) + require.Equal(t, uint64(objNumber), mm.objCounters[physical]) + require.Equal(t, uint64(objNumber), mm.objCounters[logical]) + require.Equal(t, expectedSizes, mm.cnrSize) }) t.Run("inhume_GC", func(t *testing.T) { @@ -108,8 +118,9 @@ func TestCounters(t *testing.T) { require.NoError(t, err) } - require.Equal(t, uint64(objNumber), mm.s[physical]) - require.Equal(t, uint64(objNumber-inhumedNumber), mm.s[logical]) + require.Equal(t, uint64(objNumber), mm.objCounters[physical]) + require.Equal(t, uint64(objNumber-inhumedNumber), mm.objCounters[logical]) + require.Equal(t, expectedSizes, mm.cnrSize) oo = oo[inhumedNumber:] }) @@ -118,8 +129,8 @@ func TestCounters(t *testing.T) { var prm shard.InhumePrm ts := objectcore.AddressOf(generateObject(t)) - phy := mm.s[physical] - logic := mm.s[logical] + phy := mm.objCounters[physical] + logic := mm.objCounters[logical] inhumedNumber := int(phy / 4) prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...) @@ -127,8 +138,9 @@ func TestCounters(t *testing.T) { _, err := sh.Inhume(prm) require.NoError(t, err) - require.Equal(t, phy, mm.s[physical]) - require.Equal(t, logic-uint64(inhumedNumber), mm.s[logical]) + require.Equal(t, phy, mm.objCounters[physical]) + require.Equal(t, logic-uint64(inhumedNumber), mm.objCounters[logical]) + require.Equal(t, expectedSizes, mm.cnrSize) oo = oo[inhumedNumber:] }) @@ -136,8 +148,8 @@ func TestCounters(t *testing.T) { t.Run("Delete", func(t *testing.T) { var prm shard.DeletePrm - phy := mm.s[physical] - logic := mm.s[logical] + phy := mm.objCounters[physical] + logic := mm.objCounters[logical] deletedNumber := int(phy / 4) prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...) @@ -145,8 +157,13 @@ func TestCounters(t *testing.T) { _, err := sh.Delete(prm) require.NoError(t, err) - require.Equal(t, phy-uint64(deletedNumber), mm.s[physical]) - require.Equal(t, logic-uint64(deletedNumber), mm.s[logical]) + require.Equal(t, phy-uint64(deletedNumber), mm.objCounters[physical]) + require.Equal(t, logic-uint64(deletedNumber), mm.objCounters[logical]) + for i := range oo[:deletedNumber] { + cnr, _ := oo[i].ContainerID() + expectedSizes[cnr.EncodeToString()] -= int64(oo[i].PayloadSize()) + } + require.Equal(t, expectedSizes, mm.cnrSize) }) } @@ -163,11 +180,11 @@ func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) { } mm := &metricsStore{ - s: map[string]uint64{ - "phy": 0, - "logic": 0, - "readonly": 1, + objCounters: map[string]uint64{ + "phy": 0, + "logic": 0, }, + cnrSize: make(map[string]int64), } sh := shard.New( diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 33eb3cd2..84ae3b00 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -79,6 +79,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) { } s.incObjectCounter() + s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize())) } return PutRes{}, nil diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 921a4221..912876b1 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -53,6 +53,9 @@ type MetricsWriter interface { // type. // Negative parameter must decrease the counter. AddToObjectCounter(objectType string, delta int) + // AddToContainerSize must add a value to the container size. + // Value can be negative. + AddToContainerSize(cnr string, value int64) // IncObjectCounter must increment shard's object counter taking into account // object type. IncObjectCounter(objectType string) @@ -323,7 +326,7 @@ const ( logical = "logic" ) -func (s *Shard) updateObjectCounter() { +func (s *Shard) updateMetrics() { if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() { cc, err := s.metaBase.ObjectCounters() if err != nil { @@ -336,6 +339,23 @@ func (s *Shard) updateObjectCounter() { s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy()) s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic()) + + cnrList, err := s.metaBase.Containers() + if err != nil { + s.log.Warn("meta: can't read container list", zap.Error(err)) + return + } + + for i := range cnrList { + size, err := s.metaBase.ContainerSize(cnrList[i]) + if err != nil { + s.log.Warn("meta: can't read container size", + zap.String("cid", cnrList[i].EncodeToString()), + zap.Error(err)) + continue + } + s.metricsWriter.AddToContainerSize(cnrList[i].EncodeToString(), int64(size)) + } } } @@ -353,3 +373,9 @@ func (s *Shard) decObjectCounterBy(typ string, v uint64) { s.cfg.metricsWriter.AddToObjectCounter(typ, -int(v)) } } + +func (s *Shard) addToContainerSize(cnr string, size int64) { + if s.cfg.metricsWriter != nil { + s.cfg.metricsWriter.AddToContainerSize(cnr, size) + } +} diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 4f280e0f..7b1ce4c4 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -19,6 +19,7 @@ type ( rangeDuration prometheus.Counter searchDuration prometheus.Counter listObjectsDuration prometheus.Counter + containerSize prometheus.GaugeVec } ) @@ -102,6 +103,13 @@ func newEngineMetrics() engineMetrics { Name: "list_objects_duration", Help: "Accumulated duration of engine list objects operations", }) + + containerSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: engineSubsystem, + Name: "container_size", + Help: "Accumulated size of all objects in a container", + }, []string{containerIDLabelKey}) ) return engineMetrics{ @@ -116,6 +124,7 @@ func newEngineMetrics() engineMetrics { rangeDuration: rangeDuration, searchDuration: searchDuration, listObjectsDuration: listObjectsDuration, + containerSize: *containerSize, } } @@ -131,6 +140,7 @@ func (m engineMetrics) register() { prometheus.MustRegister(m.rangeDuration) prometheus.MustRegister(m.searchDuration) prometheus.MustRegister(m.listObjectsDuration) + prometheus.MustRegister(m.containerSize) } func (m engineMetrics) AddListContainersDuration(d time.Duration) { @@ -176,3 +186,7 @@ func (m engineMetrics) AddSearchDuration(d time.Duration) { func (m engineMetrics) AddListObjectsDuration(d time.Duration) { m.listObjectsDuration.Add(float64(d)) } + +func (m engineMetrics) AddToContainerSize(cnrID string, size int64) { + m.containerSize.With(prometheus.Labels{containerIDLabelKey: cnrID}).Add(float64(size)) +} diff --git a/pkg/metrics/object.go b/pkg/metrics/object.go index bb8f6c26..0a949758 100644 --- a/pkg/metrics/object.go +++ b/pkg/metrics/object.go @@ -43,6 +43,7 @@ type ( const ( shardIDLabelKey = "shard" counterTypeLabelKey = "type" + containerIDLabelKey = "cid" ) func newMethodCallCounter(name string) methodCount {