[#1608] object: Add IO tag adjustment layer
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
f6b3f79e89
commit
12da2f8262
3 changed files with 174 additions and 2 deletions
|
@ -168,7 +168,7 @@ func initObjectService(c *cfg) {
|
|||
sPatch := createPatchSvc(sGet, sPut)
|
||||
|
||||
// build service pipeline
|
||||
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
||||
// grpc | audit | qos | <metrics> | signature | response | acl | ape | split
|
||||
|
||||
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
|
||||
|
||||
|
@ -191,7 +191,8 @@ func initObjectService(c *cfg) {
|
|||
|
||||
c.shared.metricsSvc = objectService.NewMetricCollector(
|
||||
signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg))
|
||||
auditSvc := objectService.NewAuditService(c.shared.metricsSvc, c.log, c.audit)
|
||||
qosService := objectService.NewQoSObjectService(c.shared.metricsSvc, &c.cfgQoSService)
|
||||
auditSvc := objectService.NewAuditService(qosService, c.log, c.audit)
|
||||
server := objectTransportGRPC.New(auditSvc)
|
||||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
|
|
39
internal/qos/tags.go
Normal file
39
internal/qos/tags.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package qos
|
||||
|
||||
import "fmt"
|
||||
|
||||
type IOTag string
|
||||
|
||||
const (
|
||||
IOTagClient IOTag = "client"
|
||||
IOTagInternal IOTag = "internal"
|
||||
IOTagBackground IOTag = "background"
|
||||
IOTagWritecache IOTag = "writecache"
|
||||
IOTagPolicer IOTag = "policer"
|
||||
IOTagCritical IOTag = "critical"
|
||||
|
||||
ioTagUnknown IOTag = ""
|
||||
)
|
||||
|
||||
func FromRawString(s string) (IOTag, error) {
|
||||
switch s {
|
||||
case string(IOTagCritical):
|
||||
return IOTagCritical, nil
|
||||
case string(IOTagClient):
|
||||
return IOTagClient, nil
|
||||
case string(IOTagInternal):
|
||||
return IOTagInternal, nil
|
||||
case string(IOTagBackground):
|
||||
return IOTagBackground, nil
|
||||
case string(IOTagWritecache):
|
||||
return IOTagWritecache, nil
|
||||
case string(IOTagPolicer):
|
||||
return IOTagPolicer, nil
|
||||
default:
|
||||
return ioTagUnknown, fmt.Errorf("unknown tag %s", s)
|
||||
}
|
||||
}
|
||||
|
||||
func (t IOTag) String() string {
|
||||
return string(t)
|
||||
}
|
132
pkg/services/object/qos.go
Normal file
132
pkg/services/object/qos.go
Normal file
|
@ -0,0 +1,132 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
|
||||
)
|
||||
|
||||
var _ ServiceServer = (*qosObjectService)(nil)
|
||||
|
||||
type AdjustIOTag interface {
|
||||
AdjustIncomingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
|
||||
}
|
||||
|
||||
type qosObjectService struct {
|
||||
next ServiceServer
|
||||
adj AdjustIOTag
|
||||
}
|
||||
|
||||
func NewQoSObjectService(next ServiceServer, adjIOTag AdjustIOTag) ServiceServer {
|
||||
return &qosObjectService{
|
||||
next: next,
|
||||
adj: adjIOTag,
|
||||
}
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Delete(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Get(req *object.GetRequest, s GetObjectStream) error {
|
||||
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Get(req, &qosReadStream[*object.GetResponse]{
|
||||
ctxF: func() context.Context { return ctx },
|
||||
sender: s,
|
||||
})
|
||||
}
|
||||
|
||||
func (q *qosObjectService) GetRange(req *object.GetRangeRequest, s GetObjectRangeStream) error {
|
||||
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.GetRange(req, &qosReadStream[*object.GetRangeResponse]{
|
||||
ctxF: func() context.Context { return ctx },
|
||||
sender: s,
|
||||
})
|
||||
}
|
||||
|
||||
func (q *qosObjectService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.GetRangeHash(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Head(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Patch(ctx context.Context) (PatchObjectStream, error) {
|
||||
s, err := q.next.Patch(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &qosWriteStream[*object.PatchRequest, *object.PatchResponse]{
|
||||
s: s,
|
||||
adj: q.adj,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Put(ctx context.Context) (PutObjectStream, error) {
|
||||
s, err := q.next.Put(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &qosWriteStream[*object.PutRequest, *object.PutResponse]{
|
||||
s: s,
|
||||
adj: q.adj,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *qosObjectService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.PutSingle(ctx, req)
|
||||
}
|
||||
|
||||
func (q *qosObjectService) Search(req *object.SearchRequest, s SearchStream) error {
|
||||
ctx := q.adj.AdjustIncomingTag(s.Context(), req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.next.Search(req, &qosReadStream[*object.SearchResponse]{
|
||||
ctxF: func() context.Context { return ctx },
|
||||
sender: s,
|
||||
})
|
||||
}
|
||||
|
||||
type qosSend[T any] interface {
|
||||
Send(T) error
|
||||
}
|
||||
|
||||
type qosReadStream[T any] struct {
|
||||
sender qosSend[T]
|
||||
ctxF func() context.Context
|
||||
}
|
||||
|
||||
func (g *qosReadStream[T]) Context() context.Context {
|
||||
return g.ctxF()
|
||||
}
|
||||
|
||||
func (g *qosReadStream[T]) Send(resp T) error {
|
||||
return g.sender.Send(resp)
|
||||
}
|
||||
|
||||
type qosVerificationHeader interface {
|
||||
GetVerificationHeader() *session.RequestVerificationHeader
|
||||
}
|
||||
|
||||
type qosSendRecv[TReq qosVerificationHeader, TResp any] interface {
|
||||
Send(context.Context, TReq) error
|
||||
CloseAndRecv(context.Context) (TResp, error)
|
||||
}
|
||||
|
||||
type qosWriteStream[TReq qosVerificationHeader, TResp any] struct {
|
||||
s qosSendRecv[TReq, TResp]
|
||||
adj AdjustIOTag
|
||||
}
|
||||
|
||||
func (q *qosWriteStream[TReq, TResp]) CloseAndRecv(ctx context.Context) (TResp, error) {
|
||||
return q.s.CloseAndRecv(ctx)
|
||||
}
|
||||
|
||||
func (q *qosWriteStream[TReq, TResp]) Send(ctx context.Context, req TReq) error {
|
||||
ctx = q.adj.AdjustIncomingTag(ctx, req.GetVerificationHeader().GetBodySignature().GetKey())
|
||||
return q.s.Send(ctx, req)
|
||||
}
|
Loading…
Add table
Reference in a new issue