Add pool Update #204
Reference: TrueCloudLab/frostfs-sdk-go#204
Add the Update function to refresh the connection list in the pool. Close #30
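For orientation, a hedged usage sketch of the API this PR proposes. Update itself exists only in this PR's branch, and the surrounding setup (NewPool, InitParameters.SetKey/AddNode, NewNodeParam, Dial, Close) is written from memory of the pool package and may not match the current SDK exactly; the addresses are placeholders.

```go
package main

import (
	"context"
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"log"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
)

func main() {
	ctx := context.Background()

	// The pool requires a caller key; a throwaway P-256 key is enough for the sketch.
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		log.Fatal(err)
	}

	var prm pool.InitParameters
	prm.SetKey(key)
	prm.AddNode(pool.NewNodeParam(1, "grpc://node1.example:8080", 1))
	prm.AddNode(pool.NewNodeParam(1, "grpc://node2.example:8080", 1))

	p, err := pool.NewPool(prm)
	if err != nil {
		log.Fatal(err)
	}
	if err := p.Dial(ctx); err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	// Later: refresh the node list in place instead of recreating the pool.
	// Update is the method proposed by this PR; the context must be long-lived,
	// since it also drives the restarted rebalance routine.
	newNodes := []pool.NodeParam{
		pool.NewNodeParam(1, "grpc://node2.example:8080", 1),
		pool.NewNodeParam(1, "grpc://node3.example:8080", 1),
	}
	if err := p.Update(ctx, newNodes); err != nil {
		log.Fatal(err)
	}
}
```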
feat/pool_update to Add pool Update
@ -1868,0 +1869,4 @@
p.cancel = cancel
p.closedCh = make(chan struct{})
err := p.dial(ctx, nil)
Hm, does it work? You pass nil as existClients, but in p.dial there is no nil validation.
If existClients is nil, then ok will be false. d37d313460/pool/pool.go (L1893)
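A small aside illustrating why the reply holds (the key type here is an ad-hoc stand-in, not the SDK's NodeParam): a comma-ok lookup on a nil map in Go never panics, it just returns the zero value and false, so passing nil for existClients simply skips the reuse branch.

```go
package main

import "fmt"

type nodeKey struct {
	address  string
	priority int
}

func main() {
	// nil map: lookups are safe and report ok == false; only writes panic.
	var existClients map[nodeKey]string

	c, ok := existClients[nodeKey{address: "grpc://node.example:8080", priority: 1}]
	fmt.Printf("client=%q ok=%v\n", c, ok) // client="" ok=false
}
```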
@ -1920,0 +1943,4 @@
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
for _, pool := range p.innerPools {
pool.lock.Lock()
defer pool.lock.Unlock()
Hm, I thought some linter would complain about defer in a loop. Not that it is necessarily bad, but what prevents us from using a more fine-grained locking scheme?
Fixed
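A sketch of the trade-off being discussed (innerPool here is a stub, not the real type): a defer inside the loop holds every lock until the whole function returns, while wrapping the body in a small closure releases each pool's lock as soon as that iteration finishes.

```go
package main

import "sync"

type innerPool struct {
	lock sync.Mutex
	// clients, weights, ...
}

// Deferred unlocks accumulate: every inner pool stays locked until
// updateCoarse returns.
func updateCoarse(pools []*innerPool) {
	for _, p := range pools {
		p.lock.Lock()
		defer p.lock.Unlock()
		// ... mutate p ...
	}
}

// Finer-grained: each pool is unlocked as soon as its own iteration is done.
func updateFine(pools []*innerPool) {
	for _, p := range pools {
		func() {
			p.lock.Lock()
			defer p.lock.Unlock()
			// ... mutate p ...
		}()
	}
}

func main() {
	pools := []*innerPool{{}, {}}
	updateCoarse(pools)
	updateFine(pools)
}
```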
@ -1920,0 +1969,4 @@
}
for _, client := range existClients {
client.client.close()
What about error handling?
fixed
Now the first error is returned. I propose trying to close all connections that are no longer needed and returning a joined error.
fixed
@ -1873,1 +1892,4 @@
for j, addr := range params.addresses {
if client, ok := existClients[NodeParam{address: addr, priority: params.priority}]; ok {
clients[j] = client.client
_ = p.cache.Put(formCacheKey(addr, p.key, false), client.session)
Why do you put the existing client into the cache again?
Fixed. Added cache.Purge().
And why did you purge the existing clients?
The update method might bring in a completely new list of nodes, causing the cache to contain some garbage for a while.
Do we really need this? It seems to bring unnecessary complexity. I suppose we are OK with keeping some stale cache values for a while (there seems to be no code that could use them and fail because of them).
fixed
@ -1913,4 +1933,1 @@
}
p.maxObjectSize = ni.MaxObjectSize()
go p.startRebalance(ctx)
We have to stop the previous rebalance routine before running a new one. Otherwise, goroutines will start to leak.
Added a cancel function.
@ -1920,0 +1940,4 @@
}
// Update is a method that lets you refresh the list of nodes without recreating the pool.
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
We probably have to mention that the context must be long-lived, otherwise rebalance will stop.
fixed
@ -1920,0 +1941,4 @@
// Update is a method that lets you refresh the list of nodes without recreating the pool.
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
for _, pool := range p.innerPools {
We have to protect the p.innerPools field, because in p.dial we update it.
@ -1920,0 +1961,4 @@
return err
}
p.rebalanceParams.nodesParams = nodesParams
Should we protect this write with a mutex (and also the read on line 1951)?
You're right. Added a mutex.
d37d313460 to 4bfb6d87a3
4bfb6d87a3 to a23a3ef51b
@ -1905,4 +1937,1 @@
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel
p.closedCh = make(chan struct{})
p.innerPools = inner
We have to protect this write operation because in p.Update we read this field.
Actually, we don't protect p.innerPools anywhere in the code currently.
The same goes for p.rebalanceParams.nodesParams in updateInnerNodesHealth.
Fixed. I've added atomic.Pointer to Pool, so we no longer need a mutex.
@ -1866,2 +1869,4 @@
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel
Hm, I don't see any reason to create the context with cancellation and expose it via p.cancel = cancel: there are no functions started in a goroutine here.
You're right. Fixed. Now, p.cancel is used to stop the rebalance.
a23a3ef51b to 858ef53b13
@ -1793,0 +1984,4 @@
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
p.pool.Load().stopRebalance()
newPool := *p.pool.Load()
Here we make a copy of pool, so pool.cancelLock is also copied by value, which can lead to a potential deadlock: when Update is used from multiple threads, we may copy a pool that contains a locked mutex (locked in stopRebalance), and after copying by value an unlock of the original mutex does not affect the copy. Run this test multiple times to get a deadlock.
Fixed. p.cancel is now a pointer. This solves the issue:
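An illustrative reproduction of the hazard (a toy struct, not the SDK's pool): copying a struct by value also copies its sync.Mutex state, so a copy taken while the mutex is locked starts out locked, and unlocking the original never releases the copy; go vet's copylocks check warns about exactly this kind of assignment.

```go
package main

import "sync"

type state struct {
	mu sync.Mutex
	n  int
}

func main() {
	var orig state
	orig.mu.Lock()

	copied := orig // go vet: assignment copies lock value; the copy is born locked
	orig.mu.Unlock()

	// copied.mu.Lock() // would block forever: unlocking orig did not touch the copy
	_ = copied.n
}
```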
@ -1793,0 +1986,4 @@
newPool := *p.pool.Load()
if err := newPool.Update(ctx, prm); err != nil {
return err
In case of an error, the old rebalance routine is stopped but a new one is not started. It seems this is not what we want.
Fixed
@ -1874,0 +2087,4 @@
if client, ok := existClients[NodeParam{address: addr, priority: params.priority}]; ok {
clients[j] = client
delete(existClients, NodeParam{address: addr, priority: params.priority})
continue
If only existing clients are reused, the atLeastOneHealhty flag stays false and we get an error even though all clients are healthy.
Fixed
@ -1921,0 +2163,4 @@
return err
}
err = nil
err is already nil here
Fixed
@ -1921,0 +2164,4 @@
}
err = nil
for _, client := range existClients {
Can we add a comment that existClients here contains only clients that are no longer used?
Fixed
858ef53b13 to da6b36bf36
da6b36bf36 to 29b133505d
@ -1793,0 +1988,4 @@
p.pool.Load().stopRebalance()
defer p.pool.Load().startRebalance(ctx)
newPool := *p.pool.Load()
Implicit logic tied to dereferencing can be a source of bugs. I suggest the following scheme: basically, newPool.Update on the line below can have the signature func (*pool).Update(...) (*pool, bool, error). If it returns true, we replace the pointer.
This allows us to remove those foreach mutexes, because the old instance stays valid, and it gives zero overhead for non-update threads when nothing has changed.
After this it will become more obvious whether we need atomics here (it is beautifully written, but yet another set of wrappers rubs me the wrong way).
Do you mean copying the pool structure or the innerPools field?
Actually, I don't like changing the pool itself. We could probably make innerPool a more complex entity (a struct) that encapsulates rebalance and other things, so we can then create a new struct (as you suggested) and replace it, or somehow change it in place.
I mean that if there are no changes, there is no need to do anything, right? Otherwise, as you described, create a new struct and close the removed clients.
I made the logic involving dereferencing more explicit and added a check for the absence of changes.
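A compressed sketch of the scheme converged on here (the Update/update split and the (*pool, bool, error) shape follow the discussion and the later diff hunks; equalParams and the construction step are placeholders): the inner update reports whether anything changed, and the wrapper only publishes a new instance when it did, so readers of the old instance are never disturbed and a no-op Update costs nothing.

```go
package main

import (
	"context"
	"sync/atomic"
)

type NodeParam struct {
	address  string
	priority int
	weight   float64
}

type pool struct {
	nodes []NodeParam
}

// update builds a replacement instance if prm differs from the current
// configuration; changed reports whether the caller should publish it.
func (p *pool) update(ctx context.Context, prm []NodeParam) (*pool, bool, error) {
	if equalParams(p.nodes, prm) {
		return p, false, nil // nothing to do, keep the old instance
	}
	// ... dial new nodes, reuse unchanged clients, close removed ones ...
	return &pool{nodes: prm}, true, nil
}

type Pool struct {
	pool atomic.Pointer[pool]
}

func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
	cur := p.pool.Load()
	next, changed, err := cur.update(ctx, prm)
	if err != nil {
		return err
	}
	if changed {
		p.pool.Store(next) // readers holding cur keep using a valid instance
	}
	return nil
}

func equalParams(a, b []NodeParam) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	var p Pool
	p.pool.Store(&pool{})
	_ = p.Update(context.Background(), []NodeParam{{address: "grpc://node.example:8080", priority: 1, weight: 1}})
}
```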
By the way, can we remove the use of atomic for the pool itself? We won't invoke Pool.Update often; more often we will just invoke other methods on the pool (which use p.pool.Load()). So we have read-heavy operations, and it would be nice for multiple threads to read the Pool pointer without any synchronization.
Removing the atomic for the pool itself won't greatly improve performance (4 picoseconds), since our operations are network-bound. However, it increases the risk of errors.
benchmark results:
I fail to see how it increases the risk of errors; atomics are not a silver bullet, they just have different tradeoffs.
However, it seems that even with mutexes we would need to take some RLock in all methods to protect access to the current pool instance; in terms of LOC written it is similar to a wrapper (this PR), so I do not mind having atomics here.
@ -1921,0 +2140,4 @@
}
func (p *pool) Update(ctx context.Context, prm []NodeParam) error {
foreach(p.innerPools, func(pool *innerPool) { pool.lock.Lock() })
It doesn't address my comment, we still take all the mutexes.
29b133505d to 38102795fe
38102795fe to bf46fa7e74
bf46fa7e74 to 11554102c5
@ -1793,0 +1987,4 @@
// Use a long-lived context to avoid early rebalance stop.
// Can interrupt an operation being performed on a node that was removed.
func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
p.pool.Load().stopRebalance()
This doesn't prevent multiple concurrent Update executions. And what you Load() can change between the lines.
What about assigning Load() to a local variable?
fixed
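A sketch of the "Load() into a local variable" point (stopRebalance/startRebalance are stubbed; only the Load pattern matters): repeated p.pool.Load() calls may observe different instances if a concurrent Update swaps the pointer in between, whereas loading once pins a single instance for the whole operation.

```go
package main

import (
	"context"
	"sync/atomic"
)

type NodeParam struct{ address string }

type pool struct{}

func (p *pool) stopRebalance()                     {}
func (p *pool) startRebalance(ctx context.Context) {}

type Pool struct {
	pool atomic.Pointer[pool]
}

// Racy: each Load() may return a different *pool if a concurrent Update
// swaps the pointer between the two calls.
func (p *Pool) updateRacy(ctx context.Context, prm []NodeParam) error {
	p.pool.Load().stopRebalance()
	defer p.pool.Load().startRebalance(ctx) // possibly a different instance
	// ... rebuild the node list ...
	return nil
}

// Safer: take one snapshot and use the same instance throughout.
func (p *Pool) updatePinned(ctx context.Context, prm []NodeParam) error {
	cur := p.pool.Load()
	cur.stopRebalance()
	defer cur.startRebalance(ctx)
	// ... rebuild the node list ...
	return nil
}

func main() {
	var p Pool
	p.pool.Store(&pool{})
	_ = p.updateRacy(context.Background(), nil)
	_ = p.updatePinned(context.Background(), nil)
}
```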
@ -1868,0 +2084,4 @@
return nil
}
func (p *pool) dial(ctx context.Context, existClients map[NodeParam]client) error {
More like existingClients?
fixed
@ -1921,0 +2141,4 @@
}
for i, v := range a {
if !reflect.DeepEqual(v, b[i]) {
Please, let's not use reflect when it can be avoided; nodesParam has only 3 fields.
Also, inside nodesParam we should probably allow a different order of the addresses field if the weights are the same, because they all have the same priority and the connection choice is random anyway. Need some validation here @dkirillov.
Anyway, this could be done in a separate issue; it seems your code reuses connections anyway.
One way to ensure this is to perform sorting based on (weight, address).
Yes. Also, it's related to priority, because we use sort.Slice (rather than sort.SliceStable) here.
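A sketch of the comparison being discussed (the nodesParam field names follow the diff hunks; the helpers themselves are illustrative): normalize each parameter set with a stable sort on (weight, address), then compare the three fields directly instead of via reflect.DeepEqual.

```go
package main

import (
	"fmt"
	"sort"
)

type nodesParam struct {
	priority  int
	addresses []string
	weights   []float64
}

// normalize reorders the paired addresses/weights by (weight, address) so that
// two parameter sets that differ only in ordering compare as equal.
func normalize(p *nodesParam) {
	idx := make([]int, len(p.addresses))
	for i := range idx {
		idx[i] = i
	}
	sort.SliceStable(idx, func(a, b int) bool {
		ia, ib := idx[a], idx[b]
		if p.weights[ia] != p.weights[ib] {
			return p.weights[ia] < p.weights[ib]
		}
		return p.addresses[ia] < p.addresses[ib]
	})
	addrs := make([]string, len(idx))
	ws := make([]float64, len(idx))
	for i, j := range idx {
		addrs[i], ws[i] = p.addresses[j], p.weights[j]
	}
	p.addresses, p.weights = addrs, ws
}

// equal compares the three fields without reflect.
func equal(a, b nodesParam) bool {
	if a.priority != b.priority || len(a.addresses) != len(b.addresses) {
		return false
	}
	for i := range a.addresses {
		if a.addresses[i] != b.addresses[i] || a.weights[i] != b.weights[i] {
			return false
		}
	}
	return true
}

func main() {
	x := nodesParam{priority: 1, addresses: []string{"b", "a"}, weights: []float64{1, 1}}
	y := nodesParam{priority: 1, addresses: []string{"a", "b"}, weights: []float64{1, 1}}
	normalize(&x)
	normalize(&y)
	fmt.Println(equal(x, y)) // true: same nodes, different original order
}
```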
@ -1921,0 +2155,4 @@
existClients := make(map[NodeParam]client)
for i, pool := range newPool.rebalanceParams.nodesParams {
for j := range pool.weights {
existClients[NodeParam{address: pool.addresses[j], priority: pool.priority}] = newPool.innerPools[i].clients[j]
The key type in this map should be an address: if the client has just changed weight or priority, the connection should not be closed.
fixed
@ -1921,0 +2175,4 @@
return nil, false, err
}
//Removing outdated clients
No need for comment here, quite obvious from the code.
I don't think this is quite obvious. We need to look into the dial method to find out that the existClients map is being changed.
The comment says remove, but the loop below just closes clients.
To me, the comment doesn't mention the existClients processing in dial.
fixed
@ -1921,0 +2178,4 @@
//Removing outdated clients
for _, client := range existClients {
if clientErr := client.close(); clientErr != nil {
err = clientErr
May we use multierr for multiple errors here?
Also, it would be nice to use a dedicated error here, to avoid confusion on the caller's side, and to describe it in the function comment:
Added errors.Join to handle multiple errors.
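A sketch of the close-and-join pattern agreed on here (the client type and the dedicated sentinel error are illustrative, not the SDK's names): every leftover client is closed even if an earlier close fails, and the caller receives a single joined error it can inspect.

```go
package main

import (
	"errors"
	"fmt"
)

// errCloseOutdated is a hypothetical dedicated error so callers can tell
// "only cleanup of outdated clients failed" apart from a dial failure.
var errCloseOutdated = errors.New("failed to close outdated clients")

type client struct{ addr string }

func (c client) close() error {
	// Pretend one address fails to close.
	if c.addr == "grpc://bad.example:8080" {
		return fmt.Errorf("close %s: connection reset", c.addr)
	}
	return nil
}

// closeOutdated closes every client left in existingClients and joins all
// failures instead of returning only the first one.
func closeOutdated(existingClients map[string]client) error {
	var errs []error
	for _, c := range existingClients {
		if err := c.close(); err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) == 0 {
		return nil
	}
	return errors.Join(errCloseOutdated, errors.Join(errs...))
}

func main() {
	err := closeOutdated(map[string]client{
		"a": {addr: "grpc://good.example:8080"},
		"b": {addr: "grpc://bad.example:8080"},
	})
	fmt.Println(errors.Is(err, errCloseOutdated)) // true
}
```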
@ -2004,0 +2271,4 @@
p.cancelLock.Lock()
//stop rebalance
if p.cancel != nil {
Could you explain why we have *context.CancelFunc and not context.CancelFunc?
Previously, *context.CancelFunc protected against losing context.CancelFunc() during concurrent execution of Update. Now, Update includes a mutex.
11554102c5 to 55e4f7505d
@ -1789,10 +1789,230 @@ type resCreateSession struct {
// (e.g. sdkClient.IsErrContainerNotFound, sdkClient.IsErrObjectNotFound).
//
// See pool package overview to get some examples.
empty line
@ -1793,0 +1997,4 @@
pool := p.pool.Load()
pool.stopRebalance()
defer pool.startRebalance(ctx)
We store newPool, but we call startRebalance on the old instance. Is that expected?
Yes, we need to stop the rebalance in the old version of the pool.
@ -1921,0 +2193,4 @@
return nil, false, err
}
//After newPool.dial(ctx, existingClients), existingClients will contain only outdated clients.
Let's use a space after //, this is how all comments in this repo are written.
fixed
@ -2004,0 +2301,4 @@
p.cancel = cancel
go p.rebalance(rebalanceCtx)
p.cancelLock.Unlock()
Can it be in defer?
fixed
@ -2023,0 +2328,4 @@
p.cancelLock.Lock()
defer p.cancelLock.Unlock()
if p.cancel != nil {
Can p.cancel ever be nil?
fixed
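A sketch of the start/stop pair these hunks converge on (the cancel/cancelLock/closedCh field names follow the diff; the rebalance body is stubbed): the lock guards both the nil check and the call, and deferring Unlock keeps the critical section obvious.

```go
package main

import (
	"context"
	"sync"
	"time"
)

type pool struct {
	cancelLock sync.Mutex
	cancel     context.CancelFunc
	closedCh   chan struct{}
}

func (p *pool) rebalance(ctx context.Context) {
	defer close(p.closedCh)
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// ... refresh node health ...
		}
	}
}

// startRebalance spawns a new rebalance goroutine; the previous one, if any,
// must have been stopped first.
func (p *pool) startRebalance(ctx context.Context) {
	p.cancelLock.Lock()
	defer p.cancelLock.Unlock()

	rebalanceCtx, cancel := context.WithCancel(ctx)
	p.closedCh = make(chan struct{})
	p.cancel = cancel
	go p.rebalance(rebalanceCtx)
}

// stopRebalance cancels the running goroutine and waits for it to exit.
// Both the nil check and the call happen under the same lock.
func (p *pool) stopRebalance() {
	p.cancelLock.Lock()
	defer p.cancelLock.Unlock()

	if p.cancel != nil {
		p.cancel()
		<-p.closedCh
		p.cancel = nil
	}
}

func main() {
	p := &pool{}
	p.startRebalance(context.Background())
	p.stopRebalance()
}
```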
55e4f7505d to 9529bb75a9
9529bb75a9 to d0537954c7
d0537954c7 to fcc8e869f4
fcc8e869f4 to e6d56f6be4
@ -1817,0 +1875,4 @@
func (p *Pool) Dial(ctx context.Context) error {
pool := p.pool.Load()
err := pool.Dial(ctx)
pool.startRebalance(ctx)
If err != nil, we are leaking a goroutine here (an err on dial should not require any explicit Close).
Already fixed
e6d56f6be4 to fa85c792cd
@ -1817,0 +1876,4 @@
pool := p.pool.Load()
err := pool.Dial(ctx)
if pool.cancel != nil {
Why not just check if err != nil? Then Dial can ensure that it doesn't start any routines unless nil is returned (this was the previous behaviour, I believe).
It seems this comment is also not relevant anymore. We initiate rebalance if Dial does not return an error. 9b05e56e52/pool/pool.go (L1877)
@ -1817,0 +1876,4 @@
pool := p.pool.Load()
err := pool.Dial(ctx)
if pool.cancel != nil {
Do we really need to check this (especially without thread-safe acquisition)? We can add a comment that calling this method multiple times leads to undefined behavior and that client code must not operate the pool this way.
fixed
@ -1817,0 +1879,4 @@
if pool.cancel != nil {
pool.cancel()
}
pool.startRebalance(ctx)
Why do we always start rebalance?
It seems we should return error after
pool.Dial
(if error is occurred) immediatelyfixed
@ -1817,0 +2023,4 @@
defer p.lockUpdate.Unlock()
pool := p.pool.Load()
pool.stopRebalance()
Do we really need to stop rebalancing even if the new pool equals the old one?
fixed
@ -1945,0 +2192,4 @@
return true
}
// It is necessary to prevent parallel operations on the *pool instance.
Comment should have the following format 'Update ...'
fixed
@ -1945,0 +2193,4 @@
}
// It is necessary to prevent parallel operations on the *pool instance.
func (p *pool) Update(ctx context.Context, prm []NodeParam) (*pool, bool, error) {
Why is this method exported?
fixed
fa85c792cd to 3984aeafb2
3984aeafb2 to fdc7f95a33
fdc7f95a33 to 6d87fb5716
@ -1817,0 +1876,4 @@
pool := p.pool.Load()
err := pool.Dial(ctx)
if err != nil {
Can we avoid putting an empty line above?
Probably we can write
if err := pool.Dial(ctx); err != nil {
fixed
@ -1817,0 +1881,4 @@
}
if pool.cancel != nil {
pool.cancelLock.Lock()
Actually, we should guard if pool.cancel != nil { too.
Yes, this check is necessary. On the first invocation of Dial, pool.cancel will be nil. If Dial is called again, we can properly stop the rebalance.
I meant that if we guard pool.cancel() with a mutex, we also must guard pool.cancel != nil with the mutex.
Oh, you're correct. I've corrected it.
7b118ba668 to b88567767c
b88567767c to 043586501f
043586501f to 9b05e56e52
@fyrchik @dkirillov Can you resolve those of your comments that are fixed? I want to join the review party, but it is a bit hard to track what is still being discussed and what is not at this point.
@alexvanin It was harder than I thought, because each time I click "resolve conversation" the page reloads and scrolls to the top, what a wonderful UX!
@alexvanin @dkirillov This PR is waiting for your approval
We still haven't resolved this comment: #204 (comment)
@ -1817,0 +1817,4 @@
pool atomic.Pointer[pool]
// Ensures that pool.cancel will not be lost due to concurrent calls to Update().
lockUpdate sync.Mutex
If we have this mutex that protects Pool from concurrent Update invocations, do we really need the additional atomic that we use for the same purpose?
I understand that this allows pool operations not to be blocked during the update procedure, but it looks too complicated to have two different sync primitives intersecting in pool protection.
Fixed. Used atomic.Swap instead of atomic.Store.
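A sketch of the Swap-based handover (the pool/Pool split and lockUpdate follow the hunk above; the teardown helpers are placeholders): Swap publishes the new instance and returns the old one in a single atomic step, so the old pool's rebalance can be stopped and its leftover clients closed without a second lookup racing against other updaters, while lockUpdate still serializes whole Update calls.

```go
package main

import (
	"context"
	"sync"
	"sync/atomic"
)

type NodeParam struct{ address string }

type pool struct{}

func (p *pool) stopRebalance() {}
func (p *pool) closeRemoved()  {}

type Pool struct {
	pool       atomic.Pointer[pool]
	lockUpdate sync.Mutex // serializes Update calls themselves
}

func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
	p.lockUpdate.Lock()
	defer p.lockUpdate.Unlock()

	newPool := &pool{} // ... built from prm, reusing unchanged clients ...

	// Swap publishes newPool and hands back exactly the instance that was
	// visible to readers until now, so its teardown cannot be lost.
	old := p.pool.Swap(newPool)
	old.stopRebalance()
	old.closeRemoved()
	return nil
}

func main() {
	var p Pool
	p.pool.Store(&pool{})
	_ = p.Update(context.Background(), nil)
}
```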
9b05e56e52 to 977261bb7b
977261bb7b to 5cb18bf6c3
5cb18bf6c3 to 9b15935a7b
9b15935a7b to ce038b04df
Please, rebase.
To implement this, it is necessary to refactor the Pool and then recreate the PR.
Pool to ConnectionManager #300
Pull request closed