Add container objects counter #778

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/container_objects_total_metric into master 2024-09-04 19:51:04 +00:00
17 changed files with 765 additions and 83 deletions

View file

@@ -516,4 +516,5 @@ const (
	RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
	FailedToCountWritecacheItems           = "failed to count writecache items"
	AttemtToCloseAlreadyClosedBlobovnicza  = "attempt to close an already closed blobovnicza"
	FailedToGetContainerCounters           = "failed to get container counters values"
)

View file

@@ -21,6 +21,10 @@ type MetricRegister interface {
	ClearErrorCounter(shardID string)
	DeleteShardMetrics(shardID string)

	SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
	IncContainerObjectCounter(shardID, contID, objectType string)
	SubContainerObjectCounter(shardID, contID, objectType string, v uint64)

	WriteCache() metrics.WriteCacheMetrics
	GC() metrics.GCMetrics
}

View file

@@ -74,6 +74,18 @@ func (m *metricsWithID) DeleteShardMetrics() {
	m.mw.DeleteShardMetrics(m.id)
}

func (m *metricsWithID) SetContainerObjectsCount(cnrID string, objectType string, value uint64) {
	m.mw.SetContainerObjectCounter(m.id, cnrID, objectType, value)
}

func (m *metricsWithID) IncContainerObjectsCount(cnrID string, objectType string) {
	m.mw.IncContainerObjectCounter(m.id, cnrID, objectType)
}

func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string, value uint64) {
	m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
}

// AddShard adds a new shard to the storage engine.
//
// Returns any error encountered that did not allow adding a shard.

View file

@@ -104,6 +104,7 @@ func (db *DB) init(reset bool) error {
	mStaticBuckets := map[string]struct{}{
		string(containerVolumeBucketName):  {},
		string(containerCounterBucketName): {},
		string(graveyardBucketName):        {},
		string(toMoveItBucketName):         {},
		string(garbageBucketName):          {},
@@ -135,7 +136,7 @@ func (db *DB) init(reset bool) error {
		}
	}

	if !reset { // counters will be recalculated by refill metabase
		err = syncCounter(tx, false)
		if err != nil {
			return fmt.Errorf("could not sync object counter: %w", err)

View file

@@ -1,10 +1,15 @@
package meta

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.etcd.io/bbolt"
@@ -73,9 +78,128 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
	return cc, metaerr.Wrap(err)
}

type ContainerCounters struct {
	Logical  map[cid.ID]uint64
	Physical map[cid.ID]uint64
}

// ContainerCounters returns object counters for each container
// that metabase has tracked since it was opened and initialized.
//
// Returns only the errors that do not allow reading counter values
// from the Bolt database.
//
// It is guaranteed that the ContainerCounters fields are not nil.
func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("ContainerCounters", time.Since(startedAt), success)
	}()

	ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCounters")
	defer span.End()

	cc := ContainerCounters{
		Logical:  make(map[cid.ID]uint64),
		Physical: make(map[cid.ID]uint64),
	}

	lastKey := make([]byte, cidSize)

	// there is no limit for containers count, so use batching with cancellation
	for {
		select {
		case <-ctx.Done():
			return cc, ctx.Err()
		default:
		}

		completed, err := db.containerCountersNextBatch(lastKey, &cc)
		if err != nil {
			return cc, err
		}
		if completed {
			break
		}
	}

	success = true
	return cc, nil
}
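[Editor's note] A minimal usage sketch (not part of the diff), assuming an opened *meta.DB and the imports already used in this file; the 5-second timeout is an arbitrary choice. Cancellation is honored between batches, so a deadline bounds how long the scan can run:

	func printContainerCounters(ctx context.Context, db *meta.DB) error {
		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		cc, err := db.ContainerCounters(ctx)
		if err != nil {
			return err // ctx.Err() surfaces here if a batch boundary saw cancellation
		}
		for cnrID, phy := range cc.Physical {
			fmt.Printf("%s: phy=%d logic=%d\n", cnrID.EncodeToString(), phy, cc.Logical[cnrID])
		}
		return nil
	}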
func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) {
	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return false, ErrDegradedMode
	}

	counter := 0
	const batchSize = 1000

	err := db.boltDB.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket(containerCounterBucketName)
		if b == nil {
			return ErrInterruptIterator
		}
		c := b.Cursor()
		var key, value []byte
		for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() {
			if bytes.Equal(lastKey, key) {
				continue
			}
			copy(lastKey, key)

			cnrID, err := parseContainerCounterKey(key)
			if err != nil {
				return err
			}
			phy, logic, err := parseContainerCounterValue(value)
			if err != nil {
				return err
			}
			if phy > 0 {
				cc.Physical[cnrID] = phy
			}
			if logic > 0 {
				cc.Logical[cnrID] = logic
			}

			counter++
			if counter == batchSize {
				break
			}
		}

		if counter < batchSize { // last batch
			return ErrInterruptIterator
		}
		return nil
	})
	if err != nil {
		if errors.Is(err, ErrInterruptIterator) {
			return true, nil
		}
		return false, metaerr.Wrap(err)
	}
	return false, nil
}
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error {
	if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
		return fmt.Errorf("could not increase phy object counter: %w", err)
	}
	if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil {
		return fmt.Errorf("could not increase logical object counter: %w", err)
	}
	return db.incContainerObjectCounter(tx, cnrID)
}

func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
	b := tx.Bucket(shardInfoBucket)
	if b == nil {
		return nil
@@ -112,6 +236,63 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool
	return b.Put(counterKey, newCounter)
}

func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
	b := tx.Bucket(containerCounterBucketName)
	if b == nil {
		return nil
	}

	key := make([]byte, cidSize)
	for cnrID, cnrDelta := range delta {
		cnrID.Encode(key)
		if err := db.editContainerCounterValue(b, key, cnrDelta, inc); err != nil {
			return err
		}
	}
	return nil
}
fyrchik marked this conversation as resolved Outdated

It is always the same size, we can pre-allocate here.

done
func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error {
	var phyValue, logicValue uint64
	var err error
	data := b.Get(key)
	if len(data) > 0 {
		phyValue, logicValue, err = parseContainerCounterValue(data)
		if err != nil {
			return err
		}
	}
	phyValue = nextValue(phyValue, delta.phy, inc)
	logicValue = nextValue(logicValue, delta.logic, inc)
	if phyValue > 0 || logicValue > 0 {
		value := containerCounterValue(phyValue, logicValue)
		return b.Put(key, value)
	}
	return b.Delete(key)
}

func nextValue(existed, delta uint64, inc bool) uint64 {
	if inc {
		existed += delta
	} else if existed <= delta {
		existed = 0
	} else {
		existed -= delta
	}
	return existed
}
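[Editor's note] The decrement branch of nextValue clamps at zero rather than letting the uint64 underflow; a few illustrative calls (not from the diff):

	_ = nextValue(5, 2, true)  // 7: increment
	_ = nextValue(5, 2, false) // 3: ordinary decrement
	_ = nextValue(2, 5, false) // 0: would underflow, clamped to zero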
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error {
	b := tx.Bucket(containerCounterBucketName)
	if b == nil {
		return nil
	}

	key := make([]byte, cidSize)
	cnrID.Encode(key)
	return db.editContainerCounterValue(b, key, ObjectCounters{logic: 1, phy: 1}, true)
}
// syncCounter updates object counters according to metabase state:
// it counts all the physically/logically stored objects using internal
// indexes. Tx MUST be writable.
//
@@ -119,26 +300,38 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool
// Does nothing if counters are not empty and force is false. If force is
// true, updates the counters anyway.
func syncCounter(tx *bbolt.Tx, force bool) error {
	shardInfoB, err := tx.CreateBucketIfNotExists(shardInfoBucket)
	if err != nil {
		return fmt.Errorf("could not get shard info bucket: %w", err)
	}
	shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8
	containerCounterInitialized := tx.Bucket(containerCounterBucketName) != nil
	if !force && shardObjectCounterInitialized && containerCounterInitialized {
		// the counters are already inited
		return nil
	}

	containerCounterB, err := tx.CreateBucketIfNotExists(containerCounterBucketName)
	if err != nil {
		return fmt.Errorf("could not get container counter bucket: %w", err)
	}

	var addr oid.Address
	counters := make(map[cid.ID]ObjectCounters)

	graveyardBKT := tx.Bucket(graveyardBucketName)
	garbageBKT := tx.Bucket(garbageBucketName)
	key := make([]byte, addressKeySize)

	err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error {
		if v, ok := counters[cnr]; ok {
			v.phy++
			counters[cnr] = v
		} else {
			counters[cnr] = ObjectCounters{
				phy: 1,
			}
		}

		addr.SetContainer(cnr)
		addr.SetObject(obj)
@@ -146,7 +339,14 @@ func syncCounter(tx *bbolt.Tx, force bool) error {

		// check if an object is available: not with GCMark
		// and not covered with a tombstone
		if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
			if v, ok := counters[cnr]; ok {
				v.logic++
				counters[cnr] = v
			} else {
				counters[cnr] = ObjectCounters{
					logic: 1,
				}
			}
		}

		return nil
@@ -155,21 +355,65 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
		return fmt.Errorf("could not iterate objects: %w", err)
	}

	return setObjectCounters(counters, shardInfoB, containerCounterB)
}

func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error {
	var phyTotal uint64
	var logicTotal uint64
	key := make([]byte, cidSize)
	for cnrID, count := range counters {
		phyTotal += count.phy
		logicTotal += count.logic

		cnrID.Encode(key)
		value := containerCounterValue(count.phy, count.logic)
		err := containerCounterB.Put(key, value)
		if err != nil {
			return fmt.Errorf("could not update phy container object counter: %w", err)
		}
	}

	phyData := make([]byte, 8)
	binary.LittleEndian.PutUint64(phyData, phyTotal)
fyrchik marked this conversation as resolved Outdated

This will break if we later have subbuckets here, but I doubt we will. Anyway, will `containerCounterInitialized := containerCounterB.Stats().KeyN != 0` suffice?

Also, can `bucket != nil` check serve as a sign that we have initialized everything?

`Stats()` will read all bucket pages, but we need only the first.

> Also, can `bucket != nil` check serve as a sign that we have initialized everything?

Yes, fixed
	err := shardInfoB.Put(objectPhyCounterKey, phyData)
	if err != nil {
		return fmt.Errorf("could not update phy object counter: %w", err)
	}

	logData := make([]byte, 8)
	binary.LittleEndian.PutUint64(logData, logicTotal)
	err = shardInfoB.Put(objectLogicCounterKey, logData)
	if err != nil {
		return fmt.Errorf("could not update logic object counter: %w", err)
	}

	return nil
}

func containerCounterValue(phy, logic uint64) []byte {
	res := make([]byte, 16)
	binary.LittleEndian.PutUint64(res, phy)
	binary.LittleEndian.PutUint64(res[8:], logic)
	return res
}

func parseContainerCounterKey(buf []byte) (cid.ID, error) {
	if len(buf) != cidSize {
		return cid.ID{}, fmt.Errorf("invalid key length")
	}
	var cnrID cid.ID
	if err := cnrID.Decode(buf); err != nil {
		return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err)
	}
	return cnrID, nil
}

// parseContainerCounterValue returns the phy and logic values.
func parseContainerCounterValue(buf []byte) (uint64, uint64, error) {
	if len(buf) != 16 {
		return 0, 0, fmt.Errorf("invalid value length")
	}
	return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil
}

View file

@@ -7,6 +7,7 @@ import (
	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
@@ -25,6 +26,11 @@ func TestCounters(t *testing.T) {
		require.NoError(t, err)
		require.Zero(t, c.Phy())
		require.Zero(t, c.Logic())

		cc, err := db.ContainerCounters(context.Background())
		require.NoError(t, err)
		require.Zero(t, len(cc.Physical))
		require.Zero(t, len(cc.Logical))
	})

	t.Run("put", func(t *testing.T) {
@@ -36,9 +42,14 @@ func TestCounters(t *testing.T) {
		}

		var prm meta.PutPrm

		expPhy := make(map[cid.ID]uint64)
		expLog := make(map[cid.ID]uint64)

		for i := 0; i < objCount; i++ {
			prm.SetObject(oo[i])
			cnrID, _ := oo[i].ContainerID()
			expPhy[cnrID]++
			expLog[cnrID]++

			_, err := db.Put(context.Background(), prm)
			require.NoError(t, err)
@@ -48,6 +59,12 @@ func TestCounters(t *testing.T) {

			require.Equal(t, uint64(i+1), c.Phy())
			require.Equal(t, uint64(i+1), c.Logic())

			cc, err := db.ContainerCounters(context.Background())
			require.NoError(t, err)
			require.Equal(t, expPhy, cc.Physical)
			require.Equal(t, expLog, cc.Logical)
		}
	})
@@ -56,6 +73,14 @@ func TestCounters(t *testing.T) {
		db := newDB(t)
		oo := putObjs(t, db, objCount, false)

		expPhy := make(map[cid.ID]uint64)
		expLog := make(map[cid.ID]uint64)
		for _, obj := range oo {
			cnrID, _ := obj.ContainerID()
			expPhy[cnrID]++
			expLog[cnrID]++
		}

		var prm meta.DeletePrm
		for i := objCount - 1; i >= 0; i-- {
			prm.SetAddresses(objectcore.AddressOf(oo[i]))
@@ -69,6 +94,27 @@ func TestCounters(t *testing.T) {

			require.Equal(t, uint64(i), c.Phy())
			require.Equal(t, uint64(i), c.Logic())

			cnrID, _ := oo[i].ContainerID()
			if v, ok := expPhy[cnrID]; ok {
				if v == 1 {
					delete(expPhy, cnrID)
				} else {
					expPhy[cnrID]--
				}
			}
			if v, ok := expLog[cnrID]; ok {
				if v == 1 {
					delete(expLog, cnrID)
				} else {
					expLog[cnrID]--
				}
			}

			cc, err := db.ContainerCounters(context.Background())
			require.NoError(t, err)
			require.Equal(t, expPhy, cc.Physical)
			require.Equal(t, expLog, cc.Logical)
		}
	})
@@ -77,6 +123,14 @@ func TestCounters(t *testing.T) {
		db := newDB(t)
		oo := putObjs(t, db, objCount, false)

		expPhy := make(map[cid.ID]uint64)
		expLog := make(map[cid.ID]uint64)
		for _, obj := range oo {
			cnrID, _ := obj.ContainerID()
			expPhy[cnrID]++
			expLog[cnrID]++
		}

		inhumedObjs := make([]oid.Address, objCount/2)

		for i, o := range oo {
@@ -87,6 +141,16 @@ func TestCounters(t *testing.T) {
			inhumedObjs[i] = objectcore.AddressOf(o)
		}

		for _, addr := range inhumedObjs {
			if v, ok := expLog[addr.Container()]; ok {
				if v == 1 {
					delete(expLog, addr.Container())
				} else {
					expLog[addr.Container()]--
				}
			}
		}

		var prm meta.InhumePrm
		prm.SetTombstoneAddress(oidtest.Address())
		prm.SetAddresses(inhumedObjs...)
@@ -100,6 +164,12 @@ func TestCounters(t *testing.T) {

		require.Equal(t, uint64(objCount), c.Phy())
		require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())

		cc, err := db.ContainerCounters(context.Background())
		require.NoError(t, err)
		require.Equal(t, expPhy, cc.Physical)
		require.Equal(t, expLog, cc.Logical)
	})

	t.Run("put_split", func(t *testing.T) {
@@ -107,6 +177,9 @@ func TestCounters(t *testing.T) {
		db := newDB(t)
		parObj := testutil.GenerateObject()

		expPhy := make(map[cid.ID]uint64)
		expLog := make(map[cid.ID]uint64)

		// put objects and check that parent info
		// does not affect the counter
		for i := 0; i < objCount; i++ {
@@ -115,12 +188,21 @@ func TestCounters(t *testing.T) {
				o.SetParent(parObj)
			}

			cnrID, _ := o.ContainerID()
			expLog[cnrID]++
			expPhy[cnrID]++

			require.NoError(t, putBig(db, o))

			c, err := db.ObjectCounters()
			require.NoError(t, err)
			require.Equal(t, uint64(i+1), c.Phy())
			require.Equal(t, uint64(i+1), c.Logic())

			cc, err := db.ContainerCounters(context.Background())
			require.NoError(t, err)
			require.Equal(t, expPhy, cc.Physical)
			require.Equal(t, expLog, cc.Logical)
		}
	})
@@ -129,16 +211,40 @@ func TestCounters(t *testing.T) {
		db := newDB(t)
		oo := putObjs(t, db, objCount, true)

		expPhy := make(map[cid.ID]uint64)
		expLog := make(map[cid.ID]uint64)
		for _, obj := range oo {
			cnrID, _ := obj.ContainerID()
			expPhy[cnrID]++
			expLog[cnrID]++
		}

		// delete objects that have parent info
		// and check that it does not affect
		// the counter
		for i, o := range oo {
			addr := objectcore.AddressOf(o)
			require.NoError(t, metaDelete(db, addr))

			c, err := db.ObjectCounters()
			require.NoError(t, err)
			require.Equal(t, uint64(objCount-i-1), c.Phy())
			require.Equal(t, uint64(objCount-i-1), c.Logic())

			if v, ok := expPhy[addr.Container()]; ok {
				if v == 1 {
					delete(expPhy, addr.Container())
				} else {
					expPhy[addr.Container()]--
				}
			}
			if v, ok := expLog[addr.Container()]; ok {
				if v == 1 {
					delete(expLog, addr.Container())
				} else {
					expLog[addr.Container()]--
				}
			}
		}
	})
@@ -147,6 +253,14 @@ func TestCounters(t *testing.T) {
		db := newDB(t)
		oo := putObjs(t, db, objCount, true)

		expPhy := make(map[cid.ID]uint64)
		expLog := make(map[cid.ID]uint64)
		for _, obj := range oo {
			cnrID, _ := obj.ContainerID()
			expPhy[cnrID]++
			expLog[cnrID]++
		}

		inhumedObjs := make([]oid.Address, objCount/2)

		for i, o := range oo {
@@ -157,6 +271,16 @@ func TestCounters(t *testing.T) {
			inhumedObjs[i] = objectcore.AddressOf(o)
		}

		for _, addr := range inhumedObjs {
			if v, ok := expLog[addr.Container()]; ok {
				if v == 1 {
					delete(expLog, addr.Container())
				} else {
					expLog[addr.Container()]--
				}
			}
		}

		var prm meta.InhumePrm
		prm.SetTombstoneAddress(oidtest.Address())
		prm.SetAddresses(inhumedObjs...)
@@ -169,6 +293,12 @@ func TestCounters(t *testing.T) {

		require.Equal(t, uint64(objCount), c.Phy())
		require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())

		cc, err := db.ContainerCounters(context.Background())
		require.NoError(t, err)
		require.Equal(t, expPhy, cc.Physical)
		require.Equal(t, expLog, cc.Logical)
	})
}
@@ -190,6 +320,13 @@ func TestCounters_Expired(t *testing.T) {
		oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
	}

	expPhy := make(map[cid.ID]uint64)
	expLog := make(map[cid.ID]uint64)
	for _, addr := range oo {
		expPhy[addr.Container()]++
		expLog[addr.Container()]++
	}

	// 1. objects are available and counters are correct

	c, err := db.ObjectCounters()
@@ -197,6 +334,12 @@ func TestCounters_Expired(t *testing.T) {
	require.Equal(t, uint64(objCount), c.Phy())
	require.Equal(t, uint64(objCount), c.Logic())

	cc, err := db.ContainerCounters(context.Background())
	require.NoError(t, err)
	require.Equal(t, expPhy, cc.Physical)
	require.Equal(t, expLog, cc.Logical)

	for _, o := range oo {
		_, err := metaGet(db, o, true)
		require.NoError(t, err)
@@ -212,6 +355,12 @@ func TestCounters_Expired(t *testing.T) {
	require.Equal(t, uint64(objCount), c.Phy())
	require.Equal(t, uint64(objCount), c.Logic())

	cc, err = db.ContainerCounters(context.Background())
	require.NoError(t, err)
	require.Equal(t, expPhy, cc.Physical)
	require.Equal(t, expLog, cc.Logical)

	for _, o := range oo {
		_, err := metaGet(db, o, true)
		require.ErrorIs(t, err, meta.ErrObjectIsExpired)
@@ -235,6 +384,20 @@ func TestCounters_Expired(t *testing.T) {
	require.Equal(t, uint64(len(oo)), c.Phy())
	require.Equal(t, uint64(len(oo)-1), c.Logic())

	if v, ok := expLog[oo[0].Container()]; ok {
		if v == 1 {
			delete(expLog, oo[0].Container())
		} else {
			expLog[oo[0].Container()]--
		}
	}

	cc, err = db.ContainerCounters(context.Background())
	require.NoError(t, err)
	require.Equal(t, expPhy, cc.Physical)
	require.Equal(t, expLog, cc.Logical)

	// 4. `Delete` an object with GCMark should decrease the
	// phy counter but does not affect the logic counter (after
	// that step they should be equal)
@@ -246,6 +409,14 @@ func TestCounters_Expired(t *testing.T) {
	require.NoError(t, err)
	require.Zero(t, deleteRes.AvailableObjectsRemoved())

	if v, ok := expPhy[oo[0].Container()]; ok {
		if v == 1 {
			delete(expPhy, oo[0].Container())
		} else {
			expPhy[oo[0].Container()]--
		}
	}

	oo = oo[1:]

	c, err = db.ObjectCounters()
@@ -253,6 +424,12 @@ func TestCounters_Expired(t *testing.T) {
	require.Equal(t, uint64(len(oo)), c.Phy())
	require.Equal(t, uint64(len(oo)), c.Logic())

	cc, err = db.ContainerCounters(context.Background())
	require.NoError(t, err)
	require.Equal(t, expPhy, cc.Physical)
	require.Equal(t, expLog, cc.Logical)

	// 5 `Delete` an expired object (like it would the control
	// service do) should decrease both counters despite the
	// expiration fact
@@ -263,12 +440,34 @@ func TestCounters_Expired(t *testing.T) {
	require.NoError(t, err)
	require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())

	if v, ok := expLog[oo[0].Container()]; ok {
		if v == 1 {
			delete(expLog, oo[0].Container())
		} else {
			expLog[oo[0].Container()]--
		}
	}
	if v, ok := expPhy[oo[0].Container()]; ok {
		if v == 1 {
			delete(expPhy, oo[0].Container())
		} else {
			expPhy[oo[0].Container()]--
		}
	}

	oo = oo[1:]

	c, err = db.ObjectCounters()
	require.NoError(t, err)
	require.Equal(t, uint64(len(oo)), c.Phy())
	require.Equal(t, uint64(len(oo)), c.Logic())

	cc, err = db.ContainerCounters(context.Background())
	require.NoError(t, err)
	require.Equal(t, expPhy, cc.Physical)
	require.Equal(t, expLog, cc.Logical)
}

func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {

View file

@@ -12,6 +12,7 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.etcd.io/bbolt"
@@ -30,6 +31,7 @@ type DeleteRes struct {
	availableRemoved uint64
	sizes            []uint64
	availableSizes   []uint64
	removedByCnrID   map[cid.ID]ObjectCounters
}

// AvailableObjectsRemoved returns the number of removed available
@@ -38,6 +40,11 @@ func (d DeleteRes) AvailableObjectsRemoved() uint64 {
	return d.availableRemoved
}

// RemovedByCnrID returns the counters of removed objects grouped by container ID.
func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
	return d.removedByCnrID
}

// RawObjectsRemoved returns the number of removed raw objects.
func (d DeleteRes) RawObjectsRemoved() uint64 {
	return d.rawRemoved
@@ -95,15 +102,11 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
		return DeleteRes{}, ErrReadOnlyMode
	}

	var err error
	var res DeleteRes

	err = db.boltDB.Update(func(tx *bbolt.Tx) error {
		res, err = db.deleteGroup(tx, prm.addrs)
		return err
	})
	if err == nil {
@@ -114,68 +117,83 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
			storagelog.OpField("metabase DELETE"))
		}
	}

	return res, metaerr.Wrap(err)
}
// deleteGroup deletes object from the metabase. Handles removal of the
// references of the split objects.
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
	res := DeleteRes{
		sizes:          make([]uint64, len(addrs)),
		availableSizes: make([]uint64, len(addrs)),
		removedByCnrID: make(map[cid.ID]ObjectCounters),
	}
	refCounter := make(referenceCounter, len(addrs))
	currEpoch := db.epochState.CurrentEpoch()

	for i := range addrs {
		removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
		if err != nil {
			return DeleteRes{}, err // maybe log and continue?
		}

		if removed {
			if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
				v.phy++
				res.removedByCnrID[addrs[i].Container()] = v
			} else {
				res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
					phy: 1,
				}
			}
			res.rawRemoved++
			res.sizes[i] = size
		}
		if available {
			if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
				v.logic++
				res.removedByCnrID[addrs[i].Container()] = v
			} else {
				res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
					logic: 1,
				}
			}
			res.availableRemoved++
			res.availableSizes[i] = size
		}
	}

	if res.rawRemoved > 0 {
		err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
		if err != nil {
			return DeleteRes{}, fmt.Errorf("could not decrease phy object counter: %w", err)
		}
	}
	if res.availableRemoved > 0 {
		err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
		if err != nil {
			return DeleteRes{}, fmt.Errorf("could not decrease logical object counter: %w", err)
		}
	}
	if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
		return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err)
	}

	for _, refNum := range refCounter {
		if refNum.cur == refNum.all {
			err := db.deleteObject(tx, refNum.obj, true)
			if err != nil {
				return DeleteRes{}, err // maybe log and continue?
			}
		}
	}
	return res, nil
}

// delete removes object indexes from the metabase. Counts the references

View file

@@ -37,14 +37,21 @@ type DeletionInfo struct {
// InhumeRes encapsulates results of Inhume operation.
type InhumeRes struct {
	deletedLockObj   []oid.Address
	availableInhumed uint64
	inhumedByCnrID   map[cid.ID]ObjectCounters
	deletionDetails  []DeletionInfo
}

// AvailableInhumed returns the number of available objects
// that have been inhumed.
func (i InhumeRes) AvailableInhumed() uint64 {
	return i.availableInhumed
}

// InhumedByCnrID returns the counters of inhumed objects
// grouped by container ID.
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
	return i.inhumedByCnrID
}

// DeletedLockObjects returns deleted object of LOCK
@@ -73,6 +80,15 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) {
		Size: deletedSize,
		CID:  containerID,
	})
	i.availableInhumed++
	if v, ok := i.inhumedByCnrID[containerID]; ok {
		v.logic++
		i.inhumedByCnrID[containerID] = v
	} else {
		i.inhumedByCnrID[containerID] = ObjectCounters{
			logic: 1,
		}
	}
}
// SetAddresses sets a list of object addresses that should be inhumed. // SetAddresses sets a list of object addresses that should be inhumed.
@ -122,7 +138,7 @@ var ErrLockObjectRemoval = logicerr.New("lock object removal")
// //
// NOTE: Marks any object with GC mark (despite any prohibitions on operations // NOTE: Marks any object with GC mark (despite any prohibitions on operations
// with that object) if WithForceGCMark option has been provided. // with that object) if WithForceGCMark option has been provided.
func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) { func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
var ( var (
startedAt = time.Now() startedAt = time.Now()
success = false success = false
@ -142,8 +158,11 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err err
return InhumeRes{}, ErrReadOnlyMode return InhumeRes{}, ErrReadOnlyMode
} }
res := InhumeRes{
inhumedByCnrID: make(map[cid.ID]ObjectCounters),
}
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.Update(func(tx *bbolt.Tx) error { err := db.boltDB.Update(func(tx *bbolt.Tx) error {
return db.inhumeTx(tx, currEpoch, prm, &res) return db.inhumeTx(tx, currEpoch, prm, &res)
}) })
success = err == nil success = err == nil
@@ -224,7 +243,27 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
		}
	}

	return db.applyInhumeResToCounters(tx, res)
}

func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
	var inhumedCount uint64
	inhumedbyCnr := make(map[cid.ID]ObjectCounters)
	for _, dd := range res.deletionDetails {
		if v, ok := inhumedbyCnr[dd.CID]; ok {
			v.logic++
			inhumedbyCnr[dd.CID] = v
		} else {
			inhumedbyCnr[dd.CID] = ObjectCounters{logic: 1}
		}
		inhumedCount++
	}

	if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil {
		return err
	}

	return db.updateContainerCounter(tx, inhumedbyCnr, false)
}
// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
@@ -279,7 +318,6 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
	containerID, _ := obj.ContainerID()
	if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
		res.storeDeletionInfo(containerID, obj.PayloadSize())
	}

View file

@@ -183,16 +183,8 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
	}

	if !isParent {
		if err = db.incCounters(tx, cnr); err != nil {

fyrchik marked this conversation as resolved Outdated

Can we avoid creating a map here?

done

			return err
		}
	}

View file

@@ -22,6 +22,7 @@ var (
	garbageBucketName          = []byte{garbagePrefix}
	toMoveItBucketName         = []byte{toMoveItPrefix}
	containerVolumeBucketName  = []byte{containerVolumePrefix}
	containerCounterBucketName = []byte{containerCountersPrefix}

	zeroValue = []byte{0xFF}
)
@@ -111,6 +112,11 @@ const (
	// Key: split ID
	// Value: list of object IDs
	splitPrefix

	// containerCountersPrefix is used for storing container object counters.
	// Key: container ID
	// Value: phy and logic object counters, each a little-endian uint64
	containerCountersPrefix
)

const (

View file

@@ -118,6 +118,7 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
	}

	s.decObjectCounterBy(physical, res.RawObjectsRemoved())
	s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
	s.decContainerObjectCounter(res.RemovedByCnrID())

	removedPayload := res.RemovedPhysicalObjectSizes()[0]
	logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
	if logicalRemovedPayload > 0 {

View file

@@ -416,6 +416,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
	s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
	s.decObjectCounterBy(logical, res.AvailableInhumed())
	s.decContainerObjectCounter(res.InhumedByCnrID())

	i := 0
	for i < res.GetDeletionInfoLength() {
@@ -629,6 +630,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
	s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
	s.decObjectCounterBy(logical, res.AvailableInhumed())
	s.decContainerObjectCounter(res.InhumedByCnrID())

	i := 0
	for i < res.GetDeletionInfoLength() {
@@ -675,6 +677,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
	s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
	s.decObjectCounterBy(logical, res.AvailableInhumed())
	s.decContainerObjectCounter(res.InhumedByCnrID())

	i := 0
	for i < res.GetDeletionInfoLength() {

View file

@@ -122,6 +122,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
	s.m.RUnlock()

	s.decObjectCounterBy(logical, res.AvailableInhumed())
	s.decContainerObjectCounter(res.InhumedByCnrID())

	i := 0
	for i < res.GetDeletionInfoLength() {

View file

@@ -14,6 +14,7 @@ import (
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/stretchr/testify/require"
@@ -23,6 +24,7 @@ type metricsStore struct {
	mtx         sync.Mutex
	objCounters map[string]uint64
	cnrSize     map[string]int64
	cnrCount    map[string]uint64
	pldSize     int64
	mode        mode.Mode
	errCounter  int64
@@ -126,6 +128,39 @@ func (m *metricsStore) DeleteShardMetrics() {
	m.errCounter = 0
}
func (m *metricsStore) SetContainerObjectsCount(cnrID string, objectType string, value uint64) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	m.cnrCount[cnrID+objectType] = value
}

func (m *metricsStore) IncContainerObjectsCount(cnrID string, objectType string) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	m.cnrCount[cnrID+objectType]++
}

func (m *metricsStore) SubContainerObjectsCount(cnrID string, objectType string, value uint64) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	existed := m.cnrCount[cnrID+objectType]
	if existed < value {
		panic("existing value is smaller than the value to subtract")
	}
	if existed == value {
		delete(m.cnrCount, cnrID+objectType)
	} else {
		m.cnrCount[cnrID+objectType] -= value
	}
}

func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	v, ok := m.cnrCount[cnrID+objectType]
	return v, ok
}
func TestCounters(t *testing.T) {
	t.Parallel()
@@ -143,21 +178,37 @@ func TestCounters(t *testing.T) {
		oo[i] = testutil.GenerateObject()
	}

	cc := meta.ContainerCounters{Logical: make(map[cid.ID]uint64), Physical: make(map[cid.ID]uint64)}

	t.Run("defaults", func(t *testing.T) {
		require.Zero(t, mm.getObjectCounter(physical))
		require.Zero(t, mm.getObjectCounter(logical))
		require.Empty(t, mm.containerSizes())
		require.Zero(t, mm.payloadSize())

		for _, obj := range oo {
			contID, _ := obj.ContainerID()
			v, ok := mm.getContainerCount(contID.EncodeToString(), physical)
			require.Zero(t, v)
			require.False(t, ok)
			v, ok = mm.getContainerCount(contID.EncodeToString(), logical)
			require.Zero(t, v)
			require.False(t, ok)
		}
	})

	var totalPayload int64

	expectedLogicalSizes := make(map[string]int64)
	expectedLogCC := make(map[cid.ID]uint64)
	expectedPhyCC := make(map[cid.ID]uint64)
	for i := range oo {
		cnr, _ := oo[i].ContainerID()
		oSize := int64(oo[i].PayloadSize())
		expectedLogicalSizes[cnr.EncodeToString()] += oSize
		totalPayload += oSize
		expectedLogCC[cnr]++
		expectedPhyCC[cnr]++
	}

	var prm PutPrm
@@ -174,6 +225,11 @@ func TestCounters(t *testing.T) {
	require.Equal(t, expectedLogicalSizes, mm.containerSizes())
	require.Equal(t, totalPayload, mm.payloadSize())

	cc, err := sh.metaBase.ContainerCounters(context.Background())
	require.NoError(t, err)
	require.Equal(t, expectedLogCC, cc.Logical)
	require.Equal(t, expectedPhyCC, cc.Physical)

	t.Run("inhume_GC", func(t *testing.T) {
		var prm InhumePrm
		inhumedNumber := objNumber / 4
@@ -187,6 +243,11 @@ func TestCounters(t *testing.T) {
			cid, ok := oo[i].ContainerID()
			require.True(t, ok)
			expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())

			expectedLogCC[cid]--
			if expectedLogCC[cid] == 0 {
				delete(expectedLogCC, cid)
			}
		}

		require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
@@ -194,6 +255,11 @@ func TestCounters(t *testing.T) {
		require.Equal(t, expectedLogicalSizes, mm.containerSizes())
		require.Equal(t, totalPayload, mm.payloadSize())
fyrchik marked this conversation as resolved Outdated

How about `mm.checkContainerCount(t, cnr, logical, v)` or `require.Equal(t, expV, mm.getContainerCount(t, ...)`? To make the function smaller

Done, thx
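[Editor's note] A sketch of the helper the reviewer suggests; the name comes from the thread above, the body is an assumption:

	func (m *metricsStore) checkContainerCount(t *testing.T, cnrID, objectType string, expected uint64) {
		v, ok := m.getContainerCount(cnrID, objectType)
		require.True(t, ok)
		require.Equal(t, expected, v)
	}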
		cc, err := sh.metaBase.ContainerCounters(context.Background())
		require.NoError(t, err)
		require.Equal(t, expectedLogCC, cc.Logical)
		require.Equal(t, expectedPhyCC, cc.Physical)

		oo = oo[inhumedNumber:]
	})
@@ -214,6 +280,11 @@ func TestCounters(t *testing.T) {
			cid, ok := oo[i].ContainerID()
			require.True(t, ok)
			expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())

			expectedLogCC[cid]--
			if expectedLogCC[cid] == 0 {
				delete(expectedLogCC, cid)
			}
		}

		require.Equal(t, phy, mm.getObjectCounter(physical))
@@ -221,6 +292,11 @@ func TestCounters(t *testing.T) {
		require.Equal(t, expectedLogicalSizes, mm.containerSizes())
		require.Equal(t, totalPayload, mm.payloadSize())

		cc, err = sh.metaBase.ContainerCounters(context.Background())
		require.NoError(t, err)
		require.Equal(t, expectedLogCC, cc.Logical)
		require.Equal(t, expectedPhyCC, cc.Physical)

		oo = oo[inhumedNumber:]
	})
@@ -245,9 +321,24 @@ func TestCounters(t *testing.T) {
			cnr, _ := oo[i].ContainerID()
			expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload)

			expectedLogCC[cnr]--
			if expectedLogCC[cnr] == 0 {
				delete(expectedLogCC, cnr)
			}
			expectedPhyCC[cnr]--
			if expectedPhyCC[cnr] == 0 {
				delete(expectedPhyCC, cnr)
			}
		}

		require.Equal(t, expectedLogicalSizes, mm.containerSizes())
		require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize())

		cc, err = sh.metaBase.ContainerCounters(context.Background())
		require.NoError(t, err)
		require.Equal(t, expectedLogCC, cc.Logical)
		require.Equal(t, expectedPhyCC, cc.Physical)
	})
}
@@ -269,6 +360,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
			"logic": 0,
		},
		cnrSize:  make(map[string]int64),
		cnrCount: make(map[string]uint64),
	}
	sh := New(

View file

@@ -90,7 +90,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
			return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
		}

		s.incObjectCounter(putPrm.Address.Container())
		s.addToPayloadSize(int64(prm.obj.PayloadSize()))
		s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
	}

View file

@@ -18,6 +18,7 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.uber.org/zap"
)
@@ -85,6 +86,12 @@ type MetricsWriter interface {
	ClearErrorCounter()
	// DeleteShardMetrics deletes shard metrics from registry.
	DeleteShardMetrics()
	// SetContainerObjectsCount sets container object count.
	SetContainerObjectsCount(cnrID string, objectType string, value uint64)
	// IncContainerObjectsCount increments container object count.
	IncContainerObjectsCount(cnrID string, objectType string)
	// SubContainerObjectsCount subtracts container object count.
	SubContainerObjectsCount(cnrID string, objectType string, value uint64)
}

type cfg struct {
@@ -425,15 +432,29 @@ func (s *Shard) updateMetrics(ctx context.Context) {
		}

		s.metricsWriter.AddToPayloadSize(int64(totalPayload))

		contCount, err := s.metaBase.ContainerCounters(ctx)
		if err != nil {
			s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err))
			return
		}
		for contID, count := range contCount.Physical {
			s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count)
		}
		for contID, count := range contCount.Logical {
			s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count)
		}
	}
}

// incObjectCounter increments both physical and logical object
// counters.
func (s *Shard) incObjectCounter(cnrID cid.ID) {
	if s.cfg.metricsWriter != nil {
		s.cfg.metricsWriter.IncObjectCounter(physical)
		s.cfg.metricsWriter.IncObjectCounter(logical)
		s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical)
		s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical)
	}
}
@@ -443,6 +464,17 @@ func (s *Shard) decObjectCounterBy(typ string, v uint64) {
	}
}

func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) {
	if s.cfg.metricsWriter == nil {
		return
	}

	for cnrID, count := range byCnr {
		s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy())
		s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic())
	}
}

func (s *Shard) addToContainerSize(cnr string, size int64) {
	if s.cfg.metricsWriter != nil {
		s.cfg.metricsWriter.AddToContainerSize(cnr, size)

View file

@@ -18,6 +18,9 @@ type EngineMetrics interface {
	SetObjectCounter(shardID, objectType string, v uint64)
	AddToPayloadCounter(shardID string, size int64)
	SetMode(shardID string, mode mode.Mode)
	SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
	IncContainerObjectCounter(shardID, contID, objectType string)
	SubContainerObjectCounter(shardID, contID, objectType string, v uint64)

	WriteCache() WriteCacheMetrics
	GC() GCMetrics
@@ -30,6 +33,7 @@ type engineMetrics struct {
	payloadSize    *prometheus.GaugeVec
	errorCounter   *prometheus.GaugeVec
	mode           *shardIDModeValue
	contObjCounter *prometheus.GaugeVec

	gc         *gcMetrics
	writeCache *writeCacheMetrics
@@ -46,10 +50,13 @@ func newEngineMetrics() *engineMetrics {
			Name: "request_duration_seconds",
			Help: "Duration of Engine requests",
		}, []string{methodLabel}),
		objectCounter: newEngineGaugeVector("objects_total",
fyrchik marked this conversation as resolved Outdated

Why is it deprecated? It just has different usage: we count objects in the container to calculate quotas, we count objects per shard to calculate the amount of space.

The deprecated metric (objects per shard) can be calculated from the new one (objects per shard per container) with a PromQL query, so it looks redundant.
"Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.",
[]string{shardIDLabel, typeLabel}),
gc: newGCMetrics(), gc: newGCMetrics(),
writeCache: newWriteCacheMetrics(), writeCache: newWriteCacheMetrics(),
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"), mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
} }
} }
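[Editor's note] Per the thread above, the deprecated per-shard series can indeed be recomputed by aggregating the new metric, e.g. with a PromQL query such as `sum by (shard_id, type) (frostfs_node_engine_container_objects_total)`; the `shard_id` and `type` label names here are assumptions for whatever `shardIDLabel` and `typeLabel` expand to.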
@@ -88,6 +95,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
	m.errorCounter.Delete(prometheus.Labels{shardIDLabel: shardID})
	m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
	m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
	m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
	m.mode.Delete(shardID)
}
@@ -109,6 +117,36 @@ func (m *engineMetrics) SetObjectCounter(shardID, objectType string, v uint64) {
	).Set(float64(v))
}

func (m *engineMetrics) SetContainerObjectCounter(shardID, contID, objectType string, v uint64) {
	m.contObjCounter.With(
		prometheus.Labels{
			shardIDLabel:        shardID,
			containerIDLabelKey: contID,
			typeLabel:           objectType,
		},
	).Set(float64(v))
}

func (m *engineMetrics) IncContainerObjectCounter(shardID, contID, objectType string) {
	m.contObjCounter.With(
		prometheus.Labels{
			shardIDLabel:        shardID,
			containerIDLabelKey: contID,
			typeLabel:           objectType,
		},
	).Inc()
}

func (m *engineMetrics) SubContainerObjectCounter(shardID, contID, objectType string, v uint64) {
	m.contObjCounter.With(
		prometheus.Labels{
			shardIDLabel:        shardID,
			containerIDLabelKey: contID,
			typeLabel:           objectType,
		},
	).Sub(float64(v))
}

func (m *engineMetrics) SetMode(shardID string, mode mode.Mode) {
	m.mode.SetMode(shardID, mode.String())
}