[#2116] metrics: Track physical object capacity in the container

Currently we track based on `PayloadSize`, because it is already stored
in the metabase and it is easier to calculate without slowing down the
whole system.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
fyrchik/simplify-services
Evgenii Stratonikov 2022-12-01 14:59:22 +03:00 committed by fyrchik
parent d65a95a2c6
commit 9513f163aa
11 changed files with 124 additions and 45 deletions

View File

@ -6,6 +6,7 @@ Changelog for FrostFS Node
### Added ### Added
- Separate batching for replicated operations over the same container in pilorama (#1621) - Separate batching for replicated operations over the same container in pilorama (#1621)
- Doc for extended headers (#2128) - Doc for extended headers (#2128)
- New `frostfs_node_object_container_size` metric for tracking size of reqular objects in a container (#2116)
### Changed ### Changed
- `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962) - `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962)

View File

@ -21,6 +21,8 @@ type MetricRegister interface {
AddToObjectCounter(shardID, objectType string, delta int) AddToObjectCounter(shardID, objectType string, delta int)
SetReadonly(shardID string, readonly bool) SetReadonly(shardID string, readonly bool)
AddToContainerSize(cnrID string, size int64)
} }
func elapsed(addFunc func(d time.Duration)) func() { func elapsed(addFunc func(d time.Duration)) func() {

View File

@ -49,6 +49,10 @@ func (m *metricsWithID) SetReadonly(readonly bool) {
m.mw.SetReadonly(m.id, readonly) m.mw.SetReadonly(m.id, readonly)
} }
func (m *metricsWithID) AddToContainerSize(cnr string, size int64) {
m.mw.AddToContainerSize(cnr, size)
}
// AddShard adds a new shard to the storage engine. // AddShard adds a new shard to the storage engine.
// //
// Returns any error encountered that did not allow adding a shard. // Returns any error encountered that did not allow adding a shard.

View File

@ -22,6 +22,7 @@ type DeletePrm struct {
type DeleteRes struct { type DeleteRes struct {
rawRemoved uint64 rawRemoved uint64
availableRemoved uint64 availableRemoved uint64
sizes []uint64
} }
// AvailableObjectsRemoved returns the number of removed available // AvailableObjectsRemoved returns the number of removed available
@ -35,6 +36,11 @@ func (d DeleteRes) RawObjectsRemoved() uint64 {
return d.rawRemoved return d.rawRemoved
} }
// RemovedObjectSizes returns the sizes of removed objects.
func (d DeleteRes) RemovedObjectSizes() []uint64 {
return d.sizes
}
// SetAddresses is a Delete option to set the addresses of the objects to delete. // SetAddresses is a Delete option to set the addresses of the objects to delete.
// //
// Option is required. // Option is required.
@ -66,9 +72,11 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
var rawRemoved uint64 var rawRemoved uint64
var availableRemoved uint64 var availableRemoved uint64
var err error var err error
var sizes = make([]uint64, len(prm.addrs))
err = db.boltDB.Update(func(tx *bbolt.Tx) error { err = db.boltDB.Update(func(tx *bbolt.Tx) error {
rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs) // We need to clear slice because tx can try to execute multiple times.
rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs, sizes)
return err return err
}) })
if err == nil { if err == nil {
@ -81,6 +89,7 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
return DeleteRes{ return DeleteRes{
rawRemoved: rawRemoved, rawRemoved: rawRemoved,
availableRemoved: availableRemoved, availableRemoved: availableRemoved,
sizes: sizes,
}, err }, err
} }
@ -90,7 +99,7 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
// objects that were stored. The second return value is a logical objects // objects that were stored. The second return value is a logical objects
// removed number: objects that were available (without Tombstones, GCMarks // removed number: objects that were available (without Tombstones, GCMarks
// non-expired, etc.) // non-expired, etc.)
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, error) { func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (uint64, uint64, error) {
refCounter := make(referenceCounter, len(addrs)) refCounter := make(referenceCounter, len(addrs))
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
@ -98,13 +107,14 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, er
var availableDeleted uint64 var availableDeleted uint64
for i := range addrs { for i := range addrs {
removed, available, err := db.delete(tx, addrs[i], refCounter, currEpoch) removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
if err != nil { if err != nil {
return 0, 0, err // maybe log and continue? return 0, 0, err // maybe log and continue?
} }
if removed { if removed {
rawDeleted++ rawDeleted++
sizes[i] = size
} }
if available { if available {
@ -143,8 +153,8 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, er
// The first return value indicates if an object has been removed. (removing a // The first return value indicates if an object has been removed. (removing a
// non-exist object is error-free). The second return value indicates if an // non-exist object is error-free). The second return value indicates if an
// object was available before the removal (for calculating the logical object // object was available before the removal (for calculating the logical object
// counter). // counter). The third return value is removed object payload size.
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, error) { func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) {
key := make([]byte, addressKeySize) key := make([]byte, addressKeySize)
addrKey := addressKey(addr, key) addrKey := addressKey(addr, key)
garbageBKT := tx.Bucket(garbageBucketName) garbageBKT := tx.Bucket(garbageBucketName)
@ -156,7 +166,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
if garbageBKT != nil { if garbageBKT != nil {
err := garbageBKT.Delete(addrKey) err := garbageBKT.Delete(addrKey)
if err != nil { if err != nil {
return false, false, fmt.Errorf("could not remove from garbage bucket: %w", err) return false, false, 0, fmt.Errorf("could not remove from garbage bucket: %w", err)
} }
} }
@ -167,10 +177,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
var notFoundErr apistatus.ObjectNotFound var notFoundErr apistatus.ObjectNotFound
if errors.As(err, &notFoundErr) || errors.As(err, &siErr) { if errors.As(err, &notFoundErr) || errors.As(err, &siErr) {
return false, false, nil return false, false, 0, nil
} }
return false, false, err return false, false, 0, err
} }
// if object is an only link to a parent, then remove parent // if object is an only link to a parent, then remove parent
@ -196,10 +206,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
// remove object // remove object
err = db.deleteObject(tx, obj, false) err = db.deleteObject(tx, obj, false)
if err != nil { if err != nil {
return false, false, fmt.Errorf("could not remove object: %w", err) return false, false, 0, fmt.Errorf("could not remove object: %w", err)
} }
return true, removeAvailableObject, nil return true, removeAvailableObject, obj.PayloadSize(), nil
} }
func (db *DB) deleteObject( func (db *DB) deleteObject(

View File

@ -132,7 +132,7 @@ func (s *Shard) Init() error {
} }
} }
s.updateObjectCounter() s.updateMetrics()
s.gc = &gc{ s.gc = &gc{
gcCfg: &s.gcCfg, gcCfg: &s.gcCfg,

View File

@ -80,6 +80,9 @@ func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) {
s.decObjectCounterBy(physical, res.RawObjectsRemoved()) s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved()) s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
for i := range prm.addr {
s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(res.RemovedObjectSizes()[i]))
}
for i := range prm.addr { for i := range prm.addr {
var delPrm common.DeletePrm var delPrm common.DeletePrm

View File

@ -17,26 +17,28 @@ import (
) )
type metricsStore struct { type metricsStore struct {
s map[string]uint64 objCounters map[string]uint64
cnrSize map[string]int64
readOnly bool
} }
func (m metricsStore) SetShardID(_ string) {} func (m metricsStore) SetShardID(_ string) {}
func (m metricsStore) SetObjectCounter(objectType string, v uint64) { func (m metricsStore) SetObjectCounter(objectType string, v uint64) {
m.s[objectType] = v m.objCounters[objectType] = v
} }
func (m metricsStore) AddToObjectCounter(objectType string, delta int) { func (m metricsStore) AddToObjectCounter(objectType string, delta int) {
switch { switch {
case delta > 0: case delta > 0:
m.s[objectType] += uint64(delta) m.objCounters[objectType] += uint64(delta)
case delta < 0: case delta < 0:
uDelta := uint64(-delta) uDelta := uint64(-delta)
if m.s[objectType] >= uDelta { if m.objCounters[objectType] >= uDelta {
m.s[objectType] -= uDelta m.objCounters[objectType] -= uDelta
} else { } else {
m.s[objectType] = 0 m.objCounters[objectType] = 0
} }
case delta == 0: case delta == 0:
return return
@ -44,19 +46,19 @@ func (m metricsStore) AddToObjectCounter(objectType string, delta int) {
} }
func (m metricsStore) IncObjectCounter(objectType string) { func (m metricsStore) IncObjectCounter(objectType string) {
m.s[objectType] += 1 m.objCounters[objectType] += 1
} }
func (m metricsStore) DecObjectCounter(objectType string) { func (m metricsStore) DecObjectCounter(objectType string) {
m.AddToObjectCounter(objectType, -1) m.AddToObjectCounter(objectType, -1)
} }
func (m metricsStore) SetReadonly(r bool) { func (m *metricsStore) SetReadonly(r bool) {
if r { m.readOnly = r
m.s[readonly] = 1 }
} else {
m.s[readonly] = 0 func (m metricsStore) AddToContainerSize(cnr string, size int64) {
} m.cnrSize[cnr] += size
} }
const physical = "phy" const physical = "phy"
@ -68,9 +70,9 @@ func TestCounters(t *testing.T) {
sh, mm := shardWithMetrics(t, dir) sh, mm := shardWithMetrics(t, dir)
sh.SetMode(mode.ReadOnly) sh.SetMode(mode.ReadOnly)
require.Equal(t, mm.s[readonly], uint64(1)) require.True(t, mm.readOnly)
sh.SetMode(mode.ReadWrite) sh.SetMode(mode.ReadWrite)
require.Equal(t, mm.s[readonly], uint64(0)) require.False(t, mm.readOnly)
const objNumber = 10 const objNumber = 10
oo := make([]*object.Object, objNumber) oo := make([]*object.Object, objNumber)
@ -79,10 +81,17 @@ func TestCounters(t *testing.T) {
} }
t.Run("defaults", func(t *testing.T) { t.Run("defaults", func(t *testing.T) {
require.Zero(t, mm.s[physical]) require.Zero(t, mm.objCounters[physical])
require.Zero(t, mm.s[logical]) require.Zero(t, mm.objCounters[logical])
require.Empty(t, mm.cnrSize)
}) })
expectedSizes := make(map[string]int64)
for i := range oo {
cnr, _ := oo[i].ContainerID()
expectedSizes[cnr.EncodeToString()] += int64(oo[i].PayloadSize())
}
t.Run("put", func(t *testing.T) { t.Run("put", func(t *testing.T) {
var prm shard.PutPrm var prm shard.PutPrm
@ -93,8 +102,9 @@ func TestCounters(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
require.Equal(t, uint64(objNumber), mm.s[physical]) require.Equal(t, uint64(objNumber), mm.objCounters[physical])
require.Equal(t, uint64(objNumber), mm.s[logical]) require.Equal(t, uint64(objNumber), mm.objCounters[logical])
require.Equal(t, expectedSizes, mm.cnrSize)
}) })
t.Run("inhume_GC", func(t *testing.T) { t.Run("inhume_GC", func(t *testing.T) {
@ -108,8 +118,9 @@ func TestCounters(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
require.Equal(t, uint64(objNumber), mm.s[physical]) require.Equal(t, uint64(objNumber), mm.objCounters[physical])
require.Equal(t, uint64(objNumber-inhumedNumber), mm.s[logical]) require.Equal(t, uint64(objNumber-inhumedNumber), mm.objCounters[logical])
require.Equal(t, expectedSizes, mm.cnrSize)
oo = oo[inhumedNumber:] oo = oo[inhumedNumber:]
}) })
@ -118,8 +129,8 @@ func TestCounters(t *testing.T) {
var prm shard.InhumePrm var prm shard.InhumePrm
ts := objectcore.AddressOf(generateObject(t)) ts := objectcore.AddressOf(generateObject(t))
phy := mm.s[physical] phy := mm.objCounters[physical]
logic := mm.s[logical] logic := mm.objCounters[logical]
inhumedNumber := int(phy / 4) inhumedNumber := int(phy / 4)
prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...) prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...)
@ -127,8 +138,9 @@ func TestCounters(t *testing.T) {
_, err := sh.Inhume(prm) _, err := sh.Inhume(prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, phy, mm.s[physical]) require.Equal(t, phy, mm.objCounters[physical])
require.Equal(t, logic-uint64(inhumedNumber), mm.s[logical]) require.Equal(t, logic-uint64(inhumedNumber), mm.objCounters[logical])
require.Equal(t, expectedSizes, mm.cnrSize)
oo = oo[inhumedNumber:] oo = oo[inhumedNumber:]
}) })
@ -136,8 +148,8 @@ func TestCounters(t *testing.T) {
t.Run("Delete", func(t *testing.T) { t.Run("Delete", func(t *testing.T) {
var prm shard.DeletePrm var prm shard.DeletePrm
phy := mm.s[physical] phy := mm.objCounters[physical]
logic := mm.s[logical] logic := mm.objCounters[logical]
deletedNumber := int(phy / 4) deletedNumber := int(phy / 4)
prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...) prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...)
@ -145,8 +157,13 @@ func TestCounters(t *testing.T) {
_, err := sh.Delete(prm) _, err := sh.Delete(prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, phy-uint64(deletedNumber), mm.s[physical]) require.Equal(t, phy-uint64(deletedNumber), mm.objCounters[physical])
require.Equal(t, logic-uint64(deletedNumber), mm.s[logical]) require.Equal(t, logic-uint64(deletedNumber), mm.objCounters[logical])
for i := range oo[:deletedNumber] {
cnr, _ := oo[i].ContainerID()
expectedSizes[cnr.EncodeToString()] -= int64(oo[i].PayloadSize())
}
require.Equal(t, expectedSizes, mm.cnrSize)
}) })
} }
@ -163,11 +180,11 @@ func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) {
} }
mm := &metricsStore{ mm := &metricsStore{
s: map[string]uint64{ objCounters: map[string]uint64{
"phy": 0, "phy": 0,
"logic": 0, "logic": 0,
"readonly": 1,
}, },
cnrSize: make(map[string]int64),
} }
sh := shard.New( sh := shard.New(

View File

@ -79,6 +79,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
} }
s.incObjectCounter() s.incObjectCounter()
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
} }
return PutRes{}, nil return PutRes{}, nil

View File

@ -53,6 +53,9 @@ type MetricsWriter interface {
// type. // type.
// Negative parameter must decrease the counter. // Negative parameter must decrease the counter.
AddToObjectCounter(objectType string, delta int) AddToObjectCounter(objectType string, delta int)
// AddToContainerSize must add a value to the container size.
// Value can be negative.
AddToContainerSize(cnr string, value int64)
// IncObjectCounter must increment shard's object counter taking into account // IncObjectCounter must increment shard's object counter taking into account
// object type. // object type.
IncObjectCounter(objectType string) IncObjectCounter(objectType string)
@ -323,7 +326,7 @@ const (
logical = "logic" logical = "logic"
) )
func (s *Shard) updateObjectCounter() { func (s *Shard) updateMetrics() {
if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() { if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() {
cc, err := s.metaBase.ObjectCounters() cc, err := s.metaBase.ObjectCounters()
if err != nil { if err != nil {
@ -336,6 +339,23 @@ func (s *Shard) updateObjectCounter() {
s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy()) s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy())
s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic()) s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic())
cnrList, err := s.metaBase.Containers()
if err != nil {
s.log.Warn("meta: can't read container list", zap.Error(err))
return
}
for i := range cnrList {
size, err := s.metaBase.ContainerSize(cnrList[i])
if err != nil {
s.log.Warn("meta: can't read container size",
zap.String("cid", cnrList[i].EncodeToString()),
zap.Error(err))
continue
}
s.metricsWriter.AddToContainerSize(cnrList[i].EncodeToString(), int64(size))
}
} }
} }
@ -353,3 +373,9 @@ func (s *Shard) decObjectCounterBy(typ string, v uint64) {
s.cfg.metricsWriter.AddToObjectCounter(typ, -int(v)) s.cfg.metricsWriter.AddToObjectCounter(typ, -int(v))
} }
} }
func (s *Shard) addToContainerSize(cnr string, size int64) {
if s.cfg.metricsWriter != nil {
s.cfg.metricsWriter.AddToContainerSize(cnr, size)
}
}

View File

@ -19,6 +19,7 @@ type (
rangeDuration prometheus.Counter rangeDuration prometheus.Counter
searchDuration prometheus.Counter searchDuration prometheus.Counter
listObjectsDuration prometheus.Counter listObjectsDuration prometheus.Counter
containerSize prometheus.GaugeVec
} }
) )
@ -102,6 +103,13 @@ func newEngineMetrics() engineMetrics {
Name: "list_objects_duration", Name: "list_objects_duration",
Help: "Accumulated duration of engine list objects operations", Help: "Accumulated duration of engine list objects operations",
}) })
containerSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "container_size",
Help: "Accumulated size of all objects in a container",
}, []string{containerIDLabelKey})
) )
return engineMetrics{ return engineMetrics{
@ -116,6 +124,7 @@ func newEngineMetrics() engineMetrics {
rangeDuration: rangeDuration, rangeDuration: rangeDuration,
searchDuration: searchDuration, searchDuration: searchDuration,
listObjectsDuration: listObjectsDuration, listObjectsDuration: listObjectsDuration,
containerSize: *containerSize,
} }
} }
@ -131,6 +140,7 @@ func (m engineMetrics) register() {
prometheus.MustRegister(m.rangeDuration) prometheus.MustRegister(m.rangeDuration)
prometheus.MustRegister(m.searchDuration) prometheus.MustRegister(m.searchDuration)
prometheus.MustRegister(m.listObjectsDuration) prometheus.MustRegister(m.listObjectsDuration)
prometheus.MustRegister(m.containerSize)
} }
func (m engineMetrics) AddListContainersDuration(d time.Duration) { func (m engineMetrics) AddListContainersDuration(d time.Duration) {
@ -176,3 +186,7 @@ func (m engineMetrics) AddSearchDuration(d time.Duration) {
func (m engineMetrics) AddListObjectsDuration(d time.Duration) { func (m engineMetrics) AddListObjectsDuration(d time.Duration) {
m.listObjectsDuration.Add(float64(d)) m.listObjectsDuration.Add(float64(d))
} }
func (m engineMetrics) AddToContainerSize(cnrID string, size int64) {
m.containerSize.With(prometheus.Labels{containerIDLabelKey: cnrID}).Add(float64(size))
}

View File

@ -43,6 +43,7 @@ type (
const ( const (
shardIDLabelKey = "shard" shardIDLabelKey = "shard"
counterTypeLabelKey = "type" counterTypeLabelKey = "type"
containerIDLabelKey = "cid"
) )
func newMethodCallCounter(name string) methodCount { func newMethodCallCounter(name string) methodCount {