forked from TrueCloudLab/frostfs-sdk-go
[#38] Add retrying when session token error
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
e08e3d6c00
commit
1d546711e5
2 changed files with 76 additions and 21 deletions
90
pool/pool.go
90
pool/pool.go
|
@ -93,6 +93,11 @@ type Pool interface {
|
|||
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
|
||||
Close()
|
||||
|
||||
ClientParam
|
||||
}
|
||||
|
||||
// ClientParam is analogue client.Object, client.Container but uses session token cache.
|
||||
type ClientParam interface {
|
||||
PutObjectParam(ctx context.Context, params *client.PutObjectParams, callParam *CallParam) (*object.ID, error)
|
||||
DeleteObjectParam(ctx context.Context, params *client.DeleteObjectParams, callParam *CallParam) error
|
||||
GetObjectParam(ctx context.Context, params *client.GetObjectParams, callParam *CallParam) (*object.Object, error)
|
||||
|
@ -117,7 +122,8 @@ type clientPack struct {
|
|||
}
|
||||
|
||||
type CallParam struct {
|
||||
Key *ecdsa.PrivateKey
|
||||
isRetry bool
|
||||
Key *ecdsa.PrivateKey
|
||||
|
||||
Options []client.CallOption
|
||||
}
|
||||
|
@ -464,14 +470,17 @@ func (p *pool) AnnounceContainerUsedSpace(ctx context.Context, announce []contai
|
|||
return conn.AnnounceContainerUsedSpace(ctx, announce, options...)
|
||||
}
|
||||
|
||||
func (p *pool) checkSessionTokenErr(err error, address string) {
|
||||
func (p *pool) checkSessionTokenErr(err error, address string) bool {
|
||||
if err == nil {
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
if strings.Contains(err.Error(), "session token does not exist") {
|
||||
p.cache.DeleteByPrefix(address)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (p *pool) PutObjectParam(ctx context.Context, params *client.PutObjectParams, callParam *CallParam) (*object.ID, error) {
|
||||
|
@ -480,7 +489,10 @@ func (p *pool) PutObjectParam(ctx context.Context, params *client.PutObjectParam
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.PutObject(ctx, params, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.PutObjectParam(ctx, params, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -490,7 +502,10 @@ func (p *pool) DeleteObjectParam(ctx context.Context, params *client.DeleteObjec
|
|||
return err
|
||||
}
|
||||
err = cp.client.DeleteObject(ctx, params, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.DeleteObjectParam(ctx, params, callParam)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -500,7 +515,10 @@ func (p *pool) GetObjectParam(ctx context.Context, params *client.GetObjectParam
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.GetObject(ctx, params, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.GetObjectParam(ctx, params, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -510,7 +528,10 @@ func (p *pool) GetObjectHeaderParam(ctx context.Context, params *client.ObjectHe
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.GetObjectHeader(ctx, params, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.GetObjectHeaderParam(ctx, params, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -520,7 +541,10 @@ func (p *pool) ObjectPayloadRangeDataParam(ctx context.Context, params *client.R
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.ObjectPayloadRangeData(ctx, params, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.ObjectPayloadRangeDataParam(ctx, params, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -530,7 +554,10 @@ func (p *pool) ObjectPayloadRangeSHA256Param(ctx context.Context, params *client
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.ObjectPayloadRangeSHA256(ctx, params, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.ObjectPayloadRangeSHA256Param(ctx, params, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -540,7 +567,10 @@ func (p *pool) ObjectPayloadRangeTZParam(ctx context.Context, params *client.Ran
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.ObjectPayloadRangeTZ(ctx, params, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.ObjectPayloadRangeTZParam(ctx, params, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -550,7 +580,10 @@ func (p *pool) SearchObjectParam(ctx context.Context, params *client.SearchObjec
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.SearchObject(ctx, params, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.SearchObjectParam(ctx, params, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -560,7 +593,10 @@ func (p *pool) PutContainerParam(ctx context.Context, cnr *container.Container,
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.PutContainer(ctx, cnr, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.PutContainerParam(ctx, cnr, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -570,7 +606,10 @@ func (p *pool) GetContainerParam(ctx context.Context, cid *cid.ID, callParam *Ca
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.GetContainer(ctx, cid, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.GetContainerParam(ctx, cid, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -580,7 +619,10 @@ func (p *pool) ListContainersParam(ctx context.Context, ownerID *owner.ID, callP
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.ListContainers(ctx, ownerID, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.ListContainersParam(ctx, ownerID, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -590,7 +632,10 @@ func (p *pool) DeleteContainerParam(ctx context.Context, cid *cid.ID, callParam
|
|||
return err
|
||||
}
|
||||
err = cp.client.DeleteContainer(ctx, cid, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.DeleteContainerParam(ctx, cid, callParam)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -600,7 +645,10 @@ func (p *pool) GetEACLParam(ctx context.Context, cid *cid.ID, callParam *CallPar
|
|||
return nil, err
|
||||
}
|
||||
res, err := cp.client.GetEACL(ctx, cid, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.GetEACLParam(ctx, cid, callParam)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -610,7 +658,10 @@ func (p *pool) SetEACLParam(ctx context.Context, table *eacl.Table, callParam *C
|
|||
return err
|
||||
}
|
||||
err = cp.client.SetEACL(ctx, table, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.SetEACLParam(ctx, table, callParam)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -620,7 +671,10 @@ func (p *pool) AnnounceContainerUsedSpaceParam(ctx context.Context, announce []c
|
|||
return err
|
||||
}
|
||||
err = cp.client.AnnounceContainerUsedSpace(ctx, announce, options...)
|
||||
p.checkSessionTokenErr(err, cp.address)
|
||||
if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry {
|
||||
callParam.isRetry = true
|
||||
return p.AnnounceContainerUsedSpaceParam(ctx, announce, callParam)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -321,10 +321,11 @@ func TestSessionCache(t *testing.T) {
|
|||
tok.SetID(uid)
|
||||
tokens = append(tokens, tok)
|
||||
return tok, err
|
||||
}).MaxTimes(2)
|
||||
}).MaxTimes(3)
|
||||
|
||||
mockClient.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("session token does not exist"))
|
||||
mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
|
||||
mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("session token does not exist"))
|
||||
mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
|
||||
|
||||
return mockClient, nil
|
||||
}
|
||||
|
@ -351,7 +352,7 @@ func TestSessionCache(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Contains(t, tokens, st)
|
||||
|
||||
_, err = pool.GetObjectParam(ctx, nil, &CallParam{})
|
||||
_, err = pool.GetObjectParam(ctx, nil, &CallParam{isRetry: true})
|
||||
require.Error(t, err)
|
||||
|
||||
// cache must not contain session token
|
||||
|
|
Loading…
Reference in a new issue