[#838] metabase: Add user object type counter

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-12-04 17:35:11 +03:00
parent 29550fe600
commit f314da4af3
8 changed files with 247 additions and 227 deletions

View file

@ -18,6 +18,7 @@ import (
var ( var (
objectPhyCounterKey = []byte("phy_counter") objectPhyCounterKey = []byte("phy_counter")
objectLogicCounterKey = []byte("logic_counter") objectLogicCounterKey = []byte("logic_counter")
objectUserCounterKey = []byte("user_counter")
) )
type objectType uint8 type objectType uint8
@ -31,18 +32,13 @@ const (
// ObjectCounters groups object counter // ObjectCounters groups object counter
// according to metabase state. // according to metabase state.
type ObjectCounters struct { type ObjectCounters struct {
logic uint64 Logic uint64
phy uint64 Phy uint64
User uint64
} }
// Logic returns logical object counter. func (o ObjectCounters) IsZero() bool {
func (o ObjectCounters) Logic() uint64 { return o.Phy == 0 && o.Logic == 0 && o.User == 0
return o.logic
}
// Phy returns physical object counter.
func (o ObjectCounters) Phy() uint64 {
return o.phy
} }
// ObjectCounters returns object counters that metabase has // ObjectCounters returns object counters that metabase has
@ -63,12 +59,17 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
if b != nil { if b != nil {
data := b.Get(objectPhyCounterKey) data := b.Get(objectPhyCounterKey)
if len(data) == 8 { if len(data) == 8 {
cc.phy = binary.LittleEndian.Uint64(data) cc.Phy = binary.LittleEndian.Uint64(data)
} }
data = b.Get(objectLogicCounterKey) data = b.Get(objectLogicCounterKey)
if len(data) == 8 { if len(data) == 8 {
cc.logic = binary.LittleEndian.Uint64(data) cc.Logic = binary.LittleEndian.Uint64(data)
}
data = b.Get(objectUserCounterKey)
if len(data) == 8 {
cc.User = binary.LittleEndian.Uint64(data)
} }
} }
@ -79,8 +80,7 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
} }
type ContainerCounters struct { type ContainerCounters struct {
Logical map[cid.ID]uint64 Counts map[cid.ID]ObjectCounters
Physical map[cid.ID]uint64
} }
// ContainerCounters returns object counters for each container // ContainerCounters returns object counters for each container
@ -103,8 +103,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
defer span.End() defer span.End()
cc := ContainerCounters{ cc := ContainerCounters{
Logical: make(map[cid.ID]uint64), Counts: make(map[cid.ID]ObjectCounters),
Physical: make(map[cid.ID]uint64),
} }
lastKey := make([]byte, cidSize) lastKey := make([]byte, cidSize)
@ -158,16 +157,11 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
if err != nil { if err != nil {
return err return err
} }
phy, logic, err := parseContainerCounterValue(value) ent, err := parseContainerCounterValue(value)
if err != nil { if err != nil {
return err return err
} }
if phy > 0 { cc.Counts[cnrID] = ent
cc.Physical[cnrID] = phy
}
if logic > 0 {
cc.Logical[cnrID] = logic
}
counter++ counter++
if counter == batchSize { if counter == batchSize {
@ -253,22 +247,23 @@ func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounte
} }
func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error { func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error {
var phyValue, logicValue uint64 var entity ObjectCounters
var err error var err error
data := b.Get(key) data := b.Get(key)
if len(data) > 0 { if len(data) > 0 {
phyValue, logicValue, err = parseContainerCounterValue(data) entity, err = parseContainerCounterValue(data)
if err != nil { if err != nil {
return err return err
} }
} }
phyValue = nextValue(phyValue, delta.phy, inc) entity.Phy = nextValue(entity.Phy, delta.Phy, inc)
logicValue = nextValue(logicValue, delta.logic, inc) entity.Logic = nextValue(entity.Logic, delta.Logic, inc)
if phyValue > 0 || logicValue > 0 { entity.User = nextValue(entity.User, delta.User, inc)
value := containerCounterValue(phyValue, logicValue) if entity.IsZero() {
return b.Put(key, value) return b.Delete(key)
} }
return b.Delete(key) value := containerCounterValue(entity)
return b.Put(key, value)
} }
func nextValue(existed, delta uint64, inc bool) uint64 { func nextValue(existed, delta uint64, inc bool) uint64 {
@ -290,7 +285,7 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error {
key := make([]byte, cidSize) key := make([]byte, cidSize)
cnrID.Encode(key) cnrID.Encode(key)
return db.editContainerCounterValue(b, key, ObjectCounters{logic: 1, phy: 1}, true) return db.editContainerCounterValue(b, key, ObjectCounters{Logic: 1, Phy: 1}, true)
} }
// syncCounter updates object counters according to metabase state: // syncCounter updates object counters according to metabase state:
@ -304,9 +299,11 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
if err != nil { if err != nil {
return fmt.Errorf("could not get shard info bucket: %w", err) return fmt.Errorf("could not get shard info bucket: %w", err)
} }
shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8 shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 &&
containerCounterInitialized := tx.Bucket(containerCounterBucketName) != nil len(shardInfoB.Get(objectLogicCounterKey)) == 8 &&
if !force && shardObjectCounterInitialized && containerCounterInitialized { len(shardInfoB.Get(objectUserCounterKey)) == 8
containerObjectCounterInitialized := containerObjectCounterInitialized(tx)
if !force && shardObjectCounterInitialized && containerObjectCounterInitialized {
// the counters are already inited // the counters are already inited
return nil return nil
} }
@ -325,11 +322,11 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error { err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error {
if v, ok := counters[cnr]; ok { if v, ok := counters[cnr]; ok {
v.phy++ v.Phy++
counters[cnr] = v counters[cnr] = v
} else { } else {
counters[cnr] = ObjectCounters{ counters[cnr] = ObjectCounters{
phy: 1, Phy: 1,
} }
} }
@ -340,11 +337,11 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
// and not covered with a tombstone // and not covered with a tombstone
if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 { if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
if v, ok := counters[cnr]; ok { if v, ok := counters[cnr]; ok {
v.logic++ v.Logic++
counters[cnr] = v counters[cnr] = v
} else { } else {
counters[cnr] = ObjectCounters{ counters[cnr] = ObjectCounters{
logic: 1, Logic: 1,
} }
} }
} }
@ -361,13 +358,15 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error { func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error {
var phyTotal uint64 var phyTotal uint64
var logicTotal uint64 var logicTotal uint64
var userTotal uint64
key := make([]byte, cidSize) key := make([]byte, cidSize)
for cnrID, count := range counters { for cnrID, count := range counters {
phyTotal += count.phy phyTotal += count.Phy
logicTotal += count.logic logicTotal += count.Logic
userTotal += count.User
cnrID.Encode(key) cnrID.Encode(key)
value := containerCounterValue(count.phy, count.logic) value := containerCounterValue(count)
err := containerCounterB.Put(key, value) err := containerCounterB.Put(key, value)
if err != nil { if err != nil {
return fmt.Errorf("could not update phy container object counter: %w", err) return fmt.Errorf("could not update phy container object counter: %w", err)
@ -389,13 +388,22 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container
return fmt.Errorf("could not update logic object counter: %w", err) return fmt.Errorf("could not update logic object counter: %w", err)
} }
userData := make([]byte, 8)
binary.LittleEndian.PutUint64(userData, userTotal)
err = shardInfoB.Put(objectUserCounterKey, userData)
if err != nil {
return fmt.Errorf("could not update user object counter: %w", err)
}
return nil return nil
} }
func containerCounterValue(phy, logic uint64) []byte { func containerCounterValue(entity ObjectCounters) []byte {
res := make([]byte, 16) res := make([]byte, 24)
binary.LittleEndian.PutUint64(res, phy) binary.LittleEndian.PutUint64(res, entity.Phy)
binary.LittleEndian.PutUint64(res[8:], logic) binary.LittleEndian.PutUint64(res[8:], entity.Logic)
binary.LittleEndian.PutUint64(res[16:], entity.User)
return res return res
} }
@ -411,9 +419,30 @@ func parseContainerCounterKey(buf []byte) (cid.ID, error) {
} }
// parseContainerCounterValue return phy, logic values. // parseContainerCounterValue return phy, logic values.
func parseContainerCounterValue(buf []byte) (uint64, uint64, error) { func parseContainerCounterValue(buf []byte) (ObjectCounters, error) {
if len(buf) != 16 { if len(buf) != 24 {
return 0, 0, fmt.Errorf("invalid value length") return ObjectCounters{}, fmt.Errorf("invalid value length")
} }
return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil return ObjectCounters{
Phy: binary.LittleEndian.Uint64(buf),
Logic: binary.LittleEndian.Uint64(buf[8:16]),
User: binary.LittleEndian.Uint64(buf[16:]),
}, nil
}
func containerObjectCounterInitialized(tx *bbolt.Tx) bool {
b := tx.Bucket(containerCounterBucketName)
if b == nil {
return false
}
k, v := b.Cursor().First()
if k == nil && v == nil {
return true
}
_, err := parseContainerCounterKey(k)
if err != nil {
return false
}
_, err = parseContainerCounterValue(v)
return err == nil
} }

View file

@ -24,13 +24,13 @@ func TestCounters(t *testing.T) {
db := newDB(t) db := newDB(t)
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Zero(t, c.Phy()) require.Zero(t, c.Phy)
require.Zero(t, c.Logic()) require.Zero(t, c.Logic)
require.Zero(t, c.User)
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Zero(t, len(cc.Physical)) require.Zero(t, len(cc.Counts))
require.Zero(t, len(cc.Logical))
}) })
t.Run("put", func(t *testing.T) { t.Run("put", func(t *testing.T) {
@ -42,29 +42,30 @@ func TestCounters(t *testing.T) {
} }
var prm meta.PutPrm var prm meta.PutPrm
expPhy := make(map[cid.ID]uint64) exp := make(map[cid.ID]meta.ObjectCounters)
expLog := make(map[cid.ID]uint64)
for i := 0; i < objCount; i++ { for i := 0; i < objCount; i++ {
prm.SetObject(oo[i]) prm.SetObject(oo[i])
cnrID, _ := oo[i].ContainerID() cnrID, _ := oo[i].ContainerID()
expPhy[cnrID]++ c := meta.ObjectCounters{}
expLog[cnrID]++ exp[cnrID] = meta.ObjectCounters{
Logic: 1,
Phy: 1,
}
_, err := db.Put(context.Background(), prm) _, err := db.Put(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
c, err := db.ObjectCounters() c, err = db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(i+1), c.Phy()) require.Equal(t, uint64(i+1), c.Phy)
require.Equal(t, uint64(i+1), c.Logic()) require.Equal(t, uint64(i+1), c.Logic)
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical) require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
require.Equal(t, expLog, cc.Logical)
} }
}) })
@ -73,12 +74,13 @@ func TestCounters(t *testing.T) {
db := newDB(t) db := newDB(t)
oo := putObjs(t, db, objCount, false) oo := putObjs(t, db, objCount, false)
expPhy := make(map[cid.ID]uint64) exp := make(map[cid.ID]meta.ObjectCounters)
expLog := make(map[cid.ID]uint64)
for _, obj := range oo { for _, obj := range oo {
cnrID, _ := obj.ContainerID() cnrID, _ := obj.ContainerID()
expPhy[cnrID]++ exp[cnrID] = meta.ObjectCounters{
expLog[cnrID]++ Logic: 1,
Phy: 1,
}
} }
var prm meta.DeletePrm var prm meta.DeletePrm
@ -92,29 +94,23 @@ func TestCounters(t *testing.T) {
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(i), c.Phy()) require.Equal(t, uint64(i), c.Phy)
require.Equal(t, uint64(i), c.Logic()) require.Equal(t, uint64(i), c.Logic)
cnrID, _ := oo[i].ContainerID() cnrID, _ := oo[i].ContainerID()
if v, ok := expPhy[cnrID]; ok { if v, ok := exp[cnrID]; ok {
if v == 1 { v.Phy--
delete(expPhy, cnrID) v.Logic--
if v.IsZero() {
delete(exp, cnrID)
} else { } else {
expPhy[cnrID]-- exp[cnrID] = v
}
}
if v, ok := expLog[cnrID]; ok {
if v == 1 {
delete(expLog, cnrID)
} else {
expLog[cnrID]--
} }
} }
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical) require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
require.Equal(t, expLog, cc.Logical)
} }
}) })
@ -123,12 +119,13 @@ func TestCounters(t *testing.T) {
db := newDB(t) db := newDB(t)
oo := putObjs(t, db, objCount, false) oo := putObjs(t, db, objCount, false)
expPhy := make(map[cid.ID]uint64) exp := make(map[cid.ID]meta.ObjectCounters)
expLog := make(map[cid.ID]uint64)
for _, obj := range oo { for _, obj := range oo {
cnrID, _ := obj.ContainerID() cnrID, _ := obj.ContainerID()
expPhy[cnrID]++ exp[cnrID] = meta.ObjectCounters{
expLog[cnrID]++ Logic: 1,
Phy: 1,
}
} }
inhumedObjs := make([]oid.Address, objCount/2) inhumedObjs := make([]oid.Address, objCount/2)
@ -142,11 +139,12 @@ func TestCounters(t *testing.T) {
} }
for _, addr := range inhumedObjs { for _, addr := range inhumedObjs {
if v, ok := expLog[addr.Container()]; ok { if v, ok := exp[addr.Container()]; ok {
if v == 1 { v.Logic--
delete(expLog, addr.Container()) if v.IsZero() {
delete(exp, addr.Container())
} else { } else {
expLog[addr.Container()]-- exp[addr.Container()] = v
} }
} }
} }
@ -162,14 +160,13 @@ func TestCounters(t *testing.T) {
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount), c.Phy)
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic()) require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic)
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical) require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
require.Equal(t, expLog, cc.Logical)
}) })
t.Run("put_split", func(t *testing.T) { t.Run("put_split", func(t *testing.T) {
@ -177,8 +174,7 @@ func TestCounters(t *testing.T) {
db := newDB(t) db := newDB(t)
parObj := testutil.GenerateObject() parObj := testutil.GenerateObject()
expPhy := make(map[cid.ID]uint64) exp := make(map[cid.ID]meta.ObjectCounters)
expLog := make(map[cid.ID]uint64)
// put objects and check that parent info // put objects and check that parent info
// does not affect the counter // does not affect the counter
@ -189,20 +185,21 @@ func TestCounters(t *testing.T) {
} }
cnrID, _ := o.ContainerID() cnrID, _ := o.ContainerID()
expLog[cnrID]++ exp[cnrID] = meta.ObjectCounters{
expPhy[cnrID]++ Logic: 1,
Phy: 1,
}
require.NoError(t, putBig(db, o)) require.NoError(t, putBig(db, o))
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(i+1), c.Phy()) require.Equal(t, uint64(i+1), c.Phy)
require.Equal(t, uint64(i+1), c.Logic()) require.Equal(t, uint64(i+1), c.Logic)
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical) require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
require.Equal(t, expLog, cc.Logical)
} }
}) })
@ -211,12 +208,13 @@ func TestCounters(t *testing.T) {
db := newDB(t) db := newDB(t)
oo := putObjs(t, db, objCount, true) oo := putObjs(t, db, objCount, true)
expPhy := make(map[cid.ID]uint64) exp := make(map[cid.ID]meta.ObjectCounters)
expLog := make(map[cid.ID]uint64)
for _, obj := range oo { for _, obj := range oo {
cnrID, _ := obj.ContainerID() cnrID, _ := obj.ContainerID()
expPhy[cnrID]++ exp[cnrID] = meta.ObjectCounters{
expLog[cnrID]++ Logic: 1,
Phy: 1,
}
} }
// delete objects that have parent info // delete objects that have parent info
@ -228,21 +226,15 @@ func TestCounters(t *testing.T) {
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objCount-i-1), c.Phy()) require.Equal(t, uint64(objCount-i-1), c.Phy)
require.Equal(t, uint64(objCount-i-1), c.Logic())
if v, ok := expPhy[addr.Container()]; ok { if v, ok := exp[addr.Container()]; ok {
if v == 1 { v.Logic--
delete(expPhy, addr.Container()) v.Phy--
if v.IsZero() {
delete(exp, addr.Container())
} else { } else {
expPhy[addr.Container()]-- exp[addr.Container()] = v
}
}
if v, ok := expLog[addr.Container()]; ok {
if v == 1 {
delete(expLog, addr.Container())
} else {
expLog[addr.Container()]--
} }
} }
} }
@ -253,12 +245,13 @@ func TestCounters(t *testing.T) {
db := newDB(t) db := newDB(t)
oo := putObjs(t, db, objCount, true) oo := putObjs(t, db, objCount, true)
expPhy := make(map[cid.ID]uint64) exp := make(map[cid.ID]meta.ObjectCounters)
expLog := make(map[cid.ID]uint64)
for _, obj := range oo { for _, obj := range oo {
cnrID, _ := obj.ContainerID() cnrID, _ := obj.ContainerID()
expPhy[cnrID]++ exp[cnrID] = meta.ObjectCounters{
expLog[cnrID]++ Logic: 1,
Phy: 1,
}
} }
inhumedObjs := make([]oid.Address, objCount/2) inhumedObjs := make([]oid.Address, objCount/2)
@ -272,11 +265,12 @@ func TestCounters(t *testing.T) {
} }
for _, addr := range inhumedObjs { for _, addr := range inhumedObjs {
if v, ok := expLog[addr.Container()]; ok { if v, ok := exp[addr.Container()]; ok {
if v == 1 { v.Logic--
delete(expLog, addr.Container()) if v.IsZero() {
delete(exp, addr.Container())
} else { } else {
expLog[addr.Container()]-- exp[addr.Container()] = v
} }
} }
} }
@ -291,14 +285,13 @@ func TestCounters(t *testing.T) {
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount), c.Phy)
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic()) require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic)
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical) require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
require.Equal(t, expLog, cc.Logical)
}) })
} }
@ -320,25 +313,25 @@ func TestCounters_Expired(t *testing.T) {
oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1) oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
} }
expPhy := make(map[cid.ID]uint64) exp := make(map[cid.ID]meta.ObjectCounters)
expLog := make(map[cid.ID]uint64)
for _, addr := range oo { for _, addr := range oo {
expPhy[addr.Container()]++ exp[addr.Container()] = meta.ObjectCounters{
expLog[addr.Container()]++ Logic: 1,
Phy: 1,
}
} }
// 1. objects are available and counters are correct // 1. objects are available and counters are correct
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount), c.Phy)
require.Equal(t, uint64(objCount), c.Logic()) require.Equal(t, uint64(objCount), c.Logic)
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical) require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
require.Equal(t, expLog, cc.Logical)
for _, o := range oo { for _, o := range oo {
_, err := metaGet(db, o, true) _, err := metaGet(db, o, true)
@ -352,14 +345,13 @@ func TestCounters_Expired(t *testing.T) {
c, err = db.ObjectCounters() c, err = db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount), c.Phy)
require.Equal(t, uint64(objCount), c.Logic()) require.Equal(t, uint64(objCount), c.Logic)
cc, err = db.ContainerCounters(context.Background()) cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical) require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
require.Equal(t, expLog, cc.Logical)
for _, o := range oo { for _, o := range oo {
_, err := metaGet(db, o, true) _, err := metaGet(db, o, true)
@ -381,22 +373,22 @@ func TestCounters_Expired(t *testing.T) {
c, err = db.ObjectCounters() c, err = db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(len(oo)), c.Phy()) require.Equal(t, uint64(len(oo)), c.Phy)
require.Equal(t, uint64(len(oo)-1), c.Logic()) require.Equal(t, uint64(len(oo)-1), c.Logic)
if v, ok := expLog[oo[0].Container()]; ok { if v, ok := exp[oo[0].Container()]; ok {
if v == 1 { v.Logic--
delete(expLog, oo[0].Container()) if v.IsZero() {
delete(exp, oo[0].Container())
} else { } else {
expLog[oo[0].Container()]-- exp[oo[0].Container()] = v
} }
} }
cc, err = db.ContainerCounters(context.Background()) cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical) require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
require.Equal(t, expLog, cc.Logical)
// 4. `Delete` an object with GCMark should decrease the // 4. `Delete` an object with GCMark should decrease the
// phy counter but does not affect the logic counter (after // phy counter but does not affect the logic counter (after
@ -409,11 +401,12 @@ func TestCounters_Expired(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Zero(t, deleteRes.AvailableObjectsRemoved()) require.Zero(t, deleteRes.AvailableObjectsRemoved())
if v, ok := expPhy[oo[0].Container()]; ok { if v, ok := exp[oo[0].Container()]; ok {
if v == 1 { v.Phy--
delete(expPhy, oo[0].Container()) if v.IsZero() {
delete(exp, oo[0].Container())
} else { } else {
expPhy[oo[0].Container()]-- exp[oo[0].Container()] = v
} }
} }
@ -421,14 +414,13 @@ func TestCounters_Expired(t *testing.T) {
c, err = db.ObjectCounters() c, err = db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(len(oo)), c.Phy()) require.Equal(t, uint64(len(oo)), c.Phy)
require.Equal(t, uint64(len(oo)), c.Logic()) require.Equal(t, uint64(len(oo)), c.Logic)
cc, err = db.ContainerCounters(context.Background()) cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical) require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
require.Equal(t, expLog, cc.Logical)
// 5 `Delete` an expired object (like it would the control // 5 `Delete` an expired object (like it would the control
// service do) should decrease both counters despite the // service do) should decrease both counters despite the
@ -440,19 +432,13 @@ func TestCounters_Expired(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved()) require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
if v, ok := expLog[oo[0].Container()]; ok { if v, ok := exp[oo[0].Container()]; ok {
if v == 1 { v.Phy--
delete(expLog, oo[0].Container()) v.Logic--
if v.IsZero() {
delete(exp, oo[0].Container())
} else { } else {
expLog[oo[0].Container()]-- exp[oo[0].Container()] = v
}
}
if v, ok := expPhy[oo[0].Container()]; ok {
if v == 1 {
delete(expPhy, oo[0].Container())
} else {
expPhy[oo[0].Container()]--
} }
} }
@ -460,14 +446,13 @@ func TestCounters_Expired(t *testing.T) {
c, err = db.ObjectCounters() c, err = db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(len(oo)), c.Phy()) require.Equal(t, uint64(len(oo)), c.Phy)
require.Equal(t, uint64(len(oo)), c.Logic()) require.Equal(t, uint64(len(oo)), c.Logic)
cc, err = db.ContainerCounters(context.Background()) cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expPhy, cc.Physical) require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
require.Equal(t, expLog, cc.Logical)
} }
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object { func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {
@ -491,8 +476,8 @@ func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(i+1), c.Phy()) require.Equal(t, uint64(i+1), c.Phy)
require.Equal(t, uint64(i+1), c.Logic()) require.Equal(t, uint64(i+1), c.Logic)
} }
return oo return oo

View file

@ -139,11 +139,11 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
if removed { if removed {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.phy++ v.Phy++
res.removedByCnrID[addrs[i].Container()] = v res.removedByCnrID[addrs[i].Container()] = v
} else { } else {
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
phy: 1, Phy: 1,
} }
} }
@ -153,11 +153,11 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
if available { if available {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.logic++ v.Logic++
res.removedByCnrID[addrs[i].Container()] = v res.removedByCnrID[addrs[i].Container()] = v
} else { } else {
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
logic: 1, Logic: 1,
} }
} }

View file

@ -82,11 +82,11 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) {
}) })
i.availableInhumed++ i.availableInhumed++
if v, ok := i.inhumedByCnrID[containerID]; ok { if v, ok := i.inhumedByCnrID[containerID]; ok {
v.logic++ v.Logic++
i.inhumedByCnrID[containerID] = v i.inhumedByCnrID[containerID] = v
} else { } else {
i.inhumedByCnrID[containerID] = ObjectCounters{ i.inhumedByCnrID[containerID] = ObjectCounters{
logic: 1, Logic: 1,
} }
} }
} }
@ -251,10 +251,10 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
inhumedbyCnr := make(map[cid.ID]ObjectCounters) inhumedbyCnr := make(map[cid.ID]ObjectCounters)
for _, dd := range res.deletionDetails { for _, dd := range res.deletionDetails {
if v, ok := inhumedbyCnr[dd.CID]; ok { if v, ok := inhumedbyCnr[dd.CID]; ok {
v.logic++ v.Logic++
inhumedbyCnr[dd.CID] = v inhumedbyCnr[dd.CID] = v
} else { } else {
inhumedbyCnr[dd.CID] = ObjectCounters{logic: 1} inhumedbyCnr[dd.CID] = ObjectCounters{Logic: 1}
} }
inhumedCount++ inhumedCount++
} }

