Add pool Update #204

Open
achuprov wants to merge 2 commits from achuprov/frostfs-sdk-go:feat/pool_update into master
3 changed files with 583 additions and 165 deletions

View file

@ -96,7 +96,7 @@ type clientStatus interface {
address() string
// currentErrorRate returns current errors rate.
// After specific threshold connection is considered as unhealthy.
// Pool.startRebalance routine can make this connection healthy again.
// Pool.rebalance routine can make this connection healthy again.
currentErrorRate() uint32
// overallErrorRate returns the number of all happened errors.
overallErrorRate() uint64
@ -293,7 +293,7 @@ func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) {
}
// setErrorThreshold sets threshold after reaching which connection is considered unhealthy
// until Pool.startRebalance routing updates its status.
// until Pool.rebalance routing updates its status.
func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
x.errorThreshold = threshold
}
@ -1814,9 +1814,232 @@ type resCreateSession struct {
//
// See pool package overview to get some examples.
type Pool struct {
pool atomic.Pointer[pool]
}
// NewPool creates connection pool using parameters.
func NewPool(options InitParameters) (*Pool, error) {
pool, err := newPool(options)
if err != nil {
return nil, err
}
var p Pool
p.pool.Store(pool)
return &p, nil
}
// Balance requests current balance of the FrostFS account.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
return p.pool.Load().Balance(ctx, prm)
}
// Close closes the Pool and releases all the associated resources.
func (p *Pool) Close() {
p.pool.Load().Close()
}
// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
return p.pool.Load().DeleteContainer(ctx, prm)
}
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
// As a marker, a special unit called a tombstone is placed in the container.
// It confirms the user's intent to delete the object, and is itself a container object.
// Explicit deletion is done asynchronously, and is generally not guaranteed.
func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
return p.pool.Load().DeleteObject(ctx, prm)
}
// Dial establishes a connection to the servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
pool := p.pool.Load()
if err := pool.Dial(ctx); err != nil {
return err
}
pool.cancelLock.Lock()
if pool.cancel != nil {
pool.cancel()
}
pool.cancelLock.Unlock()
pool.startRebalance(ctx)
return nil
}
// FindSiblingByParentID implements relations.Relations.
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
return p.pool.Load().FindSiblingByParentID(ctx, cnrID, objID, tokens)
}
// FindSiblingBySplitID implements relations.Relations.
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
return p.pool.Load().FindSiblingBySplitID(ctx, cnrID, splitID, tokens)
}
// GetContainer reads FrostFS container by ID.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
return p.pool.Load().GetContainer(ctx, prm)
}
// GetEACL reads eACL table of the FrostFS container.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
return p.pool.Load().GetEACL(ctx, prm)
}
// GetLeftSibling implements relations.Relations.
func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
return p.pool.Load().GetLeftSibling(ctx, cnrID, objID, tokens)
}
// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
return p.pool.Load().GetObject(ctx, prm)
}
// GetSplitInfo implements relations.Relations.
func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
return p.pool.Load().GetSplitInfo(ctx, cnrID, objID, tokens)
}
// HeadObject reads object header through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
return p.pool.Load().HeadObject(ctx, prm)
}
// ListChildrenByLinker implements relations.Relations.
func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
return p.pool.Load().ListChildrenByLinker(ctx, cnrID, objID, tokens)
}
// ListContainers requests identifiers of the account-owned containers.
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
return p.pool.Load().ListContainers(ctx, prm)
}
// NetMapSnapshot requests information about the FrostFS network map.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
return p.pool.Load().NetMapSnapshot(ctx)
}
// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
return p.pool.Load().NetworkInfo(ctx)
}
// ObjectRange initiates reading an object's payload range through a remote
// server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
return p.pool.Load().ObjectRange(ctx, prm)
}
// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
return p.pool.Load().PutContainer(ctx, prm)
}
// PutObject writes an object through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
return p.pool.Load().PutObject(ctx, prm)
}
// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
//
// The call only opens the transmission channel, explicit fetching of matched objects
// is done using the ResObjectSearch. Resulting reader must be finally closed.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
return p.pool.Load().SearchObjects(ctx, prm)
}
// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetEACL).
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
return p.pool.Load().SetEACL(ctx, prm)
}
// Statistic returns connection statistics.
func (p *Pool) Statistic() Statistic {
return p.pool.Load().Statistic()
}
// Update is a method that lets you refresh the list of nodes without recreating the pool.
// Use a long-lived context to avoid early rebalance stop.
// Can interrupt an operation being performed on a node that was removed.
// Ensures that:
// 1) Preserved connections would not be closed.
// 2) In the event of an error, the pool remains operational.
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
pool := p.pool.Load()
newPool, equal, err := pool.update(ctx, prm)
if equal || err != nil {
return err
}
newPool.startRebalance(ctx)
oldPool := p.pool.Swap(newPool)
oldPool.stopRebalance()
return nil
}
type pool struct {
innerPools []*innerPool
key *ecdsa.PrivateKey
cancel context.CancelFunc
cancelLock *sync.Mutex
closedCh chan struct{}
cache *sessionCache
stokenDuration uint64
@ -1845,8 +2068,7 @@ const (
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
)
// NewPool creates connection pool using parameters.
func NewPool(options InitParameters) (*Pool, error) {
func newPool(options InitParameters) (*pool, error) {
if options.key == nil {
return nil, fmt.Errorf("missed required parameter 'Key'")
}
@ -1863,7 +2085,7 @@ func NewPool(options InitParameters) (*Pool, error) {
fillDefaultInitParams(&options, cache)
pool := &Pool{
pool := &pool{
key: options.key,
cache: cache,
logger: options.logger,
@ -1875,26 +2097,44 @@ func NewPool(options InitParameters) (*Pool, error) {
sessionExpirationDuration: options.sessionExpirationDuration,
},
clientBuilder: options.clientBuilder,
cancelLock: &sync.Mutex{},
}
return pool, nil
}
// Dial establishes a connection to the servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
func (p *pool) Dial(ctx context.Context) error {
err := p.dial(ctx, nil)
if err != nil {
return err
}
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info for max object size: %w", err)
}
p.maxObjectSize = ni.MaxObjectSize()
return nil
}
// dial initializes clients in accordance with p.rebalanceParams.nodesParams.
// existingClients is optional. After dial is executed, existingClients will contain only unused clients.
func (p *pool) dial(ctx context.Context, existingClients map[string]client) error {
inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
var atLeastOneHealthy bool
for i, params := range p.rebalanceParams.nodesParams {
clients := make([]client, len(params.weights))
for j, addr := range params.addresses {
if client, ok := existingClients[addr]; ok {
clients[j] = client
atLeastOneHealthy = true
delete(existingClients, addr)
continue
}
clients[j] = p.clientBuilder(addr)
if err := clients[j].dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
@ -1926,22 +2166,70 @@ func (p *Pool) Dial(ctx context.Context) error {
return fmt.Errorf("at least one node must be healthy")
}
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel
p.closedCh = make(chan struct{})
p.innerPools = inner
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info for max object size: %w", err)
}
p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx)
return nil
}
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
func nodesParamEqual(a, b []*nodesParam) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v.priority != b[i].priority || len(v.addresses) != len(b[i].addresses) {
return false
}
for j, address := range v.addresses {
if address != b[i].addresses[j] {
return false
}
}
}
return true
}
// Update requires that no other parallel operations are executed concurrently on the pool instance.
func (p *pool) update(ctx context.Context, prm []NodeParam) (*pool, bool, error) {
newPool := *p
existingClients := make(map[string]client)
for i, pool := range newPool.rebalanceParams.nodesParams {
for j := range pool.weights {
existingClients[pool.addresses[j]] = newPool.innerPools[i].clients[j]
}
}
nodesParams, err := adjustNodeParams(prm)
if err != nil {
return nil, false, err
}
if nodesParamEqual(newPool.rebalanceParams.nodesParams, nodesParams) {
return nil, true, err
}
newPool.rebalanceParams.nodesParams = nodesParams
err = newPool.dial(ctx, existingClients)
if err != nil {
return nil, false, err
}
// After newPool.dial(ctx, existingClients), existingClients will contain only outdated clients.
// Removing outdated clients
for _, client := range existingClients {
if clientErr := client.close(); clientErr != nil {
err = errors.Join(err, clientErr)
}
}
return &newPool, false, err
}
func (p *pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
if p.logger == nil {
return
}
@ -1994,6 +2282,11 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
}
}
type addressWeightPair struct {
address string
weight float64
}
func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
if len(nodeParams) == 0 {
return nil, errors.New("no FrostFS peers configured")
@ -2020,11 +2313,39 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
return nodesParams[i].priority < nodesParams[j].priority
})
for _, nodes := range nodesParams {
addressWeightPairs := make([]addressWeightPair, len(nodes.addresses))
for i := range nodes.addresses {
addressWeightPairs[i] = addressWeightPair{address: nodes.addresses[i], weight: nodes.weights[i]}
}
sort.Slice(addressWeightPairs, func(i, j int) bool {
return addressWeightPairs[i].address < addressWeightPairs[j].address
})
for i, pair := range addressWeightPairs {
nodes.addresses[i] = pair.address
nodes.weights[i] = pair.weight
}
}
return nodesParams, nil
}
// startRebalance runs loop to monitor connection healthy status.
func (p *Pool) startRebalance(ctx context.Context) {
func (p *pool) startRebalance(ctx context.Context) {
p.cancelLock.Lock()
defer p.cancelLock.Unlock()
rebalanceCtx, cancel := context.WithCancel(ctx)
p.closedCh = make(chan struct{})
p.cancel = cancel
go p.rebalance(rebalanceCtx)
}
// rebalance runs loop to monitor connection healthy status.
func (p *pool) rebalance(ctx context.Context) {
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
for i, params := range p.rebalanceParams.nodesParams {
@ -2043,7 +2364,15 @@ func (p *Pool) startRebalance(ctx context.Context) {
}
}
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
func (p *pool) stopRebalance() {
p.cancelLock.Lock()
defer p.cancelLock.Unlock()
p.cancel()
<-p.closedCh
}
func (p *pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
wg := sync.WaitGroup{}
for i, inner := range p.innerPools {
wg.Add(1)
@ -2057,12 +2386,11 @@ func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
wg.Wait()
}
func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
func (p *pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
if i > len(p.innerPools)-1 {
return
}
pool := p.innerPools[i]
options := p.rebalanceParams
healthyChanged := new(atomic.Bool)
wg := sync.WaitGroup{}
@ -2072,12 +2400,12 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
go func(j int, cli client) {
defer wg.Done()
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
tctx, c := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer c()
healthy, changed := cli.restartIfUnhealthy(tctx)
if healthy {
bufferWeights[j] = options.nodesParams[i].weights[j]
bufferWeights[j] = p.rebalanceParams.nodesParams[i].weights[j]
} else {
bufferWeights[j] = 0
p.cache.DeleteByPrefix(cli.address())
@ -2116,7 +2444,7 @@ func adjustWeights(weights []float64) []float64 {
return adjusted
}
func (p *Pool) connection() (client, error) {
func (p *pool) connection() (client, error) {
for _, inner := range p.innerPools {
cp, err := inner.connection()
if err == nil {
@ -2159,7 +2487,7 @@ func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string
return address + stype + k.String()
}
func (p *Pool) checkSessionTokenErr(err error, address string) bool {
func (p *pool) checkSessionTokenErr(err error, address string) bool {
if err == nil {
return false
}
@ -2238,7 +2566,7 @@ type callContext struct {
sessionClientCut bool
}
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
func (p *pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
cp, err := p.connection()
if err != nil {
return err
@ -2271,7 +2599,7 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
// opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget.
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
func (p *pool) openDefaultSession(ctx context.Context, cc *callContext) error {
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
tok, ok := p.cache.Get(cacheKey)
@ -2305,7 +2633,7 @@ func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
// opens default session (if sessionDefault is set), and calls f. If f returns
// session-related error then cached token is removed.
func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error {
func (p *pool) call(ctx context.Context, cc *callContext, f func() error) error {
var err error
if cc.sessionDefault {
@ -2322,16 +2650,13 @@ func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error
}
// fillAppropriateKey use pool key if caller didn't specify its own.
func (p *Pool) fillAppropriateKey(prm *prmCommon) {
func (p *pool) fillAppropriateKey(prm *prmCommon) {
if prm.key == nil {
prm.key = p.key
}
}
// PutObject writes an object through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
func (p *pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
cnr, _ := prm.hdr.ContainerID()
var prmCtx prmContext
@ -2371,11 +2696,7 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
return id, nil
}
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
// As a marker, a special unit called a tombstone is placed in the container.
// It confirms the user's intent to delete the object, and is itself a container object.
// Explicit deletion is done asynchronously, and is generally not guaranteed.
func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
func (p *pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
var prmCtx prmContext
prmCtx.useDefaultSession()
prmCtx.useVerb(session.VerbObjectDelete)
@ -2441,10 +2762,7 @@ type ResGetObject struct {
Payload io.ReadCloser
}
// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
func (p *pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
p.fillAppropriateKey(&prm.prmCommon)
var cc callContext
@ -2466,10 +2784,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, e
})
}
// HeadObject reads object header through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
func (p *pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
p.fillAppropriateKey(&prm.prmCommon)
var cc callContext
@ -2516,11 +2831,7 @@ func (x *ResObjectRange) Close() error {
return err
}
// ObjectRange initiates reading an object's payload range through a remote
// server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
func (p *pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
p.fillAppropriateKey(&prm.prmCommon)
var cc callContext
@ -2578,13 +2889,7 @@ func (x *ResObjectSearch) Close() {
_, _ = x.r.Close()
}
// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
//
// The call only opens the transmission channel, explicit fetching of matched objects
// is done using the ResObjectSearch. Resulting reader must be finally closed.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
func (p *pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
p.fillAppropriateKey(&prm.prmCommon)
var cc callContext
@ -2606,17 +2911,7 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjec
})
}
// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
func (p *pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
cp, err := p.connection()
if err != nil {
return cid.ID{}, err
@ -2630,10 +2925,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, e
return cnrID, nil
}
// GetContainer reads FrostFS container by ID.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
func (p *pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
cp, err := p.connection()
if err != nil {
return container.Container{}, err
@ -2647,8 +2939,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container
return cnrs, nil
}
// ListContainers requests identifiers of the account-owned containers.
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
func (p *pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
cp, err := p.connection()
if err != nil {
return nil, err
@ -2662,15 +2953,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
return cnrIDs, nil
}
// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
func (p *pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
cp, err := p.connection()
if err != nil {
return err
@ -2684,10 +2967,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
return nil
}
// GetEACL reads eACL table of the FrostFS container.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
func (p *pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
cp, err := p.connection()
if err != nil {
return eacl.Table{}, err
@ -2701,15 +2981,7 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, e
return eaclResult, nil
}
// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetEACL).
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
func (p *pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
cp, err := p.connection()
if err != nil {
return err
@ -2723,10 +2995,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
return nil
}
// Balance requests current balance of the FrostFS account.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
func (p *pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
cp, err := p.connection()
if err != nil {
return accounting.Decimal{}, err
@ -2740,8 +3009,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decim
return balance, nil
}
// Statistic returns connection statistics.
func (p Pool) Statistic() Statistic {
func (p *pool) Statistic() Statistic {
stat := Statistic{}
for _, inner := range p.innerPools {
nodes := make([]string, 0, len(inner.clients))
@ -2818,10 +3086,7 @@ func waitFor(ctx context.Context, params *WaitParams, condition func(context.Con
}
}
// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
func (p *pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
cp, err := p.connection()
if err != nil {
return netmap.NetworkInfo{}, err
@ -2835,10 +3100,7 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
return netInfo, nil
}
// NetMapSnapshot requests information about the FrostFS network map.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
func (p *pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
cp, err := p.connection()
if err != nil {
return netmap.NetMap{}, err
@ -2852,12 +3114,8 @@ func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
return netMap, nil
}
// Close closes the Pool and releases all the associated resources.
func (p *Pool) Close() {
p.cancel()
<-p.closedCh
// close all clients
func (p *pool) Close() {
p.stopRebalance()
for _, pools := range p.innerPools {
for _, cli := range pools.clients {
if cli.isDialed() {
@ -2875,7 +3133,7 @@ func (p *Pool) Close() {
//
// Returns any error that does not allow reading configuration
// from the network.
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *Pool) error {
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *pool) error {
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("network info: %w", err)
@ -2886,8 +3144,7 @@ func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *
return nil
}
// GetSplitInfo implements relations.Relations.
func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
func (p *pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
var addr oid.Address
addr.SetContainer(cnrID)
addr.SetObject(objID)
@ -2916,8 +3173,7 @@ func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tok
}
}
// ListChildrenByLinker implements relations.Relations.
func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
func (p *pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
var addr oid.Address
addr.SetContainer(cnrID)
addr.SetObject(objID)
@ -2939,8 +3195,7 @@ func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid
return res.Children(), nil
}
// GetLeftSibling implements relations.Relations.
func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
func (p *pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
var addr oid.Address
addr.SetContainer(cnrID)
addr.SetObject(objID)
@ -2966,8 +3221,7 @@ func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, t
return idMember, nil
}
// FindSiblingBySplitID implements relations.Relations.
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
func (p *pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
var query object.SearchFilters
query.AddSplitIDFilter(object.MatchStringEqual, splitID)
@ -2998,8 +3252,7 @@ func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *
return members, nil
}
// FindSiblingByParentID implements relations.Relations.
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
func (p *pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
var query object.SearchFilters
query.AddParentIDFilter(object.MatchStringEqual, objID)

View file

@ -4,7 +4,9 @@ import (
"context"
"crypto/ecdsa"
"errors"
"reflect"
"strconv"
"sync"
"testing"
"time"
@ -101,11 +103,11 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
condition := func() bool {
cp, err := clientPool.connection()
cp, err := clientPool.pool.Load().connection()
if err != nil {
return false
}
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false))
st, _ := clientPool.pool.Load().cache.Get(formCacheKey(cp.address(), clientPool.pool.Load().key, false))
return st.AssertAuthKey(&expectedAuthKey)
}
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
@ -138,9 +140,9 @@ func TestOneNode(t *testing.T) {
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -168,12 +170,156 @@ func TestTwoNodes(t *testing.T) {
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
}
func TestTwoNodesUpdate(t *testing.T) {
var clientKeys []*ecdsa.PrivateKey
mockClientBuilder := func(addr string) client {
key := newPrivateKey(t)
clientKeys = append(clientKeys, key)
return newMockClient(addr, *key)
}
opts := InitParameters{
key: newPrivateKey(t),
nodeParams: []NodeParam{
{2, "peer0", 1},
{2, "peer1", 1},
},
}
opts.setClientBuilder(mockClientBuilder)
pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
pool.Update(context.Background(), []NodeParam{
{1, "peer-1", 1},
{2, "peer0", 1},
{2, "peer1", 1},
})
st1, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.Equal(t, &st1, &st)
cp2, err := pool.pool.Load().connection()
require.NoError(t, err)
require.Equal(t, cp2.address(), "peer-1")
}
func TestUpdateNodeMultithread(t *testing.T) {
key1 := newPrivateKey(t)
mockClientBuilder := func(addr string) client {
return newMockClient(addr, *key1)
}
opts := InitParameters{
key: newPrivateKey(t),
nodeParams: []NodeParam{{1, "peer0", 1}},
}
opts.setClientBuilder(mockClientBuilder)
pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err)
t.Cleanup(pool.Close)
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
err := pool.Update(context.Background(), []NodeParam{{1, "peer" + strconv.Itoa(i+1), 1}})
require.NoError(t, err)
}(i)
}
wg.Wait()
}
func TestUpdateNodeEqualConfig(t *testing.T) {
key1 := newPrivateKey(t)
mockClientBuilder := func(addr string) client {
return newMockClient(addr, *key1)
}
opts := InitParameters{
key: newPrivateKey(t),
nodeParams: []NodeParam{{1, "peer0", 1}},
}
opts.setClientBuilder(mockClientBuilder)
pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey))
_, flag, err := pool.pool.Load().update(context.Background(), []NodeParam{{1, "peer0", 1}})
require.NoError(t, err)
require.True(t, flag)
}
func TestUpdateNode(t *testing.T) {
key1 := newPrivateKey(t)
mockClientBuilder := func(addr string) client {
return newMockClient(addr, *key1)
}
opts := InitParameters{
key: newPrivateKey(t),
nodeParams: []NodeParam{{1, "peer0", 1}},
}
opts.setClientBuilder(mockClientBuilder)
pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey))
pool.Update(context.Background(), []NodeParam{{1, "peer0", 1}})
cp1, err := pool.pool.Load().connection()
st1, _ := pool.pool.Load().cache.Get(formCacheKey(cp1.address(), pool.pool.Load().key, false))
require.NoError(t, err)
require.Equal(t, &st, &st1)
require.Equal(t, &cp, &cp1)
pool.Update(context.Background(), []NodeParam{{1, "peer1", 1}})
cp2, err := pool.pool.Load().connection()
require.NoError(t, err)
st2, _ := pool.pool.Load().cache.Get(formCacheKey(cp2.address(), pool.pool.Load().key, false))
require.NotEqual(t, cp.address(), cp2.address())
require.NotEqual(t, &st, &st2)
}
func assertAuthKeyForAny(st session.Object, clientKeys []*ecdsa.PrivateKey) bool {
for _, key := range clientKeys {
expectedAuthKey := frostfsecdsa.PublicKey(key.PublicKey)
@ -223,9 +369,9 @@ func TestOneOfTwoFailed(t *testing.T) {
time.Sleep(2 * time.Second)
for i := 0; i < 5; i++ {
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
}
}
@ -259,7 +405,7 @@ func TestTwoFailed(t *testing.T) {
time.Sleep(2 * time.Second)
_, err = pool.connection()
_, err = pool.pool.Load().connection()
require.Error(t, err)
require.Contains(t, err.Error(), "no healthy")
}
@ -293,9 +439,9 @@ func TestSessionCache(t *testing.T) {
t.Cleanup(pool.Close)
// cache must contain session token
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectGet
@ -306,9 +452,9 @@ func TestSessionCache(t *testing.T) {
require.Error(t, err)
// cache must not contain session token
cp, err = pool.connection()
cp, err = pool.pool.Load().connection()
require.NoError(t, err)
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
_, ok := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.False(t, ok)
var prm2 PrmObjectPut
@ -318,9 +464,9 @@ func TestSessionCache(t *testing.T) {
require.NoError(t, err)
// cache must contain session token
cp, err = pool.connection()
cp, err = pool.pool.Load().connection()
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ = pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -362,17 +508,17 @@ func TestPriority(t *testing.T) {
expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
firstNode := func() bool {
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
return st.AssertAuthKey(&expectedAuthKey1)
}
expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
secondNode := func() bool {
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
return st.AssertAuthKey(&expectedAuthKey2)
}
require.Never(t, secondNode, time.Second, 200*time.Millisecond)
@ -407,9 +553,9 @@ func TestSessionCacheWithKey(t *testing.T) {
require.NoError(t, err)
// cache must contain session token
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectDelete
@ -419,7 +565,7 @@ func TestSessionCacheWithKey(t *testing.T) {
err = pool.DeleteObject(ctx, prm)
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false))
st, _ = pool.pool.Load().cache.Get(formCacheKey(cp.address(), anonKey, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -460,10 +606,10 @@ func TestSessionTokenOwner(t *testing.T) {
cc.sessionTarget = func(tok session.Object) {
tkn = tok
}
err = p.initCallContext(&cc, prm, prmCtx)
err = p.pool.Load().initCallContext(&cc, prm, prmCtx)
require.NoError(t, err)
err = p.openDefaultSession(ctx, &cc)
err = p.pool.Load().openDefaultSession(ctx, &cc)
require.NoError(t, err)
require.True(t, tkn.VerifySignature())
require.True(t, tkn.Issuer().Equals(anonOwner))
@ -668,6 +814,25 @@ func TestHandleError(t *testing.T) {
}
}
func TestAdjustNodeParams(t *testing.T) {
nodes1 := []NodeParam{
{1, "peer0", 1},
{1, "peer1", 2},
{1, "peer2", 3},
{2, "peer21", 2},
}
nodes2 := []NodeParam{
{1, "peer0", 1},
{1, "peer2", 3},
{1, "peer1", 2},
{2, "peer21", 2},
}
nodesParam1, _ := adjustNodeParams(nodes1)
nodesParam2, _ := adjustNodeParams(nodes2)
require.True(t, reflect.DeepEqual(nodesParam1, nodesParam2))
}
func TestSwitchAfterErrorThreshold(t *testing.T) {
nodes := []NodeParam{
{1, "peer0", 1},
@ -708,14 +873,14 @@ func TestSwitchAfterErrorThreshold(t *testing.T) {
t.Cleanup(pool.Close)
for i := 0; i < errorThreshold; i++ {
conn, err := pool.connection()
conn, err := pool.pool.Load().connection()
require.NoError(t, err)
require.Equal(t, nodes[0].address, conn.address())
_, err = conn.objectGet(ctx, PrmObjectGet{})
require.Error(t, err)
}
conn, err := pool.connection()
conn, err := pool.pool.Load().connection()
require.NoError(t, err)
require.Equal(t, nodes[1].address, conn.address())
_, err = conn.objectGet(ctx, PrmObjectGet{})

View file

@ -59,7 +59,7 @@ func TestHealthyReweight(t *testing.T) {
sampler: newSampler(weights, rand.NewSource(0)),
clients: []client{client1, client2},
}
p := &Pool{
p := &pool{
innerPools: []*innerPool{inner},
cache: cache,
key: newPrivateKey(t),
@ -108,7 +108,7 @@ func TestHealthyNoReweight(t *testing.T) {
newMockClient(names[1], *newPrivateKey(t)),
},
}
p := &Pool{
p := &pool{
innerPools: []*innerPool{inner},
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
}