Fix blobovnizca data size counter #612

Merged
fyrchik merged 3 commits from dstepanov-yadro/frostfs-node:fix/blobovnizca_perf into master 2023-08-17 19:17:37 +00:00
12 changed files with 162 additions and 112 deletions

View file

@ -3,6 +3,7 @@ package blobovnicza
import ( import (
"io/fs" "io/fs"
"os" "os"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -15,9 +16,12 @@ import (
type Blobovnicza struct { type Blobovnicza struct {
cfg cfg
filled atomic.Uint64 dataSize atomic.Uint64
boltDB *bbolt.DB boltDB *bbolt.DB
opened bool
controlMtx sync.Mutex
} }
// Option is an option of Blobovnicza's constructor. // Option is an option of Blobovnicza's constructor.

View file

@ -1,8 +1,8 @@
package blobovnicza package blobovnicza
import ( import (
"errors"
"fmt" "fmt"
"os"
"path/filepath" "path/filepath"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -14,7 +14,15 @@ import (
// Open opens an internal database at the configured path with the configured permissions. // Open opens an internal database at the configured path with the configured permissions.
// //
// If the database file does not exist, it will be created automatically. // If the database file does not exist, it will be created automatically.
// If blobovnizca is already open, does nothing.
func (b *Blobovnicza) Open() error { func (b *Blobovnicza) Open() error {
b.controlMtx.Lock()
defer b.controlMtx.Unlock()
if b.opened {
return nil
}
b.log.Debug(logs.BlobovniczaCreatingDirectoryForBoltDB, b.log.Debug(logs.BlobovniczaCreatingDirectoryForBoltDB,
zap.String("path", b.path), zap.String("path", b.path),
zap.Bool("ro", b.boltOptions.ReadOnly), zap.Bool("ro", b.boltOptions.ReadOnly),
@ -36,6 +44,7 @@ func (b *Blobovnicza) Open() error {
b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions) b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions)
if err == nil { if err == nil {
b.opened = true
b.metrics.IncOpenBlobovnizcaCount() b.metrics.IncOpenBlobovnizcaCount()
} }
@ -45,61 +54,91 @@ func (b *Blobovnicza) Open() error {
// Init initializes internal database structure. // Init initializes internal database structure.
// //
// If Blobovnicza is already initialized, no action is taken. // If Blobovnicza is already initialized, no action is taken.
// // Blobovnizca must be open, otherwise an error will return.
// Should not be called in read-only configuration.
func (b *Blobovnicza) Init() error { func (b *Blobovnicza) Init() error {
b.controlMtx.Lock()
defer b.controlMtx.Unlock()
if !b.opened {
return errors.New("blobovnizca is not open")
}
b.log.Debug(logs.BlobovniczaInitializing, b.log.Debug(logs.BlobovniczaInitializing,
zap.Uint64("object size limit", b.objSizeLimit), zap.Uint64("object size limit", b.objSizeLimit),
zap.Uint64("storage size limit", b.fullSizeLimit), zap.Uint64("storage size limit", b.fullSizeLimit),
) )
if size := b.filled.Load(); size != 0 { if size := b.dataSize.Load(); size != 0 {
b.log.Debug(logs.BlobovniczaAlreadyInitialized, zap.Uint64("size", size)) b.log.Debug(logs.BlobovniczaAlreadyInitialized, zap.Uint64("size", size))
return nil return nil
} }
err := b.boltDB.Update(func(tx *bbolt.Tx) error { if !b.boltOptions.ReadOnly {
return b.iterateBucketKeys(func(lower, upper uint64, key []byte) (bool, error) { err := b.boltDB.Update(func(tx *bbolt.Tx) error {
// create size range bucket return b.iterateBucketKeys(true, func(lower, upper uint64, key []byte) (bool, error) {
// create size range bucket
rangeStr := stringifyBounds(lower, upper) rangeStr := stringifyBounds(lower, upper)
b.log.Debug(logs.BlobovniczaCreatingBucketForSizeRange, b.log.Debug(logs.BlobovniczaCreatingBucketForSizeRange,
zap.String("range", rangeStr)) zap.String("range", rangeStr))
_, err := tx.CreateBucketIfNotExists(key) _, err := tx.CreateBucketIfNotExists(key)
if err != nil { if err != nil {
return false, fmt.Errorf("(%T) could not create bucket for bounds %s: %w", return false, fmt.Errorf("(%T) could not create bucket for bounds %s: %w",
b, rangeStr, err) b, rangeStr, err)
} }
return false, nil
})
})
if err != nil {
return err
}
}
return b.initializeSize()
}
func (b *Blobovnicza) initializeSize() error {
var size uint64
err := b.boltDB.View(func(tx *bbolt.Tx) error {
return b.iterateAllBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
size += uint64(b.Stats().KeyN) * upper
return false, nil return false, nil
}) })
}) })
if err != nil {
return err
}
info, err := os.Stat(b.path)
if err != nil { if err != nil {
return fmt.Errorf("can't determine DB size: %w", err) return fmt.Errorf("can't determine DB size: %w", err)
} }
b.dataSize.Store(size)
sz := uint64(info.Size()) b.metrics.AddOpenBlobovnizcaSize(size)
b.filled.Store(sz) return nil
b.metrics.AddSize(sz)
return err
} }
// Close releases all internal database resources. // Close releases all internal database resources.
//
// If blobovnizca is already closed, does nothing.
func (b *Blobovnicza) Close() error { func (b *Blobovnicza) Close() error {
b.controlMtx.Lock()
defer b.controlMtx.Unlock()
if !b.opened {
return nil
}
b.log.Debug(logs.BlobovniczaClosingBoltDB, b.log.Debug(logs.BlobovniczaClosingBoltDB,
zap.String("path", b.path), zap.String("path", b.path),
) )
err := b.boltDB.Close() if err := b.boltDB.Close(); err != nil {
if err == nil { return err
b.metrics.DecOpenBlobovnizcaCount()
b.metrics.SubSize(b.filled.Load())
} }
return err
b.metrics.DecOpenBlobovnizcaCount()
b.metrics.SubOpenBlobovnizcaSize(b.dataSize.Load())
b.dataSize.Store(0)
b.opened = false
return nil
} }

View file

@ -46,33 +46,22 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
addrKey := addressKey(prm.addr) addrKey := addressKey(prm.addr)
found := false found := false
var sizeUpperBound uint64
var sizeLowerBound uint64
var dataSize uint64
err := b.boltDB.Update(func(tx *bbolt.Tx) error { err := b.boltDB.Update(func(tx *bbolt.Tx) error {
return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) { return b.iterateAllBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
objData := buck.Get(addrKey) objData := buck.Get(addrKey)
if objData == nil { if objData == nil {
// object is not in bucket => continue iterating // object is not in bucket => continue iterating
return false, nil return false, nil
} }
dataSize = uint64(len(objData))
sz := uint64(len(objData)) sizeLowerBound = lower
sizeUpperBound = upper
// remove object from the bucket
err := buck.Delete(addrKey)
if err == nil {
b.log.Debug(logs.BlobovniczaObjectWasRemovedFromBucket,
zap.String("binary size", stringifyByteSize(sz)),
zap.String("range", stringifyBounds(lower, upper)),
)
// decrease fullness counter
b.decSize(sz)
}
found = true found = true
return true, buck.Delete(addrKey)
// stop iteration
return true, err
}) })
}) })
@ -80,5 +69,13 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
return DeleteRes{}, new(apistatus.ObjectNotFound) return DeleteRes{}, new(apistatus.ObjectNotFound)
} }
if err == nil && found {
b.log.Debug(logs.BlobovniczaObjectWasRemovedFromBucket,
zap.String("binary size", stringifyByteSize(dataSize)),
zap.String("range", stringifyBounds(sizeLowerBound, sizeUpperBound)),
)
b.decSize(sizeUpperBound)
}
return DeleteRes{}, err return DeleteRes{}, err
} }

