[#137] pool: drop retry for object operations

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
remotes/fyrchik/split-info-format
Denis Kirillov 2022-03-24 17:19:30 +03:00 committed by Alex Vanin
parent fcfae4a249
commit c4adb03f8e
1 changed files with 66 additions and 106 deletions

View File

@ -707,17 +707,6 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
if ok {
bufferWeights[j] = options.nodesParams[i].weights[j]
if !cp.healthy {
cliRes, err := createSessionTokenForDuration(ctx, cli, options.sessionExpirationDuration)
if err != nil {
ok = false
bufferWeights[j] = 0
} else {
tkn := p.newSessionToken(cliRes)
_ = p.cache.Put(formCacheKey(cp.address, p.key), tkn)
}
}
} else {
p.cache.DeleteByPrefix(cp.address)
}
@ -896,24 +885,6 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon) error {
return err
}
type callContextWithRetry struct {
callContext
noRetry bool
}
func (p *Pool) initCallContextWithRetry(ctx *callContextWithRetry, cfg prmCommon) error {
err := p.initCallContext(&ctx.callContext, cfg)
if err != nil {
return err
}
// don't retry if session was specified by the caller
ctx.noRetry = cfg.stoken != nil
return nil
}
// opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget.
func (p *Pool) openDefaultSession(ctx *callContext) error {
@ -946,26 +917,19 @@ func (p *Pool) openDefaultSession(ctx *callContext) error {
}
// opens default session (if sessionDefault is set), and calls f. If f returns
// session-related error (*), and retrying is enabled, then f is called once more.
//
// (*) in this case cached token is removed.
func (p *Pool) callWithRetry(ctx *callContextWithRetry, f func() error) error {
// session-related error then cached token is removed.
func (p *Pool) call(ctx *callContext, f func() error) error {
var err error
if ctx.sessionDefault {
err = p.openDefaultSession(&ctx.callContext)
err = p.openDefaultSession(ctx)
if err != nil {
return fmt.Errorf("open default session: %w", err)
}
}
err = f()
if p.checkSessionTokenErr(err, ctx.endpoint) && !ctx.noRetry {
// don't retry anymore
ctx.noRetry = true
return p.callWithRetry(ctx, f)
}
_ = p.checkSessionTokenErr(err, ctx.endpoint)
return err
}
@ -1007,76 +971,72 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error)
return nil, fmt.Errorf("init writing on API client: %w", err)
}
if ctxCall.sessionDefault {
ctxCall.sessionTarget = wObj.WithinSession
err = p.openDefaultSession(&ctxCall)
if err != nil {
return nil, fmt.Errorf("open default session: %w", err)
ctxCall.sessionTarget = wObj.WithinSession
var resID oid.ID
err = p.call(&ctxCall, func() error {
wObj.UseKey(*ctxCall.key)
if prm.btoken != nil {
wObj.WithBearerToken(*prm.btoken)
}
}
wObj.UseKey(*ctxCall.key)
if wObj.WriteHeader(prm.hdr) {
sz := prm.hdr.PayloadSize()
if prm.btoken != nil {
wObj.WithBearerToken(*prm.btoken)
}
if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil {
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else {
prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}
if wObj.WriteHeader(prm.hdr) {
sz := prm.hdr.PayloadSize()
if data := prm.hdr.Payload(); len(data) > 0 {
if prm.payload != nil {
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
} else {
prm.payload = bytes.NewReader(data)
sz = uint64(len(data))
}
}
const defaultBufferSizePut = 3 << 20 // configure?
if prm.payload != nil {
const defaultBufferSizePut = 3 << 20 // configure?
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}
buf := make([]byte, sz)
buf := make([]byte, sz)
var n int
var n int
for {
n, err = prm.payload.Read(buf)
if n > 0 {
if !wObj.WritePayloadChunk(buf[:n]) {
break
}
for {
n, err = prm.payload.Read(buf)
if n > 0 {
if !wObj.WritePayloadChunk(buf[:n]) {
continue
}
if errors.Is(err, io.EOF) {
break
}
continue
return fmt.Errorf("read payload: %w", err)
}
if errors.Is(err, io.EOF) {
break
}
return nil, fmt.Errorf("read payload: %w", err)
}
}
}
res, err := wObj.Close()
if err != nil { // here err already carries both status and client errors
// removes session token from cache in case of token error
p.checkSessionTokenErr(err, ctxCall.endpoint)
return nil, fmt.Errorf("client failure: %w", err)
}
res, err := wObj.Close()
if err != nil {
return err
}
var id oid.ID
if !res.ReadStoredObjectID(&resID) {
return errors.New("missing ID of the stored object")
}
if !res.ReadStoredObjectID(&id) {
return nil, errors.New("missing ID of the stored object")
}
return nil
})
return &id, nil
return &resID, err
}
// DeleteObject marks an object for deletion from the container using NeoFS API protocol.
@ -1090,12 +1050,12 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
var cliPrm sdkClient.PrmObjectDelete
var cc callContextWithRetry
var cc callContext
cc.Context = ctx
cc.sessionTarget = cliPrm.WithinSession
err := p.initCallContextWithRetry(&cc, prm.prmCommon)
err := p.initCallContext(&cc, prm.prmCommon)
if err != nil {
return err
}
@ -1114,8 +1074,8 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
cliPrm.UseKey(*cc.key)
return p.callWithRetry(&cc, func() error {
_, err := cc.client.ObjectDelete(ctx, cliPrm)
return p.call(&cc, func() error {
_, err = cc.client.ObjectDelete(ctx, cliPrm)
if err != nil {
return fmt.Errorf("remove object via client: %w", err)
}
@ -1152,12 +1112,12 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject,
var cliPrm sdkClient.PrmObjectGet
var cc callContextWithRetry
var cc callContext
cc.Context = ctx
cc.sessionTarget = cliPrm.WithinSession
err := p.initCallContextWithRetry(&cc, prm.prmCommon)
err := p.initCallContext(&cc, prm.prmCommon)
if err != nil {
return nil, err
}
@ -1176,7 +1136,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject,
var res ResGetObject
err = p.callWithRetry(&cc, func() error {
err = p.call(&cc, func() error {
rObj, err := cc.client.ObjectGetInit(ctx, cliPrm)
if err != nil {
return fmt.Errorf("init object reading on client: %w", err)
@ -1208,12 +1168,12 @@ func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Objec
var cliPrm sdkClient.PrmObjectHead
var cc callContextWithRetry
var cc callContext
cc.Context = ctx
cc.sessionTarget = cliPrm.WithinSession
err := p.initCallContextWithRetry(&cc, prm.prmCommon)
err := p.initCallContext(&cc, prm.prmCommon)
if err != nil {
return nil, err
}
@ -1234,7 +1194,7 @@ func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Objec
var obj object.Object
err = p.callWithRetry(&cc, func() error {
err = p.call(&cc, func() error {
res, err := cc.client.ObjectHead(ctx, cliPrm)
if err != nil {
return fmt.Errorf("read object header via client: %w", err)
@ -1286,12 +1246,12 @@ func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectR
cliPrm.SetOffset(prm.off)
cliPrm.SetLength(prm.ln)
var cc callContextWithRetry
var cc callContext
cc.Context = ctx
cc.sessionTarget = cliPrm.WithinSession
err := p.initCallContextWithRetry(&cc, prm.prmCommon)
err := p.initCallContext(&cc, prm.prmCommon)
if err != nil {
return nil, err
}
@ -1310,7 +1270,7 @@ func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectR
var res ResObjectRange
err = p.callWithRetry(&cc, func() error {
err = p.call(&cc, func() error {
var err error
res.payload, err = cc.client.ObjectRangeInit(ctx, cliPrm)
@ -1384,19 +1344,19 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObje
cliPrm.WithBearerToken(*prm.btoken)
}
var cc callContextWithRetry
var cc callContext
cc.Context = ctx
cc.sessionTarget = cliPrm.WithinSession
err := p.initCallContextWithRetry(&cc, prm.prmCommon)
err := p.initCallContext(&cc, prm.prmCommon)
if err != nil {
return nil, err
}
var res ResObjectSearch
err = p.callWithRetry(&cc, func() error {
err = p.call(&cc, func() error {
var err error
res.r, err = cc.client.ObjectSearchInit(ctx, cliPrm)