frostfs-node/pkg/services/object/metrics.go

163 lines
3.6 KiB
Go
Raw Normal View History

package object
import (
"context"
"time"
"github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-node/pkg/services/util"
)
type (
MetricCollector struct {
next ServiceServer
metrics MetricRegister
}
getStreamMetric struct {
util.ServerStream
stream GetObjectStream
metrics MetricRegister
}
putStreamMetric struct {
stream PutObjectStream
metrics MetricRegister
start time.Time
}
MetricRegister interface {
IncGetReqCounter()
IncPutReqCounter()
IncHeadReqCounter()
IncSearchReqCounter()
IncDeleteReqCounter()
IncRangeReqCounter()
IncRangeHashReqCounter()
AddGetReqDuration(time.Duration)
AddPutReqDuration(time.Duration)
AddHeadReqDuration(time.Duration)
AddSearchReqDuration(time.Duration)
AddDeleteReqDuration(time.Duration)
AddRangeReqDuration(time.Duration)
AddRangeHashReqDuration(time.Duration)
AddPutPayload(int)
AddGetPayload(int)
}
)
func NewMetricCollector(next ServiceServer, register MetricRegister) *MetricCollector {
return &MetricCollector{
next: next,
metrics: register,
}
}
func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) error {
t := time.Now()
defer func() {
m.metrics.IncGetReqCounter()
m.metrics.AddGetReqDuration(time.Since(t))
}()
return m.next.Get(req, &getStreamMetric{
ServerStream: stream,
stream: stream,
metrics: m.metrics,
})
}
func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) {
t := time.Now()
stream, err := m.next.Put(ctx)
if err != nil {
return nil, err
}
return &putStreamMetric{
stream: stream,
metrics: m.metrics,
start: t,
}, nil
}
func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
t := time.Now()
defer func() {
m.metrics.IncHeadReqCounter()
m.metrics.AddHeadReqDuration(time.Since(t))
}()
return m.next.Head(ctx, request)
}
func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error {
t := time.Now()
defer func() {
m.metrics.IncSearchReqCounter()
m.metrics.AddSearchReqDuration(time.Since(t))
}()
return m.next.Search(req, stream)
}
func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) {
t := time.Now()
defer func() {
m.metrics.IncDeleteReqCounter()
m.metrics.AddDeleteReqDuration(time.Since(t))
}()
return m.next.Delete(ctx, request)
}
func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
t := time.Now()
defer func() {
m.metrics.IncRangeReqCounter()
m.metrics.AddRangeReqDuration(time.Since(t))
}()
return m.next.GetRange(req, stream)
}
func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
t := time.Now()
defer func() {
m.metrics.IncRangeHashReqCounter()
m.metrics.AddRangeHashReqDuration(time.Since(t))
}()
return m.next.GetRangeHash(ctx, request)
}
func (s getStreamMetric) Send(resp *object.GetResponse) error {
chunk, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartChunk)
if ok {
s.metrics.AddGetPayload(len(chunk.GetChunk()))
}
return s.stream.Send(resp)
}
func (s putStreamMetric) Send(req *object.PutRequest) error {
chunk, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartChunk)
if ok {
s.metrics.AddPutPayload(len(chunk.GetChunk()))
}
return s.stream.Send(req)
}
func (s putStreamMetric) CloseAndRecv() (*object.PutResponse, error) {
defer func() {
s.metrics.IncPutReqCounter()
s.metrics.AddPutReqDuration(time.Since(s.start))
}()
return s.stream.CloseAndRecv()
}