[#864] metabase: Refactor delete/inhume

Available -> Logic, Raw -> Phy for delete/inhume results.
Use single counter instead of vectors.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-12-25 10:15:26 +03:00
parent d5d3d8badb
commit dfd62ca6b1
7 changed files with 71 additions and 77 deletions

View file

@ -239,7 +239,7 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
return b.Put(counterKey, newCounter)
}
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { // TODO #838
func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
b := tx.Bucket(containerCounterBucketName)
if b == nil {
return nil

View file

@ -91,7 +91,7 @@ func TestCounters(t *testing.T) {
res, err := db.Delete(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(1), res.AvailableObjectsRemoved())
require.Equal(t, uint64(1), res.LogicCount())
c, err := db.ObjectCounters()
require.NoError(t, err)
@ -161,7 +161,7 @@ func TestCounters(t *testing.T) {
res, err := db.Inhume(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed())
require.Equal(t, uint64(len(inhumedObjs)), res.LogicInhumed())
require.Equal(t, uint64(len(inhumedObjs)), res.UserInhumed())
c, err := db.ObjectCounters()
@ -389,7 +389,7 @@ func TestCounters_Expired(t *testing.T) {
inhumeRes, err := db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
require.Equal(t, uint64(1), inhumeRes.AvailableInhumed())
require.Equal(t, uint64(1), inhumeRes.LogicInhumed())
require.Equal(t, uint64(1), inhumeRes.UserInhumed())
c, err = db.ObjectCounters()
@ -423,8 +423,8 @@ func TestCounters_Expired(t *testing.T) {
deleteRes, err := db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
require.Zero(t, deleteRes.AvailableObjectsRemoved())
require.Zero(t, deleteRes.UserObjectsRemoved())
require.Zero(t, deleteRes.LogicCount())
require.Zero(t, deleteRes.UserCount())
if v, ok := exp[oo[0].Container()]; ok {
v.Phy--
@ -456,8 +456,8 @@ func TestCounters_Expired(t *testing.T) {
deleteRes, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
require.Equal(t, uint64(1), deleteRes.UserObjectsRemoved())
require.Equal(t, uint64(1), deleteRes.LogicCount())
require.Equal(t, uint64(1), deleteRes.UserCount())
if v, ok := exp[oo[0].Container()]; ok {
v.Phy--

View file

@ -27,22 +27,22 @@ type DeletePrm struct {
// DeleteRes groups the resulting values of Delete operation.
type DeleteRes struct {
rawRemoved uint64
availableRemoved uint64
userRemoved uint64
sizes []uint64
availableSizes []uint64
phyCount uint64
logicCount uint64
userCount uint64
phySize uint64
logicSize uint64
removedByCnrID map[cid.ID]ObjectCounters
}
// AvailableObjectsRemoved returns the number of removed available
// LogicCount returns the number of removed logic
// objects.
func (d DeleteRes) AvailableObjectsRemoved() uint64 {
return d.availableRemoved
func (d DeleteRes) LogicCount() uint64 {
return d.logicCount
}
func (d DeleteRes) UserObjectsRemoved() uint64 {
return d.userRemoved
func (d DeleteRes) UserCount() uint64 {
return d.userCount
}
// RemovedByCnrID returns the number of removed objects by container ID.
@ -50,19 +50,19 @@ func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
return d.removedByCnrID
}
// RawObjectsRemoved returns the number of removed raw objects.
func (d DeleteRes) RawObjectsRemoved() uint64 {
return d.rawRemoved
// PhyCount returns the number of removed physical objects.
func (d DeleteRes) PhyCount() uint64 {
return d.phyCount
}
// RemovedPhysicalObjectSizes returns the sizes of removed physical objects.
func (d DeleteRes) RemovedPhysicalObjectSizes() []uint64 {
return d.sizes
// PhySize returns the size of removed physical objects.
func (d DeleteRes) PhySize() uint64 {
return d.phySize
}
// RemovedLogicalObjectSizes returns the sizes of removed logical objects.
func (d DeleteRes) RemovedLogicalObjectSizes() []uint64 {
return d.availableSizes
// LogicSize returns the size of removed logical objects.
func (d DeleteRes) LogicSize() uint64 {
return d.logicSize
}
// SetAddresses is a Delete option to set the addresses of the objects to delete.
@ -129,8 +129,6 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
// references of the split objects.
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
res := DeleteRes{
sizes: make([]uint64, len(addrs)),
availableSizes: make([]uint64, len(addrs)),
removedByCnrID: make(map[cid.ID]ObjectCounters),
}
refCounter := make(referenceCounter, len(addrs))
@ -162,22 +160,22 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
}
func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
if res.rawRemoved > 0 {
err := db.updateShardObjectCounter(tx, phy, res.rawRemoved, false)
if res.phyCount > 0 {
err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
if err != nil {
return fmt.Errorf("could not decrease phy object counter: %w", err)
}
}
if res.availableRemoved > 0 {
err := db.updateShardObjectCounter(tx, logical, res.availableRemoved, false)
if res.logicCount > 0 {
err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
if err != nil {
return fmt.Errorf("could not decrease logical object counter: %w", err)
}
}
if res.userRemoved > 0 {
err := db.updateShardObjectCounter(tx, user, res.userRemoved, false)
if res.userCount > 0 {
err := db.updateShardObjectCounter(tx, user, res.userCount, false)
if err != nil {
return fmt.Errorf("could not decrease user object counter: %w", err)
}
@ -190,7 +188,7 @@ func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
}
func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) {
if r.Removed {
if r.Phy {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.Phy++
res.removedByCnrID[addrs[i].Container()] = v
@ -200,11 +198,11 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
}
}
res.rawRemoved++
res.sizes[i] = r.Size
res.phyCount++
res.phySize += r.Size
}
if r.Available {
if r.Logic {
if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
v.Logic++
res.removedByCnrID[addrs[i].Container()] = v
@ -214,8 +212,8 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
}
}
res.availableRemoved++
res.availableSizes[i] = r.Size
res.logicCount++
res.logicSize += r.Size
}
if r.User {
@ -228,13 +226,13 @@ func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.A
}
}
res.userRemoved++
res.userCount++
}
}
type deleteSingleResult struct {
Removed bool
Available bool
Phy bool
Logic bool
User bool
Size uint64
}
@ -302,8 +300,8 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
}
return deleteSingleResult{
Removed: true,
Available: removeAvailableObject,
Phy: true,
Logic: removeAvailableObject,
User: isUserObject && removeAvailableObject,
Size: obj.PayloadSize(),
}, nil

View file

@ -38,16 +38,16 @@ type DeletionInfo struct {
// InhumeRes encapsulates results of Inhume operation.
type InhumeRes struct {
deletedLockObj []oid.Address
availableInhumed uint64
logicInhumed uint64
userInhumed uint64
inhumedByCnrID map[cid.ID]ObjectCounters
deletionDetails []DeletionInfo
}
// AvailableInhumed return number of available object
// LogicInhumed return number of logic object
// that have been inhumed.
func (i InhumeRes) AvailableInhumed() uint64 {
return i.availableInhumed
func (i InhumeRes) LogicInhumed() uint64 {
return i.logicInhumed
}
func (i InhumeRes) UserInhumed() uint64 {
@ -87,7 +87,7 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, is
CID: containerID,
IsUser: isUser,
})
i.availableInhumed++
i.logicInhumed++
if isUser {
i.userInhumed++
}
@ -265,7 +265,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
}
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
if err := db.updateShardObjectCounter(tx, logical, res.AvailableInhumed(), false); err != nil {
if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
return err
}
if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {

View file

@ -141,16 +141,12 @@ func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error
if err != nil {
return err
}
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
s.decObjectCounterBy(user, res.UserObjectsRemoved())
s.decObjectCounterBy(physical, res.PhyCount())
s.decObjectCounterBy(logical, res.LogicCount())
s.decObjectCounterBy(user, res.UserCount())
s.decContainerObjectCounter(res.RemovedByCnrID())
removedPayload := res.RemovedPhysicalObjectSizes()[0]
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
if logicalRemovedPayload > 0 {
s.addToContainerSize(addr.Container().EncodeToString(), -int64(logicalRemovedPayload))
}
s.addToPayloadSize(-int64(removedPayload))
s.addToContainerSize(addr.Container().EncodeToString(), -int64(res.LogicSize()))
s.addToPayloadSize(-int64(res.PhySize()))
return nil
}

View file

@ -414,8 +414,8 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return
}
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeRegular)
s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
@ -629,8 +629,8 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
return
}
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeTombstone)
s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())
@ -677,8 +677,8 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
return
}
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeLock)
s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())

View file

@ -121,7 +121,7 @@ func (s *Shard) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
s.m.RUnlock()
s.decObjectCounterBy(logical, res.AvailableInhumed())
s.decObjectCounterBy(logical, res.LogicInhumed())
s.decObjectCounterBy(user, res.UserInhumed())
s.decContainerObjectCounter(res.InhumedByCnrID())