From b81122740a70420218cc3f2f66ca330d2bb17b3d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 9 Mar 2022 12:40:23 +0300 Subject: [PATCH] [#164] pool: Make `pool.Pool` a struct Connection pool package should not define `Pool` type as an interface since it provides single particular implementation. Make `pool.Pool` type a struct instead of interface. Also remove `Object`, `Container` and `Accounting` interfaces. Signed-off-by: Leonard Lyubich --- pool/mock_test.go | 2 +- pool/pool.go | 109 +++++++++++++++---------------------------- pool/pool_test.go | 9 ++-- pool/sampler_test.go | 4 +- 4 files changed, 43 insertions(+), 81 deletions(-) diff --git a/pool/mock_test.go b/pool/mock_test.go index c3032b9..07e9ce3 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/nspcc-dev/neofs-sdk-go/pool (interfaces: Client) +// Source: github.com/nspcc-dev/neofs-sdk-go/Pool (interfaces: Client) // Package pool is a generated GoMock package. package pool diff --git a/pool/pool.go b/pool/pool.go index b283af6..38fdc69 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -50,7 +50,7 @@ type Client interface { SessionCreate(context.Context, client.PrmSessionCreate) (*client.ResSessionCreate, error) } -// BuilderOptions contains options used to build connection pool. +// BuilderOptions contains options used to build connection Pool. type BuilderOptions struct { Key *ecdsa.PrivateKey Logger *zap.Logger @@ -76,7 +76,7 @@ type NodeParam struct { } // Builder is an interim structure used to collect node addresses/weights and -// build connection pool subsequently. +// build connection Pool subsequently. type Builder struct { nodeParams []NodeParam } @@ -105,8 +105,8 @@ func (pb *Builder) AddNode(address string, priority int, weight float64) *Builde return pb } -// Build creates new pool based on current PoolBuilder state and options. -func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, error) { +// Build creates new Pool based on current PoolBuilder state and options. +func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (*Pool, error) { if len(pb.nodeParams) == 0 { return nil, errors.New("no NeoFS peers configured") } @@ -134,39 +134,6 @@ func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, er return newPool(ctx, options) } -// Pool is an interface providing connection artifacts on request. -type Pool interface { - Object - Container - Accounting - Connection() (Client, *session.Token, error) - OwnerID() *owner.ID - WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error - Close() -} - -type Object interface { - PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) - DeleteObject(ctx context.Context, addr address.Address, opts ...CallOption) error - GetObject(context.Context, address.Address, ...CallOption) (*ResGetObject, error) - HeadObject(context.Context, address.Address, ...CallOption) (*object.Object, error) - ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) - SearchObjects(context.Context, cid.ID, object.SearchFilters, ...CallOption) (*ResObjectSearch, error) -} - -type Container interface { - PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) - GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) - ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) - DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error - GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) - SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error -} - -type Accounting interface { - Balance(ctx context.Context, owner *owner.ID, opts ...CallOption) (*accounting.Decimal, error) -} - type clientPack struct { client Client healthy bool @@ -229,9 +196,7 @@ func cfgFromOpts(opts ...CallOption) *callConfig { return cfg } -var _ Pool = (*pool)(nil) - -type pool struct { +type Pool struct { innerPools []*innerPool key *ecdsa.PrivateKey owner *owner.ID @@ -254,7 +219,7 @@ const ( defaultSessionTokenThreshold = 5 * time.Second ) -func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { +func newPool(ctx context.Context, options *BuilderOptions) (*Pool, error) { cache, err := NewCache() if err != nil { return nil, fmt.Errorf("couldn't create cache: %w", err) @@ -325,7 +290,7 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { } ctx, cancel := context.WithCancel(ctx) - pool := &pool{ + pool := &Pool{ innerPools: inner, key: options.Key, owner: ownerID, @@ -339,7 +304,7 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { return pool, nil } -func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) { +func startRebalance(ctx context.Context, p *Pool, options *BuilderOptions) { ticker := time.NewTimer(options.ClientRebalanceInterval) buffers := make([][]float64, len(options.nodesParams)) for i, params := range options.nodesParams { @@ -358,7 +323,7 @@ func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) { } } -func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, buffers [][]float64) { +func updateNodesHealth(ctx context.Context, p *Pool, options *BuilderOptions, buffers [][]float64) { wg := sync.WaitGroup{} for i, inner := range p.innerPools { wg.Add(1) @@ -372,7 +337,7 @@ func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, bu wg.Wait() } -func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *BuilderOptions, bufferWeights []float64) { +func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options *BuilderOptions, bufferWeights []float64) { if i > len(pool.innerPools)-1 { return } @@ -450,7 +415,7 @@ func adjustWeights(weights []float64) []float64 { return adjusted } -func (p *pool) Connection() (Client, *session.Token, error) { +func (p *Pool) Connection() (Client, *session.Token, error) { cp, err := p.connection() if err != nil { return nil, nil, err @@ -460,7 +425,7 @@ func (p *pool) Connection() (Client, *session.Token, error) { return cp.client, tok, nil } -func (p *pool) connection() (*clientPack, error) { +func (p *Pool) connection() (*clientPack, error) { for _, inner := range p.innerPools { cp, err := inner.connection() if err == nil { @@ -492,7 +457,7 @@ func (p *innerPool) connection() (*clientPack, error) { return nil, errors.New("no healthy client") } -func (p *pool) OwnerID() *owner.ID { +func (p *Pool) OwnerID() *owner.ID { return p.owner } @@ -501,7 +466,7 @@ func formCacheKey(address string, key *ecdsa.PrivateKey) string { return address + k.String() } -func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, error) { +func (p *Pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, error) { cp, err := p.connection() if err != nil { return nil, err @@ -534,7 +499,7 @@ func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, error) { return cp, nil } -func (p *pool) checkSessionTokenErr(err error, address string) bool { +func (p *Pool) checkSessionTokenErr(err error, address string) bool { if err == nil { return false } @@ -566,7 +531,7 @@ func createSessionTokenForDuration(ctx context.Context, c Client, dur uint64) (* return c.SessionCreate(ctx, prm) } -func (p *pool) removeSessionTokenAfterThreshold(cfg *callConfig) error { +func (p *Pool) removeSessionTokenAfterThreshold(cfg *callConfig) error { cp, err := p.connection() if err != nil { return err @@ -603,7 +568,7 @@ type callContext struct { sessionContext *session.ObjectContext } -func (p *pool) initCallContext(ctx *callContext, cfg *callConfig) error { +func (p *Pool) initCallContext(ctx *callContext, cfg *callConfig) error { cp, err := p.connection() if err != nil { return err @@ -639,7 +604,7 @@ type callContextWithRetry struct { noRetry bool } -func (p *pool) initCallContextWithRetry(ctx *callContextWithRetry, cfg *callConfig) error { +func (p *Pool) initCallContextWithRetry(ctx *callContextWithRetry, cfg *callConfig) error { err := p.initCallContext(&ctx.callContext, cfg) if err != nil { return err @@ -653,7 +618,7 @@ func (p *pool) initCallContextWithRetry(ctx *callContextWithRetry, cfg *callConf // opens new session or uses cached one. // Must be called only on initialized callContext with set sessionTarget. -func (p *pool) openDefaultSession(ctx *callContext) error { +func (p *Pool) openDefaultSession(ctx *callContext) error { cacheKey := formCacheKey(ctx.endpoint, ctx.key) tok := p.cache.Get(cacheKey) @@ -686,7 +651,7 @@ func (p *pool) openDefaultSession(ctx *callContext) error { // session-related error (*), and retrying is enabled, then f is called once more. // // (*) in this case cached token is removed. -func (p *pool) callWithRetry(ctx *callContextWithRetry, f func() error) error { +func (p *Pool) callWithRetry(ctx *callContextWithRetry, f func() error) error { var err error if ctx.sessionDefault { @@ -707,7 +672,7 @@ func (p *pool) callWithRetry(ctx *callContextWithRetry, f func() error) error { return err } -func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) { +func (p *Pool) PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) { cfg := cfgFromOpts(append(opts, useDefaultSession(), useVerb(sessionv2.ObjectVerbPut), @@ -816,7 +781,7 @@ func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Read return &id, nil } -func (p *pool) DeleteObject(ctx context.Context, addr address.Address, opts ...CallOption) error { +func (p *Pool) DeleteObject(ctx context.Context, addr address.Address, opts ...CallOption) error { cfg := cfgFromOpts(append(opts, useDefaultSession(), useVerb(sessionv2.ObjectVerbDelete), @@ -871,7 +836,7 @@ type ResGetObject struct { Payload io.ReadCloser } -func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) { +func (p *Pool) GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) { cfg := cfgFromOpts(append(opts, useDefaultSession(), useVerb(sessionv2.ObjectVerbGet), @@ -923,7 +888,7 @@ func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...Call return &res, nil } -func (p *pool) HeadObject(ctx context.Context, addr address.Address, opts ...CallOption) (*object.Object, error) { +func (p *Pool) HeadObject(ctx context.Context, addr address.Address, opts ...CallOption) (*object.Object, error) { cfg := cfgFromOpts(append(opts, useDefaultSession(), useVerb(sessionv2.ObjectVerbHead), @@ -985,7 +950,7 @@ func (x *ResObjectRange) Close() error { return err } -func (p *pool) ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) { +func (p *Pool) ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) { cfg := cfgFromOpts(append(opts, useDefaultSession(), useVerb(sessionv2.ObjectVerbRange), @@ -1061,7 +1026,7 @@ func (x *ResObjectSearch) Close() { _, _ = x.r.Close() } -func (p *pool) SearchObjects(ctx context.Context, idCnr cid.ID, filters object.SearchFilters, opts ...CallOption) (*ResObjectSearch, error) { +func (p *Pool) SearchObjects(ctx context.Context, idCnr cid.ID, filters object.SearchFilters, opts ...CallOption) (*ResObjectSearch, error) { cfg := cfgFromOpts(append(opts, useDefaultSession(), useVerb(sessionv2.ObjectVerbSearch), @@ -1103,7 +1068,7 @@ func (p *pool) SearchObjects(ctx context.Context, idCnr cid.ID, filters object.S return &res, nil } -func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) { +func (p *Pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) { cfg := cfgFromOpts(opts...) cp, err := p.conn(ctx, cfg) if err != nil { @@ -1124,7 +1089,7 @@ func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts return res.ID(), nil } -func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) { +func (p *Pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) { cfg := cfgFromOpts(opts...) cp, err := p.conn(ctx, cfg) if err != nil { @@ -1145,7 +1110,7 @@ func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption return res.Container(), nil } -func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) { +func (p *Pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) { cfg := cfgFromOpts(opts...) cp, err := p.conn(ctx, cfg) if err != nil { @@ -1166,7 +1131,7 @@ func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...Ca return res.Containers(), nil } -func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error { +func (p *Pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error { cfg := cfgFromOpts(opts...) cp, err := p.conn(ctx, cfg) if err != nil { @@ -1190,7 +1155,7 @@ func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOpt return err } -func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) { +func (p *Pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) { cfg := cfgFromOpts(opts...) cp, err := p.conn(ctx, cfg) if err != nil { @@ -1211,7 +1176,7 @@ func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*e return res.Table(), nil } -func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error { +func (p *Pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error { cfg := cfgFromOpts(opts...) cp, err := p.conn(ctx, cfg) if err != nil { @@ -1231,7 +1196,7 @@ func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOptio return err } -func (p *pool) Balance(ctx context.Context, o *owner.ID, opts ...CallOption) (*accounting.Decimal, error) { +func (p *Pool) Balance(ctx context.Context, o *owner.ID, opts ...CallOption) (*accounting.Decimal, error) { cfg := cfgFromOpts(opts...) cp, err := p.conn(ctx, cfg) if err != nil { @@ -1252,7 +1217,7 @@ func (p *pool) Balance(ctx context.Context, o *owner.ID, opts ...CallOption) (*a return res.Amount(), nil } -func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error { +func (p *Pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error { conn, _, err := p.Connection() if err != nil { return err @@ -1286,14 +1251,14 @@ func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollPa } } -// Close closes the pool and releases all the associated resources. -func (p *pool) Close() { +// Close closes the Pool and releases all the associated resources. +func (p *Pool) Close() { p.cancel() <-p.closedCh } // creates new session token from SessionCreate call result. -func (p *pool) newSessionToken(cliRes *client.ResSessionCreate) *session.Token { +func (p *Pool) newSessionToken(cliRes *client.ResSessionCreate) *session.Token { return sessionTokenForOwner(p.owner, cliRes) } diff --git a/pool/pool_test.go b/pool/pool_test.go index 94d0ef8..888685f 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -518,12 +518,9 @@ func TestSessionTokenOwner(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pp, err := pb.Build(ctx, opts) + p, err := pb.Build(ctx, opts) require.NoError(t, err) - t.Cleanup(pp.Close) - - p, ok := pp.(*pool) - require.True(t, ok) + t.Cleanup(p.Close) anonKey := newPrivateKey(t) anonOwner := owner.NewIDFromPublicKey(&anonKey.PublicKey) @@ -556,7 +553,7 @@ func TestWaitPresence(t *testing.T) { }}, } - p := &pool{ + p := &Pool{ innerPools: []*innerPool{inner}, key: newPrivateKey(t), cache: cache, diff --git a/pool/sampler_test.go b/pool/sampler_test.go index cb3c81f..aa27499 100644 --- a/pool/sampler_test.go +++ b/pool/sampler_test.go @@ -81,7 +81,7 @@ func TestHealthyReweight(t *testing.T) { {client: newNetmapMock(names[0], true), healthy: true, address: "address0"}, {client: newNetmapMock(names[1], false), healthy: true, address: "address1"}}, } - p := &pool{ + p := &Pool{ innerPools: []*innerPool{inner}, cache: cache, key: newPrivateKey(t), @@ -129,7 +129,7 @@ func TestHealthyNoReweight(t *testing.T) { {client: newNetmapMock(names[0], false), healthy: true}, {client: newNetmapMock(names[1], false), healthy: true}}, } - p := &pool{ + p := &Pool{ innerPools: []*innerPool{inner}, }