Add container objects counter #778

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/container_objects_total_metric into master 2024-09-04 19:51:04 +00:00
17 changed files with 765 additions and 83 deletions

View file

@ -516,4 +516,5 @@ const (
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
FailedToCountWritecacheItems = "failed to count writecache items"
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
FailedToGetContainerCounters = "failed to get container counters values"
)

View file

@ -21,6 +21,10 @@ type MetricRegister interface {
ClearErrorCounter(shardID string)
DeleteShardMetrics(shardID string)
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncContainerObjectCounter(shardID, contID, objectType string)
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
WriteCache() metrics.WriteCacheMetrics
GC() metrics.GCMetrics
}

View file

@ -74,6 +74,18 @@ func (m *metricsWithID) DeleteShardMetrics() {
m.mw.DeleteShardMetrics(m.id)
}
func (m *metricsWithID) SetContainerObjectsCount(cnrID string, objectType string, value uint64) {
m.mw.SetContainerObjectCounter(m.id, cnrID, objectType, value)
}
func (m *metricsWithID) IncContainerObjectsCount(cnrID string, objectType string) {
m.mw.IncContainerObjectCounter(m.id, cnrID, objectType)
}
func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string, value uint64) {
m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
}
// AddShard adds a new shard to the storage engine.
//
// Returns any error encountered that did not allow adding a shard.

View file

@ -104,6 +104,7 @@ func (db *DB) init(reset bool) error {
mStaticBuckets := map[string]struct{}{
string(containerVolumeBucketName): {},
string(containerCounterBucketName): {},
string(graveyardBucketName): {},
string(toMoveItBucketName): {},
string(garbageBucketName): {},
@ -135,7 +136,7 @@ func (db *DB) init(reset bool) error {
}
}
if !reset {
if !reset { // counters will be recalculated by refill metabase
err = syncCounter(tx, false)
if err != nil {
return fmt.Errorf("could not sync object counter: %w", err)

View file

@ -1,10 +1,15 @@
package meta
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
@ -73,9 +78,128 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
return cc, metaerr.Wrap(err)
}
// updateCounter updates the object counter. Tx MUST be writable.
// If inc == `true`, increases the counter, decreases otherwise.
func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
type ContainerCounters struct {
Logical map[cid.ID]uint64
Physical map[cid.ID]uint64
}
// ContainerCounters returns object counters for each container
// that metabase has tracked since it was opened and initialized.
//
// Returns only the errors that do not allow reading counter
// in Bolt database.
//
// It is guaranteed that the ContainerCounters fields are not nil.
func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("ContainerCounters", time.Since(startedAt), success)
}()
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCounters")
defer span.End()
cc := ContainerCounters{
Logical: make(map[cid.ID]uint64),
Physical: make(map[cid.ID]uint64),
}
lastKey := make([]byte, cidSize)
// there is no limit for containers count, so use batching with cancellation
for {
select {
case <-ctx.Done():
return cc, ctx.Err()
default:
}
completed, err := db.containerCountersNextBatch(lastKey, &cc)
if err != nil {
return cc, err
}
if completed {
break
}
}
success = true
return cc, nil
}
func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return false, ErrDegradedMode
}
counter := 0
const batchSize = 1000
err := db.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(containerCounterBucketName)
if b == nil {
return ErrInterruptIterator
}
c := b.Cursor()
var key, value []byte
for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() {
if bytes.Equal(lastKey, key) {
continue
}
copy(lastKey, key)
cnrID, err := parseContainerCounterKey(key)
if err != nil {
return err
}
phy, logic, err := parseContainerCounterValue(value)
if err != nil {
return err
}
if phy > 0 {
cc.Physical[cnrID] = phy
}
if logic > 0 {
cc.Logical[cnrID] = logic
}
counter++
if counter == batchSize {
break
}
}
if counter < batchSize { // last batch
return ErrInterruptIterator
}
return nil
})
if err != nil {
if errors.Is(err, ErrInterruptIterator) {
return true, nil
}
return false, metaerr.Wrap(err)
}
return false, nil
}
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error {
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
return fmt.Errorf("could not increase phy object counter: %w", err)
}
if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil {
return fmt.Errorf("could not increase logical object counter: %w", err)
}
return db.incContainerObjectCounter(tx, cnrID)
}
func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
b := tx.Bucket(shardInfoBucket)
if b == nil {
return nil
@ -112,6 +236,63 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool
return b.Put(counterKey, newCounter)
}
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
b := tx.Bucket(containerCounterBucketName)
if b == nil {
return nil
}
key := make([]byte, cidSize)
for cnrID, cnrDelta := range delta {
cnrID.Encode(key)
if err := db.editContainerCounterValue(b, key, cnrDelta, inc); err != nil {
return err
}
}
return nil
}
fyrchik marked this conversation as resolved Outdated

It is always the same size, we can pre-allocate here.

