[#838] metabase: Count user objects

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-12-05 17:00:16 +03:00
parent f314da4af3
commit d30ab5f29e
13 changed files with 180 additions and 58 deletions

View file

@ -46,10 +46,6 @@ func (m *metricsWithID) IncObjectCounter(objectType string) {
m.mw.AddToObjectCounter(m.id, objectType, +1) m.mw.AddToObjectCounter(m.id, objectType, +1)
} }
func (m *metricsWithID) DecObjectCounter(objectType string) {
m.mw.AddToObjectCounter(m.id, objectType, -1)
}
func (m *metricsWithID) SetMode(mode mode.Mode) { func (m *metricsWithID) SetMode(mode mode.Mode) {
m.mw.SetMode(m.id, mode) m.mw.SetMode(m.id, mode)
} }

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
@ -27,6 +28,7 @@ const (
_ objectType = iota _ objectType = iota
phy phy
logical logical
user
) )
// ObjectCounters groups object counter // ObjectCounters groups object counter
@ -183,14 +185,19 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
return false, nil return false, nil
} }
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error { func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil { if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
return fmt.Errorf("could not increase phy object counter: %w", err) return fmt.Errorf("could not increase phy object counter: %w", err)
} }
if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil { if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil {
return fmt.Errorf("could not increase logical object counter: %w", err) return fmt.Errorf("could not increase logical object counter: %w", err)
} }
return db.incContainerObjectCounter(tx, cnrID) if isUserObject {
if err := db.updateShardObjectCounter(tx, user, 1, true); err != nil {
return fmt.Errorf("could not increase user object counter: %w", err)
}
}
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
} }
func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error { func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
@ -207,6 +214,8 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
counterKey = objectPhyCounterKey counterKey = objectPhyCounterKey
case logical: case logical:
counterKey = objectLogicCounterKey counterKey = objectLogicCounterKey
case user:
counterKey = objectUserCounterKey
default: default:
panic("unknown object type counter") panic("unknown object type counter")
} }
@ -230,7 +239,7 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
return b.Put(counterKey, newCounter) return b.Put(counterKey, newCounter)
} }
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { // TODO #838
b := tx.Bucket(containerCounterBucketName) b := tx.Bucket(containerCounterBucketName)
if b == nil { if b == nil {
return nil return nil
@ -277,7 +286,7 @@ func nextValue(existed, delta uint64, inc bool) uint64 {
return existed return existed
} }
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error { func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
b := tx.Bucket(containerCounterBucketName) b := tx.Bucket(containerCounterBucketName)
if b == nil { if b == nil {
return nil return nil
@ -285,7 +294,11 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error {
key := make([]byte, cidSize) key := make([]byte, cidSize)
cnrID.Encode(key) cnrID.Encode(key)
return db.editContainerCounterValue(b, key, ObjectCounters{Logic: 1, Phy: 1}, true) c := ObjectCounters{Logic: 1, Phy: 1}
if isUserObject {
c.User = 1
}
return db.editContainerCounterValue(b, key, c, true)
} }
// syncCounter updates object counters according to metabase state: // syncCounter updates object counters according to metabase state:
@ -319,8 +332,9 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
graveyardBKT := tx.Bucket(graveyardBucketName) graveyardBKT := tx.Bucket(graveyardBucketName)
garbageBKT := tx.Bucket(garbageBucketName) garbageBKT := tx.Bucket(garbageBucketName)
key := make([]byte, addressKeySize) key := make([]byte, addressKeySize)
var isAvailable bool
err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error { err = iteratePhyObjects(tx, func(cnr cid.ID, objID oid.ID, obj *objectSDK.Object) error {
if v, ok := counters[cnr]; ok { if v, ok := counters[cnr]; ok {
v.Phy++ v.Phy++
counters[cnr] = v counters[cnr] = v
@ -331,7 +345,8 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
} }
addr.SetContainer(cnr) addr.SetContainer(cnr)
addr.SetObject(obj) addr.SetObject(objID)
isAvailable = false
// check if an object is available: not with GCMark // check if an object is available: not with GCMark
// and not covered with a tombstone // and not covered with a tombstone
@ -344,6 +359,18 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
Logic: 1, Logic: 1,
} }
} }
isAvailable = true
}
if isAvailable && IsUserObject(obj) {
if v, ok := counters[cnr]; ok {
v.User++
counters[cnr] = v
} else {
counters[cnr] = ObjectCounters{
User: 1,
}
}
} }
return nil return nil
@ -446,3 +473,10 @@ func containerObjectCounterInitialized(tx *bbolt.Tx) bool {
_, err = parseContainerCounterValue(v) _, err = parseContainerCounterValue(v)
return err == nil return err == nil
} }
func IsUserObject(obj *objectSDK.Object) bool {
_, hasParentID := obj.ParentID()
return obj.Type() == objectSDK.TypeRegular &&
(obj.SplitID() == nil ||
(hasParentID && len(obj.Children()) == 0))
}