View file

@ -3,6 +3,7 @@ package blobovnicza
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -11,28 +12,32 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
func (b *Blobovnicza) iterateBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error { // iterateAllBuckets iterates all buckets in db
return b.iterateBucketKeys(func(lower uint64, upper uint64, key []byte) (bool, error) { //
// If the maximum size of the object (b.objSizeLimit) has been changed to lower value,
// then there may be more buckets than the current limit of the object size.
func (b *Blobovnicza) iterateAllBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) {
buck := tx.Bucket(key) buck := tx.Bucket(key)
if buck == nil { if buck == nil {
// expected to happen: return true, nil
// - before initialization step (incorrect usage by design)
// - if DB is corrupted (in future this case should be handled)
return false, fmt.Errorf("(%T) could not get bucket %s", b, stringifyBounds(lower, upper))
} }
return f(lower, upper, buck) return f(lower, upper, buck)
}) })
} }
func (b *Blobovnicza) iterateBucketKeys(f func(uint64, uint64, []byte) (bool, error)) error { func (b *Blobovnicza) iterateBucketKeys(useObjLimitBound bool, f func(uint64, uint64, []byte) (bool, error)) error {
return b.iterateBounds(func(lower, upper uint64) (bool, error) { return b.iterateBounds(useObjLimitBound, func(lower, upper uint64) (bool, error) {
return f(lower, upper, bucketKeyFromBounds(upper)) return f(lower, upper, bucketKeyFromBounds(upper))
}) })
} }
func (b *Blobovnicza) iterateBounds(f func(uint64, uint64) (bool, error)) error { func (b *Blobovnicza) iterateBounds(useObjLimitBound bool, f func(uint64, uint64) (bool, error)) error {
objLimitBound := upperPowerOfTwo(b.objSizeLimit) var objLimitBound uint64 = math.MaxUint64
if useObjLimitBound {
objLimitBound = upperPowerOfTwo(b.objSizeLimit)
}
for upper := firstBucketBound; upper <= max(objLimitBound, firstBucketBound); upper *= 2 { for upper := firstBucketBound; upper <= max(objLimitBound, firstBucketBound); upper *= 2 {
var lower uint64 var lower uint64

View file

@ -4,13 +4,13 @@ type Metrics interface {
IncOpenBlobovnizcaCount() IncOpenBlobovnizcaCount()
DecOpenBlobovnizcaCount() DecOpenBlobovnizcaCount()
AddSize(size uint64) AddOpenBlobovnizcaSize(size uint64)
SubSize(size uint64) SubOpenBlobovnizcaSize(size uint64)
} }
type NoopMetrics struct{} type NoopMetrics struct{}
func (m *NoopMetrics) IncOpenBlobovnizcaCount() {} func (m *NoopMetrics) IncOpenBlobovnizcaCount() {}
func (m *NoopMetrics) DecOpenBlobovnizcaCount() {} func (m *NoopMetrics) DecOpenBlobovnizcaCount() {}
func (m *NoopMetrics) AddSize(uint64) {} func (m *NoopMetrics) AddOpenBlobovnizcaSize(uint64) {}
func (m *NoopMetrics) SubSize(uint64) {} func (m *NoopMetrics) SubOpenBlobovnizcaSize(uint64) {}

View file

@ -61,7 +61,7 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
defer span.End() defer span.End()
sz := uint64(len(prm.objData)) sz := uint64(len(prm.objData))
bucketName := bucketForSize(sz) bucketName, upperBound := bucketForSize(sz)
key := addressKey(prm.addr) key := addressKey(prm.addr)
err := b.boltDB.Batch(func(tx *bbolt.Tx) error { err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
@ -85,7 +85,7 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
return nil return nil
}) })
if err == nil { if err == nil {
b.incSize(sz) b.incSize(upperBound)
} }
return PutRes{}, err return PutRes{}, err

View file

@ -28,8 +28,9 @@ func bucketKeyFromBounds(upperBound uint64) []byte {
return buf[:ln] return buf[:ln]
} }
func bucketForSize(sz uint64) []byte { func bucketForSize(sz uint64) ([]byte, uint64) {
return bucketKeyFromBounds(upperPowerOfTwo(sz)) upperBound := upperPowerOfTwo(sz)
return bucketKeyFromBounds(upperBound), upperBound
} }
func upperPowerOfTwo(v uint64) uint64 { func upperPowerOfTwo(v uint64) uint64 {
@ -40,15 +41,15 @@ func upperPowerOfTwo(v uint64) uint64 {
} }
func (b *Blobovnicza) incSize(sz uint64) { func (b *Blobovnicza) incSize(sz uint64) {
b.filled.Add(sz) b.dataSize.Add(sz)
b.metrics.AddSize(sz) b.metrics.AddOpenBlobovnizcaSize(sz)
} }
func (b *Blobovnicza) decSize(sz uint64) { func (b *Blobovnicza) decSize(sz uint64) {
b.filled.Add(^(sz - 1)) b.dataSize.Add(^(sz - 1))
b.metrics.SubSize(sz) b.metrics.SubOpenBlobovnizcaSize(sz)
} }
func (b *Blobovnicza) full() bool { func (b *Blobovnicza) full() bool {
return b.filled.Load() >= b.fullSizeLimit return b.dataSize.Load() >= b.fullSizeLimit
} }

View file

@ -34,7 +34,8 @@ func TestSizes(t *testing.T) {
upperBound: 4 * firstBucketBound, upperBound: 4 * firstBucketBound,
}, },
} { } {
require.Equal(t, bucketKeyFromBounds(item.upperBound), bucketForSize(item.sz)) key, _ := bucketForSize(item.sz)
require.Equal(t, bucketKeyFromBounds(item.upperBound), key)
} }
} }

