From bc6e7a823bef89b9ea086393772b4d5dd473caaf Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 15 Mar 2021 16:07:20 +0300 Subject: [PATCH] [#426] service/object: Add object size metrics We can't rely on object size in the header because it might be omitted on initial put or it can be 0xFF.. on streaming data. Signed-off-by: Alex Vanin --- pkg/services/object/metrics.go | 66 ++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/pkg/services/object/metrics.go b/pkg/services/object/metrics.go index 8a28ea0d..1be5ebc2 100644 --- a/pkg/services/object/metrics.go +++ b/pkg/services/object/metrics.go @@ -5,6 +5,7 @@ import ( "time" "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-node/pkg/services/util" "github.com/prometheus/client_golang/prometheus" ) @@ -12,6 +13,15 @@ type ( MetricCollector struct { next ServiceServer } + + getStreamMetric struct { + util.ServerStream + stream GetObjectStream + } + + putStreamMetric struct { + stream object.PutObjectStreamer + } ) const ( @@ -124,6 +134,23 @@ var ( }) ) +// Object payload metrics. +var ( + putPayload = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "put_payload", + Help: "Accumulated payload size at object put method", + }) + + getPayload = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "get_payload", + Help: "Accumulated payload size at object get method", + }) +) + func registerMetrics() { prometheus.MustRegister(getCounter) // todo: replace with for loop over map prometheus.MustRegister(putCounter) @@ -140,6 +167,9 @@ func registerMetrics() { prometheus.MustRegister(deleteDuration) prometheus.MustRegister(rangeDuration) prometheus.MustRegister(rangeHashDuration) + + prometheus.MustRegister(putPayload) + prometheus.MustRegister(getPayload) } func NewMetricCollector(next ServiceServer) *MetricCollector { @@ -157,7 +187,10 @@ func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) err getDuration.Add(float64(time.Since(t))) }() - return m.next.Get(req, stream) + return m.next.Get(req, &getStreamMetric{ + ServerStream: stream, + stream: stream, + }) } func (m MetricCollector) Put(ctx context.Context) (object.PutObjectStreamer, error) { @@ -167,7 +200,12 @@ func (m MetricCollector) Put(ctx context.Context) (object.PutObjectStreamer, err putDuration.Add(float64(time.Since(t))) }() - return m.next.Put(ctx) + stream, err := m.next.Put(ctx) + if err != nil { + return nil, err + } + + return &putStreamMetric{stream: stream}, nil } func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { @@ -219,3 +257,27 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa return m.next.GetRangeHash(ctx, request) } + +func (s getStreamMetric) Send(resp *object.GetResponse) error { + chunk, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartChunk) + if ok { + ln := len(chunk.GetChunk()) + getPayload.Add(float64(ln)) + } + + return s.stream.Send(resp) +} + +func (s putStreamMetric) Send(req *object.PutRequest) error { + chunk, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartChunk) + if ok { + ln := len(chunk.GetChunk()) + putPayload.Add(float64(ln)) + } + + return s.stream.Send(req) +} + +func (s putStreamMetric) CloseAndRecv() (*object.PutResponse, error) { + return s.stream.CloseAndRecv() +}