It is always the same size, we can pre-allocate here.

done

done
func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error {
var phyValue, logicValue uint64
var err error
data := b.Get(key)
if len(data) > 0 {
phyValue, logicValue, err = parseContainerCounterValue(data)
if err != nil {
return err
}
}
phyValue = nextValue(phyValue, delta.phy, inc)
logicValue = nextValue(logicValue, delta.logic, inc)
if phyValue > 0 || logicValue > 0 {
value := containerCounterValue(phyValue, logicValue)
return b.Put(key, value)
}
return b.Delete(key)
}
func nextValue(existed, delta uint64, inc bool) uint64 {
if inc {
existed += delta
} else if existed <= delta {
existed = 0
} else {
existed -= delta
}
return existed
}
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error {
b := tx.Bucket(containerCounterBucketName)
if b == nil {
return nil
}
key := make([]byte, cidSize)
cnrID.Encode(key)
return db.editContainerCounterValue(b, key, ObjectCounters{logic: 1, phy: 1}, true)
}
// syncCounter updates object counters according to metabase state:
// it counts all the physically/logically stored objects using internal
// indexes. Tx MUST be writable.
@ -119,26 +300,38 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool
// Does nothing if counters are not empty and force is false. If force is
// true, updates the counters anyway.
func syncCounter(tx *bbolt.Tx, force bool) error {
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
shardInfoB, err := tx.CreateBucketIfNotExists(shardInfoBucket)
if err != nil {
return fmt.Errorf("could not get shard info bucket: %w", err)
}
if !force && len(b.Get(objectPhyCounterKey)) == 8 && len(b.Get(objectLogicCounterKey)) == 8 {
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8
containerCounterInitialized := tx.Bucket(containerCounterBucketName) != nil
if !force && shardObjectCounterInitialized && containerCounterInitialized {
// the counters are already inited
return nil
}
containerCounterB, err := tx.CreateBucketIfNotExists(containerCounterBucketName)
if err != nil {
return fmt.Errorf("could not get container counter bucket: %w", err)
}
var addr oid.Address
var phyCounter uint64
var logicCounter uint64
counters := make(map[cid.ID]ObjectCounters)
graveyardBKT := tx.Bucket(graveyardBucketName)
garbageBKT := tx.Bucket(garbageBucketName)
key := make([]byte, addressKeySize)
err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error {
phyCounter++
if v, ok := counters[cnr]; ok {
v.phy++
counters[cnr] = v
} else {
counters[cnr] = ObjectCounters{
phy: 1,
}
}
addr.SetContainer(cnr)
addr.SetObject(obj)
@ -146,7 +339,14 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
// check if an object is available: not with GCMark
// and not covered with a tombstone
if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
logicCounter++
if v, ok := counters[cnr]; ok {
v.logic++
counters[cnr] = v
} else {
counters[cnr] = ObjectCounters{
logic: 1,
}
}
}
return nil
@ -155,21 +355,65 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
return fmt.Errorf("could not iterate objects: %w", err)
}
data := make([]byte, 8)
binary.LittleEndian.PutUint64(data, phyCounter)
return setObjectCounters(counters, shardInfoB, containerCounterB)
}
err = b.Put(objectPhyCounterKey, data)
func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error {
var phyTotal uint64
var logicTotal uint64
key := make([]byte, cidSize)
for cnrID, count := range counters {
phyTotal += count.phy
logicTotal += count.logic
cnrID.Encode(key)
value := containerCounterValue(count.phy, count.logic)
err := containerCounterB.Put(key, value)
if err != nil {
return fmt.Errorf("could not update phy container object counter: %w", err)
}
}
phyData := make([]byte, 8)
binary.LittleEndian.PutUint64(phyData, phyTotal)
fyrchik marked this conversation as resolved Outdated

This will break if we later have subbuckets here, but I doubt we will.
Anyway, wlll containerCounterInitialized := containerCounterB.Stats().KeyN != 0 suffice?

This will break if we later have subbuckets here, but I doubt we will. Anyway, wlll `containerCounterInitialized := containerCounterB.Stats().KeyN != 0` suffice?

Also, can bucket != nil check serve as a sign that we have initialized everything?

Also, can `bucket != nil` check serve as a sign that we have initialized everything?

Stats() will read all bucket pages, but we need only first.

`Stats()` will read all bucket pages, but we need only first.

Also, can bucket != nil check serve as a sign that we have initialized everything?

Yes, fixed

> Also, can bucket != nil check serve as a sign that we have initialized everything? Yes, fixed
err := shardInfoB.Put(objectPhyCounterKey, phyData)
if err != nil {
return fmt.Errorf("could not update phy object counter: %w", err)
}
data = make([]byte, 8)
binary.LittleEndian.PutUint64(data, logicCounter)
logData := make([]byte, 8)
binary.LittleEndian.PutUint64(logData, logicTotal)
err = b.Put(objectLogicCounterKey, data)
err = shardInfoB.Put(objectLogicCounterKey, logData)
if err != nil {
return fmt.Errorf("could not update logic object counter: %w", err)
}
return nil
}
func containerCounterValue(phy, logic uint64) []byte {
res := make([]byte, 16)
binary.LittleEndian.PutUint64(res, phy)
binary.LittleEndian.PutUint64(res[8:], logic)
return res
}
func parseContainerCounterKey(buf []byte) (cid.ID, error) {
if len(buf) != cidSize {
return cid.ID{}, fmt.Errorf("invalid key length")
}
var cnrID cid.ID
if err := cnrID.Decode(buf); err != nil {
return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err)
}
return cnrID, nil
}
// parseContainerCounterValue return phy, logic values.
func parseContainerCounterValue(buf []byte) (uint64, uint64, error) {
if len(buf) != 16 {
return 0, 0, fmt.Errorf("invalid value length")
}
return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil
}

