WIP: Blobtree substorage #645
14 changed files with 300 additions and 8 deletions
|
@ -889,6 +889,11 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
|||
badgerstore.WithMemTablesCount(sRead.memTablesCount),
|
||||
badgerstore.WithValueLogSize(sRead.valueLogFileSize),
|
||||
}
|
||||
if c.metricsCollector != nil {
|
||||
badgerStoreOpts = append(badgerStoreOpts,
|
||||
badgerstore.WithMetrics(
|
||||
lsmetrics.NewBadgerStoreMetrics(sRead.path, c.metricsCollector.BadgerStoreMetrics())))
|
||||
}
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: badgerstore.New(badgerStoreOpts...),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
|
|
|
@ -16,6 +16,7 @@ type cfg struct {
|
|||
db badger.Options
|
||||
gcTimeout time.Duration
|
||||
gcDiscardRatio float64
|
||||
metrics Metrics
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
@ -55,6 +56,7 @@ func defaultCfg() *cfg {
|
|||
db: opts,
|
||||
gcTimeout: 10 * time.Minute,
|
||||
gcDiscardRatio: 0.2, // for 1GB vLog file GC will perform only if around 200MB could be free
|
||||
metrics: &noopMetrics{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,3 +116,10 @@ func WithValueLogSize(sz int64) Option {
|
|||
c.db.ValueLogFileSize = sz
|
||||
}
|
||||
}
|
||||
|
||||
// WithMetrics sets metrics.
|
||||
func WithMetrics(m Metrics) Option {
|
||||
return func(c *cfg) {
|
||||
c.metrics = m
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ func (s *Store) Close() error {
|
|||
return err
|
||||
}
|
||||
s.opened = false
|
||||
s.cfg.metrics.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -83,6 +84,7 @@ func (s *Store) Open(readOnly bool) error {
|
|||
return err
|
||||
}
|
||||
s.opened = true
|
||||
s.cfg.metrics.SetMode(readOnly)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package badgerstore
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
|
@ -14,6 +15,13 @@ import (
|
|||
|
||||
// Delete implements common.Storage.
|
||||
func (s *Store) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
|
||||
success := false
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.Delete(time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Delete",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
|
@ -36,9 +44,12 @@ func (s *Store) Delete(ctx context.Context, prm common.DeletePrm) (common.Delete
|
|||
return common.DeleteRes{}, err
|
||||
}
|
||||
|
||||
err = tx.Delete(key(prm.Address))
|
||||
if err != nil {
|
||||
if err = tx.Delete(key(prm.Address)); err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
return common.DeleteRes{}, tx.Commit()
|
||||
if err = tx.Commit(); err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
success = true
|
||||
return common.DeleteRes{}, nil
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package badgerstore
|
|||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
|
@ -13,6 +14,13 @@ import (
|
|||
|
||||
// Exists implements common.Storage.
|
||||
func (s *Store) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
success := false
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.Exists(time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Exists",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
|
@ -27,10 +35,12 @@ func (s *Store) Exists(ctx context.Context, prm common.ExistsPrm) (common.Exists
|
|||
_, err := tx.Get(key(prm.Address))
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
success = true
|
||||
return common.ExistsRes{Exists: false}, nil
|
||||
}
|
||||
return common.ExistsRes{}, err
|
||||
}
|
||||
|
||||
success = true
|
||||
return common.ExistsRes{Exists: true}, nil
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"encoding/hex"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
|
@ -19,6 +20,14 @@ import (
|
|||
|
||||
// Get implements common.Storage.
|
||||
func (s *Store) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||
success := false
|
||||
size := 0
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.Get(time.Since(startedAt), size, success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Get",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
|
@ -43,11 +52,21 @@ func (s *Store) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, erro
|
|||
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
}
|
||||
|
||||
success = true
|
||||
size = len(data)
|
||||
return common.GetRes{Object: obj, RawData: data}, nil
|
||||
}
|
||||
|
||||
// GetRange implements common.Storage.
|
||||
func (s *Store) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
success := false
|
||||
size := 0
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.GetRange(time.Since(startedAt), size, success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.GetRange",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
|
@ -81,9 +100,10 @@ func (s *Store) GetRange(ctx context.Context, prm common.GetRangePrm) (common.Ge
|
|||
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
|
||||
}
|
||||
|
||||
return common.GetRangeRes{
|
||||
Data: payload[from:to],
|
||||
}, nil
|
||||
res := common.GetRangeRes{Data: payload[from:to]}
|
||||
success = true
|
||||
size = len(res.Data)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *Store) getObjectData(addr oid.Address) ([]byte, error) {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
|
@ -14,6 +15,13 @@ import (
|
|||
|
||||
// Iterate implements common.Storage.
|
||||
func (s *Store) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
success := false
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.Iterate(time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Iterate",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
|
@ -78,6 +86,7 @@ func (s *Store) Iterate(ctx context.Context, prm common.IteratePrm) (common.Iter
|
|||
}
|
||||
}
|
||||
|
||||
success = true
|
||||
return common.IterateRes{}, nil
|
||||
}
|
||||
|
||||
|
|
31
pkg/local_object_storage/blobstor/badgerstore/metrics.go
Normal file
31
pkg/local_object_storage/blobstor/badgerstore/metrics.go
Normal file
|
@ -0,0 +1,31 @@
|
|||
package badgerstore
|
||||
|
||||
import "time"
|
||||
|
||||
type Metrics interface {
|
||||
SetParentID(parentID string)
|
||||
|
||||
SetMode(readOnly bool)
|
||||
Close()
|
||||
|
||||
Delete(d time.Duration, success bool)
|
||||
Exists(d time.Duration, success bool)
|
||||
GetRange(d time.Duration, size int, success bool)
|
||||
Get(d time.Duration, size int, success bool)
|
||||
Iterate(d time.Duration, success bool)
|
||||
Put(d time.Duration, size int, success bool)
|
||||
}
|
||||
|
||||
var _ Metrics = (*noopMetrics)(nil)
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
||||
func (*noopMetrics) Close() {}
|
||||
func (*noopMetrics) Delete(time.Duration, bool) {}
|
||||
func (*noopMetrics) Exists(time.Duration, bool) {}
|
||||
func (*noopMetrics) Get(time.Duration, int, bool) {}
|
||||
func (*noopMetrics) GetRange(time.Duration, int, bool) {}
|
||||
func (*noopMetrics) Iterate(time.Duration, bool) {}
|
||||
func (*noopMetrics) Put(time.Duration, int, bool) {}
|
||||
func (*noopMetrics) SetMode(bool) {}
|
||||
func (*noopMetrics) SetParentID(string) {}
|
|
@ -2,6 +2,7 @@ package badgerstore
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
|
@ -11,6 +12,14 @@ import (
|
|||
|
||||
// Put implements common.Storage.
|
||||
func (s *Store) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||
success := false
|
||||
size := 0
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.Put(time.Since(startedAt), size, success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Put",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
|
@ -34,5 +43,10 @@ func (s *Store) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
|||
if err != nil {
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
return common.PutRes{}, b.Flush()
|
||||
if err = b.Flush(); err != nil {
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
success = true
|
||||
size = len(prm.RawData)
|
||||
return common.PutRes{}, nil
|
||||
}
|
||||
|
|
|
@ -55,7 +55,9 @@ func (s *Store) SetCompressor(cc *compression.Config) {
|
|||
}
|
||||
|
||||
// SetParentID implements common.Storage.
|
||||
func (*Store) SetParentID(parentID string) {}
|
||||
func (s *Store) SetParentID(parentID string) {
|
||||
s.cfg.metrics.SetParentID(parentID)
|
||||
}
|
||||
|
||||
// SetReportErrorFunc implements common.Storage.
|
||||
func (*Store) SetReportErrorFunc(func(string, error)) {}
|
||||
|
|
74
pkg/local_object_storage/metrics/badgerstore.go
Normal file
74
pkg/local_object_storage/metrics/badgerstore.go
Normal file
|
@ -0,0 +1,74 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
|
||||
metrics_impl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
)
|
||||
|
||||
func NewBadgerStoreMetrics(path string, m metrics_impl.BadgerStoreMetrics) badgerstore.Metrics {
|
||||
return &badgerStoreMetrics{
|
||||
path: path,
|
||||
m: m,
|
||||
}
|
||||
}
|
||||
|
||||
type badgerStoreMetrics struct {
|
||||
path, shardID string
|
||||
m metrics_impl.BadgerStoreMetrics
|
||||
}
|
||||
|
||||
// Close implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Close() {
|
||||
m.m.Close(m.shardID, m.path)
|
||||
}
|
||||
|
||||
// Delete implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Delete(d time.Duration, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "Delete", d, success)
|
||||
}
|
||||
|
||||
// Exists implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Exists(d time.Duration, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "Exists", d, success)
|
||||
}
|
||||
|
||||
// Get implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Get(d time.Duration, size int, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "Get", d, success)
|
||||
if success {
|
||||
m.m.AddGet(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
// GetRange implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) GetRange(d time.Duration, size int, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "GetRange", d, success)
|
||||
if success {
|
||||
m.m.AddGet(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Iterate(d time.Duration, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "Iterate", d, success)
|
||||
}
|
||||
|
||||
// Put implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Put(d time.Duration, size int, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "Put", d, success)
|
||||
if success {
|
||||
m.m.AddPut(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
// SetMode implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) SetMode(readOnly bool) {
|
||||
m.m.SetMode(m.shardID, m.path, readOnly)
|
||||
}
|
||||
|
||||
// SetParentID implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) SetParentID(parentID string) {
|
||||
m.shardID = parentID
|
||||
}
|
98
pkg/metrics/badgerstore.go
Normal file
98
pkg/metrics/badgerstore.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type BadgerStoreMetrics interface {
|
||||
SetMode(shardID, path string, readOnly bool)
|
||||
Close(shardID, path string)
|
||||
MethodDuration(shardID, path string, method string, d time.Duration, success bool)
|
||||
AddPut(shardID, path string, size int)
|
||||
AddGet(shardID, path string, size int)
|
||||
}
|
||||
|
||||
var _ BadgerStoreMetrics = (*badgerStoreMetrics)(nil)
|
||||
|
||||
type badgerStoreMetrics struct {
|
||||
mode *shardIDPathModeValue
|
||||
reqDuration *prometheus.HistogramVec
|
||||
put *prometheus.CounterVec
|
||||
get *prometheus.CounterVec
|
||||
}
|
||||
|
||||
func newbadgerStoreMetrics() *badgerStoreMetrics {
|
||||
return &badgerStoreMetrics{
|
||||
mode: newShardIDPathMode(badgerStoreSubSystem, "mode", "BadgerStore mode"),
|
||||
reqDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: badgerStoreSubSystem,
|
||||
Name: "request_duration_seconds",
|
||||
Help: "Accumulated BadgerStore request process duration",
|
||||
}, []string{shardIDLabel, pathLabel, successLabel, methodLabel}),
|
||||
put: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: badgerStoreSubSystem,
|
||||
Name: "put_bytes",
|
||||
Help: "Accumulated payload size written to BadgerStore",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
get: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: badgerStoreSubSystem,
|
||||
Name: "get_bytes",
|
||||
Help: "Accumulated payload size read from BadgerStore",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
// AddGet implements BadgerStoreMetrics.
|
||||
func (b *badgerStoreMetrics) AddGet(shardID string, path string, size int) {
|
||||
b.get.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Add(float64(size))
|
||||
}
|
||||
|
||||
// AddPut implements BadgerStoreMetrics.
|
||||
func (b *badgerStoreMetrics) AddPut(shardID string, path string, size int) {
|
||||
b.put.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Add(float64(size))
|
||||
}
|
||||
|
||||
// Close implements BadgerStoreMetrics.
|
||||
func (b *badgerStoreMetrics) Close(shardID string, path string) {
|
||||
b.mode.SetMode(shardID, path, closedMode)
|
||||
b.reqDuration.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
b.get.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
b.put.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
}
|
||||
|
||||
// MethodDuration implements BadgerStoreMetrics.
|
||||
func (b *badgerStoreMetrics) MethodDuration(shardID string, path string, method string, d time.Duration, success bool) {
|
||||
b.reqDuration.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
successLabel: strconv.FormatBool(success),
|
||||
methodLabel: method,
|
||||
}).Observe(d.Seconds())
|
||||
}
|
||||
|
||||
// SetMode implements BadgerStoreMetrics.
|
||||
func (b *badgerStoreMetrics) SetMode(shardID string, path string, readOnly bool) {
|
||||
b.mode.SetMode(shardID, path, modeFromBool(readOnly))
|
||||
}
|
|
@ -22,6 +22,7 @@ const (
|
|||
writeCacheSubsystem = "writecache"
|
||||
grpcServerSubsystem = "grpc_server"
|
||||
policerSubsystem = "policer"
|
||||
badgerStoreSubSystem = "badgerstore"
|
||||
|
||||
successLabel = "success"
|
||||
shardIDLabel = "shard_id"
|
||||
|
|
|
@ -21,6 +21,7 @@ type NodeMetrics struct {
|
|||
pilorama *piloramaMetrics
|
||||
grpc *grpcServerMetrics
|
||||
blobTree *blobTreeMetrics
|
||||
badgerStore *badgerStoreMetrics
|
||||
policer *policerMetrics
|
||||
morphClient *morphClientMetrics
|
||||
morphCache *morphCacheMetrics
|
||||
|
@ -51,6 +52,7 @@ func NewNodeMetrics() *NodeMetrics {
|
|||
morphCache: newMorphCacheMetrics(namespace),
|
||||
log: logger.NewLogMetrics(namespace),
|
||||
blobTree: newBlobTreeMetrics(),
|
||||
badgerStore: newbadgerStoreMetrics(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -122,3 +124,7 @@ func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
|
|||
func (m *NodeMetrics) BlobTreeMetrics() BlobTreeMetrics {
|
||||
return m.blobTree
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) BadgerStoreMetrics() BadgerStoreMetrics {
|
||||
return m.badgerStore
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue