Fix blobovnizca data size counter #612
12 changed files with 162 additions and 112 deletions
|
@ -3,6 +3,7 @@ package blobovnicza
|
|||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
@ -15,9 +16,12 @@ import (
|
|||
type Blobovnicza struct {
|
||||
cfg
|
||||
|
||||
filled atomic.Uint64
|
||||
dataSize atomic.Uint64
|
||||
|
||||
boltDB *bbolt.DB
|
||||
|
||||
opened bool
|
||||
controlMtx sync.Mutex
|
||||
}
|
||||
|
||||
// Option is an option of Blobovnicza's constructor.
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package blobovnicza
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -14,7 +14,15 @@ import (
|
|||
// Open opens an internal database at the configured path with the configured permissions.
|
||||
//
|
||||
// If the database file does not exist, it will be created automatically.
|
||||
// If blobovnizca is already open, does nothing.
|
||||
func (b *Blobovnicza) Open() error {
|
||||
b.controlMtx.Lock()
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
defer b.controlMtx.Unlock()
|
||||
|
||||
if b.opened {
|
||||
return nil
|
||||
}
|
||||
|
||||
b.log.Debug(logs.BlobovniczaCreatingDirectoryForBoltDB,
|
||||
zap.String("path", b.path),
|
||||
zap.Bool("ro", b.boltOptions.ReadOnly),
|
||||
|
@ -36,6 +44,7 @@ func (b *Blobovnicza) Open() error {
|
|||
|
||||
b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions)
|
||||
if err == nil {
|
||||
b.opened = true
|
||||
b.metrics.IncOpenBlobovnizcaCount()
|
||||
}
|
||||
|
||||
|
@ -45,21 +54,28 @@ func (b *Blobovnicza) Open() error {
|
|||
// Init initializes internal database structure.
|
||||
//
|
||||
// If Blobovnicza is already initialized, no action is taken.
|
||||
//
|
||||
// Should not be called in read-only configuration.
|
||||
// Blobovnizca must be open, otherwise an error will return.
|
||||
func (b *Blobovnicza) Init() error {
|
||||
b.controlMtx.Lock()
|
||||
defer b.controlMtx.Unlock()
|
||||
|
||||
if !b.opened {
|
||||
return errors.New("blobovnizca is not open")
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
s/open/opened/ s/open/opened/
ale64bit
commented
"open" seems fine to me as well. As in, "the door is open". "open" seems fine to me as well. As in, "the door is open".
|
||||
}
|
||||
|
||||
b.log.Debug(logs.BlobovniczaInitializing,
|
||||
zap.Uint64("object size limit", b.objSizeLimit),
|
||||
zap.Uint64("storage size limit", b.fullSizeLimit),
|
||||
)
|
||||
|
||||
if size := b.filled.Load(); size != 0 {
|
||||
if size := b.dataSize.Load(); size != 0 {
|
||||
b.log.Debug(logs.BlobovniczaAlreadyInitialized, zap.Uint64("size", size))
|
||||
return nil
|
||||
}
|
||||
|
||||
if !b.boltOptions.ReadOnly {
|
||||
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return b.iterateBucketKeys(func(lower, upper uint64, key []byte) (bool, error) {
|
||||
return b.iterateBucketKeys(true, func(lower, upper uint64, key []byte) (bool, error) {
|
||||
// create size range bucket
|
||||
|
||||
rangeStr := stringifyBounds(lower, upper)
|
||||
|
@ -78,28 +94,51 @@ func (b *Blobovnicza) Init() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
info, err := os.Stat(b.path)
|
||||
return b.initializeSize()
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) initializeSize() error {
|
||||
var size uint64
|
||||
err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return b.iterateAllBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
|
||||
dstepanov-yadro
commented
There should be 3 buckets for There should be 3 buckets for `objSizeLimit=128KB`: 0 - 32KB, 32KB-64KB, 64KB - 128KB.
|
||||
size += uint64(b.Stats().KeyN) * upper
|
||||
return false, nil
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't determine DB size: %w", err)
|
||||
}
|
||||
|
||||
sz := uint64(info.Size())
|
||||
b.filled.Store(sz)
|
||||
b.metrics.AddSize(sz)
|
||||
return err
|
||||
b.dataSize.Store(size)
|
||||
b.metrics.AddOpenBlobovnizcaSize(size)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close releases all internal database resources.
|
||||
//
|
||||
// If blobovnizca is already closed, does nothing.
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
s/close/closed/ s/close/closed/
dstepanov-yadro
commented
fixed fixed
|
||||
func (b *Blobovnicza) Close() error {
|
||||
b.controlMtx.Lock()
|
||||
defer b.controlMtx.Unlock()
|
||||
|
||||
if !b.opened {
|
||||
return nil
|
||||
}
|
||||
|
||||
b.log.Debug(logs.BlobovniczaClosingBoltDB,
|
||||
zap.String("path", b.path),
|
||||
)
|
||||
|
||||
err := b.boltDB.Close()
|
||||
if err == nil {
|
||||
b.metrics.DecOpenBlobovnizcaCount()
|
||||
b.metrics.SubSize(b.filled.Load())
|
||||
}
|
||||
if err := b.boltDB.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.metrics.DecOpenBlobovnizcaCount()
|
||||
b.metrics.SubOpenBlobovnizcaSize(b.dataSize.Load())
|
||||
b.dataSize.Store(0)
|
||||
|
||||
b.opened = false
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -46,33 +46,22 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
|
|||
addrKey := addressKey(prm.addr)
|
||||
|
||||
found := false
|
||||
var sizeUpperBound uint64
|
||||
var sizeLowerBound uint64
|
||||
var dataSize uint64
|
||||
|
||||
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
|
||||
return b.iterateAllBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
|
||||
dstepanov-yadro
commented
Scenario: I set the maximum size of the object to 1MB, saved the object, changed the maximum size to 128KB, then I'm trying to delete it, but the corresponding bucket is not found because of new limit. After this fix all bucket will be iterated, not only limited by object size. Scenario: I set the maximum size of the object to 1MB, saved the object, changed the maximum size to 128KB, then I'm trying to delete it, but the corresponding bucket is not found because of new limit.
After this fix all bucket will be iterated, not only limited by object size.
|
||||
objData := buck.Get(addrKey)
|
||||
if objData == nil {
|
||||
// object is not in bucket => continue iterating
|
||||
return false, nil
|
||||
}
|
||||
|
||||
sz := uint64(len(objData))
|
||||
|
||||
// remove object from the bucket
|
||||
err := buck.Delete(addrKey)
|
||||
|
||||
if err == nil {
|
||||
b.log.Debug(logs.BlobovniczaObjectWasRemovedFromBucket,
|
||||
zap.String("binary size", stringifyByteSize(sz)),
|
||||
zap.String("range", stringifyBounds(lower, upper)),
|
||||
)
|
||||
// decrease fullness counter
|
||||
b.decSize(sz)
|
||||
}
|
||||
|
||||
dataSize = uint64(len(objData))
|
||||
sizeLowerBound = lower
|
||||
sizeUpperBound = upper
|
||||
found = true
|
||||
|
||||
// stop iteration
|
||||
return true, err
|
||||
return true, buck.Delete(addrKey)
|
||||
})
|
||||
})
|
||||
|
||||
|
@ -80,5 +69,13 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
|
|||
return DeleteRes{}, new(apistatus.ObjectNotFound)
|
||||
}
|
||||
|
||||
if err == nil && found {
|
||||
b.log.Debug(logs.BlobovniczaObjectWasRemovedFromBucket,
|
||||
zap.String("binary size", stringifyByteSize(dataSize)),
|
||||
zap.String("range", stringifyBounds(sizeLowerBound, sizeUpperBound)),
|
||||
)
|
||||
b.decSize(sizeUpperBound)
|
||||
}
|
||||
|
||||
return DeleteRes{}, err
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package blobovnicza
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -11,28 +12,32 @@ import (
|
|||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *Blobovnicza) iterateBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
|
||||
return b.iterateBucketKeys(func(lower uint64, upper uint64, key []byte) (bool, error) {
|
||||
// iterateAllBuckets iterates all buckets in db
|
||||
//
|
||||
// If the maximum size of the object (b.objSizeLimit) has been changed to lower value,
|
||||
// then there may be more buckets than the current limit of the object size.
|
||||
func (b *Blobovnicza) iterateAllBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
|
||||
return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) {
|
||||
buck := tx.Bucket(key)
|
||||
if buck == nil {
|
||||
// expected to happen:
|
||||
// - before initialization step (incorrect usage by design)
|
||||
// - if DB is corrupted (in future this case should be handled)
|
||||
return false, fmt.Errorf("(%T) could not get bucket %s", b, stringifyBounds(lower, upper))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return f(lower, upper, buck)
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) iterateBucketKeys(f func(uint64, uint64, []byte) (bool, error)) error {
|
||||
return b.iterateBounds(func(lower, upper uint64) (bool, error) {
|
||||
func (b *Blobovnicza) iterateBucketKeys(useObjLimitBound bool, f func(uint64, uint64, []byte) (bool, error)) error {
|
||||
return b.iterateBounds(useObjLimitBound, func(lower, upper uint64) (bool, error) {
|
||||
return f(lower, upper, bucketKeyFromBounds(upper))
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) iterateBounds(f func(uint64, uint64) (bool, error)) error {
|
||||
objLimitBound := upperPowerOfTwo(b.objSizeLimit)
|
||||
func (b *Blobovnicza) iterateBounds(useObjLimitBound bool, f func(uint64, uint64) (bool, error)) error {
|
||||
var objLimitBound uint64 = math.MaxUint64
|
||||
if useObjLimitBound {
|
||||
objLimitBound = upperPowerOfTwo(b.objSizeLimit)
|
||||
}
|
||||
|
||||
for upper := firstBucketBound; upper <= max(objLimitBound, firstBucketBound); upper *= 2 {
|
||||
var lower uint64
|
||||
|
|
|
@ -4,13 +4,13 @@ type Metrics interface {
|
|||
IncOpenBlobovnizcaCount()
|
||||
DecOpenBlobovnizcaCount()
|
||||
|
||||
AddSize(size uint64)
|
||||
SubSize(size uint64)
|
||||
AddOpenBlobovnizcaSize(size uint64)
|
||||
SubOpenBlobovnizcaSize(size uint64)
|
||||
}
|
||||
|
||||
type NoopMetrics struct{}
|
||||
|
||||
func (m *NoopMetrics) IncOpenBlobovnizcaCount() {}
|
||||
func (m *NoopMetrics) DecOpenBlobovnizcaCount() {}
|
||||
func (m *NoopMetrics) AddSize(uint64) {}
|
||||
func (m *NoopMetrics) SubSize(uint64) {}
|
||||
func (m *NoopMetrics) AddOpenBlobovnizcaSize(uint64) {}
|
||||
func (m *NoopMetrics) SubOpenBlobovnizcaSize(uint64) {}
|
||||
|
|
|
@ -61,7 +61,7 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
|||
defer span.End()
|
||||
|
||||
sz := uint64(len(prm.objData))
|
||||
bucketName := bucketForSize(sz)
|
||||
bucketName, upperBound := bucketForSize(sz)
|
||||
key := addressKey(prm.addr)
|
||||
|
||||
err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
|
@ -85,7 +85,7 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
|||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
b.incSize(sz)
|
||||
b.incSize(upperBound)
|
||||
}
|
||||
|
||||
return PutRes{}, err
|
||||
|
|
|
@ -28,8 +28,9 @@ func bucketKeyFromBounds(upperBound uint64) []byte {
|
|||
return buf[:ln]
|
||||
}
|
||||
|
||||
func bucketForSize(sz uint64) []byte {
|
||||
return bucketKeyFromBounds(upperPowerOfTwo(sz))
|
||||
func bucketForSize(sz uint64) ([]byte, uint64) {
|
||||
upperBound := upperPowerOfTwo(sz)
|
||||
return bucketKeyFromBounds(upperBound), upperBound
|
||||
}
|
||||
|
||||
func upperPowerOfTwo(v uint64) uint64 {
|
||||
|
@ -40,15 +41,15 @@ func upperPowerOfTwo(v uint64) uint64 {
|
|||
}
|
||||
|
||||
func (b *Blobovnicza) incSize(sz uint64) {
|
||||
b.filled.Add(sz)
|
||||
b.metrics.AddSize(sz)
|
||||
b.dataSize.Add(sz)
|
||||
b.metrics.AddOpenBlobovnizcaSize(sz)
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) decSize(sz uint64) {
|
||||
b.filled.Add(^(sz - 1))
|
||||
b.metrics.SubSize(sz)
|
||||
b.dataSize.Add(^(sz - 1))
|
||||
b.metrics.SubOpenBlobovnizcaSize(sz)
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) full() bool {
|
||||
return b.filled.Load() >= b.fullSizeLimit
|
||||
return b.dataSize.Load() >= b.fullSizeLimit
|
||||
}
|
||||
|
|
|
@ -34,7 +34,8 @@ func TestSizes(t *testing.T) {
|
|||
upperBound: 4 * firstBucketBound,
|
||||
},
|
||||
} {
|
||||
require.Equal(t, bucketKeyFromBounds(item.upperBound), bucketForSize(item.sz))
|
||||
key, _ := bucketForSize(item.sz)
|
||||
require.Equal(t, bucketKeyFromBounds(item.upperBound), key)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -136,5 +136,8 @@ func (b *Blobovniczas) openBlobovniczaNoCache(p string) (*blobovnicza.Blobovnicz
|
|||
if err := blz.Open(); err != nil {
|
||||
return nil, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
||||
}
|
||||
if err := blz.Init(); err != nil {
|
||||
return nil, fmt.Errorf("could not init blobovnicza %s: %w", p, err)
|
||||
}
|
||||
return blz, nil
|
||||
}
|
||||
|
|
|
@ -81,12 +81,12 @@ type blobovniczaMetrics struct {
|
|||
path string
|
||||
}
|
||||
|
||||
func (m *blobovniczaMetrics) AddSize(size uint64) {
|
||||
m.m.AddTreeSize(m.shardID(), m.path, size)
|
||||
func (m *blobovniczaMetrics) AddOpenBlobovnizcaSize(size uint64) {
|
||||
m.m.AddOpenBlobovnizcaSize(m.shardID(), m.path, size)
|
||||
}
|
||||
|
||||
func (m *blobovniczaMetrics) SubSize(size uint64) {
|
||||
m.m.SubTreeSize(m.shardID(), m.path, size)
|
||||
func (m *blobovniczaMetrics) SubOpenBlobovnizcaSize(size uint64) {
|
||||
m.m.SubOpenBlobovnizcaSize(m.shardID(), m.path, size)
|
||||
}
|
||||
|
||||
func (m *blobovniczaMetrics) IncOpenBlobovnizcaCount() {
|
||||
|
|
|
@ -15,8 +15,8 @@ type BlobobvnizcaMetrics interface {
|
|||
AddBlobobvnizcaTreePut(shardID, path string, size int)
|
||||
AddBlobobvnizcaTreeGet(shardID, path string, size int)
|
||||
|
||||
AddTreeSize(shardID, path string, size uint64)
|
||||
SubTreeSize(shardID, path string, size uint64)
|
||||
AddOpenBlobovnizcaSize(shardID, path string, size uint64)
|
||||
SubOpenBlobovnizcaSize(shardID, path string, size uint64)
|
||||
|
||||
IncOpenBlobovnizcaCount(shardID, path string)
|
||||
DecOpenBlobovnizcaCount(shardID, path string)
|
||||
|
@ -27,41 +27,41 @@ type blobovnizca struct {
|
|||
treeReqDuration *prometheus.HistogramVec
|
||||
treePut *prometheus.CounterVec
|
||||
treeGet *prometheus.CounterVec
|
||||
treeSize *prometheus.GaugeVec
|
||||
treeOpenSize *prometheus.GaugeVec
|
||||
treeOpenCounter *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
func newBlobovnizca() *blobovnizca {
|
||||
return &blobovnizca{
|
||||
treeMode: newShardIDPathMode(blobovnizaTreeSubSystem, "mode", "Blobovnizca tree mode"),
|
||||
treeMode: newShardIDPathMode(blobovniczaTreeSubSystem, "mode", "Blobovnizca tree mode"),
|
||||
|
||||
treeReqDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: blobovnizaTreeSubSystem,
|
||||
Subsystem: blobovniczaTreeSubSystem,
|
||||
Name: "request_duration_seconds",
|
||||
Help: "Accumulated Blobovnizca tree request process duration",
|
||||
}, []string{shardIDLabel, pathLabel, successLabel, methodLabel, withStorageIDLabel}),
|
||||
treePut: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: blobovnizaTreeSubSystem,
|
||||
Subsystem: blobovniczaTreeSubSystem,
|
||||
Name: "put_bytes",
|
||||
Help: "Accumulated payload size written to Blobovnizca tree",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
treeGet: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: blobovnizaTreeSubSystem,
|
||||
Subsystem: blobovniczaTreeSubSystem,
|
||||
Name: "get_bytes",
|
||||
Help: "Accumulated payload size read from Blobovnizca tree",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
treeSize: metrics.NewGaugeVec(prometheus.GaugeOpts{
|
||||
treeOpenSize: metrics.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: blobovnizaTreeSubSystem,
|
||||
Name: "size_bytes",
|
||||
Help: "Blobovnizca tree size",
|
||||
Subsystem: blobovniczaTreeSubSystem,
|
||||
Name: "open_blobovnizca_size_bytes",
|
||||
Help: "Size of opened blobovnizcas of Blobovnizca tree",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
treeOpenCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: blobovnizaTreeSubSystem,
|
||||
Subsystem: blobovniczaTreeSubSystem,
|
||||
Name: "open_blobovnizca_count",
|
||||
Help: "Count of opened blobovnizcas of Blobovnizca tree",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
|
@ -112,15 +112,15 @@ func (b *blobovnizca) AddBlobobvnizcaTreeGet(shardID, path string, size int) {
|
|||
}).Add(float64(size))
|
||||
}
|
||||
|
||||
func (b *blobovnizca) AddTreeSize(shardID, path string, size uint64) {
|
||||
b.treeSize.With(prometheus.Labels{
|
||||
func (b *blobovnizca) AddOpenBlobovnizcaSize(shardID, path string, size uint64) {
|
||||
b.treeOpenSize.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Add(float64(size))
|
||||
}
|
||||
|
||||
func (b *blobovnizca) SubTreeSize(shardID, path string, size uint64) {
|
||||
b.treeSize.With(prometheus.Labels{
|
||||
func (b *blobovnizca) SubOpenBlobovnizcaSize(shardID, path string, size uint64) {
|
||||
b.treeOpenSize.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Sub(float64(size))
|
||||
|
|
|
@ -6,7 +6,7 @@ const (
|
|||
|
||||
fstreeSubSystem = "fstree"
|
||||
blobstoreSubSystem = "blobstore"
|
||||
blobovnizaTreeSubSystem = "blobovniza_tree"
|
||||
blobovniczaTreeSubSystem = "blobovnicza_tree"
|
||||
metabaseSubSystem = "metabase"
|
||||
piloramaSubSystem = "pilorama"
|
||||
engineSubsystem = "engine"
|
||||
|
|
Loading…
Reference in a new issue
Open
is usually something done once after creation, do we have any place where we reuse theBlobovnicza
struct?No, here we check 'opened' flag for consistency with
Init
andClose