Add pool Update #204

Open
achuprov wants to merge 2 commits from achuprov/frostfs-sdk-go:feat/pool_update into master
Member

Add the Update function to refresh the connection list in the pool.

Close #30

Add the `Update` function to refresh the connection list in the `pool`. Close #30
achuprov changed title from feat/pool_update to Add pool Update 2024-02-19 10:06:07 +00:00
achuprov requested review from storage-core-committers 2024-02-19 13:08:36 +00:00
achuprov requested review from storage-core-developers 2024-02-19 13:08:40 +00:00
dstepanov-yadro reviewed 2024-02-19 13:15:13 +00:00
pool/pool.go Outdated
@ -1868,0 +1869,4 @@
p.cancel = cancel
p.closedCh = make(chan struct{})
err := p.dial(ctx, nil)

Hm, does it work? You pass nil as existClients, but in p.dial there is no nil validation

Hm, does it work? You pass `nil` as `existClients`, but in `p.dial` there is no nil validation
Author
Member

If existClients is nil, then ok will be false.
d37d313460/pool/pool.go (L1893)

If `existClients` is `nil`, then `ok` will be `false`. https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/d37d31346095c2a9683cc0161546404e1bcb8a99/pool/pool.go#L1893
dstepanov-yadro marked this conversation as resolved
dstepanov-yadro requested review from storage-services-committers 2024-02-19 15:24:43 +00:00
dstepanov-yadro requested review from storage-services-developers 2024-02-19 15:24:43 +00:00
fyrchik reviewed 2024-02-19 15:30:24 +00:00
pool/pool.go Outdated
@ -1920,0 +1943,4 @@
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
for _, pool := range p.innerPools {
pool.lock.Lock()
defer pool.lock.Unlock()
Owner

Hm, I though some linter would complain about defer in loop.
Not that it is necessarily bad, but what prevents us from more fine-grained locking scheme?

Hm, I though some linter would complain about `defer` in loop. Not that it is necessarily bad, but what prevents us from more fine-grained locking scheme?
Author
Member

Fixed

Fixed
fyrchik marked this conversation as resolved
dstepanov-yadro reviewed 2024-02-19 15:33:12 +00:00
pool/pool.go Outdated
@ -1920,0 +1969,4 @@
}
for _, client := range existClients {
client.client.close()

What about error handling?

What about error handling?
Author
Member

fixed

fixed

Now first error returns. I suppose to try close all connections that are not necessary and return join error.

Now first error returns. I suppose to try close all connections that are not necessary and return join error.
Author
Member

fixed

fixed
fyrchik marked this conversation as resolved
dstepanov-yadro reviewed 2024-02-19 15:36:34 +00:00
pool/pool.go Outdated
@ -1873,1 +1892,4 @@
for j, addr := range params.addresses {
if client, ok := existClients[NodeParam{address: addr, priority: params.priority}]; ok {
clients[j] = client.client
_ = p.cache.Put(formCacheKey(addr, p.key, false), client.session)

Why do you put existed client to cache again?

Why do you put existed client to cache again?
Author
Member

Fixed. Add cache.Purge()

Fixed. Add `cache.Purge()`

And why do you purged existed client?

And why do you purged existed client?
Author
Member

The update method might bring in a completely new list of nodes, causing the cache to contain some garbage for a while.

The `update` method might bring in a completely new list of nodes, causing the cache to contain some garbage for a while.
Member

Do we really need this? It seems this brings unnecessary complexity. I suppose we are OK with having some old invalid cache values (it seems no code that can use it and fail because of them) for a while

Do we really need this? It seems this brings unnecessary complexity. I suppose we are OK with having some old invalid cache values (it seems no code that can use it and fail because of them) for a while
Author
Member

fixed

fixed
dkirillov marked this conversation as resolved
dkirillov reviewed 2024-02-20 07:53:07 +00:00
pool/pool.go Outdated
@ -1913,4 +1933,1 @@
}
p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx)
Member

We have to stop previous rebalance routine before run new one. Otherwise, goroutines will start to leak

We have to stop previous rebalance routine before run new one. Otherwise, goroutines will start to leak
Author
Member

Added cancel function.

Added `cancel` function.
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1920,0 +1940,4 @@
}
// Update is a method that lets you refresh the list of nodes without recreating the pool.
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
Member

Probably we have to mention that context must be long lived, otherwise rebalance will stop

Probably we have to mention that context must be long lived, otherwise rebalance will stop
Author
Member

fixed