View file

@ -51,6 +51,7 @@ func TestCounters(t *testing.T) {
exp[cnrID] = meta.ObjectCounters{ exp[cnrID] = meta.ObjectCounters{
Logic: 1, Logic: 1,
Phy: 1, Phy: 1,
User: 1,
} }
_, err := db.Put(context.Background(), prm) _, err := db.Put(context.Background(), prm)
@ -80,6 +81,7 @@ func TestCounters(t *testing.T) {
exp[cnrID] = meta.ObjectCounters{ exp[cnrID] = meta.ObjectCounters{
Logic: 1, Logic: 1,
Phy: 1, Phy: 1,
User: 1,
} }
} }
@ -96,11 +98,13 @@ func TestCounters(t *testing.T) {
require.Equal(t, uint64(i), c.Phy) require.Equal(t, uint64(i), c.Phy)
require.Equal(t, uint64(i), c.Logic) require.Equal(t, uint64(i), c.Logic)
require.Equal(t, uint64(i), c.User)
cnrID, _ := oo[i].ContainerID() cnrID, _ := oo[i].ContainerID()
if v, ok := exp[cnrID]; ok { if v, ok := exp[cnrID]; ok {
v.Phy-- v.Phy--
v.Logic-- v.Logic--
v.User--
if v.IsZero() { if v.IsZero() {
delete(exp, cnrID) delete(exp, cnrID)
} else { } else {
@ -125,6 +129,7 @@ func TestCounters(t *testing.T) {
exp[cnrID] = meta.ObjectCounters{ exp[cnrID] = meta.ObjectCounters{
Logic: 1, Logic: 1,
Phy: 1, Phy: 1,
User: 1,
} }
} }
@ -141,6 +146,7 @@ func TestCounters(t *testing.T) {
for _, addr := range inhumedObjs { for _, addr := range inhumedObjs {
if v, ok := exp[addr.Container()]; ok { if v, ok := exp[addr.Container()]; ok {
v.Logic-- v.Logic--
v.User--
if v.IsZero() { if v.IsZero() {
delete(exp, addr.Container()) delete(exp, addr.Container())
} else { } else {
@ -156,12 +162,14 @@ func TestCounters(t *testing.T) {
res, err := db.Inhume(context.Background(), prm) res, err := db.Inhume(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed()) require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed())
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objCount), c.Phy) require.Equal(t, uint64(objCount), c.Phy)
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic) require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic)
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User)
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
@ -182,12 +190,14 @@ func TestCounters(t *testing.T) {
o := testutil.GenerateObject() o := testutil.GenerateObject()
if i < objCount/2 { // half of the objs will have the parent if i < objCount/2 { // half of the objs will have the parent
o.SetParent(parObj) o.SetParent(parObj)
o.SetSplitID(objectSDK.NewSplitID())
} }
cnrID, _ := o.ContainerID() cnrID, _ := o.ContainerID()
exp[cnrID] = meta.ObjectCounters{ exp[cnrID] = meta.ObjectCounters{
Logic: 1, Logic: 1,
Phy: 1, Phy: 1,
User: 1,
} }
require.NoError(t, putBig(db, o)) require.NoError(t, putBig(db, o))
@ -196,6 +206,7 @@ func TestCounters(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(i+1), c.Phy) require.Equal(t, uint64(i+1), c.Phy)
require.Equal(t, uint64(i+1), c.Logic) require.Equal(t, uint64(i+1), c.Logic)
require.Equal(t, uint64(i+1), c.User)
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
@ -214,6 +225,7 @@ func TestCounters(t *testing.T) {
exp[cnrID] = meta.ObjectCounters{ exp[cnrID] = meta.ObjectCounters{
Logic: 1, Logic: 1,
Phy: 1, Phy: 1,
User: 1,
} }
} }
@ -227,10 +239,13 @@ func TestCounters(t *testing.T) {
c, err := db.ObjectCounters() c, err := db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objCount-i-1), c.Phy) require.Equal(t, uint64(objCount-i-1), c.Phy)
require.Equal(t, uint64(objCount-i-1), c.Logic)
require.Equal(t, uint64(objCount-i-1), c.User)
if v, ok := exp[addr.Container()]; ok { if v, ok := exp[addr.Container()]; ok {
v.Logic-- v.Logic--
v.Phy-- v.Phy--
v.User--
if v.IsZero() { if v.IsZero() {
delete(exp, addr.Container()) delete(exp, addr.Container())
} else { } else {
@ -251,6 +266,7 @@ func TestCounters(t *testing.T) {
exp[cnrID] = meta.ObjectCounters{ exp[cnrID] = meta.ObjectCounters{
Logic: 1, Logic: 1,
Phy: 1, Phy: 1,
User: 1,
} }
} }
@ -267,6 +283,7 @@ func TestCounters(t *testing.T) {
for _, addr := range inhumedObjs { for _, addr := range inhumedObjs {
if v, ok := exp[addr.Container()]; ok { if v, ok := exp[addr.Container()]; ok {
v.Logic-- v.Logic--
v.User--
if v.IsZero() { if v.IsZero() {
delete(exp, addr.Container()) delete(exp, addr.Container())
} else { } else {
@ -287,6 +304,7 @@ func TestCounters(t *testing.T) {
require.Equal(t, uint64(objCount), c.Phy) require.Equal(t, uint64(objCount), c.Phy)
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic) require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic)
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User)
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
@ -318,6 +336,7 @@ func TestCounters_Expired(t *testing.T) {
exp[addr.Container()] = meta.ObjectCounters{ exp[addr.Container()] = meta.ObjectCounters{
Logic: 1, Logic: 1,
Phy: 1, Phy: 1,
User: 1,
} }
} }
@ -327,6 +346,7 @@ func TestCounters_Expired(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objCount), c.Phy) require.Equal(t, uint64(objCount), c.Phy)
require.Equal(t, uint64(objCount), c.Logic) require.Equal(t, uint64(objCount), c.Logic)
require.Equal(t, uint64(objCount), c.User)
cc, err := db.ContainerCounters(context.Background()) cc, err := db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
@ -347,6 +367,7 @@ func TestCounters_Expired(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objCount), c.Phy) require.Equal(t, uint64(objCount), c.Phy)
require.Equal(t, uint64(objCount), c.Logic) require.Equal(t, uint64(objCount), c.Logic)
require.Equal(t, uint64(objCount), c.User)
cc, err = db.ContainerCounters(context.Background()) cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
@ -369,15 +390,18 @@ func TestCounters_Expired(t *testing.T) {
inhumeRes, err := db.Inhume(context.Background(), inhumePrm) inhumeRes, err := db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(1), inhumeRes.AvailableInhumed()) require.Equal(t, uint64(1), inhumeRes.AvailableInhumed())
require.Equal(t, uint64(1), inhumeRes.UserInhumed())
c, err = db.ObjectCounters() c, err = db.ObjectCounters()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(len(oo)), c.Phy) require.Equal(t, uint64(len(oo)), c.Phy)
require.Equal(t, uint64(len(oo)-1), c.Logic) require.Equal(t, uint64(len(oo)-1), c.Logic)
require.Equal(t, uint64(len(oo)-1), c.User)
if v, ok := exp[oo[0].Container()]; ok { if v, ok := exp[oo[0].Container()]; ok {
v.Logic-- v.Logic--
v.User--
if v.IsZero() { if v.IsZero() {
delete(exp, oo[0].Container()) delete(exp, oo[0].Container())
} else { } else {
@ -400,6 +424,7 @@ func TestCounters_Expired(t *testing.T) {
deleteRes, err := db.Delete(context.Background(), deletePrm) deleteRes, err := db.Delete(context.Background(), deletePrm)
require.NoError(t, err) require.NoError(t, err)
require.Zero(t, deleteRes.AvailableObjectsRemoved()) require.Zero(t, deleteRes.AvailableObjectsRemoved())
require.Zero(t, deleteRes.UserObjectsRemoved())
if v, ok := exp[oo[0].Container()]; ok { if v, ok := exp[oo[0].Container()]; ok {
v.Phy-- v.Phy--
@ -416,6 +441,7 @@ func TestCounters_Expired(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(len(oo)), c.Phy) require.Equal(t, uint64(len(oo)), c.Phy)
require.Equal(t, uint64(len(oo)), c.Logic) require.Equal(t, uint64(len(oo)), c.Logic)
require.Equal(t, uint64(len(oo)), c.User)
cc, err = db.ContainerCounters(context.Background()) cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
@ -431,10 +457,12 @@ func TestCounters_Expired(t *testing.T) {
deleteRes, err = db.Delete(context.Background(), deletePrm) deleteRes, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved()) require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
require.Equal(t, uint64(1), deleteRes.UserObjectsRemoved())
if v, ok := exp[oo[0].Container()]; ok { if v, ok := exp[oo[0].Container()]; ok {
v.Phy-- v.Phy--
v.Logic-- v.Logic--
v.User--
if v.IsZero() { if v.IsZero() {
delete(exp, oo[0].Container()) delete(exp, oo[0].Container())
} else { } else {
@ -448,6 +476,7 @@ func TestCounters_Expired(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(len(oo)), c.Phy) require.Equal(t, uint64(len(oo)), c.Phy)
require.Equal(t, uint64(len(oo)), c.Logic) require.Equal(t, uint64(len(oo)), c.Logic)
require.Equal(t, uint64(len(oo)), c.User)
cc, err = db.ContainerCounters(context.Background()) cc, err = db.ContainerCounters(context.Background())
require.NoError(t, err) require.NoError(t, err)
@ -465,6 +494,7 @@ func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK
o := testutil.GenerateObject() o := testutil.GenerateObject()
if withParent { if withParent {
o.SetParent(parent) o.SetParent(parent)
o.SetSplitID(objectSDK.NewSplitID())
} }
oo = append(oo, o) oo = append(oo, o)

View file

@ -29,6 +29,7 @@ type DeletePrm struct {
type DeleteRes struct { type DeleteRes struct {
rawRemoved uint64 rawRemoved uint64
availableRemoved uint64 availableRemoved uint64
userRemoved uint64
sizes []uint64 sizes []uint64
availableSizes []uint64 availableSizes []uint64
removedByCnrID map[cid.ID]ObjectCounters removedByCnrID map[cid.ID]ObjectCounters
@ -40,6 +41,10 @@ func (d DeleteRes) AvailableObjectsRemoved() uint64 {
return d.availableRemoved return d.availableRemoved
} }
func (d DeleteRes) UserObjectsRemoved() uint64 {
return d.userRemoved
}
// RemovedByCnrID returns the number of removed objects by container ID. // RemovedByCnrID returns the number of removed objects by container ID.
func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters { func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
return d.removedByCnrID return d.removedByCnrID
@ -132,12 +137,12 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
currEpoch := db.epochState.CurrentEpoch() currEpoch := db.epochState.CurrentEpoch()
for i := range addrs { for i := range addrs {
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch) r, err := db.delete(tx, addrs[i], refCounter, currEpoch)
if err != nil { if err != nil {
return DeleteRes{}, err // maybe log and continue? return DeleteRes{}, err // maybe log and continue?
} }
if removed { if r.Removed {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.Phy++ v.Phy++
res.removedByCnrID[addrs[i].Container()] = v res.removedByCnrID[addrs[i].Container()] = v
@ -148,10 +153,10 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
} }
res.rawRemoved++ res.rawRemoved++
res.sizes[i] = size res.sizes[i] = r.Size
} }
if available { if r.Available {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.Logic++ v.Logic++
res.removedByCnrID[addrs[i].Container()] = v res.removedByCnrID[addrs[i].Container()] = v
@ -162,7 +167,20 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
} }
res.availableRemoved++ res.availableRemoved++
res.availableSizes[i] = size res.availableSizes[i] = r.Size
}
if r.User {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.User++
res.removedByCnrID[addrs[i].Container()] = v
} else {
res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
User: 1,
}
}
res.userRemoved++
} }
} }
@ -180,6 +198,13 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
} }
} }
if res.userRemoved > 0 {
err := db.updateShardObjectCounter(tx, user, res.userRemoved, false)
if err != nil {
return DeleteRes{}, fmt.Errorf("could not decrease user object counter: %w", err)
}
}
if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil { if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err) return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err)
} }
@ -196,13 +221,20 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
return res, nil return res, nil
} }
type deleteSingleResult struct {
Removed bool
Available bool
User bool
Size uint64
}
// delete removes object indexes from the metabase. Counts the references // delete removes object indexes from the metabase. Counts the references
// of the object that is being removed. // of the object that is being removed.
// The first return value indicates if an object has been removed. (removing a // The first return value indicates if an object has been removed. (removing a
// non-exist object is error-free). The second return value indicates if an // non-exist object is error-free). The second return value indicates if an
// object was available before the removal (for calculating the logical object // object was available before the removal (for calculating the logical object
// counter). The third return value is removed object payload size. // counter). The third return value The fourth return value is removed object payload size.
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) { func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) {
key := make([]byte, addressKeySize) key := make([]byte, addressKeySize)
addrKey := addressKey(addr, key) addrKey := addressKey(addr, key)
garbageBKT := tx.Bucket(garbageBucketName) garbageBKT := tx.Bucket(garbageBucketName)
@ -214,7 +246,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
if garbageBKT != nil { if garbageBKT != nil {
err := garbageBKT.Delete(addrKey) err := garbageBKT.Delete(addrKey)
if err != nil { if err != nil {
return false, false, 0, fmt.Errorf("could not remove from garbage bucket: %w", err) return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
} }
} }
@ -224,10 +256,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
var siErr *objectSDK.SplitInfoError var siErr *objectSDK.SplitInfoError
if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) { if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) {
return false, false, 0, nil return deleteSingleResult{}, nil
} }
return false, false, 0, err return deleteSingleResult{}, err
} }
// if object is an only link to a parent, then remove parent // if object is an only link to a parent, then remove parent
@ -250,13 +282,20 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
nRef.cur++ nRef.cur++
} }
isUserObject := IsUserObject(obj)
// remove object // remove object
err = db.deleteObject(tx, obj, false) err = db.deleteObject(tx, obj, false)
if err != nil { if err != nil {
return false, false, 0, fmt.Errorf("could not remove object: %w", err) return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err)
} }
return true, removeAvailableObject, obj.PayloadSize(), nil return deleteSingleResult{
Removed: true,
Available: removeAvailableObject,
User: isUserObject && removeAvailableObject,
Size: obj.PayloadSize(),
}, nil
} }
func (db *DB) deleteObject( func (db *DB) deleteObject(

View file

@ -32,12 +32,14 @@ type InhumePrm struct {
type DeletionInfo struct { type DeletionInfo struct {
Size uint64 Size uint64
CID cid.ID CID cid.ID
IsUser bool
} }
// InhumeRes encapsulates results of Inhume operation. // InhumeRes encapsulates results of Inhume operation.
type InhumeRes struct { type InhumeRes struct {
deletedLockObj []oid.Address deletedLockObj []oid.Address
availableInhumed uint64 availableInhumed uint64
userInhumed uint64
inhumedByCnrID map[cid.ID]ObjectCounters inhumedByCnrID map[cid.ID]ObjectCounters
deletionDetails []DeletionInfo deletionDetails []DeletionInfo
} }
@ -48,6 +50,10 @@ func (i InhumeRes) AvailableInhumed() uint64 {
return i.availableInhumed return i.availableInhumed
} }
func (i InhumeRes) UserInhumed() uint64 {
return i.userInhumed
}
// InhumedByCnrID return number of object // InhumedByCnrID return number of object
// that have been inhumed by container ID. // that have been inhumed by container ID.
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters { func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
@ -75,19 +81,31 @@ func (i InhumeRes) GetDeletionInfoByIndex(target int) DeletionInfo {
// StoreDeletionInfo stores size of deleted object and associated container ID // StoreDeletionInfo stores size of deleted object and associated container ID
// in corresponding arrays. // in corresponding arrays.
func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) { func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, isUser bool) {
i.deletionDetails = append(i.deletionDetails, DeletionInfo{ i.deletionDetails = append(i.deletionDetails, DeletionInfo{
Size: deletedSize, Size: deletedSize,
CID: containerID, CID: containerID,
IsUser: isUser,
}) })
i.availableInhumed++ i.availableInhumed++
if isUser {
i.userInhumed++
}
if v, ok := i.inhumedByCnrID[containerID]; ok { if v, ok := i.inhumedByCnrID[containerID]; ok {
v.Logic++ v.Logic++
if isUser {
v.User++
}
i.inhumedByCnrID[containerID] = v i.inhumedByCnrID[containerID] = v
} else { } else {
i.inhumedByCnrID[containerID] = ObjectCounters{ v = ObjectCounters{
Logic: 1, Logic: 1,
} }
if isUser {
v.User = 1
}
i.inhumedByCnrID[containerID] = v
} }
} }
@ -247,23 +265,14 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
} }
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
var inhumedCount uint64 if err := db.updateShardObjectCounter(tx, logical, res.AvailableInhumed(), false); err != nil {
inhumedbyCnr := make(map[cid.ID]ObjectCounters) return err
for _, dd := range res.deletionDetails {
if v, ok := inhumedbyCnr[dd.CID]; ok {
v.Logic++
inhumedbyCnr[dd.CID] = v
} else {
inhumedbyCnr[dd.CID] = ObjectCounters{Logic: 1}
} }
inhumedCount++ if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {
}
if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil {
return err return err
} }
return db.updateContainerCounter(tx, inhumedbyCnr, false) return db.updateContainerCounter(tx, res.inhumedByCnrID, false)
} }
// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket. // getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
@ -318,7 +327,7 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
containerID, _ := obj.ContainerID() containerID, _ := obj.ContainerID()
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
res.storeDeletionInfo(containerID, obj.PayloadSize()) res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))
} }
// if object is stored, and it is regular object then update bucket // if object is stored, and it is regular object then update bucket

View file

@ -202,9 +202,10 @@ func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Addres
return err return err
} }
func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID) error) error { func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
var cid cid.ID var cid cid.ID
var oid oid.ID var oid oid.ID
obj := objectSDK.New()
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
b58CID, postfix := parseContainerIDWithPrefix(&cid, name) b58CID, postfix := parseContainerIDWithPrefix(&cid, name)
@ -221,8 +222,8 @@ func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID) error) error {
} }
return b.ForEach(func(k, v []byte) error { return b.ForEach(func(k, v []byte) error {
if oid.Decode(k) == nil { if oid.Decode(k) == nil && obj.Unmarshal(v) == nil {
return f(cid, oid) return f(cid, oid, obj)
} }
return nil return nil

View file

@ -183,7 +183,7 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
} }
if !isParent { if !isParent {
if err = db.incCounters(tx, cnr); err != nil { if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil {
return err return err
} }
} }

View file

@ -118,6 +118,7 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
} }
s.decObjectCounterBy(physical, res.RawObjectsRemoved()) s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved()) s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
s.decObjectCounterBy(user, res.UserObjectsRemoved())
s.decContainerObjectCounter(res.RemovedByCnrID()) s.decContainerObjectCounter(res.RemovedByCnrID())
removedPayload := res.RemovedPhysicalObjectSizes()[0] removedPayload := res.RemovedPhysicalObjectSizes()[0]
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0] logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]

