forked from TrueCloudLab/frostfs-sdk-go
[#30] pool: Refactor Pool
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
This commit is contained in:
parent
3790142b10
commit
6441423063
3 changed files with 318 additions and 165 deletions
427
pool/pool.go
427
pool/pool.go
|
@ -96,7 +96,7 @@ type clientStatus interface {
|
||||||
address() string
|
address() string
|
||||||
// currentErrorRate returns current errors rate.
|
// currentErrorRate returns current errors rate.
|
||||||
// After specific threshold connection is considered as unhealthy.
|
// After specific threshold connection is considered as unhealthy.
|
||||||
// Pool.startRebalance routine can make this connection healthy again.
|
// Pool.rebalance routine can make this connection healthy again.
|
||||||
currentErrorRate() uint32
|
currentErrorRate() uint32
|
||||||
// overallErrorRate returns the number of all happened errors.
|
// overallErrorRate returns the number of all happened errors.
|
||||||
overallErrorRate() uint64
|
overallErrorRate() uint64
|
||||||
|
@ -293,7 +293,7 @@ func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// setErrorThreshold sets threshold after reaching which connection is considered unhealthy
|
// setErrorThreshold sets threshold after reaching which connection is considered unhealthy
|
||||||
// until Pool.startRebalance routing updates its status.
|
// until Pool.rebalance routing updates its status.
|
||||||
func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
|
func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
|
||||||
x.errorThreshold = threshold
|
x.errorThreshold = threshold
|
||||||
}
|
}
|
||||||
|
@ -1814,9 +1814,201 @@ type resCreateSession struct {
|
||||||
//
|
//
|
||||||
// See pool package overview to get some examples.
|
// See pool package overview to get some examples.
|
||||||
type Pool struct {
|
type Pool struct {
|
||||||
|
pool atomic.Pointer[pool]
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPool creates connection pool using parameters.
|
||||||
|
func NewPool(options InitParameters) (*Pool, error) {
|
||||||
|
pool, err := newPool(options)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var p Pool
|
||||||
|
p.pool.Store(pool)
|
||||||
|
return &p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Balance requests current balance of the FrostFS account.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
|
||||||
|
return p.pool.Load().Balance(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the Pool and releases all the associated resources.
|
||||||
|
func (p *Pool) Close() {
|
||||||
|
p.pool.Load().Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
|
||||||
|
//
|
||||||
|
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
||||||
|
//
|
||||||
|
// polling interval: 5s
|
||||||
|
// waiting timeout: 120s
|
||||||
|
//
|
||||||
|
// Success can be verified by reading by identifier (see GetContainer).
|
||||||
|
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
|
||||||
|
return p.pool.Load().DeleteContainer(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
|
||||||
|
// As a marker, a special unit called a tombstone is placed in the container.
|
||||||
|
// It confirms the user's intent to delete the object, and is itself a container object.
|
||||||
|
// Explicit deletion is done asynchronously, and is generally not guaranteed.
|
||||||
|
func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
||||||
|
return p.pool.Load().DeleteObject(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dial establishes a connection to the servers from the FrostFS network.
|
||||||
|
// It also starts a routine that checks the health of the nodes and
|
||||||
|
// updates the weights of the nodes for balancing.
|
||||||
|
// Returns an error describing failure reason.
|
||||||
|
//
|
||||||
|
// If failed, the Pool SHOULD NOT be used.
|
||||||
|
//
|
||||||
|
// See also InitParameters.SetClientRebalanceInterval.
|
||||||
|
func (p *Pool) Dial(ctx context.Context) error {
|
||||||
|
pool := p.pool.Load()
|
||||||
|
err := pool.Dial(ctx)
|
||||||
|
pool.startRebalance(ctx)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindSiblingByParentID implements relations.Relations.
|
||||||
|
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||||
|
return p.pool.Load().FindSiblingByParentID(ctx, cnrID, objID, tokens)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindSiblingBySplitID implements relations.Relations.
|
||||||
|
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||||
|
return p.pool.Load().FindSiblingBySplitID(ctx, cnrID, splitID, tokens)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetContainer reads FrostFS container by ID.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
|
||||||
|
return p.pool.Load().GetContainer(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetEACL reads eACL table of the FrostFS container.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
|
||||||
|
return p.pool.Load().GetEACL(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLeftSibling implements relations.Relations.
|
||||||
|
func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
|
||||||
|
return p.pool.Load().GetLeftSibling(ctx, cnrID, objID, tokens)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
|
||||||
|
return p.pool.Load().GetObject(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSplitInfo implements relations.Relations.
|
||||||
|
func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
|
||||||
|
return p.pool.Load().GetSplitInfo(ctx, cnrID, objID, tokens)
|
||||||
|
}
|
||||||
|
|
||||||
|
// HeadObject reads object header through a remote server using FrostFS API protocol.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
|
||||||
|
return p.pool.Load().HeadObject(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListChildrenByLinker implements relations.Relations.
|
||||||
|
func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||||
|
return p.pool.Load().ListChildrenByLinker(ctx, cnrID, objID, tokens)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListContainers requests identifiers of the account-owned containers.
|
||||||
|
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
|
||||||
|
return p.pool.Load().ListContainers(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NetMapSnapshot requests information about the FrostFS network map.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
||||||
|
return p.pool.Load().NetMapSnapshot(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
||||||
|
return p.pool.Load().NetworkInfo(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObjectRange initiates reading an object's payload range through a remote
|
||||||
|
// server using FrostFS API protocol.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
|
||||||
|
return p.pool.Load().ObjectRange(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
|
||||||
|
//
|
||||||
|
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
||||||
|
//
|
||||||
|
// polling interval: 5s
|
||||||
|
// waiting timeout: 120s
|
||||||
|
//
|
||||||
|
// Success can be verified by reading by identifier (see GetContainer).
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
|
||||||
|
return p.pool.Load().PutContainer(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutObject writes an object through a remote server using FrostFS API protocol.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||||
|
return p.pool.Load().PutObject(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
|
||||||
|
//
|
||||||
|
// The call only opens the transmission channel, explicit fetching of matched objects
|
||||||
|
// is done using the ResObjectSearch. Resulting reader must be finally closed.
|
||||||
|
//
|
||||||
|
// Main return value MUST NOT be processed on an erroneous return.
|
||||||
|
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
|
||||||
|
return p.pool.Load().SearchObjects(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
|
||||||
|
//
|
||||||
|
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
||||||
|
//
|
||||||
|
// polling interval: 5s
|
||||||
|
// waiting timeout: 120s
|
||||||
|
//
|
||||||
|
// Success can be verified by reading by identifier (see GetEACL).
|
||||||
|
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
|
||||||
|
return p.pool.Load().SetEACL(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Statistic returns connection statistics.
|
||||||
|
func (p *Pool) Statistic() Statistic {
|
||||||
|
return p.pool.Load().Statistic()
|
||||||
|
}
|
||||||
|
|
||||||
|
type pool struct {
|
||||||
innerPools []*innerPool
|
innerPools []*innerPool
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
cancelLock *sync.Mutex
|
||||||
closedCh chan struct{}
|
closedCh chan struct{}
|
||||||
cache *sessionCache
|
cache *sessionCache
|
||||||
stokenDuration uint64
|
stokenDuration uint64
|
||||||
|
@ -1845,8 +2037,7 @@ const (
|
||||||
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
|
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewPool creates connection pool using parameters.
|
func newPool(options InitParameters) (*pool, error) {
|
||||||
func NewPool(options InitParameters) (*Pool, error) {
|
|
||||||
if options.key == nil {
|
if options.key == nil {
|
||||||
return nil, fmt.Errorf("missed required parameter 'Key'")
|
return nil, fmt.Errorf("missed required parameter 'Key'")
|
||||||
}
|
}
|
||||||
|
@ -1863,7 +2054,7 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
|
|
||||||
fillDefaultInitParams(&options, cache)
|
fillDefaultInitParams(&options, cache)
|
||||||
|
|
||||||
pool := &Pool{
|
pool := &pool{
|
||||||
key: options.key,
|
key: options.key,
|
||||||
cache: cache,
|
cache: cache,
|
||||||
logger: options.logger,
|
logger: options.logger,
|
||||||
|
@ -1875,26 +2066,46 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
sessionExpirationDuration: options.sessionExpirationDuration,
|
sessionExpirationDuration: options.sessionExpirationDuration,
|
||||||
},
|
},
|
||||||
clientBuilder: options.clientBuilder,
|
clientBuilder: options.clientBuilder,
|
||||||
|
cancelLock: &sync.Mutex{},
|
||||||
}
|
}
|
||||||
|
|
||||||
return pool, nil
|
return pool, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dial establishes a connection to the servers from the FrostFS network.
|
func (p *pool) Dial(ctx context.Context) error {
|
||||||
// It also starts a routine that checks the health of the nodes and
|
p.stopRebalance()
|
||||||
// updates the weights of the nodes for balancing.
|
|
||||||
// Returns an error describing failure reason.
|
err := p.dial(ctx, nil)
|
||||||
//
|
if err != nil {
|
||||||
// If failed, the Pool SHOULD NOT be used.
|
return err
|
||||||
//
|
}
|
||||||
// See also InitParameters.SetClientRebalanceInterval.
|
|
||||||
func (p *Pool) Dial(ctx context.Context) error {
|
ni, err := p.NetworkInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get network info for max object size: %w", err)
|
||||||
|
}
|
||||||
|
p.maxObjectSize = ni.MaxObjectSize()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// dial initializes clients in accordance with p.rebalanceParams.nodesParams.
|
||||||
|
// existingClients is optional. After dial is executed, existingClients will contain only unused clients.
|
||||||
|
func (p *pool) dial(ctx context.Context, existingClients map[string]client) error {
|
||||||
inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
|
inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
|
||||||
var atLeastOneHealthy bool
|
var atLeastOneHealthy bool
|
||||||
|
|
||||||
for i, params := range p.rebalanceParams.nodesParams {
|
for i, params := range p.rebalanceParams.nodesParams {
|
||||||
clients := make([]client, len(params.weights))
|
clients := make([]client, len(params.weights))
|
||||||
for j, addr := range params.addresses {
|
for j, addr := range params.addresses {
|
||||||
|
if client, ok := existingClients[addr]; ok {
|
||||||
|
clients[j] = client
|
||||||
|
atLeastOneHealthy = true
|
||||||
|
|
||||||
|
delete(existingClients, addr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
clients[j] = p.clientBuilder(addr)
|
clients[j] = p.clientBuilder(addr)
|
||||||
if err := clients[j].dial(ctx); err != nil {
|
if err := clients[j].dial(ctx); err != nil {
|
||||||
p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
|
p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
|
||||||
|
@ -1926,22 +2137,12 @@ func (p *Pool) Dial(ctx context.Context) error {
|
||||||
return fmt.Errorf("at least one node must be healthy")
|
return fmt.Errorf("at least one node must be healthy")
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
p.cancel = cancel
|
|
||||||
p.closedCh = make(chan struct{})
|
|
||||||
p.innerPools = inner
|
p.innerPools = inner
|
||||||
|
|
||||||
ni, err := p.NetworkInfo(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("get network info for max object size: %w", err)
|
|
||||||
}
|
|
||||||
p.maxObjectSize = ni.MaxObjectSize()
|
|
||||||
|
|
||||||
go p.startRebalance(ctx)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
func (p *pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||||
if p.logger == nil {
|
if p.logger == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -2023,8 +2224,26 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
|
||||||
return nodesParams, nil
|
return nodesParams, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// startRebalance runs loop to monitor connection healthy status.
|
func (p *pool) startRebalance(ctx context.Context) {
|
||||||
func (p *Pool) startRebalance(ctx context.Context) {
|
p.cancelLock.Lock()
|
||||||
|
defer p.cancelLock.Unlock()
|
||||||
|
|
||||||
|
// stop rebalance
|
||||||
|
if p.cancel != nil {
|
||||||
|
p.cancel()
|
||||||
|
<-p.closedCh
|
||||||
|
}
|
||||||
|
|
||||||
|
rebalanceCtx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
|
p.closedCh = make(chan struct{})
|
||||||
|
p.cancel = cancel
|
||||||
|
|
||||||
|
go p.rebalance(rebalanceCtx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// rebalance runs loop to monitor connection healthy status.
|
||||||
|
func (p *pool) rebalance(ctx context.Context) {
|
||||||
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
|
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
|
||||||
buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
|
buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
|
||||||
for i, params := range p.rebalanceParams.nodesParams {
|
for i, params := range p.rebalanceParams.nodesParams {
|
||||||
|
@ -2043,7 +2262,17 @@ func (p *Pool) startRebalance(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
|
func (p *pool) stopRebalance() {
|
||||||
|
p.cancelLock.Lock()
|
||||||
|
defer p.cancelLock.Unlock()
|
||||||
|
|
||||||
|
if p.cancel != nil {
|
||||||
|
p.cancel()
|
||||||
|
<-p.closedCh
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for i, inner := range p.innerPools {
|
for i, inner := range p.innerPools {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
@ -2057,12 +2286,11 @@ func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
|
func (p *pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
|
||||||
if i > len(p.innerPools)-1 {
|
if i > len(p.innerPools)-1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pool := p.innerPools[i]
|
pool := p.innerPools[i]
|
||||||
options := p.rebalanceParams
|
|
||||||
|
|
||||||
healthyChanged := new(atomic.Bool)
|
healthyChanged := new(atomic.Bool)
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
|
@ -2072,12 +2300,12 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
|
||||||
go func(j int, cli client) {
|
go func(j int, cli client) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
|
tctx, c := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
|
||||||
defer c()
|
defer c()
|
||||||
|
|
||||||
healthy, changed := cli.restartIfUnhealthy(tctx)
|
healthy, changed := cli.restartIfUnhealthy(tctx)
|
||||||
if healthy {
|
if healthy {
|
||||||
bufferWeights[j] = options.nodesParams[i].weights[j]
|
bufferWeights[j] = p.rebalanceParams.nodesParams[i].weights[j]
|
||||||
} else {
|
} else {
|
||||||
bufferWeights[j] = 0
|
bufferWeights[j] = 0
|
||||||
p.cache.DeleteByPrefix(cli.address())
|
p.cache.DeleteByPrefix(cli.address())
|
||||||
|
@ -2116,7 +2344,7 @@ func adjustWeights(weights []float64) []float64 {
|
||||||
return adjusted
|
return adjusted
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) connection() (client, error) {
|
func (p *pool) connection() (client, error) {
|
||||||
for _, inner := range p.innerPools {
|
for _, inner := range p.innerPools {
|
||||||
cp, err := inner.connection()
|
cp, err := inner.connection()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -2159,7 +2387,7 @@ func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string
|
||||||
return address + stype + k.String()
|
return address + stype + k.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) checkSessionTokenErr(err error, address string) bool {
|
func (p *pool) checkSessionTokenErr(err error, address string) bool {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -2238,7 +2466,7 @@ type callContext struct {
|
||||||
sessionClientCut bool
|
sessionClientCut bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
|
func (p *pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -2271,7 +2499,7 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
|
||||||
|
|
||||||
// opens new session or uses cached one.
|
// opens new session or uses cached one.
|
||||||
// Must be called only on initialized callContext with set sessionTarget.
|
// Must be called only on initialized callContext with set sessionTarget.
|
||||||
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
|
func (p *pool) openDefaultSession(ctx context.Context, cc *callContext) error {
|
||||||
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
|
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
|
||||||
|
|
||||||
tok, ok := p.cache.Get(cacheKey)
|
tok, ok := p.cache.Get(cacheKey)
|
||||||
|
@ -2305,7 +2533,7 @@ func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
|
||||||
|
|
||||||
// opens default session (if sessionDefault is set), and calls f. If f returns
|
// opens default session (if sessionDefault is set), and calls f. If f returns
|
||||||
// session-related error then cached token is removed.
|
// session-related error then cached token is removed.
|
||||||
func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error {
|
func (p *pool) call(ctx context.Context, cc *callContext, f func() error) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if cc.sessionDefault {
|
if cc.sessionDefault {
|
||||||
|
@ -2322,16 +2550,13 @@ func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// fillAppropriateKey use pool key if caller didn't specify its own.
|
// fillAppropriateKey use pool key if caller didn't specify its own.
|
||||||
func (p *Pool) fillAppropriateKey(prm *prmCommon) {
|
func (p *pool) fillAppropriateKey(prm *prmCommon) {
|
||||||
if prm.key == nil {
|
if prm.key == nil {
|
||||||
prm.key = p.key
|
prm.key = p.key
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutObject writes an object through a remote server using FrostFS API protocol.
|
func (p *pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
|
|
||||||
cnr, _ := prm.hdr.ContainerID()
|
cnr, _ := prm.hdr.ContainerID()
|
||||||
|
|
||||||
var prmCtx prmContext
|
var prmCtx prmContext
|
||||||
|
@ -2371,11 +2596,7 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
|
||||||
return id, nil
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
|
func (p *pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
||||||
// As a marker, a special unit called a tombstone is placed in the container.
|
|
||||||
// It confirms the user's intent to delete the object, and is itself a container object.
|
|
||||||
// Explicit deletion is done asynchronously, and is generally not guaranteed.
|
|
||||||
func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
|
||||||
var prmCtx prmContext
|
var prmCtx prmContext
|
||||||
prmCtx.useDefaultSession()
|
prmCtx.useDefaultSession()
|
||||||
prmCtx.useVerb(session.VerbObjectDelete)
|
prmCtx.useVerb(session.VerbObjectDelete)
|
||||||
|
@ -2441,10 +2662,7 @@ type ResGetObject struct {
|
||||||
Payload io.ReadCloser
|
Payload io.ReadCloser
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
|
func (p *pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
|
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
p.fillAppropriateKey(&prm.prmCommon)
|
||||||
|
|
||||||
var cc callContext
|
var cc callContext
|
||||||
|
@ -2466,10 +2684,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, e
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// HeadObject reads object header through a remote server using FrostFS API protocol.
|
func (p *pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
|
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
p.fillAppropriateKey(&prm.prmCommon)
|
||||||
|
|
||||||
var cc callContext
|
var cc callContext
|
||||||
|
@ -2516,11 +2731,7 @@ func (x *ResObjectRange) Close() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectRange initiates reading an object's payload range through a remote
|
func (p *pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
|
||||||
// server using FrostFS API protocol.
|
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
|
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
p.fillAppropriateKey(&prm.prmCommon)
|
||||||
|
|
||||||
var cc callContext
|
var cc callContext
|
||||||
|
@ -2578,13 +2789,7 @@ func (x *ResObjectSearch) Close() {
|
||||||
_, _ = x.r.Close()
|
_, _ = x.r.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
|
func (p *pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
|
||||||
//
|
|
||||||
// The call only opens the transmission channel, explicit fetching of matched objects
|
|
||||||
// is done using the ResObjectSearch. Resulting reader must be finally closed.
|
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
|
|
||||||
p.fillAppropriateKey(&prm.prmCommon)
|
p.fillAppropriateKey(&prm.prmCommon)
|
||||||
|
|
||||||
var cc callContext
|
var cc callContext
|
||||||
|
@ -2606,17 +2811,7 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjec
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
|
func (p *pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
|
||||||
//
|
|
||||||
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
|
||||||
//
|
|
||||||
// polling interval: 5s
|
|
||||||
// waiting timeout: 120s
|
|
||||||
//
|
|
||||||
// Success can be verified by reading by identifier (see GetContainer).
|
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
|
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.ID{}, err
|
return cid.ID{}, err
|
||||||
|
@ -2630,10 +2825,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, e
|
||||||
return cnrID, nil
|
return cnrID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetContainer reads FrostFS container by ID.
|
func (p *pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
|
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return container.Container{}, err
|
return container.Container{}, err
|
||||||
|
@ -2647,8 +2839,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container
|
||||||
return cnrs, nil
|
return cnrs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListContainers requests identifiers of the account-owned containers.
|
func (p *pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
|
||||||
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
|
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -2662,15 +2853,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
|
||||||
return cnrIDs, nil
|
return cnrIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
|
func (p *pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
|
||||||
//
|
|
||||||
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
|
||||||
//
|
|
||||||
// polling interval: 5s
|
|
||||||
// waiting timeout: 120s
|
|
||||||
//
|
|
||||||
// Success can be verified by reading by identifier (see GetContainer).
|
|
||||||
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
|
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -2684,10 +2867,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEACL reads eACL table of the FrostFS container.
|
func (p *pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
|
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return eacl.Table{}, err
|
return eacl.Table{}, err
|
||||||
|
@ -2701,15 +2881,7 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, e
|
||||||
return eaclResult, nil
|
return eaclResult, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
|
func (p *pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
|
||||||
//
|
|
||||||
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
|
||||||
//
|
|
||||||
// polling interval: 5s
|
|
||||||
// waiting timeout: 120s
|
|
||||||
//
|
|
||||||
// Success can be verified by reading by identifier (see GetEACL).
|
|
||||||
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
|
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -2723,10 +2895,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Balance requests current balance of the FrostFS account.
|
func (p *pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
|
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return accounting.Decimal{}, err
|
return accounting.Decimal{}, err
|
||||||
|
@ -2740,8 +2909,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decim
|
||||||
return balance, nil
|
return balance, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Statistic returns connection statistics.
|
func (p *pool) Statistic() Statistic {
|
||||||
func (p Pool) Statistic() Statistic {
|
|
||||||
stat := Statistic{}
|
stat := Statistic{}
|
||||||
for _, inner := range p.innerPools {
|
for _, inner := range p.innerPools {
|
||||||
nodes := make([]string, 0, len(inner.clients))
|
nodes := make([]string, 0, len(inner.clients))
|
||||||
|
@ -2818,10 +2986,7 @@ func waitFor(ctx context.Context, params *WaitParams, condition func(context.Con
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
|
func (p *pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return netmap.NetworkInfo{}, err
|
return netmap.NetworkInfo{}, err
|
||||||
|
@ -2835,10 +3000,7 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
||||||
return netInfo, nil
|
return netInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NetMapSnapshot requests information about the FrostFS network map.
|
func (p *pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
||||||
//
|
|
||||||
// Main return value MUST NOT be processed on an erroneous return.
|
|
||||||
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return netmap.NetMap{}, err
|
return netmap.NetMap{}, err
|
||||||
|
@ -2852,12 +3014,8 @@ func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
||||||
return netMap, nil
|
return netMap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the Pool and releases all the associated resources.
|
func (p *pool) Close() {
|
||||||
func (p *Pool) Close() {
|
p.stopRebalance()
|
||||||
p.cancel()
|
|
||||||
<-p.closedCh
|
|
||||||
|
|
||||||
// close all clients
|
|
||||||
for _, pools := range p.innerPools {
|
for _, pools := range p.innerPools {
|
||||||
for _, cli := range pools.clients {
|
for _, cli := range pools.clients {
|
||||||
if cli.isDialed() {
|
if cli.isDialed() {
|
||||||
|
@ -2875,7 +3033,7 @@ func (p *Pool) Close() {
|
||||||
//
|
//
|
||||||
// Returns any error that does not allow reading configuration
|
// Returns any error that does not allow reading configuration
|
||||||
// from the network.
|
// from the network.
|
||||||
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *Pool) error {
|
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *pool) error {
|
||||||
ni, err := p.NetworkInfo(ctx)
|
ni, err := p.NetworkInfo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("network info: %w", err)
|
return fmt.Errorf("network info: %w", err)
|
||||||
|
@ -2886,8 +3044,7 @@ func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetSplitInfo implements relations.Relations.
|
func (p *pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
|
||||||
func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(cnrID)
|
addr.SetContainer(cnrID)
|
||||||
addr.SetObject(objID)
|
addr.SetObject(objID)
|
||||||
|
@ -2916,8 +3073,7 @@ func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tok
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListChildrenByLinker implements relations.Relations.
|
func (p *pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||||
func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(cnrID)
|
addr.SetContainer(cnrID)
|
||||||
addr.SetObject(objID)
|
addr.SetObject(objID)
|
||||||
|
@ -2939,8 +3095,7 @@ func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid
|
||||||
return res.Children(), nil
|
return res.Children(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLeftSibling implements relations.Relations.
|
func (p *pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
|
||||||
func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(cnrID)
|
addr.SetContainer(cnrID)
|
||||||
addr.SetObject(objID)
|
addr.SetObject(objID)
|
||||||
|
@ -2966,8 +3121,7 @@ func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, t
|
||||||
return idMember, nil
|
return idMember, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindSiblingBySplitID implements relations.Relations.
|
func (p *pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||||
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
|
|
||||||
var query object.SearchFilters
|
var query object.SearchFilters
|
||||||
query.AddSplitIDFilter(object.MatchStringEqual, splitID)
|
query.AddSplitIDFilter(object.MatchStringEqual, splitID)
|
||||||
|
|
||||||
|
@ -2998,8 +3152,7 @@ func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *
|
||||||
return members, nil
|
return members, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindSiblingByParentID implements relations.Relations.
|
func (p *pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||||
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
|
||||||
var query object.SearchFilters
|
var query object.SearchFilters
|
||||||
query.AddParentIDFilter(object.MatchStringEqual, objID)
|
query.AddParentIDFilter(object.MatchStringEqual, objID)
|
||||||
|
|
||||||
|
|
|
@ -101,11 +101,11 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
|
||||||
|
|
||||||
expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
|
expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
|
||||||
condition := func() bool {
|
condition := func() bool {
|
||||||
cp, err := clientPool.connection()
|
cp, err := clientPool.pool.Load().connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false))
|
st, _ := clientPool.pool.Load().cache.Get(formCacheKey(cp.address(), clientPool.pool.Load().key, false))
|
||||||
return st.AssertAuthKey(&expectedAuthKey)
|
return st.AssertAuthKey(&expectedAuthKey)
|
||||||
}
|
}
|
||||||
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
|
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
|
||||||
|
@ -138,9 +138,9 @@ func TestOneNode(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
t.Cleanup(pool.Close)
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
cp, err := pool.connection()
|
cp, err := pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
|
||||||
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
|
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
|
||||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||||
}
|
}
|
||||||
|
@ -168,9 +168,9 @@ func TestTwoNodes(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
t.Cleanup(pool.Close)
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
cp, err := pool.connection()
|
cp, err := pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
|
||||||
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,9 +223,9 @@ func TestOneOfTwoFailed(t *testing.T) {
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
cp, err := pool.connection()
|
cp, err := pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
|
||||||
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
require.True(t, assertAuthKeyForAny(st, clientKeys))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -259,7 +259,7 @@ func TestTwoFailed(t *testing.T) {
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
_, err = pool.connection()
|
_, err = pool.pool.Load().connection()
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Contains(t, err.Error(), "no healthy")
|
require.Contains(t, err.Error(), "no healthy")
|
||||||
}
|
}
|
||||||
|
@ -293,9 +293,9 @@ func TestSessionCache(t *testing.T) {
|
||||||
t.Cleanup(pool.Close)
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
// cache must contain session token
|
// cache must contain session token
|
||||||
cp, err := pool.connection()
|
cp, err := pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
|
||||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||||
|
|
||||||
var prm PrmObjectGet
|
var prm PrmObjectGet
|
||||||
|
@ -306,9 +306,9 @@ func TestSessionCache(t *testing.T) {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|
||||||
// cache must not contain session token
|
// cache must not contain session token
|
||||||
cp, err = pool.connection()
|
cp, err = pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
_, ok := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
|
||||||
require.False(t, ok)
|
require.False(t, ok)
|
||||||
|
|
||||||
var prm2 PrmObjectPut
|
var prm2 PrmObjectPut
|
||||||
|
@ -318,9 +318,9 @@ func TestSessionCache(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// cache must contain session token
|
// cache must contain session token
|
||||||
cp, err = pool.connection()
|
cp, err = pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ = pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
|
||||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -362,17 +362,17 @@ func TestPriority(t *testing.T) {
|
||||||
|
|
||||||
expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
|
expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
|
||||||
firstNode := func() bool {
|
firstNode := func() bool {
|
||||||
cp, err := pool.connection()
|
cp, err := pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
|
||||||
return st.AssertAuthKey(&expectedAuthKey1)
|
return st.AssertAuthKey(&expectedAuthKey1)
|
||||||
}
|
}
|
||||||
|
|
||||||
expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
|
expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
|
||||||
secondNode := func() bool {
|
secondNode := func() bool {
|
||||||
cp, err := pool.connection()
|
cp, err := pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
|
||||||
return st.AssertAuthKey(&expectedAuthKey2)
|
return st.AssertAuthKey(&expectedAuthKey2)
|
||||||
}
|
}
|
||||||
require.Never(t, secondNode, time.Second, 200*time.Millisecond)
|
require.Never(t, secondNode, time.Second, 200*time.Millisecond)
|
||||||
|
@ -407,9 +407,9 @@ func TestSessionCacheWithKey(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// cache must contain session token
|
// cache must contain session token
|
||||||
cp, err := pool.connection()
|
cp, err := pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
|
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
|
||||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||||
|
|
||||||
var prm PrmObjectDelete
|
var prm PrmObjectDelete
|
||||||
|
@ -419,7 +419,7 @@ func TestSessionCacheWithKey(t *testing.T) {
|
||||||
|
|
||||||
err = pool.DeleteObject(ctx, prm)
|
err = pool.DeleteObject(ctx, prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false))
|
st, _ = pool.pool.Load().cache.Get(formCacheKey(cp.address(), anonKey, false))
|
||||||
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
require.True(t, st.AssertAuthKey(&expectedAuthKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -460,10 +460,10 @@ func TestSessionTokenOwner(t *testing.T) {
|
||||||
cc.sessionTarget = func(tok session.Object) {
|
cc.sessionTarget = func(tok session.Object) {
|
||||||
tkn = tok
|
tkn = tok
|
||||||
}
|
}
|
||||||
err = p.initCallContext(&cc, prm, prmCtx)
|
err = p.pool.Load().initCallContext(&cc, prm, prmCtx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = p.openDefaultSession(ctx, &cc)
|
err = p.pool.Load().openDefaultSession(ctx, &cc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, tkn.VerifySignature())
|
require.True(t, tkn.VerifySignature())
|
||||||
require.True(t, tkn.Issuer().Equals(anonOwner))
|
require.True(t, tkn.Issuer().Equals(anonOwner))
|
||||||
|
@ -708,14 +708,14 @@ func TestSwitchAfterErrorThreshold(t *testing.T) {
|
||||||
t.Cleanup(pool.Close)
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
for i := 0; i < errorThreshold; i++ {
|
for i := 0; i < errorThreshold; i++ {
|
||||||
conn, err := pool.connection()
|
conn, err := pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, nodes[0].address, conn.address())
|
require.Equal(t, nodes[0].address, conn.address())
|
||||||
_, err = conn.objectGet(ctx, PrmObjectGet{})
|
_, err = conn.objectGet(ctx, PrmObjectGet{})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := pool.connection()
|
conn, err := pool.pool.Load().connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, nodes[1].address, conn.address())
|
require.Equal(t, nodes[1].address, conn.address())
|
||||||
_, err = conn.objectGet(ctx, PrmObjectGet{})
|
_, err = conn.objectGet(ctx, PrmObjectGet{})
|
||||||
|
|
|
@ -59,7 +59,7 @@ func TestHealthyReweight(t *testing.T) {
|
||||||
sampler: newSampler(weights, rand.NewSource(0)),
|
sampler: newSampler(weights, rand.NewSource(0)),
|
||||||
clients: []client{client1, client2},
|
clients: []client{client1, client2},
|
||||||
}
|
}
|
||||||
p := &Pool{
|
p := &pool{
|
||||||
innerPools: []*innerPool{inner},
|
innerPools: []*innerPool{inner},
|
||||||
cache: cache,
|
cache: cache,
|
||||||
key: newPrivateKey(t),
|
key: newPrivateKey(t),
|
||||||
|
@ -108,7 +108,7 @@ func TestHealthyNoReweight(t *testing.T) {
|
||||||
newMockClient(names[1], *newPrivateKey(t)),
|
newMockClient(names[1], *newPrivateKey(t)),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
p := &Pool{
|
p := &pool{
|
||||||
innerPools: []*innerPool{inner},
|
innerPools: []*innerPool{inner},
|
||||||
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
|
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue