From 9c98fa6152640aa3bfd6f50f1dc343857731f8fc Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 1 Nov 2023 20:53:24 +0300 Subject: [PATCH 1/2] [#763] metabase: Add container objects counter Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/metabase/control.go | 15 +- pkg/local_object_storage/metabase/counter.go | 276 +++++++++++++++++- .../metabase/counter_test.go | 201 ++++++++++++- pkg/local_object_storage/metabase/delete.go | 44 ++- pkg/local_object_storage/metabase/inhume.go | 24 +- pkg/local_object_storage/metabase/put.go | 12 +- pkg/local_object_storage/metabase/util.go | 12 +- 7 files changed, 535 insertions(+), 49 deletions(-) diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 3f155eeb5..00fe1e02b 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -103,12 +103,13 @@ func (db *DB) init(reset bool) error { } mStaticBuckets := map[string]struct{}{ - string(containerVolumeBucketName): {}, - string(graveyardBucketName): {}, - string(toMoveItBucketName): {}, - string(garbageBucketName): {}, - string(shardInfoBucket): {}, - string(bucketNameLocked): {}, + string(containerVolumeBucketName): {}, + string(containerCounterBucketName): {}, + string(graveyardBucketName): {}, + string(toMoveItBucketName): {}, + string(garbageBucketName): {}, + string(shardInfoBucket): {}, + string(bucketNameLocked): {}, } return db.boltDB.Update(func(tx *bbolt.Tx) error { @@ -135,7 +136,7 @@ func (db *DB) init(reset bool) error { } } - if !reset { + if !reset { // counters will be recalculated by refill metabase err = syncCounter(tx, false) if err != nil { return fmt.Errorf("could not sync object counter: %w", err) diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index d4afe1878..d1aafa3db 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -1,10 +1,15 @@ package meta import ( + "bytes" + "context" "encoding/binary" + "errors" "fmt" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" @@ -73,9 +78,128 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) { return cc, metaerr.Wrap(err) } -// updateCounter updates the object counter. Tx MUST be writable. -// If inc == `true`, increases the counter, decreases otherwise. -func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error { +type ContainerCounters struct { + Logical map[cid.ID]uint64 + Physical map[cid.ID]uint64 +} + +// ContainerCounters returns object counters for each container +// that metabase has tracked since it was opened and initialized. +// +// Returns only the errors that do not allow reading counter +// in Bolt database. +// +// It is guaranteed that the ContainerCounters fields are not nil. 
+func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ContainerCounters", time.Since(startedAt), success) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCounters") + defer span.End() + + cc := ContainerCounters{ + Logical: make(map[cid.ID]uint64), + Physical: make(map[cid.ID]uint64), + } + + lastKey := make([]byte, cidSize) + + // there is no limit for containers count, so use batching with cancellation + for { + select { + case <-ctx.Done(): + return cc, ctx.Err() + default: + } + + completed, err := db.containerCountersNextBatch(lastKey, &cc) + if err != nil { + return cc, err + } + if completed { + break + } + } + + success = true + return cc, nil +} + +func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) { + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return false, ErrDegradedMode + } + + counter := 0 + const batchSize = 1000 + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(containerCounterBucketName) + if b == nil { + return ErrInterruptIterator + } + c := b.Cursor() + var key, value []byte + for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() { + if bytes.Equal(lastKey, key) { + continue + } + copy(lastKey, key) + + cnrID, err := parseContainerCounterKey(key) + if err != nil { + return err + } + phy, logic, err := parseContainerCounterValue(value) + if err != nil { + return err + } + if phy > 0 { + cc.Physical[cnrID] = phy + } + if logic > 0 { + cc.Logical[cnrID] = logic + } + + counter++ + if counter == batchSize { + break + } + } + + if counter < batchSize { // last batch + return ErrInterruptIterator + } + return nil + }) + if err != nil { + if errors.Is(err, ErrInterruptIterator) { + return true, nil + } + return false, metaerr.Wrap(err) + } + return false, nil +} + +func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error { + if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil { + return fmt.Errorf("could not increase phy object counter: %w", err) + } + if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil { + return fmt.Errorf("could not increase logical object counter: %w", err) + } + return db.incContainerObjectCounter(tx, cnrID) +} + +func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error { b := tx.Bucket(shardInfoBucket) if b == nil { return nil @@ -112,6 +236,63 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool return b.Put(counterKey, newCounter) } +func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { + b := tx.Bucket(containerCounterBucketName) + if b == nil { + return nil + } + + key := make([]byte, cidSize) + for cnrID, cnrDelta := range delta { + cnrID.Encode(key) + if err := db.editContainerCounterValue(b, key, cnrDelta, inc); err != nil { + return err + } + } + return nil +} + +func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error { + var phyValue, logicValue uint64 + var err error + data := b.Get(key) + if len(data) > 0 { + phyValue, logicValue, err = parseContainerCounterValue(data) + if err != nil { + return err + } + } + phyValue = nextValue(phyValue, delta.phy, inc) + logicValue = nextValue(logicValue, delta.logic, inc) + if phyValue > 0 || logicValue > 0 { + value := 
containerCounterValue(phyValue, logicValue) + return b.Put(key, value) + } + return b.Delete(key) +} + +func nextValue(existed, delta uint64, inc bool) uint64 { + if inc { + existed += delta + } else if existed <= delta { + existed = 0 + } else { + existed -= delta + } + return existed +} + +func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error { + b := tx.Bucket(containerCounterBucketName) + if b == nil { + return nil + } + + key := make([]byte, cidSize) + cnrID.Encode(key) + return db.editContainerCounterValue(b, key, ObjectCounters{logic: 1, phy: 1}, true) +} + // syncCounter updates object counters according to metabase state: // it counts all the physically/logically stored objects using internal // indexes. Tx MUST be writable. @@ -119,26 +300,38 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool // Does nothing if counters are not empty and force is false. If force is // true, updates the counters anyway. func syncCounter(tx *bbolt.Tx, force bool) error { - b, err := tx.CreateBucketIfNotExists(shardInfoBucket) + shardInfoB, err := tx.CreateBucketIfNotExists(shardInfoBucket) if err != nil { return fmt.Errorf("could not get shard info bucket: %w", err) } - - if !force && len(b.Get(objectPhyCounterKey)) == 8 && len(b.Get(objectLogicCounterKey)) == 8 { + shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8 + containerCounterInitialized := tx.Bucket(containerCounterBucketName) != nil + if !force && shardObjectCounterInitialized && containerCounterInitialized { // the counters are already inited return nil } + containerCounterB, err := tx.CreateBucketIfNotExists(containerCounterBucketName) + if err != nil { + return fmt.Errorf("could not get container counter bucket: %w", err) + } + var addr oid.Address - var phyCounter uint64 - var logicCounter uint64 + counters := make(map[cid.ID]ObjectCounters) graveyardBKT := tx.Bucket(graveyardBucketName) garbageBKT := tx.Bucket(garbageBucketName) key := make([]byte, addressKeySize) err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error { - phyCounter++ + if v, ok := counters[cnr]; ok { + v.phy++ + counters[cnr] = v + } else { + counters[cnr] = ObjectCounters{ + phy: 1, + } + } addr.SetContainer(cnr) addr.SetObject(obj) @@ -146,7 +339,14 @@ func syncCounter(tx *bbolt.Tx, force bool) error { // check if an object is available: not with GCMark // and not covered with a tombstone if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 { - logicCounter++ + if v, ok := counters[cnr]; ok { + v.logic++ + counters[cnr] = v + } else { + counters[cnr] = ObjectCounters{ + logic: 1, + } + } } return nil @@ -155,21 +355,65 @@ func syncCounter(tx *bbolt.Tx, force bool) error { return fmt.Errorf("could not iterate objects: %w", err) } - data := make([]byte, 8) - binary.LittleEndian.PutUint64(data, phyCounter) + return setObjectCounters(counters, shardInfoB, containerCounterB) +} - err = b.Put(objectPhyCounterKey, data) +func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error { + var phyTotal uint64 + var logicTotal uint64 + key := make([]byte, cidSize) + for cnrID, count := range counters { + phyTotal += count.phy + logicTotal += count.logic + + cnrID.Encode(key) + value := containerCounterValue(count.phy, count.logic) + err := containerCounterB.Put(key, value) + if err != nil { + return fmt.Errorf("could not update phy container object counter: %w", err) + 
} + } + phyData := make([]byte, 8) + binary.LittleEndian.PutUint64(phyData, phyTotal) + + err := shardInfoB.Put(objectPhyCounterKey, phyData) if err != nil { return fmt.Errorf("could not update phy object counter: %w", err) } - data = make([]byte, 8) - binary.LittleEndian.PutUint64(data, logicCounter) + logData := make([]byte, 8) + binary.LittleEndian.PutUint64(logData, logicTotal) - err = b.Put(objectLogicCounterKey, data) + err = shardInfoB.Put(objectLogicCounterKey, logData) if err != nil { return fmt.Errorf("could not update logic object counter: %w", err) } return nil } + +func containerCounterValue(phy, logic uint64) []byte { + res := make([]byte, 16) + binary.LittleEndian.PutUint64(res, phy) + binary.LittleEndian.PutUint64(res[8:], logic) + return res +} + +func parseContainerCounterKey(buf []byte) (cid.ID, error) { + if len(buf) != cidSize { + return cid.ID{}, fmt.Errorf("invalid key length") + } + var cnrID cid.ID + if err := cnrID.Decode(buf); err != nil { + return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err) + } + return cnrID, nil +} + +// parseContainerCounterValue return phy, logic values. +func parseContainerCounterValue(buf []byte) (uint64, uint64, error) { + if len(buf) != 16 { + return 0, 0, fmt.Errorf("invalid value length") + } + return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil +} diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index 89b52c887..51062f1fe 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -7,6 +7,7 @@ import ( objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" @@ -25,6 +26,11 @@ func TestCounters(t *testing.T) { require.NoError(t, err) require.Zero(t, c.Phy()) require.Zero(t, c.Logic()) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + require.Zero(t, len(cc.Physical)) + require.Zero(t, len(cc.Logical)) }) t.Run("put", func(t *testing.T) { @@ -36,9 +42,14 @@ func TestCounters(t *testing.T) { } var prm meta.PutPrm + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) for i := 0; i < objCount; i++ { prm.SetObject(oo[i]) + cnrID, _ := oo[i].ContainerID() + expPhy[cnrID]++ + expLog[cnrID]++ _, err := db.Put(context.Background(), prm) require.NoError(t, err) @@ -48,6 +59,12 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(i+1), c.Phy()) require.Equal(t, uint64(i+1), c.Logic()) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) } }) @@ -56,6 +73,14 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, false) + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + for _, obj := range oo { + cnrID, _ := obj.ContainerID() + expPhy[cnrID]++ + expLog[cnrID]++ + } + var prm meta.DeletePrm for i := objCount - 1; i >= 0; i-- { prm.SetAddresses(objectcore.AddressOf(oo[i])) @@ -69,6 +94,27 @@ func TestCounters(t 
*testing.T) { require.Equal(t, uint64(i), c.Phy()) require.Equal(t, uint64(i), c.Logic()) + + cnrID, _ := oo[i].ContainerID() + if v, ok := expPhy[cnrID]; ok { + if v == 1 { + delete(expPhy, cnrID) + } else { + expPhy[cnrID]-- + } + } + if v, ok := expLog[cnrID]; ok { + if v == 1 { + delete(expLog, cnrID) + } else { + expLog[cnrID]-- + } + } + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) } }) @@ -77,6 +123,14 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, false) + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + for _, obj := range oo { + cnrID, _ := obj.ContainerID() + expPhy[cnrID]++ + expLog[cnrID]++ + } + inhumedObjs := make([]oid.Address, objCount/2) for i, o := range oo { @@ -87,6 +141,16 @@ func TestCounters(t *testing.T) { inhumedObjs[i] = objectcore.AddressOf(o) } + for _, addr := range inhumedObjs { + if v, ok := expLog[addr.Container()]; ok { + if v == 1 { + delete(expLog, addr.Container()) + } else { + expLog[addr.Container()]-- + } + } + } + var prm meta.InhumePrm prm.SetTombstoneAddress(oidtest.Address()) prm.SetAddresses(inhumedObjs...) @@ -100,6 +164,12 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic()) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) }) t.Run("put_split", func(t *testing.T) { @@ -107,6 +177,9 @@ func TestCounters(t *testing.T) { db := newDB(t) parObj := testutil.GenerateObject() + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + // put objects and check that parent info // does not affect the counter for i := 0; i < objCount; i++ { @@ -115,12 +188,21 @@ func TestCounters(t *testing.T) { o.SetParent(parObj) } + cnrID, _ := o.ContainerID() + expLog[cnrID]++ + expPhy[cnrID]++ + require.NoError(t, putBig(db, o)) c, err := db.ObjectCounters() require.NoError(t, err) require.Equal(t, uint64(i+1), c.Phy()) require.Equal(t, uint64(i+1), c.Logic()) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) } }) @@ -129,16 +211,40 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, true) + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + for _, obj := range oo { + cnrID, _ := obj.ContainerID() + expPhy[cnrID]++ + expLog[cnrID]++ + } + // delete objects that have parent info // and check that it does not affect // the counter for i, o := range oo { - require.NoError(t, metaDelete(db, objectcore.AddressOf(o))) + addr := objectcore.AddressOf(o) + require.NoError(t, metaDelete(db, addr)) c, err := db.ObjectCounters() require.NoError(t, err) require.Equal(t, uint64(objCount-i-1), c.Phy()) require.Equal(t, uint64(objCount-i-1), c.Logic()) + + if v, ok := expPhy[addr.Container()]; ok { + if v == 1 { + delete(expPhy, addr.Container()) + } else { + expPhy[addr.Container()]-- + } + } + if v, ok := expLog[addr.Container()]; ok { + if v == 1 { + delete(expLog, addr.Container()) + } else { + expLog[addr.Container()]-- + } + } } }) @@ -147,6 +253,14 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, true) + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + for _, obj := 
range oo { + cnrID, _ := obj.ContainerID() + expPhy[cnrID]++ + expLog[cnrID]++ + } + inhumedObjs := make([]oid.Address, objCount/2) for i, o := range oo { @@ -157,6 +271,16 @@ func TestCounters(t *testing.T) { inhumedObjs[i] = objectcore.AddressOf(o) } + for _, addr := range inhumedObjs { + if v, ok := expLog[addr.Container()]; ok { + if v == 1 { + delete(expLog, addr.Container()) + } else { + expLog[addr.Container()]-- + } + } + } + var prm meta.InhumePrm prm.SetTombstoneAddress(oidtest.Address()) prm.SetAddresses(inhumedObjs...) @@ -169,6 +293,12 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic()) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) }) } @@ -190,6 +320,13 @@ func TestCounters_Expired(t *testing.T) { oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1) } + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + for _, addr := range oo { + expPhy[addr.Container()]++ + expLog[addr.Container()]++ + } + // 1. objects are available and counters are correct c, err := db.ObjectCounters() @@ -197,6 +334,12 @@ func TestCounters_Expired(t *testing.T) { require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount), c.Logic()) + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) + for _, o := range oo { _, err := metaGet(db, o, true) require.NoError(t, err) @@ -212,6 +355,12 @@ func TestCounters_Expired(t *testing.T) { require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount), c.Logic()) + cc, err = db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) + for _, o := range oo { _, err := metaGet(db, o, true) require.ErrorIs(t, err, meta.ErrObjectIsExpired) @@ -235,6 +384,20 @@ func TestCounters_Expired(t *testing.T) { require.Equal(t, uint64(len(oo)), c.Phy()) require.Equal(t, uint64(len(oo)-1), c.Logic()) + if v, ok := expLog[oo[0].Container()]; ok { + if v == 1 { + delete(expLog, oo[0].Container()) + } else { + expLog[oo[0].Container()]-- + } + } + + cc, err = db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) + // 4. 
`Delete` an object with GCMark should decrease the // phy counter but does not affect the logic counter (after // that step they should be equal) @@ -246,6 +409,14 @@ func TestCounters_Expired(t *testing.T) { require.NoError(t, err) require.Zero(t, deleteRes.AvailableObjectsRemoved()) + if v, ok := expPhy[oo[0].Container()]; ok { + if v == 1 { + delete(expPhy, oo[0].Container()) + } else { + expPhy[oo[0].Container()]-- + } + } + oo = oo[1:] c, err = db.ObjectCounters() @@ -253,6 +424,12 @@ func TestCounters_Expired(t *testing.T) { require.Equal(t, uint64(len(oo)), c.Phy()) require.Equal(t, uint64(len(oo)), c.Logic()) + cc, err = db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) + // 5 `Delete` an expired object (like it would the control // service do) should decrease both counters despite the // expiration fact @@ -263,12 +440,34 @@ func TestCounters_Expired(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved()) + if v, ok := expLog[oo[0].Container()]; ok { + if v == 1 { + delete(expLog, oo[0].Container()) + } else { + expLog[oo[0].Container()]-- + } + } + + if v, ok := expPhy[oo[0].Container()]; ok { + if v == 1 { + delete(expPhy, oo[0].Container()) + } else { + expPhy[oo[0].Container()]-- + } + } + oo = oo[1:] c, err = db.ObjectCounters() require.NoError(t, err) require.Equal(t, uint64(len(oo)), c.Phy()) require.Equal(t, uint64(len(oo)), c.Logic()) + + cc, err = db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) } func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object { diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index e84759b88..653a88ae7 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -12,6 +12,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" @@ -132,8 +133,9 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, ava refCounter := make(referenceCounter, len(addrs)) currEpoch := db.epochState.CurrentEpoch() - var rawDeleted uint64 - var availableDeleted uint64 + cnrIDDelta := make(map[cid.ID]ObjectCounters) + var rawDeletedTotal uint64 + var availableDeletedTotal uint64 for i := range addrs { removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch) @@ -142,40 +144,62 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, ava } if removed { - rawDeleted++ + if v, ok := cnrIDDelta[addrs[i].Container()]; ok { + v.phy++ + cnrIDDelta[addrs[i].Container()] = v + } else { + cnrIDDelta[addrs[i].Container()] = ObjectCounters{ + phy: 1, + } + } + + rawDeletedTotal++ sizes[i] = size } if available { - availableDeleted++ + if v, ok := cnrIDDelta[addrs[i].Container()]; ok { + v.logic++ + cnrIDDelta[addrs[i].Container()] = v + } else { + cnrIDDelta[addrs[i].Container()] = ObjectCounters{ + logic: 1, + } + } + + availableDeletedTotal++ availableSizes[i] = size } 
} - if rawDeleted > 0 { - err := db.updateCounter(tx, phy, rawDeleted, false) + if rawDeletedTotal > 0 { + err := db.updateShardObjectCounter(tx, phy, rawDeletedTotal, false) if err != nil { return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err) } } - if availableDeleted > 0 { - err := db.updateCounter(tx, logical, availableDeleted, false) + if availableDeletedTotal > 0 { + err := db.updateShardObjectCounter(tx, logical, availableDeletedTotal, false) if err != nil { return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err) } } + if err := db.updateContainerCounter(tx, cnrIDDelta, false); err != nil { + return 0, 0, fmt.Errorf("could not decrease container object counter: %w", err) + } + for _, refNum := range refCounter { if refNum.cur == refNum.all { err := db.deleteObject(tx, refNum.obj, true) if err != nil { - return rawDeleted, availableDeleted, err // maybe log and continue? + return rawDeletedTotal, availableDeletedTotal, err // maybe log and continue? } } } - return rawDeleted, availableDeleted, nil + return rawDeletedTotal, availableDeletedTotal, nil } // delete removes object indexes from the metabase. Counts the references diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index fe8b8873e..e06e57109 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -73,6 +73,7 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) { Size: deletedSize, CID: containerID, }) + i.availableImhumed++ } // SetAddresses sets a list of object addresses that should be inhumed. @@ -224,7 +225,27 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes } } - return db.updateCounter(tx, logical, res.availableImhumed, false) + return db.applyInhumeResToCounters(tx, res) +} + +func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { + var inhumedCount uint64 + inhumedbyCnr := make(map[cid.ID]ObjectCounters) + for _, dd := range res.deletionDetails { + if v, ok := inhumedbyCnr[dd.CID]; ok { + v.logic++ + inhumedbyCnr[dd.CID] = v + } else { + inhumedbyCnr[dd.CID] = ObjectCounters{logic: 1} + } + inhumedCount++ + } + + if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil { + return err + } + + return db.updateContainerCounter(tx, inhumedbyCnr, false) } // getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket. 
@@ -279,7 +300,6 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { containerID, _ := obj.ContainerID() if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { - res.availableImhumed++ res.storeDeletionInfo(containerID, obj.PayloadSize()) } diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 875388e58..2b5c5421d 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -183,16 +183,8 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o } if !isParent { - err = db.updateCounter(tx, phy, 1, true) - if err != nil { - return fmt.Errorf("could not increase phy object counter: %w", err) - } - - // it is expected that putting an unavailable object is - // impossible and should be handled on the higher levels - err = db.updateCounter(tx, logical, 1, true) - if err != nil { - return fmt.Errorf("could not increase logical object counter: %w", err) + if err = db.incCounters(tx, cnr); err != nil { + return err } } diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index c9d9bb947..c04aff61c 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -19,9 +19,10 @@ var ( graveyardBucketName = []byte{graveyardPrefix} // garbageBucketName stores rows with the objects that should be physically // deleted by the node (Garbage Collector routine). - garbageBucketName = []byte{garbagePrefix} - toMoveItBucketName = []byte{toMoveItPrefix} - containerVolumeBucketName = []byte{containerVolumePrefix} + garbageBucketName = []byte{garbagePrefix} + toMoveItBucketName = []byte{toMoveItPrefix} + containerVolumeBucketName = []byte{containerVolumePrefix} + containerCounterBucketName = []byte{containerCountersPrefix} zeroValue = []byte{0xFF} ) @@ -111,6 +112,11 @@ const ( // Key: split ID // Value: list of object IDs splitPrefix + + // containerCountersPrefix is used for storing container object counters. 
+ // Key: container ID + type + // Value: container size in bytes as little-endian uint64 + containerCountersPrefix ) const ( -- 2.45.3 From 70ab1ebd541f89926b112944bb3eaeee08d33619 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 2 Nov 2023 13:50:52 +0300 Subject: [PATCH 2/2] [#763] metrics: Add container_objects_total metric Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 1 + pkg/local_object_storage/engine/metrics.go | 4 + pkg/local_object_storage/engine/shards.go | 12 +++ pkg/local_object_storage/metabase/delete.go | 78 +++++++-------- pkg/local_object_storage/metabase/inhume.go | 28 +++++- pkg/local_object_storage/shard/delete.go | 1 + pkg/local_object_storage/shard/gc.go | 3 + pkg/local_object_storage/shard/inhume.go | 1 + .../shard/metrics_test.go | 94 ++++++++++++++++++- pkg/local_object_storage/shard/put.go | 2 +- pkg/local_object_storage/shard/shard.go | 34 ++++++- pkg/metrics/engine.go | 46 ++++++++- 12 files changed, 250 insertions(+), 54 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index f8d6cc9b4..5f361cefb 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -516,4 +516,5 @@ const ( RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped" FailedToCountWritecacheItems = "failed to count writecache items" AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza" + FailedToGetContainerCounters = "failed to get container counters values" ) diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index fcac2dc60..363d098f6 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -21,6 +21,10 @@ type MetricRegister interface { ClearErrorCounter(shardID string) DeleteShardMetrics(shardID string) + SetContainerObjectCounter(shardID, contID, objectType string, v uint64) + IncContainerObjectCounter(shardID, contID, objectType string) + SubContainerObjectCounter(shardID, contID, objectType string, v uint64) + WriteCache() metrics.WriteCacheMetrics GC() metrics.GCMetrics } diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 4b9d8752a..2e1cc7515 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -74,6 +74,18 @@ func (m *metricsWithID) DeleteShardMetrics() { m.mw.DeleteShardMetrics(m.id) } +func (m *metricsWithID) SetContainerObjectsCount(cnrID string, objectType string, value uint64) { + m.mw.SetContainerObjectCounter(m.id, cnrID, objectType, value) +} + +func (m *metricsWithID) IncContainerObjectsCount(cnrID string, objectType string) { + m.mw.IncContainerObjectCounter(m.id, cnrID, objectType) +} + +func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string, value uint64) { + m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value) +} + // AddShard adds a new shard to the storage engine. // // Returns any error encountered that did not allow adding a shard. 
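Both the delete and inhume paths below decrement the per-container records introduced in the first patch. The arithmetic is deliberately saturating: nextValue never lets a counter wrap below zero, and editContainerCounterValue drops the bucket entry once both counters reach zero. A minimal standalone restatement of that behaviour (illustrative only, not code from the patch):

package main

import "fmt"

// next mirrors the patch's nextValue helper: an increment adds the delta,
// a decrement saturates at zero instead of wrapping the unsigned counter.
func next(existing, delta uint64, inc bool) uint64 {
	if inc {
		return existing + delta
	}
	if existing <= delta {
		return 0
	}
	return existing - delta
}

func main() {
	phy, logic := uint64(3), uint64(1)

	// Inhume marks two objects unavailable: only the logical counter drops,
	// and it cannot go below zero even though the delta is larger.
	logic = next(logic, 2, false)
	fmt.Println(phy, logic) // 3 0

	// Delete removes the remaining physical objects; once both counters are
	// zero, editContainerCounterValue falls through to b.Delete(key) and the
	// container's record disappears from the bucket.
	phy = next(phy, 3, false)
	if phy == 0 && logic == 0 {
		fmt.Println("record deleted from the counters bucket")
	}
}
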
diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 653a88ae7..5acd71164 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -31,6 +31,7 @@ type DeleteRes struct { availableRemoved uint64 sizes []uint64 availableSizes []uint64 + removedByCnrID map[cid.ID]ObjectCounters } // AvailableObjectsRemoved returns the number of removed available @@ -39,6 +40,11 @@ func (d DeleteRes) AvailableObjectsRemoved() uint64 { return d.availableRemoved } +// RemovedByCnrID returns the number of removed objects by container ID. +func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters { + return d.removedByCnrID +} + // RawObjectsRemoved returns the number of removed raw objects. func (d DeleteRes) RawObjectsRemoved() uint64 { return d.rawRemoved @@ -96,15 +102,11 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { return DeleteRes{}, ErrReadOnlyMode } - var rawRemoved uint64 - var availableRemoved uint64 var err error - sizes := make([]uint64, len(prm.addrs)) - availableSizes := make([]uint64, len(prm.addrs)) + var res DeleteRes err = db.boltDB.Update(func(tx *bbolt.Tx) error { - // We need to clear slice because tx can try to execute multiple times. - rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs, sizes, availableSizes) + res, err = db.deleteGroup(tx, prm.addrs) return err }) if err == nil { @@ -115,91 +117,83 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { storagelog.OpField("metabase DELETE")) } } - return DeleteRes{ - rawRemoved: rawRemoved, - availableRemoved: availableRemoved, - sizes: sizes, - availableSizes: availableSizes, - }, metaerr.Wrap(err) + return res, metaerr.Wrap(err) } // deleteGroup deletes object from the metabase. Handles removal of the // references of the split objects. -// The first return value is a physical objects removed number: physical -// objects that were stored. The second return value is a logical objects -// removed number: objects that were available (without Tombstones, GCMarks -// non-expired, etc.) -func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, availableSizes []uint64) (uint64, uint64, error) { +func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) { + res := DeleteRes{ + sizes: make([]uint64, len(addrs)), + availableSizes: make([]uint64, len(addrs)), + removedByCnrID: make(map[cid.ID]ObjectCounters), + } refCounter := make(referenceCounter, len(addrs)) currEpoch := db.epochState.CurrentEpoch() - cnrIDDelta := make(map[cid.ID]ObjectCounters) - var rawDeletedTotal uint64 - var availableDeletedTotal uint64 - for i := range addrs { removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch) if err != nil { - return 0, 0, err // maybe log and continue? + return DeleteRes{}, err // maybe log and continue? 
} if removed { - if v, ok := cnrIDDelta[addrs[i].Container()]; ok { + if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { v.phy++ - cnrIDDelta[addrs[i].Container()] = v + res.removedByCnrID[addrs[i].Container()] = v } else { - cnrIDDelta[addrs[i].Container()] = ObjectCounters{ + res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ phy: 1, } } - rawDeletedTotal++ - sizes[i] = size + res.rawRemoved++ + res.sizes[i] = size } if available { - if v, ok := cnrIDDelta[addrs[i].Container()]; ok { + if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { v.logic++ - cnrIDDelta[addrs[i].Container()] = v + res.removedByCnrID[addrs[i].Container()] = v } else { - cnrIDDelta[addrs[i].Container()] = ObjectCounters{ + res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ logic: 1, } } - availableDeletedTotal++ - availableSizes[i] = size + res.availableRemoved++ + res.availableSizes[i] = size } } - if rawDeletedTotal > 0 { - err := db.updateShardObjectCounter(tx, phy, rawDeletedTotal, false) + if res.rawRemoved > 0 { + err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false) if err != nil { - return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err) + return DeleteRes{}, fmt.Errorf("could not decrease phy object counter: %w", err) } } - if availableDeletedTotal > 0 { - err := db.updateShardObjectCounter(tx, logical, availableDeletedTotal, false) + if res.availableRemoved > 0 { + err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false) if err != nil { - return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err) + return DeleteRes{}, fmt.Errorf("could not decrease logical object counter: %w", err) } } - if err := db.updateContainerCounter(tx, cnrIDDelta, false); err != nil { - return 0, 0, fmt.Errorf("could not decrease container object counter: %w", err) + if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil { + return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err) } for _, refNum := range refCounter { if refNum.cur == refNum.all { err := db.deleteObject(tx, refNum.obj, true) if err != nil { - return rawDeletedTotal, availableDeletedTotal, err // maybe log and continue? + return DeleteRes{}, err // maybe log and continue? } } } - return rawDeletedTotal, availableDeletedTotal, nil + return res, nil } // delete removes object indexes from the metabase. Counts the references diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index e06e57109..cf7921992 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -37,14 +37,21 @@ type DeletionInfo struct { // InhumeRes encapsulates results of Inhume operation. type InhumeRes struct { deletedLockObj []oid.Address - availableImhumed uint64 + availableInhumed uint64 + inhumedByCnrID map[cid.ID]ObjectCounters deletionDetails []DeletionInfo } // AvailableInhumed return number of available object // that have been inhumed. func (i InhumeRes) AvailableInhumed() uint64 { - return i.availableImhumed + return i.availableInhumed +} + +// InhumedByCnrID return number of object +// that have been inhumed by container ID. 
+func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters { + return i.inhumedByCnrID } // DeletedLockObjects returns deleted object of LOCK @@ -73,7 +80,15 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) { Size: deletedSize, CID: containerID, }) - i.availableImhumed++ + i.availableInhumed++ + if v, ok := i.inhumedByCnrID[containerID]; ok { + v.logic++ + i.inhumedByCnrID[containerID] = v + } else { + i.inhumedByCnrID[containerID] = ObjectCounters{ + logic: 1, + } + } } // SetAddresses sets a list of object addresses that should be inhumed. @@ -123,7 +138,7 @@ var ErrLockObjectRemoval = logicerr.New("lock object removal") // // NOTE: Marks any object with GC mark (despite any prohibitions on operations // with that object) if WithForceGCMark option has been provided. -func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) { +func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { var ( startedAt = time.Now() success = false @@ -143,8 +158,11 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err err return InhumeRes{}, ErrReadOnlyMode } + res := InhumeRes{ + inhumedByCnrID: make(map[cid.ID]ObjectCounters), + } currEpoch := db.epochState.CurrentEpoch() - err = db.boltDB.Update(func(tx *bbolt.Tx) error { + err := db.boltDB.Update(func(tx *bbolt.Tx) error { return db.inhumeTx(tx, currEpoch, prm, &res) }) success = err == nil diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index e24bd8541..663d781ad 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -118,6 +118,7 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error } s.decObjectCounterBy(physical, res.RawObjectsRemoved()) s.decObjectCounterBy(logical, res.AvailableObjectsRemoved()) + s.decContainerObjectCounter(res.RemovedByCnrID()) removedPayload := res.RemovedPhysicalObjectSizes()[0] logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0] if logicalRemovedPayload > 0 { diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 0ce99b475..346903c5c 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -416,6 +416,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular) s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 for i < res.GetDeletionInfoLength() { @@ -629,6 +630,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone) s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 for i < res.GetDeletionInfoLength() { @@ -675,6 +677,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers [] s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock) s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 for i < res.GetDeletionInfoLength() { diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index a5f8960c3..335df82f0 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -122,6 +122,7 @@ func (s 
*Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { s.m.RUnlock() s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 for i < res.GetDeletionInfoLength() { diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index 7724aa222..1e9bbf9fc 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -14,6 +14,7 @@ import ( meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" @@ -23,6 +24,7 @@ type metricsStore struct { mtx sync.Mutex objCounters map[string]uint64 cnrSize map[string]int64 + cnrCount map[string]uint64 pldSize int64 mode mode.Mode errCounter int64 @@ -126,6 +128,39 @@ func (m *metricsStore) DeleteShardMetrics() { m.errCounter = 0 } +func (m *metricsStore) SetContainerObjectsCount(cnrID string, objectType string, value uint64) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.cnrCount[cnrID+objectType] = value +} + +func (m *metricsStore) IncContainerObjectsCount(cnrID string, objectType string) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.cnrCount[cnrID+objectType]++ +} + +func (m *metricsStore) SubContainerObjectsCount(cnrID string, objectType string, value uint64) { + m.mtx.Lock() + defer m.mtx.Unlock() + existed := m.cnrCount[cnrID+objectType] + if existed < value { + panic("existed value smaller than value to sustract") + } + if existed == value { + delete(m.cnrCount, cnrID+objectType) + } else { + m.cnrCount[cnrID+objectType] -= value + } +} + +func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool) { + m.mtx.Lock() + defer m.mtx.Unlock() + v, ok := m.cnrCount[cnrID+objectType] + return v, ok +} + func TestCounters(t *testing.T) { t.Parallel() @@ -143,21 +178,37 @@ func TestCounters(t *testing.T) { oo[i] = testutil.GenerateObject() } + cc := meta.ContainerCounters{Logical: make(map[cid.ID]uint64), Physical: make(map[cid.ID]uint64)} + t.Run("defaults", func(t *testing.T) { require.Zero(t, mm.getObjectCounter(physical)) require.Zero(t, mm.getObjectCounter(logical)) require.Empty(t, mm.containerSizes()) require.Zero(t, mm.payloadSize()) + + for _, obj := range oo { + contID, _ := obj.ContainerID() + v, ok := mm.getContainerCount(contID.EncodeToString(), physical) + require.Zero(t, v) + require.False(t, ok) + v, ok = mm.getContainerCount(contID.EncodeToString(), logical) + require.Zero(t, v) + require.False(t, ok) + } }) var totalPayload int64 expectedLogicalSizes := make(map[string]int64) + expectedLogCC := make(map[cid.ID]uint64) + expectedPhyCC := make(map[cid.ID]uint64) for i := range oo { cnr, _ := oo[i].ContainerID() oSize := int64(oo[i].PayloadSize()) expectedLogicalSizes[cnr.EncodeToString()] += oSize totalPayload += oSize + expectedLogCC[cnr]++ + expectedPhyCC[cnr]++ } var prm PutPrm @@ -174,6 +225,11 @@ func TestCounters(t *testing.T) { require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) + cc, err := sh.metaBase.ContainerCounters(context.Background()) + require.NoError(t, err) + 
require.Equal(t, expectedLogCC, cc.Logical) + require.Equal(t, expectedPhyCC, cc.Physical) + t.Run("inhume_GC", func(t *testing.T) { var prm InhumePrm inhumedNumber := objNumber / 4 @@ -187,6 +243,11 @@ func TestCounters(t *testing.T) { cid, ok := oo[i].ContainerID() require.True(t, ok) expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize()) + + expectedLogCC[cid]-- + if expectedLogCC[cid] == 0 { + delete(expectedLogCC, cid) + } } require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical)) @@ -194,6 +255,11 @@ func TestCounters(t *testing.T) { require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) + cc, err := sh.metaBase.ContainerCounters(context.Background()) + require.NoError(t, err) + require.Equal(t, expectedLogCC, cc.Logical) + require.Equal(t, expectedPhyCC, cc.Physical) + oo = oo[inhumedNumber:] }) @@ -214,6 +280,11 @@ func TestCounters(t *testing.T) { cid, ok := oo[i].ContainerID() require.True(t, ok) expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize()) + + expectedLogCC[cid]-- + if expectedLogCC[cid] == 0 { + delete(expectedLogCC, cid) + } } require.Equal(t, phy, mm.getObjectCounter(physical)) @@ -221,6 +292,11 @@ func TestCounters(t *testing.T) { require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) + cc, err = sh.metaBase.ContainerCounters(context.Background()) + require.NoError(t, err) + require.Equal(t, expectedLogCC, cc.Logical) + require.Equal(t, expectedPhyCC, cc.Physical) + oo = oo[inhumedNumber:] }) @@ -245,9 +321,24 @@ func TestCounters(t *testing.T) { cnr, _ := oo[i].ContainerID() expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload) + + expectedLogCC[cnr]-- + if expectedLogCC[cnr] == 0 { + delete(expectedLogCC, cnr) + } + + expectedPhyCC[cnr]-- + if expectedPhyCC[cnr] == 0 { + delete(expectedPhyCC, cnr) + } } require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize()) + + cc, err = sh.metaBase.ContainerCounters(context.Background()) + require.NoError(t, err) + require.Equal(t, expectedLogCC, cc.Logical) + require.Equal(t, expectedPhyCC, cc.Physical) }) } @@ -268,7 +359,8 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) { "phy": 0, "logic": 0, }, - cnrSize: make(map[string]int64), + cnrSize: make(map[string]int64), + cnrCount: make(map[string]uint64), } sh := New( diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 688b7aae7..10983d041 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -90,7 +90,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) { return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err) } - s.incObjectCounter() + s.incObjectCounter(putPrm.Address.Container()) s.addToPayloadSize(int64(prm.obj.PayloadSize())) s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize())) } diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 00f4fbb9e..07b774022 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -18,6 +18,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + 
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -85,6 +86,12 @@ type MetricsWriter interface { ClearErrorCounter() // DeleteShardMetrics deletes shard metrics from registry. DeleteShardMetrics() + // SetContainerObjectsCount sets container object count. + SetContainerObjectsCount(cnrID string, objectType string, value uint64) + // IncContainerObjectsCount increments container object count. + IncContainerObjectsCount(cnrID string, objectType string) + // SubContainerObjectsCount subtracts container object count. + SubContainerObjectsCount(cnrID string, objectType string, value uint64) } type cfg struct { @@ -425,15 +432,29 @@ func (s *Shard) updateMetrics(ctx context.Context) { } s.metricsWriter.AddToPayloadSize(int64(totalPayload)) + + contCount, err := s.metaBase.ContainerCounters(ctx) + if err != nil { + s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err)) + return + } + for contID, count := range contCount.Physical { + s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count) + } + for contID, count := range contCount.Logical { + s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count) + } } } // incObjectCounter increment both physical and logical object // counters. -func (s *Shard) incObjectCounter() { +func (s *Shard) incObjectCounter(cnrID cid.ID) { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.IncObjectCounter(physical) s.cfg.metricsWriter.IncObjectCounter(logical) + s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical) + s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical) } } @@ -443,6 +464,17 @@ func (s *Shard) decObjectCounterBy(typ string, v uint64) { } } +func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) { + if s.cfg.metricsWriter == nil { + return + } + + for cnrID, count := range byCnr { + s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy()) + s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic()) + } +} + func (s *Shard) addToContainerSize(cnr string, size int64) { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.AddToContainerSize(cnr, size) diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 23d799e28..609c30f97 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -18,6 +18,9 @@ type EngineMetrics interface { SetObjectCounter(shardID, objectType string, v uint64) AddToPayloadCounter(shardID string, size int64) SetMode(shardID string, mode mode.Mode) + SetContainerObjectCounter(shardID, contID, objectType string, v uint64) + IncContainerObjectCounter(shardID, contID, objectType string) + SubContainerObjectCounter(shardID, contID, objectType string, v uint64) WriteCache() WriteCacheMetrics GC() GCMetrics @@ -30,6 +33,7 @@ type engineMetrics struct { payloadSize *prometheus.GaugeVec errorCounter *prometheus.GaugeVec mode *shardIDModeValue + contObjCounter *prometheus.GaugeVec gc *gcMetrics writeCache *writeCacheMetrics @@ -46,10 +50,13 @@ func newEngineMetrics() *engineMetrics { Name: "request_duration_seconds", Help: "Duration of Engine requests", }, []string{methodLabel}), - objectCounter: newEngineGaugeVector("objects_total", "Objects counters per shards", []string{shardIDLabel, typeLabel}), - gc: newGCMetrics(), - writeCache: newWriteCacheMetrics(), - mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"), + 
objectCounter: newEngineGaugeVector("objects_total", + "Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.", + []string{shardIDLabel, typeLabel}), + gc: newGCMetrics(), + writeCache: newWriteCacheMetrics(), + mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"), + contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}), } } @@ -88,6 +95,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) { m.errorCounter.Delete(prometheus.Labels{shardIDLabel: shardID}) m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID}) m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) + m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) m.mode.Delete(shardID) } @@ -109,6 +117,36 @@ func (m *engineMetrics) SetObjectCounter(shardID, objectType string, v uint64) { ).Set(float64(v)) } +func (m *engineMetrics) SetContainerObjectCounter(shardID, contID, objectType string, v uint64) { + m.contObjCounter.With( + prometheus.Labels{ + shardIDLabel: shardID, + containerIDLabelKey: contID, + typeLabel: objectType, + }, + ).Set(float64(v)) +} + +func (m *engineMetrics) IncContainerObjectCounter(shardID, contID, objectType string) { + m.contObjCounter.With( + prometheus.Labels{ + shardIDLabel: shardID, + containerIDLabelKey: contID, + typeLabel: objectType, + }, + ).Inc() +} + +func (m *engineMetrics) SubContainerObjectCounter(shardID, contID, objectType string, v uint64) { + m.contObjCounter.With( + prometheus.Labels{ + shardIDLabel: shardID, + containerIDLabelKey: contID, + typeLabel: objectType, + }, + ).Sub(float64(v)) +} + func (m *engineMetrics) SetMode(shardID string, mode mode.Mode) { m.mode.SetMode(shardID, mode.String()) } -- 2.45.3
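
For reference, the per-container record that both patches rely on is keyed by the raw container ID and stores a fixed 16-byte value: the physical count in bytes 0-7 and the logical count in bytes 8-15, both little-endian. The helpers below mirror containerCounterValue and parseContainerCounterValue from the first patch as a self-contained sketch:

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeCounters packs a per-container record: physical count first,
// logical count second, both as little-endian uint64.
func encodeCounters(phy, logic uint64) []byte {
	buf := make([]byte, 16)
	binary.LittleEndian.PutUint64(buf, phy)
	binary.LittleEndian.PutUint64(buf[8:], logic)
	return buf
}

// decodeCounters is the inverse of encodeCounters.
func decodeCounters(buf []byte) (phy, logic uint64, err error) {
	if len(buf) != 16 {
		return 0, 0, errors.New("invalid value length")
	}
	return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil
}

func main() {
	value := encodeCounters(10, 7)
	phy, logic, err := decodeCounters(value)
	if err != nil {
		panic(err)
	}
	fmt.Printf("phy=%d logic=%d\n", phy, logic) // phy=10 logic=7
}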
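
A caller-side sketch of the new metabase API, assuming an already opened and initialized *meta.DB (this is how shard.updateMetrics consumes it in the second patch). Because the metabase iterates the counters bucket in batches, a cancelled or expired context stops the walk between batches:

package example

import (
	"context"
	"fmt"
	"time"

	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
)

// printContainerCounters reads the per-container object counters and prints
// them; the context bounds how long the batched bucket walk may take.
func printContainerCounters(db *meta.DB) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	cc, err := db.ContainerCounters(ctx)
	if err != nil {
		return fmt.Errorf("read container counters: %w", err)
	}
	for cnrID, phy := range cc.Physical {
		fmt.Printf("container %s: phy=%d logic=%d\n",
			cnrID.EncodeToString(), phy, cc.Logical[cnrID])
	}
	return nil
}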