View file

@ -416,6 +416,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular) s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID()) s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0 i := 0
@ -630,6 +631,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone) s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID()) s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0 i := 0
@ -677,6 +679,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock) s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID()) s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0 i := 0

View file

@ -122,6 +122,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
s.m.RUnlock() s.m.RUnlock()
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID()) s.decContainerObjectCounter(res.InhumedByCnrID())
i := 0 i := 0

View file

@ -86,12 +86,6 @@ func (m *metricsStore) IncObjectCounter(objectType string) {
m.objCounters[objectType] += 1 m.objCounters[objectType] += 1
} }
func (m *metricsStore) DecObjectCounter(objectType string) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.AddToObjectCounter(objectType, -1)
}
func (m *metricsStore) SetMode(mode mode.Mode) { func (m *metricsStore) SetMode(mode mode.Mode) {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
@ -192,6 +186,9 @@ func TestCounters(t *testing.T) {
v, ok = mm.getContainerCount(contID.EncodeToString(), logical) v, ok = mm.getContainerCount(contID.EncodeToString(), logical)
require.Zero(t, v) require.Zero(t, v)
require.False(t, ok) require.False(t, ok)
v, ok = mm.getContainerCount(contID.EncodeToString(), user)
require.Zero(t, v)
require.False(t, ok)
} }
}) })
@ -207,6 +204,7 @@ func TestCounters(t *testing.T) {
expected[cnr] = meta.ObjectCounters{ expected[cnr] = meta.ObjectCounters{
Logic: 1, Logic: 1,
Phy: 1, Phy: 1,
User: 1,
} }
} }
@ -221,6 +219,7 @@ func TestCounters(t *testing.T) {
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical)) require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
require.Equal(t, uint64(objNumber), mm.getObjectCounter(logical)) require.Equal(t, uint64(objNumber), mm.getObjectCounter(logical))
require.Equal(t, uint64(objNumber), mm.getObjectCounter(user))
require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize()) require.Equal(t, totalPayload, mm.payloadSize())
@ -244,6 +243,7 @@ func TestCounters(t *testing.T) {
if v, ok := expected[cid]; ok { if v, ok := expected[cid]; ok {
v.Logic-- v.Logic--
v.User--
if v.IsZero() { if v.IsZero() {
delete(expected, cid) delete(expected, cid)
} else { } else {
@ -254,6 +254,7 @@ func TestCounters(t *testing.T) {
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical)) require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
require.Equal(t, uint64(objNumber-inhumedNumber), mm.getObjectCounter(logical)) require.Equal(t, uint64(objNumber-inhumedNumber), mm.getObjectCounter(logical))
require.Equal(t, uint64(objNumber-inhumedNumber), mm.getObjectCounter(user))
require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize()) require.Equal(t, totalPayload, mm.payloadSize())
@ -270,6 +271,7 @@ func TestCounters(t *testing.T) {
phy := mm.getObjectCounter(physical) phy := mm.getObjectCounter(physical)
logic := mm.getObjectCounter(logical) logic := mm.getObjectCounter(logical)
custom := mm.getObjectCounter(user)
inhumedNumber := int(phy / 4) inhumedNumber := int(phy / 4)
prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...) prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...)
@ -284,6 +286,7 @@ func TestCounters(t *testing.T) {
if v, ok := expected[cid]; ok { if v, ok := expected[cid]; ok {
v.Logic-- v.Logic--
v.User--
if v.IsZero() { if v.IsZero() {
delete(expected, cid) delete(expected, cid)
} else { } else {
@ -294,6 +297,7 @@ func TestCounters(t *testing.T) {
require.Equal(t, phy, mm.getObjectCounter(physical)) require.Equal(t, phy, mm.getObjectCounter(physical))
require.Equal(t, logic-uint64(inhumedNumber), mm.getObjectCounter(logical)) require.Equal(t, logic-uint64(inhumedNumber), mm.getObjectCounter(logical))
require.Equal(t, custom-uint64(inhumedNumber), mm.getObjectCounter(user))
require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, expectedLogicalSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize()) require.Equal(t, totalPayload, mm.payloadSize())
@ -309,6 +313,7 @@ func TestCounters(t *testing.T) {
phy := mm.getObjectCounter(physical) phy := mm.getObjectCounter(physical)
logic := mm.getObjectCounter(logical) logic := mm.getObjectCounter(logical)
custom := mm.getObjectCounter(user)
deletedNumber := int(phy / 4) deletedNumber := int(phy / 4)
prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...) prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...)
@ -318,6 +323,7 @@ func TestCounters(t *testing.T) {
require.Equal(t, phy-uint64(deletedNumber), mm.getObjectCounter(physical)) require.Equal(t, phy-uint64(deletedNumber), mm.getObjectCounter(physical))
require.Equal(t, logic-uint64(deletedNumber), mm.getObjectCounter(logical)) require.Equal(t, logic-uint64(deletedNumber), mm.getObjectCounter(logical))
require.Equal(t, custom-uint64(deletedNumber), mm.getObjectCounter(user))
var totalRemovedpayload uint64 var totalRemovedpayload uint64
for i := range oo[:deletedNumber] { for i := range oo[:deletedNumber] {
removedPayload := oo[i].PayloadSize() removedPayload := oo[i].PayloadSize()
@ -329,6 +335,7 @@ func TestCounters(t *testing.T) {
if v, ok := expected[cnr]; ok { if v, ok := expected[cnr]; ok {
v.Logic-- v.Logic--
v.Phy-- v.Phy--
v.User--
if v.IsZero() { if v.IsZero() {
delete(expected, cnr) delete(expected, cnr)
} else { } else {

View file

@ -90,7 +90,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err) return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
} }
s.incObjectCounter(putPrm.Address.Container()) s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj))
s.addToPayloadSize(int64(prm.obj.PayloadSize())) s.addToPayloadSize(int64(prm.obj.PayloadSize()))
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize())) s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
} }

View file

@ -72,9 +72,6 @@ type MetricsWriter interface {
// IncObjectCounter must increment shard's object counter taking into account // IncObjectCounter must increment shard's object counter taking into account
// object type. // object type.
IncObjectCounter(objectType string) IncObjectCounter(objectType string)
// DecObjectCounter must decrement shard's object counter taking into account
// object type.
DecObjectCounter(objectType string)
// SetShardID must set (update) the shard identifier that will be used in // SetShardID must set (update) the shard identifier that will be used in
// metrics. // metrics.
SetShardID(id string) SetShardID(id string)
@ -453,12 +450,16 @@ func (s *Shard) updateMetrics(ctx context.Context) {
// incObjectCounter increment both physical and logical object // incObjectCounter increment both physical and logical object
// counters. // counters.
func (s *Shard) incObjectCounter(cnrID cid.ID) { func (s *Shard) incObjectCounter(cnrID cid.ID, isUser bool) {
if s.cfg.metricsWriter != nil { if s.cfg.metricsWriter != nil {
s.cfg.metricsWriter.IncObjectCounter(physical) s.cfg.metricsWriter.IncObjectCounter(physical)
s.cfg.metricsWriter.IncObjectCounter(logical) s.cfg.metricsWriter.IncObjectCounter(logical)
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical) s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical)
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical) s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical)
if isUser {
s.cfg.metricsWriter.IncObjectCounter(user)
s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), user)
}
} }
} }