forked from TrueCloudLab/frostfs-node
[#763] metabase: Add container objects counter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
commit 9c98fa6152 (parent 226e84d782)

7 changed files with 535 additions and 49 deletions
@@ -103,12 +103,13 @@ func (db *DB) init(reset bool) error {
     }
 
     mStaticBuckets := map[string]struct{}{
-        string(containerVolumeBucketName): {},
-        string(graveyardBucketName):       {},
-        string(toMoveItBucketName):        {},
-        string(garbageBucketName):         {},
-        string(shardInfoBucket):           {},
-        string(bucketNameLocked):          {},
+        string(containerVolumeBucketName):  {},
+        string(containerCounterBucketName): {},
+        string(graveyardBucketName):        {},
+        string(toMoveItBucketName):         {},
+        string(garbageBucketName):          {},
+        string(shardInfoBucket):            {},
+        string(bucketNameLocked):           {},
     }
 
     return db.boltDB.Update(func(tx *bbolt.Tx) error {

@@ -135,7 +136,7 @@ func (db *DB) init(reset bool) error {
         }
     }
 
-    if !reset {
+    if !reset { // counters will be recalculated by refill metabase
         err = syncCounter(tx, false)
         if err != nil {
             return fmt.Errorf("could not sync object counter: %w", err)

@@ -1,10 +1,15 @@
 package meta
 
 import (
+    "bytes"
+    "context"
     "encoding/binary"
     "errors"
     "fmt"
+    "time"
 
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
+    "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     "go.etcd.io/bbolt"

@@ -73,9 +78,128 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
     return cc, metaerr.Wrap(err)
 }
 
-// updateCounter updates the object counter. Tx MUST be writable.
-// If inc == `true`, increases the counter, decreases otherwise.
-func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
+type ContainerCounters struct {
+    Logical  map[cid.ID]uint64
+    Physical map[cid.ID]uint64
+}
+
+// ContainerCounters returns object counters for each container
+// that metabase has tracked since it was opened and initialized.
+//
+// Returns only the errors that do not allow reading counter
+// in Bolt database.
+//
+// It is guaranteed that the ContainerCounters fields are not nil.
+func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) {
+    var (
+        startedAt = time.Now()
+        success   = false
+    )
+    defer func() {
+        db.metrics.AddMethodDuration("ContainerCounters", time.Since(startedAt), success)
+    }()
+
+    ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCounters")
+    defer span.End()
+
+    cc := ContainerCounters{
+        Logical:  make(map[cid.ID]uint64),
+        Physical: make(map[cid.ID]uint64),
+    }
+
+    lastKey := make([]byte, cidSize)
+
+    // there is no limit for containers count, so use batching with cancellation
+    for {
+        select {
+        case <-ctx.Done():
+            return cc, ctx.Err()
+        default:
+        }
+
+        completed, err := db.containerCountersNextBatch(lastKey, &cc)
+        if err != nil {
+            return cc, err
+        }
+        if completed {
+            break
+        }
+    }
+
+    success = true
+    return cc, nil
+}
+
+func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) {
+    db.modeMtx.RLock()
+    defer db.modeMtx.RUnlock()
+
+    if db.mode.NoMetabase() {
+        return false, ErrDegradedMode
+    }
+
+    counter := 0
+    const batchSize = 1000
+
+    err := db.boltDB.View(func(tx *bbolt.Tx) error {
+        b := tx.Bucket(containerCounterBucketName)
+        if b == nil {
+            return ErrInterruptIterator
+        }
+        c := b.Cursor()
+        var key, value []byte
+        for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() {
+            if bytes.Equal(lastKey, key) {
+                continue
+            }
+            copy(lastKey, key)
+
+            cnrID, err := parseContainerCounterKey(key)
+            if err != nil {
+                return err
+            }
+            phy, logic, err := parseContainerCounterValue(value)
+            if err != nil {
+                return err
+            }
+            if phy > 0 {
+                cc.Physical[cnrID] = phy
+            }
+            if logic > 0 {
+                cc.Logical[cnrID] = logic
+            }
+
+            counter++
+            if counter == batchSize {
+                break
+            }
+        }
+
+        if counter < batchSize { // last batch
+            return ErrInterruptIterator
+        }
+        return nil
+    })
+    if err != nil {
+        if errors.Is(err, ErrInterruptIterator) {
+            return true, nil
+        }
+        return false, metaerr.Wrap(err)
+    }
+    return false, nil
+}
+
+func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error {
+    if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
+        return fmt.Errorf("could not increase phy object counter: %w", err)
+    }
+    if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil {
+        return fmt.Errorf("could not increase logical object counter: %w", err)
+    }
+    return db.incContainerObjectCounter(tx, cnrID)
+}
+
+func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
     b := tx.Bucket(shardInfoBucket)
     if b == nil {
         return nil

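The batching in ContainerCounters exists because a shard may track arbitrarily many containers; the context is checked between 1000-record batches, so a caller can bound or cancel a long scan. A minimal sketch of such a caller (the package, helper name, timeout, and output format are assumptions for illustration, not part of this commit):

package example // illustrative wrapper package

import (
    "context"
    "fmt"
    "time"

    meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
)

// logContainerCounters is a hypothetical caller of the new API.
func logContainerCounters(ctx context.Context, db *meta.DB) error {
    // ContainerCounters checks ctx between batches, so the timeout is honored.
    ctx, cancel := context.WithTimeout(ctx, time.Minute)
    defer cancel()

    cc, err := db.ContainerCounters(ctx)
    if err != nil {
        return err
    }
    for cnrID, phy := range cc.Physical {
        fmt.Printf("container %s: phy=%d logic=%d\n", cnrID, phy, cc.Logical[cnrID])
    }
    return nil
}
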
@@ -112,6 +236,63 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool
     return b.Put(counterKey, newCounter)
 }
 
+func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error {
+    b := tx.Bucket(containerCounterBucketName)
+    if b == nil {
+        return nil
+    }
+
+    key := make([]byte, cidSize)
+    for cnrID, cnrDelta := range delta {
+        cnrID.Encode(key)
+        if err := db.editContainerCounterValue(b, key, cnrDelta, inc); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error {
+    var phyValue, logicValue uint64
+    var err error
+    data := b.Get(key)
+    if len(data) > 0 {
+        phyValue, logicValue, err = parseContainerCounterValue(data)
+        if err != nil {
+            return err
+        }
+    }
+    phyValue = nextValue(phyValue, delta.phy, inc)
+    logicValue = nextValue(logicValue, delta.logic, inc)
+    if phyValue > 0 || logicValue > 0 {
+        value := containerCounterValue(phyValue, logicValue)
+        return b.Put(key, value)
+    }
+    return b.Delete(key)
+}
+
+func nextValue(existed, delta uint64, inc bool) uint64 {
+    if inc {
+        existed += delta
+    } else if existed <= delta {
+        existed = 0
+    } else {
+        existed -= delta
+    }
+    return existed
+}
+
+func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error {
+    b := tx.Bucket(containerCounterBucketName)
+    if b == nil {
+        return nil
+    }
+
+    key := make([]byte, cidSize)
+    cnrID.Encode(key)
+    return db.editContainerCounterValue(b, key, ObjectCounters{logic: 1, phy: 1}, true)
+}
+
 // syncCounter updates object counters according to metabase state:
 // it counts all the physically/logically stored objects using internal
 // indexes. Tx MUST be writable.

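nextValue is the saturating-update helper behind editContainerCounterValue: increments add the delta, while decrements clamp at zero, so a stray extra decrement cannot wrap the uint64 around. A standalone sketch of that behavior (the main function is illustrative only; nextValue is copied verbatim from the hunk above):

package main

import "fmt"

// nextValue mirrors the helper added in this commit.
func nextValue(existed, delta uint64, inc bool) uint64 {
    if inc {
        existed += delta
    } else if existed <= delta {
        existed = 0
    } else {
        existed -= delta
    }
    return existed
}

func main() {
    fmt.Println(nextValue(5, 2, true))  // 7
    fmt.Println(nextValue(5, 2, false)) // 3
    fmt.Println(nextValue(1, 3, false)) // 0: clamped, no wraparound to 2^64-2
}
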
@@ -119,26 +300,38 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool
 // Does nothing if counters are not empty and force is false. If force is
 // true, updates the counters anyway.
 func syncCounter(tx *bbolt.Tx, force bool) error {
-    b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
+    shardInfoB, err := tx.CreateBucketIfNotExists(shardInfoBucket)
     if err != nil {
         return fmt.Errorf("could not get shard info bucket: %w", err)
     }
-
-    if !force && len(b.Get(objectPhyCounterKey)) == 8 && len(b.Get(objectLogicCounterKey)) == 8 {
+    shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8
+    containerCounterInitialized := tx.Bucket(containerCounterBucketName) != nil
+    if !force && shardObjectCounterInitialized && containerCounterInitialized {
         // the counters are already inited
         return nil
     }
 
+    containerCounterB, err := tx.CreateBucketIfNotExists(containerCounterBucketName)
+    if err != nil {
+        return fmt.Errorf("could not get container counter bucket: %w", err)
+    }
+
     var addr oid.Address
-    var phyCounter uint64
-    var logicCounter uint64
+    counters := make(map[cid.ID]ObjectCounters)
 
     graveyardBKT := tx.Bucket(graveyardBucketName)
     garbageBKT := tx.Bucket(garbageBucketName)
     key := make([]byte, addressKeySize)
 
     err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error {
-        phyCounter++
+        if v, ok := counters[cnr]; ok {
+            v.phy++
+            counters[cnr] = v
+        } else {
+            counters[cnr] = ObjectCounters{
+                phy: 1,
+            }
+        }
 
         addr.SetContainer(cnr)
         addr.SetObject(obj)

@@ -146,7 +339,14 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
         // check if an object is available: not with GCMark
         // and not covered with a tombstone
         if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 {
-            logicCounter++
+            if v, ok := counters[cnr]; ok {
+                v.logic++
+                counters[cnr] = v
+            } else {
+                counters[cnr] = ObjectCounters{
+                    logic: 1,
+                }
+            }
         }
 
         return nil

@@ -155,21 +355,65 @@ func syncCounter(tx *bbolt.Tx, force bool) error {
         return fmt.Errorf("could not iterate objects: %w", err)
     }
 
-    data := make([]byte, 8)
-    binary.LittleEndian.PutUint64(data, phyCounter)
+    return setObjectCounters(counters, shardInfoB, containerCounterB)
+}
 
-    err = b.Put(objectPhyCounterKey, data)
+func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error {
+    var phyTotal uint64
+    var logicTotal uint64
+    key := make([]byte, cidSize)
+    for cnrID, count := range counters {
+        phyTotal += count.phy
+        logicTotal += count.logic
+
+        cnrID.Encode(key)
+        value := containerCounterValue(count.phy, count.logic)
+        err := containerCounterB.Put(key, value)
+        if err != nil {
+            return fmt.Errorf("could not update phy container object counter: %w", err)
+        }
+    }
+    phyData := make([]byte, 8)
+    binary.LittleEndian.PutUint64(phyData, phyTotal)
+
+    err := shardInfoB.Put(objectPhyCounterKey, phyData)
     if err != nil {
         return fmt.Errorf("could not update phy object counter: %w", err)
     }
 
-    data = make([]byte, 8)
-    binary.LittleEndian.PutUint64(data, logicCounter)
+    logData := make([]byte, 8)
+    binary.LittleEndian.PutUint64(logData, logicTotal)
 
-    err = b.Put(objectLogicCounterKey, data)
+    err = shardInfoB.Put(objectLogicCounterKey, logData)
     if err != nil {
         return fmt.Errorf("could not update logic object counter: %w", err)
     }
 
     return nil
 }
+
+func containerCounterValue(phy, logic uint64) []byte {
+    res := make([]byte, 16)
+    binary.LittleEndian.PutUint64(res, phy)
+    binary.LittleEndian.PutUint64(res[8:], logic)
+    return res
+}
+
+func parseContainerCounterKey(buf []byte) (cid.ID, error) {
+    if len(buf) != cidSize {
+        return cid.ID{}, fmt.Errorf("invalid key length")
+    }
+    var cnrID cid.ID
+    if err := cnrID.Decode(buf); err != nil {
+        return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err)
+    }
+    return cnrID, nil
+}
+
+// parseContainerCounterValue return phy, logic values.
+func parseContainerCounterValue(buf []byte) (uint64, uint64, error) {
+    if len(buf) != 16 {
+        return 0, 0, fmt.Errorf("invalid value length")
+    }
+    return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil
+}

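containerCounterValue and the parse helpers fix the on-disk value format: a 16-byte record with the phy counter in bytes 0..7 and the logic counter in bytes 8..15, both little-endian uint64. A quick round-trip sketch with arbitrary values:

package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    // Same layout the counter bucket uses: phy first, logic second.
    value := make([]byte, 16)
    binary.LittleEndian.PutUint64(value, 42)     // phy
    binary.LittleEndian.PutUint64(value[8:], 40) // logic

    phy := binary.LittleEndian.Uint64(value)
    logic := binary.LittleEndian.Uint64(value[8:])
    fmt.Println(phy, logic) // 42 40
}
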
@@ -7,6 +7,7 @@ import (
     objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
     meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"

@@ -25,6 +26,11 @@ func TestCounters(t *testing.T) {
         require.NoError(t, err)
         require.Zero(t, c.Phy())
         require.Zero(t, c.Logic())
+
+        cc, err := db.ContainerCounters(context.Background())
+        require.NoError(t, err)
+        require.Zero(t, len(cc.Physical))
+        require.Zero(t, len(cc.Logical))
     })
 
     t.Run("put", func(t *testing.T) {

@@ -36,9 +42,14 @@ func TestCounters(t *testing.T) {
         }
 
         var prm meta.PutPrm
+        expPhy := make(map[cid.ID]uint64)
+        expLog := make(map[cid.ID]uint64)
 
         for i := 0; i < objCount; i++ {
             prm.SetObject(oo[i])
+            cnrID, _ := oo[i].ContainerID()
+            expPhy[cnrID]++
+            expLog[cnrID]++
 
             _, err := db.Put(context.Background(), prm)
             require.NoError(t, err)

@@ -48,6 +59,12 @@ func TestCounters(t *testing.T) {
 
             require.Equal(t, uint64(i+1), c.Phy())
             require.Equal(t, uint64(i+1), c.Logic())
+
+            cc, err := db.ContainerCounters(context.Background())
+            require.NoError(t, err)
+
+            require.Equal(t, expPhy, cc.Physical)
+            require.Equal(t, expLog, cc.Logical)
         }
     })

@@ -56,6 +73,14 @@ func TestCounters(t *testing.T) {
         db := newDB(t)
         oo := putObjs(t, db, objCount, false)
 
+        expPhy := make(map[cid.ID]uint64)
+        expLog := make(map[cid.ID]uint64)
+        for _, obj := range oo {
+            cnrID, _ := obj.ContainerID()
+            expPhy[cnrID]++
+            expLog[cnrID]++
+        }
+
         var prm meta.DeletePrm
         for i := objCount - 1; i >= 0; i-- {
             prm.SetAddresses(objectcore.AddressOf(oo[i]))

@@ -69,6 +94,27 @@ func TestCounters(t *testing.T) {
 
             require.Equal(t, uint64(i), c.Phy())
             require.Equal(t, uint64(i), c.Logic())
+
+            cnrID, _ := oo[i].ContainerID()
+            if v, ok := expPhy[cnrID]; ok {
+                if v == 1 {
+                    delete(expPhy, cnrID)
+                } else {
+                    expPhy[cnrID]--
+                }
+            }
+            if v, ok := expLog[cnrID]; ok {
+                if v == 1 {
+                    delete(expLog, cnrID)
+                } else {
+                    expLog[cnrID]--
+                }
+            }
+
+            cc, err := db.ContainerCounters(context.Background())
+            require.NoError(t, err)
+            require.Equal(t, expPhy, cc.Physical)
+            require.Equal(t, expLog, cc.Logical)
         }
     })

@@ -77,6 +123,14 @@ func TestCounters(t *testing.T) {
         db := newDB(t)
         oo := putObjs(t, db, objCount, false)
 
+        expPhy := make(map[cid.ID]uint64)
+        expLog := make(map[cid.ID]uint64)
+        for _, obj := range oo {
+            cnrID, _ := obj.ContainerID()
+            expPhy[cnrID]++
+            expLog[cnrID]++
+        }
+
         inhumedObjs := make([]oid.Address, objCount/2)
 
         for i, o := range oo {

@@ -87,6 +141,16 @@ func TestCounters(t *testing.T) {
             inhumedObjs[i] = objectcore.AddressOf(o)
         }
 
+        for _, addr := range inhumedObjs {
+            if v, ok := expLog[addr.Container()]; ok {
+                if v == 1 {
+                    delete(expLog, addr.Container())
+                } else {
+                    expLog[addr.Container()]--
+                }
+            }
+        }
+
         var prm meta.InhumePrm
         prm.SetTombstoneAddress(oidtest.Address())
         prm.SetAddresses(inhumedObjs...)

@@ -100,6 +164,12 @@ func TestCounters(t *testing.T) {
 
         require.Equal(t, uint64(objCount), c.Phy())
         require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
+
+        cc, err := db.ContainerCounters(context.Background())
+        require.NoError(t, err)
+
+        require.Equal(t, expPhy, cc.Physical)
+        require.Equal(t, expLog, cc.Logical)
     })
 
     t.Run("put_split", func(t *testing.T) {

@@ -107,6 +177,9 @@ func TestCounters(t *testing.T) {
         db := newDB(t)
         parObj := testutil.GenerateObject()
 
+        expPhy := make(map[cid.ID]uint64)
+        expLog := make(map[cid.ID]uint64)
+
         // put objects and check that parent info
         // does not affect the counter
         for i := 0; i < objCount; i++ {

@@ -115,12 +188,21 @@ func TestCounters(t *testing.T) {
                 o.SetParent(parObj)
             }
 
+            cnrID, _ := o.ContainerID()
+            expLog[cnrID]++
+            expPhy[cnrID]++
+
             require.NoError(t, putBig(db, o))
 
             c, err := db.ObjectCounters()
             require.NoError(t, err)
             require.Equal(t, uint64(i+1), c.Phy())
             require.Equal(t, uint64(i+1), c.Logic())
+
+            cc, err := db.ContainerCounters(context.Background())
+            require.NoError(t, err)
+            require.Equal(t, expPhy, cc.Physical)
+            require.Equal(t, expLog, cc.Logical)
         }
     })

@@ -129,16 +211,40 @@ func TestCounters(t *testing.T) {
         db := newDB(t)
         oo := putObjs(t, db, objCount, true)
 
+        expPhy := make(map[cid.ID]uint64)
+        expLog := make(map[cid.ID]uint64)
+        for _, obj := range oo {
+            cnrID, _ := obj.ContainerID()
+            expPhy[cnrID]++
+            expLog[cnrID]++
+        }
+
         // delete objects that have parent info
         // and check that it does not affect
         // the counter
         for i, o := range oo {
-            require.NoError(t, metaDelete(db, objectcore.AddressOf(o)))
+            addr := objectcore.AddressOf(o)
+            require.NoError(t, metaDelete(db, addr))
 
             c, err := db.ObjectCounters()
             require.NoError(t, err)
             require.Equal(t, uint64(objCount-i-1), c.Phy())
             require.Equal(t, uint64(objCount-i-1), c.Logic())
+
+            if v, ok := expPhy[addr.Container()]; ok {
+                if v == 1 {
+                    delete(expPhy, addr.Container())
+                } else {
+                    expPhy[addr.Container()]--
+                }
+            }
+            if v, ok := expLog[addr.Container()]; ok {
+                if v == 1 {
+                    delete(expLog, addr.Container())
+                } else {
+                    expLog[addr.Container()]--
+                }
+            }
         }
     })

@@ -147,6 +253,14 @@ func TestCounters(t *testing.T) {
         db := newDB(t)
         oo := putObjs(t, db, objCount, true)
 
+        expPhy := make(map[cid.ID]uint64)
+        expLog := make(map[cid.ID]uint64)
+        for _, obj := range oo {
+            cnrID, _ := obj.ContainerID()
+            expPhy[cnrID]++
+            expLog[cnrID]++
+        }
+
         inhumedObjs := make([]oid.Address, objCount/2)
 
         for i, o := range oo {

@@ -157,6 +271,16 @@ func TestCounters(t *testing.T) {
             inhumedObjs[i] = objectcore.AddressOf(o)
         }
 
+        for _, addr := range inhumedObjs {
+            if v, ok := expLog[addr.Container()]; ok {
+                if v == 1 {
+                    delete(expLog, addr.Container())
+                } else {
+                    expLog[addr.Container()]--
+                }
+            }
+        }
+
         var prm meta.InhumePrm
         prm.SetTombstoneAddress(oidtest.Address())
         prm.SetAddresses(inhumedObjs...)

@@ -169,6 +293,12 @@ func TestCounters(t *testing.T) {
 
         require.Equal(t, uint64(objCount), c.Phy())
         require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
+
+        cc, err := db.ContainerCounters(context.Background())
+        require.NoError(t, err)
+
+        require.Equal(t, expPhy, cc.Physical)
+        require.Equal(t, expLog, cc.Logical)
     })
 }

@@ -190,6 +320,13 @@ func TestCounters_Expired(t *testing.T) {
         oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
     }
 
+    expPhy := make(map[cid.ID]uint64)
+    expLog := make(map[cid.ID]uint64)
+    for _, addr := range oo {
+        expPhy[addr.Container()]++
+        expLog[addr.Container()]++
+    }
+
     // 1. objects are available and counters are correct
 
     c, err := db.ObjectCounters()

@@ -197,6 +334,12 @@ func TestCounters_Expired(t *testing.T) {
     require.Equal(t, uint64(objCount), c.Phy())
     require.Equal(t, uint64(objCount), c.Logic())
 
+    cc, err := db.ContainerCounters(context.Background())
+    require.NoError(t, err)
+
+    require.Equal(t, expPhy, cc.Physical)
+    require.Equal(t, expLog, cc.Logical)
+
     for _, o := range oo {
         _, err := metaGet(db, o, true)
         require.NoError(t, err)

@@ -212,6 +355,12 @@ func TestCounters_Expired(t *testing.T) {
     require.Equal(t, uint64(objCount), c.Phy())
     require.Equal(t, uint64(objCount), c.Logic())
 
+    cc, err = db.ContainerCounters(context.Background())
+    require.NoError(t, err)
+
+    require.Equal(t, expPhy, cc.Physical)
+    require.Equal(t, expLog, cc.Logical)
+
     for _, o := range oo {
         _, err := metaGet(db, o, true)
         require.ErrorIs(t, err, meta.ErrObjectIsExpired)

@@ -235,6 +384,20 @@ func TestCounters_Expired(t *testing.T) {
     require.Equal(t, uint64(len(oo)), c.Phy())
     require.Equal(t, uint64(len(oo)-1), c.Logic())
 
+    if v, ok := expLog[oo[0].Container()]; ok {
+        if v == 1 {
+            delete(expLog, oo[0].Container())
+        } else {
+            expLog[oo[0].Container()]--
+        }
+    }
+
+    cc, err = db.ContainerCounters(context.Background())
+    require.NoError(t, err)
+
+    require.Equal(t, expPhy, cc.Physical)
+    require.Equal(t, expLog, cc.Logical)
+
     // 4. `Delete` an object with GCMark should decrease the
     // phy counter but does not affect the logic counter (after
     // that step they should be equal)

@@ -246,6 +409,14 @@ func TestCounters_Expired(t *testing.T) {
     require.NoError(t, err)
     require.Zero(t, deleteRes.AvailableObjectsRemoved())
 
+    if v, ok := expPhy[oo[0].Container()]; ok {
+        if v == 1 {
+            delete(expPhy, oo[0].Container())
+        } else {
+            expPhy[oo[0].Container()]--
+        }
+    }
+
     oo = oo[1:]
 
     c, err = db.ObjectCounters()

@@ -253,6 +424,12 @@ func TestCounters_Expired(t *testing.T) {
     require.Equal(t, uint64(len(oo)), c.Phy())
     require.Equal(t, uint64(len(oo)), c.Logic())
 
+    cc, err = db.ContainerCounters(context.Background())
+    require.NoError(t, err)
+
+    require.Equal(t, expPhy, cc.Physical)
+    require.Equal(t, expLog, cc.Logical)
+
     // 5 `Delete` an expired object (like it would the control
     // service do) should decrease both counters despite the
     // expiration fact

@@ -263,12 +440,34 @@ func TestCounters_Expired(t *testing.T) {
     require.NoError(t, err)
     require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
 
+    if v, ok := expLog[oo[0].Container()]; ok {
+        if v == 1 {
+            delete(expLog, oo[0].Container())
+        } else {
+            expLog[oo[0].Container()]--
+        }
+    }
+
+    if v, ok := expPhy[oo[0].Container()]; ok {
+        if v == 1 {
+            delete(expPhy, oo[0].Container())
+        } else {
+            expPhy[oo[0].Container()]--
+        }
+    }
+
     oo = oo[1:]
 
     c, err = db.ObjectCounters()
     require.NoError(t, err)
     require.Equal(t, uint64(len(oo)), c.Phy())
     require.Equal(t, uint64(len(oo)), c.Logic())
+
+    cc, err = db.ContainerCounters(context.Background())
+    require.NoError(t, err)
+
+    require.Equal(t, expPhy, cc.Physical)
+    require.Equal(t, expLog, cc.Logical)
 }
 
 func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {

@@ -12,6 +12,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
+    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     "go.etcd.io/bbolt"

@@ -132,8 +133,9 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, ava
     refCounter := make(referenceCounter, len(addrs))
     currEpoch := db.epochState.CurrentEpoch()
 
-    var rawDeleted uint64
-    var availableDeleted uint64
+    cnrIDDelta := make(map[cid.ID]ObjectCounters)
+    var rawDeletedTotal uint64
+    var availableDeletedTotal uint64
 
     for i := range addrs {
         removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)

@@ -142,40 +144,62 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, ava
         }
 
         if removed {
-            rawDeleted++
+            if v, ok := cnrIDDelta[addrs[i].Container()]; ok {
+                v.phy++
+                cnrIDDelta[addrs[i].Container()] = v
+            } else {
+                cnrIDDelta[addrs[i].Container()] = ObjectCounters{
+                    phy: 1,
+                }
+            }
+
+            rawDeletedTotal++
             sizes[i] = size
         }
 
         if available {
-            availableDeleted++
+            if v, ok := cnrIDDelta[addrs[i].Container()]; ok {
+                v.logic++
+                cnrIDDelta[addrs[i].Container()] = v
+            } else {
+                cnrIDDelta[addrs[i].Container()] = ObjectCounters{
+                    logic: 1,
+                }
+            }
+
+            availableDeletedTotal++
             availableSizes[i] = size
         }
     }
 
-    if rawDeleted > 0 {
-        err := db.updateCounter(tx, phy, rawDeleted, false)
+    if rawDeletedTotal > 0 {
+        err := db.updateShardObjectCounter(tx, phy, rawDeletedTotal, false)
         if err != nil {
             return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err)
         }
     }
 
-    if availableDeleted > 0 {
-        err := db.updateCounter(tx, logical, availableDeleted, false)
+    if availableDeletedTotal > 0 {
+        err := db.updateShardObjectCounter(tx, logical, availableDeletedTotal, false)
         if err != nil {
             return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err)
        }
     }
 
+    if err := db.updateContainerCounter(tx, cnrIDDelta, false); err != nil {
+        return 0, 0, fmt.Errorf("could not decrease container object counter: %w", err)
+    }
+
     for _, refNum := range refCounter {
         if refNum.cur == refNum.all {
             err := db.deleteObject(tx, refNum.obj, true)
             if err != nil {
-                return rawDeleted, availableDeleted, err // maybe log and continue?
+                return rawDeletedTotal, availableDeletedTotal, err // maybe log and continue?
             }
         }
     }
 
-    return rawDeleted, availableDeleted, nil
+    return rawDeletedTotal, availableDeletedTotal, nil
 }
 
 // delete removes object indexes from the metabase. Counts the references

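deleteGroup now accumulates per-container deltas in a map keyed by cid.ID and applies them in a single updateContainerCounter call at the end of the batch. The verbose read-modify-write around cnrIDDelta follows from Go map values not being addressable; a simplified sketch of the pattern (the type and string key are illustrative stand-ins for the unexported ObjectCounters and cid.ID):

package main

import "fmt"

// counters stands in for the metabase's unexported ObjectCounters pair.
type counters struct {
    phy, logic uint64
}

// delta[cnr].phy++ does not compile on a map value, so the code loads the
// struct, mutates the copy, and stores it back. Because the zero value of
// the struct is usable, the explicit ok-check in the diff could be folded.
func addPhy(delta map[string]counters, cnr string) {
    v := delta[cnr]
    v.phy++
    delta[cnr] = v
}

func main() {
    delta := make(map[string]counters)
    addPhy(delta, "cnr-1")
    addPhy(delta, "cnr-1")
    fmt.Println(delta["cnr-1"]) // {2 0}
}
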
@@ -73,6 +73,7 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) {
         Size: deletedSize,
         CID:  containerID,
     })
+    i.availableImhumed++
 }
 
 // SetAddresses sets a list of object addresses that should be inhumed.

@@ -224,7 +225,27 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
         }
     }
 
-    return db.updateCounter(tx, logical, res.availableImhumed, false)
+    return db.applyInhumeResToCounters(tx, res)
+}
+
+func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
+    var inhumedCount uint64
+    inhumedbyCnr := make(map[cid.ID]ObjectCounters)
+    for _, dd := range res.deletionDetails {
+        if v, ok := inhumedbyCnr[dd.CID]; ok {
+            v.logic++
+            inhumedbyCnr[dd.CID] = v
+        } else {
+            inhumedbyCnr[dd.CID] = ObjectCounters{logic: 1}
+        }
+        inhumedCount++
+    }
+
+    if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil {
+        return err
+    }
+
+    return db.updateContainerCounter(tx, inhumedbyCnr, false)
 }
 
 // getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.

@@ -279,7 +300,6 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
 func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
     containerID, _ := obj.ContainerID()
     if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
-        res.availableImhumed++
         res.storeDeletionInfo(containerID, obj.PayloadSize())
     }
 

@@ -183,16 +183,8 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
     }
 
     if !isParent {
-        err = db.updateCounter(tx, phy, 1, true)
-        if err != nil {
-            return fmt.Errorf("could not increase phy object counter: %w", err)
-        }
-
-        // it is expected that putting an unavailable object is
-        // impossible and should be handled on the higher levels
-        err = db.updateCounter(tx, logical, 1, true)
-        if err != nil {
-            return fmt.Errorf("could not increase logical object counter: %w", err)
+        if err = db.incCounters(tx, cnr); err != nil {
+            return err
         }
     }
 

@@ -19,9 +19,10 @@ var (
     graveyardBucketName = []byte{graveyardPrefix}
     // garbageBucketName stores rows with the objects that should be physically
     // deleted by the node (Garbage Collector routine).
-    garbageBucketName         = []byte{garbagePrefix}
-    toMoveItBucketName        = []byte{toMoveItPrefix}
-    containerVolumeBucketName = []byte{containerVolumePrefix}
+    garbageBucketName          = []byte{garbagePrefix}
+    toMoveItBucketName         = []byte{toMoveItPrefix}
+    containerVolumeBucketName  = []byte{containerVolumePrefix}
+    containerCounterBucketName = []byte{containerCountersPrefix}
 
     zeroValue = []byte{0xFF}
 )

@@ -111,6 +112,11 @@ const (
     // Key: split ID
     // Value: list of object IDs
     splitPrefix
+
+    // containerCountersPrefix is used for storing container object counters.
+    // Key: container ID + type
+    // Value: container size in bytes as little-endian uint64
+    containerCountersPrefix
 )
 
 const (

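Note: the Key/Value description on containerCountersPrefix reads like a carry-over from the container-volume prefix. Judging by containerCounterValue and parseContainerCounterValue earlier in this diff, the key is the container ID and the value is a pair of little-endian uint64 counters (phy, logic), not a size in bytes.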