[#1608] tree: Add IO tag for tree sync requests
All checks were successful
DCO action / DCO (pull_request) Successful in 52s
Vulncheck / Vulncheck (pull_request) Successful in 53s
Build / Build Components (pull_request) Successful in 1m30s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m37s
Tests and linters / Run gofumpt (pull_request) Successful in 1m44s
Tests and linters / Staticcheck (pull_request) Successful in 2m2s
Tests and linters / Tests (pull_request) Successful in 2m53s
Tests and linters / Lint (pull_request) Successful in 2m57s
Tests and linters / Tests with -race (pull_request) Successful in 3m0s
Tests and linters / gopls check (pull_request) Successful in 3m29s
All checks were successful
DCO action / DCO (pull_request) Successful in 52s
Vulncheck / Vulncheck (pull_request) Successful in 53s
Build / Build Components (pull_request) Successful in 1m30s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m37s
Tests and linters / Run gofumpt (pull_request) Successful in 1m44s
Tests and linters / Staticcheck (pull_request) Successful in 2m2s
Tests and linters / Tests (pull_request) Successful in 2m53s
Tests and linters / Lint (pull_request) Successful in 2m57s
Tests and linters / Tests with -race (pull_request) Successful in 3m0s
Tests and linters / gopls check (pull_request) Successful in 3m29s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
041100d882
commit
7a7ec844c2
3 changed files with 103 additions and 1 deletions
|
@ -72,7 +72,7 @@ func initTreeService(c *cfg) {
|
|||
)
|
||||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
tree.RegisterTreeServiceServer(s, c.treeService)
|
||||
tree.RegisterTreeServiceServer(s, tree.NewIOTagAdjustServer(c.treeService, &c.cfgQoSService))
|
||||
})
|
||||
|
||||
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
|
||||
|
|
101
pkg/services/tree/qos.go
Normal file
101
pkg/services/tree/qos.go
Normal file
|
@ -0,0 +1,101 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var _ TreeServiceServer = (*ioTagAdjust)(nil)
|
||||
|
||||
type AdjustIOTag interface {
|
||||
AdjustIncommingTag(ctx context.Context, requestSignPublicKey []byte) context.Context
|
||||
}
|
||||
|
||||
type ioTagAdjust struct {
|
||||
s TreeServiceServer
|
||||
a AdjustIOTag
|
||||
}
|
||||
|
||||
func NewIOTagAdjustServer(s TreeServiceServer, a AdjustIOTag) TreeServiceServer {
|
||||
return &ioTagAdjust{
|
||||
s: s,
|
||||
a: a,
|
||||
}
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
||||
ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Add(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
||||
ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.AddByPath(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
|
||||
ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Apply(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
||||
ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.GetNodeByPath(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
||||
ctx := i.a.AdjustIncommingTag(srv.Context(), req.GetSignature().GetKey())
|
||||
return i.s.GetOpLog(req, &qosServerWrapper[*GetOpLogResponse]{
|
||||
sender: srv,
|
||||
ServerStream: srv,
|
||||
ctxF: func() context.Context { return ctx },
|
||||
})
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
||||
ctx := i.a.AdjustIncommingTag(srv.Context(), req.GetSignature().GetKey())
|
||||
return i.s.GetSubTree(req, &qosServerWrapper[*GetSubTreeResponse]{
|
||||
sender: srv,
|
||||
ServerStream: srv,
|
||||
ctxF: func() context.Context { return ctx },
|
||||
})
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Healthcheck(ctx context.Context, req *HealthcheckRequest) (*HealthcheckResponse, error) {
|
||||
ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Healthcheck(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
||||
ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Move(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
||||
ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.Remove(ctx, req)
|
||||
}
|
||||
|
||||
func (i *ioTagAdjust) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
||||
ctx = i.a.AdjustIncommingTag(ctx, req.GetSignature().GetKey())
|
||||
return i.s.TreeList(ctx, req)
|
||||
}
|
||||
|
||||
type qosSend[T any] interface {
|
||||
Send(T) error
|
||||
}
|
||||
|
||||
type qosServerWrapper[T any] struct {
|
||||
grpc.ServerStream
|
||||
sender qosSend[T]
|
||||
ctxF func() context.Context
|
||||
}
|
||||
|
||||
func (w *qosServerWrapper[T]) Send(resp T) error {
|
||||
return w.sender.Send(resp)
|
||||
}
|
||||
|
||||
func (w *qosServerWrapper[T]) Context() context.Context {
|
||||
return w.ctxF()
|
||||
}
|
|
@ -387,6 +387,7 @@ func (s *Service) SynchronizeAll() error {
|
|||
}
|
||||
|
||||
func (s *Service) syncLoop(ctx context.Context) {
|
||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||
for {
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
|
|
Loading…
Add table
Reference in a new issue