[#337] metabase: Keep container size estimation
Storage nodes keep container size estimation so they can announce this info and hope for some basic income settlements. This is also useful for monitoring. Container size does not include non regular or inhumed object sizes. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
998a097767
commit
41578001e4
6 changed files with 167 additions and 5 deletions
|
@ -1,6 +1,7 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -59,6 +60,19 @@ func isListBucket(name []byte) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func cleanUpUniqueBucket(tx *bbolt.Tx, name []byte, b *bbolt.Bucket) {
|
func cleanUpUniqueBucket(tx *bbolt.Tx, name []byte, b *bbolt.Bucket) {
|
||||||
|
switch { // clean well-known global metabase buckets
|
||||||
|
case bytes.Equal(name, containerVolumeBucketName):
|
||||||
|
_ = b.ForEach(func(k, v []byte) error {
|
||||||
|
if parseContainerSize(v) == 0 {
|
||||||
|
_ = b.Delete(k)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
default:
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
if b.Stats().KeyN == 0 {
|
if b.Stats().KeyN == 0 {
|
||||||
_ = tx.DeleteBucket(name) // ignore error, best effort there
|
_ = tx.DeleteBucket(name) // ignore error, best effort there
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||||
|
@ -36,6 +37,27 @@ func (db *DB) containers(tx *bbolt.Tx) ([]*container.ID, error) {
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) ContainerSize(id *container.ID) (size uint64, err error) {
|
||||||
|
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
size, err = db.containerSize(tx, id)
|
||||||
|
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
return size, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) containerSize(tx *bbolt.Tx, id *container.ID) (uint64, error) {
|
||||||
|
containerVolume, err := tx.CreateBucketIfNotExists(containerVolumeBucketName)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
key := id.ToV2().GetValue()
|
||||||
|
|
||||||
|
return parseContainerSize(containerVolume.Get(key)), nil
|
||||||
|
}
|
||||||
|
|
||||||
func parseContainerID(name []byte) (*container.ID, error) {
|
func parseContainerID(name []byte) (*container.ID, error) {
|
||||||
strName := string(name)
|
strName := string(name)
|
||||||
|
|
||||||
|
@ -47,3 +69,34 @@ func parseContainerID(name []byte) (*container.ID, error) {
|
||||||
|
|
||||||
return id, id.Parse(strName)
|
return id, id.Parse(strName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseContainerSize(v []byte) uint64 {
|
||||||
|
if len(v) == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return binary.LittleEndian.Uint64(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func changeContainerSize(tx *bbolt.Tx, id *container.ID, delta uint64, increase bool) error {
|
||||||
|
containerVolume, err := tx.CreateBucketIfNotExists(containerVolumeBucketName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
key := id.ToV2().GetValue()
|
||||||
|
size := parseContainerSize(containerVolume.Get(key))
|
||||||
|
|
||||||
|
if increase {
|
||||||
|
size += delta
|
||||||
|
} else if size > delta {
|
||||||
|
size -= delta
|
||||||
|
} else {
|
||||||
|
size = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, 8) // consider using sync.Pool to decrease allocations
|
||||||
|
binary.LittleEndian.PutUint64(buf, size)
|
||||||
|
|
||||||
|
return containerVolume.Put(key, buf)
|
||||||
|
}
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
package meta_test
|
package meta_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -67,3 +70,65 @@ func TestDB_Containers(t *testing.T) {
|
||||||
require.Contains(t, cnrs, obj.ContainerID())
|
require.Contains(t, cnrs, obj.ContainerID())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDB_ContainerSize(t *testing.T) {
|
||||||
|
db := newDB(t)
|
||||||
|
defer releaseDB(db)
|
||||||
|
|
||||||
|
const (
|
||||||
|
C = 3
|
||||||
|
N = 5
|
||||||
|
)
|
||||||
|
|
||||||
|
cids := make(map[*container.ID]int, C)
|
||||||
|
objs := make(map[*container.ID][]*object.RawObject, C*N)
|
||||||
|
|
||||||
|
for i := 0; i < C; i++ {
|
||||||
|
cid := testCID()
|
||||||
|
cids[cid] = 0
|
||||||
|
|
||||||
|
for j := 0; j < N; j++ {
|
||||||
|
size := rand.Intn(1024)
|
||||||
|
|
||||||
|
parent := generateRawObjectWithCID(t, cid)
|
||||||
|
parent.SetPayloadSize(uint64(size / 2))
|
||||||
|
|
||||||
|
obj := generateRawObjectWithCID(t, cid)
|
||||||
|
obj.SetPayloadSize(uint64(size))
|
||||||
|
obj.SetParentID(parent.ID())
|
||||||
|
obj.SetParent(parent.Object().SDK())
|
||||||
|
|
||||||
|
cids[cid] += size
|
||||||
|
objs[cid] = append(objs[cid], obj)
|
||||||
|
|
||||||
|
err := putBig(db, obj.Object())
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for cid, volume := range cids {
|
||||||
|
n, err := db.ContainerSize(cid)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, volume, int(n))
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("Inhume", func(t *testing.T) {
|
||||||
|
for cid, list := range objs {
|
||||||
|
volume := cids[cid]
|
||||||
|
|
||||||
|
for _, obj := range list {
|
||||||
|
require.NoError(t, meta.Inhume(
|
||||||
|
db,
|
||||||
|
obj.Object().Address(),
|
||||||
|
generateAddress(),
|
||||||
|
))
|
||||||
|
|
||||||
|
volume -= int(obj.PayloadSize())
|
||||||
|
|
||||||
|
n, err := db.ContainerSize(cid)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, volume, int(n))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -49,6 +49,22 @@ func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
obj, err := db.get(tx, prm.target, false, true)
|
||||||
|
|
||||||
|
// if object is stored and it is regular object then update bucket
|
||||||
|
// with container size estimations
|
||||||
|
if err == nil && obj.Type() == objectSDK.TypeRegular {
|
||||||
|
err := changeContainerSize(
|
||||||
|
tx,
|
||||||
|
obj.ContainerID(),
|
||||||
|
obj.PayloadSize(),
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// consider checking if target is already in graveyard?
|
// consider checking if target is already in graveyard?
|
||||||
return graveyard.Put(addressKey(prm.target), addressKey(prm.tomb))
|
return graveyard.Put(addressKey(prm.target), addressKey(prm.tomb))
|
||||||
})
|
})
|
||||||
|
|
|
@ -124,7 +124,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *obje
|
||||||
|
|
||||||
// put unique indexes
|
// put unique indexes
|
||||||
for i := range uniqueIndexes {
|
for i := range uniqueIndexes {
|
||||||
err := putUniqueIndexItem(tx, uniqueIndexes[i])
|
err = putUniqueIndexItem(tx, uniqueIndexes[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -138,7 +138,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *obje
|
||||||
|
|
||||||
// put list indexes
|
// put list indexes
|
||||||
for i := range listIndexes {
|
for i := range listIndexes {
|
||||||
err := putListIndexItem(tx, listIndexes[i])
|
err = putListIndexItem(tx, listIndexes[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -152,7 +152,20 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *obje
|
||||||
|
|
||||||
// put fake bucket tree indexes
|
// put fake bucket tree indexes
|
||||||
for i := range fkbtIndexes {
|
for i := range fkbtIndexes {
|
||||||
err := putFKBTIndexItem(tx, fkbtIndexes[i])
|
err = putFKBTIndexItem(tx, fkbtIndexes[i])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// update container volume size estimation
|
||||||
|
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
||||||
|
err = changeContainerSize(
|
||||||
|
tx,
|
||||||
|
obj.ContainerID(),
|
||||||
|
obj.PayloadSize(),
|
||||||
|
true,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,8 +17,9 @@ bytes. Check it later.
|
||||||
const invalidBase58String = "_"
|
const invalidBase58String = "_"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
graveyardBucketName = []byte(invalidBase58String + "Graveyard")
|
graveyardBucketName = []byte(invalidBase58String + "Graveyard")
|
||||||
toMoveItBucketName = []byte(invalidBase58String + "ToMoveIt")
|
toMoveItBucketName = []byte(invalidBase58String + "ToMoveIt")
|
||||||
|
containerVolumeBucketName = []byte(invalidBase58String + "ContainerSize")
|
||||||
|
|
||||||
zeroValue = []byte{0xFF}
|
zeroValue = []byte{0xFF}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue