[#698] blobovnicza: Store counter values
Blobovnicza initialization take a long time because of bucket Stat() call. So now blobovnicza stores counters in META bucket. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
5e8c08da3e
commit
e54dc3dc7c
8 changed files with 152 additions and 7 deletions
|
@ -548,4 +548,7 @@ const (
|
|||
BlobovniczatreeCouldNotCheckExistenceInTargetDB = "could not check object existence in target blobovnicza"
|
||||
BlobovniczatreeCouldNotGetObjectFromSourceDB = "could not get object from source blobovnicza"
|
||||
BlobovniczatreeCouldNotPutObjectToTargetDB = "could not put object to target blobovnicza"
|
||||
BlobovniczaSavingCountersToMeta = "saving counters to blobovnicza's meta..."
|
||||
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
|
||||
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
|
||||
)
|
||||
|
|
|
@ -104,7 +104,17 @@ func (b *Blobovnicza) Init() error {
|
|||
func (b *Blobovnicza) initializeCounters() error {
|
||||
var size uint64
|
||||
var items uint64
|
||||
var sizeExists bool
|
||||
var itemsCountExists bool
|
||||
|
||||
err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
size, sizeExists = hasDataSize(tx)
|
||||
items, itemsCountExists = hasItemsCount(tx)
|
||||
|
||||
if sizeExists && itemsCountExists {
|
||||
return nil
|
||||
}
|
||||
|
||||
return b.iterateAllDataBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
|
||||
keysN := uint64(b.Stats().KeyN)
|
||||
size += keysN * upper
|
||||
|
@ -115,6 +125,20 @@ func (b *Blobovnicza) initializeCounters() error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("can't determine DB size: %w", err)
|
||||
}
|
||||
if (!sizeExists || !itemsCountExists) && !b.boltOptions.ReadOnly {
|
||||
b.log.Debug(logs.BlobovniczaSavingCountersToMeta, zap.Uint64("size", size), zap.Uint64("items", items))
|
||||
if err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
if err := saveDataSize(tx, size); err != nil {
|
||||
return err
|
||||
}
|
||||
return saveItemsCount(tx, items)
|
||||
}); err != nil {
|
||||
b.log.Debug(logs.BlobovniczaSavingCountersToMetaFailed, zap.Uint64("size", size), zap.Uint64("items", items))
|
||||
return fmt.Errorf("can't save blobovnicza's size and items count: %w", err)
|
||||
}
|
||||
b.log.Debug(logs.BlobovniczaSavingCountersToMetaSuccess, zap.Uint64("size", size), zap.Uint64("items", items))
|
||||
}
|
||||
|
||||
b.dataSize.Store(size)
|
||||
b.itemsCount.Store(items)
|
||||
b.metrics.AddOpenBlobovniczaSize(size)
|
||||
|
|
|
@ -51,7 +51,7 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
|
|||
var dataSize uint64
|
||||
|
||||
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return b.iterateAllDataBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
|
||||
err := b.iterateAllDataBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
|
||||
objData := buck.Get(addrKey)
|
||||
if objData == nil {
|
||||
// object is not in bucket => continue iterating
|
||||
|
@ -63,6 +63,21 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
|
|||
found = true
|
||||
return true, buck.Delete(addrKey)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if found {
|
||||
return updateMeta(tx, func(count, size uint64) (uint64, uint64) {
|
||||
if count > 0 {
|
||||
count--
|
||||
}
|
||||
if size >= sizeUpperBound {
|
||||
size -= sizeUpperBound
|
||||
}
|
||||
return count, size
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err == nil && !found {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package blobovnicza
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
|
@ -26,7 +25,7 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
|
|||
|
||||
err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
||||
if bytes.Equal(bucketName, incompletedMoveBucketName) {
|
||||
if isNonDataBucket(bucketName) {
|
||||
return nil
|
||||
}
|
||||
exists = buck.Get(addrKey) != nil
|
||||
|
|
|
@ -58,7 +58,7 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
|||
|
||||
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
||||
if bytes.Equal(bucketName, incompletedMoveBucketName) {
|
||||
if isNonDataBucket(bucketName) {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package blobovnicza
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
|
@ -140,7 +139,7 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
|
|||
|
||||
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
||||
if bytes.Equal(bucketName, incompletedMoveBucketName) {
|
||||
if isNonDataBucket(bucketName) {
|
||||
return nil
|
||||
}
|
||||
return buck.ForEach(func(k, v []byte) error {
|
||||
|
|
103
pkg/local_object_storage/blobovnicza/meta.go
Normal file
103
pkg/local_object_storage/blobovnicza/meta.go
Normal file
|
@ -0,0 +1,103 @@
|
|||
package blobovnicza
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
const (
|
||||
dataSizeAndItemsCountBufLength = 8
|
||||
)
|
||||
|
||||
var (
|
||||
metaBucketName = []byte("META")
|
||||
dataSizeKey = []byte("data_size")
|
||||
itemsCountKey = []byte("items_count")
|
||||
)
|
||||
|
||||
func isNonDataBucket(bucketName []byte) bool {
|
||||
return bytes.Equal(bucketName, incompletedMoveBucketName) || bytes.Equal(bucketName, metaBucketName)
|
||||
}
|
||||
|
||||
func hasDataSize(tx *bbolt.Tx) (uint64, bool) {
|
||||
b := tx.Bucket(metaBucketName)
|
||||
if b == nil {
|
||||
return 0, false
|
||||
}
|
||||
v := b.Get(dataSizeKey)
|
||||
if v == nil {
|
||||
return 0, false
|
||||
}
|
||||
if len(v) != dataSizeAndItemsCountBufLength {
|
||||
return 0, false
|
||||
}
|
||||
return binary.LittleEndian.Uint64(v), true
|
||||
}
|
||||
|
||||
func hasItemsCount(tx *bbolt.Tx) (uint64, bool) {
|
||||
b := tx.Bucket(metaBucketName)
|
||||
if b == nil {
|
||||
return 0, false
|
||||
}
|
||||
v := b.Get(itemsCountKey)
|
||||
if v == nil {
|
||||
return 0, false
|
||||
}
|
||||
if len(v) != dataSizeAndItemsCountBufLength {
|
||||
return 0, false
|
||||
}
|
||||
return binary.LittleEndian.Uint64(v), true
|
||||
}
|
||||
|
||||
func saveDataSize(tx *bbolt.Tx, size uint64) error {
|
||||
b, err := tx.CreateBucketIfNotExists(metaBucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
buf := make([]byte, dataSizeAndItemsCountBufLength)
|
||||
binary.LittleEndian.PutUint64(buf, size)
|
||||
return b.Put(dataSizeKey, buf)
|
||||
}
|
||||
|
||||
func saveItemsCount(tx *bbolt.Tx, count uint64) error {
|
||||
b, err := tx.CreateBucketIfNotExists(metaBucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
buf := make([]byte, dataSizeAndItemsCountBufLength)
|
||||
binary.LittleEndian.PutUint64(buf, count)
|
||||
return b.Put(itemsCountKey, buf)
|
||||
}
|
||||
|
||||
func updateMeta(tx *bbolt.Tx, updateValues func(count, size uint64) (uint64, uint64)) error {
|
||||
b, err := tx.CreateBucketIfNotExists(metaBucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var count uint64
|
||||
var size uint64
|
||||
|
||||
v := b.Get(itemsCountKey)
|
||||
if v != nil {
|
||||
count = binary.LittleEndian.Uint64(v)
|
||||
}
|
||||
|
||||
v = b.Get(dataSizeKey)
|
||||
if v != nil {
|
||||
size = binary.LittleEndian.Uint64(v)
|
||||
}
|
||||
|
||||
count, size = updateValues(count, size)
|
||||
|
||||
buf := make([]byte, dataSizeAndItemsCountBufLength)
|
||||
binary.LittleEndian.PutUint64(buf, size)
|
||||
if err := b.Put(dataSizeKey, buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
binary.LittleEndian.PutUint64(buf, count)
|
||||
return b.Put(itemsCountKey, buf)
|
||||
}
|
|
@ -73,7 +73,9 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
|||
return fmt.Errorf("(%T) could not save object in bucket: %w", b, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return updateMeta(tx, func(count, size uint64) (uint64, uint64) {
|
||||
return count + 1, size + upperBound
|
||||
})
|
||||
})
|
||||
if err == nil {
|
||||
b.itemAdded(upperBound)
|
||||
|
|
Loading…
Reference in a new issue