[#1658] meta: Add object counter
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
61f0d85834
commit
626766db08
6 changed files with 229 additions and 7 deletions
|
@ -31,6 +31,7 @@ This file describes changes between the metabase versions.
|
|||
- Keys and values
|
||||
- `id` -> shard id as bytes
|
||||
- `version` -> metabase version as little-endian uint64
|
||||
- `counter` -> shard's object counter as little-endian uint64
|
||||
|
||||
### Unique index buckets
|
||||
- Buckets containing objects of REGULAR type
|
||||
|
@ -83,7 +84,9 @@ This file describes changes between the metabase versions.
|
|||
- Value: list of object IDs
|
||||
|
||||
|
||||
# History
|
||||
## Version 2
|
||||
|
||||
- Added shard's object counter to the info bucket
|
||||
|
||||
## Version 1
|
||||
|
||||
|
|
|
@ -80,6 +80,7 @@ func (db *DB) init(reset bool) error {
|
|||
string(graveyardBucketName): {},
|
||||
string(toMoveItBucketName): {},
|
||||
string(garbageBucketName): {},
|
||||
string(shardInfoBucket): {},
|
||||
}
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
|
|
59
pkg/local_object_storage/metabase/counter.go
Normal file
59
pkg/local_object_storage/metabase/counter.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var shardCounterKey = []byte("counter")
|
||||
|
||||
// ObjectCounter returns object count that metabase has
|
||||
// tracked since it was opened and initialized.
|
||||
//
|
||||
// Returns only the errors that do not allow reading counter
|
||||
// in Bolt database.
|
||||
func (db *DB) ObjectCounter() (counter uint64, err error) {
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b != nil {
|
||||
data := b.Get(shardCounterKey)
|
||||
if len(data) == 8 {
|
||||
counter = binary.LittleEndian.Uint64(data)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// updateCounter updates the object counter. Tx MUST be writable.
|
||||
// If inc == `true`, increases the counter, decreases otherwise.
|
||||
func (db *DB) updateCounter(tx *bbolt.Tx, delta uint64, inc bool) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var counter uint64
|
||||
|
||||
data := b.Get(shardCounterKey)
|
||||
if len(data) == 8 {
|
||||
counter = binary.LittleEndian.Uint64(data)
|
||||
}
|
||||
|
||||
if inc {
|
||||
counter += delta
|
||||
} else if counter <= delta {
|
||||
counter = 0
|
||||
} else {
|
||||
counter -= delta
|
||||
}
|
||||
|
||||
newCounter := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(newCounter, counter)
|
||||
|
||||
return b.Put(shardCounterKey, newCounter)
|
||||
}
|
133
pkg/local_object_storage/metabase/counter_test.go
Normal file
133
pkg/local_object_storage/metabase/counter_test.go
Normal file
|
@ -0,0 +1,133 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const objCount = 10
|
||||
|
||||
func TestCounter_Default(t *testing.T) {
|
||||
db := newDB(t)
|
||||
|
||||
c, err := db.ObjectCounter()
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, c)
|
||||
}
|
||||
|
||||
func TestCounter(t *testing.T) {
|
||||
db := newDB(t)
|
||||
|
||||
var c uint64
|
||||
var err error
|
||||
|
||||
oo := make([]*object.Object, 0, objCount)
|
||||
for i := 0; i < objCount; i++ {
|
||||
oo = append(oo, generateObject(t))
|
||||
}
|
||||
|
||||
var prm meta.PutPrm
|
||||
|
||||
for i := 0; i < objCount; i++ {
|
||||
prm.SetObject(oo[i])
|
||||
|
||||
_, err = db.Put(prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
c, err = db.ObjectCounter()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, uint64(i+1), c)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCounter_Dec(t *testing.T) {
|
||||
db := newDB(t)
|
||||
oo := putObjs(t, db, objCount, false)
|
||||
|
||||
var err error
|
||||
var c uint64
|
||||
|
||||
var prm meta.DeletePrm
|
||||
for i := objCount - 1; i >= 0; i-- {
|
||||
prm.SetAddresses(objectcore.AddressOf(oo[i]))
|
||||
|
||||
_, err = db.Delete(prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
c, err = db.ObjectCounter()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, uint64(i), c)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCounter_PutSplit(t *testing.T) {
|
||||
db := newDB(t)
|
||||
|
||||
parObj := generateObject(t)
|
||||
var err error
|
||||
var c uint64
|
||||
|
||||
// put objects and check that parent info
|
||||
// does not affect the counter
|
||||
for i := 0; i < objCount; i++ {
|
||||
o := generateObject(t)
|
||||
if i < objCount/2 { // half of the objs will have the parent
|
||||
o.SetParent(parObj)
|
||||
}
|
||||
|
||||
require.NoError(t, putBig(db, o))
|
||||
|
||||
c, err = db.ObjectCounter()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(i+1), c)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCounter_DeleteSplit(t *testing.T) {
|
||||
db := newDB(t)
|
||||
oo := putObjs(t, db, objCount, true)
|
||||
|
||||
// delete objects that have parent info
|
||||
// and check that it does not affect
|
||||
// the counter
|
||||
for i, o := range oo {
|
||||
require.NoError(t, metaDelete(db, objectcore.AddressOf(o)))
|
||||
|
||||
c, err := db.ObjectCounter()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(objCount-i-1), c)
|
||||
}
|
||||
}
|
||||
|
||||
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*object.Object {
|
||||
var prm meta.PutPrm
|
||||
var err error
|
||||
parent := generateObject(t)
|
||||
|
||||
oo := make([]*object.Object, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
o := generateObject(t)
|
||||
if withParent {
|
||||
o.SetParent(parent)
|
||||
}
|
||||
|
||||
oo = append(oo, o)
|
||||
|
||||
prm.SetObject(o)
|
||||
_, err = db.Put(prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
c, err := db.ObjectCounter()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, uint64(i+1), c)
|
||||
}
|
||||
|
||||
return oo
|
||||
}
|
|
@ -11,6 +11,7 @@ import (
|
|||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// DeletePrm groups the parameters of Delete operation.
|
||||
|
@ -60,11 +61,22 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) error {
|
|||
refCounter := make(referenceCounter, len(addrs))
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
var rawDeleted uint64
|
||||
for i := range addrs {
|
||||
err := db.delete(tx, addrs[i], refCounter, currEpoch)
|
||||
removed, err := db.delete(tx, addrs[i], refCounter, currEpoch)
|
||||
if err != nil {
|
||||
return err // maybe log and continue?
|
||||
}
|
||||
|
||||
if removed {
|
||||
rawDeleted++
|
||||
}
|
||||
}
|
||||
|
||||
err := db.updateCounter(tx, rawDeleted, false)
|
||||
if err != nil {
|
||||
db.log.Error("could not decrease object counter",
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
for _, refNum := range refCounter {
|
||||
|
@ -79,13 +91,13 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) error {
|
||||
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, error) {
|
||||
// remove record from the garbage bucket
|
||||
garbageBKT := tx.Bucket(garbageBucketName)
|
||||
if garbageBKT != nil {
|
||||
err := garbageBKT.Delete(addressKey(addr))
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not remove from garbage bucket: %w", err)
|
||||
return false, fmt.Errorf("could not remove from garbage bucket: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -93,10 +105,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
|||
obj, err := db.get(tx, addr, false, true, currEpoch)
|
||||
if err != nil {
|
||||
if errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
|
||||
// if object is an only link to a parent, then remove parent
|
||||
|
@ -119,7 +131,12 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
|||
}
|
||||
|
||||
// remove object
|
||||
return db.deleteObject(tx, obj, false)
|
||||
err = db.deleteObject(tx, obj, false)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("could not remove object: %w", err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (db *DB) deleteObject(
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -142,6 +143,14 @@ func (db *DB) put(
|
|||
}
|
||||
}
|
||||
|
||||
if !isParent {
|
||||
err = db.updateCounter(tx, 1, true)
|
||||
if err != nil {
|
||||
db.log.Error("could not increase object counter: %w",
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue