forked from TrueCloudLab/frostfs-sdk-go
Compare commits
4 commits
master...poc/limit_

Author | SHA1 | Date
---|---|---
| 2919da09ac |
| d04bdf213e |
| dd96a3ee32 |
| 901a02042b |

4 changed files with 133 additions and 22 deletions
@@ -9,6 +9,7 @@ import (
     "io"
     "math"
     "math/rand"
+    "runtime/trace"
     "sort"
     "sync"
     "sync/atomic"
@@ -686,6 +687,9 @@ func (c *clientWrapper) netMapSnapshot(ctx context.Context, _ prmNetMapSnapshot)
 
 // objectPut writes object to FrostFS.
 func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
+    ctx, task := trace.NewTask(ctx, "obj.put")
+    defer task.End()
+
     if prm.bufferMaxSize == 0 {
         prm.bufferMaxSize = defaultBufferMaxSizeForPut
     }
@@ -872,6 +876,9 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
 
 // objectGet returns reader for object.
 func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
+    ctx, task := trace.NewTask(ctx, "obj.getinit")
+    defer task.End()
+
     cl, err := c.getClient()
     if err != nil {
         return ResGetObject{}, err
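The instrumentation above (and in the tree-pool hunks below) uses the standard library's runtime/trace package. A minimal sketch of how a caller could capture those tasks and regions for inspection with `go tool trace`; the file name and surrounding setup are illustrative and not part of this change:

```go
package main

import (
    "log"
    "os"
    "runtime/trace"
)

func main() {
    // Write an execution trace; the "obj.put", "obj.getinit" and "tree.*"
    // tasks/regions added in this branch will show up when the file is
    // opened with `go tool trace pool.trace`.
    f, err := os.Create("pool.trace")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    if err := trace.Start(f); err != nil {
        log.Fatal(err)
    }
    defer trace.Stop()

    // ... construct the pools and run object/tree operations here ...
}
```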
@@ -5,6 +5,7 @@ import (
     "crypto/tls"
     "errors"
     "fmt"
+    "runtime/trace"
     "sync"
 
     apiClient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
@@ -63,16 +64,22 @@ func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bo
     defer c.mu.Unlock()
 
     if c.conn == nil {
+        reg := trace.StartRegion(ctx, "tree.updatenodehealth.dial")
         if c.conn, c.service, err = dialClient(ctx, c.address, c.opts...); err != nil {
+            reg.End()
             return false, err
         }
+        reg.End()
     }
 
     wasHealthy := c.healthy
+    reg := trace.StartRegion(ctx, "tree.updatenodehealth.healthcheck")
     if _, err = c.service.Healthcheck(ctx, &grpcService.HealthcheckRequest{}); err != nil {
         c.healthy = false
+        reg.End()
         return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
     }
+    reg.End()
 
     c.healthy = true
@@ -5,6 +5,7 @@ import (
     "errors"
     "fmt"
     "io"
+    "runtime/trace"
     "sort"
     "strings"
     "sync"
@@ -60,6 +61,7 @@ type InitParameters struct {
     clientRebalanceInterval time.Duration
     nodeParams              []pool.NodeParam
     dialOptions             []grpc.DialOption
+    maxRequestAttempts      int
 }
 
 // Pool represents virtual connection to the FrostFS tree services network to communicate
@@ -78,6 +80,8 @@ type Pool struct {
     dialOptions []grpc.DialOption
     logger      *zap.Logger
 
+    maxRequestAttempts int
+
     startIndicesMtx sync.RWMutex
     // startIndices points to the client from which the next request will be executed.
     // Since clients are stored in innerPool field we have to use two indices.
@@ -177,6 +181,7 @@ func NewPool(options InitParameters) (*Pool, error) {
             nodeRequestTimeout:      options.healthcheckTimeout,
             clientRebalanceInterval: options.clientRebalanceInterval,
         },
+        maxRequestAttempts: options.maxRequestAttempts,
     }
 
     return p, nil
@@ -268,12 +273,21 @@ func (x *InitParameters) SetGRPCDialOptions(opts ...grpc.DialOption) {
     x.dialOptions = opts
 }
 
+// SetMaxRequestAttempts sets the max attempt to make successful request.
+// Default value is 0 that means the number of attempts equals to number of nodes in pool.
+func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
+    x.maxRequestAttempts = maxAttempts
+}
+
 // GetNodes invokes eponymous method from TreeServiceClient.
 //
 // Can return predefined errors:
 // * ErrNodeNotFound
 // * ErrNodeAccessDenied.
 func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService.GetNodeByPathResponse_Info, error) {
+    ctx, task := trace.NewTask(ctx, "tree.getnodes")
+    defer task.End()
+
     request := &grpcService.GetNodeByPathRequest{
         Body: &grpcService.GetNodeByPathRequest_Body{
             ContainerId: prm.CID[:],
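A hedged sketch of how the new SetMaxRequestAttempts option might be wired up by a caller. The import path, the Dial call, and the rest of the pool configuration are assumptions about the surrounding SDK and are not shown in this diff:

```go
package main

import (
    "context"
    "log"

    // Import path assumed from the repository layout; adjust to the real module path.
    treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
)

func main() {
    var prm treepool.InitParameters
    // Signing key, node addresses, timeouts etc. must also be set; omitted here.
    prm.SetMaxRequestAttempts(2) // 0 (the default) falls back to the number of configured nodes

    p, err := treepool.NewPool(prm)
    if err != nil {
        log.Fatal(err)
    }
    // Dial is assumed to exist on the tree pool, mirroring the object pool API.
    if err := p.Dial(context.Background()); err != nil {
        log.Fatal(err)
    }
    _ = p
}
```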
@@ -297,7 +311,10 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
     }
 
     var resp *grpcService.GetNodeByPathResponse
-    if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
+    if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
+        reg := trace.StartRegion(ctx, "tree.getnodes.single")
+        defer reg.End()
+
         resp, inErr = client.GetNodeByPath(ctx, request)
         // Pool wants to do retry 'GetNodeByPath' request if result is empty.
         // Empty result is expected due to delayed tree service sync.
@@ -394,7 +411,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
     }
 
     var cli grpcService.TreeService_GetSubTreeClient
-    if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
+    if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
         cli, inErr = client.GetSubTree(ctx, request)
         return handleError("failed to get sub tree client", inErr)
     }); err != nil {
@@ -410,6 +427,9 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
 // * ErrNodeNotFound
 // * ErrNodeAccessDenied.
 func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
+    ctx, task := trace.NewTask(ctx, "tree.addnode")
+    defer task.End()
+
     request := &grpcService.AddRequest{
         Body: &grpcService.AddRequest_Body{
             ContainerId: prm.CID[:],
@@ -429,7 +449,10 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
     }
 
     var resp *grpcService.AddResponse
-    if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
+    if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
+        reg := trace.StartRegion(ctx, "tree.addnode.single")
+        defer reg.End()
+
         resp, inErr = client.Add(ctx, request)
         return handleError("failed to add node", inErr)
     }); err != nil {
@@ -445,6 +468,9 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
 // * ErrNodeNotFound
 // * ErrNodeAccessDenied.
 func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
+    ctx, task := trace.NewTask(ctx, "tree.addnodebypath")
+    defer task.End()
+
     request := &grpcService.AddByPathRequest{
         Body: &grpcService.AddByPathRequest_Body{
             ContainerId: prm.CID[:],
@@ -466,7 +492,10 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
     }
 
     var resp *grpcService.AddByPathResponse
-    if err := p.requestWithRetry(func(client grpcService.TreeServiceClient) (inErr error) {
+    if err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
+        reg := trace.StartRegion(ctx, "tree.addnodebypath.single")
+        defer reg.End()
+
         resp, inErr = client.AddByPath(ctx, request)
         return handleError("failed to add node by path", inErr)
     }); err != nil {
@@ -490,6 +519,9 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
 // * ErrNodeNotFound
 // * ErrNodeAccessDenied.
 func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
+    ctx, task := trace.NewTask(ctx, "tree.movenode")
+    defer task.End()
+
     request := &grpcService.MoveRequest{
         Body: &grpcService.MoveRequest_Body{
             ContainerId: prm.CID[:],
@@ -510,7 +542,10 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
         return err
     }
 
-    return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
+    return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
+        reg := trace.StartRegion(ctx, "tree.movenode.single")
+        defer reg.End()
+
         if _, err := client.Move(ctx, request); err != nil {
             return handleError("failed to move node", err)
         }
@@ -524,6 +559,9 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
 // * ErrNodeNotFound
 // * ErrNodeAccessDenied.
 func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
+    ctx, task := trace.NewTask(ctx, "tree.removenode")
+    defer task.End()
+
     request := &grpcService.RemoveRequest{
         Body: &grpcService.RemoveRequest_Body{
             ContainerId: prm.CID[:],
@@ -541,7 +579,10 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
         return err
     }
 
-    return p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
+    return p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
+        reg := trace.StartRegion(ctx, "tree.removenode.single")
+        defer reg.End()
+
         if _, err := client.Remove(ctx, request); err != nil {
             return handleError("failed to remove node", err)
         }
@@ -628,6 +669,10 @@ func fillDefaultInitParams(params *InitParameters) {
     if params.nodeStreamTimeout <= 0 {
         params.nodeStreamTimeout = defaultStreamTimeout
     }
+
+    if params.maxRequestAttempts <= 0 {
+        params.maxRequestAttempts = len(params.nodeParams)
+    }
 }
 
 func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
@@ -659,6 +704,9 @@ func (p *Pool) startRebalance(ctx context.Context) {
 }
 
 func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
+    ctx, task := trace.NewTask(ctx, "tree.updatenodehealth")
+    defer task.End()
+
     wg := sync.WaitGroup{}
     for i, inner := range p.innerPools {
         wg.Add(1)
@@ -728,19 +776,31 @@ func (p *Pool) setStartIndices(i, j int) {
     p.startIndicesMtx.Unlock()
 }
 
-func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) error) error {
+func (p *Pool) requestWithRetry(ctx context.Context, fn func(client grpcService.TreeServiceClient) error) error {
     var (
         err, finErr error
        cl           grpcService.TreeServiceClient
     )
 
+    reqID := GetRequestID(ctx)
+
     startI, startJ := p.getStartIndices()
     groupsLen := len(p.innerPools)
+    attempts := p.maxRequestAttempts
     for i := startI; i < startI+groupsLen; i++ {
         indexI := i % groupsLen
         clientsLen := len(p.innerPools[indexI].clients)
         for j := startJ; j < startJ+clientsLen; j++ {
             indexJ := j % clientsLen
 
+            if attempts == 0 {
+                if startI != indexI || startJ != indexJ {
+                    p.setStartIndices(indexI, indexJ)
+                }
+                break
+            }
+            attempts--
+
             if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
                 err = fn(cl)
             }
@@ -750,8 +810,10 @@ func (p *Pool) requestWithRetry(fn func(client grpcService.TreeServiceClient) er
                 }
                 return err
             }
 
             finErr = finalError(finErr, err)
-            p.log(zap.DebugLevel, "tree request error", zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
+            p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
+                zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
         }
         startJ = 0
     }
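The two hunks above are the core of the retry change: requestWithRetry now receives the caller's context (used for the request ID and trace regions) and spends an attempts budget while walking the node groups round-robin. A standalone, simplified illustration of that budgeted walk, using the same placeholder node names as the tests below; it is not library code:

```go
package main

import "fmt"

func main() {
    // Same two node groups the tests use.
    groups := [][]string{
        {"node00", "node01", "node02", "node03"},
        {"node10", "node11", "node12", "node13"},
    }
    startI, startJ := 0, 0
    attempts := 2 // plays the role of maxRequestAttempts

    for i := startI; i < startI+len(groups); i++ {
        indexI := i % len(groups)
        clientsLen := len(groups[indexI])
        for j := startJ; j < startJ+clientsLen; j++ {
            if attempts == 0 {
                break // budget spent: stop trying clients in this group
            }
            attempts--
            indexJ := j % clientsLen
            fmt.Println("attempt on", groups[indexI][indexJ])
        }
        startJ = 0
    }
    // With a budget of 2, only node00 and node01 are tried, which matches the
    // "limit attempts" test expectation checkIndicesAndReset(t, p, 0, 2).
}
```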
@@ -772,6 +834,9 @@ func prioErr(err error) int {
     case errors.Is(err, ErrNodeNotFound) ||
         errors.Is(err, errNodeEmptyResult):
         return 200
+    case errors.Is(err, context.Canceled) ||
+        errors.Is(err, context.DeadlineExceeded):
+        return 250
     case errors.Is(err, ErrUnhealthyEndpoint):
         return 300
     default:
@@ -791,3 +856,16 @@ func finalError(current, candidate error) error {
 
     return current
 }
+
+type reqKeyType string
+
+const reqIDKey = reqKeyType("request_id")
+
+func SetRequestID(ctx context.Context, reqID string) context.Context {
+    return context.WithValue(ctx, reqIDKey, reqID)
+}
+
+func GetRequestID(ctx context.Context) string {
+    reqID, _ := ctx.Value(reqIDKey).(string)
+    return reqID
+}
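The new SetRequestID/GetRequestID helpers let callers correlate the pool's per-attempt debug logs with their own request identifiers. A hedged usage sketch; the package alias, import path, and wrapper function are illustrative assumptions:

```go
package treeexample // illustrative

import (
    "context"

    // Import path assumed from the repository layout; adjust to the real module path.
    treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
)

// getNodesWithID is a hypothetical wrapper: it attaches a caller-supplied
// request ID so that any failed attempts inside requestWithRetry are logged
// with the matching "request_id" field.
func getNodesWithID(ctx context.Context, p *treepool.Pool, prm treepool.GetNodesParams, id string) error {
    ctx = treepool.SetRequestID(ctx, id)
    _, err := p.GetNodes(ctx, prm)
    return err
}
```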
@@ -77,14 +77,21 @@ func TestHandleError(t *testing.T) {
 }
 
 func TestRetry(t *testing.T) {
+    ctx := context.Background()
     nodes := [][]string{
         {"node00", "node01", "node02", "node03"},
         {"node10", "node11", "node12", "node13"},
     }
 
+    var lenNodes int
+    for i := range nodes {
+        lenNodes += len(nodes[i])
+    }
+
     p := &Pool{
-        logger:     zaptest.NewLogger(t),
-        innerPools: makeInnerPool(nodes),
+        logger:             zaptest.NewLogger(t),
+        innerPools:         makeInnerPool(nodes),
+        maxRequestAttempts: lenNodes,
     }
 
     makeFn := func(client grpcService.TreeServiceClient) error {
@@ -92,14 +99,14 @@ func TestRetry(t *testing.T) {
     }
 
     t.Run("first ok", func(t *testing.T) {
-        err := p.requestWithRetry(makeFn)
+        err := p.requestWithRetry(ctx, makeFn)
         require.NoError(t, err)
         checkIndicesAndReset(t, p, 0, 0)
     })
 
     t.Run("first failed", func(t *testing.T) {
         setErrors(p, "node00")
-        err := p.requestWithRetry(makeFn)
+        err := p.requestWithRetry(ctx, makeFn)
         require.NoError(t, err)
         checkIndicesAndReset(t, p, 0, 1)
     })
@@ -107,7 +114,7 @@ func TestRetry(t *testing.T) {
     t.Run("all failed", func(t *testing.T) {
         setErrors(p, nodes[0]...)
         setErrors(p, nodes[1]...)
-        err := p.requestWithRetry(makeFn)
+        err := p.requestWithRetry(ctx, makeFn)
         require.Error(t, err)
         checkIndicesAndReset(t, p, 0, 0)
     })
@@ -115,13 +122,13 @@ func TestRetry(t *testing.T) {
     t.Run("round", func(t *testing.T) {
         setErrors(p, nodes[0][0], nodes[0][1])
         setErrors(p, nodes[1]...)
-        err := p.requestWithRetry(makeFn)
+        err := p.requestWithRetry(ctx, makeFn)
         require.NoError(t, err)
         checkIndices(t, p, 0, 2)
         resetClientsErrors(p)
 
         setErrors(p, nodes[0][2], nodes[0][3])
-        err = p.requestWithRetry(makeFn)
+        err = p.requestWithRetry(ctx, makeFn)
         require.NoError(t, err)
         checkIndicesAndReset(t, p, 0, 0)
     })
@@ -129,14 +136,14 @@ func TestRetry(t *testing.T) {
     t.Run("group switch", func(t *testing.T) {
         setErrors(p, nodes[0]...)
         setErrors(p, nodes[1][0])
-        err := p.requestWithRetry(makeFn)
+        err := p.requestWithRetry(ctx, makeFn)
         require.NoError(t, err)
         checkIndicesAndReset(t, p, 1, 1)
     })
 
     t.Run("group round", func(t *testing.T) {
         setErrors(p, nodes[0][1:]...)
-        err := p.requestWithRetry(makeFn)
+        err := p.requestWithRetry(ctx, makeFn)
         require.NoError(t, err)
         checkIndicesAndReset(t, p, 0, 0)
     })
@@ -144,7 +151,7 @@ func TestRetry(t *testing.T) {
     t.Run("group round switch", func(t *testing.T) {
         setErrors(p, nodes[0]...)
         p.setStartIndices(0, 1)
-        err := p.requestWithRetry(makeFn)
+        err := p.requestWithRetry(ctx, makeFn)
         require.NoError(t, err)
         checkIndicesAndReset(t, p, 1, 0)
     })
@@ -152,14 +159,14 @@ func TestRetry(t *testing.T) {
     t.Run("no panic group switch", func(t *testing.T) {
         setErrors(p, nodes[1]...)
         p.setStartIndices(1, 0)
-        err := p.requestWithRetry(makeFn)
+        err := p.requestWithRetry(ctx, makeFn)
         require.NoError(t, err)
         checkIndicesAndReset(t, p, 0, 0)
     })
 
     t.Run("error empty result", func(t *testing.T) {
         errNodes, index := 2, 0
-        err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
+        err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
             if index < errNodes {
                 index++
                 return errNodeEmptyResult
@@ -172,7 +179,7 @@ func TestRetry(t *testing.T) {
 
     t.Run("error not found", func(t *testing.T) {
         errNodes, index := 2, 0
-        err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
+        err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
             if index < errNodes {
                 index++
                 return ErrNodeNotFound
@@ -185,7 +192,7 @@ func TestRetry(t *testing.T) {
 
     t.Run("error access denied", func(t *testing.T) {
         var index int
-        err := p.requestWithRetry(func(client grpcService.TreeServiceClient) error {
+        err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
             index++
             return ErrNodeAccessDenied
         })
@@ -193,6 +200,18 @@ func TestRetry(t *testing.T) {
         require.Equal(t, 1, index)
         checkIndicesAndReset(t, p, 0, 0)
     })
+
+    t.Run("limit attempts", func(t *testing.T) {
+        oldVal := p.maxRequestAttempts
+        p.maxRequestAttempts = 2
+        setErrors(p, nodes[0]...)
+        setErrors(p, nodes[1]...)
+        err := p.requestWithRetry(ctx, makeFn)
+        require.Error(t, err)
+        checkIndicesAndReset(t, p, 0, 2)
+        p.maxRequestAttempts = oldVal
+    })
 }
 
 func TestRebalance(t *testing.T) {
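The new "limit attempts" case can be run on its own with Go's subtest filter, e.g. `go test -run 'TestRetry/limit_attempts' ./...` (Go replaces the space in the subtest name with an underscore); the exact package path depends on where these files live in the repository.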