From ad47e2a985dfa48b43d5dac95a5122bf26107564 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 9 Sep 2022 14:33:38 +0300 Subject: [PATCH] [#1658] meta: Add logic counter - Meta now supports (and requires) inc/dec labeled counters - The new logic counter is incremented on `Put` operations and is decremented on `Inhume` and `Delete` operations that are performed on _stored_ objects only - Allow force counters sync. "Force" mode should be used on metabase resync and should not be used on a regular meta start Signed-off-by: Pavel Karpy --- pkg/local_object_storage/metabase/VERSION.md | 1 + pkg/local_object_storage/metabase/control.go | 8 +- pkg/local_object_storage/metabase/counter.go | 106 ++++-- .../metabase/counter_test.go | 322 +++++++++++++----- pkg/local_object_storage/metabase/delete.go | 87 +++-- pkg/local_object_storage/metabase/inhume.go | 42 ++- pkg/local_object_storage/metabase/put.go | 11 +- 7 files changed, 437 insertions(+), 140 deletions(-) diff --git a/pkg/local_object_storage/metabase/VERSION.md b/pkg/local_object_storage/metabase/VERSION.md index 6413452d..0affc36c 100644 --- a/pkg/local_object_storage/metabase/VERSION.md +++ b/pkg/local_object_storage/metabase/VERSION.md @@ -32,6 +32,7 @@ This file describes changes between the metabase versions. - `id` -> shard id as bytes - `version` -> metabase version as little-endian uint64 - `phy_counter` -> shard's physical object counter as little-endian uint64 + - `logic_counter` -> shard's logical object counter as little-endian uint64 ### Unique index buckets - Buckets containing objects of REGULAR type diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 25cd5ae6..6ae69586 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -83,6 +83,8 @@ func (db *DB) init(reset bool) error { string(shardInfoBucket): {}, } + epoch := db.epochState.CurrentEpoch() + return db.boltDB.Update(func(tx *bbolt.Tx) error { var err error if !reset { @@ -106,7 +108,7 @@ func (db *DB) init(reset bool) error { } if !reset { - err = syncCounter(tx, false) + err = syncCounter(tx, epoch, false) if err != nil { return fmt.Errorf("could not sync object counter: %w", err) } @@ -130,8 +132,10 @@ func (db *DB) init(reset bool) error { // SyncCounters forces to synchronize the object counters. func (db *DB) SyncCounters() error { + epoch := db.epochState.CurrentEpoch() + return db.boltDB.Update(func(tx *bbolt.Tx) error { - return syncCounter(tx, true) + return syncCounter(tx, epoch, true) }) } diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index dcf13f49..dbb72380 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -9,20 +9,51 @@ import ( "go.etcd.io/bbolt" ) -var objectCounterKey = []byte("phy_counter") +var objectPhyCounterKey = []byte("phy_counter") +var objectLogicCounterKey = []byte("logic_counter") -// ObjectCounter returns object count that metabase has +type objectType uint8 + +const ( + _ objectType = iota + phy + logical +) + +// ObjectCounters groups object counter +// according to metabase state. +type ObjectCounters struct { + logic uint64 + phy uint64 +} + +// Logic returns logical object counter. +func (o ObjectCounters) Logic() uint64 { + return o.logic +} + +// Phy returns physical object counter. +func (o ObjectCounters) Phy() uint64 { + return o.phy +} + +// ObjectCounters returns object counters that metabase has // tracked since it was opened and initialized. // // Returns only the errors that do not allow reading counter // in Bolt database. -func (db *DB) ObjectCounter() (counter uint64, err error) { +func (db *DB) ObjectCounters() (cc ObjectCounters, err error) { err = db.boltDB.View(func(tx *bbolt.Tx) error { b := tx.Bucket(shardInfoBucket) if b != nil { - data := b.Get(objectCounterKey) + data := b.Get(objectPhyCounterKey) if len(data) == 8 { - counter = binary.LittleEndian.Uint64(data) + cc.phy = binary.LittleEndian.Uint64(data) + } + + data = b.Get(objectLogicCounterKey) + if len(data) == 8 { + cc.logic = binary.LittleEndian.Uint64(data) } } @@ -34,15 +65,25 @@ func (db *DB) ObjectCounter() (counter uint64, err error) { // updateCounter updates the object counter. Tx MUST be writable. // If inc == `true`, increases the counter, decreases otherwise. -func (db *DB) updateCounter(tx *bbolt.Tx, delta uint64, inc bool) error { +func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error { b := tx.Bucket(shardInfoBucket) if b == nil { return nil } var counter uint64 + var counterKey []byte - data := b.Get(objectCounterKey) + switch typ { + case phy: + counterKey = objectPhyCounterKey + case logical: + counterKey = objectLogicCounterKey + default: + panic("unknown object type counter") + } + + data := b.Get(counterKey) if len(data) == 8 { counter = binary.LittleEndian.Uint64(data) } @@ -58,41 +99,60 @@ func (db *DB) updateCounter(tx *bbolt.Tx, delta uint64, inc bool) error { newCounter := make([]byte, 8) binary.LittleEndian.PutUint64(newCounter, counter) - return b.Put(objectCounterKey, newCounter) + return b.Put(counterKey, newCounter) } -// syncCounter updates object counter according to metabase state: -// it counts all the physically stored objects using internal indexes. -// Tx MUST be writable. +// syncCounter updates object counters according to metabase state: +// it counts all the physically/logically stored objects using internal +// indexes. Tx MUST be writable. // -// Does nothing if counter not empty. -func syncCounter(tx *bbolt.Tx, force bool) error { - var counter uint64 - +// Does nothing if counters are not empty and force is false. If force is +// true, updates the counters anyway. +func syncCounter(tx *bbolt.Tx, epoch uint64, force bool) error { b, err := tx.CreateBucketIfNotExists(shardInfoBucket) if err != nil { return fmt.Errorf("could not get shard info bucket: %w", err) } - data := b.Get(objectCounterKey) - if len(data) == 8 && !force { + if !force && len(b.Get(objectPhyCounterKey)) == 8 && len(b.Get(objectLogicCounterKey)) == 8 { + // the counters are already inited return nil } - err = iteratePhyObjects(tx, func(_ cid.ID, _ oid.ID) error { - counter++ + var addr oid.Address + var phyCounter uint64 + var logicCounter uint64 + + err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error { + phyCounter++ + + addr.SetContainer(cnr) + addr.SetObject(obj) + + if st := objectStatus(tx, addr, epoch); st == 0 || st == 3 { + logicCounter++ + } + return nil }) if err != nil { - return fmt.Errorf("count not iterate objects: %w", err) + return fmt.Errorf("could not iterate objects: %w", err) + } + + data := make([]byte, 8) + binary.LittleEndian.PutUint64(data, phyCounter) + + err = b.Put(objectPhyCounterKey, data) + if err != nil { + return fmt.Errorf("could not update phy object counter: %w", err) } data = make([]byte, 8) - binary.LittleEndian.PutUint64(data, counter) + binary.LittleEndian.PutUint64(data, logicCounter) - err = b.Put(objectCounterKey, data) + err = b.Put(objectLogicCounterKey, data) if err != nil { - return fmt.Errorf("could not update object counter: %w", err) + return fmt.Errorf("could not update logic object counter: %w", err) } return nil diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index c8cb8078..3f8547e2 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -6,103 +6,266 @@ import ( objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) const objCount = 10 -func TestCounter_Default(t *testing.T) { +func TestCounters(t *testing.T) { db := newDB(t) - c, err := db.ObjectCounter() - require.NoError(t, err) - require.Zero(t, c) -} - -func TestCounter(t *testing.T) { - db := newDB(t) - - var c uint64 + var c meta.ObjectCounters var err error - oo := make([]*object.Object, 0, objCount) - for i := 0; i < objCount; i++ { - oo = append(oo, generateObject(t)) - } - - var prm meta.PutPrm - - for i := 0; i < objCount; i++ { - prm.SetObject(oo[i]) - - _, err = db.Put(prm) + t.Run("defaults", func(t *testing.T) { + c, err = db.ObjectCounters() require.NoError(t, err) + require.Zero(t, c.Phy()) + require.Zero(t, c.Logic()) + }) - c, err = db.ObjectCounter() - require.NoError(t, err) - - require.Equal(t, uint64(i+1), c) - } -} - -func TestCounter_Dec(t *testing.T) { - db := newDB(t) - oo := putObjs(t, db, objCount, false) - - var err error - var c uint64 - - var prm meta.DeletePrm - for i := objCount - 1; i >= 0; i-- { - prm.SetAddresses(objectcore.AddressOf(oo[i])) - - _, err = db.Delete(prm) - require.NoError(t, err) - - c, err = db.ObjectCounter() - require.NoError(t, err) - - require.Equal(t, uint64(i), c) - } -} - -func TestCounter_PutSplit(t *testing.T) { - db := newDB(t) - - parObj := generateObject(t) - var err error - var c uint64 - - // put objects and check that parent info - // does not affect the counter - for i := 0; i < objCount; i++ { - o := generateObject(t) - if i < objCount/2 { // half of the objs will have the parent - o.SetParent(parObj) + t.Run("put", func(t *testing.T) { + oo := make([]*object.Object, 0, objCount) + for i := 0; i < objCount; i++ { + oo = append(oo, generateObject(t)) } - require.NoError(t, putBig(db, o)) + var prm meta.PutPrm - c, err = db.ObjectCounter() + for i := 0; i < objCount; i++ { + prm.SetObject(oo[i]) + + _, err = db.Put(prm) + require.NoError(t, err) + + c, err = db.ObjectCounters() + require.NoError(t, err) + + require.Equal(t, uint64(i+1), c.Phy()) + require.Equal(t, uint64(i+1), c.Logic()) + } + }) + + require.NoError(t, db.Reset()) + + t.Run("delete", func(t *testing.T) { + oo := putObjs(t, db, objCount, false) + + var prm meta.DeletePrm + for i := objCount - 1; i >= 0; i-- { + prm.SetAddresses(objectcore.AddressOf(oo[i])) + + res, err := db.Delete(prm) + require.NoError(t, err) + require.Equal(t, uint64(1), res.AvailableObjectsRemoved()) + + c, err = db.ObjectCounters() + require.NoError(t, err) + + require.Equal(t, uint64(i), c.Phy()) + require.Equal(t, uint64(i), c.Logic()) + } + }) + + require.NoError(t, db.Reset()) + + t.Run("inhume", func(t *testing.T) { + oo := putObjs(t, db, objCount, false) + + inhumedObjs := make([]oid.Address, objCount/2) + + for i, o := range oo { + if i == len(inhumedObjs) { + break + } + + inhumedObjs[i] = objectcore.AddressOf(o) + } + + var prm meta.InhumePrm + prm.SetTombstoneAddress(oidtest.Address()) + prm.SetAddresses(inhumedObjs...) + + res, err := db.Inhume(prm) require.NoError(t, err) - require.Equal(t, uint64(i+1), c) - } + require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed()) + + c, err = db.ObjectCounters() + require.NoError(t, err) + + require.Equal(t, uint64(objCount), c.Phy()) + require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic()) + }) + + require.NoError(t, db.Reset()) + + t.Run("put_split", func(t *testing.T) { + parObj := generateObject(t) + + // put objects and check that parent info + // does not affect the counter + for i := 0; i < objCount; i++ { + o := generateObject(t) + if i < objCount/2 { // half of the objs will have the parent + o.SetParent(parObj) + } + + require.NoError(t, putBig(db, o)) + + c, err = db.ObjectCounters() + require.NoError(t, err) + require.Equal(t, uint64(i+1), c.Phy()) + require.Equal(t, uint64(i+1), c.Logic()) + } + }) + + require.NoError(t, db.Reset()) + + t.Run("delete_split", func(t *testing.T) { + oo := putObjs(t, db, objCount, true) + + // delete objects that have parent info + // and check that it does not affect + // the counter + for i, o := range oo { + require.NoError(t, metaDelete(db, objectcore.AddressOf(o))) + + c, err := db.ObjectCounters() + require.NoError(t, err) + require.Equal(t, uint64(objCount-i-1), c.Phy()) + require.Equal(t, uint64(objCount-i-1), c.Logic()) + } + }) + + require.NoError(t, db.Reset()) + + t.Run("inhume_split", func(t *testing.T) { + oo := putObjs(t, db, objCount, true) + + inhumedObjs := make([]oid.Address, objCount/2) + + for i, o := range oo { + if i == len(inhumedObjs) { + break + } + + inhumedObjs[i] = objectcore.AddressOf(o) + } + + var prm meta.InhumePrm + prm.SetTombstoneAddress(oidtest.Address()) + prm.SetAddresses(inhumedObjs...) + + _, err = db.Inhume(prm) + require.NoError(t, err) + + c, err = db.ObjectCounters() + require.NoError(t, err) + + require.Equal(t, uint64(objCount), c.Phy()) + require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic()) + }) } -func TestCounter_DeleteSplit(t *testing.T) { - db := newDB(t) - oo := putObjs(t, db, objCount, true) +func TestCounters_Expired(t *testing.T) { + // That test is about expired objects without + // GCMark yet. Such objects should be treated as + // logically available: decrementing logic counter + // should be done explicitly and only in `Delete` + // and `Inhume` operations, otherwise, it would be + // impossible to maintain logic counter. - // delete objects that have parent info - // and check that it does not affect - // the counter - for i, o := range oo { - require.NoError(t, metaDelete(db, objectcore.AddressOf(o))) + const epoch = 123 - c, err := db.ObjectCounter() - require.NoError(t, err) - require.Equal(t, uint64(objCount-i-1), c) + es := &epochState{epoch} + db := newDB(t, meta.WithEpochState(es)) + + oo := make([]oid.Address, objCount) + for i := range oo { + oo[i] = putWithExpiration(t, db, object.TypeRegular, epoch+1) } + + // 1. objects are available and counters are correct + + c, err := db.ObjectCounters() + require.NoError(t, err) + require.Equal(t, uint64(objCount), c.Phy()) + require.Equal(t, uint64(objCount), c.Logic()) + + for _, o := range oo { + _, err := metaGet(db, o, true) + require.NoError(t, err) + } + + // 2. objects are expired, not available but logic counter + // is the same + + es.e = epoch + 2 + + c, err = db.ObjectCounters() + require.NoError(t, err) + require.Equal(t, uint64(objCount), c.Phy()) + require.Equal(t, uint64(objCount), c.Logic()) + + for _, o := range oo { + _, err := metaGet(db, o, true) + require.ErrorIs(t, err, objectcore.ErrObjectIsExpired) + } + + // 3. inhuming an expired object with GCMark (like it would + // the GC do) should decrease the logic counter despite the + // expiration fact + + var inhumePrm meta.InhumePrm + inhumePrm.SetGCMark() + inhumePrm.SetAddresses(oo[0]) + + inhumeRes, err := db.Inhume(inhumePrm) + require.NoError(t, err) + require.Equal(t, uint64(1), inhumeRes.AvailableInhumed()) + + c, err = db.ObjectCounters() + require.NoError(t, err) + + require.Equal(t, uint64(len(oo)), c.Phy()) + require.Equal(t, uint64(len(oo)-1), c.Logic()) + + // 4. `Delete` an object with GCMark should decrease the + // phy counter but does not affect the logic counter (after + // that step they should be equal) + + var deletePrm meta.DeletePrm + deletePrm.SetAddresses(oo[0]) + + deleteRes, err := db.Delete(deletePrm) + require.NoError(t, err) + require.Zero(t, deleteRes.AvailableObjectsRemoved()) + + oo = oo[1:] + + c, err = db.ObjectCounters() + require.NoError(t, err) + require.Equal(t, uint64(len(oo)), c.Phy()) + require.Equal(t, uint64(len(oo)), c.Logic()) + + // 5 `Delete` an expired object (like it would the control + // service do) should decrease both counters despite the + // expiration fact + + deletePrm.SetAddresses(oo[0]) + + deleteRes, err = db.Delete(deletePrm) + require.NoError(t, err) + require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved()) + + oo = oo[1:] + + c, err = db.ObjectCounters() + require.NoError(t, err) + require.Equal(t, uint64(len(oo)), c.Phy()) + require.Equal(t, uint64(len(oo)), c.Logic()) } func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*object.Object { @@ -123,10 +286,11 @@ func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*object.Ob _, err = db.Put(prm) require.NoError(t, err) - c, err := db.ObjectCounter() + c, err := db.ObjectCounters() require.NoError(t, err) - require.Equal(t, uint64(i+1), c) + require.Equal(t, uint64(i+1), c.Phy()) + require.Equal(t, uint64(i+1), c.Logic()) } return oo diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index f6c4d125..01398153 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -20,12 +20,19 @@ type DeletePrm struct { // DeleteRes groups the resulting values of Delete operation. type DeleteRes struct { - removed uint64 + rawRemoved uint64 + availableRemoved uint64 } -// RemovedObjects returns number of removed raw objects. -func (d DeleteRes) RemovedObjects() uint64 { - return d.removed +// AvailableObjectsRemoved returns the number of removed available +// objects. +func (d DeleteRes) AvailableObjectsRemoved() uint64 { + return d.availableRemoved +} + +// RawObjectsRemoved returns the number of removed raw objects. +func (d DeleteRes) RawObjectsRemoved() uint64 { + return d.rawRemoved } // SetAddresses is a Delete option to set the addresses of the objects to delete. @@ -51,10 +58,11 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { defer db.modeMtx.RUnlock() var rawRemoved uint64 + var availableRemoved uint64 var err error err = db.boltDB.Update(func(tx *bbolt.Tx) error { - rawRemoved, err = db.deleteGroup(tx, prm.addrs) + rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs) return err }) if err == nil { @@ -64,49 +72,84 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { storagelog.OpField("metabase DELETE")) } } - return DeleteRes{removed: rawRemoved}, err + return DeleteRes{ + rawRemoved: rawRemoved, + availableRemoved: availableRemoved, + }, err } -func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, error) { +// deleteGroup deletes object from the metabase. Handles removal of the +// references of the split objects. +// The first return value is a physical objects removed number: physical +// objects that were stored. The second return value is a logical objects +// removed number: objects that were available (without Tombstones, GCMarks +// non-expired, etc.) +func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, error) { refCounter := make(referenceCounter, len(addrs)) currEpoch := db.epochState.CurrentEpoch() var rawDeleted uint64 + var availableDeleted uint64 + for i := range addrs { - removed, err := db.delete(tx, addrs[i], refCounter, currEpoch) + removed, available, err := db.delete(tx, addrs[i], refCounter, currEpoch) if err != nil { - return 0, err // maybe log and continue? + return 0, 0, err // maybe log and continue? } if removed { rawDeleted++ } + + if available { + availableDeleted++ + } } - err := db.updateCounter(tx, rawDeleted, false) - if err != nil { - return 0, fmt.Errorf("could not decrease object counter: %w", err) + if rawDeleted > 0 { + err := db.updateCounter(tx, phy, rawDeleted, false) + if err != nil { + return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err) + } + } + + if availableDeleted > 0 { + err := db.updateCounter(tx, logical, availableDeleted, false) + if err != nil { + return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err) + } } for _, refNum := range refCounter { if refNum.cur == refNum.all { err := db.deleteObject(tx, refNum.obj, true) if err != nil { - return rawDeleted, err // maybe log and continue? + return rawDeleted, availableDeleted, err // maybe log and continue? } } } - return rawDeleted, nil + return rawDeleted, availableDeleted, nil } -func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, error) { - // remove record from the garbage bucket +// delete removes object indexes from the metabase. Counts the references +// of the object that is being removed. +// The first return value indicates if an object has been removed. (removing a +// non-exist object is error-free). The second return value indicates if an +// object was available before the removal (for calculating the logical object +// counter). +func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, error) { + addrKey := addressKey(addr) garbageBKT := tx.Bucket(garbageBucketName) + graveyardBKT := tx.Bucket(graveyardBucketName) + + removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0 + + // remove record from the garbage bucket if garbageBKT != nil { - err := garbageBKT.Delete(addressKey(addr)) + err := garbageBKT.Delete(addrKey) if err != nil { - return false, fmt.Errorf("could not remove from garbage bucket: %w", err) + return false, false, fmt.Errorf("could not remove from garbage bucket: %w", err) } } @@ -114,10 +157,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter obj, err := db.get(tx, addr, false, true, currEpoch) if err != nil { if errors.As(err, new(apistatus.ObjectNotFound)) { - return false, nil + return false, false, nil } - return false, err + return false, false, err } // if object is an only link to a parent, then remove parent @@ -142,10 +185,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter // remove object err = db.deleteObject(tx, obj, false) if err != nil { - return false, fmt.Errorf("could not remove object: %w", err) + return false, false, fmt.Errorf("could not remove object: %w", err) } - return true, nil + return true, removeAvailableObject, nil } func (db *DB) deleteObject( diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index f03f660b..344259b6 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -24,7 +24,14 @@ type InhumePrm struct { // InhumeRes encapsulates results of Inhume operation. type InhumeRes struct { - deletedLockObj []oid.Address + deletedLockObj []oid.Address + availableImhumed uint64 +} + +// AvailableInhumed return number of available object +// that have been inhumed. +func (i InhumeRes) AvailableInhumed() uint64 { + return i.availableImhumed } // DeletedLockObjects returns deleted object of LOCK @@ -86,9 +93,11 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { defer db.modeMtx.RUnlock() currEpoch := db.epochState.CurrentEpoch() + var inhumed uint64 err = db.boltDB.Update(func(tx *bbolt.Tx) error { garbageBKT := tx.Bucket(garbageBucketName) + graveyardBKT := tx.Bucket(graveyardBucketName) var ( // target bucket of the operation, one of the: @@ -103,7 +112,7 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { ) if prm.tomb != nil { - bkt = tx.Bucket(graveyardBucketName) + bkt = graveyardBKT tombKey := addressKey(*prm.tomb) // it is forbidden to have a tomb-on-tomb in NeoFS, @@ -144,19 +153,26 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { lockWasChecked = true } - obj, err := db.get(tx, prm.target[i], false, true, currEpoch) + targetKey := addressKey(prm.target[i]) - // if object is stored and it is regular object then update bucket - // with container size estimations - if err == nil && obj.Type() == object.TypeRegular { - err := changeContainerSize(tx, cnr, obj.PayloadSize(), false) - if err != nil { - return err + obj, err := db.get(tx, prm.target[i], false, true, currEpoch) + if err == nil { + if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { + // object is available, decrement the + // logical counter + inhumed++ + } + + // if object is stored, and it is regular object then update bucket + // with container size estimations + if obj.Type() == object.TypeRegular { + err := changeContainerSize(tx, cnr, obj.PayloadSize(), false) + if err != nil { + return err + } } } - targetKey := addressKey(prm.target[i]) - if prm.tomb != nil { targetIsTomb := false @@ -212,8 +228,10 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { } } - return nil + return db.updateCounter(tx, logical, inhumed, false) }) + res.availableImhumed = inhumed + return } diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 8abe527f..3f439e3b 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -143,9 +143,16 @@ func (db *DB) put( } if !isParent { - err = db.updateCounter(tx, 1, true) + err = db.updateCounter(tx, phy, 1, true) if err != nil { - return fmt.Errorf("could not increase object counter: %w", err) + return fmt.Errorf("could not increase phy object counter: %w", err) + } + + // it is expected that putting an unavailable object is + // impossible and should be handled on the higher levels + err = db.updateCounter(tx, logical, 1, true) + if err != nil { + return fmt.Errorf("could not increase logical object counter: %w", err) } }