feature/115-adopt_put_single_pool #140
4 changed files with 348 additions and 37 deletions
|
@ -69,3 +69,7 @@ func (c *sessionCache) expired(val *cacheValue) bool {
|
|||
// use epoch+1 (clear cache beforehand) to prevent 'expired session token' error right after epoch tick
|
||||
return val.token.ExpiredAt(epoch + 1)
|
||||
}
|
||||
|
||||
func (c *sessionCache) Epoch() uint64 {
|
||||
return c.currentEpoch.Load()
|
||||
}
|
||||
|
|
173
pool/object_put_pool_transformer.go
Normal file
173
pool/object_put_pool_transformer.go
Normal file
|
@ -0,0 +1,173 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
type logger interface {
|
||||
log(level zapcore.Level, msg string, fields ...zap.Field)
|
||||
}
|
||||
|
||||
type PrmObjectPutClientCutInit struct {
|
||||
PrmObjectPut
|
||||
withoutHomomorphicHash bool
|
||||
}
|
||||
|
||||
func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) {
|
||||
cl, err := c.getClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var w objectWriterTransformer
|
||||
|
||||
w.it = internalTarget{
|
||||
client: cl,
|
||||
prm: prm,
|
||||
address: c.address(),
|
||||
logger: &c.clientStatusMonitor,
|
||||
}
|
||||
|
||||
key := &c.prm.key
|
||||
if prm.key != nil {
|
||||
key = prm.key
|
||||
}
|
||||
|
||||
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: key,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
|
||||
MaxSize: prm.networkInfo.MaxObjectSize(),
|
||||
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
|
||||
NetworkState: prm.networkInfo,
|
||||
SessionToken: prm.stoken,
|
||||
})
|
||||
return &w, nil
|
||||
}
|
||||
|
||||
type objectWriterTransformer struct {
|
||||
ot transformer.ChunkedObjectWriter
|
||||
it internalTarget
|
||||
err error
|
||||
}
|
||||
|
||||
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
|
||||
x.err = x.ot.WriteHeader(ctx, &hdr)
|
||||
return x.err == nil
|
||||
}
|
||||
|
||||
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
|
||||
_, x.err = x.ot.Write(ctx, chunk)
|
||||
return x.err == nil
|
||||
}
|
||||
|
||||
// ResObjectPut groups the final result values of ObjectPutInit operation.
|
||||
type ResObjectPut struct {
|
||||
Status apistatus.Status
|
||||
OID oid.ID
|
||||
}
|
||||
|
||||
// Close return non nil result in any case. If error occurred, the result contains only buffer for further reusing.
|
||||
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
|
||||
ai, err := x.ot.Close(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if ai != nil && ai.ParentID != nil {
|
||||
x.it.res.OID = *ai.ParentID
|
||||
}
|
||||
return &x.it.res, nil
|
||||
}
|
||||
|
||||
type internalTarget struct {
|
||||
client *sdkClient.Client
|
||||
res ResObjectPut
|
||||
prm PrmObjectPutClientCutInit
|
||||
useStream bool
|
||||
address string
|
||||
logger logger
|
||||
resolveFrostFSErrors bool
|
||||
}
|
||||
|
||||
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
|
||||
putSingleImplemented, err := it.tryPutSingle(ctx, o)
|
||||
if putSingleImplemented {
|
||||
return err
|
||||
}
|
||||
|
||||
it.logger.log(zapcore.DebugLevel, "putSingle not implemented, trying put as stream", zap.String("address", it.address))
|
||||
|
||||
it.useStream = true
|
||||
return it.putAsStream(ctx, o)
|
||||
}
|
||||
|
||||
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
|
||||
var cliPrm sdkClient.PrmObjectPutInit
|
||||
cliPrm.SetCopiesNumberByVectors(it.prm.copiesNumber)
|
||||
if it.prm.stoken != nil {
|
||||
cliPrm.WithinSession(*it.prm.stoken)
|
||||
}
|
||||
if it.prm.key != nil {
|
||||
cliPrm.UseKey(*it.prm.key)
|
||||
}
|
||||
if it.prm.btoken != nil {
|
||||
cliPrm.WithBearerToken(*it.prm.btoken)
|
||||
}
|
||||
|
||||
wrt, err := it.client.ObjectPutInit(ctx, cliPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if wrt.WriteHeader(ctx, *o) {
|
||||
wrt.WritePayloadChunk(ctx, o.Payload())
|
||||
}
|
||||
res, err := wrt.Close(ctx)
|
||||
if res != nil {
|
||||
it.res.Status = res.Status()
|
||||
it.res.OID = res.StoredObjectID()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) {
|
||||
if it.useStream {
|
||||
return false, nil
|
||||
}
|
||||
var cliPrm sdkClient.PrmObjectPutSingle
|
||||
cliPrm.SetCopiesNumber(it.prm.copiesNumber)
|
||||
cliPrm.UseKey(it.prm.key)
|
||||
if it.prm.stoken != nil {
|
||||
cliPrm.WithinSession(*it.prm.stoken)
|
||||
}
|
||||
if it.prm.btoken != nil {
|
||||
cliPrm.WithBearerToken(*it.prm.btoken)
|
||||
}
|
||||
cliPrm.SetObject(o.ToV2())
|
||||
|
||||
res, err := it.client.ObjectPutSingle(ctx, cliPrm)
|
||||
if err != nil && status.Code(err) == codes.Unimplemented {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
id, _ := o.ID()
|
||||
it.res = ResObjectPut{
|
||||
Status: res.Status(),
|
||||
OID: id,
|
||||
}
|
||||
if !it.resolveFrostFSErrors && !apistatus.IsSuccessful(it.res.Status) {
|
||||
return true, apistatus.ErrFromStatus(it.res.Status)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
return true, err
|
||||
}
|
186
pool/pool.go
186
pool/pool.go
|
@ -252,6 +252,11 @@ func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
|
|||
x.key = key
|
||||
}
|
||||
|
||||
// setLogger sets sdkClient.Client logger.
|
||||
func (x *wrapperPrm) setLogger(logger *zap.Logger) {
|
||||
x.logger = logger
|
||||
}
|
||||
|
||||
// setDialTimeout sets the timeout for connection to be established.
|
||||
func (x *wrapperPrm) setDialTimeout(timeout time.Duration) {
|
||||
x.dialTimeout = timeout
|
||||
|
@ -629,6 +634,14 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm
|
|||
|
||||
// objectPut writes object to FrostFS.
|
||||
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||
if prm.clientCut {
|
||||
return c.objectPutClientCut(ctx, prm)
|
||||
}
|
||||
|
||||
return c.objectPutServerCut(ctx, prm)
|
||||
}
|
||||
|
||||
func (c *clientWrapper) objectPutServerCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||
cl, err := c.getClient()
|
||||
if err != nil {
|
||||
return oid.ID{}, err
|
||||
|
@ -666,7 +679,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
|
|||
}
|
||||
|
||||
if prm.payload != nil {
|
||||
const defaultBufferSizePut = 3 << 20 // configure?
|
||||
const defaultBufferSizePut = 3 << 20 // default grpc message size, configure?
|
||||
|
||||
if sz == 0 || sz > defaultBufferSizePut {
|
||||
sz = defaultBufferSizePut
|
||||
|
@ -710,6 +723,75 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
|
|||
return res.StoredObjectID(), nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) objectPutClientCut(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||
putInitPrm := PrmObjectPutClientCutInit{
|
||||
PrmObjectPut: prm,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
wObj, err := c.objectPutInitTransformer(putInitPrm)
|
||||
c.incRequests(time.Since(start), methodObjectPut)
|
||||
if err = c.handleError(ctx, nil, err); err != nil {
|
||||
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
|
||||
}
|
||||
|
||||
if wObj.WriteHeader(ctx, prm.hdr) {
|
||||
sz := prm.hdr.PayloadSize()
|
||||
|
||||
if data := prm.hdr.Payload(); len(data) > 0 {
|
||||
if prm.payload != nil {
|
||||
prm.payload = io.MultiReader(bytes.NewReader(data), prm.payload)
|
||||
} else {
|
||||
prm.payload = bytes.NewReader(data)
|
||||
sz = uint64(len(data))
|
||||
}
|
||||
}
|
||||
|
||||
if prm.payload != nil {
|
||||
const defaultBufferSizePut = 64 * 1024 // it's buffer size in s3-gw, configure?
|
||||
|
||||
|
||||
if sz == 0 || sz > defaultBufferSizePut {
|
||||
sz = defaultBufferSizePut
|
||||
}
|
||||
|
||||
buf := make([]byte, sz)
|
||||
|
||||
var n int
|
||||
|
||||
for {
|
||||
n, err = prm.payload.Read(buf)
|
||||
if n > 0 {
|
||||
start = time.Now()
|
||||
successWrite := wObj.WritePayloadChunk(ctx, buf[:n])
|
||||
c.incRequests(time.Since(start), methodObjectPut)
|
||||
if !successWrite {
|
||||
break
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
|
||||
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res, err := wObj.Close(ctx)
|
||||
var st apistatus.Status
|
||||
if res != nil {
|
||||
st = res.Status
|
||||
}
|
||||
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
|
||||
return oid.ID{}, fmt.Errorf("client failure: %w", err)
|
||||
}
|
||||
|
||||
return res.OID, nil
|
||||
}
|
||||
|
||||
// objectDelete invokes sdkClient.ObjectDelete parse response status to error.
|
||||
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
|
||||
cl, err := c.getClient()
|
||||
|
@ -972,12 +1054,20 @@ func (c *clientStatusMonitor) incErrorRate() {
|
|||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if thresholdReached && c.logger != nil {
|
||||
c.logger.Warn("error threshold reached",
|
||||
if thresholdReached {
|
||||
c.log(zapcore.WarnLevel, "error threshold reached",
|
||||
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||
if c.logger == nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.logger.Log(level, msg, fields...)
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) currentErrorRate() uint32 {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
@ -1316,6 +1406,9 @@ type PrmObjectPut struct {
|
|||
payload io.Reader
|
||||
|
||||
copiesNumber []uint32
|
||||
|
||||
clientCut bool
|
||||
networkInfo netmap.NetworkInfo
|
||||
}
|
||||
|
||||
// SetHeader specifies header of the object.
|
||||
|
@ -1340,6 +1433,20 @@ func (x *PrmObjectPut) SetCopiesNumberVector(copiesNumber []uint32) {
|
|||
x.copiesNumber = copiesNumber
|
||||
}
|
||||
|
||||
// SetClientCut enables client cut for objects. It means that full object is prepared on client side
|
||||
// and retrying is possible. But this leads to additional memory using for buffering object parts.
|
||||
// Buffer size for every put is MaxObjectSize value from FrostFS network.
|
||||
// There is limit for total memory allocation for in-flight request and
|
||||
// can be set by InitParameters.SetMaxClientCutMemory (default value is 1gb).
|
||||
// Put requests will fail if this limit be reached.
|
||||
func (x *PrmObjectPut) SetClientCut(clientCut bool) {
|
||||
x.clientCut = clientCut
|
||||
}
|
||||
|
||||
func (x *PrmObjectPut) setNetworkInfo(ni netmap.NetworkInfo) {
|
||||
x.networkInfo = ni
|
||||
}
|
||||
|
||||
// PrmObjectDelete groups parameters of DeleteObject operation.
|
||||
type PrmObjectDelete struct {
|
||||
prmCommon
|
||||
|
@ -1635,6 +1742,8 @@ type Pool struct {
|
|||
rebalanceParams rebalanceParameters
|
||||
clientBuilder clientBuilder
|
||||
logger *zap.Logger
|
||||
|
||||
maxObjectSize uint64
|
||||
}
|
||||
|
||||
type innerPool struct {
|
||||
|
@ -1710,7 +1819,7 @@ func (p *Pool) Dial(ctx context.Context) error {
|
|||
}
|
||||
|
||||
var st session.Object
|
||||
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key)
|
||||
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key, false)
|
||||
if err != nil {
|
||||
clients[j].setUnhealthy()
|
||||
p.log(zap.WarnLevel, "failed to create frostfs session token for client",
|
||||
|
@ -1718,7 +1827,7 @@ func (p *Pool) Dial(ctx context.Context) error {
|
|||
continue
|
||||
}
|
||||
|
||||
_ = p.cache.Put(formCacheKey(addr, p.key), st)
|
||||
_ = p.cache.Put(formCacheKey(addr, p.key, false), st)
|
||||
atLeastOneHealthy = true
|
||||
}
|
||||
source := rand.NewSource(time.Now().UnixNano())
|
||||
|
@ -1739,6 +1848,12 @@ func (p *Pool) Dial(ctx context.Context) error {
|
|||
p.closedCh = make(chan struct{})
|
||||
p.innerPools = inner
|
||||
|
||||
ni, err := p.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get network info for max object size: %w", err)
|
||||
}
|
||||
p.maxObjectSize = ni.MaxObjectSize()
|
||||
|
||||
go p.startRebalance(ctx)
|
||||
return nil
|
||||
}
|
||||
|
@ -1781,6 +1896,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
|
|||
var prm wrapperPrm
|
||||
prm.setAddress(addr)
|
||||
prm.setKey(*params.key)
|
||||
prm.setLogger(params.logger)
|
||||
prm.setDialTimeout(params.nodeDialTimeout)
|
||||
prm.setStreamTimeout(params.nodeStreamTimeout)
|
||||
prm.setErrorThreshold(params.errorThreshold)
|
||||
|
@ -1949,9 +2065,15 @@ func (p *innerPool) connection() (client, error) {
|
|||
return nil, errors.New("no healthy client")
|
||||
}
|
||||
|
||||
func formCacheKey(address string, key *ecdsa.PrivateKey) string {
|
||||
func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string {
|
||||
k := keys.PrivateKey{PrivateKey: *key}
|
||||
return address + k.String()
|
||||
|
||||
stype := "server"
|
||||
if clientCut {
|
||||
stype = "client"
|
||||
}
|
||||
|
||||
return address + stype + k.String()
|
||||
}
|
||||
|
||||
func (p *Pool) checkSessionTokenErr(err error, address string) bool {
|
||||
|
@ -1967,7 +2089,7 @@ func (p *Pool) checkSessionTokenErr(err error, address string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey) error {
|
||||
func initSessionForDuration(ctx context.Context, dst *session.Object, c client, dur uint64, ownerKey ecdsa.PrivateKey, clientCut bool) error {
|
||||
ni, err := c.networkInfo(ctx, prmNetworkInfo{})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1985,23 +2107,25 @@ func initSessionForDuration(ctx context.Context, dst *session.Object, c client,
|
|||
prm.setExp(exp)
|
||||
prm.useKey(ownerKey)
|
||||
|
||||
res, err := c.sessionCreate(ctx, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var (
|
||||
id uuid.UUID
|
||||
key frostfsecdsa.PublicKey
|
||||
)
|
||||
|
||||
var id uuid.UUID
|
||||
|
||||
err = id.UnmarshalBinary(res.id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid session token ID: %w", err)
|
||||
}
|
||||
|
||||
var key frostfsecdsa.PublicKey
|
||||
|
||||
err = key.Decode(res.sessionKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid public session key: %w", err)
|
||||
if clientCut {
|
||||
id = uuid.New()
|
||||
key = frostfsecdsa.PublicKey(ownerKey.PublicKey)
|
||||
} else {
|
||||
res, err := c.sessionCreate(ctx, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = id.UnmarshalBinary(res.id); err != nil {
|
||||
return fmt.Errorf("invalid session token ID: %w", err)
|
||||
}
|
||||
if err = key.Decode(res.sessionKey); err != nil {
|
||||
return fmt.Errorf("invalid public session key: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
dst.SetID(id)
|
||||
|
@ -2027,6 +2151,8 @@ type callContext struct {
|
|||
sessionCnr cid.ID
|
||||
sessionObjSet bool
|
||||
sessionObjs []oid.ID
|
||||
|
||||
sessionClientCut bool
|
||||
}
|
||||
|
||||
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
|
||||
|
@ -2063,12 +2189,12 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
|
|||
// opens new session or uses cached one.
|
||||
// Must be called only on initialized callContext with set sessionTarget.
|
||||
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
|
||||
cacheKey := formCacheKey(cc.endpoint, cc.key)
|
||||
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
|
||||
|
||||
tok, ok := p.cache.Get(cacheKey)
|
||||
if !ok {
|
||||
// init new session
|
||||
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key)
|
||||
err := initSessionForDuration(ctx, &tok, cc.client, p.stokenDuration, *cc.key, cc.sessionClientCut)
|
||||
if err != nil {
|
||||
return fmt.Errorf("session API client: %w", err)
|
||||
}
|
||||
|
@ -2133,6 +2259,7 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
|
|||
p.fillAppropriateKey(&prm.prmCommon)
|
||||
|
||||
var ctxCall callContext
|
||||
ctxCall.sessionClientCut = prm.clientCut
|
||||
if err := p.initCallContext(&ctxCall, prm.prmCommon, prmCtx); err != nil {
|
||||
return oid.ID{}, fmt.Errorf("init call context: %w", err)
|
||||
}
|
||||
|
@ -2144,6 +2271,13 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
|
|||
}
|
||||
}
|
||||
|
||||
if prm.clientCut {
|
||||
var ni netmap.NetworkInfo
|
||||
ni.SetCurrentEpoch(p.cache.Epoch())
|
||||
ni.SetMaxObjectSize(p.maxObjectSize) // we want to use initial max object size in PayloadSizeLimiter
|
||||
prm.setNetworkInfo(ni)
|
||||
}
|
||||
|
||||
id, err := ctxCall.client.objectPut(ctx, prm)
|
||||
if err != nil {
|
||||
// removes session token from cache in case of token error
|
||||
|
|
|
@ -106,7 +106,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
|
|||
if err != nil {
|
||||
return false
|
||||
}
|
||||
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key))
|
||||
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false))
|
||||
return st.AssertAuthKey(&expectedAuthKey)
|
||||
}
|
||||
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
|
||||
|
@ -141,7 +141,7 @@ func TestOneNode(t *testing.T) {
|
|||
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
}
|
||||
|
@ -171,7 +171,7 @@ func TestTwoNodes(t *testing.T) {
|
|||
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
||||
}
|
||||
|
||||
|
@ -226,7 +226,7 @@ func TestOneOfTwoFailed(t *testing.T) {
|
|||
for i := 0; i < 5; i++ {
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
||||
}
|
||||
}
|
||||
|
@ -296,7 +296,7 @@ func TestSessionCache(t *testing.T) {
|
|||
// cache must contain session token
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
|
||||
var prm PrmObjectGet
|
||||
|
@ -309,7 +309,7 @@ func TestSessionCache(t *testing.T) {
|
|||
// cache must not contain session token
|
||||
cp, err = pool.connection()
|
||||
require.NoError(t, err)
|
||||
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.False(t, ok)
|
||||
|
||||
var prm2 PrmObjectPut
|
||||
|
@ -321,7 +321,7 @@ func TestSessionCache(t *testing.T) {
|
|||
// cache must contain session token
|
||||
cp, err = pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
}
|
||||
|
||||
|
@ -365,7 +365,7 @@ func TestPriority(t *testing.T) {
|
|||
firstNode := func() bool {
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
return st.AssertAuthKey(&expectedAuthKey1)
|
||||
}
|
||||
|
||||
|
@ -373,7 +373,7 @@ func TestPriority(t *testing.T) {
|
|||
secondNode := func() bool {
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
return st.AssertAuthKey(&expectedAuthKey2)
|
||||
}
|
||||
require.Never(t, secondNode, time.Second, 200*time.Millisecond)
|
||||
|
@ -410,7 +410,7 @@ func TestSessionCacheWithKey(t *testing.T) {
|
|||
// cache must contain session token
|
||||
cp, err := pool.connection()
|
||||
require.NoError(t, err)
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key))
|
||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
|
||||
var prm PrmObjectDelete
|
||||
|
@ -420,7 +420,7 @@ func TestSessionCacheWithKey(t *testing.T) {
|
|||
|
||||
err = pool.DeleteObject(ctx, prm)
|
||||
require.NoError(t, err)
|
||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey))
|
||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false))
|
||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue
Should we create an issue for that?
#149