fixed
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1920,0 +1941,4 @@
// Update is a method that lets you refresh the list of nodes without recreating the pool.
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
for _, pool := range p.innerPools {
Member

We have to protect p.innerPools field, because in p.dial we update it

We have to protect `p.innerPools` field, because in `p.dial` we update it
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1920,0 +1961,4 @@
return err
}
p.rebalanceParams.nodesParams = nodesParams
Member

Should we protect this write by mutex (also for read on 1951 line)?

Should we protect this `write` by mutex (also for `read` on 1951 line)?
Author
Member

You're right. Added a mutex.

You're right. Added a mutex.
dkirillov marked this conversation as resolved
achuprov force-pushed feat/pool_update from d37d313460 to 4bfb6d87a3 2024-02-21 08:54:48 +00:00 Compare
achuprov force-pushed feat/pool_update from 4bfb6d87a3 to a23a3ef51b 2024-02-21 09:49:48 +00:00 Compare
dkirillov reviewed 2024-02-21 12:06:51 +00:00
pool/pool.go Outdated
@ -1905,4 +1937,1 @@
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel
p.closedCh = make(chan struct{})
p.innerPools = inner
Member

We have to protect this write operation because in p.Update we read this field

We have to protect this `write` operation because in `p.Update` we read this field
Member

Actually we don't protect p.innerPools across the code at all currently

Actually we don't protect `p.innerPools` across the code at all currently
Member

The same for p.rebalanceParams.nodesParams in updateInnerNodesHealth

The same for `p.rebalanceParams.nodesParams` in `updateInnerNodesHealth`
Author
Member

Fixed. I've added atomic.Pointer to Pool, so we no longer need a mutex.

Fixed. I've added atomic.Pointer to Pool, so we no longer need a mutex.
dkirillov marked this conversation as resolved
aarifullin reviewed 2024-02-22 10:28:22 +00:00
pool/pool.go Outdated
@ -1866,2 +1869,4 @@
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel
Member

Hm, I don't see any reason to create the context with cancellation and pass it out by p.cancel = cancel: here are no functions that are startup-ed within goroutine

Hm, I don't see any reason to create the context with cancellation and pass it out by `p.cancel = cancel`: here are no functions that are startup-ed within goroutine
Author
Member

You're right. Fixed. Now, p.cancel is used to stop the rebalance.

You're right. Fixed. Now, `p.cancel` is used to stop the `rebalance`.
aarifullin marked this conversation as resolved
achuprov force-pushed feat/pool_update from a23a3ef51b to 858ef53b13 2024-02-26 09:17:32 +00:00 Compare
aarifullin approved these changes 2024-02-26 09:52:01 +00:00
dkirillov requested changes 2024-02-28 07:38:42 +00:00
pool/pool.go Outdated
@ -1793,0 +1984,4 @@
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
p.pool.Load().stopRebalance()
newPool := *p.pool.Load()
Member