View file

@ -7,6 +7,7 @@ import (
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
@ -25,6 +26,11 @@ func TestCounters(t *testing.T) {
require.NoError(t, err)
require.Zero(t, c.Phy())
require.Zero(t, c.Logic())
cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Zero(t, len(cc.Physical))
require.Zero(t, len(cc.Logical))
})
t.Run("put", func(t *testing.T) {
@ -36,9 +42,14 @@ func TestCounters(t *testing.T) {
}
var prm meta.PutPrm
expPhy := make(map[cid.ID]uint64)
expLog := make(map[cid.ID]uint64)
for i := 0; i < objCount; i++ {
prm.SetObject(oo[i])
cnrID, _ := oo[i].ContainerID()
expPhy[cnrID]++
expLog[cnrID]++
_, err := db.Put(context.Background(), prm)
require.NoError(t, err)
@ -48,6 +59,12 @@ func TestCounters(t *testing.T) {
require.Equal(t, uint64(i+1), c.Phy())
require.Equal(t, uint64(i+1), c.Logic())
cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical)
require.Equal(t, expLog, cc.Logical)
}
})
@ -56,6 +73,14 @@ func TestCounters(t *testing.T) {
db := newDB(t)
oo := putObjs(t, db, objCount, false)
expPhy := make(map[cid.ID]uint64)
expLog := make(map[cid.ID]uint64)
for _, obj := range oo {
cnrID, _ := obj.ContainerID()
expPhy[cnrID]++
expLog[cnrID]++
}
var prm meta.DeletePrm
for i := objCount - 1; i >= 0; i-- {
prm.SetAddresses(objectcore.AddressOf(oo[i]))
@ -69,6 +94,27 @@ func TestCounters(t *testing.T) {
require.Equal(t, uint64(i), c.Phy())
require.Equal(t, uint64(i), c.Logic())
cnrID, _ := oo[i].ContainerID()
if v, ok := expPhy[cnrID]; ok {
if v == 1 {
delete(expPhy, cnrID)
} else {
expPhy[cnrID]--
}
}
if v, ok := expLog[cnrID]; ok {
if v == 1 {
delete(expLog, cnrID)
} else {
expLog[cnrID]--
}
}
cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical)
require.Equal(t, expLog, cc.Logical)
}
})
@ -77,6 +123,14 @@ func TestCounters(t *testing.T) {
db := newDB(t)
oo := putObjs(t, db, objCount, false)
expPhy := make(map[cid.ID]uint64)
expLog := make(map[cid.ID]uint64)
for _, obj := range oo {
cnrID, _ := obj.ContainerID()
expPhy[cnrID]++
expLog[cnrID]++
}
inhumedObjs := make([]oid.Address, objCount/2)
for i, o := range oo {
@ -87,6 +141,16 @@ func TestCounters(t *testing.T) {
inhumedObjs[i] = objectcore.AddressOf(o)
}
for _, addr := range inhumedObjs {
if v, ok := expLog[addr.Container()]; ok {
if v == 1 {
delete(expLog, addr.Container())
} else {
expLog[addr.Container()]--
}
}
}
var prm meta.InhumePrm
prm.SetTombstoneAddress(oidtest.Address())
prm.SetAddresses(inhumedObjs...)
@ -100,6 +164,12 @@ func TestCounters(t *testing.T) {
require.Equal(t, uint64(objCount), c.Phy())
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical)
require.Equal(t, expLog, cc.Logical)
})
t.Run("put_split", func(t *testing.T) {
@ -107,6 +177,9 @@ func TestCounters(t *testing.T) {
db := newDB(t)
parObj := testutil.GenerateObject()
expPhy := make(map[cid.ID]uint64)
expLog := make(map[cid.ID]uint64)
// put objects and check that parent info
// does not affect the counter
for i := 0; i < objCount; i++ {
@ -115,12 +188,21 @@ func TestCounters(t *testing.T) {
o.SetParent(parObj)
}
cnrID, _ := o.ContainerID()
expLog[cnrID]++
expPhy[cnrID]++
require.NoError(t, putBig(db, o))
c, err := db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(i+1), c.Phy())
require.Equal(t, uint64(i+1), c.Logic())
cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical)
require.Equal(t, expLog, cc.Logical)
}
})
@ -129,16 +211,40 @@ func TestCounters(t *testing.T) {
db := newDB(t)
oo := putObjs(t, db, objCount, true)
expPhy := make(map[cid.ID]uint64)
expLog := make(map[cid.ID]uint64)
for _, obj := range oo {
cnrID, _ := obj.ContainerID()
expPhy[cnrID]++
expLog[cnrID]++
}
// delete objects that have parent info
// and check that it does not affect
// the counter
for i, o := range oo {
require.NoError(t, metaDelete(db, objectcore.AddressOf(o)))
addr := objectcore.AddressOf(o)
require.NoError(t, metaDelete(db, addr))
c, err := db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(objCount-i-1), c.Phy())
require.Equal(t, uint64(objCount-i-1), c.Logic())
if v, ok := expPhy[addr.Container()]; ok {
if v == 1 {
delete(expPhy, addr.Container())
} else {
expPhy[addr.Container()]--
}
}
if v, ok := expLog[addr.Container()]; ok {
if v == 1 {
delete(expLog, addr.Container())
} else {
expLog[addr.Container()]--
}
}
}
})
@ -147,6 +253,14 @@ func TestCounters(t *testing.T) {
db := newDB(t)
oo := putObjs(t, db, objCount, true)
expPhy := make(map[cid.ID]uint64)
expLog := make(map[cid.ID]uint64)
for _, obj := range oo {
cnrID, _ := obj.ContainerID()
expPhy[cnrID]++
expLog[cnrID]++
}
inhumedObjs := make([]oid.Address, objCount/2)
for i, o := range oo {
@ -157,6 +271,16 @@ func TestCounters(t *testing.T) {
inhumedObjs[i] = objectcore.AddressOf(o)
}
for _, addr := range inhumedObjs {
if v, ok := expLog[addr.Container()]; ok {
if v == 1 {
delete(expLog, addr.Container())
} else {
expLog[addr.Container()]--
}
}
}
var prm meta.InhumePrm
prm.SetTombstoneAddress(oidtest.Address())
prm.SetAddresses(inhumedObjs...)
@ -169,6 +293,12 @@ func TestCounters(t *testing.T) {
require.Equal(t, uint64(objCount), c.Phy())
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical)
require.Equal(t, expLog, cc.Logical)
})
}
@ -190,6 +320,13 @@ func TestCounters_Expired(t *testing.T) {
oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
}
expPhy := make(map[cid.ID]uint64)
expLog := make(map[cid.ID]uint64)
for _, addr := range oo {
expPhy[addr.Container()]++
expLog[addr.Container()]++
}
// 1. objects are available and counters are correct
c, err := db.ObjectCounters()
@ -197,6 +334,12 @@ func TestCounters_Expired(t *testing.T) {
require.Equal(t, uint64(objCount), c.Phy())
require.Equal(t, uint64(objCount), c.Logic())
cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical)
require.Equal(t, expLog, cc.Logical)
for _, o := range oo {
_, err := metaGet(db, o, true)
require.NoError(t, err)
@ -212,6 +355,12 @@ func TestCounters_Expired(t *testing.T) {
require.Equal(t, uint64(objCount), c.Phy())
require.Equal(t, uint64(objCount), c.Logic())
cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical)
require.Equal(t, expLog, cc.Logical)
for _, o := range oo {
_, err := metaGet(db, o, true)
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
@ -235,6 +384,20 @@ func TestCounters_Expired(t *testing.T) {
require.Equal(t, uint64(len(oo)), c.Phy())
require.Equal(t, uint64(len(oo)-1), c.Logic())
if v, ok := expLog[oo[0].Container()]; ok {
if v == 1 {
delete(expLog, oo[0].Container())
} else {
expLog[oo[0].Container()]--
}
}
cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical)
require.Equal(t, expLog, cc.Logical)
// 4. `Delete` an object with GCMark should decrease the
// phy counter but does not affect the logic counter (after
// that step they should be equal)
@ -246,6 +409,14 @@ func TestCounters_Expired(t *testing.T) {
require.NoError(t, err)
require.Zero(t, deleteRes.AvailableObjectsRemoved())
if v, ok := expPhy[oo[0].Container()]; ok {
if v == 1 {
delete(expPhy, oo[0].Container())
} else {
expPhy[oo[0].Container()]--
}
}
oo = oo[1:]
c, err = db.ObjectCounters()
@ -253,6 +424,12 @@ func TestCounters_Expired(t *testing.T) {
require.Equal(t, uint64(len(oo)), c.Phy())
require.Equal(t, uint64(len(oo)), c.Logic())
cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical)
require.Equal(t, expLog, cc.Logical)
// 5 `Delete` an expired object (like it would the control
// service do) should decrease both counters despite the
// expiration fact
@ -263,12 +440,34 @@ func TestCounters_Expired(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
if v, ok := expLog[oo[0].Container()]; ok {
if v == 1 {
delete(expLog, oo[0].Container())
} else {
expLog[oo[0].Container()]--
}
}
if v, ok := expPhy[oo[0].Container()]; ok {
if v == 1 {
delete(expPhy, oo[0].Container())
} else {
expPhy[oo[0].Container()]--
}
}
oo = oo[1:]
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(len(oo)), c.Phy())
require.Equal(t, uint64(len(oo)), c.Logic())
cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical)
require.Equal(t, expLog, cc.Logical)
}
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {

View file

@ -12,6 +12,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
@ -30,6 +31,7 @@ type DeleteRes struct {
availableRemoved uint64
sizes []uint64
availableSizes []uint64
removedByCnrID map[cid.ID]ObjectCounters
}
// AvailableObjectsRemoved returns the number of removed available
@ -38,6 +40,11 @@ func (d DeleteRes) AvailableObjectsRemoved() uint64 {
return d.availableRemoved
}
// RemovedByCnrID returns the number of removed objects by container ID.
func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
return d.removedByCnrID
}
// RawObjectsRemoved returns the number of removed raw objects.
func (d DeleteRes) RawObjectsRemoved() uint64 {
return d.rawRemoved
@ -95,15 +102,11 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
return DeleteRes{}, ErrReadOnlyMode
}
var rawRemoved uint64
var availableRemoved uint64
var err error
sizes := make([]uint64, len(prm.addrs))
availableSizes := make([]uint64, len(prm.addrs))
var res DeleteRes
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
// We need to clear slice because tx can try to execute multiple times.
rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs, sizes, availableSizes)
res, err = db.deleteGroup(tx, prm.addrs)
return err
})
if err == nil {
@ -114,68 +117,83 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
storagelog.OpField("metabase DELETE"))
}
}
return DeleteRes{
rawRemoved: rawRemoved,
availableRemoved: availableRemoved,
sizes: sizes,
availableSizes: availableSizes,
}, metaerr.Wrap(err)
return res, metaerr.Wrap(err)
}
// deleteGroup deletes object from the metabase. Handles removal of the
// references of the split objects.
// The first return value is a physical objects removed number: physical
// objects that were stored. The second return value is a logical objects
// removed number: objects that were available (without Tombstones, GCMarks
// non-expired, etc.)
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, availableSizes []uint64) (uint64, uint64, error) {
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
res := DeleteRes{
sizes: make([]uint64, len(addrs)),
availableSizes: make([]uint64, len(addrs)),
removedByCnrID: make(map[cid.ID]ObjectCounters),
}
refCounter := make(referenceCounter, len(addrs))
currEpoch := db.epochState.CurrentEpoch()
var rawDeleted uint64
var availableDeleted uint64
for i := range addrs {
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
if err != nil {
return 0, 0, err // maybe log and continue?
return DeleteRes{}, err // maybe log and continue?
}
if removed {
rawDeleted++
sizes[i] = size
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.phy++
res.removedByCnrID[addrs[i].Container()] = v
} else {
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
phy: 1,
}
}
res.rawRemoved++
res.sizes[i] = size
}
if available {
availableDeleted++
availableSizes[i] = size
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.logic++
res.removedByCnrID[addrs[i].Container()] = v
} else {
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
logic: 1,
}
}
if rawDeleted > 0 {
err := db.updateCounter(tx, phy, rawDeleted, false)
if err != nil {
return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err)
res.availableRemoved++
res.availableSizes[i] = size
}
}
if availableDeleted > 0 {
err := db.updateCounter(tx, logical, availableDeleted, false)
if res.rawRemoved > 0 {
err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
if err != nil {
return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err)
return DeleteRes{}, fmt.Errorf("could not decrease phy object counter: %w", err)
}
}
if res.availableRemoved > 0 {
err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
if err != nil {
return DeleteRes{}, fmt.Errorf("could not decrease logical object counter: %w", err)
}
}
if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err)
}
for _, refNum := range refCounter {
if refNum.cur == refNum.all {
err := db.deleteObject(tx, refNum.obj, true)
if err != nil {
return rawDeleted, availableDeleted, err // maybe log and continue?
return DeleteRes{}, err // maybe log and continue?
}
}
}
return rawDeleted, availableDeleted, nil
return res, nil
}
// delete removes object indexes from the metabase. Counts the references