View file

@ -347,8 +347,8 @@ func TestRefillMetabase(t *testing.T) {
c, err := sh.metaBase.ObjectCounters() c, err := sh.metaBase.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
phyBefore := c.Phy() phyBefore := c.Phy
logicalBefore := c.Logic() logicalBefore := c.Logic
err = sh.Close() err = sh.Close()
require.NoError(t, err) require.NoError(t, err)
@ -382,8 +382,8 @@ func TestRefillMetabase(t *testing.T) {
c, err = sh.metaBase.ObjectCounters() c, err = sh.metaBase.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, phyBefore, c.Phy()) require.Equal(t, phyBefore, c.Phy)
require.Equal(t, logicalBefore, c.Logic()) require.Equal(t, logicalBefore, c.Logic)
checkAllObjs(true) checkAllObjs(true)
checkObj(object.AddressOf(tombObj), tombObj) checkObj(object.AddressOf(tombObj), tombObj)

View file

@ -27,5 +27,5 @@ func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
if err != nil { if err != nil {
return 0, err return 0, err
} }
return cc.Logic(), nil return cc.Logic, nil
} }

View file

@ -178,8 +178,6 @@ func TestCounters(t *testing.T) {
oo[i] = testutil.GenerateObject() oo[i] = testutil.GenerateObject()
} }
cc := meta.ContainerCounters{Logical: make(map[cid.ID]uint64), Physical: make(map[cid.ID]uint64)}
t.Run("defaults", func(t *testing.T) { t.Run("defaults", func(t *testing.T) {
require.Zero(t, mm.getObjectCounter(physical)) require.Zero(t, mm.getObjectCounter(physical))
require.Zero(t, mm.getObjectCounter(logical)) require.Zero(t, mm.getObjectCounter(logical))
@ -200,15 +198,16 @@ func TestCounters(t *testing.T) {
var totalPayload int64 var totalPayload int64
expectedLogicalSizes := make(map[string]int64) expectedLogicalSizes := make(map[string]int64)
expectedLogCC := make(map[cid.ID]uint64) expected := make(map[cid.ID]meta.ObjectCounters)
expectedPhyCC := make(map[cid.ID]uint64)
for i := range oo { for i := range oo {
cnr, _ := oo[i].ContainerID() cnr, _ := oo[i].ContainerID()
oSize := int64(oo[i].PayloadSize()) oSize := int64(oo[i].PayloadSize())
expectedLogicalSizes[cnr.EncodeToString()] += oSize expectedLogicalSizes[cnr.EncodeToString()] += oSize
totalPayload += oSize totalPayload += oSize
expectedLogCC[cnr]++ expected[cnr] = meta.ObjectCounters{
expectedPhyCC[cnr]++ Logic: 1,
Phy: 1,
}
} }
var prm PutPrm var prm PutPrm
@ -227,8 +226,7 @@ func TestCounters(t *testing.T) {
cc, err := sh.metaBase.ContainerCounters(context.Background()) cc, err := sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical) require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
require.Equal(t, expectedPhyCC, cc.Physical)
t.Run("inhume_GC", func(t *testing.T) { t.Run("inhume_GC", func(t *testing.T) {
var prm InhumePrm var prm InhumePrm
@ -244,9 +242,13 @@ func TestCounters(t *testing.T) {
require.True(t, ok) require.True(t, ok)
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize()) expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
expectedLogCC[cid]-- if v, ok := expected[cid]; ok {
if expectedLogCC[cid] == 0 { v.Logic--
delete(expectedLogCC, cid) if v.IsZero() {
delete(expected, cid)
} else {
expected[cid] = v
}
} }
} }
@ -257,8 +259,7 @@ func TestCounters(t *testing.T) {
cc, err := sh.metaBase.ContainerCounters(context.Background()) cc, err := sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical) require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
require.Equal(t, expectedPhyCC, cc.Physical)
oo = oo[inhumedNumber:] oo = oo[inhumedNumber:]
}) })
@ -281,9 +282,13 @@ func TestCounters(t *testing.T) {
require.True(t, ok) require.True(t, ok)
expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize()) expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize())
expectedLogCC[cid]-- if v, ok := expected[cid]; ok {
if expectedLogCC[cid] == 0 { v.Logic--
delete(expectedLogCC, cid) if v.IsZero() {
delete(expected, cid)
} else {
expected[cid] = v
}
} }
} }
@ -294,8 +299,7 @@ func TestCounters(t *testing.T) {
cc, err = sh.metaBase.ContainerCounters(context.Background()) cc, err = sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical) require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
require.Equal(t, expectedPhyCC, cc.Physical)
oo = oo[inhumedNumber:] oo = oo[inhumedNumber:]
}) })
@ -322,14 +326,14 @@ func TestCounters(t *testing.T) {
cnr, _ := oo[i].ContainerID() cnr, _ := oo[i].ContainerID()
expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload) expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload)
expectedLogCC[cnr]-- if v, ok := expected[cnr]; ok {
if expectedLogCC[cnr] == 0 { v.Logic--
delete(expectedLogCC, cnr) v.Phy--
} if v.IsZero() {
delete(expected, cnr)
expectedPhyCC[cnr]-- } else {
if expectedPhyCC[cnr] == 0 { expected[cnr] = v
delete(expectedPhyCC, cnr) }
} }
} }
require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, expectedLogicalSizes, mm.containerSizes())
@ -337,8 +341,7 @@ func TestCounters(t *testing.T) {
cc, err = sh.metaBase.ContainerCounters(context.Background()) cc, err = sh.metaBase.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expectedLogCC, cc.Logical) require.Equal(t, meta.ContainerCounters{Counts: expected}, cc)
require.Equal(t, expectedPhyCC, cc.Physical)
}) })
} }

View file

@ -395,6 +395,8 @@ const (
// counter type (excludes objects that are // counter type (excludes objects that are
// stored but unavailable). // stored but unavailable).
logical = "logic" logical = "logic"
// user is an available small or big regular object.
user = "user"
) )
func (s *Shard) updateMetrics(ctx context.Context) { func (s *Shard) updateMetrics(ctx context.Context) {
@ -411,8 +413,9 @@ func (s *Shard) updateMetrics(ctx context.Context) {
return return
} }
s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy()) s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy)
s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic()) s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic)
s.cfg.metricsWriter.SetObjectCounter(user, cc.User)
cnrList, err := s.metaBase.Containers(ctx) cnrList, err := s.metaBase.Containers(ctx)
if err != nil { if err != nil {
@ -441,11 +444,10 @@ func (s *Shard) updateMetrics(ctx context.Context) {
s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err)) s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err))
return return
} }
for contID, count := range contCount.Physical { for contID, count := range contCount.Counts {
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count) s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count.Phy)
} s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count.Logic)
for contID, count := range contCount.Logical { s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), user, count.User)
s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count)
} }
} }
@ -472,8 +474,9 @@ func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters)
} }
for cnrID, count := range byCnr { for cnrID, count := range byCnr {
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy()) s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy)
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic()) s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic)
s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), user, count.User)
} }
} }