[#373] pilorama: Add metrics

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-06-07 12:27:53 +03:00
parent 059e9e88a2
commit d5aaec1107
3 changed files with 159 additions and 13 deletions

View file

@ -72,6 +72,7 @@ func NewBoltForest(opts ...Option) ForestStorage {
maxBatchDelay: bbolt.DefaultMaxBatchDelay, maxBatchDelay: bbolt.DefaultMaxBatchDelay,
maxBatchSize: bbolt.DefaultMaxBatchSize, maxBatchSize: bbolt.DefaultMaxBatchSize,
openFile: os.OpenFile, openFile: os.OpenFile,
metrics: &noopMetrics{},
}, },
} }
@ -101,6 +102,7 @@ func (t *boltForest) SetMode(m mode.Mode) error {
} }
t.mode = m t.mode = m
t.metrics.SetMode(m)
return nil return nil
} }
func (t *boltForest) Open(readOnly bool) error { func (t *boltForest) Open(readOnly bool) error {
@ -122,7 +124,11 @@ func (t *boltForest) Open(readOnly bool) error {
t.db.MaxBatchSize = t.maxBatchSize t.db.MaxBatchSize = t.maxBatchSize
t.db.MaxBatchDelay = t.maxBatchDelay t.db.MaxBatchDelay = t.maxBatchDelay
m := mode.ReadWrite
if readOnly {
m = mode.ReadOnly
}
t.metrics.SetMode(m)
return nil return nil
} }
func (t *boltForest) Init() error { func (t *boltForest) Init() error {
@ -142,10 +148,14 @@ func (t *boltForest) Init() error {
}) })
} }
func (t *boltForest) Close() error { func (t *boltForest) Close() error {
var err error
if t.db != nil { if t.db != nil {
return t.db.Close() err = t.db.Close()
} }
return nil if err == nil {
t.metrics.Close()
}
return err
} }
// TreeMove implements the Forest interface. // TreeMove implements the Forest interface.
@ -225,6 +235,14 @@ func (t *boltForest) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID strin
// TreeExists implements the Forest interface. // TreeExists implements the Forest interface.
func (t *boltForest) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) { func (t *boltForest) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeExists", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeExists", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeExists",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()), attribute.String("container_id", cid.EncodeToString()),
@ -247,7 +265,7 @@ func (t *boltForest) TreeExists(ctx context.Context, cid cidSDK.ID, treeID strin
exists = treeRoot != nil exists = treeRoot != nil
return nil return nil
}) })
success = err == nil
return exists, metaerr.Wrap(err) return exists, metaerr.Wrap(err)
} }
@ -255,6 +273,14 @@ var syncHeightKey = []byte{'h'}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface. // TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (t *boltForest) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error { func (t *boltForest) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeUpdateLastSyncHeight", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeUpdateLastSyncHeight", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeUpdateLastSyncHeight",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()), attribute.String("container_id", cid.EncodeToString()),
@ -268,7 +294,7 @@ func (t *boltForest) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID
binary.LittleEndian.PutUint64(rawHeight, height) binary.LittleEndian.PutUint64(rawHeight, height)
buck := bucketName(cid, treeID) buck := bucketName(cid, treeID)
return metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error { err := metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(buck) treeRoot := tx.Bucket(buck)
if treeRoot == nil { if treeRoot == nil {
return ErrTreeNotFound return ErrTreeNotFound
@ -277,10 +303,20 @@ func (t *boltForest) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID
b := treeRoot.Bucket(dataBucket) b := treeRoot.Bucket(dataBucket)
return b.Put(syncHeightKey, rawHeight) return b.Put(syncHeightKey, rawHeight)
})) }))
success = err == nil
return err
} }
// TreeLastSyncHeight implements the pilorama.Forest interface. // TreeLastSyncHeight implements the pilorama.Forest interface.
func (t *boltForest) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) { func (t *boltForest) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeLastSyncHeight", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeLastSyncHeight", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeLastSyncHeight",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()), attribute.String("container_id", cid.EncodeToString()),
@ -305,11 +341,20 @@ func (t *boltForest) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, tree
} }
return nil return nil
}) })
success = err == nil
return height, metaerr.Wrap(err) return height, metaerr.Wrap(err)
} }
// TreeAddByPath implements the Forest interface. // TreeAddByPath implements the Forest interface.
func (t *boltForest) TreeAddByPath(ctx context.Context, d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) { func (t *boltForest) TreeAddByPath(ctx context.Context, d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeAddByPath", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeAddByPath", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeAddByPath",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", d.CID.EncodeToString()), attribute.String("container_id", d.CID.EncodeToString()),
@ -323,6 +368,12 @@ func (t *boltForest) TreeAddByPath(ctx context.Context, d CIDDescriptor, treeID
) )
defer span.End() defer span.End()
res, err := t.addByPathInternal(d, attr, treeID, path, meta)
success = err == nil
return res, err
}
func (t *boltForest) addByPathInternal(d CIDDescriptor, attr string, treeID string, path []string, meta []KeyValue) ([]Move, error) {
if !d.checkValid() { if !d.checkValid() {
return nil, ErrInvalidCIDDescriptor return nil, ErrInvalidCIDDescriptor
} }
@ -417,6 +468,14 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
// TreeApply implements the Forest interface. // TreeApply implements the Forest interface.
func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error { func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeApply", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApply", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApply",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", cnr.EncodeToString()), attribute.String("container_id", cnr.EncodeToString()),
@ -440,6 +499,7 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
err := t.db.View(func(tx *bbolt.Tx) error { err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cnr, treeID)) treeRoot := tx.Bucket(bucketName(cnr, treeID))
if treeRoot == nil { if treeRoot == nil {
success = true
return nil return nil
} }
@ -448,16 +508,18 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
var logKey [8]byte var logKey [8]byte
binary.BigEndian.PutUint64(logKey[:], m.Time) binary.BigEndian.PutUint64(logKey[:], m.Time)
seen = b.Get(logKey[:]) != nil seen = b.Get(logKey[:]) != nil
success = true
return nil return nil
}) })
if err != nil || seen { if err != nil || seen {
success = err == nil
return metaerr.Wrap(err) return metaerr.Wrap(err)
} }
} }
if t.db.MaxBatchSize == 1 { if t.db.MaxBatchSize == 1 {
fullID := bucketName(cnr, treeID) fullID := bucketName(cnr, treeID)
return metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error { err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, fullID) bLog, bTree, err := t.getTreeBuckets(tx, fullID)
if err != nil { if err != nil {
return err return err
@ -466,11 +528,15 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
var lm Move var lm Move
return t.applyOperation(bLog, bTree, []*Move{m}, &lm) return t.applyOperation(bLog, bTree, []*Move{m}, &lm)
})) }))
success = err == nil
return err
} }
ch := make(chan error, 1) ch := make(chan error, 1)
t.addBatch(cnr, treeID, m, ch) t.addBatch(cnr, treeID, m, ch)
return metaerr.Wrap(<-ch) err := <-ch
success = err == nil
return metaerr.Wrap(err)
} }
func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) { func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
@ -724,6 +790,14 @@ func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool {
// TreeGetByPath implements the Forest interface. // TreeGetByPath implements the Forest interface.
func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) { func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeGetByPath", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetByPath", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetByPath",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()), attribute.String("container_id", cid.EncodeToString()),
@ -740,6 +814,7 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
} }
if len(path) == 0 { if len(path) == 0 {
success = true
return nil, nil return nil, nil
} }
@ -752,7 +827,7 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
var nodes []Node var nodes []Node
return nodes, metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error { err := metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cid, treeID)) treeRoot := tx.Bucket(bucketName(cid, treeID))
if treeRoot == nil { if treeRoot == nil {
return ErrTreeNotFound return ErrTreeNotFound
@ -790,10 +865,20 @@ func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID st
} }
return nil return nil
})) }))
success = err == nil
return nodes, err
} }
// TreeGetMeta implements the forest interface. // TreeGetMeta implements the forest interface.
func (t *boltForest) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) (Meta, Node, error) { func (t *boltForest) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) (Meta, Node, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeGetMeta", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetMeta", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetMeta",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()), attribute.String("container_id", cid.EncodeToString()),
@ -828,12 +913,20 @@ func (t *boltForest) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID stri
_, _, meta, _ := t.getState(b, stateKey(key, nodeID)) _, _, meta, _ := t.getState(b, stateKey(key, nodeID))
return m.FromBytes(meta) return m.FromBytes(meta)
}) })
success = err == nil
return m, parentID, metaerr.Wrap(err) return m, parentID, metaerr.Wrap(err)
} }
// TreeGetChildren implements the Forest interface. // TreeGetChildren implements the Forest interface.
func (t *boltForest) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) ([]uint64, error) { func (t *boltForest) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) ([]uint64, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeGetChildren", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetChildren", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetChildren",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()), attribute.String("container_id", cid.EncodeToString()),
@ -869,12 +962,20 @@ func (t *boltForest) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID
} }
return nil return nil
}) })
success = err == nil
return children, metaerr.Wrap(err) return children, metaerr.Wrap(err)
} }
// TreeList implements the Forest interface. // TreeList implements the Forest interface.
func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) { func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeList", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeList", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeList",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()), attribute.String("container_id", cid.EncodeToString()),
@ -910,12 +1011,20 @@ func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, err
if err != nil { if err != nil {
return nil, metaerr.Wrap(fmt.Errorf("could not list trees: %w", err)) return nil, metaerr.Wrap(fmt.Errorf("could not list trees: %w", err))
} }
success = true
return ids, nil return ids, nil
} }
// TreeGetOpLog implements the pilorama.Forest interface. // TreeGetOpLog implements the pilorama.Forest interface.
func (t *boltForest) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (Move, error) { func (t *boltForest) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (Move, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeGetOpLog", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetOpLog", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetOpLog",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()), attribute.String("container_id", cid.EncodeToString()),
@ -949,12 +1058,20 @@ func (t *boltForest) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID str
} }
return nil return nil
}) })
success = err == nil
return lm, metaerr.Wrap(err) return lm, metaerr.Wrap(err)
} }
// TreeDrop implements the pilorama.Forest interface. // TreeDrop implements the pilorama.Forest interface.
func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) error { func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeDrop", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeDrop", _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeDrop",
trace.WithAttributes( trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()), attribute.String("container_id", cid.EncodeToString()),
@ -972,7 +1089,7 @@ func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string)
return ErrReadOnlyMode return ErrReadOnlyMode
} }
return metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error { err := metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error {
if treeID == "" { if treeID == "" {
c := tx.Cursor() c := tx.Cursor()
prefix := make([]byte, 32) prefix := make([]byte, 32)
@ -991,6 +1108,8 @@ func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string)
} }
return err return err
})) }))
success = err == nil
return err
} }
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) { func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {

View file

@ -0,0 +1,20 @@
package pilorama
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
)
type Metrics interface {
SetMode(m mode.Mode)
Close()
AddMethodDuration(method string, d time.Duration, success bool)
}
type noopMetrics struct{}
func (m *noopMetrics) SetMode(mode.Mode) {}
func (m *noopMetrics) Close() {}
func (m *noopMetrics) AddMethodDuration(string, time.Duration, bool) {}

View file

@ -15,6 +15,7 @@ type cfg struct {
maxBatchDelay time.Duration maxBatchDelay time.Duration
maxBatchSize int maxBatchSize int
openFile func(string, int, fs.FileMode) (*os.File, error) openFile func(string, int, fs.FileMode) (*os.File, error)
metrics Metrics
} }
func WithPath(path string) Option { func WithPath(path string) Option {
@ -52,3 +53,9 @@ func WithOpenFile(openFile func(string, int, fs.FileMode) (*os.File, error)) Opt
c.openFile = openFile c.openFile = openFile
} }
} }
func WithMetrics(m Metrics) Option {
return func(c *cfg) {
c.metrics = m
}
}