From 40aaaafc73a6b90583b373f231b6e8f0523bf59f Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 2 Feb 2022 14:21:23 +0300 Subject: [PATCH] [#126] pool: Do not resend object Put requests Signed-off-by: Alex Vanin --- pool/pool.go | 81 ++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 19 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index b1bf97f..22a4b7d 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -64,6 +64,7 @@ type BuilderOptions struct { NodeConnectionTimeout time.Duration NodeRequestTimeout time.Duration ClientRebalanceInterval time.Duration + SessionTokenThreshold time.Duration SessionExpirationDuration uint64 nodesParams []*NodesParam clientBuilder func(opts ...client.Option) (Client, error) @@ -239,13 +240,14 @@ func cfgFromOpts(opts ...CallOption) *callConfig { var _ Pool = (*pool)(nil) type pool struct { - innerPools []*innerPool - key *ecdsa.PrivateKey - owner *owner.ID - cancel context.CancelFunc - closedCh chan struct{} - cache *SessionCache - stokenDuration uint64 + innerPools []*innerPool + key *ecdsa.PrivateKey + owner *owner.ID + cancel context.CancelFunc + closedCh chan struct{} + cache *SessionCache + stokenDuration uint64 + stokenThreshold time.Duration } type innerPool struct { @@ -254,7 +256,10 @@ type innerPool struct { clientPacks []*clientPack } -const DefaultSessionTokenExpirationDuration = 100 // in blocks +const ( + DefaultSessionTokenExpirationDuration = 100 // in blocks + DefaultSessionTokenThreshold = 5 * time.Second +) func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { cache, err := NewCache() @@ -266,6 +271,10 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { options.SessionExpirationDuration = DefaultSessionTokenExpirationDuration } + if options.SessionTokenThreshold <= 0 { + options.SessionTokenThreshold = DefaultSessionTokenThreshold + } + ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey) inner := make([]*innerPool, len(options.nodesParams)) @@ -309,13 +318,14 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { ctx, cancel := context.WithCancel(ctx) pool := &pool{ - innerPools: inner, - key: options.Key, - owner: ownerID, - cancel: cancel, - closedCh: make(chan struct{}), - cache: cache, - stokenDuration: options.SessionExpirationDuration, + innerPools: inner, + key: options.Key, + owner: ownerID, + cancel: cancel, + closedCh: make(chan struct{}), + cache: cache, + stokenDuration: options.SessionExpirationDuration, + stokenThreshold: options.SessionTokenThreshold, } go startRebalance(ctx, pool, options) return pool, nil @@ -556,8 +566,43 @@ func createSessionTokenForDuration(ctx context.Context, c Client, dur uint64) (* return c.CreateSession(ctx, prm) } +func (p *pool) removeSessionTokenAfterThreshold(cfg *callConfig) error { + cp, err := p.connection() + if err != nil { + return err + } + + key := p.key + if cfg.key != nil { + key = cfg.key + } + + ts, ok := p.cache.GetAccessTime(formCacheKey(cp.address, key)) + if ok && time.Since(ts) > p.stokenThreshold { + p.cache.DeleteByPrefix(cp.address) + } + + return nil +} + func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) + + // Put object is different from other object service methods. Put request + // can't be resent in case of session token failures (i.e. session token is + // invalid due to lifetime expiration or server restart). The reason is that + // object's payload can be provided as a stream that should be read only once. + // + // To solve this issue, pool regenerates session tokens upon each request. + // In case of subsequent requests, pool avoids session token initialization + // by checking when the session token was accessed for the last time. If it + // hits a threshold, session token is removed from cache for a new one to be + // issued. + err := p.removeSessionTokenAfterThreshold(cfg) + if err != nil { + return nil, err + } + cp, options, err := p.conn(ctx, cfg) if err != nil { return nil, err @@ -565,10 +610,8 @@ func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, op res, err := cp.client.PutObject(ctx, params, options...) - if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { - opts = append(opts, retry()) - return p.PutObject(ctx, params, opts...) - } + // removes session token from cache in case of token error + _ = p.checkSessionTokenErr(err, cp.address) if err != nil { // here err already carries both status and client errors return nil, err