From df0573d52147685a43af0c99789c6fe65db8df12 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 24 Mar 2022 17:35:09 +0300 Subject: [PATCH] [#137] pool: drop sessionTokenThreshold Description Signed-off-by: Denis Kirillov --- pool/cache.go | 14 ---------- pool/cache_test.go | 27 ------------------- pool/pool.go | 64 ++++++---------------------------------------- 3 files changed, 8 insertions(+), 97 deletions(-) diff --git a/pool/cache.go b/pool/cache.go index 4281853..de4df69 100644 --- a/pool/cache.go +++ b/pool/cache.go @@ -3,7 +3,6 @@ package pool import ( "strings" "sync/atomic" - "time" lru "github.com/hashicorp/golang-lru" "github.com/nspcc-dev/neofs-sdk-go/session" @@ -15,7 +14,6 @@ type sessionCache struct { } type cacheValue struct { - atime time.Time token *session.Token } @@ -43,8 +41,6 @@ func (c *sessionCache) Get(key string) *session.Token { return nil } - value.atime = time.Now() - if value.token == nil { return nil } @@ -54,18 +50,8 @@ func (c *sessionCache) Get(key string) *session.Token { return &res } -func (c *sessionCache) GetAccessTime(key string) (time.Time, bool) { - valueRaw, ok := c.cache.Peek(key) - if !ok { - return time.Time{}, false - } - - return valueRaw.(*cacheValue).atime, true -} - func (c *sessionCache) Put(key string, token *session.Token) bool { return c.cache.Add(key, &cacheValue{ - atime: time.Now(), token: token, }) } diff --git a/pool/cache_test.go b/pool/cache_test.go index 2886c1a..5a5e2a2 100644 --- a/pool/cache_test.go +++ b/pool/cache_test.go @@ -2,7 +2,6 @@ package pool import ( "testing" - "time" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neofs-sdk-go/session" @@ -10,32 +9,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestSessionCache_GetAccessTime(t *testing.T) { - const key = "Foo" - - cache, err := newCache() - require.NoError(t, err) - - st := session.NewToken() - st.SetExp(1) - - cache.Put(key, st) - - t1, ok := cache.GetAccessTime(key) - require.True(t, ok) - - time.Sleep(10 * time.Millisecond) - - t2, ok := cache.GetAccessTime(key) - require.True(t, ok) - require.Equal(t, t1, t2) - - _ = cache.Get(key) - t3, ok := cache.GetAccessTime(key) - require.True(t, ok) - require.NotEqual(t, t1, t3) -} - func TestSessionCache_GetUnmodifiedToken(t *testing.T) { const key = "Foo" target := sessiontest.Token() diff --git a/pool/pool.go b/pool/pool.go index 1d92488..3d3e5ca 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -58,7 +58,6 @@ type InitParameters struct { nodeDialTimeout time.Duration healthcheckTimeout time.Duration clientRebalanceInterval time.Duration - sessionTokenThreshold time.Duration sessionExpirationDuration uint64 nodeParams []NodeParam @@ -94,11 +93,6 @@ func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) { x.clientRebalanceInterval = interval } -// SetSessionThreshold specifies the max session token life time for PutObject operation. -func (x *InitParameters) SetSessionThreshold(threshold time.Duration) { - x.sessionTokenThreshold = threshold -} - // SetSessionExpirationDuration specifies the session token lifetime in epochs. func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) { x.sessionExpirationDuration = expirationDuration @@ -464,7 +458,6 @@ type Pool struct { closedCh chan struct{} cache *sessionCache stokenDuration uint64 - stokenThreshold time.Duration rebalanceParams rebalanceParameters clientBuilder func(endpoint string) (client, error) logger *zap.Logger @@ -479,9 +472,8 @@ type innerPool struct { const ( defaultSessionTokenExpirationDuration = 100 // in blocks - defaultSessionTokenThreshold = 5 * time.Second - defaultRebalanceInterval = 25 * time.Second - defaultRequestTimeout = 4 * time.Second + defaultRebalanceInterval = 25 * time.Second + defaultRequestTimeout = 4 * time.Second ) // NewPool creates connection pool using parameters. @@ -503,12 +495,11 @@ func NewPool(options InitParameters) (*Pool, error) { fillDefaultInitParams(&options, cache) pool := &Pool{ - key: options.key, - owner: owner.NewIDFromPublicKey(&options.key.PublicKey), - cache: cache, - logger: options.logger, - stokenDuration: options.sessionExpirationDuration, - stokenThreshold: options.sessionTokenThreshold, + key: options.key, + owner: owner.NewIDFromPublicKey(&options.key.PublicKey), + cache: cache, + logger: options.logger, + stokenDuration: options.sessionExpirationDuration, rebalanceParams: rebalanceParameters{ nodesParams: nodesParams, nodeRequestTimeout: options.healthcheckTimeout, @@ -580,10 +571,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { params.sessionExpirationDuration = defaultSessionTokenExpirationDuration } - if params.sessionTokenThreshold <= 0 { - params.sessionTokenThreshold = defaultSessionTokenThreshold - } - if params.clientRebalanceInterval <= 0 { params.clientRebalanceInterval = defaultRebalanceInterval } @@ -818,25 +805,6 @@ func createSessionTokenForDuration(ctx context.Context, c client, dur uint64) (* return c.SessionCreate(ctx, prm) } -func (p *Pool) removeSessionTokenAfterThreshold(cfg prmCommon) error { - cp, err := p.connection() - if err != nil { - return err - } - - key := p.key - if cfg.key != nil { - key = cfg.key - } - - ts, ok := p.cache.GetAccessTime(formCacheKey(cp.address, key)) - if ok && time.Since(ts) > p.stokenThreshold { - p.cache.DeleteByPrefix(cp.address) - } - - return nil -} - type callContext struct { // base context for RPC context.Context @@ -940,27 +908,11 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) prm.useVerb(sessionv2.ObjectVerbPut) prm.useAddress(newAddressFromCnrID(prm.hdr.ContainerID())) - // Put object is different from other object service methods. Put request - // can't be resent in case of session token failures (i.e. session token is - // invalid due to lifetime expiration or server restart). The reason is that - // object's payload can be provided as a stream that should be read only once. - // - // To solve this issue, pool regenerates session tokens upon each request. - // In case of subsequent requests, pool avoids session token initialization - // by checking when the session token was accessed for the last time. If it - // hits a threshold, session token is removed from cache for a new one to be - // issued. - err := p.removeSessionTokenAfterThreshold(prm.prmCommon) - if err != nil { - return nil, err - } - var ctxCall callContext ctxCall.Context = ctx - err = p.initCallContext(&ctxCall, prm.prmCommon) - if err != nil { + if err := p.initCallContext(&ctxCall, prm.prmCommon); err != nil { return nil, fmt.Errorf("init call context") }