From 1d546711e5aac987616db592273a0894da063fb4 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Wed, 3 Nov 2021 17:34:02 +0300 Subject: [PATCH] [#38] Add retrying when session token error Signed-off-by: Denis Kirillov --- pool/pool.go | 90 +++++++++++++++++++++++++++++++++++++---------- pool/pool_test.go | 7 ++-- 2 files changed, 76 insertions(+), 21 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index 451fd94..cea77cd 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -93,6 +93,11 @@ type Pool interface { WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error Close() + ClientParam +} + +// ClientParam is analogue client.Object, client.Container but uses session token cache. +type ClientParam interface { PutObjectParam(ctx context.Context, params *client.PutObjectParams, callParam *CallParam) (*object.ID, error) DeleteObjectParam(ctx context.Context, params *client.DeleteObjectParams, callParam *CallParam) error GetObjectParam(ctx context.Context, params *client.GetObjectParams, callParam *CallParam) (*object.Object, error) @@ -117,7 +122,8 @@ type clientPack struct { } type CallParam struct { - Key *ecdsa.PrivateKey + isRetry bool + Key *ecdsa.PrivateKey Options []client.CallOption } @@ -464,14 +470,17 @@ func (p *pool) AnnounceContainerUsedSpace(ctx context.Context, announce []contai return conn.AnnounceContainerUsedSpace(ctx, announce, options...) } -func (p *pool) checkSessionTokenErr(err error, address string) { +func (p *pool) checkSessionTokenErr(err error, address string) bool { if err == nil { - return + return false } if strings.Contains(err.Error(), "session token does not exist") { p.cache.DeleteByPrefix(address) + return true } + + return false } func (p *pool) PutObjectParam(ctx context.Context, params *client.PutObjectParams, callParam *CallParam) (*object.ID, error) { @@ -480,7 +489,10 @@ func (p *pool) PutObjectParam(ctx context.Context, params *client.PutObjectParam return nil, err } res, err := cp.client.PutObject(ctx, params, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.PutObjectParam(ctx, params, callParam) + } return res, err } @@ -490,7 +502,10 @@ func (p *pool) DeleteObjectParam(ctx context.Context, params *client.DeleteObjec return err } err = cp.client.DeleteObject(ctx, params, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.DeleteObjectParam(ctx, params, callParam) + } return err } @@ -500,7 +515,10 @@ func (p *pool) GetObjectParam(ctx context.Context, params *client.GetObjectParam return nil, err } res, err := cp.client.GetObject(ctx, params, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.GetObjectParam(ctx, params, callParam) + } return res, err } @@ -510,7 +528,10 @@ func (p *pool) GetObjectHeaderParam(ctx context.Context, params *client.ObjectHe return nil, err } res, err := cp.client.GetObjectHeader(ctx, params, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.GetObjectHeaderParam(ctx, params, callParam) + } return res, err } @@ -520,7 +541,10 @@ func (p *pool) ObjectPayloadRangeDataParam(ctx context.Context, params *client.R return nil, err } res, err := cp.client.ObjectPayloadRangeData(ctx, params, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.ObjectPayloadRangeDataParam(ctx, params, callParam) + } return res, err } @@ -530,7 +554,10 @@ func (p *pool) ObjectPayloadRangeSHA256Param(ctx context.Context, params *client return nil, err } res, err := cp.client.ObjectPayloadRangeSHA256(ctx, params, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.ObjectPayloadRangeSHA256Param(ctx, params, callParam) + } return res, err } @@ -540,7 +567,10 @@ func (p *pool) ObjectPayloadRangeTZParam(ctx context.Context, params *client.Ran return nil, err } res, err := cp.client.ObjectPayloadRangeTZ(ctx, params, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.ObjectPayloadRangeTZParam(ctx, params, callParam) + } return res, err } @@ -550,7 +580,10 @@ func (p *pool) SearchObjectParam(ctx context.Context, params *client.SearchObjec return nil, err } res, err := cp.client.SearchObject(ctx, params, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.SearchObjectParam(ctx, params, callParam) + } return res, err } @@ -560,7 +593,10 @@ func (p *pool) PutContainerParam(ctx context.Context, cnr *container.Container, return nil, err } res, err := cp.client.PutContainer(ctx, cnr, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.PutContainerParam(ctx, cnr, callParam) + } return res, err } @@ -570,7 +606,10 @@ func (p *pool) GetContainerParam(ctx context.Context, cid *cid.ID, callParam *Ca return nil, err } res, err := cp.client.GetContainer(ctx, cid, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.GetContainerParam(ctx, cid, callParam) + } return res, err } @@ -580,7 +619,10 @@ func (p *pool) ListContainersParam(ctx context.Context, ownerID *owner.ID, callP return nil, err } res, err := cp.client.ListContainers(ctx, ownerID, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.ListContainersParam(ctx, ownerID, callParam) + } return res, err } @@ -590,7 +632,10 @@ func (p *pool) DeleteContainerParam(ctx context.Context, cid *cid.ID, callParam return err } err = cp.client.DeleteContainer(ctx, cid, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.DeleteContainerParam(ctx, cid, callParam) + } return err } @@ -600,7 +645,10 @@ func (p *pool) GetEACLParam(ctx context.Context, cid *cid.ID, callParam *CallPar return nil, err } res, err := cp.client.GetEACL(ctx, cid, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.GetEACLParam(ctx, cid, callParam) + } return res, err } @@ -610,7 +658,10 @@ func (p *pool) SetEACLParam(ctx context.Context, table *eacl.Table, callParam *C return err } err = cp.client.SetEACL(ctx, table, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.SetEACLParam(ctx, table, callParam) + } return err } @@ -620,7 +671,10 @@ func (p *pool) AnnounceContainerUsedSpaceParam(ctx context.Context, announce []c return err } err = cp.client.AnnounceContainerUsedSpace(ctx, announce, options...) - p.checkSessionTokenErr(err, cp.address) + if p.checkSessionTokenErr(err, cp.address) && !callParam.isRetry { + callParam.isRetry = true + return p.AnnounceContainerUsedSpaceParam(ctx, announce, callParam) + } return err } diff --git a/pool/pool_test.go b/pool/pool_test.go index f2d2155..0cf7054 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -321,10 +321,11 @@ func TestSessionCache(t *testing.T) { tok.SetID(uid) tokens = append(tokens, tok) return tok, err - }).MaxTimes(2) + }).MaxTimes(3) mockClient.EXPECT().GetObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("session token does not exist")) - mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("session token does not exist")) + mockClient.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) return mockClient, nil } @@ -351,7 +352,7 @@ func TestSessionCache(t *testing.T) { require.NoError(t, err) require.Contains(t, tokens, st) - _, err = pool.GetObjectParam(ctx, nil, &CallParam{}) + _, err = pool.GetObjectParam(ctx, nil, &CallParam{isRetry: true}) require.Error(t, err) // cache must not contain session token