[#763] metrics: Add container_objects_total metric

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-11-02 13:50:52 +03:00
parent 9c98fa6152
commit 70ab1ebd54
12 changed files with 250 additions and 54 deletions

View file

@ -516,4 +516,5 @@ const (
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped" RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
FailedToCountWritecacheItems = "failed to count writecache items" FailedToCountWritecacheItems = "failed to count writecache items"
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza" AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
FailedToGetContainerCounters = "failed to get container counters values"
) )

View file

@ -21,6 +21,10 @@ type MetricRegister interface {
ClearErrorCounter(shardID string) ClearErrorCounter(shardID string)
DeleteShardMetrics(shardID string) DeleteShardMetrics(shardID string)
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncContainerObjectCounter(shardID, contID, objectType string)
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
WriteCache() metrics.WriteCacheMetrics WriteCache() metrics.WriteCacheMetrics
GC() metrics.GCMetrics GC() metrics.GCMetrics
} }

View file

@ -74,6 +74,18 @@ func (m *metricsWithID) DeleteShardMetrics() {
m.mw.DeleteShardMetrics(m.id) m.mw.DeleteShardMetrics(m.id)
} }
func (m *metricsWithID) SetContainerObjectsCount(cnrID string, objectType string, value uint64) {
m.mw.SetContainerObjectCounter(m.id, cnrID, objectType, value)
}
func (m *metricsWithID) IncContainerObjectsCount(cnrID string, objectType string) {
m.mw.IncContainerObjectCounter(m.id, cnrID, objectType)
}
func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string, value uint64) {
m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
}
// AddShard adds a new shard to the storage engine. // AddShard adds a new shard to the storage engine.
// //
// Returns any error encountered that did not allow adding a shard. // Returns any error encountered that did not allow adding a shard.

View file

