From 809e97626b367d4ae054cf6e9846fa617812b183 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 16 Aug 2023 11:12:19 +0300 Subject: [PATCH] [#602] blobovnicza: Fix size counter Signed-off-by: Dmitrii Stepanov --- .../blobovnicza/blobovnicza.go | 6 +- .../blobovnicza/control.go | 67 +++++++++++++++---- .../blobovnicza/delete.go | 35 +++++----- .../blobovnicza/iterate.go | 25 ++++--- pkg/local_object_storage/blobovnicza/put.go | 4 +- pkg/local_object_storage/blobovnicza/sizes.go | 11 +-- .../blobovnicza/sizes_test.go | 3 +- 7 files changed, 99 insertions(+), 52 deletions(-) diff --git a/pkg/local_object_storage/blobovnicza/blobovnicza.go b/pkg/local_object_storage/blobovnicza/blobovnicza.go index d5741fba7..22168de22 100644 --- a/pkg/local_object_storage/blobovnicza/blobovnicza.go +++ b/pkg/local_object_storage/blobovnicza/blobovnicza.go @@ -3,6 +3,7 @@ package blobovnicza import ( "io/fs" "os" + "sync" "sync/atomic" "time" @@ -15,9 +16,12 @@ import ( type Blobovnicza struct { cfg - filled atomic.Uint64 + dataSize atomic.Uint64 boltDB *bbolt.DB + + opened bool + controlMtx sync.Mutex } // Option is an option of Blobovnicza's constructor. diff --git a/pkg/local_object_storage/blobovnicza/control.go b/pkg/local_object_storage/blobovnicza/control.go index 79bad91e2..7b5552907 100644 --- a/pkg/local_object_storage/blobovnicza/control.go +++ b/pkg/local_object_storage/blobovnicza/control.go @@ -1,8 +1,8 @@ package blobovnicza import ( + "errors" "fmt" - "os" "path/filepath" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -14,7 +14,15 @@ import ( // Open opens an internal database at the configured path with the configured permissions. // // If the database file does not exist, it will be created automatically. +// If blobovnizca is already open, does nothing. func (b *Blobovnicza) Open() error { + b.controlMtx.Lock() + defer b.controlMtx.Unlock() + + if b.opened { + return nil + } + b.log.Debug(logs.BlobovniczaCreatingDirectoryForBoltDB, zap.String("path", b.path), zap.Bool("ro", b.boltOptions.ReadOnly), @@ -36,6 +44,7 @@ func (b *Blobovnicza) Open() error { b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions) if err == nil { + b.opened = true b.metrics.IncOpenBlobovnizcaCount() } @@ -45,20 +54,28 @@ func (b *Blobovnicza) Open() error { // Init initializes internal database structure. // // If Blobovnicza is already initialized, no action is taken. +// Blobovnizca must be open, otherwise an error will return. func (b *Blobovnicza) Init() error { + b.controlMtx.Lock() + defer b.controlMtx.Unlock() + + if !b.opened { + return errors.New("blobovnizca is not open") + } + b.log.Debug(logs.BlobovniczaInitializing, zap.Uint64("object size limit", b.objSizeLimit), zap.Uint64("storage size limit", b.fullSizeLimit), ) - if size := b.filled.Load(); size != 0 { + if size := b.dataSize.Load(); size != 0 { b.log.Debug(logs.BlobovniczaAlreadyInitialized, zap.Uint64("size", size)) return nil } if !b.boltOptions.ReadOnly { err := b.boltDB.Update(func(tx *bbolt.Tx) error { - return b.iterateBucketKeys(func(lower, upper uint64, key []byte) (bool, error) { + return b.iterateBucketKeys(true, func(lower, upper uint64, key []byte) (bool, error) { // create size range bucket rangeStr := stringifyBounds(lower, upper) @@ -79,27 +96,49 @@ func (b *Blobovnicza) Init() error { } } - info, err := os.Stat(b.path) + return b.initializeSize() +} + +func (b *Blobovnicza) initializeSize() error { + var size uint64 + err := b.boltDB.View(func(tx *bbolt.Tx) error { + return b.iterateAllBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) { + size += uint64(b.Stats().KeyN) * upper + return false, nil + }) + }) if err != nil { return fmt.Errorf("can't determine DB size: %w", err) } - - sz := uint64(info.Size()) - b.filled.Store(sz) - b.metrics.AddSize(sz) - return err + b.dataSize.Store(size) + b.metrics.AddSize(size) + return nil } // Close releases all internal database resources. +// +// If blobovnizca is already closed, does nothing. func (b *Blobovnicza) Close() error { + b.controlMtx.Lock() + defer b.controlMtx.Unlock() + + if !b.opened { + return nil + } + b.log.Debug(logs.BlobovniczaClosingBoltDB, zap.String("path", b.path), ) - err := b.boltDB.Close() - if err == nil { - b.metrics.DecOpenBlobovnizcaCount() - b.metrics.SubSize(b.filled.Load()) + if err := b.boltDB.Close(); err != nil { + return err } - return err + + b.metrics.DecOpenBlobovnizcaCount() + b.metrics.SubSize(b.dataSize.Load()) + b.dataSize.Store(0) + + b.opened = false + + return nil } diff --git a/pkg/local_object_storage/blobovnicza/delete.go b/pkg/local_object_storage/blobovnicza/delete.go index 419e073c6..f332173c1 100644 --- a/pkg/local_object_storage/blobovnicza/delete.go +++ b/pkg/local_object_storage/blobovnicza/delete.go @@ -46,33 +46,22 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err addrKey := addressKey(prm.addr) found := false + var sizeUpperBound uint64 + var sizeLowerBound uint64 + var dataSize uint64 err := b.boltDB.Update(func(tx *bbolt.Tx) error { - return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) { + return b.iterateAllBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) { objData := buck.Get(addrKey) if objData == nil { // object is not in bucket => continue iterating return false, nil } - - sz := uint64(len(objData)) - - // remove object from the bucket - err := buck.Delete(addrKey) - - if err == nil { - b.log.Debug(logs.BlobovniczaObjectWasRemovedFromBucket, - zap.String("binary size", stringifyByteSize(sz)), - zap.String("range", stringifyBounds(lower, upper)), - ) - // decrease fullness counter - b.decSize(sz) - } - + dataSize = uint64(len(objData)) + sizeLowerBound = lower + sizeUpperBound = upper found = true - - // stop iteration - return true, err + return true, buck.Delete(addrKey) }) }) @@ -80,5 +69,13 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err return DeleteRes{}, new(apistatus.ObjectNotFound) } + if err == nil && found { + b.log.Debug(logs.BlobovniczaObjectWasRemovedFromBucket, + zap.String("binary size", stringifyByteSize(dataSize)), + zap.String("range", stringifyBounds(sizeLowerBound, sizeUpperBound)), + ) + b.decSize(sizeUpperBound) + } + return DeleteRes{}, err } diff --git a/pkg/local_object_storage/blobovnicza/iterate.go b/pkg/local_object_storage/blobovnicza/iterate.go index c2031ea54..b29ccb43c 100644 --- a/pkg/local_object_storage/blobovnicza/iterate.go +++ b/pkg/local_object_storage/blobovnicza/iterate.go @@ -3,6 +3,7 @@ package blobovnicza import ( "context" "fmt" + "math" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -11,28 +12,32 @@ import ( "go.opentelemetry.io/otel/trace" ) -func (b *Blobovnicza) iterateBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error { - return b.iterateBucketKeys(func(lower uint64, upper uint64, key []byte) (bool, error) { +// iterateAllBuckets iterates all buckets in db +// +// If the maximum size of the object (b.objSizeLimit) has been changed to lower value, +// then there may be more buckets than the current limit of the object size. +func (b *Blobovnicza) iterateAllBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error { + return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) { buck := tx.Bucket(key) if buck == nil { - // expected to happen: - // - before initialization step (incorrect usage by design) - // - if DB is corrupted (in future this case should be handled) - return false, fmt.Errorf("(%T) could not get bucket %s", b, stringifyBounds(lower, upper)) + return true, nil } return f(lower, upper, buck) }) } -func (b *Blobovnicza) iterateBucketKeys(f func(uint64, uint64, []byte) (bool, error)) error { - return b.iterateBounds(func(lower, upper uint64) (bool, error) { +func (b *Blobovnicza) iterateBucketKeys(useObjLimitBound bool, f func(uint64, uint64, []byte) (bool, error)) error { + return b.iterateBounds(useObjLimitBound, func(lower, upper uint64) (bool, error) { return f(lower, upper, bucketKeyFromBounds(upper)) }) } -func (b *Blobovnicza) iterateBounds(f func(uint64, uint64) (bool, error)) error { - objLimitBound := upperPowerOfTwo(b.objSizeLimit) +func (b *Blobovnicza) iterateBounds(useObjLimitBound bool, f func(uint64, uint64) (bool, error)) error { + var objLimitBound uint64 = math.MaxUint64 + if useObjLimitBound { + objLimitBound = upperPowerOfTwo(b.objSizeLimit) + } for upper := firstBucketBound; upper <= max(objLimitBound, firstBucketBound); upper *= 2 { var lower uint64 diff --git a/pkg/local_object_storage/blobovnicza/put.go b/pkg/local_object_storage/blobovnicza/put.go index cc816b3e1..b8cc9954a 100644 --- a/pkg/local_object_storage/blobovnicza/put.go +++ b/pkg/local_object_storage/blobovnicza/put.go @@ -61,7 +61,7 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) { defer span.End() sz := uint64(len(prm.objData)) - bucketName := bucketForSize(sz) + bucketName, upperBound := bucketForSize(sz) key := addressKey(prm.addr) err := b.boltDB.Batch(func(tx *bbolt.Tx) error { @@ -85,7 +85,7 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) { return nil }) if err == nil { - b.incSize(sz) + b.incSize(upperBound) } return PutRes{}, err diff --git a/pkg/local_object_storage/blobovnicza/sizes.go b/pkg/local_object_storage/blobovnicza/sizes.go index 7e10b728e..482242c3f 100644 --- a/pkg/local_object_storage/blobovnicza/sizes.go +++ b/pkg/local_object_storage/blobovnicza/sizes.go @@ -28,8 +28,9 @@ func bucketKeyFromBounds(upperBound uint64) []byte { return buf[:ln] } -func bucketForSize(sz uint64) []byte { - return bucketKeyFromBounds(upperPowerOfTwo(sz)) +func bucketForSize(sz uint64) ([]byte, uint64) { + upperBound := upperPowerOfTwo(sz) + return bucketKeyFromBounds(upperBound), upperBound } func upperPowerOfTwo(v uint64) uint64 { @@ -40,15 +41,15 @@ func upperPowerOfTwo(v uint64) uint64 { } func (b *Blobovnicza) incSize(sz uint64) { - b.filled.Add(sz) + b.dataSize.Add(sz) b.metrics.AddSize(sz) } func (b *Blobovnicza) decSize(sz uint64) { - b.filled.Add(^(sz - 1)) + b.dataSize.Add(^(sz - 1)) b.metrics.SubSize(sz) } func (b *Blobovnicza) full() bool { - return b.filled.Load() >= b.fullSizeLimit + return b.dataSize.Load() >= b.fullSizeLimit } diff --git a/pkg/local_object_storage/blobovnicza/sizes_test.go b/pkg/local_object_storage/blobovnicza/sizes_test.go index 2bf451f12..d2f576fd3 100644 --- a/pkg/local_object_storage/blobovnicza/sizes_test.go +++ b/pkg/local_object_storage/blobovnicza/sizes_test.go @@ -34,7 +34,8 @@ func TestSizes(t *testing.T) { upperBound: 4 * firstBucketBound, }, } { - require.Equal(t, bucketKeyFromBounds(item.upperBound), bucketForSize(item.sz)) + key, _ := bucketForSize(item.sz) + require.Equal(t, bucketKeyFromBounds(item.upperBound), key) } }