forked from TrueCloudLab/frostfs-node
Airat Arifullin
9b13a18aac
* Update version within go.mod; * Fix deprecated frostfs-api-go/v2 package and use frostfs-sdk-go/api instead. Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
229 lines
5 KiB
Go
229 lines
5 KiB
Go
package object
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
|
)
|
|
|
|
type (
|
|
MetricCollector struct {
|
|
next ServiceServer
|
|
metrics MetricRegister
|
|
enabled bool
|
|
}
|
|
|
|
getStreamMetric struct {
|
|
util.ServerStream
|
|
stream GetObjectStream
|
|
metrics MetricRegister
|
|
}
|
|
|
|
putStreamMetric struct {
|
|
stream PutObjectStream
|
|
metrics MetricRegister
|
|
start time.Time
|
|
}
|
|
|
|
patchStreamMetric struct {
|
|
stream PatchObjectStream
|
|
metrics MetricRegister
|
|
start time.Time
|
|
}
|
|
|
|
MetricRegister interface {
|
|
AddRequestDuration(string, time.Duration, bool)
|
|
AddPayloadSize(string, int)
|
|
}
|
|
)
|
|
|
|
func NewMetricCollector(next ServiceServer, register MetricRegister, enabled bool) *MetricCollector {
|
|
return &MetricCollector{
|
|
next: next,
|
|
metrics: register,
|
|
enabled: enabled,
|
|
}
|
|
}
|
|
|
|
func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (err error) {
|
|
if m.enabled {
|
|
t := time.Now()
|
|
defer func() {
|
|
m.metrics.AddRequestDuration("Get", time.Since(t), err == nil)
|
|
}()
|
|
err = m.next.Get(req, &getStreamMetric{
|
|
ServerStream: stream,
|
|
stream: stream,
|
|
metrics: m.metrics,
|
|
})
|
|
} else {
|
|
err = m.next.Get(req, stream)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (m MetricCollector) Put() (PutObjectStream, error) {
|
|
if m.enabled {
|
|
t := time.Now()
|
|
|
|
stream, err := m.next.Put()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &putStreamMetric{
|
|
stream: stream,
|
|
metrics: m.metrics,
|
|
start: t,
|
|
}, nil
|
|
}
|
|
return m.next.Put()
|
|
}
|
|
|
|
func (m MetricCollector) Patch() (PatchObjectStream, error) {
|
|
if m.enabled {
|
|
t := time.Now()
|
|
|
|
stream, err := m.next.Patch()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &patchStreamMetric{
|
|
stream: stream,
|
|
metrics: m.metrics,
|
|
start: t,
|
|
}, nil
|
|
}
|
|
return m.next.Patch()
|
|
}
|
|
|
|
func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
|
if m.enabled {
|
|
t := time.Now()
|
|
|
|
res, err := m.next.PutSingle(ctx, request)
|
|
|
|
m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil)
|
|
if err == nil {
|
|
m.metrics.AddPayloadSize("PutSingle", len(request.GetBody().GetObject().GetPayload()))
|
|
}
|
|
|
|
return res, err
|
|
}
|
|
return m.next.PutSingle(ctx, request)
|
|
}
|
|
|
|
func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
|
if m.enabled {
|
|
t := time.Now()
|
|
|
|
res, err := m.next.Head(ctx, request)
|
|
|
|
m.metrics.AddRequestDuration("Head", time.Since(t), err == nil)
|
|
|
|
return res, err
|
|
}
|
|
return m.next.Head(ctx, request)
|
|
}
|
|
|
|
func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error {
|
|
if m.enabled {
|
|
t := time.Now()
|
|
|
|
err := m.next.Search(req, stream)
|
|
|
|
m.metrics.AddRequestDuration("Search", time.Since(t), err == nil)
|
|
|
|
return err
|
|
}
|
|
return m.next.Search(req, stream)
|
|
}
|
|
|
|
func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) {
|
|
if m.enabled {
|
|
t := time.Now()
|
|
|
|
res, err := m.next.Delete(ctx, request)
|
|
|
|
m.metrics.AddRequestDuration("Delete", time.Since(t), err == nil)
|
|
return res, err
|
|
}
|
|
return m.next.Delete(ctx, request)
|
|
}
|
|
|
|
func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
|
|
if m.enabled {
|
|
t := time.Now()
|
|
|
|
err := m.next.GetRange(req, stream)
|
|
|
|
m.metrics.AddRequestDuration("GetRange", time.Since(t), err == nil)
|
|
|
|
return err
|
|
}
|
|
return m.next.GetRange(req, stream)
|
|
}
|
|
|
|
func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
|
if m.enabled {
|
|
t := time.Now()
|
|
|
|
res, err := m.next.GetRangeHash(ctx, request)
|
|
|
|
m.metrics.AddRequestDuration("GetRangeHash", time.Since(t), err == nil)
|
|
|
|
return res, err
|
|
}
|
|
return m.next.GetRangeHash(ctx, request)
|
|
}
|
|
|
|
func (m *MetricCollector) Enable() {
|
|
m.enabled = true
|
|
}
|
|
|
|
func (m *MetricCollector) Disable() {
|
|
m.enabled = false
|
|
}
|
|
|
|
func (s getStreamMetric) Send(resp *object.GetResponse) error {
|
|
chunk, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartChunk)
|
|
if ok {
|
|
s.metrics.AddPayloadSize("Get", len(chunk.GetChunk()))
|
|
}
|
|
|
|
return s.stream.Send(resp)
|
|
}
|
|
|
|
func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error {
|
|
chunk, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartChunk)
|
|
if ok {
|
|
s.metrics.AddPayloadSize("Put", len(chunk.GetChunk()))
|
|
}
|
|
|
|
return s.stream.Send(ctx, req)
|
|
}
|
|
|
|
func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
|
|
res, err := s.stream.CloseAndRecv(ctx)
|
|
|
|
s.metrics.AddRequestDuration("Put", time.Since(s.start), err == nil)
|
|
|
|
return res, err
|
|
}
|
|
|
|
func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) error {
|
|
s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().GetChunk()))
|
|
|
|
return s.stream.Send(ctx, req)
|
|
}
|
|
|
|
func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
|
res, err := s.stream.CloseAndRecv(ctx)
|
|
|
|
s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
|
|
|
|
return res, err
|
|
}
|