Here we make copy of pool so pool.cancelLock is also copying by value that lead to potential deadlock (sometimes we can copy pool that contains locked mutex (that was locked in stopRebalace in case of using Update from multiple threads), but after copying by value unlock in origin mutex doesn't affect copy).

Run this test multiple times to get a deadlock.

func TestUpdateNodeDeadlock(t *testing.T) {
	key1 := newPrivateKey(t)
	mockClientBuilder := func(addr string) client {
		return newMockClient(addr, *key1)
	}

	opts := InitParameters{
		key:        newPrivateKey(t),
		nodeParams: []NodeParam{{1, "peer0", 1}},
	}
	opts.setClientBuilder(mockClientBuilder)

	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(context.Background())
	require.NoError(t, err)
	t.Cleanup(pool.Close)

	wg := sync.WaitGroup{}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			err := pool.Update(context.Background(), []NodeParam{{1, "peer" + strconv.Itoa(i+1), 1}})
			require.NoError(t, err)
		}(i)
	}

	wg.Wait()
}
Here we make copy of `pool` so `pool.cancelLock` is also copying by value that lead to potential deadlock (sometimes we can copy pool that contains locked mutex (that was locked in `stopRebalace` in case of using `Update` from multiple threads), but after copying by value unlock in origin mutex doesn't affect copy). Run this test multiple times to get a deadlock. ```golang func TestUpdateNodeDeadlock(t *testing.T) { key1 := newPrivateKey(t) mockClientBuilder := func(addr string) client { return newMockClient(addr, *key1) } opts := InitParameters{ key: newPrivateKey(t), nodeParams: []NodeParam{{1, "peer0", 1}}, } opts.setClientBuilder(mockClientBuilder) pool, err := NewPool(opts) require.NoError(t, err) err = pool.Dial(context.Background()) require.NoError(t, err) t.Cleanup(pool.Close) wg := sync.WaitGroup{} for i := 0; i < 10; i++ { wg.Add(1) go func(i int) { defer wg.Done() err := pool.Update(context.Background(), []NodeParam{{1, "peer" + strconv.Itoa(i+1), 1}}) require.NoError(t, err) }(i) } wg.Wait() } ```
Author
Member

Fixed. p.cancel is now a pointer. This solves the issue:

    Update_1 starts

    Update_1 stops rebalance on pool

    Update_1 cp pool pool_1

    Update_2 starts

    Update_2 stops rebalance on pool

    Update_2 cp pool pool_2

    Update_1 mv pool_1 pool

    Update_2 mv pool_2 pool // p.cancel_1 leaks 
Fixed. `p.cancel` is now a pointer. This solves the issue: ``` Update_1 starts Update_1 stops rebalance on pool Update_1 cp pool pool_1 Update_2 starts Update_2 stops rebalance on pool Update_2 cp pool pool_2 Update_1 mv pool_1 pool Update_2 mv pool_2 pool // p.cancel_1 leaks ```
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1793,0 +1986,4 @@
newPool := *p.pool.Load()
if err := newPool.Update(ctx, prm); err != nil {
return err
Member

In case of error old rebalance routine be stopped, but new one not started. It seems this is not what we want.

In case of error old rebalance routine be stopped, but new one not started. It seems this is not what we want.
Author
Member

Fixed

Fixed
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1874,0 +2087,4 @@
if client, ok := existClients[NodeParam{address: addr, priority: params.priority}]; ok {
clients[j] = client
delete(existClients, NodeParam{address: addr, priority: params.priority})
continue
Member

In case of using only existing clients, flag atLeastOneHealhty be false and we get error though all clients are healhy

In case of using only existing clients, flag `atLeastOneHealhty` be false and we get error though all clients are healhy
Author
Member

Fixed

Fixed
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1921,0 +2163,4 @@
return err
}
err = nil
Member

err here already nil

`err` here already `nil`
Author
Member

Fixed

Fixed
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1921,0 +2164,4 @@
}
err = nil
for _, client := range existClients {
Member

Can we add comment that existClients here contains only not used anymore clients?

Can we add comment that existClients here contains only not used anymore clients?
Author
Member

Fixed

Fixed
dkirillov marked this conversation as resolved
achuprov force-pushed feat/pool_update from 858ef53b13 to da6b36bf36 2024-03-04 07:58:09 +00:00 Compare
achuprov force-pushed feat/pool_update from da6b36bf36 to 29b133505d 2024-03-04 09:20:10 +00:00 Compare
fyrchik requested changes 2024-03-04 10:16:46 +00:00
pool/pool.go Outdated
@ -1793,0 +1988,4 @@
p.pool.Load().stopRebalance()
defer p.pool.Load().startRebalance(ctx)
newPool := *p.pool.Load()
Owner

Implicit logic tied with dereferencing can be a source of bugs.

I suggest the following scheme:

  1. Check whether the list of nodes is different, if not, do nothing.
  2. If there are changes, we explicitly create and copy it to a new structure (also create new clients and remove removed ones).

Basically, newPool.Update the line below can have the following signature (func (*pool).Update(...) (*pool, bool, error)). If it returns true, we replace the pointer.

This allows us to remove those foreach mutexes, because the old instance is valid.
This allows us to have 0 overhead for non-update threads when nothing has changed.

After this it will become more obvious whether we need atomics here (it is beautifully written, but a set of yet another wrappers rubs me the wrong way).

Implicit logic tied with dereferencing can be a source of bugs. I suggest the following scheme: 1. Check whether the list of nodes is different, if not, do nothing. 2. If there are changes, we explicitly create and copy it to a new structure (also create new clients and remove removed ones). Basically, `newPool.Update` the line below can have the following signature (`func (*pool).Update(...) (*pool, bool, error)`). If it returns true, we replace the pointer. This allows us to remove those `foreach` mutexes, because the old instance is valid. This allows us to have 0 overhead for non-update threads when nothing has changed. After this it will become more obvious whether we need atomics here (it is beautifully written, but a set of yet another wrappers rubs me the wrong way).
Member
  1. If there are changes, we explicitly create and copy it to a new structure

Do you mean copy pool structure or innerPools field?

Actually, I don't like changing the pool itself. Probably we can make innerPool more complex entity (struct), that incapsulates rebalance and other things. So we can thеn create new one struct (as you suggested) and replace it, or somehow change it inside.

> 2. If there are changes, we explicitly create and copy it to a new structure Do you mean copy `pool` structure or `innerPools` field? Actually, I don't like changing the pool itself. Probably we can make `innerPool` more complex entity (struct), that incapsulates rebalance and other things. So we can thеn create new one struct (as you suggested) and replace it, or somehow change it inside.
Owner

I mean if there are no changes, there is no need to do anything, right?
Otherwise as you described, create new struct and close removed clients.

I mean if there are no changes, there is no need to do anything, right? Otherwise as you described, create new struct and close removed clients.
Author
Member

I made the logic involving dereferencing more explicit and added a check for the absence of changes.

I made the logic involving dereferencing more explicit and added a check for the absence of changes.
Member

By the way, can we remove using atomic for pool itself? We won't invoke Pool.Update often, more often we will just invoke some other methods on pool (that use p.pool.Load()). So we have something like read-heavy operations and it would be nice to read Pool pointer by multiple thread without any synchronization.

By the way, can we remove using atomic for `pool` itself? We won't invoke `Pool.Update` often, more often we will just invoke some other methods on pool (that use `p.pool.Load()`). So we have something like read-heavy operations and it would be nice to read Pool pointer by multiple thread without any synchronization.
Author
Member

Removing atomic for the pool itself won't greatly improve performance (4 picoseconds), since our operations are network-related. However, it increases the risk of errors.

benchmark results:

goos: linux
goarch: amd64
pkg: benchPtr
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
=== RUN   BenchmarkAtomicPointerReadConcurrent
BenchmarkAtomicPointerReadConcurrent
BenchmarkAtomicPointerReadConcurrent-8          1000000000               0.3064 ns/op          0 B/op          0 allocs/op
=== RUN   BenchmarkAtomicPointerRWRarely
BenchmarkAtomicPointerRWRarely
BenchmarkAtomicPointerRWRarely-8                1000000000               0.3414 ns/op          0 B/op          0 allocs/op
=== RUN   BenchmarkNormalPointerReadConcurrent
BenchmarkNormalPointerReadConcurrent
BenchmarkNormalPointerReadConcurrent-8          1000000000               0.2827 ns/op          0 B/op          0 allocs/op
=== RUN   BenchmarkNormalPointerRWRarely
BenchmarkNormalPointerRWRarely
BenchmarkNormalPointerRWRarely-8                1000000000               0.3319 ns/op          0 B/op          0 allocs/op
=== RUN   BenchmarkRWMutexPointerRConcurrent
BenchmarkRWMutexPointerRConcurrent
BenchmarkRWMutexPointerRConcurrent-8            34726486                34.34 ns/op            0 B/op          0 allocs/op
=== RUN   BenchmarkRWMutexPointerRWRarely
BenchmarkRWMutexPointerRWRarely
BenchmarkRWMutexPointerRWRarely-8               35075522                33.48 ns/op            0 B/op          0 allocs/op
PASS
ok      benchPtr        3.846s
Removing atomic for the pool itself won't greatly improve performance (4 picoseconds), since our operations are network-related. However, it increases the risk of errors. <details><summary>benchmark results:</summary> ``` goos: linux goarch: amd64 pkg: benchPtr cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz === RUN BenchmarkAtomicPointerReadConcurrent BenchmarkAtomicPointerReadConcurrent BenchmarkAtomicPointerReadConcurrent-8 1000000000 0.3064 ns/op 0 B/op 0 allocs/op === RUN BenchmarkAtomicPointerRWRarely BenchmarkAtomicPointerRWRarely BenchmarkAtomicPointerRWRarely-8 1000000000 0.3414 ns/op 0 B/op 0 allocs/op === RUN BenchmarkNormalPointerReadConcurrent BenchmarkNormalPointerReadConcurrent BenchmarkNormalPointerReadConcurrent-8 1000000000 0.2827 ns/op 0 B/op 0 allocs/op === RUN BenchmarkNormalPointerRWRarely BenchmarkNormalPointerRWRarely BenchmarkNormalPointerRWRarely-8 1000000000 0.3319 ns/op 0 B/op 0 allocs/op === RUN BenchmarkRWMutexPointerRConcurrent BenchmarkRWMutexPointerRConcurrent BenchmarkRWMutexPointerRConcurrent-8 34726486 34.34 ns/op 0 B/op 0 allocs/op === RUN BenchmarkRWMutexPointerRWRarely BenchmarkRWMutexPointerRWRarely BenchmarkRWMutexPointerRWRarely-8 35075522 33.48 ns/op 0 B/op 0 allocs/op PASS ok benchPtr 3.846s ``` </details>
Owner

I fail to see how it increases a risk of errors, atomics are not a silver bullet they just have different tradeoffs.
However, it seems even with mutexes we need to take some RLock in all methods to protect access to a current pool instance, in terms of LOC written it is similar to a wrapper (this PR), so I do not mind having atomics here.

I fail to see how it increases a risk of errors, atomics are not a silver bullet they just have different tradeoffs. However, it seems even with mutexes we need to take some `RLock` in all methods to protect access to a current pool instance, in terms of LOC written it is similar to a wrapper (this PR), so I do not mind having atomics here.
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -1921,0 +2140,4 @@
}
func (p *pool) Update(ctx context.Context, prm []NodeParam) error {
foreach(p.innerPools, func(pool *innerPool) { pool.lock.Lock() })
Owner

It doesn't address my comment, we still take all the mutexes.

It doesn't address my comment, we still take all the mutexes.
fyrchik marked this conversation as resolved
achuprov force-pushed feat/pool_update from 29b133505d to 38102795fe 2024-03-04 14:54:37 +00:00 Compare
achuprov force-pushed feat/pool_update from 38102795fe to bf46fa7e74 2024-03-04 15:06:36 +00:00 Compare
achuprov force-pushed feat/pool_update from bf46fa7e74 to 11554102c5 2024-03-05 08:31:46 +00:00 Compare
fyrchik requested changes 2024-03-11 08:27:58 +00:00
pool/pool.go Outdated
@ -1793,0 +1987,4 @@
// Use a long-lived context to avoid early rebalance stop.
// Can interrupt an operation being performed on a node that was removed.
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
p.pool.Load().stopRebalance()
Owner

This doesn't prevent multiple concurrent Update executions. And what you Load() can change between the lines.
What about performing Load() to a local variable?

This doesn't prevent multiple concurrent `Update` executions. And what you `Load()` can change between the lines. What about performing `Load()` to a local variable?
Author
Member

fixed

fixed
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -1868,0 +2084,4 @@
return nil
}
func (p *pool) dial(ctx context.Context, existClients map[NodeParam]client) error {
Owner

More like existingClients?

More like `existingClients`?
Author
Member

fixed

fixed
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -1921,0 +2141,4 @@
}
for i, v := range a {
if !reflect.DeepEqual(v, b[i]) {
Owner

Please, let's not use reflect when it can be avoided, nodesParam as only 3 fields.
Also, inside nodesParam we should probably allow different order of addresses field if weights are the same, because they all have the same priority and connection is random anyway. Need some validation here @dkirillov

Anyway, this could be done in a separate issue it seems your code reuses connections anyway.
One way to ensure this is to perform sorting based on (weight, address)

Please, let's not use reflect when it can be avoided, `nodesParam` as only 3 fields. Also, inside `nodesParam` we should probably allow different order of `addresses` field if `weights` are the same, because they all have the same priority and connection is random anyway. Need some validation here @dkirillov Anyway, this could be done in a separate issue it seems your code reuses connections anyway. One way to ensure this is to perform sorting based on `(weight, address)`
Member

Also, inside nodesParam we should probably allow different order of addresses field if weights are the same, because they all have the same priority and connection is random anyway. Need some validation here @dkirillov

Yes. Also, it's related to priority, because we use sort.Slice (rather than sort.SliceStable ) here

> Also, inside nodesParam we should probably allow different order of addresses field if weights are the same, because they all have the same priority and connection is random anyway. Need some validation here @dkirillov Yes. Also, it's related to `priority`, because we use `sort.Slice` (rather than `sort.SliceStable` ) [here](https://git.frostfs.info/achuprov/frostfs-sdk-go/src/commit/11554102c53ae5a7f84b1bcf2e9d8484cdbb9a39/pool/pool.go#L2263)
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -1921,0 +2155,4 @@
existClients := make(map[NodeParam]client)
for i, pool := range newPool.rebalanceParams.nodesParams {
for j := range pool.weights {
existClients[NodeParam{address: pool.addresses[j], priority: pool.priority}] = newPool.innerPools[i].clients[j]
Owner

The keytype in this map should be an address: if the client has just changed weight or priority, the connection should not be closed.

The keytype in this map should be an `address`: if the client has just changed weight or priority, the connection should not be closed.
Author
Member

fixed

fixed
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -1921,0 +2175,4 @@
return nil, false, err
}
//Removing outdated clients
Owner

No need for comment here, quite obvious from the code.

No need for comment here, quite obvious from the code.
Member

I don't think this is quite obvious. We need to look in dial method to find out that existClients map is being changed.

I don't think this is quite obvious. We need to look in `dial` method to find out that `existClients` map is being changed.
Owner

The comment says remove, the cycle below just closes clients.
To me the comment doesn't mention about existClients processing in dial

The comment says remove, the cycle below just closes clients. To me the comment doesn't mention about existClients processing in `dial`
Author
Member

fixed

fixed
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -1921,0 +2178,4 @@
//Removing outdated clients
for _, client := range existClients {
if clientErr := client.close(); clientErr != nil {
err = clientErr
Owner

May we use multierr for multiple errors here?
Also, it would be nice to use some dedicated error here, to avoid confusion on caller and describe it in function comment:

This function guarantees that
1. Preserved connections would not be closed.
2. On any error, old pool will remain functional, so the error may just be logged.
May we use multierr for multiple errors here? Also, it would be nice to use some dedicated error here, to avoid confusion on caller and describe it in function comment: ``` This function guarantees that 1. Preserved connections would not be closed. 2. On any error, old pool will remain functional, so the error may just be logged. ```
Author
Member

Added errors.Join to handle multiple errors

Added `errors.Join` to handle multiple errors
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -2004,0 +2271,4 @@
p.cancelLock.Lock()
//stop rebalance
if p.cancel != nil {
Owner

Could you explain, why do we have *context.CancelFunc and not context.CancelFunc?

Could you explain, why do we have `*context.CancelFunc` and not `context.CancelFunc`?
Author
Member

Previously, *context.CancelFunc protected against losing context.CancelFunc() during concurrent execution of Update. Now, Update includes a mutex.

Previously, `*context.CancelFunc` protected against losing `context.CancelFunc()` during concurrent execution of `Update`. Now, `Update` includes a mutex.
fyrchik marked this conversation as resolved
achuprov force-pushed feat/pool_update from 11554102c5 to 55e4f7505d 2024-03-11 16:20:31 +00:00 Compare
fyrchik requested changes 2024-03-12 18:28:56 +00:00
pool/pool.go Outdated
@ -1789,10 +1789,230 @@ type resCreateSession struct {
// (e.g. sdkClient.IsErrContainerNotFound, sdkClient.IsErrObjectNotFound).
//
// See pool package overview to get some examples.
Owner

empty line

empty line
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -1793,0 +1997,4 @@
pool := p.pool.Load()
pool.stopRebalance()
defer pool.startRebalance(ctx)
Owner

We store newPool, but we call startRebalance on the old instance, is it expected?

We store `newPool`, but we call `startRebalance` on the old instance, is it expected?
Author
Member

Yes, we need to stop Rebalance in the old version of the pool.

Yes, we need to stop Rebalance in the old version of the pool.
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -1921,0 +2193,4 @@
return nil, false, err
}
//After newPool.dial(ctx, existingClients), existingClients will contain only outdated clients.
Owner

Let's use space after // this is how all comments in this repo are written.

Let's use space after `//` this is how all comments in this repo are written.
Owner

Let's use space after // this is how all comments in this repo are written.

Let's use space after `//` this is how all comments in this repo are written.
Author
Member

fixed

fixed
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -2004,0 +2301,4 @@
p.cancel = cancel
go p.rebalance(rebalanceCtx)
p.cancelLock.Unlock()
Owner

Can it be in defer?

Can it be in defer?
Author
Member

fixed

fixed
fyrchik marked this conversation as resolved
pool/pool.go Outdated
@ -2023,0 +2328,4 @@
p.cancelLock.Lock()
defer p.cancelLock.Unlock()
if p.cancel != nil {
Owner

Can p.cancel ever be nil?

Can `p.cancel` ever be nil?
Author
Member

fixed

fixed
fyrchik marked this conversation as resolved
achuprov force-pushed feat/pool_update from 55e4f7505d to 9529bb75a9 2024-03-15 10:59:34 +00:00 Compare
achuprov force-pushed feat/pool_update from 9529bb75a9 to d0537954c7 2024-03-15 12:05:35 +00:00 Compare
achuprov force-pushed feat/pool_update from d0537954c7 to fcc8e869f4 2024-03-18 09:14:04 +00:00 Compare
achuprov force-pushed feat/pool_update from fcc8e869f4 to e6d56f6be4 2024-03-18 09:14:32 +00:00 Compare
fyrchik reviewed 2024-03-19 13:23:32 +00:00
pool/pool.go Outdated
@ -1817,0 +1875,4 @@
func (p *Pool) Dial(ctx context.Context) error {
pool := p.pool.Load()
err := pool.Dial(ctx)
pool.startRebalance(ctx)
Owner

If err != nil, we are leaking goroutine here (err on dial should not require any explicit Close)

If `err != nil`, we are leaking goroutine here (`err` on dial should not require any explicit `Close`)
Author
Member

Already fixed

Already fixed
fyrchik marked this conversation as resolved
achuprov force-pushed feat/pool_update from e6d56f6be4 to fa85c792cd 2024-03-19 14:06:12 +00:00 Compare
fyrchik reviewed 2024-03-19 14:40:36 +00:00
pool/pool.go Outdated
@ -1817,0 +1876,4 @@
pool := p.pool.Load()
err := pool.Dial(ctx)
if pool.cancel != nil {
Owner

Why not just check if err != nil?
Then Dial can ensure that it doesn't start any routines unless nil is returned (this was the previous behaviour I believe).

Why not just check `if err != nil`? Then `Dial` can ensure that it doesn't start any routines unless nil is returned (this was the previous behaviour I believe).
Author
Member

It seems this comment is also not relevant anymore. We initiate rebalance if Dial does not return an error.
9b05e56e52/pool/pool.go (L1877)

It seems this comment is also not relevant anymore. We initiate `rebalance` if `Dial` does not return an error. https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/9b05e56e52623f8a892e73f804b2cba157d43e7b/pool/pool.go#L1877
fyrchik marked this conversation as resolved
dkirillov reviewed 2024-03-19 14:51:24 +00:00
pool/pool.go Outdated
@ -1817,0 +1876,4 @@
pool := p.pool.Load()
err := pool.Dial(ctx)
if pool.cancel != nil {
Member

Do we really need check this (especially without thread-safe acquiring)? We can add comment that calling this method multiple time leads to undefined behavior and client code must not operate pool this way.

Do we really need check this (especially without thread-safe acquiring)? We can add comment that calling this method multiple time leads to undefined behavior and client code must not operate pool this way.
Author
Member

fixed

fixed
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1817,0 +1879,4 @@
if pool.cancel != nil {
pool.cancel()
}
pool.startRebalance(ctx)
Member

Why do we always start rebalance?

It seems we should return error after pool.Dial (if error is occurred) immediately

Why do we always start rebalance? It seems we should return error after `pool.Dial` (if error is occurred) immediately
Author
Member

fixed

fixed
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1817,0 +2023,4 @@
defer p.lockUpdate.Unlock()
pool := p.pool.Load()
pool.stopRebalance()
Member

Do we really need stop rebalancing even if new pool equals to old one?

Do we really need stop rebalancing even if new pool equals to old one?
Author
Member

fixed

fixed
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1945,0 +2192,4 @@
return true
}
// It is necessary to prevent parallel operations on the *pool instance.
Member

Comment should have the following format 'Update ...'

Comment should have the following format 'Update ...'
Author
Member

fixed

fixed
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1945,0 +2193,4 @@
}
// It is necessary to prevent parallel operations on the *pool instance.
func (p *pool) Update(ctx context.Context, prm []NodeParam) (*pool, bool, error) {
Member

Why is this method exported?

Why is this method exported?
Author
Member

fixed

fixed
dkirillov marked this conversation as resolved
achuprov force-pushed feat/pool_update from fa85c792cd to 3984aeafb2 2024-03-19 15:18:41 +00:00 Compare
achuprov force-pushed feat/pool_update from 3984aeafb2 to fdc7f95a33 2024-03-19 15:23:31 +00:00 Compare
achuprov force-pushed feat/pool_update from fdc7f95a33 to 6d87fb5716 2024-03-19 15:32:44 +00:00 Compare
dkirillov reviewed 2024-03-21 12:25:27 +00:00
pool/pool.go Outdated
@ -1817,0 +1876,4 @@
pool := p.pool.Load()
err := pool.Dial(ctx)
if err != nil {
Member

Can we don't put empty line above?
Probably we can write if err := pool.Dial(ctx); err != nil {

Can we don't put empty line above? Probably we can write `if err := pool.Dial(ctx); err != nil {`
Author
Member

fixed

fixed
dkirillov marked this conversation as resolved
pool/pool.go Outdated
@ -1817,0 +1881,4 @@
}
if pool.cancel != nil {
pool.cancelLock.Lock()
Member

Actually we should guard if pool.canecl != nil { too

Actually we should guard `if pool.canecl != nil {` too
Author
Member

Yes, this check is necessary. On the first invocation of Dial, pool.cancel will be nil. If Dial is called again, we can properly stop the rebalance.

Yes, this check is necessary. On the first invocation of `Dial`, `pool.cancel` will be `nil`. If `Dial` is called again, we can properly stop the rebalance.
Member

I meant that if we guard pool.cancel() by mutex, we also must guard pool.cancel != nil by mutex

I meant that if we guard `pool.cancel()` by mutex, we also must guard `pool.cancel != nil` by mutex
Author
Member

Oh, you're correct. I've corrected it.

Oh, you're correct. I've corrected it.
dkirillov marked this conversation as resolved
achuprov force-pushed feat/pool_update from 7b118ba668 to b88567767c 2024-03-21 14:05:57 +00:00 Compare
achuprov force-pushed feat/pool_update from b88567767c to 043586501f 2024-03-21 14:08:23 +00:00 Compare
achuprov force-pushed feat/pool_update from 043586501f to 9b05e56e52 2024-03-21 15:00:37 +00:00 Compare
Owner

@fyrchik @dkirillov Can you resolve fixed comments of yours? I want to join review party but it is a bit hard to track what is still being discussed and what is not at this point.

@fyrchik @dkirillov Can you resolve fixed comments of yours? I want to join review party but it is a bit hard to track what is still being discussed and what is not at this point.
Owner

@alexvanin It was harder than I thought because each time I click "resolve conversation" the page updates and is scrolled to top, what a wonderful UX!

@alexvanin It was harder than I thought because each time I click "resolve conversation" the page updates and is scrolled to top, what a wonderful UX!
Owner

@alexvanin @dkirillov This PR is waiting for your approval

@alexvanin @dkirillov This PR is waiting for your approval
Member

We still don't resolve this comment #204 (comment)

We still don't resolve this comment https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pulls/204#issuecomment-34997
dkirillov reviewed 2024-04-08 09:33:26 +00:00
pool/pool.go Outdated
@ -1817,0 +1817,4 @@
pool atomic.Pointer[pool]
// Ensures that pool.cancel will not be lost due to concurrent calls to Update().
lockUpdate sync.Mutex
Member

If we have this mutex that protect Pool from concurrent Update invocation do we really need the additional atomic that we use for the same purpose?
I understand that this allows not to block pool operations during update procedure but it looks like too complicated having two different sync primitives that are intersected in pool protection

If we have this mutex that protect `Pool` from concurrent `Update` invocation do we really need the additional atomic that we use for the same purpose? I understand that this allows not to block pool operations during update procedure but it looks like too complicated having two different sync primitives that are intersected in pool protection
Author
Member

Fixed. Used atomic.Swap instead of atomic.Store.

Fixed. Used `atomic.Swap` instead of `atomic.Store`.
achuprov force-pushed feat/pool_update from 9b05e56e52 to 977261bb7b 2024-04-08 16:59:24 +00:00 Compare
achuprov force-pushed feat/pool_update from 977261bb7b to 5cb18bf6c3 2024-04-08 17:00:45 +00:00 Compare
achuprov force-pushed feat/pool_update from 5cb18bf6c3 to 9b15935a7b 2024-04-08 17:07:57 +00:00 Compare
achuprov force-pushed feat/pool_update from 9b15935a7b to ce038b04df 2024-04-08 17:08:36 +00:00 Compare
Owner

Please, rebase.

Please, rebase.
All checks were successful
DCO / DCO (pull_request) Successful in 1m17s
Tests and linters / Tests (1.21) (pull_request) Successful in 1m37s
Tests and linters / Tests (1.20) (pull_request) Successful in 1m43s
Tests and linters / Lint (pull_request) Successful in 3m7s
This pull request has changes conflicting with the target branch.
  • pool/pool.go
  • pool/pool_test.go
View command line instructions

Checkout

From your project repository, check out a new branch and test the changes.
git fetch -u feat/pool_update:achuprov-feat/pool_update
git checkout achuprov-feat/pool_update
Sign in to join this conversation.
No reviewers
TrueCloudLab/storage-services-developers
No milestone
No project
No assignees
6 participants
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference: TrueCloudLab/frostfs-sdk-go#204
No description provided.