Add container objects counter #778
|
@ -516,4 +516,5 @@ const (
|
|||
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
||||
FailedToCountWritecacheItems = "failed to count writecache items"
|
||||
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
|
||||
FailedToGetContainerCounters = "failed to get container counters values"
|
||||
)
|
||||
|
|
|
@ -21,6 +21,10 @@ type MetricRegister interface {
|
|||
ClearErrorCounter(shardID string)
|
||||
DeleteShardMetrics(shardID string)
|
||||
|
||||
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||
IncContainerObjectCounter(shardID, contID, objectType string)
|
||||
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||
|
||||
WriteCache() metrics.WriteCacheMetrics
|
||||
GC() metrics.GCMetrics
|
||||
}
|
||||
|
|
|
@ -74,6 +74,18 @@ func (m *metricsWithID) DeleteShardMetrics() {
|
|||
m.mw.DeleteShardMetrics(m.id)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) SetContainerObjectsCount(cnrID string, objectType string, value uint64) {
|
||||
m.mw.SetContainerObjectCounter(m.id, cnrID, objectType, value)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) IncContainerObjectsCount(cnrID string, objectType string) {
|
||||
m.mw.IncContainerObjectCounter(m.id, cnrID, objectType)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string, value uint64) {
|
||||
m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
|
||||
}
|
||||
|
||||
// AddShard adds a new shard to the storage engine.
|
||||
//
|
||||
// Returns any error encountered that did not allow adding a shard.
|
||||
|
|
|
@ -104,6 +104,7 @@ func (db *DB) init(reset bool) error {
|
|||
|
||||
mStaticBuckets := map[string]struct{}{
|
||||
string(containerVolumeBucketName): {},
|
||||
string(containerCounterBucketName): {},
|
||||
string(graveyardBucketName): {},
|
||||
string(toMoveItBucketName): {},
|
||||
string(garbageBucketName): {},
|
||||
|
@ -135,7 +136,7 @@ func (db *DB) init(reset bool) error {
|
|||
}
|
||||
}
|
||||
|
||||
if !reset {
|
||||
if !reset { // counters will be recalculated by refill metabase
|
||||
err = syncCounter(tx, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not sync object counter: %w", err)
|
||||
|
|
|
@ -1,10 +1,15 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
|
@ -73,9 +78,128 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
|
|||
return cc, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// updateCounter updates the object counter. Tx MUST be writable.
|
||||
// If inc == `true`, increases the counter, decreases otherwise.
|
||||
func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
|
||||
type ContainerCounters struct {
|
||||
Logical map[cid.ID]uint64
|
||||
Physical map[cid.ID]uint64
|
||||
}
|
||||
|
||||
// ContainerCounters returns object counters for each container
|
||||
// that metabase has tracked since it was opened and initialized.
|
||||
//
|
||||
// Returns only the errors that do not allow reading counter
|
||||
// in Bolt database.
|
||||
//
|
||||
// It is guaranteed that the ContainerCounters fields are not nil.
|
||||
func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("ContainerCounters", time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCounters")
|
||||
defer span.End()
|
||||
|
||||
cc := ContainerCounters{
|
||||
Logical: make(map[cid.ID]uint64),
|
||||
Physical: make(map[cid.ID]uint64),
|
||||
}
|
||||
|
||||
lastKey := make([]byte, cidSize)
|
||||
|
||||
// there is no limit for containers count, so use batching with cancellation
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return cc, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
completed, err := db.containerCountersNextBatch(lastKey, &cc)
|
||||
if err != nil {
|
||||
return cc, err
|
||||
}
|
||||
if completed {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
success = true
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return false, ErrDegradedMode
|
||||
}
|
||||
|
||||
counter := 0
|
||||
const batchSize = 1000
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(containerCounterBucketName)
|
||||
if b == nil {
|
||||
return ErrInterruptIterator
|
||||
}
|
||||
c := b.Cursor()
|
||||
var key, value []byte
|
||||
for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() {
|
||||
if bytes.Equal(lastKey, key) {
|
||||
continue
|
||||
}
|
||||
copy(lastKey, key)
|
||||
|
||||
cnrID, err := parseContainerCounterKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
phy, logic, err := parseContainerCounterValue(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if phy > 0 {
|
||||
cc.Physical[cnrID] = phy
|
||||
}
|
||||
if logic > 0 {
|
||||
cc.Logical[cnrID] = logic
|
||||
}
|
||||
|
||||
counter++
|
||||
if counter == batchSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if counter < batchSize { // last batch
|
||||
return ErrInterruptIterator
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrInterruptIterator) {
|
||||
return true, nil
|
||||
}
|
||||
return false, metaerr.Wrap(err)
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error {
|
||||
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
|
||||
return fmt.Errorf("could not increase phy object counter: %w", err)
|
||||
}
|
||||
if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil {
|
||||
return fmt.Errorf("could not increase logical object counter: %w", err)
|
||||
}
|
||||
return db.incContainerObjectCounter(tx, cnrID)
|
||||
}
|
||||
|
||||
func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b == nil {
|
||||
return nil
|
||||
|
@ -112,6 +236,63 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool
|
|||
return b.Put(counterKey, newCounter)
|
||||
}
|
||||
|
||||
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
|
||||
b := tx.Bucket(containerCounterBucketName)
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
key := make([]byte, cidSize)
|
||||
for cnrID, cnrDelta := range delta {
|
||||
cnrID.Encode(key)
|
||||
if err := db.editContainerCounterValue(b, key, cnrDelta, inc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
|
||||
func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error {
|
||||
var phyValue, logicValue uint64
|
||||
var err error
|
||||
data := b.Get(key)
|
||||
if len(data) > 0 {
|
||||
phyValue, logicValue, err = parseContainerCounterValue(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
phyValue = nextValue(phyValue, delta.phy, inc)
|
||||
logicValue = nextValue(logicValue, delta.logic, inc)
|
||||
if phyValue > 0 || logicValue > 0 {
|
||||
value := containerCounterValue(phyValue, logicValue)
|
||||
return b.Put(key, value)
|
||||
}
|
||||
return b.Delete(key)
|
||||
}
|
||||
|
||||
func nextValue(existed, delta uint64, inc bool) uint64 {
|
||||
if inc {
|
||||
existed += delta
|
||||
} else if existed <= delta {
|
||||
existed = 0
|
||||
} else {
|
||||
existed -= delta
|
||||
}
|
||||
return existed
|
||||
}
|
||||
|
||||
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error {
|
||||
b := tx.Bucket(containerCounterBucketName)
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
key := make([]byte, cidSize)
|
||||
cnrID.Encode(key)
|
||||
return db.editContainerCounterValue(b, key, ObjectCounters{logic: 1, phy: 1}, true)
|
||||
}
|
||||
|
||||
// syncCounter updates object counters according to metabase state:
|
||||
// it counts all the physically/logically stored objects using internal
|
||||
// indexes. Tx MUST be writable.
|
||||
|
@ -119,26 +300,38 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool
|
|||
// Does nothing if counters are not empty and force is false. If force is
|
||||
// true, updates the counters anyway.
|
||||
func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||
shardInfoB, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get shard info bucket: %w", err)
|
||||
}
|
||||
|
||||
if !force && len(b.Get(objectPhyCounterKey)) == 8 && len(b.Get(objectLogicCounterKey)) == 8 {
|
||||
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8
|
||||
containerCounterInitialized := tx.Bucket(containerCounterBucketName) != nil
|
||||
if !force && shardObjectCounterInitialized && containerCounterInitialized {
|
||||
// the counters are already inited
|
||||
return nil
|
||||
}
|
||||
|
||||
containerCounterB, err := tx.CreateBucketIfNotExists(containerCounterBucketName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get container counter bucket: %w", err)
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
var phyCounter uint64
|
||||
var logicCounter uint64
|
||||
counters := make(map[cid.ID]ObjectCounters)
|
||||
|
||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
||||
garbageBKT := tx.Bucket(garbageBucketName)
|
||||
key := make([]byte, addressKeySize)
|
||||
|
||||
err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error {
|
||||
phyCounter++
|
||||
if v, ok := counters[cnr]; ok {
|
||||
v.phy++
|
||||
counters[cnr] = v
|
||||
} else {
|
||||
counters[cnr] = ObjectCounters{
|
||||
phy: 1,
|
||||
}
|
||||
}
|
||||
|
||||
addr.SetContainer(cnr)
|
||||
addr.SetObject(obj)
|
||||
|
@ -146,7 +339,14 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
|||
// check if an object is available: not with GCMark
|
||||
// and not covered with a tombstone
|
||||
if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
|
||||
logicCounter++
|
||||
if v, ok := counters[cnr]; ok {
|
||||
v.logic++
|
||||
counters[cnr] = v
|
||||
} else {
|
||||
counters[cnr] = ObjectCounters{
|
||||
logic: 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -155,21 +355,65 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
|||
return fmt.Errorf("could not iterate objects: %w", err)
|
||||
}
|
||||
|
||||
data := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(data, phyCounter)
|
||||
return setObjectCounters(counters, shardInfoB, containerCounterB)
|
||||
}
|
||||
|
||||
err = b.Put(objectPhyCounterKey, data)
|
||||
func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error {
|
||||
var phyTotal uint64
|
||||
var logicTotal uint64
|
||||
key := make([]byte, cidSize)
|
||||
for cnrID, count := range counters {
|
||||
phyTotal += count.phy
|
||||
logicTotal += count.logic
|
||||
|
||||
cnrID.Encode(key)
|
||||
value := containerCounterValue(count.phy, count.logic)
|
||||
err := containerCounterB.Put(key, value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update phy container object counter: %w", err)
|
||||
}
|
||||
}
|
||||
phyData := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(phyData, phyTotal)
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
This will break if we later have subbuckets here, but I doubt we will. This will break if we later have subbuckets here, but I doubt we will.
Anyway, wlll `containerCounterInitialized := containerCounterB.Stats().KeyN != 0` suffice?
fyrchik
commented
Also, can Also, can `bucket != nil` check serve as a sign that we have initialized everything?
dstepanov-yadro
commented
`Stats()` will read all bucket pages, but we need only first.
dstepanov-yadro
commented
Yes, fixed > Also, can bucket != nil check serve as a sign that we have initialized everything?
Yes, fixed
|
||||
|
||||
err := shardInfoB.Put(objectPhyCounterKey, phyData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update phy object counter: %w", err)
|
||||
}
|
||||
|
||||
data = make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(data, logicCounter)
|
||||
logData := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(logData, logicTotal)
|
||||
|
||||
err = b.Put(objectLogicCounterKey, data)
|
||||
err = shardInfoB.Put(objectLogicCounterKey, logData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not update logic object counter: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func containerCounterValue(phy, logic uint64) []byte {
|
||||
res := make([]byte, 16)
|
||||
binary.LittleEndian.PutUint64(res, phy)
|
||||
binary.LittleEndian.PutUint64(res[8:], logic)
|
||||
return res
|
||||
}
|
||||
|
||||
func parseContainerCounterKey(buf []byte) (cid.ID, error) {
|
||||
if len(buf) != cidSize {
|
||||
return cid.ID{}, fmt.Errorf("invalid key length")
|
||||
}
|
||||
var cnrID cid.ID
|
||||
if err := cnrID.Decode(buf); err != nil {
|
||||
return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err)
|
||||
}
|
||||
return cnrID, nil
|
||||
}
|
||||
|
||||
// parseContainerCounterValue return phy, logic values.
|
||||
func parseContainerCounterValue(buf []byte) (uint64, uint64, error) {
|
||||
if len(buf) != 16 {
|
||||
return 0, 0, fmt.Errorf("invalid value length")
|
||||
}
|
||||
return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
|
@ -25,6 +26,11 @@ func TestCounters(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Zero(t, c.Phy())
|
||||
require.Zero(t, c.Logic())
|
||||
|
||||
cc, err := db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, len(cc.Physical))
|
||||
require.Zero(t, len(cc.Logical))
|
||||
})
|
||||
|
||||
t.Run("put", func(t *testing.T) {
|
||||
|
@ -36,9 +42,14 @@ func TestCounters(t *testing.T) {
|
|||
}
|
||||
|
||||
var prm meta.PutPrm
|
||||
expPhy := make(map[cid.ID]uint64)
|
||||
expLog := make(map[cid.ID]uint64)
|
||||
|
||||
for i := 0; i < objCount; i++ {
|
||||
prm.SetObject(oo[i])
|
||||
cnrID, _ := oo[i].ContainerID()
|
||||
expPhy[cnrID]++
|
||||
expLog[cnrID]++
|
||||
|
||||
_, err := db.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
@ -48,6 +59,12 @@ func TestCounters(t *testing.T) {
|
|||
|
||||
require.Equal(t, uint64(i+1), c.Phy())
|
||||
require.Equal(t, uint64(i+1), c.Logic())
|
||||
|
||||
cc, err := db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expPhy, cc.Physical)
|
||||
require.Equal(t, expLog, cc.Logical)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -56,6 +73,14 @@ func TestCounters(t *testing.T) {
|
|||
db := newDB(t)
|
||||
oo := putObjs(t, db, objCount, false)
|
||||
|
||||
expPhy := make(map[cid.ID]uint64)
|
||||
expLog := make(map[cid.ID]uint64)
|
||||
for _, obj := range oo {
|
||||
cnrID, _ := obj.ContainerID()
|
||||
expPhy[cnrID]++
|
||||
expLog[cnrID]++
|
||||
}
|
||||
|
||||
var prm meta.DeletePrm
|
||||
for i := objCount - 1; i >= 0; i-- {
|
||||
prm.SetAddresses(objectcore.AddressOf(oo[i]))
|
||||
|
@ -69,6 +94,27 @@ func TestCounters(t *testing.T) {
|
|||
|
||||
require.Equal(t, uint64(i), c.Phy())
|
||||
require.Equal(t, uint64(i), c.Logic())
|
||||
|
||||
cnrID, _ := oo[i].ContainerID()
|
||||
if v, ok := expPhy[cnrID]; ok {
|
||||
if v == 1 {
|
||||
delete(expPhy, cnrID)
|
||||
} else {
|
||||
expPhy[cnrID]--
|
||||
}
|
||||
}
|
||||
if v, ok := expLog[cnrID]; ok {
|
||||
if v == 1 {
|
||||
delete(expLog, cnrID)
|
||||
} else {
|
||||
expLog[cnrID]--
|
||||
}
|
||||
}
|
||||
|
||||
cc, err := db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expPhy, cc.Physical)
|
||||
require.Equal(t, expLog, cc.Logical)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -77,6 +123,14 @@ func TestCounters(t *testing.T) {
|
|||
db := newDB(t)
|
||||
oo := putObjs(t, db, objCount, false)
|
||||
|
||||
expPhy := make(map[cid.ID]uint64)
|
||||
expLog := make(map[cid.ID]uint64)
|
||||
for _, obj := range oo {
|
||||
cnrID, _ := obj.ContainerID()
|
||||
expPhy[cnrID]++
|
||||
expLog[cnrID]++
|
||||
}
|
||||
|
||||
inhumedObjs := make([]oid.Address, objCount/2)
|
||||
|
||||
for i, o := range oo {
|
||||
|
@ -87,6 +141,16 @@ func TestCounters(t *testing.T) {
|
|||
inhumedObjs[i] = objectcore.AddressOf(o)
|
||||
}
|
||||
|
||||
for _, addr := range inhumedObjs {
|
||||
if v, ok := expLog[addr.Container()]; ok {
|
||||
if v == 1 {
|
||||
delete(expLog, addr.Container())
|
||||
} else {
|
||||
expLog[addr.Container()]--
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var prm meta.InhumePrm
|
||||
prm.SetTombstoneAddress(oidtest.Address())
|
||||
prm.SetAddresses(inhumedObjs...)
|
||||
|
@ -100,6 +164,12 @@ func TestCounters(t *testing.T) {
|
|||
|
||||
require.Equal(t, uint64(objCount), c.Phy())
|
||||
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
|
||||
|
||||
cc, err := db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expPhy, cc.Physical)
|
||||
require.Equal(t, expLog, cc.Logical)
|
||||
})
|
||||
|
||||
t.Run("put_split", func(t *testing.T) {
|
||||
|
@ -107,6 +177,9 @@ func TestCounters(t *testing.T) {
|
|||
db := newDB(t)
|
||||
parObj := testutil.GenerateObject()
|
||||
|
||||
expPhy := make(map[cid.ID]uint64)
|
||||
expLog := make(map[cid.ID]uint64)
|
||||
|
||||
// put objects and check that parent info
|
||||
// does not affect the counter
|
||||
for i := 0; i < objCount; i++ {
|
||||
|
@ -115,12 +188,21 @@ func TestCounters(t *testing.T) {
|
|||
o.SetParent(parObj)
|
||||
}
|
||||
|
||||
cnrID, _ := o.ContainerID()
|
||||
expLog[cnrID]++
|
||||
expPhy[cnrID]++
|
||||
|
||||
require.NoError(t, putBig(db, o))
|
||||
|
||||
c, err := db.ObjectCounters()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(i+1), c.Phy())
|
||||
require.Equal(t, uint64(i+1), c.Logic())
|
||||
|
||||
cc, err := db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expPhy, cc.Physical)
|
||||
require.Equal(t, expLog, cc.Logical)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -129,16 +211,40 @@ func TestCounters(t *testing.T) {
|
|||
db := newDB(t)
|
||||
oo := putObjs(t, db, objCount, true)
|
||||
|
||||
expPhy := make(map[cid.ID]uint64)
|
||||
expLog := make(map[cid.ID]uint64)
|
||||
for _, obj := range oo {
|
||||
cnrID, _ := obj.ContainerID()
|
||||
expPhy[cnrID]++
|
||||
expLog[cnrID]++
|
||||
}
|
||||
|
||||
// delete objects that have parent info
|
||||
// and check that it does not affect
|
||||
// the counter
|
||||
for i, o := range oo {
|
||||
require.NoError(t, metaDelete(db, objectcore.AddressOf(o)))
|
||||
addr := objectcore.AddressOf(o)
|
||||
require.NoError(t, metaDelete(db, addr))
|
||||
|
||||
c, err := db.ObjectCounters()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(objCount-i-1), c.Phy())
|
||||
require.Equal(t, uint64(objCount-i-1), c.Logic())
|
||||
|
||||
if v, ok := expPhy[addr.Container()]; ok {
|
||||
if v == 1 {
|
||||
delete(expPhy, addr.Container())
|
||||
} else {
|
||||
expPhy[addr.Container()]--
|
||||
}
|
||||
}
|
||||
if v, ok := expLog[addr.Container()]; ok {
|
||||
if v == 1 {
|
||||
delete(expLog, addr.Container())
|
||||
} else {
|
||||
expLog[addr.Container()]--
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -147,6 +253,14 @@ func TestCounters(t *testing.T) {
|
|||
db := newDB(t)
|
||||
oo := putObjs(t, db, objCount, true)
|
||||
|
||||
expPhy := make(map[cid.ID]uint64)
|
||||
expLog := make(map[cid.ID]uint64)
|
||||
for _, obj := range oo {
|
||||
cnrID, _ := obj.ContainerID()
|
||||
expPhy[cnrID]++
|
||||
expLog[cnrID]++
|
||||
}
|
||||
|
||||
inhumedObjs := make([]oid.Address, objCount/2)
|
||||
|
||||
for i, o := range oo {
|
||||
|
@ -157,6 +271,16 @@ func TestCounters(t *testing.T) {
|
|||
inhumedObjs[i] = objectcore.AddressOf(o)
|
||||
}
|
||||
|
||||
for _, addr := range inhumedObjs {
|
||||
if v, ok := expLog[addr.Container()]; ok {
|
||||
if v == 1 {
|
||||
delete(expLog, addr.Container())
|
||||
} else {
|
||||
expLog[addr.Container()]--
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var prm meta.InhumePrm
|
||||
prm.SetTombstoneAddress(oidtest.Address())
|
||||
prm.SetAddresses(inhumedObjs...)
|
||||
|
@ -169,6 +293,12 @@ func TestCounters(t *testing.T) {
|
|||
|
||||
require.Equal(t, uint64(objCount), c.Phy())
|
||||
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
|
||||
|
||||
cc, err := db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expPhy, cc.Physical)
|
||||
require.Equal(t, expLog, cc.Logical)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -190,6 +320,13 @@ func TestCounters_Expired(t *testing.T) {
|
|||
oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
|
||||
}
|
||||
|
||||
expPhy := make(map[cid.ID]uint64)
|
||||
expLog := make(map[cid.ID]uint64)
|
||||
for _, addr := range oo {
|
||||
expPhy[addr.Container()]++
|
||||
expLog[addr.Container()]++
|
||||
}
|
||||
|
||||
// 1. objects are available and counters are correct
|
||||
|
||||
c, err := db.ObjectCounters()
|
||||
|
@ -197,6 +334,12 @@ func TestCounters_Expired(t *testing.T) {
|
|||
require.Equal(t, uint64(objCount), c.Phy())
|
||||
require.Equal(t, uint64(objCount), c.Logic())
|
||||
|
||||
cc, err := db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expPhy, cc.Physical)
|
||||
require.Equal(t, expLog, cc.Logical)
|
||||
|
||||
for _, o := range oo {
|
||||
_, err := metaGet(db, o, true)
|
||||
require.NoError(t, err)
|
||||
|
@ -212,6 +355,12 @@ func TestCounters_Expired(t *testing.T) {
|
|||
require.Equal(t, uint64(objCount), c.Phy())
|
||||
require.Equal(t, uint64(objCount), c.Logic())
|
||||
|
||||
cc, err = db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expPhy, cc.Physical)
|
||||
require.Equal(t, expLog, cc.Logical)
|
||||
|
||||
for _, o := range oo {
|
||||
_, err := metaGet(db, o, true)
|
||||
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
|
||||
|
@ -235,6 +384,20 @@ func TestCounters_Expired(t *testing.T) {
|
|||
require.Equal(t, uint64(len(oo)), c.Phy())
|
||||
require.Equal(t, uint64(len(oo)-1), c.Logic())
|
||||
|
||||
if v, ok := expLog[oo[0].Container()]; ok {
|
||||
if v == 1 {
|
||||
delete(expLog, oo[0].Container())
|
||||
} else {
|
||||
expLog[oo[0].Container()]--
|
||||
}
|
||||
}
|
||||
|
||||
cc, err = db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expPhy, cc.Physical)
|
||||
require.Equal(t, expLog, cc.Logical)
|
||||
|
||||
// 4. `Delete` an object with GCMark should decrease the
|
||||
// phy counter but does not affect the logic counter (after
|
||||
// that step they should be equal)
|
||||
|
@ -246,6 +409,14 @@ func TestCounters_Expired(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Zero(t, deleteRes.AvailableObjectsRemoved())
|
||||
|
||||
if v, ok := expPhy[oo[0].Container()]; ok {
|
||||
if v == 1 {
|
||||
delete(expPhy, oo[0].Container())
|
||||
} else {
|
||||
expPhy[oo[0].Container()]--
|
||||
}
|
||||
}
|
||||
|
||||
oo = oo[1:]
|
||||
|
||||
c, err = db.ObjectCounters()
|
||||
|
@ -253,6 +424,12 @@ func TestCounters_Expired(t *testing.T) {
|
|||
require.Equal(t, uint64(len(oo)), c.Phy())
|
||||
require.Equal(t, uint64(len(oo)), c.Logic())
|
||||
|
||||
cc, err = db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expPhy, cc.Physical)
|
||||
require.Equal(t, expLog, cc.Logical)
|
||||
|
||||
// 5 `Delete` an expired object (like it would the control
|
||||
// service do) should decrease both counters despite the
|
||||
// expiration fact
|
||||
|
@ -263,12 +440,34 @@ func TestCounters_Expired(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
|
||||
|
||||
if v, ok := expLog[oo[0].Container()]; ok {
|
||||
if v == 1 {
|
||||
delete(expLog, oo[0].Container())
|
||||
} else {
|
||||
expLog[oo[0].Container()]--
|
||||
}
|
||||
}
|
||||
|
||||
if v, ok := expPhy[oo[0].Container()]; ok {
|
||||
if v == 1 {
|
||||
delete(expPhy, oo[0].Container())
|
||||
} else {
|
||||
expPhy[oo[0].Container()]--
|
||||
}
|
||||
}
|
||||
|
||||
oo = oo[1:]
|
||||
|
||||
c, err = db.ObjectCounters()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(len(oo)), c.Phy())
|
||||
require.Equal(t, uint64(len(oo)), c.Logic())
|
||||
|
||||
cc, err = db.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, expPhy, cc.Physical)
|
||||
require.Equal(t, expLog, cc.Logical)
|
||||
}
|
||||
|
||||
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
|
@ -30,6 +31,7 @@ type DeleteRes struct {
|
|||
availableRemoved uint64
|
||||
sizes []uint64
|
||||
availableSizes []uint64
|
||||
removedByCnrID map[cid.ID]ObjectCounters
|
||||
}
|
||||
|
||||
// AvailableObjectsRemoved returns the number of removed available
|
||||
|
@ -38,6 +40,11 @@ func (d DeleteRes) AvailableObjectsRemoved() uint64 {
|
|||
return d.availableRemoved
|
||||
}
|
||||
|
||||
// RemovedByCnrID returns the number of removed objects by container ID.
|
||||
func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
|
||||
return d.removedByCnrID
|
||||
}
|
||||
|
||||
// RawObjectsRemoved returns the number of removed raw objects.
|
||||
func (d DeleteRes) RawObjectsRemoved() uint64 {
|
||||
return d.rawRemoved
|
||||
|
@ -95,15 +102,11 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
|||
return DeleteRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
var rawRemoved uint64
|
||||
var availableRemoved uint64
|
||||
var err error
|
||||
sizes := make([]uint64, len(prm.addrs))
|
||||
availableSizes := make([]uint64, len(prm.addrs))
|
||||
var res DeleteRes
|
||||
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
// We need to clear slice because tx can try to execute multiple times.
|
||||
rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs, sizes, availableSizes)
|
||||
res, err = db.deleteGroup(tx, prm.addrs)
|
||||
return err
|
||||
})
|
||||
if err == nil {
|
||||
|
@ -114,68 +117,83 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
|||
storagelog.OpField("metabase DELETE"))
|
||||
}
|
||||
}
|
||||
return DeleteRes{
|
||||
rawRemoved: rawRemoved,
|
||||
availableRemoved: availableRemoved,
|
||||
sizes: sizes,
|
||||
availableSizes: availableSizes,
|
||||
}, metaerr.Wrap(err)
|
||||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// deleteGroup deletes object from the metabase. Handles removal of the
|
||||
// references of the split objects.
|
||||
// The first return value is a physical objects removed number: physical
|
||||
// objects that were stored. The second return value is a logical objects
|
||||
// removed number: objects that were available (without Tombstones, GCMarks
|
||||
// non-expired, etc.)
|
||||
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, availableSizes []uint64) (uint64, uint64, error) {
|
||||
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
|
||||
res := DeleteRes{
|
||||
sizes: make([]uint64, len(addrs)),
|
||||
availableSizes: make([]uint64, len(addrs)),
|
||||
removedByCnrID: make(map[cid.ID]ObjectCounters),
|
||||
}
|
||||
refCounter := make(referenceCounter, len(addrs))
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
var rawDeleted uint64
|
||||
var availableDeleted uint64
|
||||
|
||||
for i := range addrs {
|
||||
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
|
||||
if err != nil {
|
||||
return 0, 0, err // maybe log and continue?
|
||||
return DeleteRes{}, err // maybe log and continue?
|
||||
}
|
||||
|
||||
if removed {
|
||||
rawDeleted++
|
||||
sizes[i] = size
|
||||
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
||||
v.phy++
|
||||
res.removedByCnrID[addrs[i].Container()] = v
|
||||
} else {
|
||||
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
|
||||
phy: 1,
|
||||
}
|
||||
}
|
||||
|
||||
res.rawRemoved++
|
||||
res.sizes[i] = size
|
||||
}
|
||||
|
||||
if available {
|
||||
availableDeleted++
|
||||
availableSizes[i] = size
|
||||
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
||||
v.logic++
|
||||
res.removedByCnrID[addrs[i].Container()] = v
|
||||
} else {
|
||||
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
|
||||
logic: 1,
|
||||
}
|
||||
}
|
||||
|
||||
if rawDeleted > 0 {
|
||||
err := db.updateCounter(tx, phy, rawDeleted, false)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err)
|
||||
res.availableRemoved++
|
||||
res.availableSizes[i] = size
|
||||
}
|
||||
}
|
||||
|
||||
if availableDeleted > 0 {
|
||||
err := db.updateCounter(tx, logical, availableDeleted, false)
|
||||
if res.rawRemoved > 0 {
|
||||
err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err)
|
||||
return DeleteRes{}, fmt.Errorf("could not decrease phy object counter: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if res.availableRemoved > 0 {
|
||||
err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
|
||||
if err != nil {
|
||||
return DeleteRes{}, fmt.Errorf("could not decrease logical object counter: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
|
||||
return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err)
|
||||
}
|
||||
|
||||
for _, refNum := range refCounter {
|
||||
if refNum.cur == refNum.all {
|
||||
err := db.deleteObject(tx, refNum.obj, true)
|
||||
if err != nil {
|
||||
return rawDeleted, availableDeleted, err // maybe log and continue?
|
||||
return DeleteRes{}, err // maybe log and continue?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return rawDeleted, availableDeleted, nil
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// delete removes object indexes from the metabase. Counts the references
|
||||
|
|
|
@ -37,14 +37,21 @@ type DeletionInfo struct {
|
|||
// InhumeRes encapsulates results of Inhume operation.
|
||||
type InhumeRes struct {
|
||||
deletedLockObj []oid.Address
|
||||
availableImhumed uint64
|
||||
availableInhumed uint64
|
||||
inhumedByCnrID map[cid.ID]ObjectCounters
|
||||
deletionDetails []DeletionInfo
|
||||
}
|
||||
|
||||
// AvailableInhumed return number of available object
|
||||
// that have been inhumed.
|
||||
func (i InhumeRes) AvailableInhumed() uint64 {
|
||||
return i.availableImhumed
|
||||
return i.availableInhumed
|
||||
}
|
||||
|
||||
// InhumedByCnrID return number of object
|
||||
// that have been inhumed by container ID.
|
||||
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
|
||||
return i.inhumedByCnrID
|
||||
}
|
||||
|
||||
// DeletedLockObjects returns deleted object of LOCK
|
||||
|
@ -73,6 +80,15 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) {
|
|||
Size: deletedSize,
|
||||
CID: containerID,
|
||||
})
|
||||
i.availableInhumed++
|
||||
if v, ok := i.inhumedByCnrID[containerID]; ok {
|
||||
v.logic++
|
||||
i.inhumedByCnrID[containerID] = v
|
||||
} else {
|
||||
i.inhumedByCnrID[containerID] = ObjectCounters{
|
||||
logic: 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SetAddresses sets a list of object addresses that should be inhumed.
|
||||
|
@ -122,7 +138,7 @@ var ErrLockObjectRemoval = logicerr.New("lock object removal")
|
|||
//
|
||||
// NOTE: Marks any object with GC mark (despite any prohibitions on operations
|
||||
// with that object) if WithForceGCMark option has been provided.
|
||||
func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) {
|
||||
func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
|
@ -142,8 +158,11 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err err
|
|||
return InhumeRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
res := InhumeRes{
|
||||
inhumedByCnrID: make(map[cid.ID]ObjectCounters),
|
||||
}
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return db.inhumeTx(tx, currEpoch, prm, &res)
|
||||
})
|
||||
success = err == nil
|
||||
|
@ -224,7 +243,27 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
|||
}
|
||||
}
|
||||
|
||||
return db.updateCounter(tx, logical, res.availableImhumed, false)
|
||||
return db.applyInhumeResToCounters(tx, res)
|
||||
}
|
||||
|
||||
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
||||
var inhumedCount uint64
|
||||
inhumedbyCnr := make(map[cid.ID]ObjectCounters)
|
||||
for _, dd := range res.deletionDetails {
|
||||
if v, ok := inhumedbyCnr[dd.CID]; ok {
|
||||
v.logic++
|
||||
inhumedbyCnr[dd.CID] = v
|
||||
} else {
|
||||
inhumedbyCnr[dd.CID] = ObjectCounters{logic: 1}
|
||||
}
|
||||
inhumedCount++
|
||||
}
|
||||
|
||||
if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return db.updateContainerCounter(tx, inhumedbyCnr, false)
|
||||
}
|
||||
|
||||
// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
|
||||
|
@ -279,7 +318,6 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
|
|||
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
|
||||
containerID, _ := obj.ContainerID()
|
||||
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
|
||||
res.availableImhumed++
|
||||
res.storeDeletionInfo(containerID, obj.PayloadSize())
|
||||
}
|
||||
|
||||
|
|
|
@ -183,16 +183,8 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
|||
}
|
||||
|
||||
if !isParent {
|
||||
err = db.updateCounter(tx, phy, 1, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not increase phy object counter: %w", err)
|
||||
}
|
||||
|
||||
// it is expected that putting an unavailable object is
|
||||
// impossible and should be handled on the higher levels
|
||||
err = db.updateCounter(tx, logical, 1, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not increase logical object counter: %w", err)
|
||||
if err = db.incCounters(tx, cnr); err != nil {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Can we avoid creating a map here? Can we avoid creating a map here?
dstepanov-yadro
commented
done done
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ var (
|
|||
garbageBucketName = []byte{garbagePrefix}
|
||||
toMoveItBucketName = []byte{toMoveItPrefix}
|
||||
containerVolumeBucketName = []byte{containerVolumePrefix}
|
||||
containerCounterBucketName = []byte{containerCountersPrefix}
|
||||
|
||||
zeroValue = []byte{0xFF}
|
||||
)
|
||||
|
@ -111,6 +112,11 @@ const (
|
|||
// Key: split ID
|
||||
// Value: list of object IDs
|
||||
splitPrefix
|
||||
|
||||
// containerCountersPrefix is used for storing container object counters.
|
||||
// Key: container ID + type
|
||||
// Value: container size in bytes as little-endian uint64
|
||||
containerCountersPrefix
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
@ -118,6 +118,7 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
|
|||
}
|
||||
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
|
||||
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
|
||||
s.decContainerObjectCounter(res.RemovedByCnrID())
|
||||
removedPayload := res.RemovedPhysicalObjectSizes()[0]
|
||||
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
|
||||
if logicalRemovedPayload > 0 {
|
||||
|
|
|
@ -416,6 +416,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
|
|||
|
||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
|
||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||
|
||||
i := 0
|
||||
for i < res.GetDeletionInfoLength() {
|
||||
|
@ -629,6 +630,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
|
|||
|
||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
|
||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||
|
||||
i := 0
|
||||
for i < res.GetDeletionInfoLength() {
|
||||
|
@ -675,6 +677,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
|
|||
|
||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
|
||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||
|
||||
i := 0
|
||||
for i < res.GetDeletionInfoLength() {
|
||||
|
|
|
@ -122,6 +122,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
|||
s.m.RUnlock()
|
||||
|
||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||
|
||||
i := 0
|
||||
for i < res.GetDeletionInfoLength() {
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -23,6 +24,7 @@ type metricsStore struct {
|
|||
mtx sync.Mutex
|
||||
objCounters map[string]uint64
|
||||
cnrSize map[string]int64
|
||||
cnrCount map[string]uint64
|
||||
pldSize int64
|
||||
mode mode.Mode
|
||||
errCounter int64
|
||||
|
@ -126,6 +128,39 @@ func (m *metricsStore) DeleteShardMetrics() {
|
|||
m.errCounter = 0
|
||||
}
|
||||
|
||||
func (m *metricsStore) SetContainerObjectsCount(cnrID string, objectType string, value uint64) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
m.cnrCount[cnrID+objectType] = value
|
||||
}
|
||||
|
||||
func (m *metricsStore) IncContainerObjectsCount(cnrID string, objectType string) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
m.cnrCount[cnrID+objectType]++
|
||||
}
|
||||
|
||||
func (m *metricsStore) SubContainerObjectsCount(cnrID string, objectType string, value uint64) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
existed := m.cnrCount[cnrID+objectType]
|
||||
if existed < value {
|
||||
panic("existed value smaller than value to sustract")
|
||||
}
|
||||
if existed == value {
|
||||
delete(m.cnrCount, cnrID+objectType)
|
||||
} else {
|
||||
m.cnrCount[cnrID+objectType] -= value
|
||||
}
|
||||
}
|
||||
|
||||
func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
v, ok := m.cnrCount[cnrID+objectType]
|
||||
return v, ok
|
||||
}
|
||||
|
||||
func TestCounters(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -143,21 +178,37 @@ func TestCounters(t *testing.T) {
|
|||
oo[i] = testutil.GenerateObject()
|
||||
}
|
||||
|
||||
cc := meta.ContainerCounters{Logical: make(map[cid.ID]uint64), Physical: make(map[cid.ID]uint64)}
|
||||
|
||||
t.Run("defaults", func(t *testing.T) {
|
||||
require.Zero(t, mm.getObjectCounter(physical))
|
||||
require.Zero(t, mm.getObjectCounter(logical))
|
||||
require.Empty(t, mm.containerSizes())
|
||||
require.Zero(t, mm.payloadSize())
|
||||
|
||||
for _, obj := range oo {
|
||||
contID, _ := obj.ContainerID()
|
||||
v, ok := mm.getContainerCount(contID.EncodeToString(), physical)
|
||||
require.Zero(t, v)
|
||||
require.False(t, ok)
|
||||
v, ok = mm.getContainerCount(contID.EncodeToString(), logical)
|
||||
require.Zero(t, v)
|
||||
require.False(t, ok)
|
||||
}
|
||||
})
|
||||
|
||||
var totalPayload int64
|
||||
|
||||
expectedLogicalSizes := make(map[string]int64)
|
||||
expectedLogCC := make(map[cid.ID]uint64)
|
||||
expectedPhyCC := make(map[cid.ID]uint64)
|
||||
for i := range oo {
|
||||
cnr, _ := oo[i].ContainerID()
|
||||
oSize := int64(oo[i].PayloadSize())
|
||||
expectedLogicalSizes[cnr.EncodeToString()] += oSize
|
||||
totalPayload += oSize
|
||||
expectedLogCC[cnr]++
|
||||
expectedPhyCC[cnr]++
|
||||
}
|
||||
|
||||
var prm PutPrm
|
||||
|
@ -174,6 +225,11 @@ func TestCounters(t *testing.T) {
|
|||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||
require.Equal(t, totalPayload, mm.payloadSize())
|
||||
|
||||
cc, err := sh.metaBase.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedLogCC, cc.Logical)
|
||||
require.Equal(t, expectedPhyCC, cc.Physical)
|
||||
|
||||
t.Run("inhume_GC", func(t *testing.T) {
|
||||
var prm InhumePrm
|
||||
inhumedNumber := objNumber / 4
|
||||
|
@ -187,6 +243,11 @@ func TestCounters(t *testing.T) {
|
|||
cid, ok := oo[i].ContainerID()
|
||||
require.True(t, ok)
|
||||
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
|
||||
|
||||
expectedLogCC[cid]--
|
||||
if expectedLogCC[cid] == 0 {
|
||||
delete(expectedLogCC, cid)
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
|
||||
|
@ -194,6 +255,11 @@ func TestCounters(t *testing.T) {
|
|||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||
require.Equal(t, totalPayload, mm.payloadSize())
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
How about How about `mm.checkContainerCount(t, cnr, logical, v)` or `require.Equal(t, expV, mm.getContainerCount(t, ...)`?
To make the function smaller
dstepanov-yadro
commented
Done, thx Done, thx
|
||||
cc, err := sh.metaBase.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedLogCC, cc.Logical)
|
||||
require.Equal(t, expectedPhyCC, cc.Physical)
|
||||
|
||||
oo = oo[inhumedNumber:]
|
||||
})
|
||||
|
||||
|
@ -214,6 +280,11 @@ func TestCounters(t *testing.T) {
|
|||
cid, ok := oo[i].ContainerID()
|
||||
require.True(t, ok)
|
||||
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
|
||||
|
||||
expectedLogCC[cid]--
|
||||
if expectedLogCC[cid] == 0 {
|
||||
delete(expectedLogCC, cid)
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, phy, mm.getObjectCounter(physical))
|
||||
|
@ -221,6 +292,11 @@ func TestCounters(t *testing.T) {
|
|||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||
require.Equal(t, totalPayload, mm.payloadSize())
|
||||
|
||||
cc, err = sh.metaBase.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedLogCC, cc.Logical)
|
||||
require.Equal(t, expectedPhyCC, cc.Physical)
|
||||
|
||||
oo = oo[inhumedNumber:]
|
||||
})
|
||||
|
||||
|
@ -245,9 +321,24 @@ func TestCounters(t *testing.T) {
|
|||
|
||||
cnr, _ := oo[i].ContainerID()
|
||||
expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload)
|
||||
|
||||
expectedLogCC[cnr]--
|
||||
if expectedLogCC[cnr] == 0 {
|
||||
delete(expectedLogCC, cnr)
|
||||
}
|
||||
|
||||
expectedPhyCC[cnr]--
|
||||
if expectedPhyCC[cnr] == 0 {
|
||||
delete(expectedPhyCC, cnr)
|
||||
}
|
||||
}
|
||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||
require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize())
|
||||
|
||||
cc, err = sh.metaBase.ContainerCounters(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedLogCC, cc.Logical)
|
||||
require.Equal(t, expectedPhyCC, cc.Physical)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -269,6 +360,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
|
|||
"logic": 0,
|
||||
},
|
||||
cnrSize: make(map[string]int64),
|
||||
cnrCount: make(map[string]uint64),
|
||||
}
|
||||
|
||||
sh := New(
|
||||
|
|
|
@ -90,7 +90,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
|||
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
||||
}
|
||||
|
||||
s.incObjectCounter()
|
||||
s.incObjectCounter(putPrm.Address.Container())
|
||||
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
|
||||
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -85,6 +86,12 @@ type MetricsWriter interface {
|
|||
ClearErrorCounter()
|
||||
// DeleteShardMetrics deletes shard metrics from registry.
|
||||
DeleteShardMetrics()
|
||||
// SetContainerObjectsCount sets container object count.
|
||||
SetContainerObjectsCount(cnrID string, objectType string, value uint64)
|
||||
// IncContainerObjectsCount increments container object count.
|
||||
IncContainerObjectsCount(cnrID string, objectType string)
|
||||
// SubContainerObjectsCount subtracts container object count.
|
||||
SubContainerObjectsCount(cnrID string, objectType string, value uint64)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
|
@ -425,15 +432,29 @@ func (s *Shard) updateMetrics(ctx context.Context) {
|
|||
}
|
||||
|
||||
s.metricsWriter.AddToPayloadSize(int64(totalPayload))
|
||||
|
||||
contCount, err := s.metaBase.ContainerCounters(ctx)
|
||||
if err != nil {
|
||||
s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err))
|
||||
return
|
||||
}
|
||||
for contID, count := range contCount.Physical {
|
||||
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count)
|
||||
}
|
||||
for contID, count := range contCount.Logical {
|
||||
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// incObjectCounter increment both physical and logical object
|
||||
// counters.
|
||||
func (s *Shard) incObjectCounter() {
|
||||
func (s *Shard) incObjectCounter(cnrID cid.ID) {
|
||||
if s.cfg.metricsWriter != nil {
|
||||
s.cfg.metricsWriter.IncObjectCounter(physical)
|
||||
s.cfg.metricsWriter.IncObjectCounter(logical)
|
||||
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical)
|
||||
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -443,6 +464,17 @@ func (s *Shard) decObjectCounterBy(typ string, v uint64) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) {
|
||||
if s.cfg.metricsWriter == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for cnrID, count := range byCnr {
|
||||
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy())
|
||||
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic())
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Shard) addToContainerSize(cnr string, size int64) {
|
||||
if s.cfg.metricsWriter != nil {
|
||||
s.cfg.metricsWriter.AddToContainerSize(cnr, size)
|
||||
|
|
|
@ -18,6 +18,9 @@ type EngineMetrics interface {
|
|||
SetObjectCounter(shardID, objectType string, v uint64)
|
||||
AddToPayloadCounter(shardID string, size int64)
|
||||
SetMode(shardID string, mode mode.Mode)
|
||||
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||
IncContainerObjectCounter(shardID, contID, objectType string)
|
||||
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||
|
||||
WriteCache() WriteCacheMetrics
|
||||
GC() GCMetrics
|
||||
|
@ -30,6 +33,7 @@ type engineMetrics struct {
|
|||
payloadSize *prometheus.GaugeVec
|
||||
errorCounter *prometheus.GaugeVec
|
||||
mode *shardIDModeValue
|
||||
contObjCounter *prometheus.GaugeVec
|
||||
|
||||
gc *gcMetrics
|
||||
writeCache *writeCacheMetrics
|
||||
|
@ -46,10 +50,13 @@ func newEngineMetrics() *engineMetrics {
|
|||
Name: "request_duration_seconds",
|
||||
Help: "Duration of Engine requests",
|
||||
}, []string{methodLabel}),
|
||||
objectCounter: newEngineGaugeVector("objects_total", "Objects counters per shards", []string{shardIDLabel, typeLabel}),
|
||||
objectCounter: newEngineGaugeVector("objects_total",
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why is it deprecated? It just has different usage: we count object in the container to calculate quotas, we count objects per shards to calculate the amount of space. Why is it deprecated? It just has different usage: we count object in the container to calculate quotas, we count objects per shards to calculate the amount of space.
dstepanov-yadro
commented
Deprecated metric (objects per shard) can be calculated from new one (objects per shard per container) with PromQL query. So it looks redundant. Deprecated metric (objects per shard) can be calculated from new one (objects per shard per container) with PromQL query. So it looks redundant.
|
||||
"Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.",
|
||||
[]string{shardIDLabel, typeLabel}),
|
||||
gc: newGCMetrics(),
|
||||
writeCache: newWriteCacheMetrics(),
|
||||
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
|
||||
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -88,6 +95,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
|
|||
m.errorCounter.Delete(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.mode.Delete(shardID)
|
||||
}
|
||||
|
||||
|
@ -109,6 +117,36 @@ func (m *engineMetrics) SetObjectCounter(shardID, objectType string, v uint64) {
|
|||
).Set(float64(v))
|
||||
}
|
||||
|
||||
func (m *engineMetrics) SetContainerObjectCounter(shardID, contID, objectType string, v uint64) {
|
||||
m.contObjCounter.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
containerIDLabelKey: contID,
|
||||
typeLabel: objectType,
|
||||
},
|
||||
).Set(float64(v))
|
||||
}
|
||||
|
||||
func (m *engineMetrics) IncContainerObjectCounter(shardID, contID, objectType string) {
|
||||
m.contObjCounter.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
containerIDLabelKey: contID,
|
||||
typeLabel: objectType,
|
||||
},
|
||||
).Inc()
|
||||
}
|
||||
|
||||
func (m *engineMetrics) SubContainerObjectCounter(shardID, contID, objectType string, v uint64) {
|
||||
m.contObjCounter.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
containerIDLabelKey: contID,
|
||||
typeLabel: objectType,
|
||||
},
|
||||
).Sub(float64(v))
|
||||
}
|
||||
|
||||
func (m *engineMetrics) SetMode(shardID string, mode mode.Mode) {
|
||||
m.mode.SetMode(shardID, mode.String())
|
||||
}
|
||||
|
|
It is always the same size, we can pre-allocate here.
done