Customer object counter #843
15 changed files with 509 additions and 354 deletions
|
@ -46,10 +46,6 @@ func (m *metricsWithID) IncObjectCounter(objectType string) {
|
||||||
m.mw.AddToObjectCounter(m.id, objectType, +1)
|
m.mw.AddToObjectCounter(m.id, objectType, +1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metricsWithID) DecObjectCounter(objectType string) {
|
|
||||||
m.mw.AddToObjectCounter(m.id, objectType, -1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *metricsWithID) SetMode(mode mode.Mode) {
|
func (m *metricsWithID) SetMode(mode mode.Mode) {
|
||||||
m.mw.SetMode(m.id, mode)
|
m.mw.SetMode(m.id, mode)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
@ -18,6 +19,7 @@ import (
|
||||||
var (
|
var (
|
||||||
objectPhyCounterKey = []byte("phy_counter")
|
objectPhyCounterKey = []byte("phy_counter")
|
||||||
objectLogicCounterKey = []byte("logic_counter")
|
objectLogicCounterKey = []byte("logic_counter")
|
||||||
|
objectUserCounterKey = []byte("user_counter")
|
||||||
)
|
)
|
||||||
|
|
||||||
type objectType uint8
|
type objectType uint8
|
||||||
|
@ -26,23 +28,19 @@ const (
|
||||||
_ objectType = iota
|
_ objectType = iota
|
||||||
phy
|
phy
|
||||||
logical
|
logical
|
||||||
|
user
|
||||||
)
|
)
|
||||||
|
|
||||||
// ObjectCounters groups object counter
|
// ObjectCounters groups object counter
|
||||||
// according to metabase state.
|
// according to metabase state.
|
||||||
type ObjectCounters struct {
|
type ObjectCounters struct {
|
||||||
logic uint64
|
Logic uint64
|
||||||
phy uint64
|
Phy uint64
|
||||||
|
User uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Logic returns logical object counter.
|
func (o ObjectCounters) IsZero() bool {
|
||||||
func (o ObjectCounters) Logic() uint64 {
|
return o.Phy == 0 && o.Logic == 0 && o.User == 0
|
||||||
return o.logic
|
|
||||||
}
|
|
||||||
|
|
||||||
// Phy returns physical object counter.
|
|
||||||
func (o ObjectCounters) Phy() uint64 {
|
|
||||||
return o.phy
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectCounters returns object counters that metabase has
|
// ObjectCounters returns object counters that metabase has
|
||||||
|
@ -63,12 +61,17 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
|
||||||
if b != nil {
|
if b != nil {
|
||||||
data := b.Get(objectPhyCounterKey)
|
data := b.Get(objectPhyCounterKey)
|
||||||
if len(data) == 8 {
|
if len(data) == 8 {
|
||||||
cc.phy = binary.LittleEndian.Uint64(data)
|
cc.Phy = binary.LittleEndian.Uint64(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
data = b.Get(objectLogicCounterKey)
|
data = b.Get(objectLogicCounterKey)
|
||||||
if len(data) == 8 {
|
if len(data) == 8 {
|
||||||
cc.logic = binary.LittleEndian.Uint64(data)
|
cc.Logic = binary.LittleEndian.Uint64(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
data = b.Get(objectUserCounterKey)
|
||||||
|
if len(data) == 8 {
|
||||||
|
cc.User = binary.LittleEndian.Uint64(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,8 +82,7 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ContainerCounters struct {
|
type ContainerCounters struct {
|
||||||
Logical map[cid.ID]uint64
|
Counts map[cid.ID]ObjectCounters
|
||||||
Physical map[cid.ID]uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContainerCounters returns object counters for each container
|
// ContainerCounters returns object counters for each container
|
||||||
|
@ -103,8 +105,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
cc := ContainerCounters{
|
cc := ContainerCounters{
|
||||||
Logical: make(map[cid.ID]uint64),
|
Counts: make(map[cid.ID]ObjectCounters),
|
||||||
Physical: make(map[cid.ID]uint64),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lastKey := make([]byte, cidSize)
|
lastKey := make([]byte, cidSize)
|
||||||
|
@ -158,16 +159,11 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
phy, logic, err := parseContainerCounterValue(value)
|
ent, err := parseContainerCounterValue(value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if phy > 0 {
|
cc.Counts[cnrID] = ent
|
||||||
cc.Physical[cnrID] = phy
|
|
||||||
}
|
|
||||||
if logic > 0 {
|
|
||||||
cc.Logical[cnrID] = logic
|
|
||||||
}
|
|
||||||
|
|
||||||
counter++
|
counter++
|
||||||
if counter == batchSize {
|
if counter == batchSize {
|
||||||
|
@ -189,14 +185,19 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error {
|
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
||||||
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
|
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
|
||||||
return fmt.Errorf("could not increase phy object counter: %w", err)
|
return fmt.Errorf("could not increase phy object counter: %w", err)
|
||||||
}
|
}
|
||||||
if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil {
|
if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil {
|
||||||
return fmt.Errorf("could not increase logical object counter: %w", err)
|
return fmt.Errorf("could not increase logical object counter: %w", err)
|
||||||
}
|
}
|
||||||
return db.incContainerObjectCounter(tx, cnrID)
|
if isUserObject {
|
||||||
|
if err := db.updateShardObjectCounter(tx, user, 1, true); err != nil {
|
||||||
|
return fmt.Errorf("could not increase user object counter: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
|
func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
|
||||||
|
@ -213,6 +214,8 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
|
||||||
counterKey = objectPhyCounterKey
|
counterKey = objectPhyCounterKey
|
||||||
case logical:
|
case logical:
|
||||||
counterKey = objectLogicCounterKey
|
counterKey = objectLogicCounterKey
|
||||||
|
case user:
|
||||||
|
counterKey = objectUserCounterKey
|
||||||
default:
|
default:
|
||||||
panic("unknown object type counter")
|
panic("unknown object type counter")
|
||||||
}
|
}
|
||||||
|
@ -236,7 +239,7 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
|
||||||
return b.Put(counterKey, newCounter)
|
return b.Put(counterKey, newCounter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
|
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { // TODO #838
|
||||||
b := tx.Bucket(containerCounterBucketName)
|
b := tx.Bucket(containerCounterBucketName)
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -253,22 +256,23 @@ func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error {
|
func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error {
|
||||||
var phyValue, logicValue uint64
|
var entity ObjectCounters
|
||||||
var err error
|
var err error
|
||||||
data := b.Get(key)
|
data := b.Get(key)
|
||||||
if len(data) > 0 {
|
if len(data) > 0 {
|
||||||
phyValue, logicValue, err = parseContainerCounterValue(data)
|
entity, err = parseContainerCounterValue(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
phyValue = nextValue(phyValue, delta.phy, inc)
|
entity.Phy = nextValue(entity.Phy, delta.Phy, inc)
|
||||||
logicValue = nextValue(logicValue, delta.logic, inc)
|
entity.Logic = nextValue(entity.Logic, delta.Logic, inc)
|
||||||
if phyValue > 0 || logicValue > 0 {
|
entity.User = nextValue(entity.User, delta.User, inc)
|
||||||
value := containerCounterValue(phyValue, logicValue)
|
if entity.IsZero() {
|
||||||
return b.Put(key, value)
|
return b.Delete(key)
|
||||||
}
|
}
|
||||||
return b.Delete(key)
|
value := containerCounterValue(entity)
|
||||||
|
return b.Put(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
func nextValue(existed, delta uint64, inc bool) uint64 {
|
func nextValue(existed, delta uint64, inc bool) uint64 {
|
||||||
|
@ -282,7 +286,7 @@ func nextValue(existed, delta uint64, inc bool) uint64 {
|
||||||
return existed
|
return existed
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error {
|
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
||||||
b := tx.Bucket(containerCounterBucketName)
|
b := tx.Bucket(containerCounterBucketName)
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -290,7 +294,11 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error {
|
||||||
|
|
||||||
key := make([]byte, cidSize)
|
key := make([]byte, cidSize)
|
||||||
cnrID.Encode(key)
|
cnrID.Encode(key)
|
||||||
return db.editContainerCounterValue(b, key, ObjectCounters{logic: 1, phy: 1}, true)
|
c := ObjectCounters{Logic: 1, Phy: 1}
|
||||||
|
if isUserObject {
|
||||||
|
c.User = 1
|
||||||
|
}
|
||||||
|
return db.editContainerCounterValue(b, key, c, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncCounter updates object counters according to metabase state:
|
// syncCounter updates object counters according to metabase state:
|
||||||
|
@ -304,9 +312,11 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get shard info bucket: %w", err)
|
return fmt.Errorf("could not get shard info bucket: %w", err)
|
||||||
}
|
}
|
||||||
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8
|
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 &&
|
||||||
containerCounterInitialized := tx.Bucket(containerCounterBucketName) != nil
|
len(shardInfoB.Get(objectLogicCounterKey)) == 8 &&
|
||||||
if !force && shardObjectCounterInitialized && containerCounterInitialized {
|
len(shardInfoB.Get(objectUserCounterKey)) == 8
|
||||||
|
containerObjectCounterInitialized := containerObjectCounterInitialized(tx)
|
||||||
|
if !force && shardObjectCounterInitialized && containerObjectCounterInitialized {
|
||||||
// the counters are already inited
|
// the counters are already inited
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -322,29 +332,43 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
graveyardBKT := tx.Bucket(graveyardBucketName)
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
garbageBKT := tx.Bucket(garbageBucketName)
|
||||||
key := make([]byte, addressKeySize)
|
key := make([]byte, addressKeySize)
|
||||||
|
var isAvailable bool
|
||||||
|
|
||||||
err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error {
|
err = iteratePhyObjects(tx, func(cnr cid.ID, objID oid.ID, obj *objectSDK.Object) error {
|
||||||
if v, ok := counters[cnr]; ok {
|
if v, ok := counters[cnr]; ok {
|
||||||
v.phy++
|
v.Phy++
|
||||||
counters[cnr] = v
|
counters[cnr] = v
|
||||||
} else {
|
} else {
|
||||||
counters[cnr] = ObjectCounters{
|
counters[cnr] = ObjectCounters{
|
||||||
phy: 1,
|
Phy: 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
addr.SetContainer(cnr)
|
addr.SetContainer(cnr)
|
||||||
addr.SetObject(obj)
|
addr.SetObject(objID)
|
||||||
|
isAvailable = false
|
||||||
|
|
||||||
// check if an object is available: not with GCMark
|
// check if an object is available: not with GCMark
|
||||||
// and not covered with a tombstone
|
// and not covered with a tombstone
|
||||||
if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
|
if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
|
||||||
if v, ok := counters[cnr]; ok {
|
if v, ok := counters[cnr]; ok {
|
||||||
v.logic++
|
v.Logic++
|
||||||
counters[cnr] = v
|
counters[cnr] = v
|
||||||
} else {
|
} else {
|
||||||
counters[cnr] = ObjectCounters{
|
counters[cnr] = ObjectCounters{
|
||||||
logic: 1,
|
Logic: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
isAvailable = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if isAvailable && IsUserObject(obj) {
|
||||||
|
if v, ok := counters[cnr]; ok {
|
||||||
|
v.User++
|
||||||
|
counters[cnr] = v
|
||||||
|
} else {
|
||||||
|
counters[cnr] = ObjectCounters{
|
||||||
|
User: 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -361,13 +385,15 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
|
||||||
func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error {
|
func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error {
|
||||||
var phyTotal uint64
|
var phyTotal uint64
|
||||||
var logicTotal uint64
|
var logicTotal uint64
|
||||||
|
var userTotal uint64
|
||||||
key := make([]byte, cidSize)
|
key := make([]byte, cidSize)
|
||||||
for cnrID, count := range counters {
|
for cnrID, count := range counters {
|
||||||
phyTotal += count.phy
|
phyTotal += count.Phy
|
||||||
logicTotal += count.logic
|
logicTotal += count.Logic
|
||||||
|
userTotal += count.User
|
||||||
|
|
||||||
cnrID.Encode(key)
|
cnrID.Encode(key)
|
||||||
value := containerCounterValue(count.phy, count.logic)
|
value := containerCounterValue(count)
|
||||||
err := containerCounterB.Put(key, value)
|
err := containerCounterB.Put(key, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not update phy container object counter: %w", err)
|
return fmt.Errorf("could not update phy container object counter: %w", err)
|
||||||
|
@ -389,13 +415,22 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
|
||||||
return fmt.Errorf("could not update logic object counter: %w", err)
|
return fmt.Errorf("could not update logic object counter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
userData := make([]byte, 8)
|
||||||
|
binary.LittleEndian.PutUint64(userData, userTotal)
|
||||||
|
|
||||||
|
err = shardInfoB.Put(objectUserCounterKey, userData)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not update user object counter: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func containerCounterValue(phy, logic uint64) []byte {
|
func containerCounterValue(entity ObjectCounters) []byte {
|
||||||
res := make([]byte, 16)
|
res := make([]byte, 24)
|
||||||
binary.LittleEndian.PutUint64(res, phy)
|
binary.LittleEndian.PutUint64(res, entity.Phy)
|
||||||
binary.LittleEndian.PutUint64(res[8:], logic)
|
binary.LittleEndian.PutUint64(res[8:], entity.Logic)
|
||||||
|
binary.LittleEndian.PutUint64(res[16:], entity.User)
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -411,9 +446,37 @@ func parseContainerCounterKey(buf []byte) (cid.ID, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseContainerCounterValue return phy, logic values.
|
// parseContainerCounterValue return phy, logic values.
|
||||||
func parseContainerCounterValue(buf []byte) (uint64, uint64, error) {
|
func parseContainerCounterValue(buf []byte) (ObjectCounters, error) {
|
||||||
if len(buf) != 16 {
|
if len(buf) != 24 {
|
||||||
return 0, 0, fmt.Errorf("invalid value length")
|
return ObjectCounters{}, fmt.Errorf("invalid value length")
|
||||||
}
|
}
|
||||||
return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil
|
return ObjectCounters{
|
||||||
|
Phy: binary.LittleEndian.Uint64(buf),
|
||||||
|
Logic: binary.LittleEndian.Uint64(buf[8:16]),
|
||||||
|
User: binary.LittleEndian.Uint64(buf[16:]),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func containerObjectCounterInitialized(tx *bbolt.Tx) bool {
|
||||||
|
b := tx.Bucket(containerCounterBucketName)
|
||||||
|
if b == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
k, v := b.Cursor().First()
|
||||||
|
if k == nil && v == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
_, err := parseContainerCounterKey(k)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
_, err = parseContainerCounterValue(v)
|
||||||
|
return err == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsUserObject(obj *objectSDK.Object) bool {
|
||||||
|
_, hasParentID := obj.ParentID()
|
||||||
|
return obj.Type() == objectSDK.TypeRegular &&
|
||||||
|
(obj.SplitID() == nil ||
|
||||||
|
(hasParentID && len(obj.Children()) == 0))
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,13 +24,13 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Zero(t, c.Phy())
|
require.Zero(t, c.Phy)
|
||||||
require.Zero(t, c.Logic())
|
require.Zero(t, c.Logic)
|
||||||
|
require.Zero(t, c.User)
|
||||||
|
|
||||||
cc, err := db.ContainerCounters(context.Background())
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Zero(t, len(cc.Physical))
|
require.Zero(t, len(cc.Counts))
|
||||||
require.Zero(t, len(cc.Logical))
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put", func(t *testing.T) {
|
t.Run("put", func(t *testing.T) {
|
||||||
|
@ -42,29 +42,31 @@ func TestCounters(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm meta.PutPrm
|
var prm meta.PutPrm
|
||||||
expPhy := make(map[cid.ID]uint64)
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
||||||
expLog := make(map[cid.ID]uint64)
|
|
||||||
|
|
||||||
for i := 0; i < objCount; i++ {
|
for i := 0; i < objCount; i++ {
|
||||||
prm.SetObject(oo[i])
|
prm.SetObject(oo[i])
|
||||||
cnrID, _ := oo[i].ContainerID()
|
cnrID, _ := oo[i].ContainerID()
|
||||||
expPhy[cnrID]++
|
c := meta.ObjectCounters{}
|
||||||
expLog[cnrID]++
|
exp[cnrID] = meta.ObjectCounters{
|
||||||
|
Logic: 1,
|
||||||
|
Phy: 1,
|
||||||
|
User: 1,
|
||||||
|
}
|
||||||
|
|
||||||
_, err := db.Put(context.Background(), prm)
|
_, err := db.Put(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err = db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(i+1), c.Phy())
|
require.Equal(t, uint64(i+1), c.Phy)
|
||||||
require.Equal(t, uint64(i+1), c.Logic())
|
require.Equal(t, uint64(i+1), c.Logic)
|
||||||
|
|
||||||
cc, err := db.ContainerCounters(context.Background())
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, expPhy, cc.Physical)
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
require.Equal(t, expLog, cc.Logical)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -73,12 +75,14 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
oo := putObjs(t, db, objCount, false)
|
oo := putObjs(t, db, objCount, false)
|
||||||
|
|
||||||
expPhy := make(map[cid.ID]uint64)
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
||||||
expLog := make(map[cid.ID]uint64)
|
|
||||||
for _, obj := range oo {
|
for _, obj := range oo {
|
||||||
cnrID, _ := obj.ContainerID()
|
cnrID, _ := obj.ContainerID()
|
||||||
expPhy[cnrID]++
|
exp[cnrID] = meta.ObjectCounters{
|
||||||
expLog[cnrID]++
|
Logic: 1,
|
||||||
|
Phy: 1,
|
||||||
|
User: 1,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm meta.DeletePrm
|
var prm meta.DeletePrm
|
||||||
|
@ -92,29 +96,25 @@ func TestCounters(t *testing.T) {
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(i), c.Phy())
|
require.Equal(t, uint64(i), c.Phy)
|
||||||
require.Equal(t, uint64(i), c.Logic())
|
require.Equal(t, uint64(i), c.Logic)
|
||||||
|
require.Equal(t, uint64(i), c.User)
|
||||||
|
|
||||||
cnrID, _ := oo[i].ContainerID()
|
cnrID, _ := oo[i].ContainerID()
|
||||||
if v, ok := expPhy[cnrID]; ok {
|
if v, ok := exp[cnrID]; ok {
|
||||||
if v == 1 {
|
v.Phy--
|
||||||
delete(expPhy, cnrID)
|
v.Logic--
|
||||||
|
v.User--
|
||||||
|
if v.IsZero() {
|
||||||
|
delete(exp, cnrID)
|
||||||
} else {
|
} else {
|
||||||
expPhy[cnrID]--
|
exp[cnrID] = v
|
||||||
}
|
|
||||||
}
|
|
||||||
if v, ok := expLog[cnrID]; ok {
|
|
||||||
if v == 1 {
|
|
||||||
delete(expLog, cnrID)
|
|
||||||
} else {
|
|
||||||
expLog[cnrID]--
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cc, err := db.ContainerCounters(context.Background())
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expPhy, cc.Physical)
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
require.Equal(t, expLog, cc.Logical)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -123,12 +123,14 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
oo := putObjs(t, db, objCount, false)
|
oo := putObjs(t, db, objCount, false)
|
||||||
|
|
||||||
expPhy := make(map[cid.ID]uint64)
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
||||||
expLog := make(map[cid.ID]uint64)
|
|
||||||
for _, obj := range oo {
|
for _, obj := range oo {
|
||||||
cnrID, _ := obj.ContainerID()
|
cnrID, _ := obj.ContainerID()
|
||||||
expPhy[cnrID]++
|
exp[cnrID] = meta.ObjectCounters{
|
||||||
expLog[cnrID]++
|
Logic: 1,
|
||||||
|
Phy: 1,
|
||||||
|
User: 1,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inhumedObjs := make([]oid.Address, objCount/2)
|
inhumedObjs := make([]oid.Address, objCount/2)
|
||||||
|
@ -142,11 +144,13 @@ func TestCounters(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, addr := range inhumedObjs {
|
for _, addr := range inhumedObjs {
|
||||||
if v, ok := expLog[addr.Container()]; ok {
|
if v, ok := exp[addr.Container()]; ok {
|
||||||
if v == 1 {
|
v.Logic--
|
||||||
delete(expLog, addr.Container())
|
v.User--
|
||||||
|
if v.IsZero() {
|
||||||
|
delete(exp, addr.Container())
|
||||||
} else {
|
} else {
|
||||||
expLog[addr.Container()]--
|
exp[addr.Container()] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,18 +162,19 @@ func TestCounters(t *testing.T) {
|
||||||
res, err := db.Inhume(context.Background(), prm)
|
res, err := db.Inhume(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed())
|
require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed())
|
||||||
|
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(objCount), c.Phy())
|
require.Equal(t, uint64(objCount), c.Phy)
|
||||||
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
|
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic)
|
||||||
|
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User)
|
||||||
|
|
||||||
cc, err := db.ContainerCounters(context.Background())
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, expPhy, cc.Physical)
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
require.Equal(t, expLog, cc.Logical)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put_split", func(t *testing.T) {
|
t.Run("put_split", func(t *testing.T) {
|
||||||
|
@ -177,8 +182,7 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
parObj := testutil.GenerateObject()
|
parObj := testutil.GenerateObject()
|
||||||
|
|
||||||
expPhy := make(map[cid.ID]uint64)
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
||||||
expLog := make(map[cid.ID]uint64)
|
|
||||||
|
|
||||||
// put objects and check that parent info
|
// put objects and check that parent info
|
||||||
// does not affect the counter
|
// does not affect the counter
|
||||||
|
@ -186,23 +190,27 @@ func TestCounters(t *testing.T) {
|
||||||
o := testutil.GenerateObject()
|
o := testutil.GenerateObject()
|
||||||
if i < objCount/2 { // half of the objs will have the parent
|
if i < objCount/2 { // half of the objs will have the parent
|
||||||
o.SetParent(parObj)
|
o.SetParent(parObj)
|
||||||
|
o.SetSplitID(objectSDK.NewSplitID())
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrID, _ := o.ContainerID()
|
cnrID, _ := o.ContainerID()
|
||||||
expLog[cnrID]++
|
exp[cnrID] = meta.ObjectCounters{
|
||||||
expPhy[cnrID]++
|
Logic: 1,
|
||||||
|
Phy: 1,
|
||||||
|
User: 1,
|
||||||
|
}
|
||||||
|
|
||||||
require.NoError(t, putBig(db, o))
|
require.NoError(t, putBig(db, o))
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(i+1), c.Phy())
|
require.Equal(t, uint64(i+1), c.Phy)
|
||||||
require.Equal(t, uint64(i+1), c.Logic())
|
require.Equal(t, uint64(i+1), c.Logic)
|
||||||
|
require.Equal(t, uint64(i+1), c.User)
|
||||||
|
|
||||||
cc, err := db.ContainerCounters(context.Background())
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expPhy, cc.Physical)
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
require.Equal(t, expLog, cc.Logical)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -211,12 +219,14 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
oo := putObjs(t, db, objCount, true)
|
oo := putObjs(t, db, objCount, true)
|
||||||
|
|
||||||
expPhy := make(map[cid.ID]uint64)
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
||||||
expLog := make(map[cid.ID]uint64)
|
|
||||||
for _, obj := range oo {
|
for _, obj := range oo {
|
||||||
cnrID, _ := obj.ContainerID()
|
cnrID, _ := obj.ContainerID()
|
||||||
expPhy[cnrID]++
|
exp[cnrID] = meta.ObjectCounters{
|
||||||
expLog[cnrID]++
|
Logic: 1,
|
||||||
|
Phy: 1,
|
||||||
|
User: 1,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// delete objects that have parent info
|
// delete objects that have parent info
|
||||||
|
@ -228,21 +238,18 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(objCount-i-1), c.Phy())
|
require.Equal(t, uint64(objCount-i-1), c.Phy)
|
||||||
require.Equal(t, uint64(objCount-i-1), c.Logic())
|
require.Equal(t, uint64(objCount-i-1), c.Logic)
|
||||||
|
require.Equal(t, uint64(objCount-i-1), c.User)
|
||||||
|
|
||||||
if v, ok := expPhy[addr.Container()]; ok {
|
if v, ok := exp[addr.Container()]; ok {
|
||||||
if v == 1 {
|
v.Logic--
|
||||||
delete(expPhy, addr.Container())
|
v.Phy--
|
||||||
|
v.User--
|
||||||
|
if v.IsZero() {
|
||||||
|
delete(exp, addr.Container())
|
||||||
} else {
|
} else {
|
||||||
expPhy[addr.Container()]--
|
exp[addr.Container()] = v
|
||||||
}
|
|
||||||
}
|
|
||||||
if v, ok := expLog[addr.Container()]; ok {
|
|
||||||
if v == 1 {
|
|
||||||
delete(expLog, addr.Container())
|
|
||||||
} else {
|
|
||||||
expLog[addr.Container()]--
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -253,12 +260,14 @@ func TestCounters(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
oo := putObjs(t, db, objCount, true)
|
oo := putObjs(t, db, objCount, true)
|
||||||
|
|
||||||
expPhy := make(map[cid.ID]uint64)
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
||||||
expLog := make(map[cid.ID]uint64)
|
|
||||||
for _, obj := range oo {
|
for _, obj := range oo {
|
||||||
cnrID, _ := obj.ContainerID()
|
cnrID, _ := obj.ContainerID()
|
||||||
expPhy[cnrID]++
|
exp[cnrID] = meta.ObjectCounters{
|
||||||
expLog[cnrID]++
|
Logic: 1,
|
||||||
|
Phy: 1,
|
||||||
|
User: 1,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inhumedObjs := make([]oid.Address, objCount/2)
|
inhumedObjs := make([]oid.Address, objCount/2)
|
||||||
|
@ -272,11 +281,13 @@ func TestCounters(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, addr := range inhumedObjs {
|
for _, addr := range inhumedObjs {
|
||||||
if v, ok := expLog[addr.Container()]; ok {
|
if v, ok := exp[addr.Container()]; ok {
|
||||||
if v == 1 {
|
v.Logic--
|
||||||
delete(expLog, addr.Container())
|
v.User--
|
||||||
|
if v.IsZero() {
|
||||||
|
delete(exp, addr.Container())
|
||||||
} else {
|
} else {
|
||||||
expLog[addr.Container()]--
|
exp[addr.Container()] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -291,14 +302,14 @@ func TestCounters(t *testing.T) {
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(objCount), c.Phy())
|
require.Equal(t, uint64(objCount), c.Phy)
|
||||||
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
|
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic)
|
||||||
|
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User)
|
||||||
|
|
||||||
cc, err := db.ContainerCounters(context.Background())
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, expPhy, cc.Physical)
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
require.Equal(t, expLog, cc.Logical)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,25 +331,27 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
|
oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
expPhy := make(map[cid.ID]uint64)
|
exp := make(map[cid.ID]meta.ObjectCounters)
|
||||||
expLog := make(map[cid.ID]uint64)
|
|
||||||
for _, addr := range oo {
|
for _, addr := range oo {
|
||||||
expPhy[addr.Container()]++
|
exp[addr.Container()] = meta.ObjectCounters{
|
||||||
expLog[addr.Container()]++
|
Logic: 1,
|
||||||
|
Phy: 1,
|
||||||
|
User: 1,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. objects are available and counters are correct
|
// 1. objects are available and counters are correct
|
||||||
|
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(objCount), c.Phy())
|
require.Equal(t, uint64(objCount), c.Phy)
|
||||||
require.Equal(t, uint64(objCount), c.Logic())
|
require.Equal(t, uint64(objCount), c.Logic)
|
||||||
|
require.Equal(t, uint64(objCount), c.User)
|
||||||
|
|
||||||
cc, err := db.ContainerCounters(context.Background())
|
cc, err := db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, expPhy, cc.Physical)
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
require.Equal(t, expLog, cc.Logical)
|
|
||||||
|
|
||||||
for _, o := range oo {
|
for _, o := range oo {
|
||||||
_, err := metaGet(db, o, true)
|
_, err := metaGet(db, o, true)
|
||||||
|
@ -352,14 +365,14 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(objCount), c.Phy())
|
require.Equal(t, uint64(objCount), c.Phy)
|
||||||
require.Equal(t, uint64(objCount), c.Logic())
|
require.Equal(t, uint64(objCount), c.Logic)
|
||||||
|
require.Equal(t, uint64(objCount), c.User)
|
||||||
|
|
||||||
cc, err = db.ContainerCounters(context.Background())
|
cc, err = db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, expPhy, cc.Physical)
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
require.Equal(t, expLog, cc.Logical)
|
|
||||||
|
|
||||||
for _, o := range oo {
|
for _, o := range oo {
|
||||||
_, err := metaGet(db, o, true)
|
_, err := metaGet(db, o, true)
|
||||||
|
@ -377,26 +390,29 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
inhumeRes, err := db.Inhume(context.Background(), inhumePrm)
|
inhumeRes, err := db.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(1), inhumeRes.AvailableInhumed())
|
require.Equal(t, uint64(1), inhumeRes.AvailableInhumed())
|
||||||
|
require.Equal(t, uint64(1), inhumeRes.UserInhumed())
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(len(oo)), c.Phy())
|
require.Equal(t, uint64(len(oo)), c.Phy)
|
||||||
require.Equal(t, uint64(len(oo)-1), c.Logic())
|
require.Equal(t, uint64(len(oo)-1), c.Logic)
|
||||||
|
require.Equal(t, uint64(len(oo)-1), c.User)
|
||||||
|
|
||||||
if v, ok := expLog[oo[0].Container()]; ok {
|
if v, ok := exp[oo[0].Container()]; ok {
|
||||||
if v == 1 {
|
v.Logic--
|
||||||
delete(expLog, oo[0].Container())
|
v.User--
|
||||||
|
if v.IsZero() {
|
||||||
|
delete(exp, oo[0].Container())
|
||||||
} else {
|
} else {
|
||||||
expLog[oo[0].Container()]--
|
exp[oo[0].Container()] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cc, err = db.ContainerCounters(context.Background())
|
cc, err = db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, expPhy, cc.Physical)
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
require.Equal(t, expLog, cc.Logical)
|
|
||||||
|
|
||||||
// 4. `Delete` an object with GCMark should decrease the
|
// 4. `Delete` an object with GCMark should decrease the
|
||||||
// phy counter but does not affect the logic counter (after
|
// phy counter but does not affect the logic counter (after
|
||||||
|
@ -408,12 +424,14 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
deleteRes, err := db.Delete(context.Background(), deletePrm)
|
deleteRes, err := db.Delete(context.Background(), deletePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Zero(t, deleteRes.AvailableObjectsRemoved())
|
require.Zero(t, deleteRes.AvailableObjectsRemoved())
|
||||||
|
require.Zero(t, deleteRes.UserObjectsRemoved())
|
||||||
|
|
||||||
if v, ok := expPhy[oo[0].Container()]; ok {
|
if v, ok := exp[oo[0].Container()]; ok {
|
||||||
if v == 1 {
|
v.Phy--
|
||||||
delete(expPhy, oo[0].Container())
|
if v.IsZero() {
|
||||||
|
delete(exp, oo[0].Container())
|
||||||
} else {
|
} else {
|
||||||
expPhy[oo[0].Container()]--
|
exp[oo[0].Container()] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -421,14 +439,14 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(len(oo)), c.Phy())
|
require.Equal(t, uint64(len(oo)), c.Phy)
|
||||||
require.Equal(t, uint64(len(oo)), c.Logic())
|
require.Equal(t, uint64(len(oo)), c.Logic)
|
||||||
|
require.Equal(t, uint64(len(oo)), c.User)
|
||||||
|
|
||||||
cc, err = db.ContainerCounters(context.Background())
|
cc, err = db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, expPhy, cc.Physical)
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
require.Equal(t, expLog, cc.Logical)
|
|
||||||
|
|
||||||
// 5 `Delete` an expired object (like it would the control
|
// 5 `Delete` an expired object (like it would the control
|
||||||
// service do) should decrease both counters despite the
|
// service do) should decrease both counters despite the
|
||||||
|
@ -439,20 +457,16 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
deleteRes, err = db.Delete(context.Background(), deletePrm)
|
deleteRes, err = db.Delete(context.Background(), deletePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
|
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
|
||||||
|
require.Equal(t, uint64(1), deleteRes.UserObjectsRemoved())
|
||||||
|
|
||||||
if v, ok := expLog[oo[0].Container()]; ok {
|
if v, ok := exp[oo[0].Container()]; ok {
|
||||||
if v == 1 {
|
v.Phy--
|
||||||
delete(expLog, oo[0].Container())
|
v.Logic--
|
||||||
|
v.User--
|
||||||
|
if v.IsZero() {
|
||||||
|
delete(exp, oo[0].Container())
|
||||||
} else {
|
} else {
|
||||||
expLog[oo[0].Container()]--
|
exp[oo[0].Container()] = v
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if v, ok := expPhy[oo[0].Container()]; ok {
|
|
||||||
if v == 1 {
|
|
||||||
delete(expPhy, oo[0].Container())
|
|
||||||
} else {
|
|
||||||
expPhy[oo[0].Container()]--
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -460,14 +474,14 @@ func TestCounters_Expired(t *testing.T) {
|
||||||
|
|
||||||
c, err = db.ObjectCounters()
|
c, err = db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(len(oo)), c.Phy())
|
require.Equal(t, uint64(len(oo)), c.Phy)
|
||||||
require.Equal(t, uint64(len(oo)), c.Logic())
|
require.Equal(t, uint64(len(oo)), c.Logic)
|
||||||
|
require.Equal(t, uint64(len(oo)), c.User)
|
||||||
|
|
||||||
cc, err = db.ContainerCounters(context.Background())
|
cc, err = db.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, expPhy, cc.Physical)
|
require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
|
||||||
require.Equal(t, expLog, cc.Logical)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {
|
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {
|
||||||
|
@ -480,6 +494,7 @@ func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK
|
||||||
o := testutil.GenerateObject()
|
o := testutil.GenerateObject()
|
||||||
if withParent {
|
if withParent {
|
||||||
o.SetParent(parent)
|
o.SetParent(parent)
|
||||||
|
o.SetSplitID(objectSDK.NewSplitID())
|
||||||
}
|
}
|
||||||
|
|
||||||
oo = append(oo, o)
|
oo = append(oo, o)
|
||||||
|
@ -491,8 +506,8 @@ func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK
|
||||||
c, err := db.ObjectCounters()
|
c, err := db.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, uint64(i+1), c.Phy())
|
require.Equal(t, uint64(i+1), c.Phy)
|
||||||
require.Equal(t, uint64(i+1), c.Logic())
|
require.Equal(t, uint64(i+1), c.Logic)
|
||||||
}
|
}
|
||||||
|
|
||||||
return oo
|
return oo
|
||||||
|
|
|
@ -29,6 +29,7 @@ type DeletePrm struct {
|
||||||
type DeleteRes struct {
|
type DeleteRes struct {
|
||||||
rawRemoved uint64
|
rawRemoved uint64
|
||||||
availableRemoved uint64
|
availableRemoved uint64
|
||||||
|
userRemoved uint64
|
||||||
sizes []uint64
|
sizes []uint64
|
||||||
availableSizes []uint64
|
availableSizes []uint64
|
||||||
removedByCnrID map[cid.ID]ObjectCounters
|
removedByCnrID map[cid.ID]ObjectCounters
|
||||||
|
@ -40,6 +41,10 @@ func (d DeleteRes) AvailableObjectsRemoved() uint64 {
|
||||||
return d.availableRemoved
|
return d.availableRemoved
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d DeleteRes) UserObjectsRemoved() uint64 {
|
||||||
|
return d.userRemoved
|
||||||
|
}
|
||||||
|
|
||||||
// RemovedByCnrID returns the number of removed objects by container ID.
|
// RemovedByCnrID returns the number of removed objects by container ID.
|
||||||
func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
|
func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
|
||||||
return d.removedByCnrID
|
return d.removedByCnrID
|
||||||
|
@ -132,63 +137,23 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
for i := range addrs {
|
for i := range addrs {
|
||||||
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
|
r, err := db.delete(tx, addrs[i], refCounter, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return DeleteRes{}, err // maybe log and continue?
|
return DeleteRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if removed {
|
applyDeleteSingleResult(r, &res, addrs, i)
|
||||||
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
|
||||||
v.phy++
|
|
||||||
res.removedByCnrID[addrs[i].Container()] = v
|
|
||||||
} else {
|
|
||||||
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
|
|
||||||
phy: 1,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res.rawRemoved++
|
|
||||||
res.sizes[i] = size
|
|
||||||
}
|
|
||||||
|
|
||||||
if available {
|
|
||||||
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
|
||||||
v.logic++
|
|
||||||
res.removedByCnrID[addrs[i].Container()] = v
|
|
||||||
} else {
|
|
||||||
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
|
|
||||||
logic: 1,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res.availableRemoved++
|
|
||||||
res.availableSizes[i] = size
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.rawRemoved > 0 {
|
if err := db.updateCountersDelete(tx, res); err != nil {
|
||||||
err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
|
return DeleteRes{}, err
|
||||||
if err != nil {
|
|
||||||
return DeleteRes{}, fmt.Errorf("could not decrease phy object counter: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if res.availableRemoved > 0 {
|
|
||||||
err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
|
|
||||||
if err != nil {
|
|
||||||
return DeleteRes{}, fmt.Errorf("could not decrease logical object counter: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
|
|
||||||
return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, refNum := range refCounter {
|
for _, refNum := range refCounter {
|
||||||
if refNum.cur == refNum.all {
|
if refNum.cur == refNum.all {
|
||||||
err := db.deleteObject(tx, refNum.obj, true)
|
err := db.deleteObject(tx, refNum.obj, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return DeleteRes{}, err // maybe log and continue?
|
return DeleteRes{}, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -196,13 +161,91 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
|
||||||
|
if res.rawRemoved > 0 {
|
||||||
|
err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not decrease phy object counter: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.availableRemoved > 0 {
|
||||||
|
err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not decrease logical object counter: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.userRemoved > 0 {
|
||||||
|
err := db.updateShardObjectCounter(tx, user, res.userRemoved, false)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not decrease user object counter: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
|
||||||
|
return fmt.Errorf("could not decrease container object counter: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) {
|
||||||
|
if r.Removed {
|
||||||
|
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
||||||
|
v.Phy++
|
||||||
|
res.removedByCnrID[addrs[i].Container()] = v
|
||||||
|
} else {
|
||||||
|
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
|
||||||
|
Phy: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res.rawRemoved++
|
||||||
|
res.sizes[i] = r.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.Available {
|
||||||
|
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
||||||
|
v.Logic++
|
||||||
|
res.removedByCnrID[addrs[i].Container()] = v
|
||||||
|
} else {
|
||||||
|
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
|
||||||
|
Logic: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res.availableRemoved++
|
||||||
|
res.availableSizes[i] = r.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.User {
|
||||||
|
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
|
||||||
|
v.User++
|
||||||
|
res.removedByCnrID[addrs[i].Container()] = v
|
||||||
|
} else {
|
||||||
|
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
|
||||||
|
User: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res.userRemoved++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type deleteSingleResult struct {
|
||||||
|
Removed bool
|
||||||
|
Available bool
|
||||||
|
User bool
|
||||||
|
Size uint64
|
||||||
|
}
|
||||||
|
|
||||||
// delete removes object indexes from the metabase. Counts the references
|
// delete removes object indexes from the metabase. Counts the references
|
||||||
// of the object that is being removed.
|
// of the object that is being removed.
|
||||||
// The first return value indicates if an object has been removed. (removing a
|
// The first return value indicates if an object has been removed. (removing a
|
||||||
// non-exist object is error-free). The second return value indicates if an
|
// non-exist object is error-free). The second return value indicates if an
|
||||||
// object was available before the removal (for calculating the logical object
|
// object was available before the removal (for calculating the logical object
|
||||||
// counter). The third return value is removed object payload size.
|
// counter). The third return value The fourth return value is removed object payload size.
|
||||||
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) {
|
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) {
|
||||||
key := make([]byte, addressKeySize)
|
key := make([]byte, addressKeySize)
|
||||||
addrKey := addressKey(addr, key)
|
addrKey := addressKey(addr, key)
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
garbageBKT := tx.Bucket(garbageBucketName)
|
||||||
|
@ -214,7 +257,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
if garbageBKT != nil {
|
if garbageBKT != nil {
|
||||||
err := garbageBKT.Delete(addrKey)
|
err := garbageBKT.Delete(addrKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, false, 0, fmt.Errorf("could not remove from garbage bucket: %w", err)
|
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,10 +267,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
var siErr *objectSDK.SplitInfoError
|
var siErr *objectSDK.SplitInfoError
|
||||||
|
|
||||||
if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) {
|
if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) {
|
||||||
return false, false, 0, nil
|
return deleteSingleResult{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, false, 0, err
|
return deleteSingleResult{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// if object is an only link to a parent, then remove parent
|
// if object is an only link to a parent, then remove parent
|
||||||
|
@ -250,13 +293,20 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
nRef.cur++
|
nRef.cur++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
isUserObject := IsUserObject(obj)
|
||||||
|
|
||||||
// remove object
|
// remove object
|
||||||
err = db.deleteObject(tx, obj, false)
|
err = db.deleteObject(tx, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, false, 0, fmt.Errorf("could not remove object: %w", err)
|
return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return true, removeAvailableObject, obj.PayloadSize(), nil
|
return deleteSingleResult{
|
||||||
|
Removed: true,
|
||||||
|
Available: removeAvailableObject,
|
||||||
|
User: isUserObject && removeAvailableObject,
|
||||||
|
Size: obj.PayloadSize(),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) deleteObject(
|
func (db *DB) deleteObject(
|
||||||
|
|
|
@ -30,14 +30,16 @@ type InhumePrm struct {
|
||||||
|
|
||||||
// DeletionInfo contains details on deleted object.
|
// DeletionInfo contains details on deleted object.
|
||||||
type DeletionInfo struct {
|
type DeletionInfo struct {
|
||||||
Size uint64
|
Size uint64
|
||||||
CID cid.ID
|
CID cid.ID
|
||||||
|
IsUser bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// InhumeRes encapsulates results of Inhume operation.
|
// InhumeRes encapsulates results of Inhume operation.
|
||||||
type InhumeRes struct {
|
type InhumeRes struct {
|
||||||
deletedLockObj []oid.Address
|
deletedLockObj []oid.Address
|
||||||
availableInhumed uint64
|
availableInhumed uint64
|
||||||
|
userInhumed uint64
|
||||||
inhumedByCnrID map[cid.ID]ObjectCounters
|
inhumedByCnrID map[cid.ID]ObjectCounters
|
||||||
deletionDetails []DeletionInfo
|
deletionDetails []DeletionInfo
|
||||||
}
|
}
|
||||||
|
@ -48,6 +50,10 @@ func (i InhumeRes) AvailableInhumed() uint64 {
|
||||||
return i.availableInhumed
|
return i.availableInhumed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i InhumeRes) UserInhumed() uint64 {
|
||||||
|
return i.userInhumed
|
||||||
|
}
|
||||||
|
|
||||||
// InhumedByCnrID return number of object
|
// InhumedByCnrID return number of object
|
||||||
// that have been inhumed by container ID.
|
// that have been inhumed by container ID.
|
||||||
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
|
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
|
||||||
|
@ -75,19 +81,31 @@ func (i InhumeRes) GetDeletionInfoByIndex(target int) DeletionInfo {
|
||||||
|
|
||||||
// StoreDeletionInfo stores size of deleted object and associated container ID
|
// StoreDeletionInfo stores size of deleted object and associated container ID
|
||||||
// in corresponding arrays.
|
// in corresponding arrays.
|
||||||
func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) {
|
func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, isUser bool) {
|
||||||
i.deletionDetails = append(i.deletionDetails, DeletionInfo{
|
i.deletionDetails = append(i.deletionDetails, DeletionInfo{
|
||||||
Size: deletedSize,
|
Size: deletedSize,
|
||||||
CID: containerID,
|
CID: containerID,
|
||||||
|
IsUser: isUser,
|
||||||
})
|
})
|
||||||
i.availableInhumed++
|
i.availableInhumed++
|
||||||
|
if isUser {
|
||||||
|
i.userInhumed++
|
||||||
|
}
|
||||||
|
|
||||||
if v, ok := i.inhumedByCnrID[containerID]; ok {
|
if v, ok := i.inhumedByCnrID[containerID]; ok {
|
||||||
v.logic++
|
v.Logic++
|
||||||
|
if isUser {
|
||||||
|
v.User++
|
||||||
|
}
|
||||||
i.inhumedByCnrID[containerID] = v
|
i.inhumedByCnrID[containerID] = v
|
||||||
} else {
|
} else {
|
||||||
i.inhumedByCnrID[containerID] = ObjectCounters{
|
v = ObjectCounters{
|
||||||
logic: 1,
|
Logic: 1,
|
||||||
}
|
}
|
||||||
|
if isUser {
|
||||||
|
v.User = 1
|
||||||
|
}
|
||||||
|
i.inhumedByCnrID[containerID] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,23 +265,14 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
|
||||||
var inhumedCount uint64
|
if err := db.updateShardObjectCounter(tx, logical, res.AvailableInhumed(), false); err != nil {
|
||||||
inhumedbyCnr := make(map[cid.ID]ObjectCounters)
|
return err
|
||||||
for _, dd := range res.deletionDetails {
|
|
||||||
if v, ok := inhumedbyCnr[dd.CID]; ok {
|
|
||||||
v.logic++
|
|
||||||
inhumedbyCnr[dd.CID] = v
|
|
||||||
} else {
|
|
||||||
inhumedbyCnr[dd.CID] = ObjectCounters{logic: 1}
|
|
||||||
}
|
|
||||||
inhumedCount++
|
|
||||||
}
|
}
|
||||||
|
if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {
|
||||||
if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.updateContainerCounter(tx, inhumedbyCnr, false)
|
return db.updateContainerCounter(tx, res.inhumedByCnrID, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
|
// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
|
||||||
|
@ -318,7 +327,7 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
|
||||||
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
|
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
|
||||||
containerID, _ := obj.ContainerID()
|
containerID, _ := obj.ContainerID()
|
||||||
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
|
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
|
||||||
res.storeDeletionInfo(containerID, obj.PayloadSize())
|
res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))
|
||||||
}
|
}
|
||||||
|
|
||||||
// if object is stored, and it is regular object then update bucket
|
// if object is stored, and it is regular object then update bucket
|
||||||
|
|
|
@ -202,9 +202,10 @@ func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Addres
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID) error) error {
|
func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
|
||||||
var cid cid.ID
|
var cid cid.ID
|
||||||
var oid oid.ID
|
var oid oid.ID
|
||||||
|
obj := objectSDK.New()
|
||||||
|
|
||||||
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
||||||
b58CID, postfix := parseContainerIDWithPrefix(&cid, name)
|
b58CID, postfix := parseContainerIDWithPrefix(&cid, name)
|
||||||
|
@ -221,8 +222,8 @@ func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID) error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.ForEach(func(k, v []byte) error {
|
return b.ForEach(func(k, v []byte) error {
|
||||||
if oid.Decode(k) == nil {
|
if oid.Decode(k) == nil && obj.Unmarshal(v) == nil {
|
||||||
return f(cid, oid)
|
return f(cid, oid, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -183,7 +183,7 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
||||||
}
|
}
|
||||||
|
|
||||||
if !isParent {
|
if !isParent {
|
||||||
if err = db.incCounters(tx, cnr); err != nil {
|
if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -347,8 +347,8 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
c, err := sh.metaBase.ObjectCounters()
|
c, err := sh.metaBase.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
phyBefore := c.Phy()
|
phyBefore := c.Phy
|
||||||
logicalBefore := c.Logic()
|
logicalBefore := c.Logic
|
||||||
|
|
||||||
err = sh.Close()
|
err = sh.Close()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -382,8 +382,8 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
c, err = sh.metaBase.ObjectCounters()
|
c, err = sh.metaBase.ObjectCounters()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, phyBefore, c.Phy())
|
require.Equal(t, phyBefore, c.Phy)
|
||||||
require.Equal(t, logicalBefore, c.Logic())
|
require.Equal(t, logicalBefore, c.Logic)
|
||||||
|
|
||||||
checkAllObjs(true)
|
checkAllObjs(true)
|
||||||
checkObj(object.AddressOf(tombObj), tombObj)
|
checkObj(object.AddressOf(tombObj), tombObj)
|
||||||
|
|
|
@ -27,5 +27,5 @@ func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
return cc.Logic(), nil
|
return cc.Logic, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,6 +118,7 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
|
||||||
}
|
}
|
||||||
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
|
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
|
||||||
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
|
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
|
||||||
|
s.decObjectCounterBy(user, res.UserObjectsRemoved())
|
||||||
s.decContainerObjectCounter(res.RemovedByCnrID())
|
s.decContainerObjectCounter(res.RemovedByCnrID())
|
||||||
removedPayload := res.RemovedPhysicalObjectSizes()[0]
|
removedPayload := res.RemovedPhysicalObjectSizes()[0]
|
||||||
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
|
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
|
||||||
|
|
|
@ -416,6 +416,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
|
||||||
|
|
||||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
|
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
s.decObjectCounterBy(user, res.UserInhumed())
|
||||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
|
@ -630,6 +631,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
|
||||||
|
|
||||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
|
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
s.decObjectCounterBy(user, res.UserInhumed())
|
||||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
|
@ -677,6 +679,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
|
||||||
|
|
||||||
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
|
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
s.decObjectCounterBy(user, res.UserInhumed())
|
||||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
|
|
|
@ -122,6 +122,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
||||||
s.m.RUnlock()
|
s.m.RUnlock()
|
||||||
|
|
||||||
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
s.decObjectCounterBy(user, res.UserInhumed())
|
||||||
s.decContainerObjectCounter(res.InhumedByCnrID())
|
s.decContainerObjectCounter(res.InhumedByCnrID())
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
|
|
|
@ -86,12 +86,6 @@ func (m *metricsStore) IncObjectCounter(objectType string) {
|
||||||
m.objCounters[objectType] += 1
|
m.objCounters[objectType] += 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metricsStore) DecObjectCounter(objectType string) {
|
|
||||||
m.mtx.Lock()
|
|
||||||
defer m.mtx.Unlock()
|
|
||||||
m.AddToObjectCounter(objectType, -1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *metricsStore) SetMode(mode mode.Mode) {
|
func (m *metricsStore) SetMode(mode mode.Mode) {
|
||||||
m.mtx.Lock()
|
m.mtx.Lock()
|
||||||
defer m.mtx.Unlock()
|
defer m.mtx.Unlock()
|
||||||
|
@ -178,8 +172,6 @@ func TestCounters(t *testing.T) {
|
||||||
oo[i] = testutil.GenerateObject()
|
oo[i] = testutil.GenerateObject()
|
||||||
}
|
}
|
||||||
|
|
||||||
cc := meta.ContainerCounters{Logical: make(map[cid.ID]uint64), Physical: make(map[cid.ID]uint64)}
|
|
||||||
|
|
||||||
t.Run("defaults", func(t *testing.T) {
|
t.Run("defaults", func(t *testing.T) {
|
||||||
require.Zero(t, mm.getObjectCounter(physical))
|
require.Zero(t, mm.getObjectCounter(physical))
|
||||||
require.Zero(t, mm.getObjectCounter(logical))
|
require.Zero(t, mm.getObjectCounter(logical))
|
||||||
|
@ -194,21 +186,26 @@ func TestCounters(t *testing.T) {
|
||||||
v, ok = mm.getContainerCount(contID.EncodeToString(), logical)
|
v, ok = mm.getContainerCount(contID.EncodeToString(), logical)
|
||||||
require.Zero(t, v)
|
require.Zero(t, v)
|
||||||
require.False(t, ok)
|
require.False(t, ok)
|
||||||
|
v, ok = mm.getContainerCount(contID.EncodeToString(), user)
|
||||||
|
require.Zero(t, v)
|
||||||
|
require.False(t, ok)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
var totalPayload int64
|
var totalPayload int64
|
||||||
|
|
||||||
expectedLogicalSizes := make(map[string]int64)
|
expectedLogicalSizes := make(map[string]int64)
|
||||||
expectedLogCC := make(map[cid.ID]uint64)
|
expected := make(map[cid.ID]meta.ObjectCounters)
|
||||||
expectedPhyCC := make(map[cid.ID]uint64)
|
|
||||||
for i := range oo {
|
for i := range oo {
|
||||||
cnr, _ := oo[i].ContainerID()
|
cnr, _ := oo[i].ContainerID()
|
||||||
oSize := int64(oo[i].PayloadSize())
|
oSize := int64(oo[i].PayloadSize())
|
||||||
expectedLogicalSizes[cnr.EncodeToString()] += oSize
|
expectedLogicalSizes[cnr.EncodeToString()] += oSize
|
||||||
totalPayload += oSize
|
totalPayload += oSize
|
||||||
expectedLogCC[cnr]++
|
expected[cnr] = meta.ObjectCounters{
|
||||||
expectedPhyCC[cnr]++
|
Logic: 1,
|
||||||
|
Phy: 1,
|
||||||
|
User: 1,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm PutPrm
|
var prm PutPrm
|
||||||
|
@ -222,13 +219,13 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
|
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
|
||||||
require.Equal(t, uint64(objNumber), mm.getObjectCounter(logical))
|
require.Equal(t, uint64(objNumber), mm.getObjectCounter(logical))
|
||||||
|
require.Equal(t, uint64(objNumber), mm.getObjectCounter(user))
|
||||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||||
require.Equal(t, totalPayload, mm.payloadSize())
|
require.Equal(t, totalPayload, mm.payloadSize())
|
||||||
|
|
||||||
cc, err := sh.metaBase.ContainerCounters(context.Background())
|
cc, err := sh.metaBase.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expectedLogCC, cc.Logical)
|
require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
|
||||||
require.Equal(t, expectedPhyCC, cc.Physical)
|
|
||||||
|
|
||||||
t.Run("inhume_GC", func(t *testing.T) {
|
t.Run("inhume_GC", func(t *testing.T) {
|
||||||
var prm InhumePrm
|
var prm InhumePrm
|
||||||
|
@ -244,21 +241,26 @@ func TestCounters(t *testing.T) {
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
|
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
|
||||||
|
|
||||||
expectedLogCC[cid]--
|
if v, ok := expected[cid]; ok {
|
||||||
if expectedLogCC[cid] == 0 {
|
v.Logic--
|
||||||
delete(expectedLogCC, cid)
|
v.User--
|
||||||
|
if v.IsZero() {
|
||||||
|
delete(expected, cid)
|
||||||
|
} else {
|
||||||
|
expected[cid] = v
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
|
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
|
||||||
require.Equal(t, uint64(objNumber-inhumedNumber), mm.getObjectCounter(logical))
|
require.Equal(t, uint64(objNumber-inhumedNumber), mm.getObjectCounter(logical))
|
||||||
|
require.Equal(t, uint64(objNumber-inhumedNumber), mm.getObjectCounter(user))
|
||||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||||
require.Equal(t, totalPayload, mm.payloadSize())
|
require.Equal(t, totalPayload, mm.payloadSize())
|
||||||
|
|
||||||
cc, err := sh.metaBase.ContainerCounters(context.Background())
|
cc, err := sh.metaBase.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expectedLogCC, cc.Logical)
|
require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
|
||||||
require.Equal(t, expectedPhyCC, cc.Physical)
|
|
||||||
|
|
||||||
oo = oo[inhumedNumber:]
|
oo = oo[inhumedNumber:]
|
||||||
})
|
})
|
||||||
|
@ -269,6 +271,7 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
phy := mm.getObjectCounter(physical)
|
phy := mm.getObjectCounter(physical)
|
||||||
logic := mm.getObjectCounter(logical)
|
logic := mm.getObjectCounter(logical)
|
||||||
|
custom := mm.getObjectCounter(user)
|
||||||
|
|
||||||
inhumedNumber := int(phy / 4)
|
inhumedNumber := int(phy / 4)
|
||||||
prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...)
|
prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...)
|
||||||
|
@ -281,21 +284,26 @@ func TestCounters(t *testing.T) {
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
|
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
|
||||||
|
|
||||||
expectedLogCC[cid]--
|
if v, ok := expected[cid]; ok {
|
||||||
if expectedLogCC[cid] == 0 {
|
v.Logic--
|
||||||
delete(expectedLogCC, cid)
|
v.User--
|
||||||
|
if v.IsZero() {
|
||||||
|
delete(expected, cid)
|
||||||
|
} else {
|
||||||
|
expected[cid] = v
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, phy, mm.getObjectCounter(physical))
|
require.Equal(t, phy, mm.getObjectCounter(physical))
|
||||||
require.Equal(t, logic-uint64(inhumedNumber), mm.getObjectCounter(logical))
|
require.Equal(t, logic-uint64(inhumedNumber), mm.getObjectCounter(logical))
|
||||||
|
require.Equal(t, custom-uint64(inhumedNumber), mm.getObjectCounter(user))
|
||||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||||
require.Equal(t, totalPayload, mm.payloadSize())
|
require.Equal(t, totalPayload, mm.payloadSize())
|
||||||
|
|
||||||
cc, err = sh.metaBase.ContainerCounters(context.Background())
|
cc, err = sh.metaBase.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expectedLogCC, cc.Logical)
|
require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
|
||||||
require.Equal(t, expectedPhyCC, cc.Physical)
|
|
||||||
|
|
||||||
oo = oo[inhumedNumber:]
|
oo = oo[inhumedNumber:]
|
||||||
})
|
})
|
||||||
|
@ -305,6 +313,7 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
phy := mm.getObjectCounter(physical)
|
phy := mm.getObjectCounter(physical)
|
||||||
logic := mm.getObjectCounter(logical)
|
logic := mm.getObjectCounter(logical)
|
||||||
|
custom := mm.getObjectCounter(user)
|
||||||
|
|
||||||
deletedNumber := int(phy / 4)
|
deletedNumber := int(phy / 4)
|
||||||
prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...)
|
prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...)
|
||||||
|
@ -314,6 +323,7 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, phy-uint64(deletedNumber), mm.getObjectCounter(physical))
|
require.Equal(t, phy-uint64(deletedNumber), mm.getObjectCounter(physical))
|
||||||
require.Equal(t, logic-uint64(deletedNumber), mm.getObjectCounter(logical))
|
require.Equal(t, logic-uint64(deletedNumber), mm.getObjectCounter(logical))
|
||||||
|
require.Equal(t, custom-uint64(deletedNumber), mm.getObjectCounter(user))
|
||||||
var totalRemovedpayload uint64
|
var totalRemovedpayload uint64
|
||||||
for i := range oo[:deletedNumber] {
|
for i := range oo[:deletedNumber] {
|
||||||
removedPayload := oo[i].PayloadSize()
|
removedPayload := oo[i].PayloadSize()
|
||||||
|
@ -322,14 +332,15 @@ func TestCounters(t *testing.T) {
|
||||||
cnr, _ := oo[i].ContainerID()
|
cnr, _ := oo[i].ContainerID()
|
||||||
expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload)
|
expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload)
|
||||||
|
|
||||||
expectedLogCC[cnr]--
|
if v, ok := expected[cnr]; ok {
|
||||||
if expectedLogCC[cnr] == 0 {
|
v.Logic--
|
||||||
delete(expectedLogCC, cnr)
|
v.Phy--
|
||||||
}
|
v.User--
|
||||||
|
if v.IsZero() {
|
||||||
expectedPhyCC[cnr]--
|
delete(expected, cnr)
|
||||||
if expectedPhyCC[cnr] == 0 {
|
} else {
|
||||||
delete(expectedPhyCC, cnr)
|
expected[cnr] = v
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||||
|
@ -337,8 +348,7 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
cc, err = sh.metaBase.ContainerCounters(context.Background())
|
cc, err = sh.metaBase.ContainerCounters(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expectedLogCC, cc.Logical)
|
require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
|
||||||
require.Equal(t, expectedPhyCC, cc.Physical)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
||||||
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.incObjectCounter(putPrm.Address.Container())
|
s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj))
|
||||||
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
|
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
|
||||||
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
|
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,9 +72,6 @@ type MetricsWriter interface {
|
||||||
// IncObjectCounter must increment shard's object counter taking into account
|
// IncObjectCounter must increment shard's object counter taking into account
|
||||||
// object type.
|
// object type.
|
||||||
IncObjectCounter(objectType string)
|
IncObjectCounter(objectType string)
|
||||||
// DecObjectCounter must decrement shard's object counter taking into account
|
|
||||||
// object type.
|
|
||||||
DecObjectCounter(objectType string)
|
|
||||||
// SetShardID must set (update) the shard identifier that will be used in
|
// SetShardID must set (update) the shard identifier that will be used in
|
||||||
// metrics.
|
// metrics.
|
||||||
SetShardID(id string)
|
SetShardID(id string)
|
||||||
|
@ -395,66 +392,74 @@ const (
|
||||||
// counter type (excludes objects that are
|
// counter type (excludes objects that are
|
||||||
// stored but unavailable).
|
// stored but unavailable).
|
||||||
logical = "logic"
|
logical = "logic"
|
||||||
|
// user is an available small or big regular object.
|
||||||
|
user = "user"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Shard) updateMetrics(ctx context.Context) {
|
func (s *Shard) updateMetrics(ctx context.Context) {
|
||||||
if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() {
|
if s.cfg.metricsWriter == nil || s.GetMode().NoMetabase() {
|
||||||
cc, err := s.metaBase.ObjectCounters()
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cc, err := s.metaBase.ObjectCounters()
|
||||||
|
if err != nil {
|
||||||
|
s.log.Warn(logs.ShardMetaObjectCounterRead,
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy)
|
||||||
|
s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic)
|
||||||
|
s.cfg.metricsWriter.SetObjectCounter(user, cc.User)
|
||||||
|
|
||||||
|
cnrList, err := s.metaBase.Containers(ctx)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Warn(logs.ShardMetaCantReadContainerList, zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var totalPayload uint64
|
||||||
|
|
||||||
|
for i := range cnrList {
|
||||||
|
size, err := s.metaBase.ContainerSize(cnrList[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn(logs.ShardMetaObjectCounterRead,
|
s.log.Warn(logs.ShardMetaCantReadContainerSize,
|
||||||
zap.Error(err),
|
zap.String("cid", cnrList[i].EncodeToString()),
|
||||||
)
|
zap.Error(err))
|
||||||
|
continue
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
s.metricsWriter.AddToContainerSize(cnrList[i].EncodeToString(), int64(size))
|
||||||
|
totalPayload += size
|
||||||
|
}
|
||||||
|
|
||||||
s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy())
|
s.metricsWriter.AddToPayloadSize(int64(totalPayload))
|
||||||
s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic())
|
|
||||||
|
|
||||||
cnrList, err := s.metaBase.Containers(ctx)
|
contCount, err := s.metaBase.ContainerCounters(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn(logs.ShardMetaCantReadContainerList, zap.Error(err))
|
s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
for contID, count := range contCount.Counts {
|
||||||
var totalPayload uint64
|
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count.Phy)
|
||||||
|
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count.Logic)
|
||||||
for i := range cnrList {
|
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), user, count.User)
|
||||||
size, err := s.metaBase.ContainerSize(cnrList[i])
|
|
||||||
if err != nil {
|
|
||||||
s.log.Warn(logs.ShardMetaCantReadContainerSize,
|
|
||||||
zap.String("cid", cnrList[i].EncodeToString()),
|
|
||||||
zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
s.metricsWriter.AddToContainerSize(cnrList[i].EncodeToString(), int64(size))
|
|
||||||
totalPayload += size
|
|
||||||
}
|
|
||||||
|
|
||||||
s.metricsWriter.AddToPayloadSize(int64(totalPayload))
|
|
||||||
|
|
||||||
contCount, err := s.metaBase.ContainerCounters(ctx)
|
|
||||||
if err != nil {
|
|
||||||
s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for contID, count := range contCount.Physical {
|
|
||||||
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count)
|
|
||||||
}
|
|
||||||
for contID, count := range contCount.Logical {
|
|
||||||
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// incObjectCounter increment both physical and logical object
|
// incObjectCounter increment both physical and logical object
|
||||||
// counters.
|
// counters.
|
||||||
func (s *Shard) incObjectCounter(cnrID cid.ID) {
|
func (s *Shard) incObjectCounter(cnrID cid.ID, isUser bool) {
|
||||||
if s.cfg.metricsWriter != nil {
|
if s.cfg.metricsWriter != nil {
|
||||||
s.cfg.metricsWriter.IncObjectCounter(physical)
|
s.cfg.metricsWriter.IncObjectCounter(physical)
|
||||||
s.cfg.metricsWriter.IncObjectCounter(logical)
|
s.cfg.metricsWriter.IncObjectCounter(logical)
|
||||||
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical)
|
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical)
|
||||||
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical)
|
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical)
|
||||||
|
if isUser {
|
||||||
|
s.cfg.metricsWriter.IncObjectCounter(user)
|
||||||
|
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), user)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -470,8 +475,9 @@ func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters)
|
||||||
}
|
}
|
||||||
|
|
||||||
for cnrID, count := range byCnr {
|
for cnrID, count := range byCnr {
|
||||||
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy())
|
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy)
|
||||||
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic())
|
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic)
|
||||||
|
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), user, count.User)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue