[#164] pool: Make pool.Pool
a struct
Connection pool package should not define `Pool` type as an interface since it provides single particular implementation. Make `pool.Pool` type a struct instead of interface. Also remove `Object`, `Container` and `Accounting` interfaces. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
22dad0573d
commit
b81122740a
4 changed files with 43 additions and 81 deletions
|
@ -1,5 +1,5 @@
|
||||||
// Code generated by MockGen. DO NOT EDIT.
|
// Code generated by MockGen. DO NOT EDIT.
|
||||||
// Source: github.com/nspcc-dev/neofs-sdk-go/pool (interfaces: Client)
|
// Source: github.com/nspcc-dev/neofs-sdk-go/Pool (interfaces: Client)
|
||||||
|
|
||||||
// Package pool is a generated GoMock package.
|
// Package pool is a generated GoMock package.
|
||||||
package pool
|
package pool
|
||||||
|
|
109
pool/pool.go
109
pool/pool.go
|
@ -50,7 +50,7 @@ type Client interface {
|
||||||
SessionCreate(context.Context, client.PrmSessionCreate) (*client.ResSessionCreate, error)
|
SessionCreate(context.Context, client.PrmSessionCreate) (*client.ResSessionCreate, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BuilderOptions contains options used to build connection pool.
|
// BuilderOptions contains options used to build connection Pool.
|
||||||
type BuilderOptions struct {
|
type BuilderOptions struct {
|
||||||
Key *ecdsa.PrivateKey
|
Key *ecdsa.PrivateKey
|
||||||
Logger *zap.Logger
|
Logger *zap.Logger
|
||||||
|
@ -76,7 +76,7 @@ type NodeParam struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Builder is an interim structure used to collect node addresses/weights and
|
// Builder is an interim structure used to collect node addresses/weights and
|
||||||
// build connection pool subsequently.
|
// build connection Pool subsequently.
|
||||||
type Builder struct {
|
type Builder struct {
|
||||||
nodeParams []NodeParam
|
nodeParams []NodeParam
|
||||||
}
|
}
|
||||||
|
@ -105,8 +105,8 @@ func (pb *Builder) AddNode(address string, priority int, weight float64) *Builde
|
||||||
return pb
|
return pb
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build creates new pool based on current PoolBuilder state and options.
|
// Build creates new Pool based on current PoolBuilder state and options.
|
||||||
func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (*Pool, error) {
|
||||||
if len(pb.nodeParams) == 0 {
|
if len(pb.nodeParams) == 0 {
|
||||||
return nil, errors.New("no NeoFS peers configured")
|
return nil, errors.New("no NeoFS peers configured")
|
||||||
}
|
}
|
||||||
|
@ -134,39 +134,6 @@ func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, er
|
||||||
return newPool(ctx, options)
|
return newPool(ctx, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pool is an interface providing connection artifacts on request.
|
|
||||||
type Pool interface {
|
|
||||||
Object
|
|
||||||
Container
|
|
||||||
Accounting
|
|
||||||
Connection() (Client, *session.Token, error)
|
|
||||||
OwnerID() *owner.ID
|
|
||||||
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
|
|
||||||
Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
type Object interface {
|
|
||||||
PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error)
|
|
||||||
DeleteObject(ctx context.Context, addr address.Address, opts ...CallOption) error
|
|
||||||
GetObject(context.Context, address.Address, ...CallOption) (*ResGetObject, error)
|
|
||||||
HeadObject(context.Context, address.Address, ...CallOption) (*object.Object, error)
|
|
||||||
ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error)
|
|
||||||
SearchObjects(context.Context, cid.ID, object.SearchFilters, ...CallOption) (*ResObjectSearch, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Container interface {
|
|
||||||
PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error)
|
|
||||||
GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error)
|
|
||||||
ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error)
|
|
||||||
DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error
|
|
||||||
GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error)
|
|
||||||
SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type Accounting interface {
|
|
||||||
Balance(ctx context.Context, owner *owner.ID, opts ...CallOption) (*accounting.Decimal, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type clientPack struct {
|
type clientPack struct {
|
||||||
client Client
|
client Client
|
||||||
healthy bool
|
healthy bool
|
||||||
|
@ -229,9 +196,7 @@ func cfgFromOpts(opts ...CallOption) *callConfig {
|
||||||
return cfg
|
return cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Pool = (*pool)(nil)
|
type Pool struct {
|
||||||
|
|
||||||
type pool struct {
|
|
||||||
innerPools []*innerPool
|
innerPools []*innerPool
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
owner *owner.ID
|
owner *owner.ID
|
||||||
|
@ -254,7 +219,7 @@ const (
|
||||||
defaultSessionTokenThreshold = 5 * time.Second
|
defaultSessionTokenThreshold = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
func newPool(ctx context.Context, options *BuilderOptions) (*Pool, error) {
|
||||||
cache, err := NewCache()
|
cache, err := NewCache()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't create cache: %w", err)
|
return nil, fmt.Errorf("couldn't create cache: %w", err)
|
||||||
|
@ -325,7 +290,7 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
pool := &pool{
|
pool := &Pool{
|
||||||
innerPools: inner,
|
innerPools: inner,
|
||||||
key: options.Key,
|
key: options.Key,
|
||||||
owner: ownerID,
|
owner: ownerID,
|
||||||
|
@ -339,7 +304,7 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
return pool, nil
|
return pool, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) {
|
func startRebalance(ctx context.Context, p *Pool, options *BuilderOptions) {
|
||||||
ticker := time.NewTimer(options.ClientRebalanceInterval)
|
ticker := time.NewTimer(options.ClientRebalanceInterval)
|
||||||
buffers := make([][]float64, len(options.nodesParams))
|
buffers := make([][]float64, len(options.nodesParams))
|
||||||
for i, params := range options.nodesParams {
|
for i, params := range options.nodesParams {
|
||||||
|
@ -358,7 +323,7 @@ func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, buffers [][]float64) {
|
func updateNodesHealth(ctx context.Context, p *Pool, options *BuilderOptions, buffers [][]float64) {
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for i, inner := range p.innerPools {
|
for i, inner := range p.innerPools {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
@ -372,7 +337,7 @@ func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, bu
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *BuilderOptions, bufferWeights []float64) {
|
func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options *BuilderOptions, bufferWeights []float64) {
|
||||||
if i > len(pool.innerPools)-1 {
|
if i > len(pool.innerPools)-1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -450,7 +415,7 @@ func adjustWeights(weights []float64) []float64 {
|
||||||
return adjusted
|
return adjusted
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) Connection() (Client, *session.Token, error) {
|
func (p *Pool) Connection() (Client, *session.Token, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
@ -460,7 +425,7 @@ func (p *pool) Connection() (Client, *session.Token, error) {
|
||||||
return cp.client, tok, nil
|
return cp.client, tok, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) connection() (*clientPack, error) {
|
func (p *Pool) connection() (*clientPack, error) {
|
||||||
for _, inner := range p.innerPools {
|
for _, inner := range p.innerPools {
|
||||||
cp, err := inner.connection()
|
cp, err := inner.connection()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -492,7 +457,7 @@ func (p *innerPool) connection() (*clientPack, error) {
|
||||||
return nil, errors.New("no healthy client")
|
return nil, errors.New("no healthy client")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) OwnerID() *owner.ID {
|
func (p *Pool) OwnerID() *owner.ID {
|
||||||
return p.owner
|
return p.owner
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -501,7 +466,7 @@ func formCacheKey(address string, key *ecdsa.PrivateKey) string {
|
||||||
return address + k.String()
|
return address + k.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, error) {
|
func (p *Pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -534,7 +499,7 @@ func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, error) {
|
||||||
return cp, nil
|
return cp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) checkSessionTokenErr(err error, address string) bool {
|
func (p *Pool) checkSessionTokenErr(err error, address string) bool {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -566,7 +531,7 @@ func createSessionTokenForDuration(ctx context.Context, c Client, dur uint64) (*
|
||||||
return c.SessionCreate(ctx, prm)
|
return c.SessionCreate(ctx, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) removeSessionTokenAfterThreshold(cfg *callConfig) error {
|
func (p *Pool) removeSessionTokenAfterThreshold(cfg *callConfig) error {
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -603,7 +568,7 @@ type callContext struct {
|
||||||
sessionContext *session.ObjectContext
|
sessionContext *session.ObjectContext
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) initCallContext(ctx *callContext, cfg *callConfig) error {
|
func (p *Pool) initCallContext(ctx *callContext, cfg *callConfig) error {
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -639,7 +604,7 @@ type callContextWithRetry struct {
|
||||||
noRetry bool
|
noRetry bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) initCallContextWithRetry(ctx *callContextWithRetry, cfg *callConfig) error {
|
func (p *Pool) initCallContextWithRetry(ctx *callContextWithRetry, cfg *callConfig) error {
|
||||||
err := p.initCallContext(&ctx.callContext, cfg)
|
err := p.initCallContext(&ctx.callContext, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -653,7 +618,7 @@ func (p *pool) initCallContextWithRetry(ctx *callContextWithRetry, cfg *callConf
|
||||||
|
|
||||||
// opens new session or uses cached one.
|
// opens new session or uses cached one.
|
||||||
// Must be called only on initialized callContext with set sessionTarget.
|
// Must be called only on initialized callContext with set sessionTarget.
|
||||||
func (p *pool) openDefaultSession(ctx *callContext) error {
|
func (p *Pool) openDefaultSession(ctx *callContext) error {
|
||||||
cacheKey := formCacheKey(ctx.endpoint, ctx.key)
|
cacheKey := formCacheKey(ctx.endpoint, ctx.key)
|
||||||
|
|
||||||
tok := p.cache.Get(cacheKey)
|
tok := p.cache.Get(cacheKey)
|
||||||
|
@ -686,7 +651,7 @@ func (p *pool) openDefaultSession(ctx *callContext) error {
|
||||||
// session-related error (*), and retrying is enabled, then f is called once more.
|
// session-related error (*), and retrying is enabled, then f is called once more.
|
||||||
//
|
//
|
||||||
// (*) in this case cached token is removed.
|
// (*) in this case cached token is removed.
|
||||||
func (p *pool) callWithRetry(ctx *callContextWithRetry, f func() error) error {
|
func (p *Pool) callWithRetry(ctx *callContextWithRetry, f func() error) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if ctx.sessionDefault {
|
if ctx.sessionDefault {
|
||||||
|
@ -707,7 +672,7 @@ func (p *pool) callWithRetry(ctx *callContextWithRetry, f func() error) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) {
|
func (p *Pool) PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) {
|
||||||
cfg := cfgFromOpts(append(opts,
|
cfg := cfgFromOpts(append(opts,
|
||||||
useDefaultSession(),
|
useDefaultSession(),
|
||||||
useVerb(sessionv2.ObjectVerbPut),
|
useVerb(sessionv2.ObjectVerbPut),
|
||||||
|
@ -816,7 +781,7 @@ func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Read
|
||||||
return &id, nil
|
return &id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) DeleteObject(ctx context.Context, addr address.Address, opts ...CallOption) error {
|
func (p *Pool) DeleteObject(ctx context.Context, addr address.Address, opts ...CallOption) error {
|
||||||
cfg := cfgFromOpts(append(opts,
|
cfg := cfgFromOpts(append(opts,
|
||||||
useDefaultSession(),
|
useDefaultSession(),
|
||||||
useVerb(sessionv2.ObjectVerbDelete),
|
useVerb(sessionv2.ObjectVerbDelete),
|
||||||
|
@ -871,7 +836,7 @@ type ResGetObject struct {
|
||||||
Payload io.ReadCloser
|
Payload io.ReadCloser
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) {
|
func (p *Pool) GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) {
|
||||||
cfg := cfgFromOpts(append(opts,
|
cfg := cfgFromOpts(append(opts,
|
||||||
useDefaultSession(),
|
useDefaultSession(),
|
||||||
useVerb(sessionv2.ObjectVerbGet),
|
useVerb(sessionv2.ObjectVerbGet),
|
||||||
|
@ -923,7 +888,7 @@ func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...Call
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) HeadObject(ctx context.Context, addr address.Address, opts ...CallOption) (*object.Object, error) {
|
func (p *Pool) HeadObject(ctx context.Context, addr address.Address, opts ...CallOption) (*object.Object, error) {
|
||||||
cfg := cfgFromOpts(append(opts,
|
cfg := cfgFromOpts(append(opts,
|
||||||
useDefaultSession(),
|
useDefaultSession(),
|
||||||
useVerb(sessionv2.ObjectVerbHead),
|
useVerb(sessionv2.ObjectVerbHead),
|
||||||
|
@ -985,7 +950,7 @@ func (x *ResObjectRange) Close() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) {
|
func (p *Pool) ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) {
|
||||||
cfg := cfgFromOpts(append(opts,
|
cfg := cfgFromOpts(append(opts,
|
||||||
useDefaultSession(),
|
useDefaultSession(),
|
||||||
useVerb(sessionv2.ObjectVerbRange),
|
useVerb(sessionv2.ObjectVerbRange),
|
||||||
|
@ -1061,7 +1026,7 @@ func (x *ResObjectSearch) Close() {
|
||||||
_, _ = x.r.Close()
|
_, _ = x.r.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) SearchObjects(ctx context.Context, idCnr cid.ID, filters object.SearchFilters, opts ...CallOption) (*ResObjectSearch, error) {
|
func (p *Pool) SearchObjects(ctx context.Context, idCnr cid.ID, filters object.SearchFilters, opts ...CallOption) (*ResObjectSearch, error) {
|
||||||
cfg := cfgFromOpts(append(opts,
|
cfg := cfgFromOpts(append(opts,
|
||||||
useDefaultSession(),
|
useDefaultSession(),
|
||||||
useVerb(sessionv2.ObjectVerbSearch),
|
useVerb(sessionv2.ObjectVerbSearch),
|
||||||
|
@ -1103,7 +1068,7 @@ func (p *pool) SearchObjects(ctx context.Context, idCnr cid.ID, filters object.S
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) {
|
func (p *Pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) {
|
||||||
cfg := cfgFromOpts(opts...)
|
cfg := cfgFromOpts(opts...)
|
||||||
cp, err := p.conn(ctx, cfg)
|
cp, err := p.conn(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1124,7 +1089,7 @@ func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts
|
||||||
return res.ID(), nil
|
return res.ID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) {
|
func (p *Pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) {
|
||||||
cfg := cfgFromOpts(opts...)
|
cfg := cfgFromOpts(opts...)
|
||||||
cp, err := p.conn(ctx, cfg)
|
cp, err := p.conn(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1145,7 +1110,7 @@ func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption
|
||||||
return res.Container(), nil
|
return res.Container(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) {
|
func (p *Pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) {
|
||||||
cfg := cfgFromOpts(opts...)
|
cfg := cfgFromOpts(opts...)
|
||||||
cp, err := p.conn(ctx, cfg)
|
cp, err := p.conn(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1166,7 +1131,7 @@ func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...Ca
|
||||||
return res.Containers(), nil
|
return res.Containers(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error {
|
func (p *Pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error {
|
||||||
cfg := cfgFromOpts(opts...)
|
cfg := cfgFromOpts(opts...)
|
||||||
cp, err := p.conn(ctx, cfg)
|
cp, err := p.conn(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1190,7 +1155,7 @@ func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOpt
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) {
|
func (p *Pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) {
|
||||||
cfg := cfgFromOpts(opts...)
|
cfg := cfgFromOpts(opts...)
|
||||||
cp, err := p.conn(ctx, cfg)
|
cp, err := p.conn(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1211,7 +1176,7 @@ func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*e
|
||||||
return res.Table(), nil
|
return res.Table(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error {
|
func (p *Pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error {
|
||||||
cfg := cfgFromOpts(opts...)
|
cfg := cfgFromOpts(opts...)
|
||||||
cp, err := p.conn(ctx, cfg)
|
cp, err := p.conn(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1231,7 +1196,7 @@ func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOptio
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) Balance(ctx context.Context, o *owner.ID, opts ...CallOption) (*accounting.Decimal, error) {
|
func (p *Pool) Balance(ctx context.Context, o *owner.ID, opts ...CallOption) (*accounting.Decimal, error) {
|
||||||
cfg := cfgFromOpts(opts...)
|
cfg := cfgFromOpts(opts...)
|
||||||
cp, err := p.conn(ctx, cfg)
|
cp, err := p.conn(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1252,7 +1217,7 @@ func (p *pool) Balance(ctx context.Context, o *owner.ID, opts ...CallOption) (*a
|
||||||
return res.Amount(), nil
|
return res.Amount(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error {
|
func (p *Pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollParams *ContainerPollingParams) error {
|
||||||
conn, _, err := p.Connection()
|
conn, _, err := p.Connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -1286,14 +1251,14 @@ func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollPa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the pool and releases all the associated resources.
|
// Close closes the Pool and releases all the associated resources.
|
||||||
func (p *pool) Close() {
|
func (p *Pool) Close() {
|
||||||
p.cancel()
|
p.cancel()
|
||||||
<-p.closedCh
|
<-p.closedCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// creates new session token from SessionCreate call result.
|
// creates new session token from SessionCreate call result.
|
||||||
func (p *pool) newSessionToken(cliRes *client.ResSessionCreate) *session.Token {
|
func (p *Pool) newSessionToken(cliRes *client.ResSessionCreate) *session.Token {
|
||||||
return sessionTokenForOwner(p.owner, cliRes)
|
return sessionTokenForOwner(p.owner, cliRes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -518,12 +518,9 @@ func TestSessionTokenOwner(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
pp, err := pb.Build(ctx, opts)
|
p, err := pb.Build(ctx, opts)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
t.Cleanup(pp.Close)
|
t.Cleanup(p.Close)
|
||||||
|
|
||||||
p, ok := pp.(*pool)
|
|
||||||
require.True(t, ok)
|
|
||||||
|
|
||||||
anonKey := newPrivateKey(t)
|
anonKey := newPrivateKey(t)
|
||||||
anonOwner := owner.NewIDFromPublicKey(&anonKey.PublicKey)
|
anonOwner := owner.NewIDFromPublicKey(&anonKey.PublicKey)
|
||||||
|
@ -556,7 +553,7 @@ func TestWaitPresence(t *testing.T) {
|
||||||
}},
|
}},
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &pool{
|
p := &Pool{
|
||||||
innerPools: []*innerPool{inner},
|
innerPools: []*innerPool{inner},
|
||||||
key: newPrivateKey(t),
|
key: newPrivateKey(t),
|
||||||
cache: cache,
|
cache: cache,
|
||||||
|
|
|
@ -81,7 +81,7 @@ func TestHealthyReweight(t *testing.T) {
|
||||||
{client: newNetmapMock(names[0], true), healthy: true, address: "address0"},
|
{client: newNetmapMock(names[0], true), healthy: true, address: "address0"},
|
||||||
{client: newNetmapMock(names[1], false), healthy: true, address: "address1"}},
|
{client: newNetmapMock(names[1], false), healthy: true, address: "address1"}},
|
||||||
}
|
}
|
||||||
p := &pool{
|
p := &Pool{
|
||||||
innerPools: []*innerPool{inner},
|
innerPools: []*innerPool{inner},
|
||||||
cache: cache,
|
cache: cache,
|
||||||
key: newPrivateKey(t),
|
key: newPrivateKey(t),
|
||||||
|
@ -129,7 +129,7 @@ func TestHealthyNoReweight(t *testing.T) {
|
||||||
{client: newNetmapMock(names[0], false), healthy: true},
|
{client: newNetmapMock(names[0], false), healthy: true},
|
||||||
{client: newNetmapMock(names[1], false), healthy: true}},
|
{client: newNetmapMock(names[1], false), healthy: true}},
|
||||||
}
|
}
|
||||||
p := &pool{
|
p := &Pool{
|
||||||
innerPools: []*innerPool{inner},
|
innerPools: []*innerPool{inner},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue