[#1658] meta: Add logic counter

- Meta now supports (and requires) inc/dec labeled counters
- The new logic counter is incremented on `Put` operations and is
decremented on `Inhume` and `Delete` operations that are performed on
_stored_ objects only
- Allow force counters sync. "Force" mode should be used on metabase resync
and should not be used on a regular meta start

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-09-09 14:33:38 +03:00 committed by LeL
parent d872862710
commit ad47e2a985
7 changed files with 437 additions and 140 deletions

View file

@ -32,6 +32,7 @@ This file describes changes between the metabase versions.
- `id` -> shard id as bytes
- `version` -> metabase version as little-endian uint64
- `phy_counter` -> shard's physical object counter as little-endian uint64
- `logic_counter` -> shard's logical object counter as little-endian uint64
### Unique index buckets
- Buckets containing objects of REGULAR type

View file

@ -83,6 +83,8 @@ func (db *DB) init(reset bool) error {
string(shardInfoBucket): {},
}
epoch := db.epochState.CurrentEpoch()
return db.boltDB.Update(func(tx *bbolt.Tx) error {
var err error
if !reset {
@ -106,7 +108,7 @@ func (db *DB) init(reset bool) error {
}
if !reset {
err = syncCounter(tx, false)
err = syncCounter(tx, epoch, false)
if err != nil {
return fmt.Errorf("could not sync object counter: %w", err)
}
@ -130,8 +132,10 @@ func (db *DB) init(reset bool) error {
// SyncCounters forces to synchronize the object counters.
func (db *DB) SyncCounters() error {
epoch := db.epochState.CurrentEpoch()
return db.boltDB.Update(func(tx *bbolt.Tx) error {
return syncCounter(tx, true)
return syncCounter(tx, epoch, true)
})
}

View file

@ -9,20 +9,51 @@ import (
"go.etcd.io/bbolt"
)
var objectCounterKey = []byte("phy_counter")
var objectPhyCounterKey = []byte("phy_counter")
var objectLogicCounterKey = []byte("logic_counter")
// ObjectCounter returns object count that metabase has
// objectType selects which of the shard's object counters
// an operation refers to.
type objectType uint8
const (
// the zero value is skipped, so an unset objectType matches neither
// counter (updateCounter panics on unknown values)
_ objectType = iota
// phy selects the counter stored under the "phy_counter" key
phy
// logical selects the counter stored under the "logic_counter" key
logical
)
// ObjectCounters is a snapshot of the object counters kept by the
// metabase: the logical one and the physical one.
type ObjectCounters struct {
	logic, phy uint64
}

// Logic reports the value of the logical object counter.
func (c ObjectCounters) Logic() uint64 { return c.logic }

// Phy reports the value of the physical object counter.
func (c ObjectCounters) Phy() uint64 { return c.phy }
// ObjectCounters returns object counters that metabase has
// tracked since it was opened and initialized.
//
// Returns only the errors that do not allow reading counter
// in Bolt database.
func (db *DB) ObjectCounter() (counter uint64, err error) {
func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
err = db.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(shardInfoBucket)
if b != nil {
data := b.Get(objectCounterKey)
data := b.Get(objectPhyCounterKey)
if len(data) == 8 {
counter = binary.LittleEndian.Uint64(data)
cc.phy = binary.LittleEndian.Uint64(data)
}
data = b.Get(objectLogicCounterKey)
if len(data) == 8 {
cc.logic = binary.LittleEndian.Uint64(data)
}
}
@ -34,15 +65,25 @@ func (db *DB) ObjectCounter() (counter uint64, err error) {
// updateCounter updates the object counter. Tx MUST be writable.
// If inc == `true`, increases the counter, decreases otherwise.
func (db *DB) updateCounter(tx *bbolt.Tx, delta uint64, inc bool) error {
func (db *DB) updateCounter(tx *bbolt.Tx, typ objectType, delta uint64, inc bool) error {
b := tx.Bucket(shardInfoBucket)
if b == nil {
return nil
}
var counter uint64
var counterKey []byte
data := b.Get(objectCounterKey)
switch typ {
case phy:
counterKey = objectPhyCounterKey
case logical:
counterKey = objectLogicCounterKey
default:
panic("unknown object type counter")
}
data := b.Get(counterKey)
if len(data) == 8 {
counter = binary.LittleEndian.Uint64(data)
}
@ -58,41 +99,60 @@ func (db *DB) updateCounter(tx *bbolt.Tx, delta uint64, inc bool) error {
newCounter := make([]byte, 8)
binary.LittleEndian.PutUint64(newCounter, counter)
return b.Put(objectCounterKey, newCounter)
return b.Put(counterKey, newCounter)
}
// syncCounter updates object counter according to metabase state:
// it counts all the physically stored objects using internal indexes.
// Tx MUST be writable.
// syncCounter updates object counters according to metabase state:
// it counts all the physically/logically stored objects using internal
// indexes. Tx MUST be writable.
//
// Does nothing if counter not empty.
func syncCounter(tx *bbolt.Tx, force bool) error {
var counter uint64
// Does nothing if counters are not empty and force is false. If force is
// true, updates the counters anyway.
func syncCounter(tx *bbolt.Tx, epoch uint64, force bool) error {
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
if err != nil {
return fmt.Errorf("could not get shard info bucket: %w", err)
}
data := b.Get(objectCounterKey)
if len(data) == 8 && !force {
if !force && len(b.Get(objectPhyCounterKey)) == 8 && len(b.Get(objectLogicCounterKey)) == 8 {
// the counters are already inited
return nil
}
err = iteratePhyObjects(tx, func(_ cid.ID, _ oid.ID) error {
counter++
var addr oid.Address
var phyCounter uint64
var logicCounter uint64
err = iteratePhyObjects(tx, func(cnr cid.ID, obj oid.ID) error {
phyCounter++
addr.SetContainer(cnr)
addr.SetObject(obj)
if st := objectStatus(tx, addr, epoch); st == 0 || st == 3 {
logicCounter++
}
return nil
})
if err != nil {
return fmt.Errorf("count not iterate objects: %w", err)
return fmt.Errorf("could not iterate objects: %w", err)
}
data := make([]byte, 8)
binary.LittleEndian.PutUint64(data, phyCounter)
err = b.Put(objectPhyCounterKey, data)
if err != nil {
return fmt.Errorf("could not update phy object counter: %w", err)
}
data = make([]byte, 8)
binary.LittleEndian.PutUint64(data, counter)
binary.LittleEndian.PutUint64(data, logicCounter)
err = b.Put(objectCounterKey, data)
err = b.Put(objectLogicCounterKey, data)
if err != nil {
return fmt.Errorf("could not update object counter: %w", err)
return fmt.Errorf("could not update logic object counter: %w", err)
}
return nil

View file

@ -6,25 +6,27 @@ import (
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
const objCount = 10
func TestCounter_Default(t *testing.T) {
func TestCounters(t *testing.T) {
db := newDB(t)
c, err := db.ObjectCounter()
require.NoError(t, err)
require.Zero(t, c)
}
func TestCounter(t *testing.T) {
db := newDB(t)
var c uint64
var c meta.ObjectCounters
var err error
t.Run("defaults", func(t *testing.T) {
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Zero(t, c.Phy())
require.Zero(t, c.Logic())
})
t.Run("put", func(t *testing.T) {
oo := make([]*object.Object, 0, objCount)
for i := 0; i < objCount; i++ {
oo = append(oo, generateObject(t))
@ -38,40 +40,69 @@ func TestCounter(t *testing.T) {
_, err = db.Put(prm)
require.NoError(t, err)
c, err = db.ObjectCounter()
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(i+1), c)
}
require.Equal(t, uint64(i+1), c.Phy())
require.Equal(t, uint64(i+1), c.Logic())
}
})
func TestCounter_Dec(t *testing.T) {
db := newDB(t)
require.NoError(t, db.Reset())
t.Run("delete", func(t *testing.T) {
oo := putObjs(t, db, objCount, false)
var err error
var c uint64
var prm meta.DeletePrm
for i := objCount - 1; i >= 0; i-- {
prm.SetAddresses(objectcore.AddressOf(oo[i]))
_, err = db.Delete(prm)
res, err := db.Delete(prm)
require.NoError(t, err)
require.Equal(t, uint64(1), res.AvailableObjectsRemoved())
c, err = db.ObjectCounters()
require.NoError(t, err)
c, err = db.ObjectCounter()
require.Equal(t, uint64(i), c.Phy())
require.Equal(t, uint64(i), c.Logic())
}
})
require.NoError(t, db.Reset())
t.Run("inhume", func(t *testing.T) {
oo := putObjs(t, db, objCount, false)
inhumedObjs := make([]oid.Address, objCount/2)
for i, o := range oo {
if i == len(inhumedObjs) {
break
}
inhumedObjs[i] = objectcore.AddressOf(o)
}
var prm meta.InhumePrm
prm.SetTombstoneAddress(oidtest.Address())
prm.SetAddresses(inhumedObjs...)
res, err := db.Inhume(prm)
require.NoError(t, err)
require.Equal(t, uint64(len(inhumedObjs)), res.AvailableInhumed())
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(i), c)
}
}
require.Equal(t, uint64(objCount), c.Phy())
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
})
func TestCounter_PutSplit(t *testing.T) {
db := newDB(t)
require.NoError(t, db.Reset())
t.Run("put_split", func(t *testing.T) {
parObj := generateObject(t)
var err error
var c uint64
// put objects and check that parent info
// does not affect the counter
@ -83,14 +114,16 @@ func TestCounter_PutSplit(t *testing.T) {
require.NoError(t, putBig(db, o))
c, err = db.ObjectCounter()
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(i+1), c)
}
require.Equal(t, uint64(i+1), c.Phy())
require.Equal(t, uint64(i+1), c.Logic())
}
})
func TestCounter_DeleteSplit(t *testing.T) {
db := newDB(t)
require.NoError(t, db.Reset())
t.Run("delete_split", func(t *testing.T) {
oo := putObjs(t, db, objCount, true)
// delete objects that have parent info
@ -99,10 +132,140 @@ func TestCounter_DeleteSplit(t *testing.T) {
for i, o := range oo {
require.NoError(t, metaDelete(db, objectcore.AddressOf(o)))
c, err := db.ObjectCounter()
c, err := db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(objCount-i-1), c)
require.Equal(t, uint64(objCount-i-1), c.Phy())
require.Equal(t, uint64(objCount-i-1), c.Logic())
}
})
require.NoError(t, db.Reset())
t.Run("inhume_split", func(t *testing.T) {
oo := putObjs(t, db, objCount, true)
inhumedObjs := make([]oid.Address, objCount/2)
for i, o := range oo {
if i == len(inhumedObjs) {
break
}
inhumedObjs[i] = objectcore.AddressOf(o)
}
var prm meta.InhumePrm
prm.SetTombstoneAddress(oidtest.Address())
prm.SetAddresses(inhumedObjs...)
_, err = db.Inhume(prm)
require.NoError(t, err)
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(objCount), c.Phy())
require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic())
})
}
// TestCounters_Expired checks how the object counters behave for objects
// that expire: the objects are put with an expiration epoch of epoch+1
// and the current epoch is then moved past it.
func TestCounters_Expired(t *testing.T) {
// This test covers expired objects that have no GCMark
// yet. Such objects should be treated as logically
// available: the logic counter should be decremented
// explicitly and only in the `Delete` and `Inhume`
// operations; otherwise, it would be impossible to
// maintain the logic counter.
const epoch = 123
es := &epochState{epoch}
db := newDB(t, meta.WithEpochState(es))
oo := make([]oid.Address, objCount)
for i := range oo {
oo[i] = putWithExpiration(t, db, object.TypeRegular, epoch+1)
}
// 1. the objects are available and the counters are correct
c, err := db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(objCount), c.Phy())
require.Equal(t, uint64(objCount), c.Logic())
for _, o := range oo {
_, err := metaGet(db, o, true)
require.NoError(t, err)
}
// 2. the objects are expired and not available, but the
// logic counter stays the same
es.e = epoch + 2
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(objCount), c.Phy())
require.Equal(t, uint64(objCount), c.Logic())
for _, o := range oo {
_, err := metaGet(db, o, true)
require.ErrorIs(t, err, objectcore.ErrObjectIsExpired)
}
// 3. inhuming an expired object with a GCMark (as the GC
// would do) should decrease the logic counter despite the
// expiration
var inhumePrm meta.InhumePrm
inhumePrm.SetGCMark()
inhumePrm.SetAddresses(oo[0])
inhumeRes, err := db.Inhume(inhumePrm)
require.NoError(t, err)
require.Equal(t, uint64(1), inhumeRes.AvailableInhumed())
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(len(oo)), c.Phy())
require.Equal(t, uint64(len(oo)-1), c.Logic())
// 4. a `Delete` of an object with a GCMark should decrease
// the phy counter but not affect the logic counter (after
// that step they should be equal)
var deletePrm meta.DeletePrm
deletePrm.SetAddresses(oo[0])
deleteRes, err := db.Delete(deletePrm)
require.NoError(t, err)
require.Zero(t, deleteRes.AvailableObjectsRemoved())
oo = oo[1:]
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(len(oo)), c.Phy())
require.Equal(t, uint64(len(oo)), c.Logic())
// 5. a `Delete` of an expired object (as the control
// service would do) should decrease both counters despite
// the expiration
deletePrm.SetAddresses(oo[0])
deleteRes, err = db.Delete(deletePrm)
require.NoError(t, err)
require.Equal(t, uint64(1), deleteRes.AvailableObjectsRemoved())
oo = oo[1:]
c, err = db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(len(oo)), c.Phy())
require.Equal(t, uint64(len(oo)), c.Logic())
}
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*object.Object {
@ -123,10 +286,11 @@ func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*object.Ob
_, err = db.Put(prm)
require.NoError(t, err)
c, err := db.ObjectCounter()
c, err := db.ObjectCounters()
require.NoError(t, err)
require.Equal(t, uint64(i+1), c)
require.Equal(t, uint64(i+1), c.Phy())
require.Equal(t, uint64(i+1), c.Logic())
}
return oo

View file

@ -20,12 +20,19 @@ type DeletePrm struct {
// DeleteRes groups the resulting values of Delete operation.
type DeleteRes struct {
removed uint64
rawRemoved uint64
availableRemoved uint64
}
// RemovedObjects returns number of removed raw objects.
func (d DeleteRes) RemovedObjects() uint64 {
return d.removed
// AvailableObjectsRemoved returns the number of removed available
// objects.
func (d DeleteRes) AvailableObjectsRemoved() uint64 {
return d.availableRemoved
}
// RawObjectsRemoved returns the number of raw (physically stored)
// objects removed by the Delete operation.
func (d DeleteRes) RawObjectsRemoved() uint64 {
return d.rawRemoved
}
// SetAddresses is a Delete option to set the addresses of the objects to delete.
@ -51,10 +58,11 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
defer db.modeMtx.RUnlock()
var rawRemoved uint64
var availableRemoved uint64
var err error
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
rawRemoved, err = db.deleteGroup(tx, prm.addrs)
rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs)
return err
})
if err == nil {
@ -64,49 +72,84 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
storagelog.OpField("metabase DELETE"))
}
}
return DeleteRes{removed: rawRemoved}, err
return DeleteRes{
rawRemoved: rawRemoved,
availableRemoved: availableRemoved,
}, err
}
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, error) {
// deleteGroup deletes objects from the metabase. Handles removal of the
// references of the split objects.
// The first return value is the number of removed physical objects:
// objects that were stored. The second return value is the number of
// removed logical objects: objects that were available (without
// Tombstones or GCMarks, non-expired, etc.).
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, error) {
refCounter := make(referenceCounter, len(addrs))
currEpoch := db.epochState.CurrentEpoch()
var rawDeleted uint64
var availableDeleted uint64
for i := range addrs {
removed, err := db.delete(tx, addrs[i], refCounter, currEpoch)
removed, available, err := db.delete(tx, addrs[i], refCounter, currEpoch)
if err != nil {
return 0, err // maybe log and continue?
return 0, 0, err // maybe log and continue?
}
if removed {
rawDeleted++
}
if available {
availableDeleted++
}
}
err := db.updateCounter(tx, rawDeleted, false)
if rawDeleted > 0 {
err := db.updateCounter(tx, phy, rawDeleted, false)
if err != nil {
return 0, fmt.Errorf("could not decrease object counter: %w", err)
return 0, 0, fmt.Errorf("could not decrease phy object counter: %w", err)
}
}
if availableDeleted > 0 {
err := db.updateCounter(tx, logical, availableDeleted, false)
if err != nil {
return 0, 0, fmt.Errorf("could not decrease logical object counter: %w", err)
}
}
for _, refNum := range refCounter {
if refNum.cur == refNum.all {
err := db.deleteObject(tx, refNum.obj, true)
if err != nil {
return rawDeleted, err // maybe log and continue?
return rawDeleted, availableDeleted, err // maybe log and continue?
}
}
}
return rawDeleted, nil
return rawDeleted, availableDeleted, nil
}
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, error) {
// remove record from the garbage bucket
// delete removes object indexes from the metabase. Counts the references
// of the object that is being removed.
// The first return value indicates whether an object has been removed
// (removing a nonexistent object is not an error). The second return value
// indicates whether the object was available before the removal (used for
// maintaining the logical object counter).
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, error) {
addrKey := addressKey(addr)
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0
// remove record from the garbage bucket
if garbageBKT != nil {
err := garbageBKT.Delete(addressKey(addr))
err := garbageBKT.Delete(addrKey)
if err != nil {
return false, fmt.Errorf("could not remove from garbage bucket: %w", err)
return false, false, fmt.Errorf("could not remove from garbage bucket: %w", err)
}
}
@ -114,10 +157,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
obj, err := db.get(tx, addr, false, true, currEpoch)
if err != nil {
if errors.As(err, new(apistatus.ObjectNotFound)) {
return false, nil
return false, false, nil
}
return false, err
return false, false, err
}
// if object is an only link to a parent, then remove parent
@ -142,10 +185,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
// remove object
err = db.deleteObject(tx, obj, false)
if err != nil {
return false, fmt.Errorf("could not remove object: %w", err)
return false, false, fmt.Errorf("could not remove object: %w", err)
}
return true, nil
return true, removeAvailableObject, nil
}
func (db *DB) deleteObject(

View file

@ -25,6 +25,13 @@ type InhumePrm struct {
// InhumeRes encapsulates the results of an Inhume operation.
type InhumeRes struct {
deletedLockObj []oid.Address
// number of objects that were still available when they were inhumed.
// NOTE(review): the field name is misspelled ("Imhumed"); renaming it
// requires updating every use in the package.
availableImhumed uint64
}
// AvailableInhumed returns the number of available objects
// that have been inhumed.
func (i InhumeRes) AvailableInhumed() uint64 {
return i.availableImhumed
}
// DeletedLockObjects returns deleted object of LOCK
@ -86,9 +93,11 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
defer db.modeMtx.RUnlock()
currEpoch := db.epochState.CurrentEpoch()
var inhumed uint64
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
var (
// target bucket of the operation, one of the:
@ -103,7 +112,7 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
)
if prm.tomb != nil {
bkt = tx.Bucket(graveyardBucketName)
bkt = graveyardBKT
tombKey := addressKey(*prm.tomb)
// it is forbidden to have a tomb-on-tomb in NeoFS,
@ -144,18 +153,25 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
lockWasChecked = true
}
obj, err := db.get(tx, prm.target[i], false, true, currEpoch)
targetKey := addressKey(prm.target[i])
// if object is stored and it is regular object then update bucket
obj, err := db.get(tx, prm.target[i], false, true, currEpoch)
if err == nil {
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
// object is available, decrement the
// logical counter
inhumed++
}
// if object is stored, and it is regular object then update bucket
// with container size estimations
if err == nil && obj.Type() == object.TypeRegular {
if obj.Type() == object.TypeRegular {
err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
if err != nil {
return err
}
}
targetKey := addressKey(prm.target[i])
}
if prm.tomb != nil {
targetIsTomb := false
@ -212,8 +228,10 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
}
}
return nil
return db.updateCounter(tx, logical, inhumed, false)
})
res.availableImhumed = inhumed
return
}

View file

@ -143,9 +143,16 @@ func (db *DB) put(
}
if !isParent {
err = db.updateCounter(tx, 1, true)
err = db.updateCounter(tx, phy, 1, true)
if err != nil {
return fmt.Errorf("could not increase object counter: %w", err)
return fmt.Errorf("could not increase phy object counter: %w", err)
}
// it is expected that putting an unavailable object is
// impossible and should be handled on the higher levels
err = db.updateCounter(tx, logical, 1, true)
if err != nil {
return fmt.Errorf("could not increase logical object counter: %w", err)
}
}