View file

@ -37,14 +37,21 @@ type DeletionInfo struct {
// InhumeRes encapsulates results of Inhume operation.
type InhumeRes struct {
deletedLockObj []oid.Address
availableImhumed uint64
availableInhumed uint64
inhumedByCnrID map[cid.ID]ObjectCounters
deletionDetails []DeletionInfo
}
// AvailableInhumed return number of available object
// that have been inhumed.
func (i InhumeRes) AvailableInhumed() uint64 {
return i.availableImhumed
return i.availableInhumed
}
// InhumedByCnrID return number of object
// that have been inhumed by container ID.
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
return i.inhumedByCnrID
}
// DeletedLockObjects returns deleted object of LOCK
@ -73,6 +80,15 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) {
Size: deletedSize,
CID: containerID,
})
i.availableInhumed++
if v, ok := i.inhumedByCnrID[containerID]; ok {
v.logic++
i.inhumedByCnrID[containerID] = v
} else {
i.inhumedByCnrID[containerID] = ObjectCounters{
logic: 1,
}
}
}
// SetAddresses sets a list of object addresses that should be inhumed.
@ -122,7 +138,7 @@ var ErrLockObjectRemoval = logicerr.New("lock object removal")
//
// NOTE: Marks any object with GC mark (despite any prohibitions on operations
// with that object) if WithForceGCMark option has been provided.
func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) {
func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
var (
startedAt = time.Now()
success = false
@ -142,8 +158,11 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err err
return InhumeRes{}, ErrReadOnlyMode
}
res := InhumeRes{
inhumedByCnrID: make(map[cid.ID]ObjectCounters),
}
currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
return db.inhumeTx(tx, currEpoch, prm, &res)
})
success = err == nil
@ -224,7 +243,27 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
}
}
return db.updateCounter(tx, logical, res.availableImhumed, false)
return db.applyInhumeResToCounters(tx, res)
}
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
var inhumedCount uint64
inhumedbyCnr := make(map[cid.ID]ObjectCounters)
for _, dd := range res.deletionDetails {
if v, ok := inhumedbyCnr[dd.CID]; ok {
v.logic++
inhumedbyCnr[dd.CID] = v
} else {
inhumedbyCnr[dd.CID] = ObjectCounters{logic: 1}
}
inhumedCount++
}
if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil {
return err
}
return db.updateContainerCounter(tx, inhumedbyCnr, false)
}
// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
@ -279,7 +318,6 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
containerID, _ := obj.ContainerID()
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
res.availableImhumed++
res.storeDeletionInfo(containerID, obj.PayloadSize())
}

