[#126] pool: Do not resend object Put requests

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2022-02-02 14:21:23 +03:00 committed by Alex Vanin
parent de4eedcd61
commit 40aaaafc73

View file

@ -64,6 +64,7 @@ type BuilderOptions struct {
NodeConnectionTimeout time.Duration NodeConnectionTimeout time.Duration
NodeRequestTimeout time.Duration NodeRequestTimeout time.Duration
ClientRebalanceInterval time.Duration ClientRebalanceInterval time.Duration
SessionTokenThreshold time.Duration
SessionExpirationDuration uint64 SessionExpirationDuration uint64
nodesParams []*NodesParam nodesParams []*NodesParam
clientBuilder func(opts ...client.Option) (Client, error) clientBuilder func(opts ...client.Option) (Client, error)
@ -239,13 +240,14 @@ func cfgFromOpts(opts ...CallOption) *callConfig {
var _ Pool = (*pool)(nil) var _ Pool = (*pool)(nil)
type pool struct { type pool struct {
innerPools []*innerPool innerPools []*innerPool
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
owner *owner.ID owner *owner.ID
cancel context.CancelFunc cancel context.CancelFunc
closedCh chan struct{} closedCh chan struct{}
cache *SessionCache cache *SessionCache
stokenDuration uint64 stokenDuration uint64
stokenThreshold time.Duration
} }
type innerPool struct { type innerPool struct {
@ -254,7 +256,10 @@ type innerPool struct {
clientPacks []*clientPack clientPacks []*clientPack
} }
const DefaultSessionTokenExpirationDuration = 100 // in blocks const (
DefaultSessionTokenExpirationDuration = 100 // in blocks
DefaultSessionTokenThreshold = 5 * time.Second
)
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
cache, err := NewCache() cache, err := NewCache()
@ -266,6 +271,10 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
options.SessionExpirationDuration = DefaultSessionTokenExpirationDuration options.SessionExpirationDuration = DefaultSessionTokenExpirationDuration
} }
if options.SessionTokenThreshold <= 0 {
options.SessionTokenThreshold = DefaultSessionTokenThreshold
}
ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey) ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey)
inner := make([]*innerPool, len(options.nodesParams)) inner := make([]*innerPool, len(options.nodesParams))
@ -309,13 +318,14 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
pool := &pool{ pool := &pool{
innerPools: inner, innerPools: inner,
key: options.Key, key: options.Key,
owner: ownerID, owner: ownerID,
cancel: cancel, cancel: cancel,
closedCh: make(chan struct{}), closedCh: make(chan struct{}),
cache: cache, cache: cache,
stokenDuration: options.SessionExpirationDuration, stokenDuration: options.SessionExpirationDuration,
stokenThreshold: options.SessionTokenThreshold,
} }
go startRebalance(ctx, pool, options) go startRebalance(ctx, pool, options)
return pool, nil return pool, nil
@ -556,8 +566,43 @@ func createSessionTokenForDuration(ctx context.Context, c Client, dur uint64) (*
return c.CreateSession(ctx, prm) return c.CreateSession(ctx, prm)
} }
func (p *pool) removeSessionTokenAfterThreshold(cfg *callConfig) error {
cp, err := p.connection()
if err != nil {
return err
}
key := p.key
if cfg.key != nil {
key = cfg.key
}
ts, ok := p.cache.GetAccessTime(formCacheKey(cp.address, key))
if ok && time.Since(ts) > p.stokenThreshold {
p.cache.DeleteByPrefix(cp.address)
}
return nil
}
func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error) { func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error) {
cfg := cfgFromOpts(append(opts, useDefaultSession())...) cfg := cfgFromOpts(append(opts, useDefaultSession())...)
// Put object is different from other object service methods. Put request
// can't be resent in case of session token failures (i.e. session token is
// invalid due to lifetime expiration or server restart). The reason is that
// object's payload can be provided as a stream that should be read only once.
//
// To solve this issue, pool regenerates session tokens upon each request.
// In case of subsequent requests, pool avoids session token initialization
// by checking when the session token was accessed for the last time. If it
// hits a threshold, session token is removed from cache for a new one to be
// issued.
err := p.removeSessionTokenAfterThreshold(cfg)
if err != nil {
return nil, err
}
cp, options, err := p.conn(ctx, cfg) cp, options, err := p.conn(ctx, cfg)
if err != nil { if err != nil {
return nil, err return nil, err
@ -565,10 +610,8 @@ func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, op
res, err := cp.client.PutObject(ctx, params, options...) res, err := cp.client.PutObject(ctx, params, options...)
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { // removes session token from cache in case of token error
opts = append(opts, retry()) _ = p.checkSessionTokenErr(err, cp.address)
return p.PutObject(ctx, params, opts...)
}
if err != nil { // here err already carries both status and client errors if err != nil { // here err already carries both status and client errors
return nil, err return nil, err