From 12da2f826278762e8879994c2788e88b9084afe4 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 29 Jan 2025 11:29:22 +0300 Subject: [PATCH] [#1608] object: Add IO tag adjustment layer Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/object.go | 5 +- internal/qos/tags.go | 39 +++++++++++ pkg/services/object/qos.go | 132 +++++++++++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 internal/qos/tags.go create mode 100644 pkg/services/object/qos.go diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 77446b81c..40d3cc1cd 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -168,7 +168,7 @@ func initObjectService(c *cfg) { sPatch := createPatchSvc(sGet, sPut) // build service pipeline - // grpc | audit | | signature | response | acl | ape | split + // grpc | audit | qos | | signature | response | acl | ape | split splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch) @@ -191,7 +191,8 @@ func initObjectService(c *cfg) { c.shared.metricsSvc = objectService.NewMetricCollector( signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg)) - auditSvc := objectService.NewAuditService(c.shared.metricsSvc, c.log, c.audit) + qosService := objectService.NewQoSObjectService(c.shared.metricsSvc, &c.cfgQoSService) + auditSvc := objectService.NewAuditService(qosService, c.log, c.audit) server := objectTransportGRPC.New(auditSvc) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { diff --git a/internal/qos/tags.go b/internal/qos/tags.go new file mode 100644 index 000000000..6a9a7f7a4 --- /dev/null +++ b/internal/qos/tags.go @@ -0,0 +1,39 @@ +package qos + +import "fmt" + +type IOTag string + +const ( + IOTagClient IOTag = "client" + IOTagInternal IOTag = "internal" + IOTagBackground IOTag = "background" + IOTagWritecache IOTag = "writecache" + IOTagPolicer IOTag = "policer" + IOTagCritical IOTag = "critical" + + ioTagUnknown IOTag = "" +) + +func FromRawString(s string) (IOTag, error) { + switch s { + case string(IOTagCritical): + return IOTagCritical, nil + case string(IOTagClient): + return IOTagClient, nil + case string(IOTagInternal): + return IOTagInternal, nil + case string(IOTagBackground): + return IOTagBackground, nil + case string(IOTagWritecache): + return IOTagWritecache, nil + case string(IOTagPolicer): + return IOTagPolicer, nil + default: + return ioTagUnknown, fmt.Errorf("unknown tag %s", s) + } +} + +func (t IOTag) String() string { + return string(t) +} diff --git a/pkg/services/object/qos.go b/pkg/services/object/qos.go new file mode 100644 index 000000000..145a316e2 --- /dev/null +++ b/pkg/services/object/qos.go @@ -0,0 +1,132 @@ +package object + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session" +) + +var _ ServiceServer = (*qosObjectService)(nil) + +type AdjustIOTag interface { + AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context +} + +type qosObjectService struct { + next ServiceServer + adj AdjustIOTag +} + +func NewQoSObjectService(next ServiceServer, adjIOTag AdjustIOTag) ServiceServer { + return &qosObjectService{ + next: next, + adj: adjIOTag, + } +} + +func (q *qosObjectService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { + ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey()) + return q.next.Delete(ctx, req) +} + +func (q *qosObjectService) Get(req *object.GetRequest, s GetObjectStream) error { + ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey()) + return q.next.Get(req, &qosReadStream[*object.GetResponse]{ + ctxF: func() context.Context { return ctx }, + sender: s, + }) +} + +func (q *qosObjectService) GetRange(req *object.GetRangeRequest, s GetObjectRangeStream) error { + ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey()) + return q.next.GetRange(req, &qosReadStream[*object.GetRangeResponse]{ + ctxF: func() context.Context { return ctx }, + sender: s, + }) +} + +func (q *qosObjectService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { + ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey()) + return q.next.GetRangeHash(ctx, req) +} + +func (q *qosObjectService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { + ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey()) + return q.next.Head(ctx, req) +} + +func (q *qosObjectService) Patch(ctx context.Context) (PatchObjectStream, error) { + s, err := q.next.Patch(ctx) + if err != nil { + return nil, err + } + return &qosWriteStream[*object.PatchRequest, *object.PatchResponse]{ + s: s, + adj: q.adj, + }, nil +} + +func (q *qosObjectService) Put(ctx context.Context) (PutObjectStream, error) { + s, err := q.next.Put(ctx) + if err != nil { + return nil, err + } + return &qosWriteStream[*object.PutRequest, *object.PutResponse]{ + s: s, + adj: q.adj, + }, nil +} + +func (q *qosObjectService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) { + ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey()) + return q.next.PutSingle(ctx, req) +} + +func (q *qosObjectService) Search(req *object.SearchRequest, s SearchStream) error { + ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey()) + return q.next.Search(req, &qosReadStream[*object.SearchResponse]{ + ctxF: func() context.Context { return ctx }, + sender: s, + }) +} + +type qosSend[T any] interface { + Send(T) error +} + +type qosReadStream[T any] struct { + sender qosSend[T] + ctxF func() context.Context +} + +func (g *qosReadStream[T]) Context() context.Context { + return g.ctxF() +} + +func (g *qosReadStream[T]) Send(resp T) error { + return g.sender.Send(resp) +} + +type qosVerificationHeader interface { + GetVerificationHeader() *session.RequestVerificationHeader +} + +type qosSendRecv[TReq qosVerificationHeader, TResp any] interface { + Send(context.Context, TReq) error + CloseAndRecv(context.Context) (TResp, error) +} + +type qosWriteStream[TReq qosVerificationHeader, TResp any] struct { + s qosSendRecv[TReq, TResp] + adj AdjustIOTag +} + +func (q *qosWriteStream[TReq, TResp]) CloseAndRecv(ctx context.Context) (TResp, error) { + return q.s.CloseAndRecv(ctx) +} + +func (q *qosWriteStream[TReq, TResp]) Send(ctx context.Context, req TReq) error { + ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey()) + return q.s.Send(ctx, req) +}