diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 3f155eeb5..00fe1e02b 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -103,12 +103,13 @@ func (db *DB) init(reset bool) error { } mStaticBuckets := map[string]struct{}{ - string(containerVolumeBucketName): {}, - string(graveyardBucketName): {}, - string(toMoveItBucketName): {}, - string(garbageBucketName): {}, - string(shardInfoBucket): {}, - string(bucketNameLocked): {}, + string(containerVolumeBucketName): {}, + string(containerCounterBucketName): {}, + string(graveyardBucketName): {}, + string(toMoveItBucketName): {}, + string(garbageBucketName): {}, + string(shardInfoBucket): {}, + string(bucketNameLocked): {}, } return db.boltDB.Update(func(tx *bbolt.Tx) error { @@ -135,7 +136,7 @@ func (db *DB) init(reset bool) error { } } - if !reset { + if !reset { // counters will be recalculated by refill metabase err = syncCounter(tx, false) if err != nil { return fmt.Errorf("could not sync object counter: %w", err) diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index d4afe1878..d1aafa3db 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -1,10 +1,15 @@ package meta import ( + "bytes" + "context" "encoding/binary" + "errors" "fmt" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" @@ -73,9 +78,128 @@ func (db *DB) ObjectCounters() (cc ObjectCounters, err error) { return cc, metaerr.Wrap(err) } -// updateCounter updates the object counter. Tx MUST be writable. -// If inc == `true`, increases the counter, decreases otherwise. -func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error { +type ContainerCounters struct { + Logical map[cid.ID]uint64 + Physical map[cid.ID]uint64 +} + +// ContainerCounters returns object counters for each container +// that metabase has tracked since it was opened and initialized. +// +// Returns only the errors that do not allow reading counter +// in Bolt database. +// +// It is guaranteed that the ContainerCounters fields are not nil. +func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ContainerCounters", time.Since(startedAt), success) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCounters") + defer span.End() + + cc := ContainerCounters{ + Logical: make(map[cid.ID]uint64), + Physical: make(map[cid.ID]uint64), + } + + lastKey := make([]byte, cidSize) + + // there is no limit for containers count, so use batching with cancellation + for { + select { + case <-ctx.Done(): + return cc, ctx.Err() + default: + } + + completed, err := db.containerCountersNextBatch(lastKey, &cc) + if err != nil { + return cc, err + } + if completed { + break + } + } + + success = true + return cc, nil +} + +func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) { + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return false, ErrDegradedMode + } + + counter := 0 + const batchSize = 1000 + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(containerCounterBucketName) + if b == nil { + return ErrInterruptIterator + } + c := b.Cursor() + var key, value []byte + for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() { + if bytes.Equal(lastKey, key) { + continue + } + copy(lastKey, key) + + cnrID, err := parseContainerCounterKey(key) + if err != nil { + return err + } + phy, logic, err := parseContainerCounterValue(value) + if err != nil { + return err + } + if phy > 0 { + cc.Physical[cnrID] = phy + } + if logic > 0 { + cc.Logical[cnrID] = logic + } + + counter++ + if counter == batchSize { + break + } + } + + if counter < batchSize { // last batch + return ErrInterruptIterator + } + return nil + }) + if err != nil { + if errors.Is(err, ErrInterruptIterator) { + return true, nil + } + return false, metaerr.Wrap(err) + } + return false, nil +} + +func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID) error { + if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil { + return fmt.Errorf("could not increase phy object counter: %w", err) + } + if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil { + return fmt.Errorf("could not increase logical object counter: %w", err) + } + return db.incContainerObjectCounter(tx, cnrID) +} + +func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error { b := tx.Bucket(shardInfoBucket) if b == nil { return nil @@ -112,6 +236,63 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool return b.Put(counterKey, newCounter) } +func (db *DB) updateContainerCounter(tx *bbolt.Tx, delta map[cid.ID]ObjectCounters, inc bool) error { + b := tx.Bucket(containerCounterBucketName) + if b == nil { + return nil + } + + key := make([]byte, cidSize) + for cnrID, cnrDelta := range delta { + cnrID.Encode(key) + if err := db.editContainerCounterValue(b, key, cnrDelta, inc); err != nil { + return err + } + } + return nil +} + +func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCounters, inc bool) error { + var phyValue, logicValue uint64 + var err error + data := b.Get(key) + if len(data) > 0 { + phyValue, logicValue, err = parseContainerCounterValue(data) + if err != nil { + return err + } + } + phyValue = nextValue(phyValue, delta.phy, inc) + logicValue = nextValue(logicValue, delta.logic, inc) + if phyValue > 0 || logicValue > 0 { + value := containerCounterValue(phyValue, logicValue) + return b.Put(key, value) + } + return b.Delete(key) +} + +func nextValue(existed, delta uint64, inc bool) uint64 { + if inc { + existed += delta + } else if existed <= delta { + existed = 0 + } else { + existed -= delta + } + return existed +} + +func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID) error { + b := tx.Bucket(containerCounterBucketName) + if b == nil { + return nil + } + + key := make([]byte, cidSize) + cnrID.Encode(key) + return db.editContainerCounterValue(b, key, ObjectCounters{logic: 1, phy: 1}, true) +} + // syncCounter updates object counters according to metabase state: // it counts all the physically/logically stored objects using internal // indexes. Tx MUST be writable. @@ -119,26 +300,38 @@ func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool // Does nothing if counters are not empty and force is false. If force is // true, updates the counters anyway. func syncCounter(tx *bbolt.Tx, force bool) error { - b, err := tx.CreateBucketIfNotExists(shardInfoBucket) + shardInfoB, err := tx.CreateBucketIfNotExists(shardInfoBucket) if err != nil { return fmt.Errorf("could not get shard info bucket: %w", err) } - - if !force && len(b.Get(objectPhyCounterKey)) == 8 && len(b.Get(objectLogicCounterKey)) == 8 { + shardObjectCounterInitialized := len(shardInfoB.Get(objectPhyCounterKey)) == 8 && len(shardInfoB.Get(objectLogicCounterKey)) == 8 + containerCounterInitialized := tx.Bucket(containerCounterBucketName) != nil + if !force && shardObjectCounterInitialized && containerCounterInitialized { // the counters are already inited return nil } + containerCounterB, err := tx.CreateBucketIfNotExists(containerCounterBucketName) + if err != nil { + return fmt.Errorf("could not get container counter bucket: %w", err) + } + var addr oid.Address - var phyCounter uint64 - var logicCounter uint64 + counters := make(map[cid.ID]ObjectCounters) graveyardBKT := tx.Bucket(graveyardBucketName) garbageBKT := tx.Bucket(garbageBucketName) key := make([]byte, addressKeySize) err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error { - phyCounter++ + if v, ok := counters[cnr]; ok { + v.phy++ + counters[cnr] = v + } else { + counters[cnr] = ObjectCounters{ + phy: 1, + } + } addr.SetContainer(cnr) addr.SetObject(obj) @@ -146,7 +339,14 @@ func syncCounter(tx *bbolt.Tx, force bool) error { // check if an object is available: not with GCMark // and not covered with a tombstone if inGraveyardWithKey(addressKey(addr, key), graveyardBKT, garbageBKT) == 0 { - logicCounter++ + if v, ok := counters[cnr]; ok { + v.logic++ + counters[cnr] = v + } else { + counters[cnr] = ObjectCounters{ + logic: 1, + } + } } return nil @@ -155,21 +355,65 @@ func syncCounter(tx *bbolt.Tx, force bool) error { return fmt.Errorf("could not iterate objects: %w", err) } - data := make([]byte, 8) - binary.LittleEndian.PutUint64(data, phyCounter) + return setObjectCounters(counters, shardInfoB, containerCounterB) +} - err = b.Put(objectPhyCounterKey, data) +func setObjectCounters(counters map[cid.ID]ObjectCounters, shardInfoB, containerCounterB *bbolt.Bucket) error { + var phyTotal uint64 + var logicTotal uint64 + key := make([]byte, cidSize) + for cnrID, count := range counters { + phyTotal += count.phy + logicTotal += count.logic + + cnrID.Encode(key) + value := containerCounterValue(count.phy, count.logic) + err := containerCounterB.Put(key, value) + if err != nil { + return fmt.Errorf("could not update phy container object counter: %w", err) + } + } + phyData := make([]byte, 8) + binary.LittleEndian.PutUint64(phyData, phyTotal) + + err := shardInfoB.Put(objectPhyCounterKey, phyData) if err != nil { return fmt.Errorf("could not update phy object counter: %w", err) } - data = make([]byte, 8) - binary.LittleEndian.PutUint64(data, logicCounter) + logData := make([]byte, 8) + binary.LittleEndian.PutUint64(logData, logicTotal) - err = b.Put(objectLogicCounterKey, data) + err = shardInfoB.Put(objectLogicCounterKey, logData) if err != nil { return fmt.Errorf("could not update logic object counter: %w", err) } return nil } + +func containerCounterValue(phy, logic uint64) []byte { + res := make([]byte, 16) + binary.LittleEndian.PutUint64(res, phy) + binary.LittleEndian.PutUint64(res[8:], logic) + return res +} + +func parseContainerCounterKey(buf []byte) (cid.ID, error) { + if len(buf) != cidSize { + return cid.ID{}, fmt.Errorf("invalid key length") + } + var cnrID cid.ID + if err := cnrID.Decode(buf); err != nil { + return cid.ID{}, fmt.Errorf("failed to decode container ID: %w", err) + } + return cnrID, nil +} + +// parseContainerCounterValue return phy, logic values. +func parseContainerCounterValue(buf []byte) (uint64, uint64, error) { + if len(buf) != 16 { + return 0, 0, fmt.Errorf("invalid value length") + } + return binary.LittleEndian.Uint64(buf), binary.LittleEndian.Uint64(buf[8:]), nil +} diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index 89b52c887..51062f1fe 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -7,6 +7,7 @@ import ( objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" @@ -25,6 +26,11 @@ func TestCounters(t *testing.T) { require.NoError(t, err) require.Zero(t, c.Phy()) require.Zero(t, c.Logic()) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + require.Zero(t, len(cc.Physical)) + require.Zero(t, len(cc.Logical)) }) t.Run("put", func(t *testing.T) { @@ -36,9 +42,14 @@ func TestCounters(t *testing.T) { } var prm meta.PutPrm + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) for i := 0; i < objCount; i++ { prm.SetObject(oo[i]) + cnrID, _ := oo[i].ContainerID() + expPhy[cnrID]++ + expLog[cnrID]++ _, err := db.Put(context.Background(), prm) require.NoError(t, err) @@ -48,6 +59,12 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(i+1), c.Phy()) require.Equal(t, uint64(i+1), c.Logic()) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) } }) @@ -56,6 +73,14 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, false) + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + for _, obj := range oo { + cnrID, _ := obj.ContainerID() + expPhy[cnrID]++ + expLog[cnrID]++ + } + var prm meta.DeletePrm for i := objCount - 1; i >= 0; i-- { prm.SetAddresses(objectcore.AddressOf(oo[i])) @@ -69,6 +94,27 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(i), c.Phy()) require.Equal(t, uint64(i), c.Logic()) + + cnrID, _ := oo[i].ContainerID() + if v, ok := expPhy[cnrID]; ok { + if v == 1 { + delete(expPhy, cnrID) + } else { + expPhy[cnrID]-- + } + } + if v, ok := expLog[cnrID]; ok { + if v == 1 { + delete(expLog, cnrID) + } else { + expLog[cnrID]-- + } + } + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) } }) @@ -77,6 +123,14 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, false) + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + for _, obj := range oo { + cnrID, _ := obj.ContainerID() + expPhy[cnrID]++ + expLog[cnrID]++ + } + inhumedObjs := make([]oid.Address, objCount/2) for i, o := range oo { @@ -87,6 +141,16 @@ func TestCounters(t *testing.T) { inhumedObjs[i] = objectcore.AddressOf(o) } + for _, addr := range inhumedObjs { + if v, ok := expLog[addr.Container()]; ok { + if v == 1 { + delete(expLog, addr.Container()) + } else { + expLog[addr.Container()]-- + } + } + } + var prm meta.InhumePrm prm.SetTombstoneAddress(oidtest.Address()) prm.SetAddresses(inhumedObjs...) @@ -100,6 +164,12 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic()) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) }) t.Run("put_split", func(t *testing.T) { @@ -107,6 +177,9 @@ func TestCounters(t *testing.T) { db := newDB(t) parObj := testutil.GenerateObject() + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + // put objects and check that parent info // does not affect the counter for i := 0; i < objCount; i++ { @@ -115,12 +188,21 @@ func TestCounters(t *testing.T) { o.SetParent(parObj) } + cnrID, _ := o.ContainerID() + expLog[cnrID]++ + expPhy[cnrID]++ + require.NoError(t, putBig(db, o)) c, err := db.ObjectCounters() require.NoError(t, err) require.Equal(t, uint64(i+1), c.Phy()) require.Equal(t, uint64(i+1), c.Logic()) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) } }) @@ -129,16 +211,40 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, true) + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + for _, obj := range oo { + cnrID, _ := obj.ContainerID() + expPhy[cnrID]++ + expLog[cnrID]++ + } + // delete objects that have parent info // and check that it does not affect // the counter for i, o := range oo { - require.NoError(t, metaDelete(db, objectcore.AddressOf(o))) + addr := objectcore.AddressOf(o) + require.NoError(t, metaDelete(db, addr)) c, err := db.ObjectCounters() require.NoError(t, err) require.Equal(t, uint64(objCount-i-1), c.Phy()) require.Equal(t, uint64(objCount-i-1), c.Logic()) + + if v, ok := expPhy[addr.Container()]; ok { + if v == 1 { + delete(expPhy, addr.Container()) + } else { + expPhy[addr.Container()]-- + } + } + if v, ok := expLog[addr.Container()]; ok { + if v == 1 { + delete(expLog, addr.Container()) + } else { + expLog[addr.Container()]-- + } + } } }) @@ -147,6 +253,14 @@ func TestCounters(t *testing.T) { db := newDB(t) oo := putObjs(t, db, objCount, true) + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + for _, obj := range oo { + cnrID, _ := obj.ContainerID() + expPhy[cnrID]++ + expLog[cnrID]++ + } + inhumedObjs := make([]oid.Address, objCount/2) for i, o := range oo { @@ -157,6 +271,16 @@ func TestCounters(t *testing.T) { inhumedObjs[i] = objectcore.AddressOf(o) } + for _, addr := range inhumedObjs { + if v, ok := expLog[addr.Container()]; ok { + if v == 1 { + delete(expLog, addr.Container()) + } else { + expLog[addr.Container()]-- + } + } + } + var prm meta.InhumePrm prm.SetTombstoneAddress(oidtest.Address()) prm.SetAddresses(inhumedObjs...) @@ -169,6 +293,12 @@ func TestCounters(t *testing.T) { require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic()) + + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) }) } @@ -190,6 +320,13 @@ func TestCounters_Expired(t *testing.T) { oo[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1) } + expPhy := make(map[cid.ID]uint64) + expLog := make(map[cid.ID]uint64) + for _, addr := range oo { + expPhy[addr.Container()]++ + expLog[addr.Container()]++ + } + // 1. objects are available and counters are correct c, err := db.ObjectCounters() @@ -197,6 +334,12 @@ func TestCounters_Expired(t *testing.T) { require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount), c.Logic()) + cc, err := db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) + for _, o := range oo { _, err := metaGet(db, o, true) require.NoError(t, err) @@ -212,6 +355,12 @@ func TestCounters_Expired(t *testing.T) { require.Equal(t, uint64(objCount), c.Phy()) require.Equal(t, uint64(objCount), c.Logic()) + cc, err = db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) + for _, o := range oo { _, err := metaGet(db, o, true) require.ErrorIs(t, err, meta.ErrObjectIsExpired) @@ -235,6 +384,20 @@ func TestCounters_Expired(t *testing.T) { require.Equal(t, uint64(len(oo)), c.Phy()) require.Equal(t, uint64(len(oo)-1), c.Logic()) + if v, ok := expLog[oo[0].Container()]; ok { + if v == 1 { + delete(expLog, oo[0].Container()) + } else { + expLog[oo[0].Container()]-- + } + } + + cc, err = db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) + // 4. `Delete` an object with GCMark should decrease the // phy counter but does not affect the logic counter (after // that step they should be equal) @@ -246,6 +409,14 @@ func TestCounters_Expired(t *testing.T) { require.NoError(t, err) require.Zero(t, deleteRes.AvailableObjectsRemoved()) + if v, ok := expPhy[oo[0].Container()]; ok { + if v == 1 { + delete(expPhy, oo[0].Container()) + } else { + expPhy[oo[0].Container()]-- + } + } + oo = oo[1:] c, err = db.ObjectCounters() @@ -253,6 +424,12 @@ func TestCounters_Expired(t *testing.T) { require.Equal(t, uint64(len(oo)), c.Phy()) require.Equal(t, uint64(len(oo)), c.Logic()) + cc, err = db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) + // 5 `Delete` an expired object (like it would the control // service do) should decrease both counters despite the // expiration fact @@ -263,12 +440,34 @@ func TestCounters_Expired(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved()) + if v, ok := expLog[oo[0].Container()]; ok { + if v == 1 { + delete(expLog, oo[0].Container()) + } else { + expLog[oo[0].Container()]-- + } + } + + if v, ok := expPhy[oo[0].Container()]; ok { + if v == 1 { + delete(expPhy, oo[0].Container()) + } else { + expPhy[oo[0].Container()]-- + } + } + oo = oo[1:] c, err = db.ObjectCounters() require.NoError(t, err) require.Equal(t, uint64(len(oo)), c.Phy()) require.Equal(t, uint64(len(oo)), c.Logic()) + + cc, err = db.ContainerCounters(context.Background()) + require.NoError(t, err) + + require.Equal(t, expPhy, cc.Physical) + require.Equal(t, expLog, cc.Logical) } func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object { diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index e84759b88..653a88ae7 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -12,6 +12,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" @@ -132,8 +133,9 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, ava refCounter := make(referenceCounter, len(addrs)) currEpoch := db.epochState.CurrentEpoch() - var rawDeleted uint64 - var availableDeleted uint64 + cnrIDDelta := make(map[cid.ID]ObjectCounters) + var rawDeletedTotal uint64 + var availableDeletedTotal uint64 for i := range addrs { removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch) @@ -142,40 +144,62 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64, ava } if removed { - rawDeleted++ + if v, ok := cnrIDDelta[addrs[i].Container()]; ok { + v.phy++ + cnrIDDelta[addrs[i].Container()] = v + } else { + cnrIDDelta[addrs[i].Container()] = ObjectCounters{ + phy: 1, + } + } + + rawDeletedTotal++ sizes[i] = size } if available { - availableDeleted++ + if v, ok := cnrIDDelta[addrs[i].Container()]; ok { + v.logic++ + cnrIDDelta[addrs[i].Container()] = v + } else { + cnrIDDelta[addrs[i].Container()] = ObjectCounters{ + logic: 1, + } + } + + availableDeletedTotal++ availableSizes[i] = size } } - if rawDeleted > 0 { - err := db.updateCounter(tx, phy, rawDeleted, false) + if rawDeletedTotal > 0 { + err := db.updateShardObjectCounter(tx, phy, rawDeletedTotal, false) if err != nil { return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err) } } - if availableDeleted > 0 { - err := db.updateCounter(tx, logical, availableDeleted, false) + if availableDeletedTotal > 0 { + err := db.updateShardObjectCounter(tx, logical, availableDeletedTotal, false) if err != nil { return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err) } } + if err := db.updateContainerCounter(tx, cnrIDDelta, false); err != nil { + return 0, 0, fmt.Errorf("could not decrease container object counter: %w", err) + } + for _, refNum := range refCounter { if refNum.cur == refNum.all { err := db.deleteObject(tx, refNum.obj, true) if err != nil { - return rawDeleted, availableDeleted, err // maybe log and continue? + return rawDeletedTotal, availableDeletedTotal, err // maybe log and continue? } } } - return rawDeleted, availableDeleted, nil + return rawDeletedTotal, availableDeletedTotal, nil } // delete removes object indexes from the metabase. Counts the references diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index fe8b8873e..e06e57109 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -73,6 +73,7 @@ func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64) { Size: deletedSize, CID: containerID, }) + i.availableImhumed++ } // SetAddresses sets a list of object addresses that should be inhumed. @@ -224,7 +225,27 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes } } - return db.updateCounter(tx, logical, res.availableImhumed, false) + return db.applyInhumeResToCounters(tx, res) +} + +func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { + var inhumedCount uint64 + inhumedbyCnr := make(map[cid.ID]ObjectCounters) + for _, dd := range res.deletionDetails { + if v, ok := inhumedbyCnr[dd.CID]; ok { + v.logic++ + inhumedbyCnr[dd.CID] = v + } else { + inhumedbyCnr[dd.CID] = ObjectCounters{logic: 1} + } + inhumedCount++ + } + + if err := db.updateShardObjectCounter(tx, logical, inhumedCount, false); err != nil { + return err + } + + return db.updateContainerCounter(tx, inhumedbyCnr, false) } // getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket. @@ -279,7 +300,6 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { containerID, _ := obj.ContainerID() if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { - res.availableImhumed++ res.storeDeletionInfo(containerID, obj.PayloadSize()) } diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 875388e58..2b5c5421d 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -183,16 +183,8 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o } if !isParent { - err = db.updateCounter(tx, phy, 1, true) - if err != nil { - return fmt.Errorf("could not increase phy object counter: %w", err) - } - - // it is expected that putting an unavailable object is - // impossible and should be handled on the higher levels - err = db.updateCounter(tx, logical, 1, true) - if err != nil { - return fmt.Errorf("could not increase logical object counter: %w", err) + if err = db.incCounters(tx, cnr); err != nil { + return err } } diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index c9d9bb947..c04aff61c 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -19,9 +19,10 @@ var ( graveyardBucketName = []byte{graveyardPrefix} // garbageBucketName stores rows with the objects that should be physically // deleted by the node (Garbage Collector routine). - garbageBucketName = []byte{garbagePrefix} - toMoveItBucketName = []byte{toMoveItPrefix} - containerVolumeBucketName = []byte{containerVolumePrefix} + garbageBucketName = []byte{garbagePrefix} + toMoveItBucketName = []byte{toMoveItPrefix} + containerVolumeBucketName = []byte{containerVolumePrefix} + containerCounterBucketName = []byte{containerCountersPrefix} zeroValue = []byte{0xFF} ) @@ -111,6 +112,11 @@ const ( // Key: split ID // Value: list of object IDs splitPrefix + + // containerCountersPrefix is used for storing container object counters. + // Key: container ID + type + // Value: container size in bytes as little-endian uint64 + containerCountersPrefix ) const (