diff --git a/pool/pool.go b/pool/pool.go index 7392e48..30c5219 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -96,7 +96,7 @@ type clientStatus interface { address() string // currentErrorRate returns current errors rate. // After specific threshold connection is considered as unhealthy. - // Pool.startRebalance routine can make this connection healthy again. + // Pool.rebalance routine can make this connection healthy again. currentErrorRate() uint32 // overallErrorRate returns the number of all happened errors. overallErrorRate() uint64 @@ -293,7 +293,7 @@ func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) { } // setErrorThreshold sets threshold after reaching which connection is considered unhealthy -// until Pool.startRebalance routing updates its status. +// until Pool.rebalance routing updates its status. func (x *wrapperPrm) setErrorThreshold(threshold uint32) { x.errorThreshold = threshold } @@ -1814,9 +1814,201 @@ type resCreateSession struct { // // See pool package overview to get some examples. type Pool struct { + pool atomic.Pointer[pool] +} + +// NewPool creates connection pool using parameters. +func NewPool(options InitParameters) (*Pool, error) { + pool, err := newPool(options) + if err != nil { + return nil, err + } + + var p Pool + p.pool.Store(pool) + return &p, nil +} + +// Balance requests current balance of the FrostFS account. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { + return p.pool.Load().Balance(ctx, prm) +} + +// Close closes the Pool and releases all the associated resources. +func (p *Pool) Close() { + p.pool.Load().Close() +} + +// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete. +// +// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: +// +// polling interval: 5s +// waiting timeout: 120s +// +// Success can be verified by reading by identifier (see GetContainer). +func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error { + return p.pool.Load().DeleteContainer(ctx, prm) +} + +// DeleteObject marks an object for deletion from the container using FrostFS API protocol. +// As a marker, a special unit called a tombstone is placed in the container. +// It confirms the user's intent to delete the object, and is itself a container object. +// Explicit deletion is done asynchronously, and is generally not guaranteed. +func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { + return p.pool.Load().DeleteObject(ctx, prm) +} + +// Dial establishes a connection to the servers from the FrostFS network. +// It also starts a routine that checks the health of the nodes and +// updates the weights of the nodes for balancing. +// Returns an error describing failure reason. +// +// If failed, the Pool SHOULD NOT be used. +// +// See also InitParameters.SetClientRebalanceInterval. +func (p *Pool) Dial(ctx context.Context) error { + pool := p.pool.Load() + err := pool.Dial(ctx) + pool.startRebalance(ctx) + return err +} + +// FindSiblingByParentID implements relations.Relations. +func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { + return p.pool.Load().FindSiblingByParentID(ctx, cnrID, objID, tokens) +} + +// FindSiblingBySplitID implements relations.Relations. +func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) { + return p.pool.Load().FindSiblingBySplitID(ctx, cnrID, splitID, tokens) +} + +// GetContainer reads FrostFS container by ID. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) { + return p.pool.Load().GetContainer(ctx, prm) +} + +// GetEACL reads eACL table of the FrostFS container. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) { + return p.pool.Load().GetEACL(ctx, prm) +} + +// GetLeftSibling implements relations.Relations. +func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) { + return p.pool.Load().GetLeftSibling(ctx, cnrID, objID, tokens) +} + +// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) { + return p.pool.Load().GetObject(ctx, prm) +} + +// GetSplitInfo implements relations.Relations. +func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) { + return p.pool.Load().GetSplitInfo(ctx, cnrID, objID, tokens) +} + +// HeadObject reads object header through a remote server using FrostFS API protocol. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) { + return p.pool.Load().HeadObject(ctx, prm) +} + +// ListChildrenByLinker implements relations.Relations. +func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { + return p.pool.Load().ListChildrenByLinker(ctx, cnrID, objID, tokens) +} + +// ListContainers requests identifiers of the account-owned containers. +func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { + return p.pool.Load().ListContainers(ctx, prm) +} + +// NetMapSnapshot requests information about the FrostFS network map. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) { + return p.pool.Load().NetMapSnapshot(ctx) +} + +// NetworkInfo requests information about the FrostFS network of which the remote server is a part. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) { + return p.pool.Load().NetworkInfo(ctx) +} + +// ObjectRange initiates reading an object's payload range through a remote +// server using FrostFS API protocol. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) { + return p.pool.Load().ObjectRange(ctx, prm) +} + +// PutContainer sends request to save container in FrostFS and waits for the operation to complete. +// +// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: +// +// polling interval: 5s +// waiting timeout: 120s +// +// Success can be verified by reading by identifier (see GetContainer). +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) { + return p.pool.Load().PutContainer(ctx, prm) +} + +// PutObject writes an object through a remote server using FrostFS API protocol. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { + return p.pool.Load().PutObject(ctx, prm) +} + +// SearchObjects initiates object selection through a remote server using FrostFS API protocol. +// +// The call only opens the transmission channel, explicit fetching of matched objects +// is done using the ResObjectSearch. Resulting reader must be finally closed. +// +// Main return value MUST NOT be processed on an erroneous return. +func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) { + return p.pool.Load().SearchObjects(ctx, prm) +} + +// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete. +// +// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: +// +// polling interval: 5s +// waiting timeout: 120s +// +// Success can be verified by reading by identifier (see GetEACL). +func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error { + return p.pool.Load().SetEACL(ctx, prm) +} + +// Statistic returns connection statistics. +func (p *Pool) Statistic() Statistic { + return p.pool.Load().Statistic() +} + +type pool struct { innerPools []*innerPool key *ecdsa.PrivateKey cancel context.CancelFunc + cancelLock *sync.Mutex closedCh chan struct{} cache *sessionCache stokenDuration uint64 @@ -1845,8 +2037,7 @@ const ( defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB ) -// NewPool creates connection pool using parameters. -func NewPool(options InitParameters) (*Pool, error) { +func newPool(options InitParameters) (*pool, error) { if options.key == nil { return nil, fmt.Errorf("missed required parameter 'Key'") } @@ -1863,7 +2054,7 @@ func NewPool(options InitParameters) (*Pool, error) { fillDefaultInitParams(&options, cache) - pool := &Pool{ + pool := &pool{ key: options.key, cache: cache, logger: options.logger, @@ -1875,26 +2066,46 @@ func NewPool(options InitParameters) (*Pool, error) { sessionExpirationDuration: options.sessionExpirationDuration, }, clientBuilder: options.clientBuilder, + cancelLock: &sync.Mutex{}, } return pool, nil } -// Dial establishes a connection to the servers from the FrostFS network. -// It also starts a routine that checks the health of the nodes and -// updates the weights of the nodes for balancing. -// Returns an error describing failure reason. -// -// If failed, the Pool SHOULD NOT be used. -// -// See also InitParameters.SetClientRebalanceInterval. -func (p *Pool) Dial(ctx context.Context) error { +func (p *pool) Dial(ctx context.Context) error { + p.stopRebalance() + + err := p.dial(ctx, nil) + if err != nil { + return err + } + + ni, err := p.NetworkInfo(ctx) + if err != nil { + return fmt.Errorf("get network info for max object size: %w", err) + } + p.maxObjectSize = ni.MaxObjectSize() + + return nil +} + +// dial initializes clients in accordance with p.rebalanceParams.nodesParams. +// existingClients is optional. After dial is executed, existingClients will contain only unused clients. +func (p *pool) dial(ctx context.Context, existingClients map[string]client) error { inner := make([]*innerPool, len(p.rebalanceParams.nodesParams)) var atLeastOneHealthy bool for i, params := range p.rebalanceParams.nodesParams { clients := make([]client, len(params.weights)) for j, addr := range params.addresses { + if client, ok := existingClients[addr]; ok { + clients[j] = client + atLeastOneHealthy = true + + delete(existingClients, addr) + continue + } + clients[j] = p.clientBuilder(addr) if err := clients[j].dial(ctx); err != nil { p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err)) @@ -1926,22 +2137,12 @@ func (p *Pool) Dial(ctx context.Context) error { return fmt.Errorf("at least one node must be healthy") } - ctx, cancel := context.WithCancel(ctx) - p.cancel = cancel - p.closedCh = make(chan struct{}) p.innerPools = inner - ni, err := p.NetworkInfo(ctx) - if err != nil { - return fmt.Errorf("get network info for max object size: %w", err) - } - p.maxObjectSize = ni.MaxObjectSize() - - go p.startRebalance(ctx) return nil } -func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) { +func (p *pool) log(level zapcore.Level, msg string, fields ...zap.Field) { if p.logger == nil { return } @@ -2023,8 +2224,26 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) { return nodesParams, nil } -// startRebalance runs loop to monitor connection healthy status. -func (p *Pool) startRebalance(ctx context.Context) { +func (p *pool) startRebalance(ctx context.Context) { + p.cancelLock.Lock() + defer p.cancelLock.Unlock() + + // stop rebalance + if p.cancel != nil { + p.cancel() + <-p.closedCh + } + + rebalanceCtx, cancel := context.WithCancel(ctx) + + p.closedCh = make(chan struct{}) + p.cancel = cancel + + go p.rebalance(rebalanceCtx) +} + +// rebalance runs loop to monitor connection healthy status. +func (p *pool) rebalance(ctx context.Context) { ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval) buffers := make([][]float64, len(p.rebalanceParams.nodesParams)) for i, params := range p.rebalanceParams.nodesParams { @@ -2043,7 +2262,17 @@ func (p *Pool) startRebalance(ctx context.Context) { } } -func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) { +func (p *pool) stopRebalance() { + p.cancelLock.Lock() + defer p.cancelLock.Unlock() + + if p.cancel != nil { + p.cancel() + <-p.closedCh + } +} + +func (p *pool) updateNodesHealth(ctx context.Context, buffers [][]float64) { wg := sync.WaitGroup{} for i, inner := range p.innerPools { wg.Add(1) @@ -2057,12 +2286,11 @@ func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) { wg.Wait() } -func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) { +func (p *pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) { if i > len(p.innerPools)-1 { return } pool := p.innerPools[i] - options := p.rebalanceParams healthyChanged := new(atomic.Bool) wg := sync.WaitGroup{} @@ -2072,12 +2300,12 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights go func(j int, cli client) { defer wg.Done() - tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout) + tctx, c := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout) defer c() healthy, changed := cli.restartIfUnhealthy(tctx) if healthy { - bufferWeights[j] = options.nodesParams[i].weights[j] + bufferWeights[j] = p.rebalanceParams.nodesParams[i].weights[j] } else { bufferWeights[j] = 0 p.cache.DeleteByPrefix(cli.address()) @@ -2116,7 +2344,7 @@ func adjustWeights(weights []float64) []float64 { return adjusted } -func (p *Pool) connection() (client, error) { +func (p *pool) connection() (client, error) { for _, inner := range p.innerPools { cp, err := inner.connection() if err == nil { @@ -2159,7 +2387,7 @@ func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string return address + stype + k.String() } -func (p *Pool) checkSessionTokenErr(err error, address string) bool { +func (p *pool) checkSessionTokenErr(err error, address string) bool { if err == nil { return false } @@ -2238,7 +2466,7 @@ type callContext struct { sessionClientCut bool } -func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error { +func (p *pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error { cp, err := p.connection() if err != nil { return err @@ -2271,7 +2499,7 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex // opens new session or uses cached one. // Must be called only on initialized callContext with set sessionTarget. -func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error { +func (p *pool) openDefaultSession(ctx context.Context, cc *callContext) error { cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut) tok, ok := p.cache.Get(cacheKey) @@ -2305,7 +2533,7 @@ func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error { // opens default session (if sessionDefault is set), and calls f. If f returns // session-related error then cached token is removed. -func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error { +func (p *pool) call(ctx context.Context, cc *callContext, f func() error) error { var err error if cc.sessionDefault { @@ -2322,16 +2550,13 @@ func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error } // fillAppropriateKey use pool key if caller didn't specify its own. -func (p *Pool) fillAppropriateKey(prm *prmCommon) { +func (p *pool) fillAppropriateKey(prm *prmCommon) { if prm.key == nil { prm.key = p.key } } -// PutObject writes an object through a remote server using FrostFS API protocol. -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { +func (p *pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) { cnr, _ := prm.hdr.ContainerID() var prmCtx prmContext @@ -2371,11 +2596,7 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) return id, nil } -// DeleteObject marks an object for deletion from the container using FrostFS API protocol. -// As a marker, a special unit called a tombstone is placed in the container. -// It confirms the user's intent to delete the object, and is itself a container object. -// Explicit deletion is done asynchronously, and is generally not guaranteed. -func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { +func (p *pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { var prmCtx prmContext prmCtx.useDefaultSession() prmCtx.useVerb(session.VerbObjectDelete) @@ -2441,10 +2662,7 @@ type ResGetObject struct { Payload io.ReadCloser } -// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol. -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) { +func (p *pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) { p.fillAppropriateKey(&prm.prmCommon) var cc callContext @@ -2466,10 +2684,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, e }) } -// HeadObject reads object header through a remote server using FrostFS API protocol. -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) { +func (p *pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) { p.fillAppropriateKey(&prm.prmCommon) var cc callContext @@ -2516,11 +2731,7 @@ func (x *ResObjectRange) Close() error { return err } -// ObjectRange initiates reading an object's payload range through a remote -// server using FrostFS API protocol. -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) { +func (p *pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) { p.fillAppropriateKey(&prm.prmCommon) var cc callContext @@ -2578,13 +2789,7 @@ func (x *ResObjectSearch) Close() { _, _ = x.r.Close() } -// SearchObjects initiates object selection through a remote server using FrostFS API protocol. -// -// The call only opens the transmission channel, explicit fetching of matched objects -// is done using the ResObjectSearch. Resulting reader must be finally closed. -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) { +func (p *pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) { p.fillAppropriateKey(&prm.prmCommon) var cc callContext @@ -2606,17 +2811,7 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjec }) } -// PutContainer sends request to save container in FrostFS and waits for the operation to complete. -// -// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: -// -// polling interval: 5s -// waiting timeout: 120s -// -// Success can be verified by reading by identifier (see GetContainer). -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) { +func (p *pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) { cp, err := p.connection() if err != nil { return cid.ID{}, err @@ -2630,10 +2825,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, e return cnrID, nil } -// GetContainer reads FrostFS container by ID. -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) { +func (p *pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) { cp, err := p.connection() if err != nil { return container.Container{}, err @@ -2647,8 +2839,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container return cnrs, nil } -// ListContainers requests identifiers of the account-owned containers. -func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { +func (p *pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { cp, err := p.connection() if err != nil { return nil, err @@ -2662,15 +2853,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid. return cnrIDs, nil } -// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete. -// -// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: -// -// polling interval: 5s -// waiting timeout: 120s -// -// Success can be verified by reading by identifier (see GetContainer). -func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error { +func (p *pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error { cp, err := p.connection() if err != nil { return err @@ -2684,10 +2867,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro return nil } -// GetEACL reads eACL table of the FrostFS container. -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) { +func (p *pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) { cp, err := p.connection() if err != nil { return eacl.Table{}, err @@ -2701,15 +2881,7 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, e return eaclResult, nil } -// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete. -// -// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: -// -// polling interval: 5s -// waiting timeout: 120s -// -// Success can be verified by reading by identifier (see GetEACL). -func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error { +func (p *pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error { cp, err := p.connection() if err != nil { return err @@ -2723,10 +2895,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error { return nil } -// Balance requests current balance of the FrostFS account. -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { +func (p *pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { cp, err := p.connection() if err != nil { return accounting.Decimal{}, err @@ -2740,8 +2909,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decim return balance, nil } -// Statistic returns connection statistics. -func (p Pool) Statistic() Statistic { +func (p *pool) Statistic() Statistic { stat := Statistic{} for _, inner := range p.innerPools { nodes := make([]string, 0, len(inner.clients)) @@ -2818,10 +2986,7 @@ func waitFor(ctx context.Context, params *WaitParams, condition func(context.Con } } -// NetworkInfo requests information about the FrostFS network of which the remote server is a part. -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) { +func (p *pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) { cp, err := p.connection() if err != nil { return netmap.NetworkInfo{}, err @@ -2835,10 +3000,7 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) { return netInfo, nil } -// NetMapSnapshot requests information about the FrostFS network map. -// -// Main return value MUST NOT be processed on an erroneous return. -func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) { +func (p *pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) { cp, err := p.connection() if err != nil { return netmap.NetMap{}, err @@ -2852,12 +3014,8 @@ func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) { return netMap, nil } -// Close closes the Pool and releases all the associated resources. -func (p *Pool) Close() { - p.cancel() - <-p.closedCh - - // close all clients +func (p *pool) Close() { + p.stopRebalance() for _, pools := range p.innerPools { for _, cli := range pools.clients { if cli.isDialed() { @@ -2875,7 +3033,7 @@ func (p *Pool) Close() { // // Returns any error that does not allow reading configuration // from the network. -func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *Pool) error { +func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *pool) error { ni, err := p.NetworkInfo(ctx) if err != nil { return fmt.Errorf("network info: %w", err) @@ -2886,8 +3044,7 @@ func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p * return nil } -// GetSplitInfo implements relations.Relations. -func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) { +func (p *pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) { var addr oid.Address addr.SetContainer(cnrID) addr.SetObject(objID) @@ -2916,8 +3073,7 @@ func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tok } } -// ListChildrenByLinker implements relations.Relations. -func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { +func (p *pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { var addr oid.Address addr.SetContainer(cnrID) addr.SetObject(objID) @@ -2939,8 +3095,7 @@ func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid return res.Children(), nil } -// GetLeftSibling implements relations.Relations. -func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) { +func (p *pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) { var addr oid.Address addr.SetContainer(cnrID) addr.SetObject(objID) @@ -2966,8 +3121,7 @@ func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, t return idMember, nil } -// FindSiblingBySplitID implements relations.Relations. -func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) { +func (p *pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) { var query object.SearchFilters query.AddSplitIDFilter(object.MatchStringEqual, splitID) @@ -2998,8 +3152,7 @@ func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID * return members, nil } -// FindSiblingByParentID implements relations.Relations. -func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { +func (p *pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { var query object.SearchFilters query.AddParentIDFilter(object.MatchStringEqual, objID) diff --git a/pool/pool_test.go b/pool/pool_test.go index c8721b9..03b1b2a 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -101,11 +101,11 @@ func TestBuildPoolOneNodeFailed(t *testing.T) { expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey) condition := func() bool { - cp, err := clientPool.connection() + cp, err := clientPool.pool.Load().connection() if err != nil { return false } - st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false)) + st, _ := clientPool.pool.Load().cache.Get(formCacheKey(cp.address(), clientPool.pool.Load().key, false)) return st.AssertAuthKey(&expectedAuthKey) } require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond) @@ -138,9 +138,9 @@ func TestOneNode(t *testing.T) { require.NoError(t, err) t.Cleanup(pool.Close) - cp, err := pool.connection() + cp, err := pool.pool.Load().connection() require.NoError(t, err) - st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) + st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false)) expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey) require.True(t, st.AssertAuthKey(&expectedAuthKey)) } @@ -168,9 +168,9 @@ func TestTwoNodes(t *testing.T) { require.NoError(t, err) t.Cleanup(pool.Close) - cp, err := pool.connection() + cp, err := pool.pool.Load().connection() require.NoError(t, err) - st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) + st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false)) require.True(t, assertAuthKeyForAny(st, clientKeys)) } @@ -223,9 +223,9 @@ func TestOneOfTwoFailed(t *testing.T) { time.Sleep(2 * time.Second) for i := 0; i < 5; i++ { - cp, err := pool.connection() + cp, err := pool.pool.Load().connection() require.NoError(t, err) - st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) + st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false)) require.True(t, assertAuthKeyForAny(st, clientKeys)) } } @@ -259,7 +259,7 @@ func TestTwoFailed(t *testing.T) { time.Sleep(2 * time.Second) - _, err = pool.connection() + _, err = pool.pool.Load().connection() require.Error(t, err) require.Contains(t, err.Error(), "no healthy") } @@ -293,9 +293,9 @@ func TestSessionCache(t *testing.T) { t.Cleanup(pool.Close) // cache must contain session token - cp, err := pool.connection() + cp, err := pool.pool.Load().connection() require.NoError(t, err) - st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) + st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false)) require.True(t, st.AssertAuthKey(&expectedAuthKey)) var prm PrmObjectGet @@ -306,9 +306,9 @@ func TestSessionCache(t *testing.T) { require.Error(t, err) // cache must not contain session token - cp, err = pool.connection() + cp, err = pool.pool.Load().connection() require.NoError(t, err) - _, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) + _, ok := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false)) require.False(t, ok) var prm2 PrmObjectPut @@ -318,9 +318,9 @@ func TestSessionCache(t *testing.T) { require.NoError(t, err) // cache must contain session token - cp, err = pool.connection() + cp, err = pool.pool.Load().connection() require.NoError(t, err) - st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) + st, _ = pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false)) require.True(t, st.AssertAuthKey(&expectedAuthKey)) } @@ -362,17 +362,17 @@ func TestPriority(t *testing.T) { expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey) firstNode := func() bool { - cp, err := pool.connection() + cp, err := pool.pool.Load().connection() require.NoError(t, err) - st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) + st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false)) return st.AssertAuthKey(&expectedAuthKey1) } expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey) secondNode := func() bool { - cp, err := pool.connection() + cp, err := pool.pool.Load().connection() require.NoError(t, err) - st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) + st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false)) return st.AssertAuthKey(&expectedAuthKey2) } require.Never(t, secondNode, time.Second, 200*time.Millisecond) @@ -407,9 +407,9 @@ func TestSessionCacheWithKey(t *testing.T) { require.NoError(t, err) // cache must contain session token - cp, err := pool.connection() + cp, err := pool.pool.Load().connection() require.NoError(t, err) - st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false)) + st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false)) require.True(t, st.AssertAuthKey(&expectedAuthKey)) var prm PrmObjectDelete @@ -419,7 +419,7 @@ func TestSessionCacheWithKey(t *testing.T) { err = pool.DeleteObject(ctx, prm) require.NoError(t, err) - st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false)) + st, _ = pool.pool.Load().cache.Get(formCacheKey(cp.address(), anonKey, false)) require.True(t, st.AssertAuthKey(&expectedAuthKey)) } @@ -460,10 +460,10 @@ func TestSessionTokenOwner(t *testing.T) { cc.sessionTarget = func(tok session.Object) { tkn = tok } - err = p.initCallContext(&cc, prm, prmCtx) + err = p.pool.Load().initCallContext(&cc, prm, prmCtx) require.NoError(t, err) - err = p.openDefaultSession(ctx, &cc) + err = p.pool.Load().openDefaultSession(ctx, &cc) require.NoError(t, err) require.True(t, tkn.VerifySignature()) require.True(t, tkn.Issuer().Equals(anonOwner)) @@ -708,14 +708,14 @@ func TestSwitchAfterErrorThreshold(t *testing.T) { t.Cleanup(pool.Close) for i := 0; i < errorThreshold; i++ { - conn, err := pool.connection() + conn, err := pool.pool.Load().connection() require.NoError(t, err) require.Equal(t, nodes[0].address, conn.address()) _, err = conn.objectGet(ctx, PrmObjectGet{}) require.Error(t, err) } - conn, err := pool.connection() + conn, err := pool.pool.Load().connection() require.NoError(t, err) require.Equal(t, nodes[1].address, conn.address()) _, err = conn.objectGet(ctx, PrmObjectGet{}) diff --git a/pool/sampler_test.go b/pool/sampler_test.go index 5ea2326..f85697e 100644 --- a/pool/sampler_test.go +++ b/pool/sampler_test.go @@ -59,7 +59,7 @@ func TestHealthyReweight(t *testing.T) { sampler: newSampler(weights, rand.NewSource(0)), clients: []client{client1, client2}, } - p := &Pool{ + p := &pool{ innerPools: []*innerPool{inner}, cache: cache, key: newPrivateKey(t), @@ -108,7 +108,7 @@ func TestHealthyNoReweight(t *testing.T) { newMockClient(names[1], *newPrivateKey(t)), }, } - p := &Pool{ + p := &pool{ innerPools: []*innerPool{inner}, rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}}, }