@ -31,6 +31,7 @@ type DeleteRes struct {
availableRemoved uint64 availableRemoved uint64
sizes []uint64 sizes []uint64
availableSizes []uint64 availableSizes []uint64
removedByCnrID map[cid.ID]ObjectCounters
} }
// AvailableObjectsRemoved returns the number of removed available // AvailableObjectsRemoved returns the number of removed available
@ -39,6 +40,11 @@ func (d DeleteRes) AvailableObjectsRemoved() uint64 {
return d.availableRemoved return d.availableRemoved
} }
// RemovedByCnrID returns the number of removed objects by container ID.
func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
return d.removedByCnrID
}
// RawObjectsRemoved returns the number of removed raw objects. // RawObjectsRemoved returns the number of removed raw objects.
func (d DeleteRes) RawObjectsRemoved() uint64 { func (d DeleteRes) RawObjectsRemoved() uint64 {
return d.rawRemoved return d.rawRemoved
@ -96,15 +102,11 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
return DeleteRes{}, ErrReadOnlyMode return DeleteRes{}, ErrReadOnlyMode
} }
var rawRemoved uint64
var availableRemoved uint64
var err error var err error
sizes := make([]uint64, len(prm.addrs)) var res DeleteRes
availableSizes := make([]uint64, len(prm.addrs))
err = db.boltDB.Update(func(tx *bbolt.Tx) error { err = db.boltDB.Update(func(tx *bbolt.Tx) error {
// We need to clear slice because tx can try to execute multiple times. res, err = db.deleteGroup(tx, prm.addrs)
rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs, sizes, availableSizes)
return err return err
}) })
if err == nil { if err == nil {
@ -115,91 +117,83 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
storagelog.OpField("metabase DELETE")) storagelog.OpField("metabase DELETE"))
} }
} }
return DeleteRes{ return res, metaerr.Wrap(err)
rawRemoved: rawRemoved,
availableRemoved: availableRemoved,
sizes: sizes,
availableSizes: availableSizes,
}, metaerr.Wrap(err)
} }
// deleteGroup deletes object from the metabase. Handles removal of the // deleteGroup deletes object from the metabase. Handles removal of the
// references of the split objects. // references of the split objects.
// The first return value is a physical objects removed number: physical func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
// objects that were stored. The second return value is a logical objects res := DeleteRes{
// removed number: objects that were available (without Tombstones, GCMarks sizes: make([]uint64, len(addrs)),
// non-expired, etc.) availableSizes: make([]uint64, len(addrs)),
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, availableSizes []uint64) (uint64, uint64, error) { removedByCnrID: make(map[cid.ID]ObjectCounters),
}
refCounter := make(referenceCounter, len(addrs)) refCounter := make(referenceCounter, len(addrs))
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
cnrIDDelta := make(map[cid.ID]ObjectCounters)
var rawDeletedTotal uint64
var availableDeletedTotal uint64
for i := range addrs { for i := range addrs {
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch) removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
if err != nil { if err != nil {
return 0, 0, err // maybe log and continue? return DeleteRes{}, err // maybe log and continue?
} }
if removed { if removed {
if v, ok := cnrIDDelta[addrs[i].Container()]; ok { if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.phy++ v.phy++
cnrIDDelta[addrs[i].Container()] = v res.removedByCnrID[addrs[i].Container()] = v
} else { } else {
cnrIDDelta[addrs[i].Container()] = ObjectCounters{ res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
phy: 1, phy: 1,
} }
} }
rawDeletedTotal++ res.rawRemoved++
sizes[i] = size res.sizes[i] = size
} }
if available { if available {
if v, ok := cnrIDDelta[addrs[i].Container()]; ok { if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.logic++ v.logic++
cnrIDDelta[addrs[i].Container()] = v res.removedByCnrID[addrs[i].Container()] = v
} else { } else {
cnrIDDelta[addrs[i].Container()] = ObjectCounters{ res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
logic: 1, logic: 1,
} }
} }
availableDeletedTotal++ res.availableRemoved++
availableSizes[i] = size res.availableSizes[i] = size
} }
} }
if rawDeletedTotal > 0 { if res.rawRemoved > 0 {
err := db.updateShardObjectCounter(tx, phy, rawDeletedTotal, false) err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
if err != nil { if err != nil {
return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err) return DeleteRes{}, fmt.Errorf("could not decrease phy object counter: %w", err)
} }
} }
if availableDeletedTotal > 0 { if res.availableRemoved > 0 {
err := db.updateShardObjectCounter(tx, logical, availableDeletedTotal, false) err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
if err != nil { if err != nil {
return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err) return DeleteRes{}, fmt.Errorf("could not decrease logical object counter: %w", err)
} }
} }
if err := db.updateContainerCounter(tx, cnrIDDelta, false); err != nil { if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
return 0, 0, fmt.Errorf("could not decrease container object counter: %w", err) return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err)
} }
for _, refNum := range refCounter { for _, refNum := range refCounter {
if refNum.cur == refNum.all { if refNum.cur == refNum.all {
err := db.deleteObject(tx, refNum.obj, true) err := db.deleteObject(tx, refNum.obj, true)
if err != nil { if err != nil {
return rawDeletedTotal, availableDeletedTotal, err // maybe log and continue? return DeleteRes{}, err // maybe log and continue?
} }
} }
} }
return rawDeletedTotal, availableDeletedTotal, nil return res, nil
} }
// delete removes object indexes from the metabase. Counts the references // delete removes object indexes from the metabase. Counts the references

View file

@ -37,14 +37,21 @@ type DeletionInfo struct {
// InhumeRes encapsulates results of Inhume operation. // InhumeRes encapsulates results of Inhume operation.
type InhumeRes struct { type InhumeRes struct {
deletedLockObj []oid.Address deletedLockObj []oid.Address
availableImhumed uint64 availableInhumed uint64
inhumedByCnrID map[cid.ID]ObjectCounters
deletionDetails []DeletionInfo deletionDetails []DeletionInfo
} }
// AvailableInhumed return number of available object // AvailableInhumed return number of available object
// that have been inhumed. // that have been inhumed.
func (i InhumeRes) AvailableInhumed() uint64 { func (i InhumeRes) AvailableInhumed() uint64 {
return i.availableImhumed return i.availableInhumed
}
// InhumedByCnrID return number of object
// that have been inhumed by container ID.
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
return i.inhumedByCnrID
} }
// DeletedLockObjects returns deleted object of LOCK // DeletedLockObjects returns deleted object of LOCK
@ -73,7 +80,15 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) {
Size: deletedSize, Size: deletedSize,
CID: containerID, CID: containerID,
}) })
i.availableImhumed++ i.availableInhumed++
if v, ok := i.inhumedByCnrID[containerID]; ok {
v.logic++
i.inhumedByCnrID[containerID] = v
} else {
i.inhumedByCnrID[containerID] = ObjectCounters{
logic: 1,
}
}
} }
// SetAddresses sets a list of object addresses that should be inhumed. // SetAddresses sets a list of object addresses that should be inhumed.
@ -123,7 +138,7 @@ var ErrLockObjectRemoval = logicerr.New("lock object removal")
// //
// NOTE: Marks any object with GC mark (despite any prohibitions on operations // NOTE: Marks any object with GC mark (despite any prohibitions on operations
// with that object) if WithForceGCMark option has been provided. // with that object) if WithForceGCMark option has been provided.
func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) { func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
var ( var (
startedAt = time.Now() startedAt = time.Now()
success = false success = false
@ -143,8 +158,11 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err err
return InhumeRes{}, ErrReadOnlyMode return InhumeRes{}, ErrReadOnlyMode
} }
res := InhumeRes{
inhumedByCnrID: make(map[cid.ID]ObjectCounters),
}
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.Update(func(tx *bbolt.Tx) error { err := db.boltDB.Update(func(tx *bbolt.Tx) error {
return db.inhumeTx(tx, currEpoch, prm, &res) return db.inhumeTx(tx, currEpoch, prm, &res)
}) })
success = err == nil success = err == nil

View file

@ -118,6 +118,7 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
} }
s.decObjectCounterBy(physical, res.RawObjectsRemoved()) s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved()) s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
s.decContainerObjectCounter(res.RemovedByCnrID())
removedPayload := res.RemovedPhysicalObjectSizes()[0] removedPayload := res.RemovedPhysicalObjectSizes()[0]
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0] logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
if logicalRemovedPayload > 0 { if logicalRemovedPayload > 0 {

View file

@ -416,6 +416,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular) s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0 i := 0
for i < res.GetDeletionInfoLength() { for i < res.GetDeletionInfoLength() {
@ -629,6 +630,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone) s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0 i := 0
for i < res.GetDeletionInfoLength() { for i < res.GetDeletionInfoLength() {
@ -675,6 +677,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock) s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0 i := 0
for i < res.GetDeletionInfoLength() { for i < res.GetDeletionInfoLength() {

View file

@ -122,6 +122,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
s.m.RUnlock() s.m.RUnlock()
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0 i := 0
for i < res.GetDeletionInfoLength() { for i < res.GetDeletionInfoLength() {

View file

@ -14,6 +14,7 @@ import (
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -23,6 +24,7 @@ type metricsStore struct {
mtx sync.Mutex mtx sync.Mutex
objCounters map[string]uint64 objCounters map[string]uint64
cnrSize map[string]int64 cnrSize map[string]int64
cnrCount map[string]uint64
pldSize int64 pldSize int64
mode mode.Mode mode mode.Mode
errCounter int64 errCounter int64
@ -126,6 +128,39 @@ func (m *metricsStore) DeleteShardMetrics() {
m.errCounter = 0 m.errCounter = 0
} }
func (m *metricsStore) SetContainerObjectsCount(cnrID string, objectType string, value uint64) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.cnrCount[cnrID+objectType] = value
}
func (m *metricsStore) IncContainerObjectsCount(cnrID string, objectType string) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.cnrCount[cnrID+objectType]++
}
func (m *metricsStore) SubContainerObjectsCount(cnrID string, objectType string, value uint64) {
m.mtx.Lock()
defer m.mtx.Unlock()
existed := m.cnrCount[cnrID+objectType]
if existed < value {
panic("existed value smaller than value to sustract")
}
if existed == value {
delete(m.cnrCount, cnrID+objectType)
} else {
m.cnrCount[cnrID+objectType] -= value
}
}
func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool) {
m.mtx.Lock()
defer m.mtx.Unlock()
v, ok := m.cnrCount[cnrID+objectType]
return v, ok
}
func TestCounters(t *testing.T) { func TestCounters(t *testing.T) {
t.Parallel() t.Parallel()
@ -143,21 +178,37 @@ func TestCounters(t *testing.T) {
oo[i] = testutil.GenerateObject() oo[i] = testutil.GenerateObject()
} }
cc := meta.ContainerCounters{Logical: make(map[cid.ID]uint64), Physical: make(map[cid.ID]uint64)}
t.Run("defaults", func(t *testing.T) { t.Run("defaults", func(t *testing.T) {
require.Zero(t, mm.getObjectCounter(physical)) require.Zero(t, mm.getObjectCounter(physical))
require.Zero(t, mm.getObjectCounter(logical)) require.Zero(t, mm.getObjectCounter(logical))
require.Empty(t, mm.containerSizes()) require.Empty(t, mm.containerSizes())
require.Zero(t, mm.payloadSize()) require.Zero(t, mm.payloadSize())
for _, obj := range oo {
contID, _ := obj.ContainerID()
v, ok := mm.getContainerCount(contID.EncodeToString(), physical)
require.Zero(t, v)
require.False(t, ok)
v, ok = mm.getContainerCount(contID.EncodeToString(), logical)
require.Zero(t, v)
require.False(t, ok)
}
}) })
var totalPayload int64 var totalPayload int64
expectedLogicalSizes := make(map[string]int64) expectedLogicalSizes := make(map[string]int64)
expectedLogCC := make(map[cid.ID]uint64)
expectedPhyCC := make(map[cid.ID]uint64)
for i := range oo { for i := range oo {
cnr, _ := oo[i].ContainerID() cnr, _ := oo[i].ContainerID()
oSize := int64(oo[i].PayloadSize()) oSize := int64(oo[i].PayloadSize())
expectedLogicalSizes[cnr.EncodeToString()] += oSize expectedLogicalSizes[cnr.EncodeToString()] += oSize
totalPayload += oSize totalPayload += oSize
expectedLogCC[cnr]++
expectedPhyCC[cnr]++
} }
var prm PutPrm var prm PutPrm
@ -174,6 +225,11 @@ func TestCounters(t *testing.T) {
require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize()) require.Equal(t, totalPayload, mm.payloadSize())
cc, err := sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical)
require.Equal(t, expectedPhyCC, cc.Physical)
t.Run("inhume_GC", func(t *testing.T) { t.Run("inhume_GC", func(t *testing.T) {
var prm InhumePrm var prm InhumePrm
inhumedNumber := objNumber / 4 inhumedNumber := objNumber / 4
@ -187,6 +243,11 @@ func TestCounters(t *testing.T) {
cid, ok := oo[i].ContainerID() cid, ok := oo[i].ContainerID()
require.True(t, ok) require.True(t, ok)
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize()) expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
expectedLogCC[cid]--
if expectedLogCC[cid] == 0 {
delete(expectedLogCC, cid)
}
} }
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical)) require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
@ -194,6 +255,11 @@ func TestCounters(t *testing.T) {
require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize()) require.Equal(t, totalPayload, mm.payloadSize())
cc, err := sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical)
require.Equal(t, expectedPhyCC, cc.Physical)
oo = oo[inhumedNumber:] oo = oo[inhumedNumber:]
}) })
@ -214,6 +280,11 @@ func TestCounters(t *testing.T) {
cid, ok := oo[i].ContainerID() cid, ok := oo[i].ContainerID()
require.True(t, ok) require.True(t, ok)
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize()) expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
expectedLogCC[cid]--
if expectedLogCC[cid] == 0 {
delete(expectedLogCC, cid)
}
} }
require.Equal(t, phy, mm.getObjectCounter(physical)) require.Equal(t, phy, mm.getObjectCounter(physical))
@ -221,6 +292,11 @@ func TestCounters(t *testing.T) {
require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize()) require.Equal(t, totalPayload, mm.payloadSize())
cc, err = sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical)
require.Equal(t, expectedPhyCC, cc.Physical)
oo = oo[inhumedNumber:] oo = oo[inhumedNumber:]
}) })
@ -245,9 +321,24 @@ func TestCounters(t *testing.T) {
cnr, _ := oo[i].ContainerID() cnr, _ := oo[i].ContainerID()
expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload) expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload)
expectedLogCC[cnr]--
if expectedLogCC[cnr] == 0 {
delete(expectedLogCC, cnr)
}
expectedPhyCC[cnr]--
if expectedPhyCC[cnr] == 0 {
delete(expectedPhyCC, cnr)
}
} }
require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize()) require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize())
cc, err = sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical)
require.Equal(t, expectedPhyCC, cc.Physical)
}) })
} }
@ -269,6 +360,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
"logic": 0, "logic": 0,
}, },
cnrSize: make(map[string]int64), cnrSize: make(map[string]int64),
cnrCount: make(map[string]uint64),
} }
sh := New( sh := New(

View file

@ -90,7 +90,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err) return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
} }
s.incObjectCounter() s.incObjectCounter(putPrm.Address.Container())
s.addToPayloadSize(int64(prm.obj.PayloadSize())) s.addToPayloadSize(int64(prm.obj.PayloadSize()))
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize())) s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
} }

View file

@ -18,6 +18,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -85,6 +86,12 @@ type MetricsWriter interface {
ClearErrorCounter() ClearErrorCounter()
// DeleteShardMetrics deletes shard metrics from registry. // DeleteShardMetrics deletes shard metrics from registry.
DeleteShardMetrics() DeleteShardMetrics()
// SetContainerObjectsCount sets container object count.
SetContainerObjectsCount(cnrID string, objectType string, value uint64)
// IncContainerObjectsCount increments container object count.
IncContainerObjectsCount(cnrID string, objectType string)
// SubContainerObjectsCount subtracts container object count.
SubContainerObjectsCount(cnrID string, objectType string, value uint64)
} }
type cfg struct { type cfg struct {
@ -425,15 +432,29 @@ func (s *Shard) updateMetrics(ctx context.Context) {
} }
s.metricsWriter.AddToPayloadSize(int64(totalPayload)) s.metricsWriter.AddToPayloadSize(int64(totalPayload))
contCount, err := s.metaBase.ContainerCounters(ctx)
if err != nil {
s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err))
return
}
for contID, count := range contCount.Physical {
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count)
}
for contID, count := range contCount.Logical {
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count)
}
} }
} }
// incObjectCounter increment both physical and logical object // incObjectCounter increment both physical and logical object
// counters. // counters.
func (s *Shard) incObjectCounter() { func (s *Shard) incObjectCounter(cnrID cid.ID) {
if s.cfg.metricsWriter != nil { if s.cfg.metricsWriter != nil {
s.cfg.metricsWriter.IncObjectCounter(physical) s.cfg.metricsWriter.IncObjectCounter(physical)
s.cfg.metricsWriter.IncObjectCounter(logical) s.cfg.metricsWriter.IncObjectCounter(logical)
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical)
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical)
} }
} }
@ -443,6 +464,17 @@ func (s *Shard) decObjectCounterBy(typ string, v uint64) {
} }
} }
func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) {
if s.cfg.metricsWriter == nil {
return
}
for cnrID, count := range byCnr {
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy())
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic())
}
}
func (s *Shard) addToContainerSize(cnr string, size int64) { func (s *Shard) addToContainerSize(cnr string, size int64) {
if s.cfg.metricsWriter != nil { if s.cfg.metricsWriter != nil {
s.cfg.metricsWriter.AddToContainerSize(cnr, size) s.cfg.metricsWriter.AddToContainerSize(cnr, size)

View file

@ -18,6 +18,9 @@ type EngineMetrics interface {
SetObjectCounter(shardID, objectType string, v uint64) SetObjectCounter(shardID, objectType string, v uint64)
AddToPayloadCounter(shardID string, size int64) AddToPayloadCounter(shardID string, size int64)
SetMode(shardID string, mode mode.Mode) SetMode(shardID string, mode mode.Mode)
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncContainerObjectCounter(shardID, contID, objectType string)
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
WriteCache() WriteCacheMetrics WriteCache() WriteCacheMetrics
GC() GCMetrics GC() GCMetrics
@ -30,6 +33,7 @@ type engineMetrics struct {
payloadSize *prometheus.GaugeVec payloadSize *prometheus.GaugeVec
errorCounter *prometheus.GaugeVec errorCounter *prometheus.GaugeVec
mode *shardIDModeValue mode *shardIDModeValue
contObjCounter *prometheus.GaugeVec
gc *gcMetrics gc *gcMetrics
writeCache *writeCacheMetrics writeCache *writeCacheMetrics
@ -46,10 +50,13 @@ func newEngineMetrics() *engineMetrics {
Name: "request_duration_seconds", Name: "request_duration_seconds",
Help: "Duration of Engine requests", Help: "Duration of Engine requests",
}, []string{methodLabel}), }, []string{methodLabel}),
objectCounter: newEngineGaugeVector("objects_total", "Objects counters per shards", []string{shardIDLabel, typeLabel}), objectCounter: newEngineGaugeVector("objects_total",
"Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.",
[]string{shardIDLabel, typeLabel}),
gc: newGCMetrics(), gc: newGCMetrics(),
writeCache: newWriteCacheMetrics(), writeCache: newWriteCacheMetrics(),
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"), mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
} }
} }
@ -88,6 +95,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
m.errorCounter.Delete(prometheus.Labels{shardIDLabel: shardID}) m.errorCounter.Delete(prometheus.Labels{shardIDLabel: shardID})
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID}) m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.mode.Delete(shardID) m.mode.Delete(shardID)
} }
@ -109,6 +117,36 @@ func (m *engineMetrics) SetObjectCounter(shardID, objectType string, v uint64) {
).Set(float64(v)) ).Set(float64(v))
} }
func (m *engineMetrics) SetContainerObjectCounter(shardID, contID, objectType string, v uint64) {
m.contObjCounter.With(
prometheus.Labels{
shardIDLabel: shardID,
containerIDLabelKey: contID,
typeLabel: objectType,
},
).Set(float64(v))
}
func (m *engineMetrics) IncContainerObjectCounter(shardID, contID, objectType string) {
m.contObjCounter.With(
prometheus.Labels{
shardIDLabel: shardID,
containerIDLabelKey: contID,
typeLabel: objectType,
},
).Inc()
}
func (m *engineMetrics) SubContainerObjectCounter(shardID, contID, objectType string, v uint64) {
m.contObjCounter.With(
prometheus.Labels{
shardIDLabel: shardID,
containerIDLabelKey: contID,
typeLabel: objectType,
},
).Sub(float64(v))
}
func (m *engineMetrics) SetMode(shardID string, mode mode.Mode) { func (m *engineMetrics) SetMode(shardID string, mode mode.Mode) {
m.mode.SetMode(shardID, mode.String()) m.mode.SetMode(shardID, mode.String())
} }