View file

@ -136,5 +136,8 @@ func (b *Blobovniczas) openBlobovniczaNoCache(p string) (*blobovnicza.Blobovnicz
if err := blz.Open(); err != nil { if err := blz.Open(); err != nil {
return nil, fmt.Errorf("could not open blobovnicza %s: %w", p, err) return nil, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
} }
if err := blz.Init(); err != nil {
return nil, fmt.Errorf("could not init blobovnicza %s: %w", p, err)
}
return blz, nil return blz, nil
} }

View file

@ -81,12 +81,12 @@ type blobovniczaMetrics struct {
path string path string
} }
func (m *blobovniczaMetrics) AddSize(size uint64) { func (m *blobovniczaMetrics) AddOpenBlobovnizcaSize(size uint64) {
m.m.AddTreeSize(m.shardID(), m.path, size) m.m.AddOpenBlobovnizcaSize(m.shardID(), m.path, size)
} }
func (m *blobovniczaMetrics) SubSize(size uint64) { func (m *blobovniczaMetrics) SubOpenBlobovnizcaSize(size uint64) {
m.m.SubTreeSize(m.shardID(), m.path, size) m.m.SubOpenBlobovnizcaSize(m.shardID(), m.path, size)
} }
func (m *blobovniczaMetrics) IncOpenBlobovnizcaCount() { func (m *blobovniczaMetrics) IncOpenBlobovnizcaCount() {

View file

@ -15,8 +15,8 @@ type BlobobvnizcaMetrics interface {
AddBlobobvnizcaTreePut(shardID, path string, size int) AddBlobobvnizcaTreePut(shardID, path string, size int)
AddBlobobvnizcaTreeGet(shardID, path string, size int) AddBlobobvnizcaTreeGet(shardID, path string, size int)
AddTreeSize(shardID, path string, size uint64) AddOpenBlobovnizcaSize(shardID, path string, size uint64)
SubTreeSize(shardID, path string, size uint64) SubOpenBlobovnizcaSize(shardID, path string, size uint64)
IncOpenBlobovnizcaCount(shardID, path string) IncOpenBlobovnizcaCount(shardID, path string)
DecOpenBlobovnizcaCount(shardID, path string) DecOpenBlobovnizcaCount(shardID, path string)
@ -27,41 +27,41 @@ type blobovnizca struct {
treeReqDuration *prometheus.HistogramVec treeReqDuration *prometheus.HistogramVec
treePut *prometheus.CounterVec treePut *prometheus.CounterVec
treeGet *prometheus.CounterVec treeGet *prometheus.CounterVec
treeSize *prometheus.GaugeVec treeOpenSize *prometheus.GaugeVec
treeOpenCounter *prometheus.GaugeVec treeOpenCounter *prometheus.GaugeVec
} }
func newBlobovnizca() *blobovnizca { func newBlobovnizca() *blobovnizca {
return &blobovnizca{ return &blobovnizca{
treeMode: newShardIDPathMode(blobovnizaTreeSubSystem, "mode", "Blobovnizca tree mode"), treeMode: newShardIDPathMode(blobovniczaTreeSubSystem, "mode", "Blobovnizca tree mode"),
treeReqDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{ treeReqDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: blobovnizaTreeSubSystem, Subsystem: blobovniczaTreeSubSystem,
Name: "request_duration_seconds", Name: "request_duration_seconds",
Help: "Accumulated Blobovnizca tree request process duration", Help: "Accumulated Blobovnizca tree request process duration",
}, []string{shardIDLabel, pathLabel, successLabel, methodLabel, withStorageIDLabel}), }, []string{shardIDLabel, pathLabel, successLabel, methodLabel, withStorageIDLabel}),
treePut: metrics.NewCounterVec(prometheus.CounterOpts{ treePut: metrics.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: blobovnizaTreeSubSystem, Subsystem: blobovniczaTreeSubSystem,
Name: "put_bytes", Name: "put_bytes",
Help: "Accumulated payload size written to Blobovnizca tree", Help: "Accumulated payload size written to Blobovnizca tree",
}, []string{shardIDLabel, pathLabel}), }, []string{shardIDLabel, pathLabel}),
treeGet: metrics.NewCounterVec(prometheus.CounterOpts{ treeGet: metrics.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: blobovnizaTreeSubSystem, Subsystem: blobovniczaTreeSubSystem,
Name: "get_bytes", Name: "get_bytes",
Help: "Accumulated payload size read from Blobovnizca tree", Help: "Accumulated payload size read from Blobovnizca tree",
}, []string{shardIDLabel, pathLabel}), }, []string{shardIDLabel, pathLabel}),
treeSize: metrics.NewGaugeVec(prometheus.GaugeOpts{ treeOpenSize: metrics.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: blobovnizaTreeSubSystem, Subsystem: blobovniczaTreeSubSystem,
Name: "size_bytes", Name: "open_blobovnizca_size_bytes",
Help: "Blobovnizca tree size", Help: "Size of opened blobovnizcas of Blobovnizca tree",
}, []string{shardIDLabel, pathLabel}), }, []string{shardIDLabel, pathLabel}),
treeOpenCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{ treeOpenCounter: metrics.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: blobovnizaTreeSubSystem, Subsystem: blobovniczaTreeSubSystem,
Name: "open_blobovnizca_count", Name: "open_blobovnizca_count",
Help: "Count of opened blobovnizcas of Blobovnizca tree", Help: "Count of opened blobovnizcas of Blobovnizca tree",
}, []string{shardIDLabel, pathLabel}), }, []string{shardIDLabel, pathLabel}),
@ -112,15 +112,15 @@ func (b *blobovnizca) AddBlobobvnizcaTreeGet(shardID, path string, size int) {
}).Add(float64(size)) }).Add(float64(size))
} }
func (b *blobovnizca) AddTreeSize(shardID, path string, size uint64) { func (b *blobovnizca) AddOpenBlobovnizcaSize(shardID, path string, size uint64) {
b.treeSize.With(prometheus.Labels{ b.treeOpenSize.With(prometheus.Labels{
shardIDLabel: shardID, shardIDLabel: shardID,
pathLabel: path, pathLabel: path,
}).Add(float64(size)) }).Add(float64(size))
} }
func (b *blobovnizca) SubTreeSize(shardID, path string, size uint64) { func (b *blobovnizca) SubOpenBlobovnizcaSize(shardID, path string, size uint64) {
b.treeSize.With(prometheus.Labels{ b.treeOpenSize.With(prometheus.Labels{
shardIDLabel: shardID, shardIDLabel: shardID,
pathLabel: path, pathLabel: path,
}).Sub(float64(size)) }).Sub(float64(size))

View file

@ -4,22 +4,22 @@ const (
namespace = "frostfs_node" namespace = "frostfs_node"
innerRingNamespace = "frostfs_ir" innerRingNamespace = "frostfs_ir"
fstreeSubSystem = "fstree" fstreeSubSystem = "fstree"
blobstoreSubSystem = "blobstore" blobstoreSubSystem = "blobstore"
blobovnizaTreeSubSystem = "blobovniza_tree" blobovniczaTreeSubSystem = "blobovnicza_tree"
metabaseSubSystem = "metabase" metabaseSubSystem = "metabase"
piloramaSubSystem = "pilorama" piloramaSubSystem = "pilorama"
engineSubsystem = "engine" engineSubsystem = "engine"
gcSubsystem = "garbage_collector" gcSubsystem = "garbage_collector"
innerRingSubsystem = "ir" innerRingSubsystem = "ir"
morphSubsystem = "morph" morphSubsystem = "morph"
morphCacheSubsystem = "morphcache" morphCacheSubsystem = "morphcache"
objectSubsystem = "object" objectSubsystem = "object"
replicatorSubsystem = "replicator" replicatorSubsystem = "replicator"
stateSubsystem = "state" stateSubsystem = "state"
treeServiceSubsystem = "treeservice" treeServiceSubsystem = "treeservice"
writeCacheSubsystem = "writecache" writeCacheSubsystem = "writecache"
grpcServerSubsystem = "grpc_server" grpcServerSubsystem = "grpc_server"
successLabel = "success" successLabel = "success"
shardIDLabel = "shard_id" shardIDLabel = "shard_id"