View file

@ -183,16 +183,8 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
}
if !isParent {
err = db.updateCounter(tx, phy, 1, true)
if err != nil {
return fmt.Errorf("could not increase phy object counter: %w", err)
}
// it is expected that putting an unavailable object is
// impossible and should be handled on the higher levels
err = db.updateCounter(tx, logical, 1, true)
if err != nil {
return fmt.Errorf("could not increase logical object counter: %w", err)
if err = db.incCounters(tx, cnr); err != nil {
fyrchik marked this conversation as resolved Outdated

Can we avoid creating a map here?

Can we avoid creating a map here?

done

done
return err
}
}

View file

@ -22,6 +22,7 @@ var (
garbageBucketName = []byte{garbagePrefix}
toMoveItBucketName = []byte{toMoveItPrefix}
containerVolumeBucketName = []byte{containerVolumePrefix}
containerCounterBucketName = []byte{containerCountersPrefix}
zeroValue = []byte{0xFF}
)
@ -111,6 +112,11 @@ const (
// Key: split ID
// Value: list of object IDs
splitPrefix
// containerCountersPrefix is used for storing container object counters.
// Key: container ID + type
// Value: container size in bytes as little-endian uint64
containerCountersPrefix
)
const (

View file

@ -118,6 +118,7 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
}
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
s.decContainerObjectCounter(res.RemovedByCnrID())
removedPayload := res.RemovedPhysicalObjectSizes()[0]
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
if logicalRemovedPayload > 0 {

View file

@ -416,6 +416,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0
for i < res.GetDeletionInfoLength() {
@ -629,6 +630,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0
for i < res.GetDeletionInfoLength() {
@ -675,6 +677,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0
for i < res.GetDeletionInfoLength() {

View file

@ -122,6 +122,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
s.m.RUnlock()
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0
for i < res.GetDeletionInfoLength() {

View file

@ -14,6 +14,7 @@ import (
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
@ -23,6 +24,7 @@ type metricsStore struct {
mtx sync.Mutex
objCounters map[string]uint64
cnrSize map[string]int64
cnrCount map[string]uint64
pldSize int64
mode mode.Mode
errCounter int64
@ -126,6 +128,39 @@ func (m *metricsStore) DeleteShardMetrics() {
m.errCounter = 0
}
func (m *metricsStore) SetContainerObjectsCount(cnrID string, objectType string, value uint64) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.cnrCount[cnrID+objectType] = value
}
func (m *metricsStore) IncContainerObjectsCount(cnrID string, objectType string) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.cnrCount[cnrID+objectType]++
}
func (m *metricsStore) SubContainerObjectsCount(cnrID string, objectType string, value uint64) {
m.mtx.Lock()
defer m.mtx.Unlock()
existed := m.cnrCount[cnrID+objectType]
if existed < value {
panic("existed value smaller than value to sustract")
}
if existed == value {
delete(m.cnrCount, cnrID+objectType)
} else {
m.cnrCount[cnrID+objectType] -= value
}
}
func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool) {
m.mtx.Lock()
defer m.mtx.Unlock()
v, ok := m.cnrCount[cnrID+objectType]
return v, ok
}
func TestCounters(t *testing.T) {
t.Parallel()
@ -143,21 +178,37 @@ func TestCounters(t *testing.T) {
oo[i] = testutil.GenerateObject()
}
cc := meta.ContainerCounters{Logical: make(map[cid.ID]uint64), Physical: make(map[cid.ID]uint64)}
t.Run("defaults", func(t *testing.T) {
require.Zero(t, mm.getObjectCounter(physical))
require.Zero(t, mm.getObjectCounter(logical))
require.Empty(t, mm.containerSizes())
require.Zero(t, mm.payloadSize())
for _, obj := range oo {
contID, _ := obj.ContainerID()
v, ok := mm.getContainerCount(contID.EncodeToString(), physical)
require.Zero(t, v)
require.False(t, ok)
v, ok = mm.getContainerCount(contID.EncodeToString(), logical)
require.Zero(t, v)
require.False(t, ok)
}
})
var totalPayload int64
expectedLogicalSizes := make(map[string]int64)
expectedLogCC := make(map[cid.ID]uint64)
expectedPhyCC := make(map[cid.ID]uint64)
for i := range oo {
cnr, _ := oo[i].ContainerID()
oSize := int64(oo[i].PayloadSize())
expectedLogicalSizes[cnr.EncodeToString()] += oSize
totalPayload += oSize
expectedLogCC[cnr]++
expectedPhyCC[cnr]++
}
var prm PutPrm
@ -174,6 +225,11 @@ func TestCounters(t *testing.T) {
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize())
cc, err := sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical)
require.Equal(t, expectedPhyCC, cc.Physical)
t.Run("inhume_GC", func(t *testing.T) {
var prm InhumePrm
inhumedNumber := objNumber / 4
@ -187,6 +243,11 @@ func TestCounters(t *testing.T) {
cid, ok := oo[i].ContainerID()
require.True(t, ok)
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
expectedLogCC[cid]--
if expectedLogCC[cid] == 0 {
delete(expectedLogCC, cid)
}
}
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
@ -194,6 +255,11 @@ func TestCounters(t *testing.T) {
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize())
fyrchik marked this conversation as resolved Outdated

How about mm.checkContainerCount(t, cnr, logical, v) or require.Equal(t, expV, mm.getContainerCount(t, ...)?
To make the function smaller

How about `mm.checkContainerCount(t, cnr, logical, v)` or `require.Equal(t, expV, mm.getContainerCount(t, ...)`? To make the function smaller

Done, thx

Done, thx
cc, err := sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical)
require.Equal(t, expectedPhyCC, cc.Physical)
oo = oo[inhumedNumber:]
})
@ -214,6 +280,11 @@ func TestCounters(t *testing.T) {
cid, ok := oo[i].ContainerID()
require.True(t, ok)
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
expectedLogCC[cid]--
if expectedLogCC[cid] == 0 {
delete(expectedLogCC, cid)
}
}
require.Equal(t, phy, mm.getObjectCounter(physical))
@ -221,6 +292,11 @@ func TestCounters(t *testing.T) {
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize())
cc, err = sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical)
require.Equal(t, expectedPhyCC, cc.Physical)
oo = oo[inhumedNumber:]
})
@ -245,9 +321,24 @@ func TestCounters(t *testing.T) {
cnr, _ := oo[i].ContainerID()
expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload)
expectedLogCC[cnr]--
if expectedLogCC[cnr] == 0 {
delete(expectedLogCC, cnr)
}
expectedPhyCC[cnr]--
if expectedPhyCC[cnr] == 0 {
delete(expectedPhyCC, cnr)
}
}
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize())
cc, err = sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical)
require.Equal(t, expectedPhyCC, cc.Physical)
})
}
@ -269,6 +360,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
"logic": 0,
},
cnrSize: make(map[string]int64),
cnrCount: make(map[string]uint64),
}
sh := New(

View file

@ -90,7 +90,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
}
s.incObjectCounter()
s.incObjectCounter(putPrm.Address.Container())
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
}

