diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 2e1cc7515..5d7fb598e 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -46,10 +46,6 @@ func (m *metricsWithID) IncObjectCounter(objectType string) { m.mw.AddToObjectCounter(m.id, objectType, +1) } -func (m *metricsWithID) DecObjectCounter(objectType string) { - m.mw.AddToObjectCounter(m.id, objectType, -1) -} - func (m *metricsWithID) SetMode(mode mode.Mode) { m.mw.SetMode(m.id, mode) } diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index d1aafa3db..96d80077d 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" ) @@ -18,6 +19,7 @@ import ( var ( objectPhyCounterKey = []byte("phy_counter") objectLogicCounterKey = []byte("logic_counter") + objectUserCounterKey = []byte("user_counter") ) type objectType uint8 @@ -26,23 +28,19 @@ const ( _ objectType = iota phy logical + user ) // ObjectCounters groups object counter // according to metabase state. type ObjectCounters struct { - logic uint64 - phy uint64 + Logic uint64 + Phy uint64 + User uint64 } -// Logic returns logical object counter. -func (o ObjectCounters) Logic() uint64 { - return o.logic -} - -// Phy returns physical object counter. -func (o ObjectCounters) Phy() uint64 { - return o.phy +func (o ObjectCounters) IsZero() bool { + return o.Phy == 0 && o.Logic == 0 && o.User == 0 } // ObjectCounters returns object counters that metabase has @@ -63,12 +61,17 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) { if b != nil { data := b.Get(objectPhyCounterKey) if len(data) == 8 { - cc.phy = binary.LittleEndian.Uint64(data) + cc.Phy = binary.LittleEndian.Uint64(data) } data = b.Get(objectLogicCounterKey) if len(data) == 8 { - cc.logic = binary.LittleEndian.Uint64(data) + cc.Logic = binary.LittleEndian.Uint64(data) + } + + data = b.Get(objectUserCounterKey) + if len(data) == 8 { + cc.User = binary.LittleEndian.Uint64(data) } } @@ -79,8 +82,7 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) { } type ContainerCounters struct { - Logical map[cid.ID]uint64 - Physical map[cid.ID]uint64 + Counts map[cid.ID]ObjectCounters } // ContainerCounters returns object counters for each container @@ -103,8 +105,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) defer span.End() cc := ContainerCounters{ - Logical: make(map[cid.ID]uint64), - Physical: make(map[cid.ID]uint64), + Counts: make(map[cid.ID]ObjectCounters), } lastKey := make([]byte, cidSize) @@ -158,16 +159,11 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) if err != nil { return err } - phy, logic, err := parseContainerCounterValue(value) + ent, err := parseContainerCounterValue(value) if err != nil { return err } - if phy > 0 { - cc.Physical[cnrID] = phy - } - if logic > 0 { - cc.Logical[cnrID] = logic - } + cc.Counts[cnrID] = ent counter++ if counter == batchSize { @@ -189,14 +185,19 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) return false, nil } -func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error { +func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil { return fmt.Errorf("could not increase phy object counter: %w", err) } if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil { return fmt.Errorf("could not increase logical object counter: %w", err) } - return db.incContainerObjectCounter(tx, cnrID) + if isUserObject { + if err := db.updateShardObjectCounter(tx, user, 1, true); err != nil { + return fmt.Errorf("could not increase user object counter: %w", err) + } + } + return db.incContainerObjectCounter(tx, cnrID, isUserObject) } func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error { @@ -213,6 +214,8 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6 counterKey = objectPhyCounterKey case logical: counterKey = objectLogicCounterKey + case user: + counterKey = objectUserCounterKey default: panic("unknown object type counter") } @@ -236,7 +239,7 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6 return b.Put(counterKey, newCounter) } -func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { +func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { // TODO #838 b := tx.Bucket(containerCounterBucketName) if b == nil { return nil @@ -253,22 +256,23 @@ func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounte } func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error { - var phyValue, logicValue uint64 + var entity ObjectCounters var err error data := b.Get(key) if len(data) > 0 { - phyValue, logicValue, err = parseContainerCounterValue(data) + entity, err = parseContainerCounterValue(data) if err != nil { return err } } - phyValue = nextValue(phyValue, delta.phy, inc) - logicValue = nextValue(logicValue, delta.logic, inc) - if phyValue > 0 || logicValue > 0 { - value := containerCounterValue(phyValue, logicValue) - return b.Put(key, value) + entity.Phy = nextValue(entity.Phy, delta.Phy, inc) + entity.Logic = nextValue(entity.Logic, delta.Logic, inc) + entity.User = nextValue(entity.User, delta.User, inc) + if entity.IsZero() { + return b.Delete(key) } - return b.Delete(key) + value := containerCounterValue(entity) + return b.Put(key, value) } func nextValue(existed, delta uint64, inc bool) uint64 { @@ -282,7 +286,7 @@ func nextValue(existed, delta uint64, inc bool) uint64 { return existed } -func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error { +func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { b := tx.Bucket(containerCounterBucketName) if b == nil { return nil @@ -290,7 +294,11 @@ func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error { key := make([]byte, cidSize) cnrID.Encode(key) - return db.editContainerCounterValue(b, key, ObjectCounters{logic: 1, phy: 1}, true) + c := ObjectCounters{Logic: 1, Phy: 1} + if isUserObject { + c.User = 1 + } + return db.editContainerCounterValue(b, key, c, true) } // syncCounter updates object counters according to metabase state: @@ -304,9 +312,11 @@ func syncCounter(tx *bbolt.Tx, force bool) error { if err != nil { return fmt.Errorf("could not get shard info bucket: %w", err) } - shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8 - containerCounterInitialized := tx.Bucket(containerCounterBucketName) != nil - if !force && shardObjectCounterInitialized && containerCounterInitialized { + shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && + len(shardInfoB.Get(objectLogicCounterKey)) == 8 && + len(shardInfoB.Get(objectUserCounterKey)) == 8 + containerObjectCounterInitialized := containerObjectCounterInitialized(tx) + if !force && shardObjectCounterInitialized && containerObjectCounterInitialized { // the counters are already inited return nil } @@ -322,29 +332,43 @@ func syncCounter(tx *bbolt.Tx, force bool) error { graveyardBKT := tx.Bucket(graveyardBucketName) garbageBKT := tx.Bucket(garbageBucketName) key := make([]byte, addressKeySize) + var isAvailable bool - err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error { + err = iteratePhyObjects(tx, func(cnr cid.ID, objID oid.ID, obj *objectSDK.Object) error { if v, ok := counters[cnr]; ok { - v.phy++ + v.Phy++ counters[cnr] = v } else { counters[cnr] = ObjectCounters{ - phy: 1, + Phy: 1, } } addr.SetContainer(cnr) - addr.SetObject(obj) + addr.SetObject(objID) + isAvailable = false // check if an object is available: not with GCMark // and not covered with a tombstone if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 { if v, ok := counters[cnr]; ok { - v.logic++ + v.Logic++ counters[cnr] = v } else { counters[cnr] = ObjectCounters{ - logic: 1, + Logic: 1, + } + } + isAvailable = true + } + + if isAvailable && IsUserObject(obj) { + if v, ok := counters[cnr]; ok { + v.User++ + counters[cnr] = v + } else { + counters[cnr] = ObjectCounters{ + User: 1, } } } @@ -361,13 +385,15 @@ func syncCounter(tx *bbolt.Tx, force bool) error { func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error { var phyTotal uint64 var logicTotal uint64 + var userTotal uint64 key := make([]byte, cidSize) for cnrID, count := range counters { - phyTotal += count.phy - logicTotal += count.logic + phyTotal += count.Phy + logicTotal += count.Logic + userTotal += count.User cnrID.Encode(key) - value := containerCounterValue(count.phy, count.logic) + value := containerCounterValue(count) err := containerCounterB.Put(key, value) if err != nil { return fmt.Errorf("could not update phy container object counter: %w", err) @@ -389,13 +415,22 @@ func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, container return fmt.Errorf("could not update logic object counter: %w", err) } + userData := make([]byte, 8) + binary.LittleEndian.PutUint64(userData, userTotal) + + err = shardInfoB.Put(objectUserCounterKey, userData) + if err != nil { + return fmt.Errorf("could not update user object counter: %w", err) + } + return nil } -func containerCounterValue(phy, logic uint64) []byte { - res := make([]byte, 16) - binary.LittleEndian.PutUint64(res, phy) - binary.LittleEndian.PutUint64(res[8:], logic) +func containerCounterValue(entity ObjectCounters) []byte { + res := make([]byte, 24) + binary.LittleEndian.PutUint64(res, entity.Phy) + binary.LittleEndian.PutUint64(res[8:], entity.Logic) + binary.LittleEndian.PutUint64(res[16:], entity.User) return res } @@ -411,9 +446,37 @@ func parseContainerCounterKey(buf []byte) (cid.ID, error) { } // parseContainerCounterValue return phy, logic values. -func parseContainerCounterValue(buf []byte) (uint64, uint64, error) { - if len(buf) != 16 { - return 0, 0, fmt.Errorf("invalid value length") +func parseContainerCounterValue(buf []byte) (ObjectCounters, error) { + if len(buf) != 24 { + return ObjectCounters{}, fmt.Errorf("invalid value length") } - return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil + return ObjectCounters{ + Phy: binary.LittleEndian.Uint64(buf), + Logic: binary.LittleEndian.Uint64(buf[8:16]), + User: binary.LittleEndian.Uint64(buf[16:]), + }, nil +} + +func containerObjectCounterInitialized(tx *bbolt.Tx) bool { + b := tx.Bucket(containerCounterBucketName) + if b == nil { + return false + } + k, v := b.Cursor().First() + if k == nil && v == nil { + return true + } + _, err := parseContainerCounterKey(k) + if err != nil { + return false + } + _, err = parseContainerCounterValue(v) + return err == nil +} + +func IsUserObject(obj *objectSDK.Object) bool { + _, hasParentID := obj.ParentID() + return obj.Type() == objectSDK.TypeRegular && + (obj.SplitID() == nil || + (hasParentID && len(obj.Children()) == 0)) } diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index 51062f1fe..9241c97a0 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -24,13 +24,13 @@ func TestCounters(t *testing.T) { db := newDB(t) c, err := db.ObjectCounters() require.NoError(t, err) - require.Zero(t, c.Phy()) - require.Zero(t, c.Logic()) + require.Zero(t, c.Phy) + require.Zero(t, c.Logic) + require.Zero(t, c.User) cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Zero(t, len(cc.Physical)) - require.Zero(t, len(cc.Logical)) + require.Zero(t, len(cc.Counts)) }) t.Run("put", func(t *testing.T) { @@ -42,29 +42,31 @@ func TestCounters(t *testing.T) { } var prm meta.PutPrm - expPhy := make(map[cid.ID]uint64) - expLog := make(map[cid.ID]uint64) + exp := make(map[cid.ID]meta.ObjectCounters) for i := 0; i < objCount; i++ { prm.SetObject(oo[i]) cnrID, _ := oo[i].ContainerID() - expPhy[cnrID]++ - expLog[cnrID]++ + c := meta.ObjectCounters{} + exp[cnrID] = meta.ObjectCounters{ + Logic: 1, + Phy: 1, + User: 1, + } _, err := db.Put(context.Background(), prm) require.NoError(t, err) - c, err := db.ObjectCounters() + c, err = db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(i+1), c.Phy()) - require.Equal(t, uint64(i+1), c.Logic()) + require.Equal(t, uint64(i+1), c.Phy) + require.Equal(t, uint64(i+1), c.Logic) cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expPhy, cc.Physical) - require.Equal(t, expLog, cc.Logical) + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) } }) @@ -73,12 +75,14 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, false) - expPhy := make(map[cid.ID]uint64) - expLog := make(map[cid.ID]uint64) + exp := make(map[cid.ID]meta.ObjectCounters) for _, obj := range oo { cnrID, _ := obj.ContainerID() - expPhy[cnrID]++ - expLog[cnrID]++ + exp[cnrID] = meta.ObjectCounters{ + Logic: 1, + Phy: 1, + User: 1, + } } var prm meta.DeletePrm @@ -92,29 +96,25 @@ func TestCounters(t *testing.T) { c, err := db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(i), c.Phy()) - require.Equal(t, uint64(i), c.Logic()) + require.Equal(t, uint64(i), c.Phy) + require.Equal(t, uint64(i), c.Logic) + require.Equal(t, uint64(i), c.User) cnrID, _ := oo[i].ContainerID() - if v, ok := expPhy[cnrID]; ok { - if v == 1 { - delete(expPhy, cnrID) + if v, ok := exp[cnrID]; ok { + v.Phy-- + v.Logic-- + v.User-- + if v.IsZero() { + delete(exp, cnrID) } else { - expPhy[cnrID]-- - } - } - if v, ok := expLog[cnrID]; ok { - if v == 1 { - delete(expLog, cnrID) - } else { - expLog[cnrID]-- + exp[cnrID] = v } } cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expPhy, cc.Physical) - require.Equal(t, expLog, cc.Logical) + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) } }) @@ -123,12 +123,14 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, false) - expPhy := make(map[cid.ID]uint64) - expLog := make(map[cid.ID]uint64) + exp := make(map[cid.ID]meta.ObjectCounters) for _, obj := range oo { cnrID, _ := obj.ContainerID() - expPhy[cnrID]++ - expLog[cnrID]++ + exp[cnrID] = meta.ObjectCounters{ + Logic: 1, + Phy: 1, + User: 1, + } } inhumedObjs := make([]oid.Address, objCount/2) @@ -142,11 +144,13 @@ func TestCounters(t *testing.T) { } for _, addr := range inhumedObjs { - if v, ok := expLog[addr.Container()]; ok { - if v == 1 { - delete(expLog, addr.Container()) + if v, ok := exp[addr.Container()]; ok { + v.Logic-- + v.User-- + if v.IsZero() { + delete(exp, addr.Container()) } else { - expLog[addr.Container()]-- + exp[addr.Container()] = v } } } @@ -158,18 +162,19 @@ func TestCounters(t *testing.T) { res, err := db.Inhume(context.Background(), prm) require.NoError(t, err) require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed()) + require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed()) c, err := db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(objCount), c.Phy()) - require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic()) + require.Equal(t, uint64(objCount), c.Phy) + require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic) + require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User) cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expPhy, cc.Physical) - require.Equal(t, expLog, cc.Logical) + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) }) t.Run("put_split", func(t *testing.T) { @@ -177,8 +182,7 @@ func TestCounters(t *testing.T) { db := newDB(t) parObj := testutil.GenerateObject() - expPhy := make(map[cid.ID]uint64) - expLog := make(map[cid.ID]uint64) + exp := make(map[cid.ID]meta.ObjectCounters) // put objects and check that parent info // does not affect the counter @@ -186,23 +190,27 @@ func TestCounters(t *testing.T) { o := testutil.GenerateObject() if i < objCount/2 { // half of the objs will have the parent o.SetParent(parObj) + o.SetSplitID(objectSDK.NewSplitID()) } cnrID, _ := o.ContainerID() - expLog[cnrID]++ - expPhy[cnrID]++ + exp[cnrID] = meta.ObjectCounters{ + Logic: 1, + Phy: 1, + User: 1, + } require.NoError(t, putBig(db, o)) c, err := db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(i+1), c.Phy()) - require.Equal(t, uint64(i+1), c.Logic()) + require.Equal(t, uint64(i+1), c.Phy) + require.Equal(t, uint64(i+1), c.Logic) + require.Equal(t, uint64(i+1), c.User) cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expPhy, cc.Physical) - require.Equal(t, expLog, cc.Logical) + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) } }) @@ -211,12 +219,14 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, true) - expPhy := make(map[cid.ID]uint64) - expLog := make(map[cid.ID]uint64) + exp := make(map[cid.ID]meta.ObjectCounters) for _, obj := range oo { cnrID, _ := obj.ContainerID() - expPhy[cnrID]++ - expLog[cnrID]++ + exp[cnrID] = meta.ObjectCounters{ + Logic: 1, + Phy: 1, + User: 1, + } } // delete objects that have parent info @@ -228,21 +238,18 @@ func TestCounters(t *testing.T) { c, err := db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(objCount-i-1), c.Phy()) - require.Equal(t, uint64(objCount-i-1), c.Logic()) + require.Equal(t, uint64(objCount-i-1), c.Phy) + require.Equal(t, uint64(objCount-i-1), c.Logic) + require.Equal(t, uint64(objCount-i-1), c.User) - if v, ok := expPhy[addr.Container()]; ok { - if v == 1 { - delete(expPhy, addr.Container()) + if v, ok := exp[addr.Container()]; ok { + v.Logic-- + v.Phy-- + v.User-- + if v.IsZero() { + delete(exp, addr.Container()) } else { - expPhy[addr.Container()]-- - } - } - if v, ok := expLog[addr.Container()]; ok { - if v == 1 { - delete(expLog, addr.Container()) - } else { - expLog[addr.Container()]-- + exp[addr.Container()] = v } } } @@ -253,12 +260,14 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, true) - expPhy := make(map[cid.ID]uint64) - expLog := make(map[cid.ID]uint64) + exp := make(map[cid.ID]meta.ObjectCounters) for _, obj := range oo { cnrID, _ := obj.ContainerID() - expPhy[cnrID]++ - expLog[cnrID]++ + exp[cnrID] = meta.ObjectCounters{ + Logic: 1, + Phy: 1, + User: 1, + } } inhumedObjs := make([]oid.Address, objCount/2) @@ -272,11 +281,13 @@ func TestCounters(t *testing.T) { } for _, addr := range inhumedObjs { - if v, ok := expLog[addr.Container()]; ok { - if v == 1 { - delete(expLog, addr.Container()) + if v, ok := exp[addr.Container()]; ok { + v.Logic-- + v.User-- + if v.IsZero() { + delete(exp, addr.Container()) } else { - expLog[addr.Container()]-- + exp[addr.Container()] = v } } } @@ -291,14 +302,14 @@ func TestCounters(t *testing.T) { c, err := db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(objCount), c.Phy()) - require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic()) + require.Equal(t, uint64(objCount), c.Phy) + require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic) + require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User) cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expPhy, cc.Physical) - require.Equal(t, expLog, cc.Logical) + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) }) } @@ -320,25 +331,27 @@ func TestCounters_Expired(t *testing.T) { oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1) } - expPhy := make(map[cid.ID]uint64) - expLog := make(map[cid.ID]uint64) + exp := make(map[cid.ID]meta.ObjectCounters) for _, addr := range oo { - expPhy[addr.Container()]++ - expLog[addr.Container()]++ + exp[addr.Container()] = meta.ObjectCounters{ + Logic: 1, + Phy: 1, + User: 1, + } } // 1. objects are available and counters are correct c, err := db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(objCount), c.Phy()) - require.Equal(t, uint64(objCount), c.Logic()) + require.Equal(t, uint64(objCount), c.Phy) + require.Equal(t, uint64(objCount), c.Logic) + require.Equal(t, uint64(objCount), c.User) cc, err := db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expPhy, cc.Physical) - require.Equal(t, expLog, cc.Logical) + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) for _, o := range oo { _, err := metaGet(db, o, true) @@ -352,14 +365,14 @@ func TestCounters_Expired(t *testing.T) { c, err = db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(objCount), c.Phy()) - require.Equal(t, uint64(objCount), c.Logic()) + require.Equal(t, uint64(objCount), c.Phy) + require.Equal(t, uint64(objCount), c.Logic) + require.Equal(t, uint64(objCount), c.User) cc, err = db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expPhy, cc.Physical) - require.Equal(t, expLog, cc.Logical) + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) for _, o := range oo { _, err := metaGet(db, o, true) @@ -377,26 +390,29 @@ func TestCounters_Expired(t *testing.T) { inhumeRes, err := db.Inhume(context.Background(), inhumePrm) require.NoError(t, err) require.Equal(t, uint64(1), inhumeRes.AvailableInhumed()) + require.Equal(t, uint64(1), inhumeRes.UserInhumed()) c, err = db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(len(oo)), c.Phy()) - require.Equal(t, uint64(len(oo)-1), c.Logic()) + require.Equal(t, uint64(len(oo)), c.Phy) + require.Equal(t, uint64(len(oo)-1), c.Logic) + require.Equal(t, uint64(len(oo)-1), c.User) - if v, ok := expLog[oo[0].Container()]; ok { - if v == 1 { - delete(expLog, oo[0].Container()) + if v, ok := exp[oo[0].Container()]; ok { + v.Logic-- + v.User-- + if v.IsZero() { + delete(exp, oo[0].Container()) } else { - expLog[oo[0].Container()]-- + exp[oo[0].Container()] = v } } cc, err = db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expPhy, cc.Physical) - require.Equal(t, expLog, cc.Logical) + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) // 4. `Delete` an object with GCMark should decrease the // phy counter but does not affect the logic counter (after @@ -408,12 +424,14 @@ func TestCounters_Expired(t *testing.T) { deleteRes, err := db.Delete(context.Background(), deletePrm) require.NoError(t, err) require.Zero(t, deleteRes.AvailableObjectsRemoved()) + require.Zero(t, deleteRes.UserObjectsRemoved()) - if v, ok := expPhy[oo[0].Container()]; ok { - if v == 1 { - delete(expPhy, oo[0].Container()) + if v, ok := exp[oo[0].Container()]; ok { + v.Phy-- + if v.IsZero() { + delete(exp, oo[0].Container()) } else { - expPhy[oo[0].Container()]-- + exp[oo[0].Container()] = v } } @@ -421,14 +439,14 @@ func TestCounters_Expired(t *testing.T) { c, err = db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(len(oo)), c.Phy()) - require.Equal(t, uint64(len(oo)), c.Logic()) + require.Equal(t, uint64(len(oo)), c.Phy) + require.Equal(t, uint64(len(oo)), c.Logic) + require.Equal(t, uint64(len(oo)), c.User) cc, err = db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expPhy, cc.Physical) - require.Equal(t, expLog, cc.Logical) + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) // 5 `Delete` an expired object (like it would the control // service do) should decrease both counters despite the @@ -439,20 +457,16 @@ func TestCounters_Expired(t *testing.T) { deleteRes, err = db.Delete(context.Background(), deletePrm) require.NoError(t, err) require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved()) + require.Equal(t, uint64(1), deleteRes.UserObjectsRemoved()) - if v, ok := expLog[oo[0].Container()]; ok { - if v == 1 { - delete(expLog, oo[0].Container()) + if v, ok := exp[oo[0].Container()]; ok { + v.Phy-- + v.Logic-- + v.User-- + if v.IsZero() { + delete(exp, oo[0].Container()) } else { - expLog[oo[0].Container()]-- - } - } - - if v, ok := expPhy[oo[0].Container()]; ok { - if v == 1 { - delete(expPhy, oo[0].Container()) - } else { - expPhy[oo[0].Container()]-- + exp[oo[0].Container()] = v } } @@ -460,14 +474,14 @@ func TestCounters_Expired(t *testing.T) { c, err = db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(len(oo)), c.Phy()) - require.Equal(t, uint64(len(oo)), c.Logic()) + require.Equal(t, uint64(len(oo)), c.Phy) + require.Equal(t, uint64(len(oo)), c.Logic) + require.Equal(t, uint64(len(oo)), c.User) cc, err = db.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expPhy, cc.Physical) - require.Equal(t, expLog, cc.Logical) + require.Equal(t, meta.ContainerCounters{Counts: exp}, cc) } func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object { @@ -480,6 +494,7 @@ func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK o := testutil.GenerateObject() if withParent { o.SetParent(parent) + o.SetSplitID(objectSDK.NewSplitID()) } oo = append(oo, o) @@ -491,8 +506,8 @@ func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK c, err := db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(i+1), c.Phy()) - require.Equal(t, uint64(i+1), c.Logic()) + require.Equal(t, uint64(i+1), c.Phy) + require.Equal(t, uint64(i+1), c.Logic) } return oo diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 5acd71164..6a5661ed1 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -29,6 +29,7 @@ type DeletePrm struct { type DeleteRes struct { rawRemoved uint64 availableRemoved uint64 + userRemoved uint64 sizes []uint64 availableSizes []uint64 removedByCnrID map[cid.ID]ObjectCounters @@ -40,6 +41,10 @@ func (d DeleteRes) AvailableObjectsRemoved() uint64 { return d.availableRemoved } +func (d DeleteRes) UserObjectsRemoved() uint64 { + return d.userRemoved +} + // RemovedByCnrID returns the number of removed objects by container ID. func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters { return d.removedByCnrID @@ -132,63 +137,23 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) currEpoch := db.epochState.CurrentEpoch() for i := range addrs { - removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch) + r, err := db.delete(tx, addrs[i], refCounter, currEpoch) if err != nil { - return DeleteRes{}, err // maybe log and continue? + return DeleteRes{}, err } - if removed { - if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { - v.phy++ - res.removedByCnrID[addrs[i].Container()] = v - } else { - res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ - phy: 1, - } - } - - res.rawRemoved++ - res.sizes[i] = size - } - - if available { - if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { - v.logic++ - res.removedByCnrID[addrs[i].Container()] = v - } else { - res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ - logic: 1, - } - } - - res.availableRemoved++ - res.availableSizes[i] = size - } + applyDeleteSingleResult(r, &res, addrs, i) } - if res.rawRemoved > 0 { - err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false) - if err != nil { - return DeleteRes{}, fmt.Errorf("could not decrease phy object counter: %w", err) - } - } - - if res.availableRemoved > 0 { - err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false) - if err != nil { - return DeleteRes{}, fmt.Errorf("could not decrease logical object counter: %w", err) - } - } - - if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil { - return DeleteRes{}, fmt.Errorf("could not decrease container object counter: %w", err) + if err := db.updateCountersDelete(tx, res); err != nil { + return DeleteRes{}, err } for _, refNum := range refCounter { if refNum.cur == refNum.all { err := db.deleteObject(tx, refNum.obj, true) if err != nil { - return DeleteRes{}, err // maybe log and continue? + return DeleteRes{}, err } } } @@ -196,13 +161,91 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) return res, nil } +func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error { + if res.rawRemoved > 0 { + err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false) + if err != nil { + return fmt.Errorf("could not decrease phy object counter: %w", err) + } + } + + if res.availableRemoved > 0 { + err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false) + if err != nil { + return fmt.Errorf("could not decrease logical object counter: %w", err) + } + } + + if res.userRemoved > 0 { + err := db.updateShardObjectCounter(tx, user, res.userRemoved, false) + if err != nil { + return fmt.Errorf("could not decrease user object counter: %w", err) + } + } + + if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil { + return fmt.Errorf("could not decrease container object counter: %w", err) + } + return nil +} + +func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) { + if r.Removed { + if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { + v.Phy++ + res.removedByCnrID[addrs[i].Container()] = v + } else { + res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ + Phy: 1, + } + } + + res.rawRemoved++ + res.sizes[i] = r.Size + } + + if r.Available { + if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { + v.Logic++ + res.removedByCnrID[addrs[i].Container()] = v + } else { + res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ + Logic: 1, + } + } + + res.availableRemoved++ + res.availableSizes[i] = r.Size + } + + if r.User { + if v, ok := res.removedByCnrID[addrs[i].Container()]; ok { + v.User++ + res.removedByCnrID[addrs[i].Container()] = v + } else { + res.removedByCnrID[addrs[i].Container()] = ObjectCounters{ + User: 1, + } + } + + res.userRemoved++ + } +} + +type deleteSingleResult struct { + Removed bool + Available bool + User bool + Size uint64 +} + // delete removes object indexes from the metabase. Counts the references // of the object that is being removed. // The first return value indicates if an object has been removed. (removing a // non-exist object is error-free). The second return value indicates if an // object was available before the removal (for calculating the logical object -// counter). The third return value is removed object payload size. -func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) { +// counter). The third return value The fourth return value is removed object payload size. +func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) { key := make([]byte, addressKeySize) addrKey := addressKey(addr, key) garbageBKT := tx.Bucket(garbageBucketName) @@ -214,7 +257,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter if garbageBKT != nil { err := garbageBKT.Delete(addrKey) if err != nil { - return false, false, 0, fmt.Errorf("could not remove from garbage bucket: %w", err) + return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err) } } @@ -224,10 +267,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter var siErr *objectSDK.SplitInfoError if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) { - return false, false, 0, nil + return deleteSingleResult{}, nil } - return false, false, 0, err + return deleteSingleResult{}, err } // if object is an only link to a parent, then remove parent @@ -250,13 +293,20 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter nRef.cur++ } + isUserObject := IsUserObject(obj) + // remove object err = db.deleteObject(tx, obj, false) if err != nil { - return false, false, 0, fmt.Errorf("could not remove object: %w", err) + return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err) } - return true, removeAvailableObject, obj.PayloadSize(), nil + return deleteSingleResult{ + Removed: true, + Available: removeAvailableObject, + User: isUserObject && removeAvailableObject, + Size: obj.PayloadSize(), + }, nil } func (db *DB) deleteObject( diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index cf7921992..21db9a629 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -30,14 +30,16 @@ type InhumePrm struct { // DeletionInfo contains details on deleted object. type DeletionInfo struct { - Size uint64 - CID cid.ID + Size uint64 + CID cid.ID + IsUser bool } // InhumeRes encapsulates results of Inhume operation. type InhumeRes struct { deletedLockObj []oid.Address availableInhumed uint64 + userInhumed uint64 inhumedByCnrID map[cid.ID]ObjectCounters deletionDetails []DeletionInfo } @@ -48,6 +50,10 @@ func (i InhumeRes) AvailableInhumed() uint64 { return i.availableInhumed } +func (i InhumeRes) UserInhumed() uint64 { + return i.userInhumed +} + // InhumedByCnrID return number of object // that have been inhumed by container ID. func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters { @@ -75,19 +81,31 @@ func (i InhumeRes) GetDeletionInfoByIndex(target int) DeletionInfo { // StoreDeletionInfo stores size of deleted object and associated container ID // in corresponding arrays. -func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) { +func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, isUser bool) { i.deletionDetails = append(i.deletionDetails, DeletionInfo{ - Size: deletedSize, - CID: containerID, + Size: deletedSize, + CID: containerID, + IsUser: isUser, }) i.availableInhumed++ + if isUser { + i.userInhumed++ + } + if v, ok := i.inhumedByCnrID[containerID]; ok { - v.logic++ + v.Logic++ + if isUser { + v.User++ + } i.inhumedByCnrID[containerID] = v } else { - i.inhumedByCnrID[containerID] = ObjectCounters{ - logic: 1, + v = ObjectCounters{ + Logic: 1, } + if isUser { + v.User = 1 + } + i.inhumedByCnrID[containerID] = v } } @@ -247,23 +265,14 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes } func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { - var inhumedCount uint64 - inhumedbyCnr := make(map[cid.ID]ObjectCounters) - for _, dd := range res.deletionDetails { - if v, ok := inhumedbyCnr[dd.CID]; ok { - v.logic++ - inhumedbyCnr[dd.CID] = v - } else { - inhumedbyCnr[dd.CID] = ObjectCounters{logic: 1} - } - inhumedCount++ + if err := db.updateShardObjectCounter(tx, logical, res.AvailableInhumed(), false); err != nil { + return err } - - if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil { + if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil { return err } - return db.updateContainerCounter(tx, inhumedbyCnr, false) + return db.updateContainerCounter(tx, res.inhumedByCnrID, false) } // getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket. @@ -318,7 +327,7 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { containerID, _ := obj.ContainerID() if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { - res.storeDeletionInfo(containerID, obj.PayloadSize()) + res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj)) } // if object is stored, and it is regular object then update bucket diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index a1e21ef25..7b60b7d50 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -202,9 +202,10 @@ func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Addres return err } -func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID) error) error { +func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error { var cid cid.ID var oid oid.ID + obj := objectSDK.New() return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { b58CID, postfix := parseContainerIDWithPrefix(&cid, name) @@ -221,8 +222,8 @@ func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID) error) error { } return b.ForEach(func(k, v []byte) error { - if oid.Decode(k) == nil { - return f(cid, oid) + if oid.Decode(k) == nil && obj.Unmarshal(v) == nil { + return f(cid, oid, obj) } return nil diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 2b5c5421d..2f5108953 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -183,7 +183,7 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o } if !isParent { - if err = db.incCounters(tx, cnr); err != nil { + if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil { return err } } diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index 749229cc3..f08af6fc7 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -347,8 +347,8 @@ func TestRefillMetabase(t *testing.T) { c, err := sh.metaBase.ObjectCounters() require.NoError(t, err) - phyBefore := c.Phy() - logicalBefore := c.Logic() + phyBefore := c.Phy + logicalBefore := c.Logic err = sh.Close() require.NoError(t, err) @@ -382,8 +382,8 @@ func TestRefillMetabase(t *testing.T) { c, err = sh.metaBase.ObjectCounters() require.NoError(t, err) - require.Equal(t, phyBefore, c.Phy()) - require.Equal(t, logicalBefore, c.Logic()) + require.Equal(t, phyBefore, c.Phy) + require.Equal(t, logicalBefore, c.Logic) checkAllObjs(true) checkObj(object.AddressOf(tombObj), tombObj) diff --git a/pkg/local_object_storage/shard/count.go b/pkg/local_object_storage/shard/count.go index abed5278e..b3bc6a30b 100644 --- a/pkg/local_object_storage/shard/count.go +++ b/pkg/local_object_storage/shard/count.go @@ -27,5 +27,5 @@ func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) { if err != nil { return 0, err } - return cc.Logic(), nil + return cc.Logic, nil } diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index 663d781ad..0856bda10 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -118,6 +118,7 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error } s.decObjectCounterBy(physical, res.RawObjectsRemoved()) s.decObjectCounterBy(logical, res.AvailableObjectsRemoved()) + s.decObjectCounterBy(user, res.UserObjectsRemoved()) s.decContainerObjectCounter(res.RemovedByCnrID()) removedPayload := res.RemovedPhysicalObjectSizes()[0] logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0] diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index e16f89457..0f978f26e 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -416,6 +416,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular) s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 @@ -630,6 +631,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone) s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 @@ -677,6 +679,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers [] s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock) s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index 335df82f0..48cfaa98e 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -122,6 +122,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { s.m.RUnlock() s.decObjectCounterBy(logical, res.AvailableInhumed()) + s.decObjectCounterBy(user, res.UserInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index 1e9bbf9fc..f59565cb3 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -86,12 +86,6 @@ func (m *metricsStore) IncObjectCounter(objectType string) { m.objCounters[objectType] += 1 } -func (m *metricsStore) DecObjectCounter(objectType string) { - m.mtx.Lock() - defer m.mtx.Unlock() - m.AddToObjectCounter(objectType, -1) -} - func (m *metricsStore) SetMode(mode mode.Mode) { m.mtx.Lock() defer m.mtx.Unlock() @@ -178,8 +172,6 @@ func TestCounters(t *testing.T) { oo[i] = testutil.GenerateObject() } - cc := meta.ContainerCounters{Logical: make(map[cid.ID]uint64), Physical: make(map[cid.ID]uint64)} - t.Run("defaults", func(t *testing.T) { require.Zero(t, mm.getObjectCounter(physical)) require.Zero(t, mm.getObjectCounter(logical)) @@ -194,21 +186,26 @@ func TestCounters(t *testing.T) { v, ok = mm.getContainerCount(contID.EncodeToString(), logical) require.Zero(t, v) require.False(t, ok) + v, ok = mm.getContainerCount(contID.EncodeToString(), user) + require.Zero(t, v) + require.False(t, ok) } }) var totalPayload int64 expectedLogicalSizes := make(map[string]int64) - expectedLogCC := make(map[cid.ID]uint64) - expectedPhyCC := make(map[cid.ID]uint64) + expected := make(map[cid.ID]meta.ObjectCounters) for i := range oo { cnr, _ := oo[i].ContainerID() oSize := int64(oo[i].PayloadSize()) expectedLogicalSizes[cnr.EncodeToString()] += oSize totalPayload += oSize - expectedLogCC[cnr]++ - expectedPhyCC[cnr]++ + expected[cnr] = meta.ObjectCounters{ + Logic: 1, + Phy: 1, + User: 1, + } } var prm PutPrm @@ -222,13 +219,13 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical)) require.Equal(t, uint64(objNumber), mm.getObjectCounter(logical)) + require.Equal(t, uint64(objNumber), mm.getObjectCounter(user)) require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) cc, err := sh.metaBase.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expectedLogCC, cc.Logical) - require.Equal(t, expectedPhyCC, cc.Physical) + require.Equal(t, meta.ContainerCounters{Counts: expected}, cc) t.Run("inhume_GC", func(t *testing.T) { var prm InhumePrm @@ -244,21 +241,26 @@ func TestCounters(t *testing.T) { require.True(t, ok) expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize()) - expectedLogCC[cid]-- - if expectedLogCC[cid] == 0 { - delete(expectedLogCC, cid) + if v, ok := expected[cid]; ok { + v.Logic-- + v.User-- + if v.IsZero() { + delete(expected, cid) + } else { + expected[cid] = v + } } } require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical)) require.Equal(t, uint64(objNumber-inhumedNumber), mm.getObjectCounter(logical)) + require.Equal(t, uint64(objNumber-inhumedNumber), mm.getObjectCounter(user)) require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) cc, err := sh.metaBase.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expectedLogCC, cc.Logical) - require.Equal(t, expectedPhyCC, cc.Physical) + require.Equal(t, meta.ContainerCounters{Counts: expected}, cc) oo = oo[inhumedNumber:] }) @@ -269,6 +271,7 @@ func TestCounters(t *testing.T) { phy := mm.getObjectCounter(physical) logic := mm.getObjectCounter(logical) + custom := mm.getObjectCounter(user) inhumedNumber := int(phy / 4) prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...) @@ -281,21 +284,26 @@ func TestCounters(t *testing.T) { require.True(t, ok) expectedLogicalSizes[cid.EncodeToString()] -= int64(oo[i].PayloadSize()) - expectedLogCC[cid]-- - if expectedLogCC[cid] == 0 { - delete(expectedLogCC, cid) + if v, ok := expected[cid]; ok { + v.Logic-- + v.User-- + if v.IsZero() { + delete(expected, cid) + } else { + expected[cid] = v + } } } require.Equal(t, phy, mm.getObjectCounter(physical)) require.Equal(t, logic-uint64(inhumedNumber), mm.getObjectCounter(logical)) + require.Equal(t, custom-uint64(inhumedNumber), mm.getObjectCounter(user)) require.Equal(t, expectedLogicalSizes, mm.containerSizes()) require.Equal(t, totalPayload, mm.payloadSize()) cc, err = sh.metaBase.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expectedLogCC, cc.Logical) - require.Equal(t, expectedPhyCC, cc.Physical) + require.Equal(t, meta.ContainerCounters{Counts: expected}, cc) oo = oo[inhumedNumber:] }) @@ -305,6 +313,7 @@ func TestCounters(t *testing.T) { phy := mm.getObjectCounter(physical) logic := mm.getObjectCounter(logical) + custom := mm.getObjectCounter(user) deletedNumber := int(phy / 4) prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...) @@ -314,6 +323,7 @@ func TestCounters(t *testing.T) { require.Equal(t, phy-uint64(deletedNumber), mm.getObjectCounter(physical)) require.Equal(t, logic-uint64(deletedNumber), mm.getObjectCounter(logical)) + require.Equal(t, custom-uint64(deletedNumber), mm.getObjectCounter(user)) var totalRemovedpayload uint64 for i := range oo[:deletedNumber] { removedPayload := oo[i].PayloadSize() @@ -322,14 +332,15 @@ func TestCounters(t *testing.T) { cnr, _ := oo[i].ContainerID() expectedLogicalSizes[cnr.EncodeToString()] -= int64(removedPayload) - expectedLogCC[cnr]-- - if expectedLogCC[cnr] == 0 { - delete(expectedLogCC, cnr) - } - - expectedPhyCC[cnr]-- - if expectedPhyCC[cnr] == 0 { - delete(expectedPhyCC, cnr) + if v, ok := expected[cnr]; ok { + v.Logic-- + v.Phy-- + v.User-- + if v.IsZero() { + delete(expected, cnr) + } else { + expected[cnr] = v + } } } require.Equal(t, expectedLogicalSizes, mm.containerSizes()) @@ -337,8 +348,7 @@ func TestCounters(t *testing.T) { cc, err = sh.metaBase.ContainerCounters(context.Background()) require.NoError(t, err) - require.Equal(t, expectedLogCC, cc.Logical) - require.Equal(t, expectedPhyCC, cc.Physical) + require.Equal(t, meta.ContainerCounters{Counts: expected}, cc) }) } diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 10983d041..09a98260d 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -90,7 +90,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) { return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err) } - s.incObjectCounter(putPrm.Address.Container()) + s.incObjectCounter(putPrm.Address.Container(), meta.IsUserObject(prm.obj)) s.addToPayloadSize(int64(prm.obj.PayloadSize())) s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize())) } diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index aa87e0cfc..2c4ce91dc 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -72,9 +72,6 @@ type MetricsWriter interface { // IncObjectCounter must increment shard's object counter taking into account // object type. IncObjectCounter(objectType string) - // DecObjectCounter must decrement shard's object counter taking into account - // object type. - DecObjectCounter(objectType string) // SetShardID must set (update) the shard identifier that will be used in // metrics. SetShardID(id string) @@ -395,66 +392,74 @@ const ( // counter type (excludes objects that are // stored but unavailable). logical = "logic" + // user is an available small or big regular object. + user = "user" ) func (s *Shard) updateMetrics(ctx context.Context) { - if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() { - cc, err := s.metaBase.ObjectCounters() + if s.cfg.metricsWriter == nil || s.GetMode().NoMetabase() { + return + } + + cc, err := s.metaBase.ObjectCounters() + if err != nil { + s.log.Warn(logs.ShardMetaObjectCounterRead, + zap.Error(err), + ) + + return + } + + s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy) + s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic) + s.cfg.metricsWriter.SetObjectCounter(user, cc.User) + + cnrList, err := s.metaBase.Containers(ctx) + if err != nil { + s.log.Warn(logs.ShardMetaCantReadContainerList, zap.Error(err)) + return + } + + var totalPayload uint64 + + for i := range cnrList { + size, err := s.metaBase.ContainerSize(cnrList[i]) if err != nil { - s.log.Warn(logs.ShardMetaObjectCounterRead, - zap.Error(err), - ) - - return + s.log.Warn(logs.ShardMetaCantReadContainerSize, + zap.String("cid", cnrList[i].EncodeToString()), + zap.Error(err)) + continue } + s.metricsWriter.AddToContainerSize(cnrList[i].EncodeToString(), int64(size)) + totalPayload += size + } - s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy()) - s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic()) + s.metricsWriter.AddToPayloadSize(int64(totalPayload)) - cnrList, err := s.metaBase.Containers(ctx) - if err != nil { - s.log.Warn(logs.ShardMetaCantReadContainerList, zap.Error(err)) - return - } - - var totalPayload uint64 - - for i := range cnrList { - size, err := s.metaBase.ContainerSize(cnrList[i]) - if err != nil { - s.log.Warn(logs.ShardMetaCantReadContainerSize, - zap.String("cid", cnrList[i].EncodeToString()), - zap.Error(err)) - continue - } - s.metricsWriter.AddToContainerSize(cnrList[i].EncodeToString(), int64(size)) - totalPayload += size - } - - s.metricsWriter.AddToPayloadSize(int64(totalPayload)) - - contCount, err := s.metaBase.ContainerCounters(ctx) - if err != nil { - s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err)) - return - } - for contID, count := range contCount.Physical { - s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count) - } - for contID, count := range contCount.Logical { - s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count) - } + contCount, err := s.metaBase.ContainerCounters(ctx) + if err != nil { + s.log.Warn(logs.FailedToGetContainerCounters, zap.Error(err)) + return + } + for contID, count := range contCount.Counts { + s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), physical, count.Phy) + s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), logical, count.Logic) + s.metricsWriter.SetContainerObjectsCount(contID.EncodeToString(), user, count.User) } } // incObjectCounter increment both physical and logical object // counters. -func (s *Shard) incObjectCounter(cnrID cid.ID) { +func (s *Shard) incObjectCounter(cnrID cid.ID, isUser bool) { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.IncObjectCounter(physical) s.cfg.metricsWriter.IncObjectCounter(logical) s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), physical) s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), logical) + if isUser { + s.cfg.metricsWriter.IncObjectCounter(user) + s.cfg.metricsWriter.IncContainerObjectsCount(cnrID.EncodeToString(), user) + } } } @@ -470,8 +475,9 @@ func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) } for cnrID, count := range byCnr { - s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy()) - s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic()) + s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), physical, count.Phy) + s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), logical, count.Logic) + s.cfg.metricsWriter.SubContainerObjectsCount(cnrID.EncodeToString(), user, count.User) } }