[#602] blobovnicza: Fix size counter

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-08-16 11:12:19 +03:00
parent 326f2017ed
commit 5f5dcf0ebc
7 changed files with 99 additions and 52 deletions

View file

@ -3,6 +3,7 @@ package blobovnicza
import ( import (
"io/fs" "io/fs"
"os" "os"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -15,9 +16,12 @@ import (
type Blobovnicza struct { type Blobovnicza struct {
cfg cfg
filled atomic.Uint64 dataSize atomic.Uint64
boltDB *bbolt.DB boltDB *bbolt.DB
opened bool
controlMtx sync.Mutex
} }
// Option is an option of Blobovnicza's constructor. // Option is an option of Blobovnicza's constructor.

View file

@ -1,8 +1,8 @@
package blobovnicza package blobovnicza
import ( import (
"errors"
"fmt" "fmt"
"os"
"path/filepath" "path/filepath"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -14,7 +14,15 @@ import (
// Open opens an internal database at the configured path with the configured permissions. // Open opens an internal database at the configured path with the configured permissions.
// //
// If the database file does not exist, it will be created automatically. // If the database file does not exist, it will be created automatically.
// If blobovnizca is already open, does nothing.
func (b *Blobovnicza) Open() error { func (b *Blobovnicza) Open() error {
b.controlMtx.Lock()
defer b.controlMtx.Unlock()
if b.opened {
return nil
}
b.log.Debug(logs.BlobovniczaCreatingDirectoryForBoltDB, b.log.Debug(logs.BlobovniczaCreatingDirectoryForBoltDB,
zap.String("path", b.path), zap.String("path", b.path),
zap.Bool("ro", b.boltOptions.ReadOnly), zap.Bool("ro", b.boltOptions.ReadOnly),
@ -36,6 +44,7 @@ func (b *Blobovnicza) Open() error {
b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions) b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions)
if err == nil { if err == nil {
b.opened = true
b.metrics.IncOpenBlobovnizcaCount() b.metrics.IncOpenBlobovnizcaCount()
} }
@ -45,20 +54,28 @@ func (b *Blobovnicza) Open() error {
// Init initializes internal database structure. // Init initializes internal database structure.
// //
// If Blobovnicza is already initialized, no action is taken. // If Blobovnicza is already initialized, no action is taken.
// Blobovnizca must be open, otherwise an error will return.
func (b *Blobovnicza) Init() error { func (b *Blobovnicza) Init() error {
b.controlMtx.Lock()
defer b.controlMtx.Unlock()
if !b.opened {
return errors.New("blobovnizca is not open")
}
b.log.Debug(logs.BlobovniczaInitializing, b.log.Debug(logs.BlobovniczaInitializing,
zap.Uint64("object size limit", b.objSizeLimit), zap.Uint64("object size limit", b.objSizeLimit),
zap.Uint64("storage size limit", b.fullSizeLimit), zap.Uint64("storage size limit", b.fullSizeLimit),
) )
if size := b.filled.Load(); size != 0 { if size := b.dataSize.Load(); size != 0 {
b.log.Debug(logs.BlobovniczaAlreadyInitialized, zap.Uint64("size", size)) b.log.Debug(logs.BlobovniczaAlreadyInitialized, zap.Uint64("size", size))
return nil return nil
} }
if !b.boltOptions.ReadOnly { if !b.boltOptions.ReadOnly {
err := b.boltDB.Update(func(tx *bbolt.Tx) error { err := b.boltDB.Update(func(tx *bbolt.Tx) error {
return b.iterateBucketKeys(func(lower, upper uint64, key []byte) (bool, error) { return b.iterateBucketKeys(true, func(lower, upper uint64, key []byte) (bool, error) {
// create size range bucket // create size range bucket
rangeStr := stringifyBounds(lower, upper) rangeStr := stringifyBounds(lower, upper)
@ -79,27 +96,49 @@ func (b *Blobovnicza) Init() error {
} }
} }
info, err := os.Stat(b.path) return b.initializeSize()
}
func (b *Blobovnicza) initializeSize() error {
var size uint64
err := b.boltDB.View(func(tx *bbolt.Tx) error {
return b.iterateAllBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
size += uint64(b.Stats().KeyN) * upper
return false, nil
})
})
if err != nil { if err != nil {
return fmt.Errorf("can't determine DB size: %w", err) return fmt.Errorf("can't determine DB size: %w", err)
} }
b.dataSize.Store(size)
sz := uint64(info.Size()) b.metrics.AddSize(size)
b.filled.Store(sz) return nil
b.metrics.AddSize(sz)
return err
} }
// Close releases all internal database resources. // Close releases all internal database resources.
//
// If blobovnizca is already closed, does nothing.
func (b *Blobovnicza) Close() error { func (b *Blobovnicza) Close() error {
b.controlMtx.Lock()
defer b.controlMtx.Unlock()
if !b.opened {
return nil
}
b.log.Debug(logs.BlobovniczaClosingBoltDB, b.log.Debug(logs.BlobovniczaClosingBoltDB,
zap.String("path", b.path), zap.String("path", b.path),
) )
err := b.boltDB.Close() if err := b.boltDB.Close(); err != nil {
if err == nil {
b.metrics.DecOpenBlobovnizcaCount()
b.metrics.SubSize(b.filled.Load())
}
return err return err
}
b.metrics.DecOpenBlobovnizcaCount()
b.metrics.SubSize(b.dataSize.Load())
b.dataSize.Store(0)
b.opened = false
return nil
} }

View file

@ -46,33 +46,22 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
addrKey := addressKey(prm.addr) addrKey := addressKey(prm.addr)
found := false found := false
var sizeUpperBound uint64
var sizeLowerBound uint64
var dataSize uint64
err := b.boltDB.Update(func(tx *bbolt.Tx) error { err := b.boltDB.Update(func(tx *bbolt.Tx) error {
return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) { return b.iterateAllBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
objData := buck.Get(addrKey) objData := buck.Get(addrKey)
if objData == nil { if objData == nil {
// object is not in bucket => continue iterating // object is not in bucket => continue iterating
return false, nil return false, nil
} }
dataSize = uint64(len(objData))
sz := uint64(len(objData)) sizeLowerBound = lower
sizeUpperBound = upper
// remove object from the bucket
err := buck.Delete(addrKey)
if err == nil {
b.log.Debug(logs.BlobovniczaObjectWasRemovedFromBucket,
zap.String("binary size", stringifyByteSize(sz)),
zap.String("range", stringifyBounds(lower, upper)),
)
// decrease fullness counter
b.decSize(sz)
}
found = true found = true
return true, buck.Delete(addrKey)
// stop iteration
return true, err
}) })
}) })
@ -80,5 +69,13 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
return DeleteRes{}, new(apistatus.ObjectNotFound) return DeleteRes{}, new(apistatus.ObjectNotFound)
} }
if err == nil && found {
b.log.Debug(logs.BlobovniczaObjectWasRemovedFromBucket,
zap.String("binary size", stringifyByteSize(dataSize)),
zap.String("range", stringifyBounds(sizeLowerBound, sizeUpperBound)),
)
b.decSize(sizeUpperBound)
}
return DeleteRes{}, err return DeleteRes{}, err
} }

View file

@ -3,6 +3,7 @@ package blobovnicza
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -11,28 +12,32 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
func (b *Blobovnicza) iterateBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error { // iterateAllBuckets iterates all buckets in db
return b.iterateBucketKeys(func(lower uint64, upper uint64, key []byte) (bool, error) { //
// If the maximum size of the object (b.objSizeLimit) has been changed to lower value,
// then there may be more buckets than the current limit of the object size.
func (b *Blobovnicza) iterateAllBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) {
buck := tx.Bucket(key) buck := tx.Bucket(key)
if buck == nil { if buck == nil {
// expected to happen: return true, nil
// - before initialization step (incorrect usage by design)
// - if DB is corrupted (in future this case should be handled)
return false, fmt.Errorf("(%T) could not get bucket %s", b, stringifyBounds(lower, upper))
} }
return f(lower, upper, buck) return f(lower, upper, buck)
}) })
} }
func (b *Blobovnicza) iterateBucketKeys(f func(uint64, uint64, []byte) (bool, error)) error { func (b *Blobovnicza) iterateBucketKeys(useObjLimitBound bool, f func(uint64, uint64, []byte) (bool, error)) error {
return b.iterateBounds(func(lower, upper uint64) (bool, error) { return b.iterateBounds(useObjLimitBound, func(lower, upper uint64) (bool, error) {
return f(lower, upper, bucketKeyFromBounds(upper)) return f(lower, upper, bucketKeyFromBounds(upper))
}) })
} }
func (b *Blobovnicza) iterateBounds(f func(uint64, uint64) (bool, error)) error { func (b *Blobovnicza) iterateBounds(useObjLimitBound bool, f func(uint64, uint64) (bool, error)) error {
objLimitBound := upperPowerOfTwo(b.objSizeLimit) var objLimitBound uint64 = math.MaxUint64
if useObjLimitBound {
objLimitBound = upperPowerOfTwo(b.objSizeLimit)
}
for upper := firstBucketBound; upper <= max(objLimitBound, firstBucketBound); upper *= 2 { for upper := firstBucketBound; upper <= max(objLimitBound, firstBucketBound); upper *= 2 {
var lower uint64 var lower uint64

View file

@ -61,7 +61,7 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
defer span.End() defer span.End()
sz := uint64(len(prm.objData)) sz := uint64(len(prm.objData))
bucketName := bucketForSize(sz) bucketName, upperBound := bucketForSize(sz)
key := addressKey(prm.addr) key := addressKey(prm.addr)
err := b.boltDB.Batch(func(tx *bbolt.Tx) error { err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
@ -85,7 +85,7 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
return nil return nil
}) })
if err == nil { if err == nil {
b.incSize(sz) b.incSize(upperBound)
} }
return PutRes{}, err return PutRes{}, err

View file

@ -28,8 +28,9 @@ func bucketKeyFromBounds(upperBound uint64) []byte {
return buf[:ln] return buf[:ln]
} }
func bucketForSize(sz uint64) []byte { func bucketForSize(sz uint64) ([]byte, uint64) {
return bucketKeyFromBounds(upperPowerOfTwo(sz)) upperBound := upperPowerOfTwo(sz)
return bucketKeyFromBounds(upperBound), upperBound
} }
func upperPowerOfTwo(v uint64) uint64 { func upperPowerOfTwo(v uint64) uint64 {
@ -40,15 +41,15 @@ func upperPowerOfTwo(v uint64) uint64 {
} }
func (b *Blobovnicza) incSize(sz uint64) { func (b *Blobovnicza) incSize(sz uint64) {
b.filled.Add(sz) b.dataSize.Add(sz)
b.metrics.AddSize(sz) b.metrics.AddSize(sz)
} }
func (b *Blobovnicza) decSize(sz uint64) { func (b *Blobovnicza) decSize(sz uint64) {
b.filled.Add(^(sz - 1)) b.dataSize.Add(^(sz - 1))
b.metrics.SubSize(sz) b.metrics.SubSize(sz)
} }
func (b *Blobovnicza) full() bool { func (b *Blobovnicza) full() bool {
return b.filled.Load() >= b.fullSizeLimit return b.dataSize.Load() >= b.fullSizeLimit
} }

View file

@ -34,7 +34,8 @@ func TestSizes(t *testing.T) {
upperBound: 4 * firstBucketBound, upperBound: 4 * firstBucketBound,
}, },
} { } {
require.Equal(t, bucketKeyFromBounds(item.upperBound), bucketForSize(item.sz)) key, _ := bucketForSize(item.sz)
require.Equal(t, bucketKeyFromBounds(item.upperBound), key)
} }
} }