From c4adb03f8e9b3c198c0cdb00b23632b7b36cbef7 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 24 Mar 2022 17:19:30 +0300 Subject: [PATCH] [#137] pool: drop retry for object operations Signed-off-by: Denis Kirillov --- pool/pool.go | 172 ++++++++++++++++++++------------------------------- 1 file changed, 66 insertions(+), 106 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index 236019d..1d92488 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -707,17 +707,6 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights if ok { bufferWeights[j] = options.nodesParams[i].weights[j] - if !cp.healthy { - cliRes, err := createSessionTokenForDuration(ctx, cli, options.sessionExpirationDuration) - if err != nil { - ok = false - bufferWeights[j] = 0 - } else { - tkn := p.newSessionToken(cliRes) - - _ = p.cache.Put(formCacheKey(cp.address, p.key), tkn) - } - } } else { p.cache.DeleteByPrefix(cp.address) } @@ -896,24 +885,6 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon) error { return err } -type callContextWithRetry struct { - callContext - - noRetry bool -} - -func (p *Pool) initCallContextWithRetry(ctx *callContextWithRetry, cfg prmCommon) error { - err := p.initCallContext(&ctx.callContext, cfg) - if err != nil { - return err - } - - // don't retry if session was specified by the caller - ctx.noRetry = cfg.stoken != nil - - return nil -} - // opens new session or uses cached one. // Must be called only on initialized callContext with set sessionTarget. func (p *Pool) openDefaultSession(ctx *callContext) error { @@ -946,26 +917,19 @@ func (p *Pool) openDefaultSession(ctx *callContext) error { } // opens default session (if sessionDefault is set), and calls f. If f returns -// session-related error (*), and retrying is enabled, then f is called once more. -// -// (*) in this case cached token is removed. -func (p *Pool) callWithRetry(ctx *callContextWithRetry, f func() error) error { +// session-related error then cached token is removed. +func (p *Pool) call(ctx *callContext, f func() error) error { var err error if ctx.sessionDefault { - err = p.openDefaultSession(&ctx.callContext) + err = p.openDefaultSession(ctx) if err != nil { return fmt.Errorf("open default session: %w", err) } } err = f() - - if p.checkSessionTokenErr(err, ctx.endpoint) && !ctx.noRetry { - // don't retry anymore - ctx.noRetry = true - return p.callWithRetry(ctx, f) - } + _ = p.checkSessionTokenErr(err, ctx.endpoint) return err } @@ -1007,76 +971,72 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) return nil, fmt.Errorf("init writing on API client: %w", err) } - if ctxCall.sessionDefault { - ctxCall.sessionTarget = wObj.WithinSession - err = p.openDefaultSession(&ctxCall) - if err != nil { - return nil, fmt.Errorf("open default session: %w", err) + ctxCall.sessionTarget = wObj.WithinSession + + var resID oid.ID + + err = p.call(&ctxCall, func() error { + wObj.UseKey(*ctxCall.key) + + if prm.btoken != nil { + wObj.WithBearerToken(*prm.btoken) } - } - wObj.UseKey(*ctxCall.key) + if wObj.WriteHeader(prm.hdr) { + sz := prm.hdr.PayloadSize() - if prm.btoken != nil { - wObj.WithBearerToken(*prm.btoken) - } + if data := prm.hdr.Payload(); len(data) > 0 { + if prm.payload != nil { + prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) + } else { + prm.payload = bytes.NewReader(data) + sz = uint64(len(data)) + } + } - if wObj.WriteHeader(prm.hdr) { - sz := prm.hdr.PayloadSize() - - if data := prm.hdr.Payload(); len(data) > 0 { if prm.payload != nil { - prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload) - } else { - prm.payload = bytes.NewReader(data) - sz = uint64(len(data)) - } - } + const defaultBufferSizePut = 3 << 20 // configure? - if prm.payload != nil { - const defaultBufferSizePut = 3 << 20 // configure? + if sz == 0 || sz > defaultBufferSizePut { + sz = defaultBufferSizePut + } - if sz == 0 || sz > defaultBufferSizePut { - sz = defaultBufferSizePut - } + buf := make([]byte, sz) - buf := make([]byte, sz) + var n int - var n int + for { + n, err = prm.payload.Read(buf) + if n > 0 { + if !wObj.WritePayloadChunk(buf[:n]) { + break + } - for { - n, err = prm.payload.Read(buf) - if n > 0 { - if !wObj.WritePayloadChunk(buf[:n]) { + continue + } + + if errors.Is(err, io.EOF) { break } - continue + return fmt.Errorf("read payload: %w", err) } - - if errors.Is(err, io.EOF) { - break - } - - return nil, fmt.Errorf("read payload: %w", err) } } - } - res, err := wObj.Close() - if err != nil { // here err already carries both status and client errors - // removes session token from cache in case of token error - p.checkSessionTokenErr(err, ctxCall.endpoint) - return nil, fmt.Errorf("client failure: %w", err) - } + res, err := wObj.Close() + if err != nil { + return err + } - var id oid.ID + if !res.ReadStoredObjectID(&resID) { + return errors.New("missing ID of the stored object") + } - if !res.ReadStoredObjectID(&id) { - return nil, errors.New("missing ID of the stored object") - } + return nil + }) - return &id, nil + return &resID, err } // DeleteObject marks an object for deletion from the container using NeoFS API protocol. @@ -1090,12 +1050,12 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { var cliPrm sdkClient.PrmObjectDelete - var cc callContextWithRetry + var cc callContext cc.Context = ctx cc.sessionTarget = cliPrm.WithinSession - err := p.initCallContextWithRetry(&cc, prm.prmCommon) + err := p.initCallContext(&cc, prm.prmCommon) if err != nil { return err } @@ -1114,8 +1074,8 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { cliPrm.UseKey(*cc.key) - return p.callWithRetry(&cc, func() error { - _, err := cc.client.ObjectDelete(ctx, cliPrm) + return p.call(&cc, func() error { + _, err = cc.client.ObjectDelete(ctx, cliPrm) if err != nil { return fmt.Errorf("remove object via client: %w", err) } @@ -1152,12 +1112,12 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject, var cliPrm sdkClient.PrmObjectGet - var cc callContextWithRetry + var cc callContext cc.Context = ctx cc.sessionTarget = cliPrm.WithinSession - err := p.initCallContextWithRetry(&cc, prm.prmCommon) + err := p.initCallContext(&cc, prm.prmCommon) if err != nil { return nil, err } @@ -1176,7 +1136,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject, var res ResGetObject - err = p.callWithRetry(&cc, func() error { + err = p.call(&cc, func() error { rObj, err := cc.client.ObjectGetInit(ctx, cliPrm) if err != nil { return fmt.Errorf("init object reading on client: %w", err) @@ -1208,12 +1168,12 @@ func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Objec var cliPrm sdkClient.PrmObjectHead - var cc callContextWithRetry + var cc callContext cc.Context = ctx cc.sessionTarget = cliPrm.WithinSession - err := p.initCallContextWithRetry(&cc, prm.prmCommon) + err := p.initCallContext(&cc, prm.prmCommon) if err != nil { return nil, err } @@ -1234,7 +1194,7 @@ func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Objec var obj object.Object - err = p.callWithRetry(&cc, func() error { + err = p.call(&cc, func() error { res, err := cc.client.ObjectHead(ctx, cliPrm) if err != nil { return fmt.Errorf("read object header via client: %w", err) @@ -1286,12 +1246,12 @@ func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectR cliPrm.SetOffset(prm.off) cliPrm.SetLength(prm.ln) - var cc callContextWithRetry + var cc callContext cc.Context = ctx cc.sessionTarget = cliPrm.WithinSession - err := p.initCallContextWithRetry(&cc, prm.prmCommon) + err := p.initCallContext(&cc, prm.prmCommon) if err != nil { return nil, err } @@ -1310,7 +1270,7 @@ func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectR var res ResObjectRange - err = p.callWithRetry(&cc, func() error { + err = p.call(&cc, func() error { var err error res.payload, err = cc.client.ObjectRangeInit(ctx, cliPrm) @@ -1384,19 +1344,19 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObje cliPrm.WithBearerToken(*prm.btoken) } - var cc callContextWithRetry + var cc callContext cc.Context = ctx cc.sessionTarget = cliPrm.WithinSession - err := p.initCallContextWithRetry(&cc, prm.prmCommon) + err := p.initCallContext(&cc, prm.prmCommon) if err != nil { return nil, err } var res ResObjectSearch - err = p.callWithRetry(&cc, func() error { + err = p.call(&cc, func() error { var err error res.r, err = cc.client.ObjectSearchInit(ctx, cliPrm)