Add pool Update #204

Open
achuprov wants to merge 2 commits from achuprov/frostfs-sdk-go:feat/pool_update into master
3 changed files with 583 additions and 165 deletions

View file

@ -96,7 +96,7 @@ type clientStatus interface {
address() string
// currentErrorRate returns current errors rate.
// After specific threshold connection is considered as unhealthy.
// Pool.startRebalance routine can make this connection healthy again.
// Pool.rebalance routine can make this connection healthy again.
currentErrorRate() uint32
// overallErrorRate returns the number of all happened errors.
overallErrorRate() uint64
@ -293,7 +293,7 @@ func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) {
}
// setErrorThreshold sets threshold after reaching which connection is considered unhealthy
// until Pool.startRebalance routing updates its status.
// until Pool.rebalance routing updates its status.
func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
x.errorThreshold = threshold
}
@ -1814,9 +1814,232 @@ type resCreateSession struct {
//
// See pool package overview to get some examples.
type Pool struct {
pool atomic.Pointer[pool]
}
// NewPool creates connection pool using parameters.

If we have this mutex that protect `Pool` from concurrent `Update` invocation do we really need the additional atomic that we use for the same purpose? I understand that this allows not to block pool operations during update procedure but it looks like too complicated having two different sync primitives that are intersected in pool protection

Fixed. Used `atomic.Swap` instead of `atomic.Store`.
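
For context, a minimal sketch (not code from this PR) of why `Swap` matters here: unlike `Store`, it hands back the previous pointer, so the caller can still stop the old instance's rebalance instead of silently dropping the reference. The `pool` type and method below are stand-ins for illustration only.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pool is a stand-in for the SDK's internal pool type (illustration only).
type pool struct{ name string }

func (p *pool) stopRebalance() { fmt.Println("stopped rebalance of", p.name) }

var current atomic.Pointer[pool]

// replace publishes newP and shuts down the previous instance.
// Swap (unlike Store) returns the old pointer, so nothing is leaked.
func replace(newP *pool) {
	if old := current.Swap(newP); old != nil {
		old.stopRebalance()
	}
}

func main() {
	current.Store(&pool{name: "v1"})
	replace(&pool{name: "v2"}) // prints: stopped rebalance of v1
}
```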
func NewPool(options InitParameters) (*Pool, error) {
pool, err := newPool(options)
if err != nil {
return nil, err
}
var p Pool
p.pool.Store(pool)
return &p, nil
}
// Balance requests current balance of the FrostFS account.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
return p.pool.Load().Balance(ctx, prm)
}
// Close closes the Pool and releases all the associated resources.
func (p *Pool) Close() {
p.pool.Load().Close()
}
// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
return p.pool.Load().DeleteContainer(ctx, prm)
}
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
// As a marker, a special unit called a tombstone is placed in the container.
// It confirms the user's intent to delete the object, and is itself a container object.
// Explicit deletion is done asynchronously, and is generally not guaranteed.
func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
return p.pool.Load().DeleteObject(ctx, prm)
}
// Dial establishes a connection to the servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
dstepanov-yadro marked this conversation as resolved Outdated

Hm, does it work? You pass `nil` as `existClients`, but in `p.dial` there is no nil validation

If `existClients` is `nil`, then `ok` will be `false`. https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/d37d31346095c2a9683cc0161546404e1bcb8a99/pool/pool.go#L1893
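
For reference, the Go behavior being relied on here (not PR code): a lookup on a `nil` map never panics, it simply returns the zero value with `ok == false`.

```go
package main

import "fmt"

func main() {
	var existingClients map[string]string // nil map, like the one passed from Dial

	v, ok := existingClients["peer0"] // safe: reading from a nil map does not panic
	fmt.Printf("%q %v\n", v, ok)      // prints: "" false
}
```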
aarifullin marked this conversation as resolved Outdated

Hm, I don't see any reason to create the context with cancellation and pass it out via `p.cancel = cancel`: there are no functions that are started within a goroutine here

You're right. Fixed. Now, `p.cancel` is used to stop the `rebalance`.
pool := p.pool.Load()
if err := pool.Dial(ctx); err != nil {
return err
}
pool.cancelLock.Lock()
fyrchik marked this conversation as resolved Outdated

If `err != nil`, we are leaking a goroutine here (an error on dial should not require any explicit `Close`)

Already fixed
if pool.cancel != nil {
dkirillov marked this conversation as resolved Outdated

Do we really need to check this (especially without thread-safe acquiring)? We could add a comment that calling this method multiple times leads to undefined behavior and that client code must not operate the pool this way.

fixed
fyrchik marked this conversation as resolved Outdated

Why not just check `if err != nil`? Then `Dial` can ensure that it doesn't start any routines unless nil is returned (this was the previous behaviour I believe).

It seems this comment is also not relevant anymore. We initiate `rebalance` if `Dial` does not return an error. https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/9b05e56e52623f8a892e73f804b2cba157d43e7b/pool/pool.go#L1877
dkirillov marked this conversation as resolved Outdated

Can we not put an empty line above? Probably we can write `if err := pool.Dial(ctx); err != nil {`

fixed
pool.cancel()
}
pool.cancelLock.Unlock()
dkirillov marked this conversation as resolved Outdated

Why do we always start rebalance? It seems we should return the error right after `pool.Dial` if an error occurred

fixed
pool.startRebalance(ctx)
dkirillov marked this conversation as resolved Outdated

Actually we should guard `if pool.cancel != nil {` too

Yes, this check is necessary. On the first invocation of `Dial`, `pool.cancel` will be `nil`. If `Dial` is called again, we can properly stop the rebalance.

I meant that if we guard `pool.cancel()` by mutex, we also must guard `pool.cancel != nil` by mutex

Oh, you're correct. I've corrected it.
return nil
}
// FindSiblingByParentID implements relations.Relations.
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
return p.pool.Load().FindSiblingByParentID(ctx, cnrID, objID, tokens)
}
// FindSiblingBySplitID implements relations.Relations.
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
dkirillov marked this conversation as resolved Outdated

Why do you put the existing client into the cache again?

Fixed. Added `cache.Purge()`

And why did you purge the existing client?

The `update` method might bring in a completely new list of nodes, causing the cache to contain some garbage for a while.

Do we really need this? It seems this brings unnecessary complexity. I suppose we are OK with having some old invalid cache values for a while (it seems there is no code that can use them and fail because of them)

fixed
return p.pool.Load().FindSiblingBySplitID(ctx, cnrID, splitID, tokens)
}
// GetContainer reads FrostFS container by ID.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
return p.pool.Load().GetContainer(ctx, prm)
}
// GetEACL reads eACL table of the FrostFS container.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
return p.pool.Load().GetEACL(ctx, prm)
}
// GetLeftSibling implements relations.Relations.
func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
return p.pool.Load().GetLeftSibling(ctx, cnrID, objID, tokens)
}
// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
return p.pool.Load().GetObject(ctx, prm)
}
// GetSplitInfo implements relations.Relations.
func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
return p.pool.Load().GetSplitInfo(ctx, cnrID, objID, tokens)
}
// HeadObject reads object header through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
dkirillov marked this conversation as resolved Outdated

We have to stop the previous rebalance routine before running a new one. Otherwise, goroutines will start to leak

Added `cancel` function.
return p.pool.Load().HeadObject(ctx, prm)
}
// ListChildrenByLinker implements relations.Relations.
dkirillov marked this conversation as resolved Outdated

We have to protect this `write` operation because in `p.Update` we read this field

Actually we don't protect `p.innerPools` across the code at all currently

The same for `p.rebalanceParams.nodesParams` in `updateInnerNodesHealth`

Fixed. I've added `atomic.Pointer` to `Pool`, so we no longer need a mutex.
func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
return p.pool.Load().ListChildrenByLinker(ctx, cnrID, objID, tokens)
}
// ListContainers requests identifiers of the account-owned containers.
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
dkirillov marked this conversation as resolved Outdated

Probably we have to mention that the context must be long-lived, otherwise rebalance will stop

fixed
return p.pool.Load().ListContainers(ctx, prm)
dkirillov marked this conversation as resolved Outdated

We have to protect `p.innerPools` field, because in `p.dial` we update it
}
fyrchik marked this conversation as resolved Outdated

Hm, I thought some linter would complain about `defer` in a loop. Not that it is necessarily bad, but what prevents us from a more fine-grained locking scheme?

Fixed
// NetMapSnapshot requests information about the FrostFS network map.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
return p.pool.Load().NetMapSnapshot(ctx)
}
// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
return p.pool.Load().NetworkInfo(ctx)
}
// ObjectRange initiates reading an object's payload range through a remote
// server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
dkirillov marked this conversation as resolved Outdated

Should we protect this `write` by mutex (also the `read` on line 1951)?

You're right. Added a mutex.
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
return p.pool.Load().ObjectRange(ctx, prm)
}
// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
fyrchik marked this conversation as resolved Outdated

What about error handling?

fixed

Now the first error is returned. I suggest trying to close all connections that are no longer needed and returning a joined error.

fixed
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
return p.pool.Load().PutContainer(ctx, prm)
}
// PutObject writes an object through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
return p.pool.Load().PutObject(ctx, prm)
dkirillov marked this conversation as resolved Outdated

Here we make a copy of `pool`, so `pool.cancelLock` is also copied by value, which can lead to a deadlock (sometimes we copy a pool that holds a locked mutex, locked in `stopRebalance` when `Update` is called from multiple threads, and after copying by value, unlocking the original mutex doesn't affect the copy).

Run this test multiple times to get a deadlock.

```golang
func TestUpdateNodeDeadlock(t *testing.T) {
	key1 := newPrivateKey(t)
	mockClientBuilder := func(addr string) client {
		return newMockClient(addr, *key1)
	}

	opts := InitParameters{
		key:        newPrivateKey(t),
		nodeParams: []NodeParam{{1, "peer0", 1}},
	}
	opts.setClientBuilder(mockClientBuilder)

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(context.Background())
	require.NoError(t, err)
	t.Cleanup(pool.Close)

	wg := sync.WaitGroup{}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			err := pool.Update(context.Background(), []NodeParam{{1, "peer" + strconv.Itoa(i+1), 1}})
			require.NoError(t, err)
		}(i)
	}

	wg.Wait()
}
```

Fixed. `p.cancel` is now a pointer. This solves the issue:

```
Update_1 starts
Update_1 stops rebalance on pool
Update_1 cp pool pool_1
Update_2 starts
Update_2 stops rebalance on pool
Update_2 cp pool pool_2
Update_1 mv pool_1 pool
Update_2 mv pool_2 pool // p.cancel_1 leaks
```
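
As a side note, a small self-contained sketch (not PR code) of the copy-by-value problem described above: a `sync.Mutex` stored by value travels with every copy of the struct, while a `*sync.Mutex` keeps all copies synchronized on the same lock. The type names here are made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

type withValueMutex struct {
	mu sync.Mutex // copied together with the struct; each copy gets its own lock state
}

type withPointerMutex struct {
	mu *sync.Mutex // all copies share the same mutex
}

func main() {
	v := withValueMutex{}
	v.mu.Lock()
	cp := v // go vet (copylocks) warns here: the copy carries a locked mutex
	v.mu.Unlock()
	_ = cp // cp.mu is still "locked"; cp.mu.Lock() would block forever

	p := withPointerMutex{mu: &sync.Mutex{}}
	p.mu.Lock()
	cp2 := p // the copy points at the same mutex
	p.mu.Unlock()
	cp2.mu.Lock() // fine: the shared mutex was unlocked above
	cp2.mu.Unlock()
	fmt.Println("pointer mutex stays consistent across copies")
}
```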
}
dkirillov marked this conversation as resolved Outdated

In case of an error the old rebalance routine will be stopped, but a new one will not be started. It seems this is not what we want.

Fixed
// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
fyrchik marked this conversation as resolved Outdated

This doesn't prevent multiple concurrent `Update` executions, and what you `Load()` can change between the lines. What about saving the `Load()` result to a local variable?

fixed
//
fyrchik marked this conversation as resolved Outdated

Implicit logic tied to dereferencing can be a source of bugs.

I suggest the following scheme:

  1. Check whether the list of nodes is different; if not, do nothing.
  2. If there are changes, we explicitly create and copy it to a new structure (also create new clients and remove the removed ones).

Basically, `newPool.Update` on the line below can have the following signature (`func (*pool).Update(...) (*pool, bool, error)`). If it returns true, we replace the pointer.

This allows us to remove those `foreach` mutexes, because the old instance is valid.
This allows us to have zero overhead for non-update threads when nothing has changed.

After this it will become more obvious whether we need atomics here (it is beautifully written, but yet another set of wrappers rubs me the wrong way).
> 2. If there are changes, we explicitly create and copy it to a new structure

Do you mean copy the `pool` structure or the `innerPools` field?

Actually, I don't like changing the pool itself. Probably we can make `innerPool` a more complex entity (a struct) that encapsulates rebalance and other things. So we can then create a new struct (as you suggested) and replace it, or somehow change it inside.

I mean if there are no changes, there is no need to do anything, right? Otherwise as you described, create new struct and close removed clients.

I made the logic involving dereferencing more explicit and added a check for the absence of changes.

By the way, can we avoid using an atomic for the `pool` itself? We won't invoke `Pool.Update` often; more often we will just invoke other methods on the pool (which use `p.pool.Load()`). So we have read-heavy operations, and it would be nice to read the `Pool` pointer from multiple threads without any synchronization.

Removing atomic for the pool itself won't greatly improve performance (4 picoseconds), since our operations are network-related. However, it increases the risk of errors.

<details><summary>benchmark results:</summary>

```
goos: linux
goarch: amd64
pkg: benchPtr
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
=== RUN   BenchmarkAtomicPointerReadConcurrent
BenchmarkAtomicPointerReadConcurrent-8          1000000000               0.3064 ns/op          0 B/op          0 allocs/op
=== RUN   BenchmarkAtomicPointerRWRarely
BenchmarkAtomicPointerRWRarely-8                1000000000               0.3414 ns/op          0 B/op          0 allocs/op
=== RUN   BenchmarkNormalPointerReadConcurrent
BenchmarkNormalPointerReadConcurrent-8          1000000000               0.2827 ns/op          0 B/op          0 allocs/op
=== RUN   BenchmarkNormalPointerRWRarely
BenchmarkNormalPointerRWRarely-8                1000000000               0.3319 ns/op          0 B/op          0 allocs/op
=== RUN   BenchmarkRWMutexPointerRConcurrent
BenchmarkRWMutexPointerRConcurrent-8            34726486                34.34 ns/op            0 B/op          0 allocs/op
=== RUN   BenchmarkRWMutexPointerRWRarely
BenchmarkRWMutexPointerRWRarely-8               35075522                33.48 ns/op            0 B/op          0 allocs/op
PASS
ok      benchPtr        3.846s
```

</details>

I fail to see how it increases the risk of errors; atomics are not a silver bullet, they just have different tradeoffs. However, it seems even with mutexes we need to take some `RLock` in all methods to protect access to the current pool instance; in terms of LOC written it is similar to a wrapper (this PR), so I do not mind having atomics here.
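
For reproducibility, a minimal sketch of the kind of benchmark quoted above; the original benchmark source was not shared in the PR, so the package name, stand-in struct, and the subset of cases below are assumptions rather than the author's exact code.

```go
package benchptr

import (
	"sync"
	"sync/atomic"
	"testing"
)

type dummy struct{ v int }

// Concurrent reads through atomic.Pointer (roughly BenchmarkAtomicPointerReadConcurrent).
func BenchmarkAtomicPointerReadConcurrent(b *testing.B) {
	var p atomic.Pointer[dummy]
	p.Store(&dummy{v: 1})
	b.RunParallel(func(pb *testing.PB) {
		sum := 0
		for pb.Next() {
			sum += p.Load().v
		}
		_ = sum
	})
}

// Concurrent reads guarded by sync.RWMutex (roughly BenchmarkRWMutexPointerRConcurrent).
func BenchmarkRWMutexPointerRConcurrent(b *testing.B) {
	var mu sync.RWMutex
	ptr := &dummy{v: 1}
	b.RunParallel(func(pb *testing.PB) {
		sum := 0
		for pb.Next() {
			mu.RLock()
			sum += ptr.v
			mu.RUnlock()
		}
		_ = sum
	})
}
```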
// The call only opens the transmission channel, explicit fetching of matched objects
// is done using the ResObjectSearch. Resulting reader must be finally closed.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
return p.pool.Load().SearchObjects(ctx, prm)
}
// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
fyrchik marked this conversation as resolved Outdated

We store `newPool`, but we call `startRebalance` on the old instance, is it expected?

Yes, we need to stop Rebalance in the old version of the pool.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetEACL).
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
return p.pool.Load().SetEACL(ctx, prm)
}
// Statistic returns connection statistics.
func (p *Pool) Statistic() Statistic {
return p.pool.Load().Statistic()
}
// Update is a method that lets you refresh the list of nodes without recreating the pool.
// Use a long-lived context to avoid early rebalance stop.
// Can interrupt an operation being performed on a node that was removed.
// Ensures that:
// 1) Preserved connections would not be closed.
// 2) In the event of an error, the pool remains operational.
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
pool := p.pool.Load()
newPool, equal, err := pool.update(ctx, prm)
dkirillov marked this conversation as resolved Outdated

Do we really need to stop rebalancing even if the new pool equals the old one?

fixed
if equal || err != nil {
return err
}
newPool.startRebalance(ctx)
oldPool := p.pool.Swap(newPool)
oldPool.stopRebalance()
return nil
}
type pool struct {
innerPools []*innerPool
key *ecdsa.PrivateKey
cancel context.CancelFunc
cancelLock *sync.Mutex
closedCh chan struct{}
cache *sessionCache
stokenDuration uint64
@ -1845,8 +2068,7 @@ const (
defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
)
// NewPool creates connection pool using parameters.
func NewPool(options InitParameters) (*Pool, error) {
func newPool(options InitParameters) (*pool, error) {
if options.key == nil {
return nil, fmt.Errorf("missed required parameter 'Key'")
}
@ -1863,7 +2085,7 @@ func NewPool(options InitParameters) (*Pool, error) {
fillDefaultInitParams(&options, cache)
fyrchik marked this conversation as resolved Outdated

More like `existingClients`?

fixed
pool := &Pool{
pool := &pool{
key: options.key,
cache: cache,
dkirillov marked this conversation as resolved Outdated

In case of using only existing clients, the `atLeastOneHealthy` flag will be false and we get an error even though all clients are healthy

Fixed
logger: options.logger,
@ -1875,26 +2097,44 @@ func NewPool(options InitParameters) (*Pool, error) {
sessionExpirationDuration: options.sessionExpirationDuration,
},
clientBuilder: options.clientBuilder,
cancelLock: &sync.Mutex{},
}
return pool, nil
}
// Dial establishes a connection to the servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
func (p *pool) Dial(ctx context.Context) error {
err := p.dial(ctx, nil)
if err != nil {
return err
}
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info for max object size: %w", err)
}
p.maxObjectSize = ni.MaxObjectSize()
return nil
}
// dial initializes clients in accordance with p.rebalanceParams.nodesParams.
// existingClients is optional. After dial is executed, existingClients will contain only unused clients.
func (p *pool) dial(ctx context.Context, existingClients map[string]client) error {
inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
var atLeastOneHealthy bool
for i, params := range p.rebalanceParams.nodesParams {
clients := make([]client, len(params.weights))
for j, addr := range params.addresses {
if client, ok := existingClients[addr]; ok {
clients[j] = client
atLeastOneHealthy = true
delete(existingClients, addr)
continue
}
clients[j] = p.clientBuilder(addr)
if err := clients[j].dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
@ -1926,22 +2166,70 @@ func (p *Pool) Dial(ctx context.Context) error {
return fmt.Errorf("at least one node must be healthy")
dkirillov marked this conversation as resolved Outdated

`err` here is already `nil`

Fixed
}
dkirillov marked this conversation as resolved Outdated

Can we add a comment that `existClients` here contains only clients that are no longer used?

Fixed
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel
p.closedCh = make(chan struct{})
p.innerPools = inner
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info for max object size: %w", err)
}
p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx)
return nil
}
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
func nodesParamEqual(a, b []*nodesParam) bool {
if len(a) != len(b) {
return false
}
fyrchik marked this conversation as resolved Outdated

No need for comment here, quite obvious from the code.

I don't think this is quite obvious. We need to look into the `dial` method to find out that the `existClients` map is being changed.

The comment says remove, but the loop below just closes clients. To me the comment doesn't mention the `existClients` processing in `dial`.

fixed
for i, v := range a {
if v.priority != b[i].priority || len(v.addresses) != len(b[i].addresses) {
return false
fyrchik marked this conversation as resolved Outdated

May we use multierr for multiple errors here?
Also, it would be nice to use some dedicated error here, to avoid confusion on the caller's side, and to describe it in the function comment:

```
This function guarantees that
1. Preserved connections would not be closed.
2. On any error, old pool will remain functional, so the error may just be logged.
```

Added `errors.Join` to handle multiple errors
}
for j, address := range v.addresses {
if address != b[i].addresses[j] {
return false
}
}
}
return true
}
// Update requires that no other parallel operations are executed concurrently on the pool instance.
func (p *pool) update(ctx context.Context, prm []NodeParam) (*pool, bool, error) {
dkirillov marked this conversation as resolved Outdated

Comment should have the following format 'Update ...'

fixed
newPool := *p
fyrchik marked this conversation as resolved Outdated

Let's use a space after `//`; this is how all comments in this repo are written.

fixed
dkirillov marked this conversation as resolved Outdated

Why is this method exported?

fixed
existingClients := make(map[string]client)
for i, pool := range newPool.rebalanceParams.nodesParams {
for j := range pool.weights {
existingClients[pool.addresses[j]] = newPool.innerPools[i].clients[j]
}
}
nodesParams, err := adjustNodeParams(prm)
if err != nil {
return nil, false, err
}
if nodesParamEqual(newPool.rebalanceParams.nodesParams, nodesParams) {
return nil, true, err
}
newPool.rebalanceParams.nodesParams = nodesParams
err = newPool.dial(ctx, existingClients)
if err != nil {
return nil, false, err
}
// After newPool.dial(ctx, existingClients), existingClients will contain only outdated clients.
// Removing outdated clients
for _, client := range existingClients {
if clientErr := client.close(); clientErr != nil {
err = errors.Join(err, clientErr)
}
}
return &newPool, false, err
}
func (p *pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
if p.logger == nil {
return
}
@ -1994,6 +2282,11 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
}
}
type addressWeightPair struct {
address string
weight float64
}
func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
if len(nodeParams) == 0 {
return nil, errors.New("no FrostFS peers configured")
@ -2020,11 +2313,39 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
return nodesParams[i].priority < nodesParams[j].priority
})
for _, nodes := range nodesParams {
addressWeightPairs := make([]addressWeightPair, len(nodes.addresses))
for i := range nodes.addresses {
addressWeightPairs[i] = addressWeightPair{address: nodes.addresses[i], weight: nodes.weights[i]}
}
sort.Slice(addressWeightPairs, func(i, j int) bool {
return addressWeightPairs[i].address < addressWeightPairs[j].address
})
for i, pair := range addressWeightPairs {
nodes.addresses[i] = pair.address
nodes.weights[i] = pair.weight
}
}
fyrchik marked this conversation as resolved Outdated

Can `p.cancel` ever be nil?

fixed
return nodesParams, nil
}
// startRebalance runs loop to monitor connection healthy status.
func (p *Pool) startRebalance(ctx context.Context) {
func (p *pool) startRebalance(ctx context.Context) {
p.cancelLock.Lock()
defer p.cancelLock.Unlock()
rebalanceCtx, cancel := context.WithCancel(ctx)
p.closedCh = make(chan struct{})
p.cancel = cancel
go p.rebalance(rebalanceCtx)
}
// rebalance runs loop to monitor connection healthy status.
func (p *pool) rebalance(ctx context.Context) {
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
for i, params := range p.rebalanceParams.nodesParams {
@ -2043,7 +2364,15 @@ func (p *Pool) startRebalance(ctx context.Context) {
}
}
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
func (p *pool) stopRebalance() {
p.cancelLock.Lock()
defer p.cancelLock.Unlock()
p.cancel()
<-p.closedCh
}
func (p *pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
wg := sync.WaitGroup{}
for i, inner := range p.innerPools {
wg.Add(1)
@ -2057,12 +2386,11 @@ func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
wg.Wait()
}
func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
func (p *pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
if i > len(p.innerPools)-1 {
return
}
pool := p.innerPools[i]
options := p.rebalanceParams
healthyChanged := new(atomic.Bool)
wg := sync.WaitGroup{}
@ -2072,12 +2400,12 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
go func(j int, cli client) {
defer wg.Done()
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
tctx, c := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
defer c()
healthy, changed := cli.restartIfUnhealthy(tctx)
if healthy {
bufferWeights[j] = options.nodesParams[i].weights[j]
bufferWeights[j] = p.rebalanceParams.nodesParams[i].weights[j]
} else {
bufferWeights[j] = 0
p.cache.DeleteByPrefix(cli.address())
@ -2116,7 +2444,7 @@ func adjustWeights(weights []float64) []float64 {
return adjusted
}
func (p *Pool) connection() (client, error) {
func (p *pool) connection() (client, error) {
for _, inner := range p.innerPools {
cp, err := inner.connection()
if err == nil {
@ -2159,7 +2487,7 @@ func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string
return address + stype + k.String()
}
func (p *Pool) checkSessionTokenErr(err error, address string) bool {
func (p *pool) checkSessionTokenErr(err error, address string) bool {
if err == nil {
return false
}
@ -2238,7 +2566,7 @@ type callContext struct {
sessionClientCut bool
}
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
func (p *pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
cp, err := p.connection()
if err != nil {
return err
@ -2271,7 +2599,7 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
// opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget.
func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
func (p *pool) openDefaultSession(ctx context.Context, cc *callContext) error {
cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)
tok, ok := p.cache.Get(cacheKey)
@ -2305,7 +2633,7 @@ func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
// opens default session (if sessionDefault is set), and calls f. If f returns
// session-related error then cached token is removed.
func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error {
func (p *pool) call(ctx context.Context, cc *callContext, f func() error) error {
var err error
if cc.sessionDefault {
@ -2322,16 +2650,13 @@ func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error
}
// fillAppropriateKey use pool key if caller didn't specify its own.
func (p *Pool) fillAppropriateKey(prm *prmCommon) {
func (p *pool) fillAppropriateKey(prm *prmCommon) {
if prm.key == nil {
prm.key = p.key
}
}
// PutObject writes an object through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
func (p *pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
cnr, _ := prm.hdr.ContainerID()
var prmCtx prmContext
@ -2371,11 +2696,7 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
return id, nil
}
// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
// As a marker, a special unit called a tombstone is placed in the container.
// It confirms the user's intent to delete the object, and is itself a container object.
// Explicit deletion is done asynchronously, and is generally not guaranteed.
func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
func (p *pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
var prmCtx prmContext
prmCtx.useDefaultSession()
prmCtx.useVerb(session.VerbObjectDelete)
@ -2441,10 +2762,7 @@ type ResGetObject struct {
Payload io.ReadCloser
}
// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
func (p *pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
p.fillAppropriateKey(&prm.prmCommon)
var cc callContext
@ -2466,10 +2784,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, e
})
}
// HeadObject reads object header through a remote server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
func (p *pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
p.fillAppropriateKey(&prm.prmCommon)
var cc callContext
@ -2516,11 +2831,7 @@ func (x *ResObjectRange) Close() error {
return err
}
// ObjectRange initiates reading an object's payload range through a remote
// server using FrostFS API protocol.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
func (p *pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
p.fillAppropriateKey(&prm.prmCommon)
var cc callContext
@ -2578,13 +2889,7 @@ func (x *ResObjectSearch) Close() {
_, _ = x.r.Close()
}
// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
//
// The call only opens the transmission channel, explicit fetching of matched objects
// is done using the ResObjectSearch. Resulting reader must be finally closed.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
func (p *pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
p.fillAppropriateKey(&prm.prmCommon)
var cc callContext
@ -2606,17 +2911,7 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjec
})
}
// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
func (p *pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
cp, err := p.connection()
if err != nil {
return cid.ID{}, err
@ -2630,10 +2925,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, e
return cnrID, nil
}
// GetContainer reads FrostFS container by ID.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
func (p *pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
cp, err := p.connection()
if err != nil {
return container.Container{}, err
@ -2647,8 +2939,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container
return cnrs, nil
}
// ListContainers requests identifiers of the account-owned containers.
func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
func (p *pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
cp, err := p.connection()
if err != nil {
return nil, err
@ -2662,15 +2953,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
return cnrIDs, nil
}
// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetContainer).
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
func (p *pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
cp, err := p.connection()
if err != nil {
return err
@ -2684,10 +2967,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
return nil
}
// GetEACL reads eACL table of the FrostFS container.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
func (p *pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
cp, err := p.connection()
if err != nil {
return eacl.Table{}, err
@ -2701,15 +2981,7 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, e
return eaclResult, nil
}
// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
//
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
//
// polling interval: 5s
// waiting timeout: 120s
//
// Success can be verified by reading by identifier (see GetEACL).
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
func (p *pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
cp, err := p.connection()
if err != nil {
return err
@ -2723,10 +2995,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
return nil
}
// Balance requests current balance of the FrostFS account.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
func (p *pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
cp, err := p.connection()
if err != nil {
return accounting.Decimal{}, err
@ -2740,8 +3009,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decim
return balance, nil
}
// Statistic returns connection statistics.
func (p Pool) Statistic() Statistic {
func (p *pool) Statistic() Statistic {
stat := Statistic{}
for _, inner := range p.innerPools {
nodes := make([]string, 0, len(inner.clients))
@ -2818,10 +3086,7 @@ func waitFor(ctx context.Context, params *WaitParams, condition func(context.Con
}
}
// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
func (p *pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
cp, err := p.connection()
if err != nil {
return netmap.NetworkInfo{}, err
@ -2835,10 +3100,7 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
return netInfo, nil
}
// NetMapSnapshot requests information about the FrostFS network map.
//
// Main return value MUST NOT be processed on an erroneous return.
func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
func (p *pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
cp, err := p.connection()
if err != nil {
return netmap.NetMap{}, err
@ -2852,12 +3114,8 @@ func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
return netMap, nil
}
// Close closes the Pool and releases all the associated resources.
func (p *Pool) Close() {
p.cancel()
<-p.closedCh
// close all clients
func (p *pool) Close() {
p.stopRebalance()
for _, pools := range p.innerPools {
for _, cli := range pools.clients {
if cli.isDialed() {
@ -2875,7 +3133,7 @@ func (p *Pool) Close() {
//
// Returns any error that does not allow reading configuration
// from the network.
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *Pool) error {
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *pool) error {
ni, err := p.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("network info: %w", err)
@ -2886,8 +3144,7 @@ func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *
return nil
}
// GetSplitInfo implements relations.Relations.
func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
func (p *pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
var addr oid.Address
addr.SetContainer(cnrID)
addr.SetObject(objID)
@ -2916,8 +3173,7 @@ func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tok
}
}
// ListChildrenByLinker implements relations.Relations.
func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
func (p *pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
var addr oid.Address
addr.SetContainer(cnrID)
addr.SetObject(objID)
@ -2939,8 +3195,7 @@ func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid
return res.Children(), nil
}
// GetLeftSibling implements relations.Relations.
func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
func (p *pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
var addr oid.Address
addr.SetContainer(cnrID)
addr.SetObject(objID)
@ -2966,8 +3221,7 @@ func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, t
return idMember, nil
}
// FindSiblingBySplitID implements relations.Relations.
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
func (p *pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
var query object.SearchFilters
query.AddSplitIDFilter(object.MatchStringEqual, splitID)
@ -2998,8 +3252,7 @@ func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *
return members, nil
}
// FindSiblingByParentID implements relations.Relations.
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
func (p *pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
var query object.SearchFilters
query.AddParentIDFilter(object.MatchStringEqual, objID)

View file

@ -4,7 +4,9 @@ import (
"context"
"crypto/ecdsa"
"errors"
"reflect"
"strconv"
"sync"
"testing"
"time"
@ -101,11 +103,11 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
condition := func() bool {
cp, err := clientPool.connection()
cp, err := clientPool.pool.Load().connection()
if err != nil {
return false
}
st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false))
st, _ := clientPool.pool.Load().cache.Get(formCacheKey(cp.address(), clientPool.pool.Load().key, false))
return st.AssertAuthKey(&expectedAuthKey)
}
require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
@ -138,9 +140,9 @@ func TestOneNode(t *testing.T) {
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -168,12 +170,156 @@ func TestTwoNodes(t *testing.T) {
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
}
func TestTwoNodesUpdate(t *testing.T) {
var clientKeys []*ecdsa.PrivateKey
mockClientBuilder := func(addr string) client {
key := newPrivateKey(t)
clientKeys = append(clientKeys, key)
return newMockClient(addr, *key)
}
opts := InitParameters{
key: newPrivateKey(t),
nodeParams: []NodeParam{
{2, "peer0", 1},
{2, "peer1", 1},
},
}
opts.setClientBuilder(mockClientBuilder)
pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
pool.Update(context.Background(), []NodeParam{
{1, "peer-1", 1},
{2, "peer0", 1},
{2, "peer1", 1},
})
st1, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.Equal(t, &st1, &st)
cp2, err := pool.pool.Load().connection()
require.NoError(t, err)
require.Equal(t, cp2.address(), "peer-1")
}
func TestUpdateNodeMultithread(t *testing.T) {
key1 := newPrivateKey(t)
mockClientBuilder := func(addr string) client {
return newMockClient(addr, *key1)
}
opts := InitParameters{
key: newPrivateKey(t),
nodeParams: []NodeParam{{1, "peer0", 1}},
}
opts.setClientBuilder(mockClientBuilder)
pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err)
t.Cleanup(pool.Close)
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
err := pool.Update(context.Background(), []NodeParam{{1, "peer" + strconv.Itoa(i+1), 1}})
require.NoError(t, err)
}(i)
}
wg.Wait()
}
func TestUpdateNodeEqualConfig(t *testing.T) {
key1 := newPrivateKey(t)
mockClientBuilder := func(addr string) client {
return newMockClient(addr, *key1)
}
opts := InitParameters{
key: newPrivateKey(t),
nodeParams: []NodeParam{{1, "peer0", 1}},
}
opts.setClientBuilder(mockClientBuilder)
pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey))
_, flag, err := pool.pool.Load().update(context.Background(), []NodeParam{{1, "peer0", 1}})
require.NoError(t, err)
require.True(t, flag)
}
func TestUpdateNode(t *testing.T) {
key1 := newPrivateKey(t)
mockClientBuilder := func(addr string) client {
return newMockClient(addr, *key1)
}
opts := InitParameters{
key: newPrivateKey(t),
nodeParams: []NodeParam{{1, "peer0", 1}},
}
opts.setClientBuilder(mockClientBuilder)
pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(context.Background())
require.NoError(t, err)
t.Cleanup(pool.Close)
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
require.True(t, st.AssertAuthKey(&expectedAuthKey))
pool.Update(context.Background(), []NodeParam{{1, "peer0", 1}})
cp1, err := pool.pool.Load().connection()
st1, _ := pool.pool.Load().cache.Get(formCacheKey(cp1.address(), pool.pool.Load().key, false))
require.NoError(t, err)
require.Equal(t, &st, &st1)
require.Equal(t, &cp, &cp1)
pool.Update(context.Background(), []NodeParam{{1, "peer1", 1}})
cp2, err := pool.pool.Load().connection()
require.NoError(t, err)
st2, _ := pool.pool.Load().cache.Get(formCacheKey(cp2.address(), pool.pool.Load().key, false))
require.NotEqual(t, cp.address(), cp2.address())
require.NotEqual(t, &st, &st2)
}
func assertAuthKeyForAny(st session.Object, clientKeys []*ecdsa.PrivateKey) bool {
for _, key := range clientKeys {
expectedAuthKey := frostfsecdsa.PublicKey(key.PublicKey)
@ -223,9 +369,9 @@ func TestOneOfTwoFailed(t *testing.T) {
time.Sleep(2 * time.Second)
for i := 0; i < 5; i++ {
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, assertAuthKeyForAny(st, clientKeys))
}
}
@ -259,7 +405,7 @@ func TestTwoFailed(t *testing.T) {
time.Sleep(2 * time.Second)
_, err = pool.connection()
_, err = pool.pool.Load().connection()
require.Error(t, err)
require.Contains(t, err.Error(), "no healthy")
}
@ -293,9 +439,9 @@ func TestSessionCache(t *testing.T) {
t.Cleanup(pool.Close)
// cache must contain session token
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectGet
@ -306,9 +452,9 @@ func TestSessionCache(t *testing.T) {
require.Error(t, err)
// cache must not contain session token
cp, err = pool.connection()
cp, err = pool.pool.Load().connection()
require.NoError(t, err)
_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
_, ok := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.False(t, ok)
var prm2 PrmObjectPut
@ -318,9 +464,9 @@ func TestSessionCache(t *testing.T) {
require.NoError(t, err)
// cache must contain session token
cp, err = pool.connection()
cp, err = pool.pool.Load().connection()
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ = pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -362,17 +508,17 @@ func TestPriority(t *testing.T) {
expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
firstNode := func() bool {
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
return st.AssertAuthKey(&expectedAuthKey1)
}
expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
secondNode := func() bool {
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
return st.AssertAuthKey(&expectedAuthKey2)
}
require.Never(t, secondNode, time.Second, 200*time.Millisecond)
@ -407,9 +553,9 @@ func TestSessionCacheWithKey(t *testing.T) {
require.NoError(t, err)
// cache must contain session token
cp, err := pool.connection()
cp, err := pool.pool.Load().connection()
require.NoError(t, err)
st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
var prm PrmObjectDelete
@ -419,7 +565,7 @@ func TestSessionCacheWithKey(t *testing.T) {
err = pool.DeleteObject(ctx, prm)
require.NoError(t, err)
st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false))
st, _ = pool.pool.Load().cache.Get(formCacheKey(cp.address(), anonKey, false))
require.True(t, st.AssertAuthKey(&expectedAuthKey))
}
@ -460,10 +606,10 @@ func TestSessionTokenOwner(t *testing.T) {
cc.sessionTarget = func(tok session.Object) {
tkn = tok
}
err = p.initCallContext(&cc, prm, prmCtx)
err = p.pool.Load().initCallContext(&cc, prm, prmCtx)
require.NoError(t, err)
err = p.openDefaultSession(ctx, &cc)
err = p.pool.Load().openDefaultSession(ctx, &cc)
require.NoError(t, err)
require.True(t, tkn.VerifySignature())
require.True(t, tkn.Issuer().Equals(anonOwner))
@ -668,6 +814,25 @@ func TestHandleError(t *testing.T) {
}
}
func TestAdjustNodeParams(t *testing.T) {
nodes1 := []NodeParam{
{1, "peer0", 1},
{1, "peer1", 2},
{1, "peer2", 3},
{2, "peer21", 2},
}
nodes2 := []NodeParam{
{1, "peer0", 1},
{1, "peer2", 3},
{1, "peer1", 2},
{2, "peer21", 2},
}
nodesParam1, _ := adjustNodeParams(nodes1)
nodesParam2, _ := adjustNodeParams(nodes2)
require.True(t, reflect.DeepEqual(nodesParam1, nodesParam2))
}
func TestSwitchAfterErrorThreshold(t *testing.T) {
nodes := []NodeParam{
{1, "peer0", 1},
@ -708,14 +873,14 @@ func TestSwitchAfterErrorThreshold(t *testing.T) {
t.Cleanup(pool.Close)
for i := 0; i < errorThreshold; i++ {
conn, err := pool.connection()
conn, err := pool.pool.Load().connection()
require.NoError(t, err)
require.Equal(t, nodes[0].address, conn.address())
_, err = conn.objectGet(ctx, PrmObjectGet{})
require.Error(t, err)
}
conn, err := pool.connection()
conn, err := pool.pool.Load().connection()
require.NoError(t, err)
require.Equal(t, nodes[1].address, conn.address())
_, err = conn.objectGet(ctx, PrmObjectGet{})

View file

@ -59,7 +59,7 @@ func TestHealthyReweight(t *testing.T) {
sampler: newSampler(weights, rand.NewSource(0)),
clients: []client{client1, client2},
}
p := &Pool{
p := &pool{
innerPools: []*innerPool{inner},
cache: cache,
key: newPrivateKey(t),
@ -108,7 +108,7 @@ func TestHealthyNoReweight(t *testing.T) {
newMockClient(names[1], *newPrivateKey(t)),
},
}
p := &Pool{
p := &pool{
innerPools: []*innerPool{inner},
rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
}