View file

@ -18,6 +18,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
@ -85,6 +86,12 @@ type MetricsWriter interface {
ClearErrorCounter()
// DeleteShardMetrics deletes shard metrics from registry.
DeleteShardMetrics()
// SetContainerObjectsCount sets container object count.
SetContainerObjectsCount(cnrID string, objectType string, value uint64)
// IncContainerObjectsCount increments container object count.
IncContainerObjectsCount(cnrID string, objectType string)
// SubContainerObjectsCount subtracts container object count.
SubContainerObjectsCount(cnrID string, objectType string, value uint64)
}
type cfg struct {
@ -425,15 +432,29 @@ func (s *Shard) updateMetrics(ctx context.Context) {
}
s.metricsWriter.AddToPayloadSize(int64(totalPayload))
contCount, err := s.metaBase.ContainerCounters(ctx)
if err != nil {
s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err))
return
}
for contID, count := range contCount.Physical {
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count)
}
for contID, count := range contCount.Logical {
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count)
}
}
}
// incObjectCounter increment both physical and logical object
// counters.
func (s *Shard) incObjectCounter() {
func (s *Shard) incObjectCounter(cnrID cid.ID) {
if s.cfg.metricsWriter != nil {
s.cfg.metricsWriter.IncObjectCounter(physical)
s.cfg.metricsWriter.IncObjectCounter(logical)
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical)
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical)
}
}
@ -443,6 +464,17 @@ func (s *Shard) decObjectCounterBy(typ string, v uint64) {
}
}
func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) {
if s.cfg.metricsWriter == nil {
return
}
for cnrID, count := range byCnr {
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy())
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic())
}
}
func (s *Shard) addToContainerSize(cnr string, size int64) {
if s.cfg.metricsWriter != nil {
s.cfg.metricsWriter.AddToContainerSize(cnr, size)

View file

@ -18,6 +18,9 @@ type EngineMetrics interface {
SetObjectCounter(shardID, objectType string, v uint64)
AddToPayloadCounter(shardID string, size int64)
SetMode(shardID string, mode mode.Mode)
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncContainerObjectCounter(shardID, contID, objectType string)
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
WriteCache() WriteCacheMetrics
GC() GCMetrics
@ -30,6 +33,7 @@ type engineMetrics struct {
payloadSize *prometheus.GaugeVec
errorCounter *prometheus.GaugeVec
mode *shardIDModeValue
contObjCounter *prometheus.GaugeVec
gc *gcMetrics
writeCache *writeCacheMetrics
@ -46,10 +50,13 @@ func newEngineMetrics() *engineMetrics {
Name: "request_duration_seconds",
Help: "Duration of Engine requests",
}, []string{methodLabel}),
objectCounter: newEngineGaugeVector("objects_total", "Objects counters per shards", []string{shardIDLabel, typeLabel}),
objectCounter: newEngineGaugeVector("objects_total",
fyrchik marked this conversation as resolved Outdated

Why is it deprecated? It just has different usage: we count object in the container to calculate quotas, we count objects per shards to calculate the amount of space.

Why is it deprecated? It just has different usage: we count object in the container to calculate quotas, we count objects per shards to calculate the amount of space.

Deprecated metric (objects per shard) can be calculated from new one (objects per shard per container) with PromQL query. So it looks redundant.

Deprecated metric (objects per shard) can be calculated from new one (objects per shard per container) with PromQL query. So it looks redundant.
"Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.",
[]string{shardIDLabel, typeLabel}),
gc: newGCMetrics(),
writeCache: newWriteCacheMetrics(),
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
}
}
@ -88,6 +95,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
m.errorCounter.Delete(prometheus.Labels{shardIDLabel: shardID})
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.mode.Delete(shardID)
}
@ -109,6 +117,36 @@ func (m *engineMetrics) SetObjectCounter(shardID, objectType string, v uint64) {
).Set(float64(v))
}
func (m *engineMetrics) SetContainerObjectCounter(shardID, contID, objectType string, v uint64) {
m.contObjCounter.With(
prometheus.Labels{
shardIDLabel: shardID,
containerIDLabelKey: contID,
typeLabel: objectType,
},
).Set(float64(v))
}
func (m *engineMetrics) IncContainerObjectCounter(shardID, contID, objectType string) {
m.contObjCounter.With(
prometheus.Labels{
shardIDLabel: shardID,
containerIDLabelKey: contID,
typeLabel: objectType,
},
).Inc()
}
func (m *engineMetrics) SubContainerObjectCounter(shardID, contID, objectType string, v uint64) {
m.contObjCounter.With(
prometheus.Labels{
shardIDLabel: shardID,
containerIDLabelKey: contID,
typeLabel: objectType,
},
).Sub(float64(v))
}
func (m *engineMetrics) SetMode(shardID string, mode mode.Mode) {
m.mode.SetMode(shardID, mode.String())
}