forked from TrueCloudLab/frostfs-sdk-go
[#137] pool: drop sessionTokenThreshold
Description Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
c4adb03f8e
commit
df0573d521
3 changed files with 8 additions and 97 deletions
|
@ -3,7 +3,6 @@ package pool
|
|||
import (
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/session"
|
||||
|
@ -15,7 +14,6 @@ type sessionCache struct {
|
|||
}
|
||||
|
||||
type cacheValue struct {
|
||||
atime time.Time
|
||||
token *session.Token
|
||||
}
|
||||
|
||||
|
@ -43,8 +41,6 @@ func (c *sessionCache) Get(key string) *session.Token {
|
|||
return nil
|
||||
}
|
||||
|
||||
value.atime = time.Now()
|
||||
|
||||
if value.token == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -54,18 +50,8 @@ func (c *sessionCache) Get(key string) *session.Token {
|
|||
return &res
|
||||
}
|
||||
|
||||
func (c *sessionCache) GetAccessTime(key string) (time.Time, bool) {
|
||||
valueRaw, ok := c.cache.Peek(key)
|
||||
if !ok {
|
||||
return time.Time{}, false
|
||||
}
|
||||
|
||||
return valueRaw.(*cacheValue).atime, true
|
||||
}
|
||||
|
||||
func (c *sessionCache) Put(key string, token *session.Token) bool {
|
||||
return c.cache.Add(key, &cacheValue{
|
||||
atime: time.Now(),
|
||||
token: token,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package pool
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/session"
|
||||
|
@ -10,32 +9,6 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSessionCache_GetAccessTime(t *testing.T) {
|
||||
const key = "Foo"
|
||||
|
||||
cache, err := newCache()
|
||||
require.NoError(t, err)
|
||||
|
||||
st := session.NewToken()
|
||||
st.SetExp(1)
|
||||
|
||||
cache.Put(key, st)
|
||||
|
||||
t1, ok := cache.GetAccessTime(key)
|
||||
require.True(t, ok)
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
t2, ok := cache.GetAccessTime(key)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, t1, t2)
|
||||
|
||||
_ = cache.Get(key)
|
||||
t3, ok := cache.GetAccessTime(key)
|
||||
require.True(t, ok)
|
||||
require.NotEqual(t, t1, t3)
|
||||
}
|
||||
|
||||
func TestSessionCache_GetUnmodifiedToken(t *testing.T) {
|
||||
const key = "Foo"
|
||||
target := sessiontest.Token()
|
||||
|
|
50
pool/pool.go
50
pool/pool.go
|
@ -58,7 +58,6 @@ type InitParameters struct {
|
|||
nodeDialTimeout time.Duration
|
||||
healthcheckTimeout time.Duration
|
||||
clientRebalanceInterval time.Duration
|
||||
sessionTokenThreshold time.Duration
|
||||
sessionExpirationDuration uint64
|
||||
nodeParams []NodeParam
|
||||
|
||||
|
@ -94,11 +93,6 @@ func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
|
|||
x.clientRebalanceInterval = interval
|
||||
}
|
||||
|
||||
// SetSessionThreshold specifies the max session token life time for PutObject operation.
|
||||
func (x *InitParameters) SetSessionThreshold(threshold time.Duration) {
|
||||
x.sessionTokenThreshold = threshold
|
||||
}
|
||||
|
||||
// SetSessionExpirationDuration specifies the session token lifetime in epochs.
|
||||
func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) {
|
||||
x.sessionExpirationDuration = expirationDuration
|
||||
|
@ -464,7 +458,6 @@ type Pool struct {
|
|||
closedCh chan struct{}
|
||||
cache *sessionCache
|
||||
stokenDuration uint64
|
||||
stokenThreshold time.Duration
|
||||
rebalanceParams rebalanceParameters
|
||||
clientBuilder func(endpoint string) (client, error)
|
||||
logger *zap.Logger
|
||||
|
@ -479,7 +472,6 @@ type innerPool struct {
|
|||
const (
|
||||
defaultSessionTokenExpirationDuration = 100 // in blocks
|
||||
|
||||
defaultSessionTokenThreshold = 5 * time.Second
|
||||
defaultRebalanceInterval = 25 * time.Second
|
||||
defaultRequestTimeout = 4 * time.Second
|
||||
)
|
||||
|
@ -508,7 +500,6 @@ func NewPool(options InitParameters) (*Pool, error) {
|
|||
cache: cache,
|
||||
logger: options.logger,
|
||||
stokenDuration: options.sessionExpirationDuration,
|
||||
stokenThreshold: options.sessionTokenThreshold,
|
||||
rebalanceParams: rebalanceParameters{
|
||||
nodesParams: nodesParams,
|
||||
nodeRequestTimeout: options.healthcheckTimeout,
|
||||
|
@ -580,10 +571,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
|
|||
params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
|
||||
}
|
||||
|
||||
if params.sessionTokenThreshold <= 0 {
|
||||
params.sessionTokenThreshold = defaultSessionTokenThreshold
|
||||
}
|
||||
|
||||
if params.clientRebalanceInterval <= 0 {
|
||||
params.clientRebalanceInterval = defaultRebalanceInterval
|
||||
}
|
||||
|
@ -818,25 +805,6 @@ func createSessionTokenForDuration(ctx context.Context, c client, dur uint64) (*
|
|||
return c.SessionCreate(ctx, prm)
|
||||
}
|
||||
|
||||
func (p *Pool) removeSessionTokenAfterThreshold(cfg prmCommon) error {
|
||||
cp, err := p.connection()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key := p.key
|
||||
if cfg.key != nil {
|
||||
key = cfg.key
|
||||
}
|
||||
|
||||
ts, ok := p.cache.GetAccessTime(formCacheKey(cp.address, key))
|
||||
if ok && time.Since(ts) > p.stokenThreshold {
|
||||
p.cache.DeleteByPrefix(cp.address)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type callContext struct {
|
||||
// base context for RPC
|
||||
context.Context
|
||||
|
@ -940,27 +908,11 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (*oid.ID, error)
|
|||
prm.useVerb(sessionv2.ObjectVerbPut)
|
||||
prm.useAddress(newAddressFromCnrID(prm.hdr.ContainerID()))
|
||||
|
||||
// Put object is different from other object service methods. Put request
|
||||
// can't be resent in case of session token failures (i.e. session token is
|
||||
// invalid due to lifetime expiration or server restart). The reason is that
|
||||
// object's payload can be provided as a stream that should be read only once.
|
||||
//
|
||||
// To solve this issue, pool regenerates session tokens upon each request.
|
||||
// In case of subsequent requests, pool avoids session token initialization
|
||||
// by checking when the session token was accessed for the last time. If it
|
||||
// hits a threshold, session token is removed from cache for a new one to be
|
||||
// issued.
|
||||
err := p.removeSessionTokenAfterThreshold(prm.prmCommon)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ctxCall callContext
|
||||
|
||||
ctxCall.Context = ctx
|
||||
|
||||
err = p.initCallContext(&ctxCall, prm.prmCommon)
|
||||
if err != nil {
|
||||
if err := p.initCallContext(&ctxCall, prm.prmCommon); err != nil {
|
||||
return nil, fmt.Errorf("init call context")
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue