From 41578001e420492f70137e2ed07222c11ef5bdb3 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 22 Jan 2021 14:07:08 +0300 Subject: [PATCH] [#337] metabase: Keep container size estimation Storage nodes keep container size estimation so they can announce this info and hope for some basic income settlements. This is also useful for monitoring. Container size does not include non regular or inhumed object sizes. Signed-off-by: Alex Vanin --- pkg/local_object_storage/metabase/cleanup.go | 14 ++++ .../metabase/containers.go | 53 +++++++++++++++ .../metabase/containers_test.go | 65 +++++++++++++++++++ pkg/local_object_storage/metabase/inhume.go | 16 +++++ pkg/local_object_storage/metabase/put.go | 19 +++++- pkg/local_object_storage/metabase/util.go | 5 +- 6 files changed, 167 insertions(+), 5 deletions(-) diff --git a/pkg/local_object_storage/metabase/cleanup.go b/pkg/local_object_storage/metabase/cleanup.go index ee936ed0d..3e485c74a 100644 --- a/pkg/local_object_storage/metabase/cleanup.go +++ b/pkg/local_object_storage/metabase/cleanup.go @@ -1,6 +1,7 @@ package meta import ( + "bytes" "strings" "go.etcd.io/bbolt" @@ -59,6 +60,19 @@ func isListBucket(name []byte) bool { } func cleanUpUniqueBucket(tx *bbolt.Tx, name []byte, b *bbolt.Bucket) { + switch { // clean well-known global metabase buckets + case bytes.Equal(name, containerVolumeBucketName): + _ = b.ForEach(func(k, v []byte) error { + if parseContainerSize(v) == 0 { + _ = b.Delete(k) + } + + return nil + }) + default: + // do nothing + } + if b.Stats().KeyN == 0 { _ = tx.DeleteBucket(name) // ignore error, best effort there } diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index 66f897d57..809d1f1e5 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -1,6 +1,7 @@ package meta import ( + "encoding/binary" "strings" "github.com/nspcc-dev/neofs-api-go/pkg/container" @@ -36,6 +37,27 @@ func (db *DB) containers(tx *bbolt.Tx) ([]*container.ID, error) { return result, err } +func (db *DB) ContainerSize(id *container.ID) (size uint64, err error) { + err = db.boltDB.Update(func(tx *bbolt.Tx) error { + size, err = db.containerSize(tx, id) + + return err + }) + + return size, err +} + +func (db *DB) containerSize(tx *bbolt.Tx, id *container.ID) (uint64, error) { + containerVolume, err := tx.CreateBucketIfNotExists(containerVolumeBucketName) + if err != nil { + return 0, err + } + + key := id.ToV2().GetValue() + + return parseContainerSize(containerVolume.Get(key)), nil +} + func parseContainerID(name []byte) (*container.ID, error) { strName := string(name) @@ -47,3 +69,34 @@ func parseContainerID(name []byte) (*container.ID, error) { return id, id.Parse(strName) } + +func parseContainerSize(v []byte) uint64 { + if len(v) == 0 { + return 0 + } + + return binary.LittleEndian.Uint64(v) +} + +func changeContainerSize(tx *bbolt.Tx, id *container.ID, delta uint64, increase bool) error { + containerVolume, err := tx.CreateBucketIfNotExists(containerVolumeBucketName) + if err != nil { + return err + } + + key := id.ToV2().GetValue() + size := parseContainerSize(containerVolume.Get(key)) + + if increase { + size += delta + } else if size > delta { + size -= delta + } else { + size = 0 + } + + buf := make([]byte, 8) // consider using sync.Pool to decrease allocations + binary.LittleEndian.PutUint64(buf, size) + + return containerVolume.Put(key, buf) +} diff --git a/pkg/local_object_storage/metabase/containers_test.go b/pkg/local_object_storage/metabase/containers_test.go index f68495126..6d9c5b2a5 100644 --- a/pkg/local_object_storage/metabase/containers_test.go +++ b/pkg/local_object_storage/metabase/containers_test.go @@ -1,8 +1,11 @@ package meta_test import ( + "math/rand" "testing" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-node/pkg/core/object" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/stretchr/testify/require" ) @@ -67,3 +70,65 @@ func TestDB_Containers(t *testing.T) { require.Contains(t, cnrs, obj.ContainerID()) }) } + +func TestDB_ContainerSize(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + const ( + C = 3 + N = 5 + ) + + cids := make(map[*container.ID]int, C) + objs := make(map[*container.ID][]*object.RawObject, C*N) + + for i := 0; i < C; i++ { + cid := testCID() + cids[cid] = 0 + + for j := 0; j < N; j++ { + size := rand.Intn(1024) + + parent := generateRawObjectWithCID(t, cid) + parent.SetPayloadSize(uint64(size / 2)) + + obj := generateRawObjectWithCID(t, cid) + obj.SetPayloadSize(uint64(size)) + obj.SetParentID(parent.ID()) + obj.SetParent(parent.Object().SDK()) + + cids[cid] += size + objs[cid] = append(objs[cid], obj) + + err := putBig(db, obj.Object()) + require.NoError(t, err) + } + } + + for cid, volume := range cids { + n, err := db.ContainerSize(cid) + require.NoError(t, err) + require.Equal(t, volume, int(n)) + } + + t.Run("Inhume", func(t *testing.T) { + for cid, list := range objs { + volume := cids[cid] + + for _, obj := range list { + require.NoError(t, meta.Inhume( + db, + obj.Object().Address(), + generateAddress(), + )) + + volume -= int(obj.PayloadSize()) + + n, err := db.ContainerSize(cid) + require.NoError(t, err) + require.Equal(t, volume, int(n)) + } + } + }) +} diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index de607ea53..e65fe2c22 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -49,6 +49,22 @@ func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) { return err } + obj, err := db.get(tx, prm.target, false, true) + + // if object is stored and it is regular object then update bucket + // with container size estimations + if err == nil && obj.Type() == objectSDK.TypeRegular { + err := changeContainerSize( + tx, + obj.ContainerID(), + obj.PayloadSize(), + false, + ) + if err != nil { + return err + } + } + // consider checking if target is already in graveyard? return graveyard.Put(addressKey(prm.target), addressKey(prm.tomb)) }) diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index b7ad15207..15d660bd3 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -124,7 +124,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *obje // put unique indexes for i := range uniqueIndexes { - err := putUniqueIndexItem(tx, uniqueIndexes[i]) + err = putUniqueIndexItem(tx, uniqueIndexes[i]) if err != nil { return err } @@ -138,7 +138,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *obje // put list indexes for i := range listIndexes { - err := putListIndexItem(tx, listIndexes[i]) + err = putListIndexItem(tx, listIndexes[i]) if err != nil { return err } @@ -152,7 +152,20 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *obje // put fake bucket tree indexes for i := range fkbtIndexes { - err := putFKBTIndexItem(tx, fkbtIndexes[i]) + err = putFKBTIndexItem(tx, fkbtIndexes[i]) + if err != nil { + return err + } + } + + // update container volume size estimation + if obj.Type() == objectSDK.TypeRegular && !isParent { + err = changeContainerSize( + tx, + obj.ContainerID(), + obj.PayloadSize(), + true, + ) if err != nil { return err } diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index 076f1a6ce..02fd31bf6 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -17,8 +17,9 @@ bytes. Check it later. const invalidBase58String = "_" var ( - graveyardBucketName = []byte(invalidBase58String + "Graveyard") - toMoveItBucketName = []byte(invalidBase58String + "ToMoveIt") + graveyardBucketName = []byte(invalidBase58String + "Graveyard") + toMoveItBucketName = []byte(invalidBase58String + "ToMoveIt") + containerVolumeBucketName = []byte(invalidBase58String + "ContainerSize") zeroValue = []byte{0xFF}