From d30ab5f29ea2bef513cff00af10edc915a582b9a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 5 Dec 2023 17:00:16 +0300 Subject: [PATCH] [#838] metabase: Count user objects Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/engine/shards.go | 4 -- pkg/local_object_storage/metabase/counter.go | 48 +++++++++++--- .../metabase/counter_test.go | 30 +++++++++ pkg/local_object_storage/metabase/delete.go | 63 +++++++++++++++---- pkg/local_object_storage/metabase/inhume.go | 49 +++++++++------ .../metabase/iterators.go | 7 ++- pkg/local_object_storage/metabase/put.go | 2 +- pkg/local_object_storage/shard/delete.go | 1 + pkg/local_object_storage/shard/gc.go | 3 + pkg/local_object_storage/shard/inhume.go | 1 + .../shard/metrics_test.go | 19 ++++-- pkg/local_object_storage/shard/put.go | 2 +- pkg/local_object_storage/shard/shard.go | 9 +-- 13 files changed, 180 insertions(+), 58 deletions(-) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 2e1cc751..5d7fb598 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -46,10 +46,6 @@ func (m *metricsWithID) IncObjectCounter(objectType string) { m.mw.AddToObjectCounter(m.id, objectType, +1) } -func (m *metricsWithID) DecObjectCounter(objectType string) { - m.mw.AddToObjectCounter(m.id, objectType, -1) -} - func (m *metricsWithID) SetMode(mode mode.Mode) { m.mw.SetMode(m.id, mode) } diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index be860830..96d80077 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" ) @@ -27,6 +28,7 @@ const ( _ objectType = iota phy logical + user ) // ObjectCounters groups object counter @@ -183,14 +185,19 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) return false, nil } -func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error { +func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil { return fmt.Errorf("could not increase phy object counter: %w", err) } if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil { return fmt.Errorf("could not increase logical object counter: %w", err) } - return db.incContainerObjectCounter(tx, cnrID) + if isUserObject { + if err := db.updateShardObjectCounter(tx, user, 1, true); err != nil { + return fmt.Errorf("could not increase user object counter: %w", err) + } + } + return db.incContainerObjectCounter(tx, cnrID, isUserObject) } func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error { @@ -207,6 +214,8 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6 counterKey = objectPhyCounterKey case logical: counterKey = objectLogicCounterKey + case user: + counterKey = objectUserCounterKey default: panic("unknown object type counter") } @@ -230,7 +239,7 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6 return b.Put(counterKey, newCounter) } -func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { +func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { // TODO #838 b := tx.Bucket(containerCounterBucketName) if b == nil { return nil @@ -277,7 +286,7 @@ func nextValue(existed, delta uint64, inc bool) uint64 { return existed } -func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error { +func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { b := tx.Bucket(containerCounterBucketName) if b == nil { return nil @@ -285,7 +294,11 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error { key := make([]byte, cidSize) cnrID.Encode(key) - return db.editContainerCounterValue(b, key, ObjectCounters{Logic: 1, Phy: 1}, true) + c := ObjectCounters{Logic: 1, Phy: 1} + if isUserObject { + c.User = 1 + } + return db.editContainerCounterValue(b, key, c, true) } // syncCounter updates object counters according to metabase state: @@ -319,8 +332,9 @@ func syncCounter(tx *bbolt.Tx, force bool) error { graveyardBKT := tx.Bucket(graveyardBucketName) garbageBKT := tx.Bucket(garbageBucketName) key := make([]byte, addressKeySize) + var isAvailable bool - err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error { + err = iteratePhyObjects(tx, func(cnr cid.ID, objID oid.ID, obj *objectSDK.Object) error { if v, ok := counters[cnr]; ok { v.Phy++ counters[cnr] = v @@ -331,7 +345,8 @@ func syncCounter(tx *bbolt.Tx, force bool) error { } addr.SetContainer(cnr) - addr.SetObject(obj) + addr.SetObject(objID) + isAvailable = false // check if an object is available: not with GCMark // and not covered with a tombstone @@ -344,6 +359,18 @@ func syncCounter(tx *bbolt.Tx, force bool) error { Logic: 1, } } + isAvailable = true + } + + if isAvailable && IsUserObject(obj) { + if v, ok := counters[cnr]; ok { + v.User++ + counters[cnr] = v + } else { + counters[cnr] = ObjectCounters{ + User: 1, + } + } } return nil @@ -446,3 +473,10 @@ func containerObjectCounterInitialized(tx *bbolt.Tx) bool { _, err = parseContainerCounterValue(v) return err == nil } + +func IsUserObject(obj *objectSDK.Object) bool { + _, hasParentID := obj.ParentID() + return obj.Type() == objectSDK.TypeRegular && + (obj.SplitID() == nil || + (hasParentID && len(obj.Children()) == 0)) +} diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index 27d0b37a..9241c97a 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -51,6 +51,7 @@ func TestCounters(t *testing.T) { exp[cnrID] = meta.ObjectCounters{ Logic: 1, Phy: 1, + User: 1, } _, err := db.Put(context.Background(), prm) @@ -80,6 +81,7 @@ func TestCounters(t *testing.T) { exp[cnrID] = meta.ObjectCounters{ Logic: 1, Phy: 1, + User: 1, } } @@ -96,11 +98,13 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(i), c.Phy) require.Equal(t, uint64(i), c.Logic) + require.Equal(t, uint64(i), c.User) cnrID, _ := oo[i].ContainerID() if v, ok := exp[cnrID]; ok { v.Phy-- v.Logic-- + v.User-- if v.IsZero() { delete(exp, cnrID) } else { @@ -125,6 +129,7 @@ func TestCounters(t *testing.T) { exp[cnrID] = meta.ObjectCounters{ Logic: 1, Phy: 1, + User: 1, } } @@ -141,6 +146,7 @@ func TestCounters(t *testing.T) { for _, addr := range inhumedObjs { if v, ok := exp[addr.Container()]; ok { v.Logic-- + v.User-- if v.IsZero() { delete(exp, addr.Container()) } else { @@ -156,12 +162,14 @@ func TestCounters(t *testing.T) { res, err := db.Inhume(context.Background(), prm) require.NoError(t, err) require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed()) + require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed()) c, err := db.ObjectCounters() require.NoError(t, err) require.Equal(t, uint64(objCount), c.Phy) require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic) + require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User) cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) @@ -182,12 +190,14 @@ func TestCounters(t *testing.T) { o := testutil.GenerateObject() if i < objCount/2 { // half of the objs will have the parent o.SetParent(parObj) + o.SetSplitID(objectSDK.NewSplitID()) } cnrID, _ := o.ContainerID() exp[cnrID] = meta.ObjectCounters{ Logic: 1, Phy: 1, + User: 1, } require.NoError(t, putBig(db, o)) @@ -196,6 +206,7 @@ func TestCounters(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(i+1), c.Phy) require.Equal(t, uint64(i+1), c.Logic) + require.Equal(t, uint64(i+1), c.User) cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) @@ -214,6 +225,7 @@ func TestCounters(t *testing.T) { exp[cnrID] = meta.ObjectCounters{ Logic: 1, Phy: 1, + User: 1, } } @@ -227,10 +239,13 @@ func TestCounters(t *testing.T) { c, err := db.ObjectCounters() require.NoError(t, err) require.Equal(t, uint64(objCount-i-1), c.Phy) + require.Equal(t, uint64(objCount-i-1), c.Logic) + require.Equal(t, uint64(objCount-i-1), c.User) if v, ok := exp[addr.Container()]; ok { v.Logic-- v.Phy-- + v.User-- if v.IsZero() { delete(exp, addr.Container()) } else { @@ -251,6 +266,7 @@ func TestCounters(t *testing.T) { exp[cnrID] = meta.ObjectCounters{ Logic: 1, Phy: 1, + User: 1, } } @@ -267,6 +283,7 @@ func TestCounters(t *testing.T) { for _, addr := range inhumedObjs { if v, ok := exp[addr.Container()]; ok { v.Logic-- + v.User-- if v.IsZero() { delete(exp, addr.Container()) } else { @@ -287,6 +304,7 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(objCount), c.Phy) require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic) + require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User) cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) @@ -318,6 +336,7 @@ func TestCounters_Expired(t *testing.T) { exp[addr.Container()] = meta.ObjectCounters{ Logic: 1, Phy: 1, + User: 1, } } @@ -327,6 +346,7 @@ func TestCounters_Expired(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(objCount), c.Phy) require.Equal(t, uint64(objCount), c.Logic) + require.Equal(t, uint64(objCount), c.User) cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) @@ -347,6 +367,7 @@ func TestCounters_Expired(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(objCount), c.Phy) require.Equal(t, uint64(objCount), c.Logic) + require.Equal(t, uint64(objCount), c.User) cc, err = db.ContainerCounters(context.Background()) require.NoError(t, err) @@ -369,15 +390,18 @@ func TestCounters_Expired(t *testing.T) { inhumeRes, err := db.Inhume(context.Background(), inhumePrm) require.NoError(t, err) require.Equal(t, uint64(1), inhumeRes.AvailableInhumed()) + require.Equal(t, uint64(1), inhumeRes.UserInhumed()) c, err = db.ObjectCounters() require.NoError(t, err) require.Equal(t, uint64(len(oo)), c.Phy) require.Equal(t, uint64(len(oo)-1), c.Logic) + require.Equal(t, uint64(len(oo)-1), c.User) if v, ok := exp[oo[0].Container()]; ok { v.Logic-- + v.User-- if v.IsZero() { delete(exp, oo[0].Container()) } else { @@ -400,6 +424,7 @@ func TestCounters_Expired(t *testing.T) { deleteRes, err := db.Delete(context.Background(), deletePrm) require.NoError(t, err) require.Zero(t, deleteRes.AvailableObjectsRemoved()) + require.Zero(t, deleteRes.UserObjectsRemoved()) if v, ok := exp[oo[0].Container()]; ok { v.Phy-- @@ -416,6 +441,7 @@ func TestCounters_Expired(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(len(oo)), c.Phy) require.Equal(t, uint64(len(oo)), c.Logic) + require.Equal(t, uint64(len(oo)), c.User) cc, err = db.ContainerCounters(context.Background()) require.NoError(t, err) @@ -431,10 +457,12 @@ func TestCounters_Expired(t *testing.T) { deleteRes, err = db.Delete(context.Background(), deletePrm) require.NoError(t, err) require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved()) + require.Equal(t, uint64(1), deleteRes.UserObjectsRemoved()) if v, ok := exp[oo[0].Container()]; ok { v.Phy-- v.Logic-- + v.User-- if v.IsZero() { delete(exp, oo[0].Container()) } else { @@ -448,6 +476,7 @@ func TestCounters_Expired(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(len(oo)), c.Phy) require.Equal(t, uint64(len(oo)), c.Logic) + require.Equal(t, uint64(len(oo)), c.User) cc, err = db.ContainerCounters(context.Background()) require.NoError(t, err) @@ -465,6 +494,7 @@ func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK o := testutil.GenerateObject() if withParent { o.SetParent(parent) + o.SetSplitID(objectSDK.NewSplitID()) } oo = append(oo, o) diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index a2100f36..4fea4b4f 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -29,6 +29,7 @@ type DeletePrm struct { type DeleteRes struct { rawRemoved uint64 availableRemoved uint64 + userRemoved uint64 sizes []uint64 availableSizes []uint64 removedByCnrID map[cid.ID]ObjectCounters @@ -40,6 +41,10 @@ func (d DeleteRes) AvailableObjectsRemoved() uint64 { return d.availableRemoved } +func (d DeleteRes) UserObjectsRemoved() uint64 { + return d.userRemoved +} + // RemovedByCnrID returns the number of removed objects by container ID. func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters { return d.removedByCnrID @@ -132,12 +137,12 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) currEpoch := db.epochState.CurrentEpoch() for i := range addrs { - removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch) + r, err := db.delete(tx, addrs[i], refCounter, currEpoch) if err != nil { return DeleteRes{}, err // maybe log and continue? } - if removed { + if r.Removed { if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { v.Phy++ res.removedByCnrID[addrs[i].Container()] = v @@ -148,10 +153,10 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) } res.rawRemoved++ - res.sizes[i] = size + res.sizes[i] = r.Size } - if available { + if r.Available { if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { v.Logic++ res.removedByCnrID[addrs[i].Container()] = v @@ -162,7 +167,20 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) } res.availableRemoved++ - res.availableSizes[i] = size + res.availableSizes[i] = r.Size + } + + if r.User { + if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { + v.User++ + res.removedByCnrID[addrs[i].Container()] = v + } else { + res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ + User: 1, + } + } + + res.userRemoved++ } } @@ -180,6 +198,13 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) } } + if res.userRemoved > 0 { + err := db.updateShardObjectCounter(tx, user, res.userRemoved, false) + if err != nil { + return DeleteRes{}, fmt.Errorf("could not decrease user object counter: %w", err) + } + } + if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil { return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err) } @@ -196,13 +221,20 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) return res, nil } +type deleteSingleResult struct { + Removed bool + Available bool + User bool + Size uint64 +} + // delete removes object indexes from the metabase. Counts the references // of the object that is being removed. // The first return value indicates if an object has been removed. (removing a // non-exist object is error-free). The second return value indicates if an // object was available before the removal (for calculating the logical object -// counter). The third return value is removed object payload size. -func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) { +// counter). The third return value The fourth return value is removed object payload size. +func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) { key := make([]byte, addressKeySize) addrKey := addressKey(addr, key) garbageBKT := tx.Bucket(garbageBucketName) @@ -214,7 +246,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter if garbageBKT != nil { err := garbageBKT.Delete(addrKey) if err != nil { - return false, false, 0, fmt.Errorf("could not remove from garbage bucket: %w", err) + return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err) } } @@ -224,10 +256,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter var siErr *objectSDK.SplitInfoError if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) { - return false, false, 0, nil + return deleteSingleResult{}, nil } - return false, false, 0, err + return deleteSingleResult{}, err } // if object is an only link to a parent, then remove parent @@ -250,13 +282,20 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter nRef.cur++ } + isUserObject := IsUserObject(obj) + // remove object err = db.deleteObject(tx, obj, false) if err != nil { - return false, false, 0, fmt.Errorf("could not remove object: %w", err) + return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err) } - return true, removeAvailableObject, obj.PayloadSize(), nil + return deleteSingleResult{ + Removed: true, + Available: removeAvailableObject, + User: isUserObject && removeAvailableObject, + Size: obj.PayloadSize(), + }, nil } func (db *DB) deleteObject( diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 2a9adff3..21db9a62 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -30,14 +30,16 @@ type InhumePrm struct { // DeletionInfo contains details on deleted object. type DeletionInfo struct { - Size uint64 - CID cid.ID + Size uint64 + CID cid.ID + IsUser bool } // InhumeRes encapsulates results of Inhume operation. type InhumeRes struct { deletedLockObj []oid.Address availableInhumed uint64 + userInhumed uint64 inhumedByCnrID map[cid.ID]ObjectCounters deletionDetails []DeletionInfo } @@ -48,6 +50,10 @@ func (i InhumeRes) AvailableInhumed() uint64 { return i.availableInhumed } +func (i InhumeRes) UserInhumed() uint64 { + return i.userInhumed +} + // InhumedByCnrID return number of object // that have been inhumed by container ID. func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters { @@ -75,19 +81,31 @@ func (i InhumeRes) GetDeletionInfoByIndex(target int) DeletionInfo { // StoreDeletionInfo stores size of deleted object and associated container ID // in corresponding arrays. -func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) { +func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, isUser bool) { i.deletionDetails = append(i.deletionDetails, DeletionInfo{ - Size: deletedSize, - CID: containerID, + Size: deletedSize, + CID: containerID, + IsUser: isUser, }) i.availableInhumed++ + if isUser { + i.userInhumed++ + } + if v, ok := i.inhumedByCnrID[containerID]; ok { v.Logic++ + if isUser { + v.User++ + } i.inhumedByCnrID[containerID] = v } else { - i.inhumedByCnrID[containerID] = ObjectCounters{ + v = ObjectCounters{ Logic: 1, } + if isUser { + v.User = 1 + } + i.inhumedByCnrID[containerID] = v } } @@ -247,23 +265,14 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes } func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { - var inhumedCount uint64 - inhumedbyCnr := make(map[cid.ID]ObjectCounters) - for _, dd := range res.deletionDetails { - if v, ok := inhumedbyCnr[dd.CID]; ok { - v.Logic++ - inhumedbyCnr[dd.CID] = v - } else { - inhumedbyCnr[dd.CID] = ObjectCounters{Logic: 1} - } - inhumedCount++ + if err := db.updateShardObjectCounter(tx, logical, res.AvailableInhumed(), false); err != nil { + return err } - - if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil { + if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil { return err } - return db.updateContainerCounter(tx, inhumedbyCnr, false) + return db.updateContainerCounter(tx, res.inhumedByCnrID, false) } // getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket. @@ -318,7 +327,7 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { containerID, _ := obj.ContainerID() if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { - res.storeDeletionInfo(containerID, obj.PayloadSize()) + res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj)) } // if object is stored, and it is regular object then update bucket diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index a1e21ef2..7b60b7d5 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -202,9 +202,10 @@ func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Addres return err } -func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID) error) error { +func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error { var cid cid.ID var oid oid.ID + obj := objectSDK.New() return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { b58CID, postfix := parseContainerIDWithPrefix(&cid, name) @@ -221,8 +222,8 @@ func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID) error) error { } return b.ForEach(func(k, v []byte) error { - if oid.Decode(k) == nil { - return f(cid, oid) + if oid.Decode(k) == nil && obj.Unmarshal(v) == nil { + return f(cid, oid, obj) } return nil diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 2b5c5421..2f510895 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -183,7 +183,7 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o } if !isParent { - if err = db.incCounters(tx, cnr); err != nil { + if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil { return err } } diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index 663d781a..0856bda1 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -118,6 +118,7 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error } s.decObjectCounterBy(physical, res.RawObjectsRemoved()) s.decObjectCounterBy(logical, res.AvailableObjectsRemoved()) + s.decObjectCounterBy(user, res.UserObjectsRemoved()) s.decContainerObjectCounter(res.RemovedByCnrID()) removedPayload := res.RemovedPhysicalObjectSizes()[0] logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0] diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index e16f8945..0f978f26 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -416,6 +416,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular) s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 @@ -630,6 +631,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone) s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 @@ -677,6 +679,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers [] s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock) s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index 335df82f..48cfaa98 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -122,6 +122,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { s.m.RUnlock() s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index 138009a7..f59565cb 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -86,12 +86,6 @@ func (m *metricsStore) IncObjectCounter(objectType string) { m.objCounters[objectType] += 1 } -func (m *metricsStore) DecObjectCounter(objectType string) { - m.mtx.Lock() - defer m.mtx.Unlock() - m.AddToObjectCounter(objectType, -1) -} - func (m *metricsStore) SetMode(mode mode.Mode) { m.mtx.Lock() defer m.mtx.Unlock() @@ -192,6 +186,9 @@ func TestCounters(t *testing.T) { v, ok = mm.getContainerCount(contID.EncodeToString(), logical) require.Zero(t, v) require.False(t, ok) + v, ok = mm.getContainerCount(contID.EncodeToString(), user) + require.Zero(t, v) + require.False(t, ok) } }) @@ -207,6 +204,7 @@ func TestCounters(t *testing.T) { expected[cnr] = meta.ObjectCounters{ Logic: 1, Phy: 1, + User: 1, } } @@ -221,6 +219,7 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical)) require.Equal(t, uint64(objNumber), mm.getObjectCounter(logical)) + require.Equal(t, uint64(objNumber), mm.getObjectCounter(user)) require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) @@ -244,6 +243,7 @@ func TestCounters(t *testing.T) { if v, ok := expected[cid]; ok { v.Logic-- + v.User-- if v.IsZero() { delete(expected, cid) } else { @@ -254,6 +254,7 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical)) require.Equal(t, uint64(objNumber-inhumedNumber), mm.getObjectCounter(logical)) + require.Equal(t, uint64(objNumber-inhumedNumber), mm.getObjectCounter(user)) require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) @@ -270,6 +271,7 @@ func TestCounters(t *testing.T) { phy := mm.getObjectCounter(physical) logic := mm.getObjectCounter(logical) + custom := mm.getObjectCounter(user) inhumedNumber := int(phy / 4) prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...) @@ -284,6 +286,7 @@ func TestCounters(t *testing.T) { if v, ok := expected[cid]; ok { v.Logic-- + v.User-- if v.IsZero() { delete(expected, cid) } else { @@ -294,6 +297,7 @@ func TestCounters(t *testing.T) { require.Equal(t, phy, mm.getObjectCounter(physical)) require.Equal(t, logic-uint64(inhumedNumber), mm.getObjectCounter(logical)) + require.Equal(t, custom-uint64(inhumedNumber), mm.getObjectCounter(user)) require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) @@ -309,6 +313,7 @@ func TestCounters(t *testing.T) { phy := mm.getObjectCounter(physical) logic := mm.getObjectCounter(logical) + custom := mm.getObjectCounter(user) deletedNumber := int(phy / 4) prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...) @@ -318,6 +323,7 @@ func TestCounters(t *testing.T) { require.Equal(t, phy-uint64(deletedNumber), mm.getObjectCounter(physical)) require.Equal(t, logic-uint64(deletedNumber), mm.getObjectCounter(logical)) + require.Equal(t, custom-uint64(deletedNumber), mm.getObjectCounter(user)) var totalRemovedpayload uint64 for i := range oo[:deletedNumber] { removedPayload := oo[i].PayloadSize() @@ -329,6 +335,7 @@ func TestCounters(t *testing.T) { if v, ok := expected[cnr]; ok { v.Logic-- v.Phy-- + v.User-- if v.IsZero() { delete(expected, cnr) } else { diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 10983d04..09a98260 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -90,7 +90,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) { return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err) } - s.incObjectCounter(putPrm.Address.Container()) + s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj)) s.addToPayloadSize(int64(prm.obj.PayloadSize())) s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize())) } diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 853a87b9..2c4ce91d 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -72,9 +72,6 @@ type MetricsWriter interface { // IncObjectCounter must increment shard's object counter taking into account // object type. IncObjectCounter(objectType string) - // DecObjectCounter must decrement shard's object counter taking into account - // object type. - DecObjectCounter(objectType string) // SetShardID must set (update) the shard identifier that will be used in // metrics. SetShardID(id string) @@ -453,12 +450,16 @@ func (s *Shard) updateMetrics(ctx context.Context) { // incObjectCounter increment both physical and logical object // counters. -func (s *Shard) incObjectCounter(cnrID cid.ID) { +func (s *Shard) incObjectCounter(cnrID cid.ID, isUser bool) { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.IncObjectCounter(physical) s.cfg.metricsWriter.IncObjectCounter(logical) s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical) s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical) + if isUser { + s.cfg.metricsWriter.IncObjectCounter(user) + s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), user) + } } }