From a86c08b3ee7e663028f883bbfeb03b8ad5d70a78 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 2 Feb 2022 16:25:36 +0300 Subject: [PATCH] [#130] pool: Replace expiration value with expiration duration Based on the applications that use pool, most of them simply set session token duration to MaxUint64 value. It is completely understandable, because epochs are incrementing and expiration value will be surpassed sooner or later, unless it is MaxUint64. As an alternative I suggest specifying duration instead of absolute epoch values. Now apps can set duration of 100-200 epochs and pool automatically calculated expiration epoch base on the network info. Signed-off-by: Alex Vanin --- pool/pool.go | 82 ++++++++++++++++++++++++++--------------------- pool/pool_test.go | 14 ++++++++ 2 files changed, 59 insertions(+), 37 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index d69fabc..a5f9570 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -59,14 +59,14 @@ type Client interface { // BuilderOptions contains options used to build connection pool. type BuilderOptions struct { - Key *ecdsa.PrivateKey - Logger *zap.Logger - NodeConnectionTimeout time.Duration - NodeRequestTimeout time.Duration - ClientRebalanceInterval time.Duration - SessionExpirationEpoch uint64 - nodesParams []*NodesParam - clientBuilder func(opts ...client.Option) (Client, error) + Key *ecdsa.PrivateKey + Logger *zap.Logger + NodeConnectionTimeout time.Duration + NodeRequestTimeout time.Duration + ClientRebalanceInterval time.Duration + SessionExpirationDuration uint64 + nodesParams []*NodesParam + clientBuilder func(opts ...client.Option) (Client, error) } type NodesParam struct { @@ -239,12 +239,13 @@ func cfgFromOpts(opts ...CallOption) *callConfig { var _ Pool = (*pool)(nil) type pool struct { - innerPools []*innerPool - key *ecdsa.PrivateKey - owner *owner.ID - cancel context.CancelFunc - closedCh chan struct{} - cache *SessionCache + innerPools []*innerPool + key *ecdsa.PrivateKey + owner *owner.ID + cancel context.CancelFunc + closedCh chan struct{} + cache *SessionCache + stokenDuration uint64 } type innerPool struct { @@ -264,10 +265,6 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { inner := make([]*innerPool, len(options.nodesParams)) var atLeastOneHealthy bool - var cliPrm client.CreateSessionPrm - - cliPrm.SetExp(options.SessionExpirationEpoch) - for i, params := range options.nodesParams { clientPacks := make([]*clientPack, len(params.weights)) for j, address := range params.addresses { @@ -279,7 +276,7 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { return nil, err } var healthy bool - cliRes, err := c.CreateSession(ctx, cliPrm) + cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration) if err != nil && options.Logger != nil { options.Logger.Warn("failed to create neofs session token for client", zap.String("address", address), @@ -306,12 +303,13 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { ctx, cancel := context.WithCancel(ctx) pool := &pool{ - innerPools: inner, - key: options.Key, - owner: ownerID, - cancel: cancel, - closedCh: make(chan struct{}), - cache: cache, + innerPools: inner, + key: options.Key, + owner: ownerID, + cancel: cancel, + closedCh: make(chan struct{}), + cache: cache, + stokenDuration: options.SessionExpirationDuration, } go startRebalance(ctx, pool, options) return pool, nil @@ -359,12 +357,7 @@ func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *Bui healthyChanged := false wg := sync.WaitGroup{} - var ( - prmEndpoint client.EndpointInfoPrm - prmSession client.CreateSessionPrm - ) - - prmSession.SetExp(options.SessionExpirationEpoch) + var prmEndpoint client.EndpointInfoPrm for j, cPack := range p.clientPacks { wg.Add(1) @@ -385,7 +378,8 @@ func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *Bui if ok { bufferWeights[j] = options.nodesParams[i].weights[j] if !cp.healthy { - if cliRes, err := cli.CreateSession(ctx, prmSession); err != nil { + cliRes, err := createSessionTokenForDuration(ctx, cli, options.SessionExpirationDuration) + if err != nil { ok = false bufferWeights[j] = 0 } else { @@ -502,11 +496,7 @@ func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, []client cacheKey := formCacheKey(cp.address, key) sessionToken = p.cache.Get(cacheKey) if sessionToken == nil { - var cliPrm client.CreateSessionPrm - - cliPrm.SetExp(math.MaxUint32) - - cliRes, err := cp.client.CreateSession(ctx, cliPrm) + cliRes, err := createSessionTokenForDuration(ctx, cp.client, p.stokenDuration) if err != nil { return nil, nil, err } @@ -541,6 +531,24 @@ func (p *pool) checkSessionTokenErr(err error, address string) bool { return false } +func createSessionTokenForDuration(ctx context.Context, c Client, dur uint64) (*client.CreateSessionRes, error) { + ni, err := c.NetworkInfo(ctx, client.NetworkInfoPrm{}) + if err != nil { + return nil, err + } + + epoch := ni.Info().CurrentEpoch() + + var prm client.CreateSessionPrm + if math.MaxUint64-epoch < dur { + prm.SetExp(math.MaxUint64) + } else { + prm.SetExp(epoch + dur) + } + + return c.CreateSession(ctx, prm) +} + func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, opts ...CallOption) (*oid.ID, error) { cfg := cfgFromOpts(append(opts, useDefaultSession())...) cp, options, err := p.conn(ctx, cfg) diff --git a/pool/pool_test.go b/pool/pool_test.go index 6f3a5a6..ba792c7 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -48,6 +48,7 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error session")).AnyTimes() mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfoRes{}, nil).AnyTimes() + mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(&client.NetworkInfoRes{}, nil).AnyTimes() return mockClient, nil } @@ -94,10 +95,12 @@ func TestBuildPoolOneNodeFailed(t *testing.T) { }).AnyTimes() mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(&client.NetworkInfoRes{}, nil).AnyTimes() mockClient2 := NewMockClient(ctrl2) mockClient2.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockClient2.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(&client.NetworkInfoRes{}, nil).AnyTimes() if clientCount == 0 { return mockClient, nil @@ -153,6 +156,7 @@ func TestOneNode(t *testing.T) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(tok, nil) mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfoRes{}, nil).AnyTimes() + mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(&client.NetworkInfoRes{}, nil).AnyTimes() return mockClient, nil } @@ -190,6 +194,7 @@ func TestTwoNodes(t *testing.T) { return tok, err }) mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfoRes{}, nil).AnyTimes() + mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(&client.NetworkInfoRes{}, nil).AnyTimes() return mockClient, nil } @@ -228,6 +233,7 @@ func TestOneOfTwoFailed(t *testing.T) { return tok, nil }).AnyTimes() mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(&client.NetworkInfoRes{}, nil).AnyTimes() mockClient2 := NewMockClient(ctrl2) mockClient2.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) { @@ -238,6 +244,9 @@ func TestOneOfTwoFailed(t *testing.T) { mockClient2.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).DoAndReturn(func(_ interface{}, _ ...interface{}) (*client.EndpointInfoRes, error) { return nil, fmt.Errorf("error") }).AnyTimes() + mockClient2.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).DoAndReturn(func(_ interface{}, _ ...interface{}) (*client.NetworkInfoRes, error) { + return nil, fmt.Errorf("error") + }).AnyTimes() if clientCount == 0 { return mockClient, nil @@ -277,6 +286,7 @@ func TestTwoFailed(t *testing.T) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes() + mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes() return mockClient, nil } @@ -379,6 +389,7 @@ func TestPriority(t *testing.T) { return tok, nil }).AnyTimes() mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes() + mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes() mockClient2 := NewMockClient(ctrl2) mockClient2.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) { @@ -387,6 +398,7 @@ func TestPriority(t *testing.T) { return tok, nil }).AnyTimes() mockClient2.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockClient2.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() if clientCount == 0 { return mockClient, nil @@ -489,6 +501,7 @@ func TestSessionTokenOwner(t *testing.T) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&client.CreateSessionRes{}, nil).AnyTimes() mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfoRes{}, nil).AnyTimes() + mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(&client.NetworkInfoRes{}, nil).AnyTimes() return mockClient, nil } @@ -527,6 +540,7 @@ func TestWaitPresence(t *testing.T) { mockClient := NewMockClient(ctrl) mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockClient.EXPECT().NetworkInfo(gomock.Any(), gomock.Any()).Return(&client.NetworkInfoRes{}, nil).AnyTimes() mockClient.EXPECT().GetContainer(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() cache, err := NewCache()