2020-07-10 14:17:51 +00:00
|
|
|
package metrics
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2020-07-24 13:54:03 +00:00
|
|
|
"errors"
|
2020-07-10 14:17:51 +00:00
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/nspcc-dev/neofs-api-go/object"
|
|
|
|
"github.com/nspcc-dev/neofs-api-go/refs"
|
2020-07-24 13:54:03 +00:00
|
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
|
|
|
meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta"
|
2020-07-10 14:17:51 +00:00
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
type (
|
|
|
|
// Collector is an interface of the metrics collector.
|
|
|
|
Collector interface {
|
|
|
|
Start(ctx context.Context)
|
|
|
|
UpdateSpaceUsage()
|
|
|
|
|
|
|
|
SetCounter(ObjectCounter)
|
2020-07-24 13:54:03 +00:00
|
|
|
SetIterator(iter meta2.Iterator)
|
2020-07-10 14:17:51 +00:00
|
|
|
UpdateContainer(cid refs.CID, size uint64, op SpaceOp)
|
|
|
|
}
|
|
|
|
|
|
|
|
collector struct {
|
|
|
|
log *zap.Logger
|
|
|
|
interval time.Duration
|
|
|
|
counter *counterWrapper
|
|
|
|
|
|
|
|
sizes *syncStore
|
|
|
|
metas *metaWrapper
|
|
|
|
|
|
|
|
updateSpaceSize func()
|
|
|
|
updateObjectCount func()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Params groups the parameters of metrics collector's constructor.
|
|
|
|
Params struct {
|
|
|
|
Options []string
|
|
|
|
Logger *zap.Logger
|
|
|
|
Interval time.Duration
|
2020-07-24 13:54:03 +00:00
|
|
|
MetricsStore bucket.Bucket
|
2020-07-10 14:17:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ObjectCounter is an interface of object number storage.
|
|
|
|
ObjectCounter interface {
|
|
|
|
ObjectsCount() (uint64, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// CounterSetter is an interface of ObjectCounter container.
|
|
|
|
CounterSetter interface {
|
|
|
|
SetCounter(ObjectCounter)
|
|
|
|
}
|
|
|
|
|
|
|
|
counterWrapper struct {
|
|
|
|
sync.Mutex
|
|
|
|
counter ObjectCounter
|
|
|
|
}
|
|
|
|
)
|
|
|
|
|
2020-07-24 13:54:03 +00:00
|
|
|
var (
|
|
|
|
errEmptyCounter = errors.New("empty object counter")
|
|
|
|
errEmptyLogger = errors.New("empty logger")
|
|
|
|
errEmptyMetaStore = errors.New("empty meta store")
|
|
|
|
errEmptyMetricsStore = errors.New("empty metrics store")
|
2020-07-10 14:17:51 +00:00
|
|
|
)
|
|
|
|
|
2020-07-24 13:54:03 +00:00
|
|
|
const defaultMetricsInterval = 5 * time.Second
|
|
|
|
|
2020-07-10 14:17:51 +00:00
|
|
|
// New constructs metrics collector and returns Collector interface.
|
|
|
|
func New(p Params) (Collector, error) {
|
|
|
|
switch {
|
|
|
|
case p.Logger == nil:
|
|
|
|
return nil, errEmptyLogger
|
|
|
|
case p.MetricsStore == nil:
|
|
|
|
return nil, errEmptyMetricsStore
|
|
|
|
}
|
|
|
|
|
|
|
|
if p.Interval <= 0 {
|
|
|
|
p.Interval = defaultMetricsInterval
|
|
|
|
}
|
|
|
|
|
|
|
|
metas := newMetaWrapper()
|
|
|
|
sizes := newSyncStore(p.Logger, p.MetricsStore)
|
|
|
|
|
|
|
|
sizes.Load()
|
|
|
|
|
|
|
|
return &collector{
|
|
|
|
log: p.Logger,
|
|
|
|
interval: p.Interval,
|
|
|
|
counter: new(counterWrapper),
|
|
|
|
|
|
|
|
metas: metas,
|
|
|
|
sizes: sizes,
|
|
|
|
|
|
|
|
updateSpaceSize: spaceUpdater(sizes),
|
|
|
|
updateObjectCount: metricsUpdater(p.Options),
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *counterWrapper) SetCounter(counter ObjectCounter) {
|
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
|
|
|
c.counter = counter
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *counterWrapper) ObjectsCount() (uint64, error) {
|
|
|
|
c.Lock()
|
|
|
|
defer c.Unlock()
|
|
|
|
|
|
|
|
if c.counter == nil {
|
|
|
|
return 0, errEmptyCounter
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.counter.ObjectsCount()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *collector) SetCounter(counter ObjectCounter) {
|
|
|
|
c.counter.SetCounter(counter)
|
|
|
|
}
|
|
|
|
|
2020-07-24 13:54:03 +00:00
|
|
|
func (c *collector) SetIterator(iter meta2.Iterator) {
|
2020-07-10 14:17:51 +00:00
|
|
|
c.metas.changeIter(iter)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *collector) UpdateContainer(cid refs.CID, size uint64, op SpaceOp) {
|
|
|
|
c.sizes.Update(cid, size, op)
|
|
|
|
c.updateSpaceSize()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *collector) UpdateSpaceUsage() {
|
|
|
|
sizes := make(map[refs.CID]uint64)
|
|
|
|
|
|
|
|
err := c.metas.Iterate(func(obj *object.Object) error {
|
|
|
|
if !obj.IsTombstone() {
|
|
|
|
cid := obj.SystemHeader.CID
|
|
|
|
sizes[cid] += obj.SystemHeader.PayloadLength
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
c.log.Error("could not update space metrics", zap.Error(err))
|
|
|
|
}
|
|
|
|
|
|
|
|
c.sizes.Reset(sizes)
|
|
|
|
c.updateSpaceSize()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *collector) Start(ctx context.Context) {
|
|
|
|
t := time.NewTicker(c.interval)
|
|
|
|
|
|
|
|
loop:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
c.log.Warn("stop collecting metrics", zap.Error(ctx.Err()))
|
|
|
|
break loop
|
|
|
|
case <-t.C:
|
|
|
|
count, err := c.counter.ObjectsCount()
|
|
|
|
if err != nil {
|
|
|
|
c.log.Warn("get object count failure", zap.Error(err))
|
|
|
|
continue loop
|
|
|
|
}
|
|
|
|
counter.Store(float64(count))
|
|
|
|
c.updateObjectCount()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
t.Stop()
|
|
|
|
}
|