Add container objects counter #778
17 changed files with 765 additions and 83 deletions
|
@ -516,4 +516,5 @@ const (
|
||||||
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
||||||
FailedToCountWritecacheItems = "failed to count writecache items"
|
FailedToCountWritecacheItems = "failed to count writecache items"
|
||||||
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
|
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
|
||||||
|
FailedToGetContainerCounters = "failed to get container counters values"
|
||||||
)
|
)
|
||||||
|
|
|
@ -21,6 +21,10 @@ type MetricRegister interface {
|
||||||
ClearErrorCounter(shardID string)
|
ClearErrorCounter(shardID string)
|
||||||
DeleteShardMetrics(shardID string)
|
DeleteShardMetrics(shardID string)
|
||||||
|
|
||||||
|
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||||
|
IncContainerObjectCounter(shardID, contID, objectType string)
|
||||||
|
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||||
|
|
||||||
WriteCache() metrics.WriteCacheMetrics
|
WriteCache() metrics.WriteCacheMetrics
|
||||||
GC() metrics.GCMetrics
|
GC() metrics.GCMetrics
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,6 +74,18 @@ func (m *metricsWithID) DeleteShardMetrics() {
|
||||||
m.mw.DeleteShardMetrics(m.id)
|
m.mw.DeleteShardMetrics(m.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *metricsWithID) SetContainerObjectsCount(cnrID string, objectType string, value uint64) {
|
||||||
|
m.mw.SetContainerObjectCounter(m.id, cnrID, objectType, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricsWithID) IncContainerObjectsCount(cnrID string, objectType string) {
|
||||||
|
m.mw.IncContainerObjectCounter(m.id, cnrID, objectType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string, value uint64) {
|
||||||
|
m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
|
||||||
|
}
|
||||||
|
|
||||||
// AddShard adds a new shard to the storage engine.
|
// AddShard adds a new shard to the storage engine.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that did not allow adding a shard.
|
// Returns any error encountered that did not allow adding a shard.
|
||||||
|
|
|
@ -103,12 +103,13 @@ func (db *DB) init(reset bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
mStaticBuckets := map[string]struct{}{
|
mStaticBuckets := map[string]struct{}{
|
||||||
string(containerVolumeBucketName): {},
|
string(containerVolumeBucketName): {},
|
||||||
string(graveyardBucketName): {},
|
string(containerCounterBucketName): {},
|
||||||
string(toMoveItBucketName): {},
|
string(graveyardBucketName): {},
|
||||||
string(garbageBucketName): {},
|
string(toMoveItBucketName): {},
|
||||||
string(shardInfoBucket): {},
|
string(garbageBucketName): {},
|
||||||
string(bucketNameLocked): {},
|
string(shardInfoBucket): {},
|
||||||
|
string(bucketNameLocked): {},
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
@ -135,7 +136,7 @@ func (db *DB) init(reset bool) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !reset {
|
if !reset { // counters will be recalculated by refill metabase
|
||||||
err = syncCounter(tx, false)
|
err = syncCounter(tx, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not sync object counter: %w", err)
|
return fmt.Errorf("could not sync object counter: %w", err)
|
||||||
|
|
|
@ -1,10 +1,15 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -73,9 +78,128 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
|
||||||
return cc, metaerr.Wrap(err)
|
return cc, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateCounter updates the object counter. Tx MUST be writable.
|
type ContainerCounters struct {
|
||||||
// If inc == `true`, increases the counter, decreases otherwise.
|
Logical map[cid.ID]uint64
|
||||||
func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
|
Physical map[cid.ID]uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContainerCounters returns object counters for each container
|
||||||
|
// that metabase has tracked since it was opened and initialized.
|
||||||
|
//
|
||||||
|
// Returns only the errors that do not allow reading counter
|
||||||
|
// in Bolt database.
|
||||||
|
//
|
||||||
|
// It is guaranteed that the ContainerCounters fields are not nil.
|
||||||
|
func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("ContainerCounters", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCounters")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
cc := ContainerCounters{
|
||||||
|
Logical: make(map[cid.ID]uint64),
|
||||||
|
Physical: make(map[cid.ID]uint64),
|
||||||
|
}
|
||||||
|
|
||||||
|
lastKey := make([]byte, cidSize)
|
||||||
|
|
||||||
|
// there is no limit for containers count, so use batching with cancellation
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return cc, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
completed, err := db.containerCountersNextBatch(lastKey, &cc)
|
||||||
|
if err != nil {
|
||||||
|
return cc, err
|
||||||
|
}
|
||||||
|
if completed {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
success = true
|
||||||
|
return cc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) {
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return false, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
counter := 0
|
||||||
|
const batchSize = 1000
|
||||||
|
|
||||||
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(containerCounterBucketName)
|
||||||
|
if b == nil {
|
||||||
|
return ErrInterruptIterator
|
||||||
|
}
|
||||||
|
c := b.Cursor()
|
||||||
|
var key, value []byte
|
||||||
|
for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() {
|
||||||
|
if bytes.Equal(lastKey, key) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
copy(lastKey, key)
|
||||||
|
|
||||||
|
cnrID, err := parseContainerCounterKey(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
phy, logic, err := parseContainerCounterValue(value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if phy > 0 {
|
||||||
|
cc.Physical[cnrID] = phy
|
||||||
|
}
|
||||||
|
if logic > 0 {
|
||||||
|
cc.Logical[cnrID] = logic
|
||||||
|
}
|
||||||
|
|
||||||
|
counter++
|
||||||
|
if counter == batchSize {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if counter < batchSize { // last batch
|
||||||
|
return ErrInterruptIterator
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrInterruptIterator) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error {
|
||||||
|
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
|
||||||
|
return fmt.Errorf("could not increase phy object counter: %w", err)
|
||||||
|
}
|
||||||
|
if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil {
|
||||||
|
return fmt.Errorf("could not increase logical object counter: %w", err)
|
||||||
|
}
|
||||||
|
return db.incContainerObjectCounter(tx, cnrID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
|
||||||
b := tx.Bucket(shardInfoBucket)
|
b := tx.Bucket(shardInfoBucket)
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -112,6 +236,63 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool
|
||||||
return b.Put(counterKey, newCounter)
|
return b.Put(counterKey, newCounter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
|
||||||
|
b := tx.Bucket(containerCounterBucketName)
|
||||||
|
if b == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
key := make([]byte, cidSize)
|
||||||
|
for cnrID, cnrDelta := range delta {
|
||||||
|
cnrID.Encode(key)
|
||||||
|
if err := db.editContainerCounterValue(b, key, cnrDelta, inc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error {
|
||||||
|
var phyValue, logicValue uint64
|
||||||
|
var err error
|
||||||
|
data := b.Get(key)
|
||||||
|
if len(data) > 0 {
|
||||||
|
phyValue, logicValue, err = parseContainerCounterValue(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
phyValue = nextValue(phyValue, delta.phy, inc)
|
||||||
|
logicValue = nextValue(logicValue, delta.logic, inc)
|
||||||
|
if phyValue > 0 || logicValue > 0 {
|
||||||
|
value := containerCounterValue(phyValue, logicValue)
|
||||||
|
return b.Put(key, value)
|
||||||
|
}
|
||||||
|
return b.Delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func nextValue(existed, delta uint64, inc bool) uint64 {
|
||||||
|
if inc {
|
||||||
|
existed += delta
|
||||||
|
} else if existed <= delta {
|
||||||
|
existed = 0
|
||||||
|
} else {
|
||||||
|
existed -= delta
|
||||||
|
}
|
||||||
|
return existed
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error {
|
||||||
|
b := tx.Bucket(containerCounterBucketName)
|
||||||
|
if b == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
key := make([]byte, cidSize)
|
||||||
|
cnrID.Encode(key)
|
||||||
|
return db.editContainerCounterValue(b, key, ObjectCounters{logic: 1, phy: 1}, true)
|
||||||
|
}
|
||||||
|
|
||||||
// syncCounter updates object counters according to metabase state:
|
// syncCounter updates object counters according to metabase state:
|
||||||
// it counts all the physically/logically stored objects using internal
|
// it counts all the physically/logically stored objects using internal
|
||||||
// indexes. Tx MUST be writable.
|
// indexes. Tx MUST be writable.
|
||||||
|
@ -119,26 +300,38 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool
|
||||||
// Does nothing if counters are not empty and force is false. If force is
|
// Does nothing if counters are not empty and force is false. If force is
|
||||||
// true, updates the counters anyway.
|
// true, updates the counters anyway.
|
||||||
func syncCounter(tx *bbolt.Tx, force bool) error {
|
func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
shardInfoB, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get shard info bucket: %w", err)
|
return fmt.Errorf("could not get shard info bucket: %w", err)
|
||||||
}
|
}
|
||||||
|
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8
|
||||||
if !force && len(b.Get(objectPhyCounterKey)) == 8 && len(b.Get(objectLogicCounterKey)) == 8 {
|
containerCounterInitialized := tx.Bucket(containerCounterBucketName) != nil
|
||||||
|
if !force && shardObjectCounterInitialized && containerCounterInitialized {
|
||||||
// the counters are already inited
|
// the counters are already inited
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
containerCounterB, err := tx.CreateBucketIfNotExists(containerCounterBucketName)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not get container counter bucket: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
var phyCounter uint64
|
counters := make(map[cid.ID]ObjectCounters)
|
||||||
var logicCounter uint64
|
|
||||||
|
|
||||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
graveyardBKT := tx.Bucket(graveyardBucketName)
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
garbageBKT := tx.Bucket(garbageBucketName)
|
||||||
key := make([]byte, addressKeySize)
|
key := make([]byte, addressKeySize)
|
||||||
|
|
||||||
err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error {
|
err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error {
|
||||||
phyCounter++
|
if v, ok := counters[cnr]; ok {
|
||||||
|
v.phy++
|
||||||
|
counters[cnr] = v
|
||||||
|
} else {
|
||||||
|
counters[cnr] = ObjectCounters{
|
||||||
|
phy: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
addr.SetContainer(cnr)
|
addr.SetContainer(cnr)
|
||||||
addr.SetObject(obj)
|
addr.SetObject(obj)
|
||||||
|
@ -146,7 +339,14 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
// check if an object is available: not with GCMark
|
// check if an object is available: not with GCMark
|
||||||
// and not covered with a tombstone
|
// and not covered with a tombstone
|
||||||
if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
|
if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
|
||||||
logicCounter++
|
if v, ok := counters[cnr]; ok {
|
||||||
|
v.logic++
|
||||||
|
counters[cnr] = v
|
||||||
|
} else {
|
||||||
|
counters[cnr] = ObjectCounters{
|
||||||
|
logic: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -155,21 +355,65 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
return fmt.Errorf("could not iterate objects: %w", err)
|
return fmt.Errorf("could not iterate objects: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
data := make([]byte, 8)
|
return setObjectCounters(counters, shardInfoB, containerCounterB)
|
||||||
binary.LittleEndian.PutUint64(data, phyCounter)
|
}
|
||||||
|
|
||||||
err = b.Put(objectPhyCounterKey, data)
|
func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error {
|
||||||
|
var phyTotal uint64
|
||||||
|
var logicTotal uint64
|
||||||
|
key := make([]byte, cidSize)
|
||||||
|
for cnrID, count := range counters {
|
||||||
|
phyTotal += count.phy
|
||||||
|
logicTotal += count.logic
|
||||||
|
|
||||||
|
cnrID.Encode(key)
|
||||||
|
value := containerCounterValue(count.phy, count.logic)
|
||||||
|
err := containerCounterB.Put(key, value)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not update phy container object counter: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
phyData := make([]byte, 8)
|
||||||
|
binary.LittleEndian.PutUint64(phyData, phyTotal)
|
||||||
|
|
||||||
|
err := shardInfoB.Put(objectPhyCounterKey, phyData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not update phy object counter: %w", err)
|
return fmt.Errorf("could not update phy object counter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
data = make([]byte, 8)
|
logData := make([]byte, 8)
|
||||||
binary.LittleEndian.PutUint64(data, logicCounter)
|
binary.LittleEndian.PutUint64(logData, logicTotal)
|
||||||
|
|
||||||
err = b.Put(objectLogicCounterKey, data)
|
err = shardInfoB.Put(objectLogicCounterKey, logData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not update logic object counter: %w", err)
|
return fmt.Errorf("could not update logic object counter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func containerCounterValue(phy, logic uint64) []byte {
|
||||||
|
res := make([]byte, 16)
|
||||||
|
binary.LittleEndian.PutUint64(res, phy)
|
||||||
|
binary.LittleEndian.PutUint64(res[8:], logic)
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseContainerCounterKey(buf []byte) (cid.ID, error) {
|
||||||
|
if len(buf) != cidSize {
|
||||||
|
return cid.ID{}, fmt.Errorf("invalid key length")
|
||||||
|
}
|
||||||
|
var cnrID cid.ID
|
||||||
|
if err := cnrID.Decode(buf); err != nil {
|
||||||
|
return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err)
|
||||||
|
}
|
||||||
|
return cnrID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseContainerCounterValue return phy, logic values.
|
||||||
|
func parseContainerCounterValue(buf []byte) (uint64, uint64, error) {
|
||||||
|
if len(buf) != 16 {
|
||||||
|
return 0, 0, fmt.Errorf("invalid value length")
|
||||||
|
}
|
||||||
|
return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil
|
||||||
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
@ -25,6 +26,11 @@ func TestCounters(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Zero(t, c.Phy())
|
require.Zero(t, c.Phy())
|
||||||
require.Zero(t, c.Logic())
|
require.Zero(t, c.Logic())
|
||||||
|
|
||||||
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Zero(t, len(cc.Physical))
|
||||||
|
require.Zero(t, len(cc.Logical))
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put", func(t *testing.T) {
|
t.Run("put", func(t *testing.T) {
|
||||||
|
@ -36,9 +42,14 @@ func TestCounters(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm meta.PutPrm
|
var prm meta.PutPrm
|
||||||
|
expPhy := make(map[cid.ID]uint64)
|
||||||
|
expLog := make(map[cid.ID]uint64)
|
||||||
|
|
||||||
for i := 0; i < objCount; i++ {
|
for i := 0; i < objCount; i++ {
|
||||||
prm.SetObject(oo[i])
|
prm.SetObject(oo[i])
|
||||||
|
cnrID, _ := oo[i].ContainerID()
|
||||||
|
expPhy[cnrID]++
|
||||||
|
expLog[cnrID]++
|
||||||
|
|
||||||
_, err := db.Put(context.Background(), prm)
|
_, err := db.Put(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -48,6 +59,12 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, uint64(i+1), c.Phy())
|
require.Equal(t, uint64(i+1), c.Phy())
|
||||||
require.Equal(t, uint64(i+1), c.Logic())
|
require.Equal(t, uint64(i+1), c.Logic())
|
||||||
|
|
||||||
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, expPhy, cc.Physical)
|
||||||
|
require.Equal(t, expLog, cc.Logical)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -56,6 +73,14 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
oo := putObjs(t, db, objCount, false)
|
oo := putObjs(t, db, objCount, false)
|
||||||
|
|
||||||
|
expPhy := make(map[cid.ID]uint64)
|
||||||
|
expLog := make(map[cid.ID]uint64)
|
||||||
|
for _, obj := range oo {
|
||||||
|
cnrID, _ := obj.ContainerID()
|
||||||
|
expPhy[cnrID]++
|
||||||
|
expLog[cnrID]++
|
||||||
|
}
|
||||||
|
|
||||||
var prm meta.DeletePrm
|
var prm meta.DeletePrm
|
||||||
for i := objCount - 1; i >= 0; i-- {
|
for i := objCount - 1; i >= 0; i-- {
|
||||||
prm.SetAddresses(objectcore.AddressOf(oo[i]))
|
prm.SetAddresses(objectcore.AddressOf(oo[i]))
|
||||||
|
@ -69,6 +94,27 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, uint64(i), c.Phy())
|
require.Equal(t, uint64(i), c.Phy())
|
||||||
require.Equal(t, uint64(i), c.Logic())
|
require.Equal(t, uint64(i), c.Logic())
|
||||||
|
|
||||||
|
cnrID, _ := oo[i].ContainerID()
|
||||||
|
if v, ok := expPhy[cnrID]; ok {
|
||||||
|
if v == 1 {
|
||||||
|
delete(expPhy, cnrID)
|
||||||
|
} else {
|
||||||
|
expPhy[cnrID]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v, ok := expLog[cnrID]; ok {
|
||||||
|
if v == 1 {
|
||||||
|
delete(expLog, cnrID)
|
||||||
|
} else {
|
||||||
|
expLog[cnrID]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expPhy, cc.Physical)
|
||||||
|
require.Equal(t, expLog, cc.Logical)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -77,6 +123,14 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
oo := putObjs(t, db, objCount, false)
|
oo := putObjs(t, db, objCount, false)
|
||||||
|
|
||||||
|
expPhy := make(map[cid.ID]uint64)
|
||||||
|
expLog := make(map[cid.ID]uint64)
|
||||||
|
for _, obj := range oo {
|
||||||
|
cnrID, _ := obj.ContainerID()
|
||||||
|
expPhy[cnrID]++
|
||||||
|
expLog[cnrID]++
|
||||||
|
}
|
||||||
|
|
||||||
inhumedObjs := make([]oid.Address, objCount/2)
|
inhumedObjs := make([]oid.Address, objCount/2)
|
||||||
|
|
||||||
for i, o := range oo {
|
for i, o := range oo {
|
||||||
|
@ -87,6 +141,16 @@ func TestCounters(t *testing.T) {
|
||||||
inhumedObjs[i] = objectcore.AddressOf(o)
|
inhumedObjs[i] = objectcore.AddressOf(o)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, addr := range inhumedObjs {
|
||||||
|
if v, ok := expLog[addr.Container()]; ok {
|
||||||
|
if v == 1 {
|
||||||
|
delete(expLog, addr.Container())
|
||||||
|
} else {
|
||||||
|
expLog[addr.Container()]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var prm meta.InhumePrm
|
var prm meta.InhumePrm
|
||||||
prm.SetTombstoneAddress(oidtest.Address())
|
prm.SetTombstoneAddress(oidtest.Address())
|
||||||
prm.SetAddresses(inhumedObjs...)
|
prm.SetAddresses(inhumedObjs...)
|
||||||
|
@ -100,6 +164,12 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, uint64(objCount), c.Phy())
|
require.Equal(t, uint64(objCount), c.Phy())
|
||||||
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
|
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
|
||||||
|
|
||||||
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, expPhy, cc.Physical)
|
||||||
|
require.Equal(t, expLog, cc.Logical)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put_split", func(t *testing.T) {
|
t.Run("put_split", func(t *testing.T) {
|
||||||
|
@ -107,6 +177,9 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
parObj := testutil.GenerateObject()
|
parObj := testutil.GenerateObject()
|
||||||
|
|
||||||
|
expPhy := make(map[cid.ID]uint64)
|
||||||
|
expLog := make(map[cid.ID]uint64)
|
||||||
|
|
||||||
// put objects and check that parent info
|
// put objects and check that parent info
|
||||||
// does not affect the counter
|
// does not affect the counter
|
||||||
for i := 0; i < objCount; i++ {
|
for i := 0; i < objCount; i++ {
|
||||||
|
@ -115,12 +188,21 @@ func TestCounters(t *testing.T) {
|
||||||
o.SetParent(parObj)
|
o.SetParent(parObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cnrID, _ := o.ContainerID()
|
||||||
|
expLog[cnrID]++
|
||||||
|
expPhy[cnrID]++
|
||||||
|
|
||||||
require.NoError(t, putBig(db, o))
|
require.NoError(t, putBig(db, o))
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(i+1), c.Phy())
|
require.Equal(t, uint64(i+1), c.Phy())
|
||||||
require.Equal(t, uint64(i+1), c.Logic())
|
require.Equal(t, uint64(i+1), c.Logic())
|
||||||
|
|
||||||
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expPhy, cc.Physical)
|
||||||
|
require.Equal(t, expLog, cc.Logical)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -129,16 +211,40 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
oo := putObjs(t, db, objCount, true)
|
oo := putObjs(t, db, objCount, true)
|
||||||
|
|
||||||
|
expPhy := make(map[cid.ID]uint64)
|
||||||
|
expLog := make(map[cid.ID]uint64)
|
||||||
|
for _, obj := range oo {
|
||||||
|
cnrID, _ := obj.ContainerID()
|
||||||
|
expPhy[cnrID]++
|
||||||
|
expLog[cnrID]++
|
||||||
|
}
|
||||||
|
|
||||||
// delete objects that have parent info
|
// delete objects that have parent info
|
||||||
// and check that it does not affect
|
// and check that it does not affect
|
||||||
// the counter
|
// the counter
|
||||||
for i, o := range oo {
|
for i, o := range oo {
|
||||||
require.NoError(t, metaDelete(db, objectcore.AddressOf(o)))
|
addr := objectcore.AddressOf(o)
|
||||||
|
require.NoError(t, metaDelete(db, addr))
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(objCount-i-1), c.Phy())
|
require.Equal(t, uint64(objCount-i-1), c.Phy())
|
||||||
require.Equal(t, uint64(objCount-i-1), c.Logic())
|
require.Equal(t, uint64(objCount-i-1), c.Logic())
|
||||||
|
|
||||||
|
if v, ok := expPhy[addr.Container()]; ok {
|
||||||
|
if v == 1 {
|
||||||
|
delete(expPhy, addr.Container())
|
||||||
|
} else {
|
||||||
|
expPhy[addr.Container()]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v, ok := expLog[addr.Container()]; ok {
|
||||||
|
if v == 1 {
|
||||||
|
delete(expLog, addr.Container())
|
||||||
|
} else {
|
||||||
|
expLog[addr.Container()]--
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -147,6 +253,14 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
oo := putObjs(t, db, objCount, true)
|
oo := putObjs(t, db, objCount, true)
|
||||||
|
|
||||||
|
expPhy := make(map[cid.ID]uint64)
|
||||||
|
expLog := make(map[cid.ID]uint64)
|
||||||
|
for _, obj := range oo {
|
||||||
|
cnrID, _ := obj.ContainerID()
|
||||||
|
expPhy[cnrID]++
|
||||||
|
expLog[cnrID]++
|
||||||
|
}
|
||||||
|
|
||||||
inhumedObjs := make([]oid.Address, objCount/2)
|
inhumedObjs := make([]oid.Address, objCount/2)
|
||||||
|
|
||||||
for i, o := range oo {
|
for i, o := range oo {
|
||||||
|
@ -157,6 +271,16 @@ func TestCounters(t *testing.T) {
|
||||||
inhumedObjs[i] = objectcore.AddressOf(o)
|
inhumedObjs[i] = objectcore.AddressOf(o)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, addr := range inhumedObjs {
|
||||||
|
if v, ok := expLog[addr.Container()]; ok {
|
||||||
|
if v == 1 {
|
||||||
|
delete(expLog, addr.Container())
|
||||||
|
} else {
|
||||||
|
expLog[addr.Container()]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var prm meta.InhumePrm
|
var prm meta.InhumePrm
|
||||||
prm.SetTombstoneAddress(oidtest.Address())
|
prm.SetTombstoneAddress(oidtest.Address())
|
||||||
prm.SetAddresses(inhumedObjs...)
|
prm.SetAddresses(inhumedObjs...)
|
||||||
|
@ -169,6 +293,12 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, uint64(objCount), c.Phy())
|
require.Equal(t, uint64(objCount), c.Phy())
|
||||||
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
|
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
|
||||||
|
|
||||||
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, expPhy, cc.Physical)
|
||||||
|
require.Equal(t, expLog, cc.Logical)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,6 +320,13 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
|
oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expPhy := make(map[cid.ID]uint64)
|
||||||
|
expLog := make(map[cid.ID]uint64)
|
||||||
|
for _, addr := range oo {
|
||||||
|
expPhy[addr.Container()]++
|
||||||
|
expLog[addr.Container()]++
|
||||||
|
}
|
||||||
|
|
||||||
// 1. objects are available and counters are correct
|
// 1. objects are available and counters are correct
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
|
@ -197,6 +334,12 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
require.Equal(t, uint64(objCount), c.Phy())
|
require.Equal(t, uint64(objCount), c.Phy())
|
||||||
require.Equal(t, uint64(objCount), c.Logic())
|
require.Equal(t, uint64(objCount), c.Logic())
|
||||||
|
|
||||||
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, expPhy, cc.Physical)
|
||||||
|
require.Equal(t, expLog, cc.Logical)
|
||||||
|
|
||||||
for _, o := range oo {
|
for _, o := range oo {
|
||||||
_, err := metaGet(db, o, true)
|
_, err := metaGet(db, o, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -212,6 +355,12 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
require.Equal(t, uint64(objCount), c.Phy())
|
require.Equal(t, uint64(objCount), c.Phy())
|
||||||
require.Equal(t, uint64(objCount), c.Logic())
|
require.Equal(t, uint64(objCount), c.Logic())
|
||||||
|
|
||||||
|
cc, err = db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, expPhy, cc.Physical)
|
||||||
|
require.Equal(t, expLog, cc.Logical)
|
||||||
|
|
||||||
for _, o := range oo {
|
for _, o := range oo {
|
||||||
_, err := metaGet(db, o, true)
|
_, err := metaGet(db, o, true)
|
||||||
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
|
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
|
||||||
|
@ -235,6 +384,20 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
require.Equal(t, uint64(len(oo)), c.Phy())
|
require.Equal(t, uint64(len(oo)), c.Phy())
|
||||||
require.Equal(t, uint64(len(oo)-1), c.Logic())
|
require.Equal(t, uint64(len(oo)-1), c.Logic())
|
||||||
|
|
||||||
|
if v, ok := expLog[oo[0].Container()]; ok {
|
||||||
|
if v == 1 {
|
||||||
|
delete(expLog, oo[0].Container())
|
||||||
|
} else {
|
||||||
|
expLog[oo[0].Container()]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cc, err = db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, expPhy, cc.Physical)
|
||||||
|
require.Equal(t, expLog, cc.Logical)
|
||||||
|
|
||||||
// 4. `Delete` an object with GCMark should decrease the
|
// 4. `Delete` an object with GCMark should decrease the
|
||||||
// phy counter but does not affect the logic counter (after
|
// phy counter but does not affect the logic counter (after
|
||||||
// that step they should be equal)
|
// that step they should be equal)
|
||||||
|
@ -246,6 +409,14 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Zero(t, deleteRes.AvailableObjectsRemoved())
|
require.Zero(t, deleteRes.AvailableObjectsRemoved())
|
||||||
|
|
||||||
|
if v, ok := expPhy[oo[0].Container()]; ok {
|
||||||
|
if v == 1 {
|
||||||
|
delete(expPhy, oo[0].Container())
|
||||||
|
} else {
|
||||||
|
expPhy[oo[0].Container()]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
oo = oo[1:]
|
oo = oo[1:]
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters()
|
||||||
|
@ -253,6 +424,12 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
require.Equal(t, uint64(len(oo)), c.Phy())
|
require.Equal(t, uint64(len(oo)), c.Phy())
|
||||||
require.Equal(t, uint64(len(oo)), c.Logic())
|
require.Equal(t, uint64(len(oo)), c.Logic())
|
||||||
|
|
||||||
|
cc, err = db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, expPhy, cc.Physical)
|
||||||
|
require.Equal(t, expLog, cc.Logical)
|
||||||
|
|
||||||
// 5 `Delete` an expired object (like it would the control
|
// 5 `Delete` an expired object (like it would the control
|
||||||
// service do) should decrease both counters despite the
|
// service do) should decrease both counters despite the
|
||||||
// expiration fact
|
// expiration fact
|
||||||
|
@ -263,12 +440,34 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
|
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
|
||||||
|
|
||||||
|
if v, ok := expLog[oo[0].Container()]; ok {
|
||||||
|
if v == 1 {
|
||||||
|
delete(expLog, oo[0].Container())
|
||||||
|
} else {
|
||||||
|
expLog[oo[0].Container()]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if v, ok := expPhy[oo[0].Container()]; ok {
|
||||||
|
if v == 1 {
|
||||||
|
delete(expPhy, oo[0].Container())
|
||||||
|
} else {
|
||||||
|
expPhy[oo[0].Container()]--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
oo = oo[1:]
|
oo = oo[1:]
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(len(oo)), c.Phy())
|
require.Equal(t, uint64(len(oo)), c.Phy())
|
||||||
require.Equal(t, uint64(len(oo)), c.Logic())
|
require.Equal(t, uint64(len(oo)), c.Logic())
|
||||||
|
|
||||||
|
cc, err = db.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, expPhy, cc.Physical)
|
||||||
|
require.Equal(t, expLog, cc.Logical)
|
||||||
}
|
}
|
||||||
|
|
||||||
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {
|
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -30,6 +31,7 @@ type DeleteRes struct {
|
||||||
availableRemoved uint64
|
availableRemoved uint64
|
||||||
sizes []uint64
|
sizes []uint64
|
||||||
availableSizes []uint64
|
availableSizes []uint64
|
||||||
|
removedByCnrID map[cid.ID]ObjectCounters
|
||||||
}
|
}
|
||||||
|
|
||||||
// AvailableObjectsRemoved returns the number of removed available
|
// AvailableObjectsRemoved returns the number of removed available
|
||||||
|
@ -38,6 +40,11 @@ func (d DeleteRes) AvailableObjectsRemoved() uint64 {
|
||||||
return d.availableRemoved
|
return d.availableRemoved
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RemovedByCnrID returns the number of removed objects by container ID.
|
||||||
|
func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
|
||||||
|
return d.removedByCnrID
|
||||||
|
}
|
||||||
|
|
||||||
// RawObjectsRemoved returns the number of removed raw objects.
|
// RawObjectsRemoved returns the number of removed raw objects.
|
||||||
func (d DeleteRes) RawObjectsRemoved() uint64 {
|
func (d DeleteRes) RawObjectsRemoved() uint64 {
|
||||||
return d.rawRemoved
|
return d.rawRemoved
|
||||||
|
@ -95,15 +102,11 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
return DeleteRes{}, ErrReadOnlyMode
|
return DeleteRes{}, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
var rawRemoved uint64
|
|
||||||
var availableRemoved uint64
|
|
||||||
var err error
|
var err error
|
||||||
sizes := make([]uint64, len(prm.addrs))
|
var res DeleteRes
|
||||||
availableSizes := make([]uint64, len(prm.addrs))
|
|
||||||
|
|
||||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
// We need to clear slice because tx can try to execute multiple times.
|
res, err = db.deleteGroup(tx, prm.addrs)
|
||||||
rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs, sizes, availableSizes)
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -114,68 +117,83 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
storagelog.OpField("metabase DELETE"))
|
storagelog.OpField("metabase DELETE"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return DeleteRes{
|
return res, metaerr.Wrap(err)
|
||||||
rawRemoved: rawRemoved,
|
|
||||||
availableRemoved: availableRemoved,
|
|
||||||
sizes: sizes,
|
|
||||||
availableSizes: availableSizes,
|
|
||||||
}, metaerr.Wrap(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteGroup deletes object from the metabase. Handles removal of the
|
// deleteGroup deletes object from the metabase. Handles removal of the
|
||||||
// references of the split objects.
|
// references of the split objects.
|
||||||
// The first return value is a physical objects removed number: physical
|
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
|
||||||
// objects that were stored. The second return value is a logical objects
|
res := DeleteRes{
|
||||||
// removed number: objects that were available (without Tombstones, GCMarks
|
sizes: make([]uint64, len(addrs)),
|
||||||
// non-expired, etc.)
|
availableSizes: make([]uint64, len(addrs)),
|
||||||
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, availableSizes []uint64) (uint64, uint64, error) {
|
removedByCnrID: make(map[cid.ID]ObjectCounters),
|
||||||
|
}
|
||||||
refCounter := make(referenceCounter, len(addrs))
|
refCounter := make(referenceCounter, len(addrs))
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
var rawDeleted uint64
|
|
||||||
var availableDeleted uint64
|
|
||||||
|
|
||||||
for i := range addrs {
|
for i := range addrs {
|
||||||
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
|
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, err // maybe log and continue?
|
return DeleteRes{}, err // maybe log and continue?
|
||||||
}
|
}
|
||||||
|
|
||||||
if removed {
|
if removed {
|
||||||
rawDeleted++
|
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
||||||
sizes[i] = size
|
v.phy++
|
||||||
|
res.removedByCnrID[addrs[i].Container()] = v
|
||||||
|
} else {
|
||||||
|
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
|
||||||
|
phy: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res.rawRemoved++
|
||||||
|
res.sizes[i] = size
|
||||||
}
|
}
|
||||||
|
|
||||||
if available {
|
if available {
|
||||||
availableDeleted++
|
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
||||||
availableSizes[i] = size
|
v.logic++
|
||||||
|
res.removedByCnrID[addrs[i].Container()] = v
|
||||||
|
} else {
|
||||||
|
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
|
||||||
|
logic: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res.availableRemoved++
|
||||||
|
res.availableSizes[i] = size
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if rawDeleted > 0 {
|
if res.rawRemoved > 0 {
|
||||||
err := db.updateCounter(tx, phy, rawDeleted, false)
|
err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err)
|
return DeleteRes{}, fmt.Errorf("could not decrease phy object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if availableDeleted > 0 {
|
if res.availableRemoved > 0 {
|
||||||
err := db.updateCounter(tx, logical, availableDeleted, false)
|
err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err)
|
return DeleteRes{}, fmt.Errorf("could not decrease logical object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
|
||||||
|
return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
for _, refNum := range refCounter {
|
for _, refNum := range refCounter {
|
||||||
if refNum.cur == refNum.all {
|
if refNum.cur == refNum.all {
|
||||||
err := db.deleteObject(tx, refNum.obj, true)
|
err := db.deleteObject(tx, refNum.obj, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rawDeleted, availableDeleted, err // maybe log and continue?
|
return DeleteRes{}, err // maybe log and continue?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return rawDeleted, availableDeleted, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// delete removes object indexes from the metabase. Counts the references
|
// delete removes object indexes from the metabase. Counts the references
|
||||||
|
|
|
@ -37,14 +37,21 @@ type DeletionInfo struct {
|
||||||
// InhumeRes encapsulates results of Inhume operation.
|
// InhumeRes encapsulates results of Inhume operation.
|
||||||
type InhumeRes struct {
|
type InhumeRes struct {
|
||||||
deletedLockObj []oid.Address
|
deletedLockObj []oid.Address
|
||||||
availableImhumed uint64
|
availableInhumed uint64
|
||||||
|
inhumedByCnrID map[cid.ID]ObjectCounters
|
||||||
deletionDetails []DeletionInfo
|
deletionDetails []DeletionInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
// AvailableInhumed return number of available object
|
// AvailableInhumed return number of available object
|
||||||
// that have been inhumed.
|
// that have been inhumed.
|
||||||
func (i InhumeRes) AvailableInhumed() uint64 {
|
func (i InhumeRes) AvailableInhumed() uint64 {
|
||||||
return i.availableImhumed
|
return i.availableInhumed
|
||||||
|
}
|
||||||
|
|
||||||
|
// InhumedByCnrID return number of object
|
||||||
|
// that have been inhumed by container ID.
|
||||||
|
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
|
||||||
|
return i.inhumedByCnrID
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeletedLockObjects returns deleted object of LOCK
|
// DeletedLockObjects returns deleted object of LOCK
|
||||||
|
@ -73,6 +80,15 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) {
|
||||||
Size: deletedSize,
|
Size: deletedSize,
|
||||||
CID: containerID,
|
CID: containerID,
|
||||||
})
|
})
|
||||||
|
i.availableInhumed++
|
||||||
|
if v, ok := i.inhumedByCnrID[containerID]; ok {
|
||||||
|
v.logic++
|
||||||
|
i.inhumedByCnrID[containerID] = v
|
||||||
|
} else {
|
||||||
|
i.inhumedByCnrID[containerID] = ObjectCounters{
|
||||||
|
logic: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetAddresses sets a list of object addresses that should be inhumed.
|
// SetAddresses sets a list of object addresses that should be inhumed.
|
||||||
|
@ -122,7 +138,7 @@ var ErrLockObjectRemoval = logicerr.New("lock object removal")
|
||||||
//
|
//
|
||||||
// NOTE: Marks any object with GC mark (despite any prohibitions on operations
|
// NOTE: Marks any object with GC mark (despite any prohibitions on operations
|
||||||
// with that object) if WithForceGCMark option has been provided.
|
// with that object) if WithForceGCMark option has been provided.
|
||||||
func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) {
|
func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
||||||
var (
|
var (
|
||||||
startedAt = time.Now()
|
startedAt = time.Now()
|
||||||
success = false
|
success = false
|
||||||
|
@ -142,8 +158,11 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err err
|
||||||
return InhumeRes{}, ErrReadOnlyMode
|
return InhumeRes{}, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
res := InhumeRes{
|
||||||
|
inhumedByCnrID: make(map[cid.ID]ObjectCounters),
|
||||||
|
}
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
return db.inhumeTx(tx, currEpoch, prm, &res)
|
return db.inhumeTx(tx, currEpoch, prm, &res)
|
||||||
})
|
})
|
||||||
success = err == nil
|
success = err == nil
|
||||||
|
@ -224,7 +243,27 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.updateCounter(tx, logical, res.availableImhumed, false)
|
return db.applyInhumeResToCounters(tx, res)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
||||||
|
var inhumedCount uint64
|
||||||
|
inhumedbyCnr := make(map[cid.ID]ObjectCounters)
|
||||||
|
for _, dd := range res.deletionDetails {
|
||||||
|
if v, ok := inhumedbyCnr[dd.CID]; ok {
|
||||||
|
v.logic++
|
||||||
|
inhumedbyCnr[dd.CID] = v
|
||||||
|
} else {
|
||||||
|
inhumedbyCnr[dd.CID] = ObjectCounters{logic: 1}
|
||||||
|
}
|
||||||
|
inhumedCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return db.updateContainerCounter(tx, inhumedbyCnr, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
|
// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
|
||||||
|
@ -279,7 +318,6 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
|
||||||
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
|
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
|
||||||
containerID, _ := obj.ContainerID()
|
containerID, _ := obj.ContainerID()
|
||||||
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
|
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
|
||||||
res.availableImhumed++
|
|
||||||
res.storeDeletionInfo(containerID, obj.PayloadSize())
|
res.storeDeletionInfo(containerID, obj.PayloadSize())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -183,16 +183,8 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
||||||
}
|
}
|
||||||
|
|
||||||
if !isParent {
|
if !isParent {
|
||||||
err = db.updateCounter(tx, phy, 1, true)
|
if err = db.incCounters(tx, cnr); err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
return fmt.Errorf("could not increase phy object counter: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// it is expected that putting an unavailable object is
|
|
||||||
// impossible and should be handled on the higher levels
|
|
||||||
err = db.updateCounter(tx, logical, 1, true)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not increase logical object counter: %w", err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,9 +19,10 @@ var (
|
||||||
graveyardBucketName = []byte{graveyardPrefix}
|
graveyardBucketName = []byte{graveyardPrefix}
|
||||||
// garbageBucketName stores rows with the objects that should be physically
|
// garbageBucketName stores rows with the objects that should be physically
|
||||||
// deleted by the node (Garbage Collector routine).
|
// deleted by the node (Garbage Collector routine).
|
||||||
garbageBucketName = []byte{garbagePrefix}
|
garbageBucketName = []byte{garbagePrefix}
|
||||||
toMoveItBucketName = []byte{toMoveItPrefix}
|
toMoveItBucketName = []byte{toMoveItPrefix}
|
||||||
containerVolumeBucketName = []byte{containerVolumePrefix}
|
containerVolumeBucketName = []byte{containerVolumePrefix}
|
||||||
|
containerCounterBucketName = []byte{containerCountersPrefix}
|
||||||
|
|
||||||
zeroValue = []byte{0xFF}
|
zeroValue = []byte{0xFF}
|
||||||
)
|
)
|
||||||
|
@ -111,6 +112,11 @@ const (
|
||||||
// Key: split ID
|
// Key: split ID
|
||||||
// Value: list of object IDs
|
// Value: list of object IDs
|
||||||
splitPrefix
|
splitPrefix
|
||||||
|
|
||||||
|
// containerCountersPrefix is used for storing container object counters.
|
||||||
|
// Key: container ID + type
|
||||||
|
// Value: container size in bytes as little-endian uint64
|
||||||
|
containerCountersPrefix
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
|
@ -118,6 +118,7 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
|
||||||
}
|
}
|
||||||
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
|
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
|
||||||
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
|
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
|
||||||
|
s.decContainerObjectCounter(res.RemovedByCnrID())
|
||||||
removedPayload := res.RemovedPhysicalObjectSizes()[0]
|
removedPayload := res.RemovedPhysicalObjectSizes()[0]
|
||||||
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
|
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
|
||||||
if logicalRemovedPayload > 0 {
|
if logicalRemovedPayload > 0 {
|
||||||
|
|
|
@ -416,6 +416,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
|
||||||
|
|
||||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
|
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
for i < res.GetDeletionInfoLength() {
|
for i < res.GetDeletionInfoLength() {
|
||||||
|
@ -629,6 +630,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
|
||||||
|
|
||||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
|
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
for i < res.GetDeletionInfoLength() {
|
for i < res.GetDeletionInfoLength() {
|
||||||
|
@ -675,6 +677,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
|
||||||
|
|
||||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
|
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
for i < res.GetDeletionInfoLength() {
|
for i < res.GetDeletionInfoLength() {
|
||||||
|
|
|
@ -122,6 +122,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
||||||
s.m.RUnlock()
|
s.m.RUnlock()
|
||||||
|
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
for i < res.GetDeletionInfoLength() {
|
for i < res.GetDeletionInfoLength() {
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -23,6 +24,7 @@ type metricsStore struct {
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
objCounters map[string]uint64
|
objCounters map[string]uint64
|
||||||
cnrSize map[string]int64
|
cnrSize map[string]int64
|
||||||
|
cnrCount map[string]uint64
|
||||||
pldSize int64
|
pldSize int64
|
||||||
mode mode.Mode
|
mode mode.Mode
|
||||||
errCounter int64
|
errCounter int64
|
||||||
|
@ -126,6 +128,39 @@ func (m *metricsStore) DeleteShardMetrics() {
|
||||||
m.errCounter = 0
|
m.errCounter = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *metricsStore) SetContainerObjectsCount(cnrID string, objectType string, value uint64) {
|
||||||
|
m.mtx.Lock()
|
||||||
|
defer m.mtx.Unlock()
|
||||||
|
m.cnrCount[cnrID+objectType] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricsStore) IncContainerObjectsCount(cnrID string, objectType string) {
|
||||||
|
m.mtx.Lock()
|
||||||
|
defer m.mtx.Unlock()
|
||||||
|
m.cnrCount[cnrID+objectType]++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricsStore) SubContainerObjectsCount(cnrID string, objectType string, value uint64) {
|
||||||
|
m.mtx.Lock()
|
||||||
|
defer m.mtx.Unlock()
|
||||||
|
existed := m.cnrCount[cnrID+objectType]
|
||||||
|
if existed < value {
|
||||||
|
panic("existed value smaller than value to sustract")
|
||||||
|
}
|
||||||
|
if existed == value {
|
||||||
|
delete(m.cnrCount, cnrID+objectType)
|
||||||
|
} else {
|
||||||
|
m.cnrCount[cnrID+objectType] -= value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool) {
|
||||||
|
m.mtx.Lock()
|
||||||
|
defer m.mtx.Unlock()
|
||||||
|
v, ok := m.cnrCount[cnrID+objectType]
|
||||||
|
return v, ok
|
||||||
|
}
|
||||||
|
|
||||||
func TestCounters(t *testing.T) {
|
func TestCounters(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -143,21 +178,37 @@ func TestCounters(t *testing.T) {
|
||||||
oo[i] = testutil.GenerateObject()
|
oo[i] = testutil.GenerateObject()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cc := meta.ContainerCounters{Logical: make(map[cid.ID]uint64), Physical: make(map[cid.ID]uint64)}
|
||||||
|
|
||||||
t.Run("defaults", func(t *testing.T) {
|
t.Run("defaults", func(t *testing.T) {
|
||||||
require.Zero(t, mm.getObjectCounter(physical))
|
require.Zero(t, mm.getObjectCounter(physical))
|
||||||
require.Zero(t, mm.getObjectCounter(logical))
|
require.Zero(t, mm.getObjectCounter(logical))
|
||||||
require.Empty(t, mm.containerSizes())
|
require.Empty(t, mm.containerSizes())
|
||||||
require.Zero(t, mm.payloadSize())
|
require.Zero(t, mm.payloadSize())
|
||||||
|
|
||||||
|
for _, obj := range oo {
|
||||||
|
contID, _ := obj.ContainerID()
|
||||||
|
v, ok := mm.getContainerCount(contID.EncodeToString(), physical)
|
||||||
|
require.Zero(t, v)
|
||||||
|
require.False(t, ok)
|
||||||
|
v, ok = mm.getContainerCount(contID.EncodeToString(), logical)
|
||||||
|
require.Zero(t, v)
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
var totalPayload int64
|
var totalPayload int64
|
||||||
|
|
||||||
expectedLogicalSizes := make(map[string]int64)
|
expectedLogicalSizes := make(map[string]int64)
|
||||||
|
expectedLogCC := make(map[cid.ID]uint64)
|
||||||
|
expectedPhyCC := make(map[cid.ID]uint64)
|
||||||
for i := range oo {
|
for i := range oo {
|
||||||
cnr, _ := oo[i].ContainerID()
|
cnr, _ := oo[i].ContainerID()
|
||||||
oSize := int64(oo[i].PayloadSize())
|
oSize := int64(oo[i].PayloadSize())
|
||||||
expectedLogicalSizes[cnr.EncodeToString()] += oSize
|
expectedLogicalSizes[cnr.EncodeToString()] += oSize
|
||||||
totalPayload += oSize
|
totalPayload += oSize
|
||||||
|
expectedLogCC[cnr]++
|
||||||
|
expectedPhyCC[cnr]++
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm PutPrm
|
var prm PutPrm
|
||||||
|
@ -174,6 +225,11 @@ func TestCounters(t *testing.T) {
|
||||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||||
require.Equal(t, totalPayload, mm.payloadSize())
|
require.Equal(t, totalPayload, mm.payloadSize())
|
||||||
|
|
||||||
|
cc, err := sh.metaBase.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expectedLogCC, cc.Logical)
|
||||||
|
require.Equal(t, expectedPhyCC, cc.Physical)
|
||||||
|
|
||||||
t.Run("inhume_GC", func(t *testing.T) {
|
t.Run("inhume_GC", func(t *testing.T) {
|
||||||
var prm InhumePrm
|
var prm InhumePrm
|
||||||
inhumedNumber := objNumber / 4
|
inhumedNumber := objNumber / 4
|
||||||
|
@ -187,6 +243,11 @@ func TestCounters(t *testing.T) {
|
||||||
cid, ok := oo[i].ContainerID()
|
cid, ok := oo[i].ContainerID()
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
|
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
|
||||||
|
|
||||||
|
expectedLogCC[cid]--
|
||||||
|
if expectedLogCC[cid] == 0 {
|
||||||
|
delete(expectedLogCC, cid)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
|
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
|
||||||
|
@ -194,6 +255,11 @@ func TestCounters(t *testing.T) {
|
||||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||||
require.Equal(t, totalPayload, mm.payloadSize())
|
require.Equal(t, totalPayload, mm.payloadSize())
|
||||||
|
|
||||||
|
cc, err := sh.metaBase.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expectedLogCC, cc.Logical)
|
||||||
|
require.Equal(t, expectedPhyCC, cc.Physical)
|
||||||
|
|
||||||
oo = oo[inhumedNumber:]
|
oo = oo[inhumedNumber:]
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -214,6 +280,11 @@ func TestCounters(t *testing.T) {
|
||||||
cid, ok := oo[i].ContainerID()
|
cid, ok := oo[i].ContainerID()
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
|
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
|
||||||
|
|
||||||
|
expectedLogCC[cid]--
|
||||||
|
if expectedLogCC[cid] == 0 {
|
||||||
|
delete(expectedLogCC, cid)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, phy, mm.getObjectCounter(physical))
|
require.Equal(t, phy, mm.getObjectCounter(physical))
|
||||||
|
@ -221,6 +292,11 @@ func TestCounters(t *testing.T) {
|
||||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||||
require.Equal(t, totalPayload, mm.payloadSize())
|
require.Equal(t, totalPayload, mm.payloadSize())
|
||||||
|
|
||||||
|
cc, err = sh.metaBase.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expectedLogCC, cc.Logical)
|
||||||
|
require.Equal(t, expectedPhyCC, cc.Physical)
|
||||||
|
|
||||||
oo = oo[inhumedNumber:]
|
oo = oo[inhumedNumber:]
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -245,9 +321,24 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
cnr, _ := oo[i].ContainerID()
|
cnr, _ := oo[i].ContainerID()
|
||||||
expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload)
|
expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload)
|
||||||
|
|
||||||
|
expectedLogCC[cnr]--
|
||||||
|
if expectedLogCC[cnr] == 0 {
|
||||||
|
delete(expectedLogCC, cnr)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedPhyCC[cnr]--
|
||||||
|
if expectedPhyCC[cnr] == 0 {
|
||||||
|
delete(expectedPhyCC, cnr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||||
require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize())
|
require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize())
|
||||||
|
|
||||||
|
cc, err = sh.metaBase.ContainerCounters(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expectedLogCC, cc.Logical)
|
||||||
|
require.Equal(t, expectedPhyCC, cc.Physical)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,7 +359,8 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
|
||||||
"phy": 0,
|
"phy": 0,
|
||||||
"logic": 0,
|
"logic": 0,
|
||||||
},
|
},
|
||||||
cnrSize: make(map[string]int64),
|
cnrSize: make(map[string]int64),
|
||||||
|
cnrCount: make(map[string]uint64),
|
||||||
}
|
}
|
||||||
|
|
||||||
sh := New(
|
sh := New(
|
||||||
|
|
|
@ -90,7 +90,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
||||||
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.incObjectCounter()
|
s.incObjectCounter(putPrm.Address.Container())
|
||||||
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
|
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
|
||||||
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
|
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -85,6 +86,12 @@ type MetricsWriter interface {
|
||||||
ClearErrorCounter()
|
ClearErrorCounter()
|
||||||
// DeleteShardMetrics deletes shard metrics from registry.
|
// DeleteShardMetrics deletes shard metrics from registry.
|
||||||
DeleteShardMetrics()
|
DeleteShardMetrics()
|
||||||
|
// SetContainerObjectsCount sets container object count.
|
||||||
|
SetContainerObjectsCount(cnrID string, objectType string, value uint64)
|
||||||
|
// IncContainerObjectsCount increments container object count.
|
||||||
|
IncContainerObjectsCount(cnrID string, objectType string)
|
||||||
|
// SubContainerObjectsCount subtracts container object count.
|
||||||
|
SubContainerObjectsCount(cnrID string, objectType string, value uint64)
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
|
@ -425,15 +432,29 @@ func (s *Shard) updateMetrics(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
s.metricsWriter.AddToPayloadSize(int64(totalPayload))
|
s.metricsWriter.AddToPayloadSize(int64(totalPayload))
|
||||||
|
|
||||||
|
contCount, err := s.metaBase.ContainerCounters(ctx)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for contID, count := range contCount.Physical {
|
||||||
|
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count)
|
||||||
|
}
|
||||||
|
for contID, count := range contCount.Logical {
|
||||||
|
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// incObjectCounter increment both physical and logical object
|
// incObjectCounter increment both physical and logical object
|
||||||
// counters.
|
// counters.
|
||||||
func (s *Shard) incObjectCounter() {
|
func (s *Shard) incObjectCounter(cnrID cid.ID) {
|
||||||
if s.cfg.metricsWriter != nil {
|
if s.cfg.metricsWriter != nil {
|
||||||
s.cfg.metricsWriter.IncObjectCounter(physical)
|
s.cfg.metricsWriter.IncObjectCounter(physical)
|
||||||
s.cfg.metricsWriter.IncObjectCounter(logical)
|
s.cfg.metricsWriter.IncObjectCounter(logical)
|
||||||
|
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical)
|
||||||
|
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -443,6 +464,17 @@ func (s *Shard) decObjectCounterBy(typ string, v uint64) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) {
|
||||||
|
if s.cfg.metricsWriter == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for cnrID, count := range byCnr {
|
||||||
|
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy())
|
||||||
|
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Shard) addToContainerSize(cnr string, size int64) {
|
func (s *Shard) addToContainerSize(cnr string, size int64) {
|
||||||
if s.cfg.metricsWriter != nil {
|
if s.cfg.metricsWriter != nil {
|
||||||
s.cfg.metricsWriter.AddToContainerSize(cnr, size)
|
s.cfg.metricsWriter.AddToContainerSize(cnr, size)
|
||||||
|
|
|
@ -18,6 +18,9 @@ type EngineMetrics interface {
|
||||||
SetObjectCounter(shardID, objectType string, v uint64)
|
SetObjectCounter(shardID, objectType string, v uint64)
|
||||||
AddToPayloadCounter(shardID string, size int64)
|
AddToPayloadCounter(shardID string, size int64)
|
||||||
SetMode(shardID string, mode mode.Mode)
|
SetMode(shardID string, mode mode.Mode)
|
||||||
|
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||||
|
IncContainerObjectCounter(shardID, contID, objectType string)
|
||||||
|
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||||
|
|
||||||
WriteCache() WriteCacheMetrics
|
WriteCache() WriteCacheMetrics
|
||||||
GC() GCMetrics
|
GC() GCMetrics
|
||||||
|
@ -30,6 +33,7 @@ type engineMetrics struct {
|
||||||
payloadSize *prometheus.GaugeVec
|
payloadSize *prometheus.GaugeVec
|
||||||
errorCounter *prometheus.GaugeVec
|
errorCounter *prometheus.GaugeVec
|
||||||
mode *shardIDModeValue
|
mode *shardIDModeValue
|
||||||
|
contObjCounter *prometheus.GaugeVec
|
||||||
|
|
||||||
gc *gcMetrics
|
gc *gcMetrics
|
||||||
writeCache *writeCacheMetrics
|
writeCache *writeCacheMetrics
|
||||||
|
@ -46,10 +50,13 @@ func newEngineMetrics() *engineMetrics {
|
||||||
Name: "request_duration_seconds",
|
Name: "request_duration_seconds",
|
||||||
Help: "Duration of Engine requests",
|
Help: "Duration of Engine requests",
|
||||||
}, []string{methodLabel}),
|
}, []string{methodLabel}),
|
||||||
objectCounter: newEngineGaugeVector("objects_total", "Objects counters per shards", []string{shardIDLabel, typeLabel}),
|
objectCounter: newEngineGaugeVector("objects_total",
|
||||||
gc: newGCMetrics(),
|
"Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.",
|
||||||
writeCache: newWriteCacheMetrics(),
|
[]string{shardIDLabel, typeLabel}),
|
||||||
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
|
gc: newGCMetrics(),
|
||||||
|
writeCache: newWriteCacheMetrics(),
|
||||||
|
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
|
||||||
|
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,6 +95,7 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
|
||||||
m.errorCounter.Delete(prometheus.Labels{shardIDLabel: shardID})
|
m.errorCounter.Delete(prometheus.Labels{shardIDLabel: shardID})
|
||||||
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
|
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
|
||||||
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||||
|
m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||||
m.mode.Delete(shardID)
|
m.mode.Delete(shardID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,6 +117,36 @@ func (m *engineMetrics) SetObjectCounter(shardID, objectType string, v uint64) {
|
||||||
).Set(float64(v))
|
).Set(float64(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *engineMetrics) SetContainerObjectCounter(shardID, contID, objectType string, v uint64) {
|
||||||
|
m.contObjCounter.With(
|
||||||
|
prometheus.Labels{
|
||||||
|
shardIDLabel: shardID,
|
||||||
|
containerIDLabelKey: contID,
|
||||||
|
typeLabel: objectType,
|
||||||
|
},
|
||||||
|
).Set(float64(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *engineMetrics) IncContainerObjectCounter(shardID, contID, objectType string) {
|
||||||
|
m.contObjCounter.With(
|
||||||
|
prometheus.Labels{
|
||||||
|
shardIDLabel: shardID,
|
||||||
|
containerIDLabelKey: contID,
|
||||||
|
typeLabel: objectType,
|
||||||
|
},
|
||||||
|
).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *engineMetrics) SubContainerObjectCounter(shardID, contID, objectType string, v uint64) {
|
||||||
|
m.contObjCounter.With(
|
||||||
|
prometheus.Labels{
|
||||||
|
shardIDLabel: shardID,
|
||||||
|
containerIDLabelKey: contID,
|
||||||
|
typeLabel: objectType,
|
||||||
|
},
|
||||||
|
).Sub(float64(v))
|
||||||
|
}
|
||||||
|
|
||||||
func (m *engineMetrics) SetMode(shardID string, mode mode.Mode) {
|
func (m *engineMetrics) SetMode(shardID string, mode mode.Mode) {
|
||||||
m.mode.SetMode(shardID, mode.String())
|
m.mode.SetMode(shardID, mode.String())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue