Add pool Update #204
pool/pool.go
@ -96,7 +96,7 @@ type clientStatus interface {
    address() string
    // currentErrorRate returns current errors rate.
    // After specific threshold connection is considered as unhealthy.
-   // Pool.startRebalance routine can make this connection healthy again.
+   // Pool.rebalance routine can make this connection healthy again.
    currentErrorRate() uint32
    // overallErrorRate returns the number of all happened errors.
    overallErrorRate() uint64
@ -293,7 +293,7 @@ func (x *wrapperPrm) setStreamTimeout(timeout time.Duration) {
}

// setErrorThreshold sets threshold after reaching which connection is considered unhealthy
-// until Pool.startRebalance routing updates its status.
+// until Pool.rebalance routing updates its status.
func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
    x.errorThreshold = threshold
}
@ -1814,9 +1814,232 @@ type resCreateSession struct {
//
// See pool package overview to get some examples.
type Pool struct {
+   pool atomic.Pointer[pool]
+}
+
+// NewPool creates connection pool using parameters.
+func NewPool(options InitParameters) (*Pool, error) {
+   pool, err := newPool(options)
+   if err != nil {
+       return nil, err
+   }
+
+   var p Pool
+   p.pool.Store(pool)
+   return &p, nil
+}
+
+// Balance requests current balance of the FrostFS account.
+//
+// Main return value MUST NOT be processed on an erroneous return.
+func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
+   return p.pool.Load().Balance(ctx, prm)
+}
+
+// Close closes the Pool and releases all the associated resources.
+func (p *Pool) Close() {
+   p.pool.Load().Close()
+}
+
+// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
+//
+// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
+//
+// polling interval: 5s
+// waiting timeout: 120s
+//
+// Success can be verified by reading by identifier (see GetContainer).
+func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
+   return p.pool.Load().DeleteContainer(ctx, prm)
+}
+
+// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
+// As a marker, a special unit called a tombstone is placed in the container.
+// It confirms the user's intent to delete the object, and is itself a container object.
+// Explicit deletion is done asynchronously, and is generally not guaranteed.
+func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
+   return p.pool.Load().DeleteObject(ctx, prm)
+}
+
+// Dial establishes a connection to the servers from the FrostFS network.
+// It also starts a routine that checks the health of the nodes and
+// updates the weights of the nodes for balancing.
+// Returns an error describing failure reason.
+//
+// If failed, the Pool SHOULD NOT be used.
+//
+// See also InitParameters.SetClientRebalanceInterval.
+func (p *Pool) Dial(ctx context.Context) error {
dstepanov-yadro commented:
Hm, does it work? You pass `nil` as `existClients`, but in `p.dial` there is no nil validation.

achuprov commented:
If `existClients` is `nil`, then `ok` will be `false`.
https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/d37d31346095c2a9683cc0161546404e1bcb8a99/pool/pool.go#L1893

aarifullin commented:
Hm, I don't see any reason to create the context with cancellation and pass it out via `p.cancel = cancel`: there are no functions started in a goroutine here.

achuprov commented:
You're right. Fixed. Now `p.cancel` is used to stop the `rebalance`.
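For readers following this exchange, a standalone sketch (not part of the diff) of the Go behavior the reply relies on: indexing a nil map never panics, and the comma-ok form simply reports false, so `dial` needs no explicit nil check for `existClients`.

```golang
package main

import "fmt"

func main() {
	// existingClients stands in for the map passed to dial; nil here.
	var existingClients map[string]string

	// A lookup on a nil map does not panic; ok is simply false.
	c, ok := existingClients["peer0"]
	fmt.Println(c, ok) // prints: "" false

	// Only writing to a nil map panics, which dial never does.
}
```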
+   pool := p.pool.Load()
+   if err := pool.Dial(ctx); err != nil {
+       return err
+   }
+
+   pool.cancelLock.Lock()
fyrchik commented:
If `err != nil`, we are leaking a goroutine here (`err` on dial should not require any explicit `Close`).

achuprov commented:
Already fixed.
+   if pool.cancel != nil {
dkirillov commented:
Do we really need to check this (especially without thread-safe acquiring)? We can add a comment that calling this method multiple times leads to undefined behavior and client code must not operate the pool this way.

achuprov commented:
Fixed.

fyrchik commented:
Why not just check `if err != nil`?
Then `Dial` can ensure that it doesn't start any routines unless nil is returned (this was the previous behaviour, I believe).

achuprov commented:
It seems this comment is also not relevant anymore. We initiate `rebalance` only if `Dial` does not return an error.
https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/9b05e56e52623f8a892e73f804b2cba157d43e7b/pool/pool.go#L1877

dkirillov commented:
Can we not put an empty line above? Probably we can write `if err := pool.Dial(ctx); err != nil {`.

achuprov commented:
Fixed.
+       pool.cancel()
+   }
+   pool.cancelLock.Unlock()
dkirillov commented:
Why do we always start rebalance? It seems we should return an error immediately after `pool.Dial` if one occurred.

achuprov commented:
Fixed.
+
+   pool.startRebalance(ctx)
dkirillov commented:
Actually, we should guard `if pool.cancel != nil {` too.

achuprov commented:
Yes, this check is necessary. On the first invocation of `Dial`, `pool.cancel` will be `nil`. If `Dial` is called again, we can properly stop the rebalance.

dkirillov commented:
I meant that if we guard `pool.cancel()` with a mutex, we must also guard `pool.cancel != nil` with the mutex.

achuprov commented:
Oh, you're correct. I've corrected it.
+
+   return nil
+}
+
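For orientation, a hedged usage sketch of the wrapper type added above. Only `NewPool`, `Dial`, and `Close` come from this diff; the `InitParameters` setters (`SetKey`, `AddNode`) and `NewNodeParam` are assumed from the existing SDK surface and may differ between versions, and the node address is illustrative.

```golang
package main

import (
	"context"
	"log"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)

func main() {
	pk, err := keys.NewPrivateKey()
	if err != nil {
		log.Fatal(err)
	}

	var prm pool.InitParameters
	prm.SetKey(&pk.PrivateKey)                                  // assumed setter
	prm.AddNode(pool.NewNodeParam(1, "grpc://node1.frostfs:8080", 1)) // assumed helper

	p, err := pool.NewPool(prm)
	if err != nil {
		log.Fatal(err)
	}

	// Dial connects to the nodes and starts the rebalance routine,
	// so the context should be long-lived.
	if err := p.Dial(context.Background()); err != nil {
		log.Fatal(err)
	}
	defer p.Close()
}
```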
+// FindSiblingByParentID implements relations.Relations.
+func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
+   return p.pool.Load().FindSiblingByParentID(ctx, cnrID, objID, tokens)
+}
+
+// FindSiblingBySplitID implements relations.Relations.
+func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
dstepanov-yadro commented:
Why do you put an existing client into the cache again?

achuprov commented:
Fixed. Added `cache.Purge()`.

dstepanov-yadro commented:
And why do you purge the existing clients?

achuprov commented:
The `update` method might bring in a completely new list of nodes, causing the cache to contain some garbage for a while.

dkirillov commented:
Do we really need this? It seems this brings unnecessary complexity. I suppose we are OK with having some old invalid cache values for a while (it seems there is no code that can use them and fail because of them).

achuprov commented:
Fixed.
+   return p.pool.Load().FindSiblingBySplitID(ctx, cnrID, splitID, tokens)
+}
+
+// GetContainer reads FrostFS container by ID.
+//
+// Main return value MUST NOT be processed on an erroneous return.
+func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
+   return p.pool.Load().GetContainer(ctx, prm)
+}
+
+// GetEACL reads eACL table of the FrostFS container.
+//
+// Main return value MUST NOT be processed on an erroneous return.
+func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
+   return p.pool.Load().GetEACL(ctx, prm)
+}
+
+// GetLeftSibling implements relations.Relations.
+func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
+   return p.pool.Load().GetLeftSibling(ctx, cnrID, objID, tokens)
+}
+
+// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
+//
+// Main return value MUST NOT be processed on an erroneous return.
+func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
+   return p.pool.Load().GetObject(ctx, prm)
+}
+
+// GetSplitInfo implements relations.Relations.
+func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
+   return p.pool.Load().GetSplitInfo(ctx, cnrID, objID, tokens)
+}
+
+// HeadObject reads object header through a remote server using FrostFS API protocol.
+//
+// Main return value MUST NOT be processed on an erroneous return.
+func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
dkirillov commented:
We have to stop the previous rebalance routine before running a new one. Otherwise, goroutines will start to leak.

achuprov commented:
Added a `cancel` function.
+   return p.pool.Load().HeadObject(ctx, prm)
+}
+
+// ListChildrenByLinker implements relations.Relations.
dkirillov commented:
We have to protect this `write` operation because in `p.Update` we read this field.

dkirillov commented:
Actually, we don't protect `p.innerPools` across the code at all currently.

dkirillov commented:
The same applies to `p.rebalanceParams.nodesParams` in `updateInnerNodesHealth`.

achuprov commented:
Fixed. I've added an atomic.Pointer to Pool, so we no longer need a mutex.
+func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
+   return p.pool.Load().ListChildrenByLinker(ctx, cnrID, objID, tokens)
+}
+
+// ListContainers requests identifiers of the account-owned containers.
+func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
dkirillov commented:
Probably we have to mention that the context must be long-lived, otherwise rebalance will stop.

achuprov commented:
Fixed.
+   return p.pool.Load().ListContainers(ctx, prm)
dkirillov commented:
We have to protect the `p.innerPools` field, because in `p.dial` we update it.
+}
+
fyrchik commented:
Hm, I thought some linter would complain about `defer` in a loop.
Not that it is necessarily bad, but what prevents us from a more fine-grained locking scheme?

achuprov commented:
Fixed.
+// NetMapSnapshot requests information about the FrostFS network map.
+//
+// Main return value MUST NOT be processed on an erroneous return.
+func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
+   return p.pool.Load().NetMapSnapshot(ctx)
+}
+
+// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
+//
+// Main return value MUST NOT be processed on an erroneous return.
+func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
+   return p.pool.Load().NetworkInfo(ctx)
+}
+
+// ObjectRange initiates reading an object's payload range through a remote
+// server using FrostFS API protocol.
+//
+// Main return value MUST NOT be processed on an erroneous return.
dkirillov commented:
Should we protect this `write` with a mutex (also for the `read` on line 1951)?

achuprov commented:
You're right. Added a mutex.
+func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
+   return p.pool.Load().ObjectRange(ctx, prm)
+}
+
+// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
+//
+// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
+//
dstepanov-yadro commented:
What about error handling?

achuprov commented:
Fixed.

dstepanov-yadro commented:
Now the first error is returned. I suggest trying to close all connections that are no longer necessary and returning a joined error.

achuprov commented:
Fixed.
+// polling interval: 5s
+// waiting timeout: 120s
+//
+// Success can be verified by reading by identifier (see GetContainer).
+//
+// Main return value MUST NOT be processed on an erroneous return.
+func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
+   return p.pool.Load().PutContainer(ctx, prm)
+}
+
+// PutObject writes an object through a remote server using FrostFS API protocol.
+//
+// Main return value MUST NOT be processed on an erroneous return.
+func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
+   return p.pool.Load().PutObject(ctx, prm)
dkirillov commented:
Here we make a copy of `pool`, so `pool.cancelLock` is also copied by value, which leads to a potential deadlock: sometimes we copy a pool that contains a locked mutex (locked in `stopRebalance` when `Update` is used from multiple threads), and after copying by value an unlock on the original mutex doesn't affect the copy.
Run this test multiple times to get a deadlock.
```golang
func TestUpdateNodeDeadlock(t *testing.T) {
	key1 := newPrivateKey(t)
	mockClientBuilder := func(addr string) client {
		return newMockClient(addr, *key1)
	}
	opts := InitParameters{
		key:        newPrivateKey(t),
		nodeParams: []NodeParam{{1, "peer0", 1}},
	}
	opts.setClientBuilder(mockClientBuilder)
	pool, err := NewPool(opts)
	require.NoError(t, err)
	err = pool.Dial(context.Background())
	require.NoError(t, err)
	t.Cleanup(pool.Close)

	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			err := pool.Update(context.Background(), []NodeParam{{1, "peer" + strconv.Itoa(i+1), 1}})
			require.NoError(t, err)
		}(i)
	}
	wg.Wait()
}
```

achuprov commented:
Fixed. `p.cancel` is now a pointer. This solves the issue:
```
Update_1 starts
Update_1 stops rebalance on pool
Update_1 cp pool pool_1
Update_2 starts
Update_2 stops rebalance on pool
Update_2 cp pool pool_2
Update_1 mv pool_1 pool
Update_2 mv pool_2 pool // p.cancel_1 leaks
```
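A small standalone sketch (not the PR code) of the pitfall being discussed: copying a struct that holds a `sync.Mutex` by value copies the lock state, so an unlock on the original never releases the copy; storing a pointer instead makes all copies share one lock, which is the direction the fix takes.

```golang
package main

import (
	"fmt"
	"sync"
)

type byValue struct {
	mu sync.Mutex // copied together with the struct (go vet warns about this)
}

type byPointer struct {
	mu *sync.Mutex // copies share the same underlying lock
}

func main() {
	orig := byValue{}
	orig.mu.Lock()
	snapshot := orig                   // the copy starts out locked as well
	orig.mu.Unlock()                   // does NOT unlock snapshot.mu
	fmt.Println(snapshot.mu.TryLock()) // false: the copy is stuck locked

	shared := byPointer{mu: &sync.Mutex{}}
	shared.mu.Lock()
	snap2 := shared                 // both values point at the same mutex
	shared.mu.Unlock()              // visible through the copy
	fmt.Println(snap2.mu.TryLock()) // true
}
```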
+}
+
dkirillov commented:
In case of an error, the old rebalance routine is stopped, but a new one is not started. It seems this is not what we want.

achuprov commented:
Fixed.
+// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
fyrchik commented:
This doesn't prevent multiple concurrent `Update` executions. And what you `Load()` can change between the lines.
What about performing `Load()` into a local variable?

achuprov commented:
Fixed.
+//
fyrchik commented:
Implicit logic tied with dereferencing can be a source of bugs.
I suggest the following scheme:
1. Check whether the list of nodes is different; if not, do nothing.
2. If there are changes, we explicitly create and copy it to a new structure (also create new clients and remove removed ones).
Basically, `newPool.Update` on the line below can have the following signature: `func (*pool).Update(...) (*pool, bool, error)`. If it returns true, we replace the pointer.
This allows us to remove those `foreach` mutexes, because the old instance stays valid.
This allows us to have zero overhead for non-update threads when nothing has changed.
After this it will become more obvious whether we need atomics here (it is beautifully written, but a set of yet another wrappers rubs me the wrong way).

dkirillov commented:
> 2. If there are changes, we explicitly create and copy it to a new structure

Do you mean copying the `pool` structure or the `innerPools` field?
Actually, I don't like changing the pool itself. Probably we can make `innerPool` a more complex entity (struct) that encapsulates rebalance and other things. Then we can create a new struct (as you suggested) and replace it, or somehow change it inside.

fyrchik commented:
I mean if there are no changes, there is no need to do anything, right?
Otherwise, as you described: create a new struct and close the removed clients.

achuprov commented:
I made the logic involving dereferencing more explicit and added a check for the absence of changes.

dkirillov commented:
By the way, can we remove using atomic for `pool` itself? We won't invoke `Pool.Update` often; more often we will just invoke some other methods on the pool (that use `p.pool.Load()`). So we have read-heavy operations, and it would be nice to read the Pool pointer from multiple threads without any synchronization.

achuprov commented:
Removing atomic for the pool itself won't greatly improve performance (4 picoseconds), since our operations are network-related. However, it increases the risk of errors.
Benchmark results:
```
goos: linux
goarch: amd64
pkg: benchPtr
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
BenchmarkAtomicPointerReadConcurrent-8   1000000000   0.3064 ns/op   0 B/op   0 allocs/op
BenchmarkAtomicPointerRWRarely-8         1000000000   0.3414 ns/op   0 B/op   0 allocs/op
BenchmarkNormalPointerReadConcurrent-8   1000000000   0.2827 ns/op   0 B/op   0 allocs/op
BenchmarkNormalPointerRWRarely-8         1000000000   0.3319 ns/op   0 B/op   0 allocs/op
BenchmarkRWMutexPointerRConcurrent-8     34726486     34.34 ns/op    0 B/op   0 allocs/op
BenchmarkRWMutexPointerRWRarely-8        35075522     33.48 ns/op    0 B/op   0 allocs/op
PASS
ok  benchPtr  3.846s
```

fyrchik commented:
I fail to see how it increases the risk of errors; atomics are not a silver bullet, they just have different tradeoffs.
However, it seems that even with mutexes we would need to take an `RLock` in all methods to protect access to the current pool instance; in terms of LOC written it is similar to a wrapper (this PR), so I do not mind having atomics here.
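For reference, a self-contained benchmark sketch (not the author's `benchPtr` code, which is not included in the PR) that compares the two synchronization schemes discussed above; run it with `go test -bench=.`.

```golang
package poolbench

import (
	"sync"
	"sync/atomic"
	"testing"
)

type inner struct{ value int }

// BenchmarkAtomicPointerRead models what every Pool method does in this PR:
// a single atomic.Pointer.Load before delegating to the inner pool.
func BenchmarkAtomicPointerRead(b *testing.B) {
	var p atomic.Pointer[inner]
	p.Store(&inner{value: 1})

	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			_ = p.Load().value
		}
	})
}

// BenchmarkRWMutexRead models the alternative: guarding the current pool
// pointer with an RWMutex and taking RLock in every method.
func BenchmarkRWMutexRead(b *testing.B) {
	var mu sync.RWMutex
	cur := &inner{value: 1}

	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			mu.RLock()
			_ = cur.value
			mu.RUnlock()
		}
	})
}
```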
+// The call only opens the transmission channel, explicit fetching of matched objects
+// is done using the ResObjectSearch. Resulting reader must be finally closed.
+//
+// Main return value MUST NOT be processed on an erroneous return.
+func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
+   return p.pool.Load().SearchObjects(ctx, prm)
+}
+
+// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
fyrchik commented:
We store `newPool`, but we call `startRebalance` on the old instance; is that expected?

achuprov commented:
Yes, we need to stop rebalance in the old version of the pool.
+//
+// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
+//
+// polling interval: 5s
+// waiting timeout: 120s
+//
+// Success can be verified by reading by identifier (see GetEACL).
+func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
+   return p.pool.Load().SetEACL(ctx, prm)
+}
+
+// Statistic returns connection statistics.
+func (p *Pool) Statistic() Statistic {
+   return p.pool.Load().Statistic()
+}
+
+// Update is a method that lets you refresh the list of nodes without recreating the pool.
+// Use a long-lived context to avoid early rebalance stop.
+// Can interrupt an operation being performed on a node that was removed.
+// Ensures that:
+// 1) Preserved connections would not be closed.
+// 2) In the event of an error, the pool remains operational.
+func (p *Pool) Update(ctx context.Context, prm []NodeParam) error {
+   pool := p.pool.Load()
+
+   newPool, equal, err := pool.update(ctx, prm)
dkirillov commented:
Do we really need to stop rebalancing even if the new pool equals the old one?

achuprov commented:
Fixed.
+   if equal || err != nil {
+       return err
+   }
+
+   newPool.startRebalance(ctx)
+   oldPool := p.pool.Swap(newPool)
+   oldPool.stopRebalance()
+
+   return nil
+}
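A hedged usage sketch of the new `Update` entry point added above: refreshing the node list at runtime without recreating the pool. The addresses and priorities are illustrative, and `NewNodeParam` is assumed from the existing pool API; only `Update` itself comes from this diff.

```golang
package main

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
)

// refreshNodes swaps the pool's node list in place. It assumes p was
// created with NewPool and successfully Dial-ed earlier.
func refreshNodes(ctx context.Context, p *pool.Pool) error {
	newNodes := []pool.NodeParam{
		pool.NewNodeParam(1, "grpc://node1.frostfs:8080", 1),
		pool.NewNodeParam(2, "grpc://node3.frostfs:8080", 1), // node2 removed, node3 added
	}

	// Update reuses connections to addresses that stay in the list,
	// closes the removed ones, and restarts rebalance on the new inner
	// pool; on error the old pool keeps working.
	return p.Update(ctx, newNodes)
}
```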
+
+type pool struct {
    innerPools []*innerPool
    key *ecdsa.PrivateKey
    cancel context.CancelFunc
+   cancelLock *sync.Mutex
    closedCh chan struct{}
    cache *sessionCache
    stokenDuration uint64
@ -1845,8 +2068,7 @@ const (
    defaultBufferMaxSizeForPut = 3 * 1024 * 1024 // 3 MB
)

-// NewPool creates connection pool using parameters.
-func NewPool(options InitParameters) (*Pool, error) {
+func newPool(options InitParameters) (*pool, error) {
    if options.key == nil {
        return nil, fmt.Errorf("missed required parameter 'Key'")
    }
@ -1863,7 +2085,7 @@ func NewPool(options InitParameters) (*Pool, error) {
    fillDefaultInitParams(&options, cache)

fyrchik commented:
More like `existingClients`?

achuprov commented:
Fixed.
-   pool := &Pool{
+   pool := &pool{
        key: options.key,
        cache: cache,
dkirillov commented:
In case of using only existing clients, the flag `atLeastOneHealthy` will be false and we get an error even though all clients are healthy.

achuprov commented:
Fixed.
        logger: options.logger,
@ -1875,26 +2097,44 @@ func NewPool(options InitParameters) (*Pool, error) {
            sessionExpirationDuration: options.sessionExpirationDuration,
        },
        clientBuilder: options.clientBuilder,
+       cancelLock: &sync.Mutex{},
    }

    return pool, nil
}

-// Dial establishes a connection to the servers from the FrostFS network.
-// It also starts a routine that checks the health of the nodes and
-// updates the weights of the nodes for balancing.
-// Returns an error describing failure reason.
-//
-// If failed, the Pool SHOULD NOT be used.
-//
-// See also InitParameters.SetClientRebalanceInterval.
-func (p *Pool) Dial(ctx context.Context) error {
+func (p *pool) Dial(ctx context.Context) error {
+   err := p.dial(ctx, nil)
+   if err != nil {
+       return err
+   }
+
+   ni, err := p.NetworkInfo(ctx)
+   if err != nil {
+       return fmt.Errorf("get network info for max object size: %w", err)
+   }
+   p.maxObjectSize = ni.MaxObjectSize()
+
+   return nil
+}
+
+// dial initializes clients in accordance with p.rebalanceParams.nodesParams.
+// existingClients is optional. After dial is executed, existingClients will contain only unused clients.
+func (p *pool) dial(ctx context.Context, existingClients map[string]client) error {
    inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
    var atLeastOneHealthy bool

    for i, params := range p.rebalanceParams.nodesParams {
        clients := make([]client, len(params.weights))
        for j, addr := range params.addresses {
+           if client, ok := existingClients[addr]; ok {
+               clients[j] = client
+               atLeastOneHealthy = true
+
+               delete(existingClients, addr)
+               continue
+           }
+
            clients[j] = p.clientBuilder(addr)
            if err := clients[j].dial(ctx); err != nil {
                p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
@ -1926,22 +2166,70 @@ func (p *Pool) Dial(ctx context.Context) error {
        return fmt.Errorf("at least one node must be healthy")
dkirillov commented:
`err` here is already `nil`.

achuprov commented:
Fixed.
    }
dkirillov commented:
Can we add a comment that `existClients` here contains only clients that are no longer used?

achuprov commented:
Fixed.

-   ctx, cancel := context.WithCancel(ctx)
-   p.cancel = cancel
-   p.closedCh = make(chan struct{})
    p.innerPools = inner

-   ni, err := p.NetworkInfo(ctx)
-   if err != nil {
-       return fmt.Errorf("get network info for max object size: %w", err)
-   }
-   p.maxObjectSize = ni.MaxObjectSize()
-
-   go p.startRebalance(ctx)
    return nil
}

-func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
+func nodesParamEqual(a, b []*nodesParam) bool {
+   if len(a) != len(b) {
+       return false
+   }
+
fyrchik commented:
No need for a comment here, it is quite obvious from the code.

dkirillov commented:
I don't think this is quite obvious. We need to look into the `dial` method to find out that the `existClients` map is being changed.

fyrchik commented:
The comment says "remove", but the cycle below just closes clients. To me the comment doesn't mention the `existClients` processing in `dial`.

achuprov commented:
Fixed.
+   for i, v := range a {
+       if v.priority != b[i].priority || len(v.addresses) != len(b[i].addresses) {
+           return false
fyrchik commented:
May we use multierr for multiple errors here?
Also, it would be nice to use some dedicated error here, to avoid confusion on the caller side, and describe it in the function comment:
```
This function guarantees that
1. Preserved connections would not be closed.
2. On any error, old pool will remain functional, so the error may just be logged.
```

achuprov commented:
Added `errors.Join` to handle multiple errors.
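A minimal sketch of the `errors.Join` pattern adopted here, under the assumption that the leftover connections satisfy a plain closer interface (`io.Closer` stands in for the pool's internal `client` type): close every unused client, collect all failures, and return them as one joined error.

```golang
package main

import (
	"errors"
	"fmt"
	"io"
)

// closeAll closes every client that is no longer referenced by the new
// pool and joins the individual failures into a single returned error,
// so one bad connection does not hide the others.
func closeAll(unused map[string]io.Closer) error {
	var err error
	for addr, c := range unused {
		if cerr := c.Close(); cerr != nil {
			err = errors.Join(err, fmt.Errorf("close %s: %w", addr, cerr))
		}
	}
	return err // nil when every Close succeeded
}

func main() {
	_ = closeAll(nil) // ranging over a nil map is safe; returns nil
}
```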
+       }
+
+       for j, address := range v.addresses {
+           if address != b[i].addresses[j] {
+               return false
+           }
+       }
+   }
+
+   return true
+}
+
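A sketch of an in-package, table-driven test for the equality helper above (not part of the PR). It assumes `nodesParam` can be constructed with just `priority` and `addresses`; weights do not participate in the comparison, and since `adjustNodeParams` sorts addresses, the element-wise comparison presumes sorted input.

```golang
func TestNodesParamEqualSketch(t *testing.T) {
	a := []*nodesParam{{priority: 1, addresses: []string{"peer0", "peer1"}}}

	cases := []struct {
		name  string
		b     []*nodesParam
		equal bool
	}{
		{"same nodes", []*nodesParam{{priority: 1, addresses: []string{"peer0", "peer1"}}}, true},
		{"different priority", []*nodesParam{{priority: 2, addresses: []string{"peer0", "peer1"}}}, false},
		{"different address set", []*nodesParam{{priority: 1, addresses: []string{"peer0"}}}, false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.equal, nodesParamEqual(a, tc.b))
		})
	}
}
```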
+// Update requires that no other parallel operations are executed concurrently on the pool instance.
+func (p *pool) update(ctx context.Context, prm []NodeParam) (*pool, bool, error) {
dkirillov commented:
The comment should have the following format: 'Update ...'.

achuprov commented:
Fixed.
+   newPool := *p
fyrchik commented:
Let's use a space after `//`; this is how all comments in this repo are written.

achuprov commented:
Fixed.

dkirillov commented:
Why is this method exported?

achuprov commented:
Fixed.
+
+   existingClients := make(map[string]client)
+   for i, pool := range newPool.rebalanceParams.nodesParams {
+       for j := range pool.weights {
+           existingClients[pool.addresses[j]] = newPool.innerPools[i].clients[j]
+       }
+   }
+
+   nodesParams, err := adjustNodeParams(prm)
+   if err != nil {
+       return nil, false, err
+   }
+
+   if nodesParamEqual(newPool.rebalanceParams.nodesParams, nodesParams) {
+       return nil, true, err
+   }
+
+   newPool.rebalanceParams.nodesParams = nodesParams
+
+   err = newPool.dial(ctx, existingClients)
+   if err != nil {
+       return nil, false, err
+   }
+
+   // After newPool.dial(ctx, existingClients), existingClients will contain only outdated clients.
+   // Removing outdated clients
+   for _, client := range existingClients {
+       if clientErr := client.close(); clientErr != nil {
+           err = errors.Join(err, clientErr)
+       }
+   }
+
+   return &newPool, false, err
+}
+
+func (p *pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
    if p.logger == nil {
        return
    }
@ -1994,6 +2282,11 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
    }
}

+type addressWeightPair struct {
+   address string
+   weight  float64
+}
+
func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
    if len(nodeParams) == 0 {
        return nil, errors.New("no FrostFS peers configured")
@ -2020,11 +2313,39 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
        return nodesParams[i].priority < nodesParams[j].priority
    })

+   for _, nodes := range nodesParams {
+       addressWeightPairs := make([]addressWeightPair, len(nodes.addresses))
+       for i := range nodes.addresses {
+           addressWeightPairs[i] = addressWeightPair{address: nodes.addresses[i], weight: nodes.weights[i]}
+       }
+
+       sort.Slice(addressWeightPairs, func(i, j int) bool {
+           return addressWeightPairs[i].address < addressWeightPairs[j].address
+       })
+
+       for i, pair := range addressWeightPairs {
+           nodes.addresses[i] = pair.address
+           nodes.weights[i] = pair.weight
+       }
+   }
+
fyrchik commented:
Can `p.cancel` ever be nil?

achuprov commented:
Fixed.
    return nodesParams, nil
}

-// startRebalance runs loop to monitor connection healthy status.
-func (p *Pool) startRebalance(ctx context.Context) {
+func (p *pool) startRebalance(ctx context.Context) {
+   p.cancelLock.Lock()
+   defer p.cancelLock.Unlock()
+
+   rebalanceCtx, cancel := context.WithCancel(ctx)
+
+   p.closedCh = make(chan struct{})
+   p.cancel = cancel
+
+   go p.rebalance(rebalanceCtx)
+}
+
+// rebalance runs loop to monitor connection healthy status.
+func (p *pool) rebalance(ctx context.Context) {
    ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
    buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
    for i, params := range p.rebalanceParams.nodesParams {
@ -2043,7 +2364,15 @@ func (p *Pool) startRebalance(ctx context.Context) {
    }
}

-func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
+func (p *pool) stopRebalance() {
+   p.cancelLock.Lock()
+   defer p.cancelLock.Unlock()
+
+   p.cancel()
+   <-p.closedCh
+}
+
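A compact standalone sketch (not the PR code; names simplified) of the start/stop pattern used by `startRebalance`/`stopRebalance` above: the cancel func and the `closedCh` handshake are guarded by one mutex, so a stopped loop is fully drained before a new one can be observed. That the rebalance loop closes the channel when its context is done is an assumption about the unchanged loop body, which is not shown in this hunk.

```golang
package main

import (
	"context"
	"sync"
	"time"
)

type worker struct {
	mu       sync.Mutex
	cancel   context.CancelFunc
	closedCh chan struct{}
}

// start launches the background loop; it mirrors startRebalance.
func (w *worker) start(ctx context.Context) {
	w.mu.Lock()
	defer w.mu.Unlock()

	loopCtx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	w.closedCh = done
	w.cancel = cancel

	go w.loop(loopCtx, done)
}

// stop cancels the loop and waits for it to exit, like stopRebalance.
func (w *worker) stop() {
	w.mu.Lock()
	defer w.mu.Unlock()

	w.cancel()
	<-w.closedCh
}

func (w *worker) loop(ctx context.Context, done chan struct{}) {
	defer close(done) // signals stop() that the loop has fully exited
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// one health-check pass would go here
		}
	}
}

func main() {
	w := &worker{}
	w.start(context.Background())
	time.Sleep(250 * time.Millisecond)
	w.stop()
}
```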
+func (p *pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
    wg := sync.WaitGroup{}
    for i, inner := range p.innerPools {
        wg.Add(1)
@ -2057,12 +2386,11 @@ func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]float64) {
    wg.Wait()
}

-func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
+func (p *pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
    if i > len(p.innerPools)-1 {
        return
    }
    pool := p.innerPools[i]
-   options := p.rebalanceParams

    healthyChanged := new(atomic.Bool)
    wg := sync.WaitGroup{}
@ -2072,12 +2400,12 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
        go func(j int, cli client) {
            defer wg.Done()

-           tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
+           tctx, c := context.WithTimeout(ctx, p.rebalanceParams.nodeRequestTimeout)
            defer c()

            healthy, changed := cli.restartIfUnhealthy(tctx)
            if healthy {
-               bufferWeights[j] = options.nodesParams[i].weights[j]
+               bufferWeights[j] = p.rebalanceParams.nodesParams[i].weights[j]
            } else {
                bufferWeights[j] = 0
                p.cache.DeleteByPrefix(cli.address())
@ -2116,7 +2444,7 @@ func adjustWeights(weights []float64) []float64 {
    return adjusted
}

-func (p *Pool) connection() (client, error) {
+func (p *pool) connection() (client, error) {
    for _, inner := range p.innerPools {
        cp, err := inner.connection()
        if err == nil {
@ -2159,7 +2487,7 @@ func formCacheKey(address string, key *ecdsa.PrivateKey, clientCut bool) string
    return address + stype + k.String()
}

-func (p *Pool) checkSessionTokenErr(err error, address string) bool {
+func (p *pool) checkSessionTokenErr(err error, address string) bool {
    if err == nil {
        return false
    }
@ -2238,7 +2566,7 @@ type callContext struct {
    sessionClientCut bool
}

-func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
+func (p *pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
    cp, err := p.connection()
    if err != nil {
        return err
@ -2271,7 +2599,7 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex

// opens new session or uses cached one.
// Must be called only on initialized callContext with set sessionTarget.
-func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {
+func (p *pool) openDefaultSession(ctx context.Context, cc *callContext) error {
    cacheKey := formCacheKey(cc.endpoint, cc.key, cc.sessionClientCut)

    tok, ok := p.cache.Get(cacheKey)
@ -2305,7 +2633,7 @@ func (p *Pool) openDefaultSession(ctx context.Context, cc *callContext) error {

// opens default session (if sessionDefault is set), and calls f. If f returns
// session-related error then cached token is removed.
-func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error {
+func (p *pool) call(ctx context.Context, cc *callContext, f func() error) error {
    var err error

    if cc.sessionDefault {
@ -2322,16 +2650,13 @@ func (p *Pool) call(ctx context.Context, cc *callContext, f func() error) error
    }
}

// fillAppropriateKey use pool key if caller didn't specify its own.
-func (p *Pool) fillAppropriateKey(prm *prmCommon) {
+func (p *pool) fillAppropriateKey(prm *prmCommon) {
    if prm.key == nil {
        prm.key = p.key
    }
}

-// PutObject writes an object through a remote server using FrostFS API protocol.
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
+func (p *pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error) {
    cnr, _ := prm.hdr.ContainerID()

    var prmCtx prmContext
@ -2371,11 +2696,7 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
    return id, nil
}

-// DeleteObject marks an object for deletion from the container using FrostFS API protocol.
-// As a marker, a special unit called a tombstone is placed in the container.
-// It confirms the user's intent to delete the object, and is itself a container object.
-// Explicit deletion is done asynchronously, and is generally not guaranteed.
-func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
+func (p *pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
    var prmCtx prmContext
    prmCtx.useDefaultSession()
    prmCtx.useVerb(session.VerbObjectDelete)
@ -2441,10 +2762,7 @@ type ResGetObject struct {
    Payload io.ReadCloser
}

-// GetObject reads object header and initiates reading an object payload through a remote server using FrostFS API protocol.
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
+func (p *pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, error) {
    p.fillAppropriateKey(&prm.prmCommon)

    var cc callContext
@ -2466,10 +2784,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, e
    })
}

-// HeadObject reads object header through a remote server using FrostFS API protocol.
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
+func (p *pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object, error) {
    p.fillAppropriateKey(&prm.prmCommon)

    var cc callContext
@ -2516,11 +2831,7 @@ func (x *ResObjectRange) Close() error {
    return err
}

-// ObjectRange initiates reading an object's payload range through a remote
-// server using FrostFS API protocol.
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
+func (p *pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
    p.fillAppropriateKey(&prm.prmCommon)

    var cc callContext
@ -2578,13 +2889,7 @@ func (x *ResObjectSearch) Close() {
    _, _ = x.r.Close()
}

-// SearchObjects initiates object selection through a remote server using FrostFS API protocol.
-//
-// The call only opens the transmission channel, explicit fetching of matched objects
-// is done using the ResObjectSearch. Resulting reader must be finally closed.
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
+func (p *pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
    p.fillAppropriateKey(&prm.prmCommon)

    var cc callContext
@ -2606,17 +2911,7 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjec
    })
}

-// PutContainer sends request to save container in FrostFS and waits for the operation to complete.
-//
-// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
-//
-// polling interval: 5s
-// waiting timeout: 120s
-//
-// Success can be verified by reading by identifier (see GetContainer).
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
+func (p *pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, error) {
    cp, err := p.connection()
    if err != nil {
        return cid.ID{}, err
@ -2630,10 +2925,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, e
    return cnrID, nil
}

-// GetContainer reads FrostFS container by ID.
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
+func (p *pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container.Container, error) {
    cp, err := p.connection()
    if err != nil {
        return container.Container{}, err
@ -2647,8 +2939,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container
    return cnrs, nil
}

-// ListContainers requests identifiers of the account-owned containers.
-func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
+func (p *pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
    cp, err := p.connection()
    if err != nil {
        return nil, err
@ -2662,15 +2953,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
    return cnrIDs, nil
}

-// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
-//
-// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
-//
-// polling interval: 5s
-// waiting timeout: 120s
-//
-// Success can be verified by reading by identifier (see GetContainer).
-func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
+func (p *pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
    cp, err := p.connection()
    if err != nil {
        return err
@ -2684,10 +2967,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
    return nil
}

-// GetEACL reads eACL table of the FrostFS container.
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
+func (p *pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, error) {
    cp, err := p.connection()
    if err != nil {
        return eacl.Table{}, err
@ -2701,15 +2981,7 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, e
    return eaclResult, nil
}

-// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
-//
-// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
-//
-// polling interval: 5s
-// waiting timeout: 120s
-//
-// Success can be verified by reading by identifier (see GetEACL).
-func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
+func (p *pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
    cp, err := p.connection()
    if err != nil {
        return err
@ -2723,10 +2995,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
    return nil
}

-// Balance requests current balance of the FrostFS account.
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
+func (p *pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) {
    cp, err := p.connection()
    if err != nil {
        return accounting.Decimal{}, err
@ -2740,8 +3009,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decim
    return balance, nil
}

-// Statistic returns connection statistics.
-func (p Pool) Statistic() Statistic {
+func (p *pool) Statistic() Statistic {
    stat := Statistic{}
    for _, inner := range p.innerPools {
        nodes := make([]string, 0, len(inner.clients))
@ -2818,10 +3086,7 @@ func waitFor(ctx context.Context, params *WaitParams, condition func(context.Con
    }
}

-// NetworkInfo requests information about the FrostFS network of which the remote server is a part.
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
+func (p *pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
    cp, err := p.connection()
    if err != nil {
        return netmap.NetworkInfo{}, err
@ -2835,10 +3100,7 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
    return netInfo, nil
}

-// NetMapSnapshot requests information about the FrostFS network map.
-//
-// Main return value MUST NOT be processed on an erroneous return.
-func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
+func (p *pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
    cp, err := p.connection()
    if err != nil {
        return netmap.NetMap{}, err
@ -2852,12 +3114,8 @@ func (p *Pool) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
    return netMap, nil
}

-// Close closes the Pool and releases all the associated resources.
-func (p *Pool) Close() {
-   p.cancel()
-   <-p.closedCh
+func (p *pool) Close() {
+   p.stopRebalance()

-   // close all clients
    for _, pools := range p.innerPools {
        for _, cli := range pools.clients {
            if cli.isDialed() {
@ -2875,7 +3133,7 @@ func (p *Pool) Close() {
|
||||||
//
|
//
|
||||||
// Returns any error that does not allow reading configuration
|
// Returns any error that does not allow reading configuration
|
||||||
// from the network.
|
// from the network.
|
||||||
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *Pool) error {
|
func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *pool) error {
|
||||||
ni, err := p.NetworkInfo(ctx)
|
ni, err := p.NetworkInfo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("network info: %w", err)
|
return fmt.Errorf("network info: %w", err)
|
||||||
|
@ -2886,8 +3144,7 @@ func SyncContainerWithNetwork(ctx context.Context, cnr *container.Container, p *
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
-// GetSplitInfo implements relations.Relations.
-func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
+func (p *pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (*object.SplitInfo, error) {
 	var addr oid.Address
 	addr.SetContainer(cnrID)
 	addr.SetObject(objID)
@@ -2916,8 +3173,7 @@ func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tok
 	}
 }
 
-// ListChildrenByLinker implements relations.Relations.
-func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
+func (p *pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
 	var addr oid.Address
 	addr.SetContainer(cnrID)
 	addr.SetObject(objID)
@@ -2939,8 +3195,7 @@ func (p *Pool) ListChildrenByLinker(ctx context.Context, cnrID cid.ID, objID oid
 	return res.Children(), nil
 }
 
-// GetLeftSibling implements relations.Relations.
-func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
+func (p *pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) (oid.ID, error) {
 	var addr oid.Address
 	addr.SetContainer(cnrID)
 	addr.SetObject(objID)
@@ -2966,8 +3221,7 @@ func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, t
 	return idMember, nil
 }
 
-// FindSiblingBySplitID implements relations.Relations.
-func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
+func (p *pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
 	var query object.SearchFilters
 	query.AddSplitIDFilter(object.MatchStringEqual, splitID)
 
@@ -2998,8 +3252,7 @@ func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *
 	return members, nil
 }
 
-// FindSiblingByParentID implements relations.Relations.
-func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
+func (p *pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
 	var query object.SearchFilters
 	query.AddParentIDFilter(object.MatchStringEqual, objID)
 
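Taken together, the pool.go hunks above move the method bodies onto the unexported pool type, leaving the exported Pool as a thin layer that forwards each call to whichever implementation is currently installed. A rough, self-contained sketch of that forwarding idea follows; Facade, impl and healthyNode are illustrative stand-ins for this example, not identifiers from the SDK.

package main

import (
	"fmt"
	"sync/atomic"
)

// impl plays the role of the unexported pool: it owns the node set and the
// real request logic.
type impl struct {
	nodes []string
}

func (i *impl) healthyNode() string {
	// The real pool consults samplers and error rates; the sketch just
	// returns the first configured node.
	return i.nodes[0]
}

// Facade plays the role of the exported Pool: it holds only an atomic
// pointer and forwards every call to the current implementation.
type Facade struct {
	current atomic.Pointer[impl]
}

func (f *Facade) HealthyNode() string {
	// Load returns a consistent snapshot; a concurrent update cannot change
	// the implementation this call already observes.
	return f.current.Load().healthyNode()
}

func (f *Facade) Update(nodes []string) {
	// Build the replacement completely, then publish it in one atomic step.
	f.current.Store(&impl{nodes: nodes})
}

func main() {
	var f Facade
	f.Update([]string{"peer0", "peer1"})
	fmt.Println(f.HealthyNode()) // peer0
	f.Update([]string{"peer-1"})
	fmt.Println(f.HealthyNode()) // peer-1
}

The benefit of the split is that read paths never block: each call works on the snapshot obtained from Load, while an update assembles a complete replacement before publishing it.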
@@ -4,7 +4,9 @@ import (
 	"context"
 	"crypto/ecdsa"
 	"errors"
+	"reflect"
 	"strconv"
+	"sync"
 	"testing"
 	"time"
 
@@ -101,11 +103,11 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
 
 	expectedAuthKey := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
 	condition := func() bool {
-		cp, err := clientPool.connection()
+		cp, err := clientPool.pool.Load().connection()
 		if err != nil {
 			return false
 		}
-		st, _ := clientPool.cache.Get(formCacheKey(cp.address(), clientPool.key, false))
+		st, _ := clientPool.pool.Load().cache.Get(formCacheKey(cp.address(), clientPool.pool.Load().key, false))
 		return st.AssertAuthKey(&expectedAuthKey)
 	}
 	require.Never(t, condition, 900*time.Millisecond, 100*time.Millisecond)
@@ -138,9 +140,9 @@ func TestOneNode(t *testing.T) {
 	require.NoError(t, err)
 	t.Cleanup(pool.Close)
 
-	cp, err := pool.connection()
+	cp, err := pool.pool.Load().connection()
 	require.NoError(t, err)
-	st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
+	st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
 	expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
 	require.True(t, st.AssertAuthKey(&expectedAuthKey))
 }
@@ -168,12 +170,156 @@ func TestTwoNodes(t *testing.T) {
 	require.NoError(t, err)
 	t.Cleanup(pool.Close)
 
-	cp, err := pool.connection()
+	cp, err := pool.pool.Load().connection()
 	require.NoError(t, err)
-	st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
+	st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
 	require.True(t, assertAuthKeyForAny(st, clientKeys))
 }
 
+func TestTwoNodesUpdate(t *testing.T) {
+	var clientKeys []*ecdsa.PrivateKey
+	mockClientBuilder := func(addr string) client {
+		key := newPrivateKey(t)
+		clientKeys = append(clientKeys, key)
+		return newMockClient(addr, *key)
+	}
+
+	opts := InitParameters{
+		key: newPrivateKey(t),
+		nodeParams: []NodeParam{
+			{2, "peer0", 1},
+			{2, "peer1", 1},
+		},
+	}
+	opts.setClientBuilder(mockClientBuilder)
+
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(context.Background())
+	require.NoError(t, err)
+	t.Cleanup(pool.Close)
+
+	cp, err := pool.pool.Load().connection()
+	require.NoError(t, err)
+	st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
+	require.True(t, assertAuthKeyForAny(st, clientKeys))
+
+	pool.Update(context.Background(), []NodeParam{
+		{1, "peer-1", 1},
+		{2, "peer0", 1},
+		{2, "peer1", 1},
+	})
+
+	st1, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
+	require.Equal(t, &st1, &st)
+
+	cp2, err := pool.pool.Load().connection()
+	require.NoError(t, err)
+	require.Equal(t, cp2.address(), "peer-1")
+}
+
+func TestUpdateNodeMultithread(t *testing.T) {
+	key1 := newPrivateKey(t)
+	mockClientBuilder := func(addr string) client {
+		return newMockClient(addr, *key1)
+	}
+
+	opts := InitParameters{
+		key: newPrivateKey(t),
+		nodeParams: []NodeParam{{1, "peer0", 1}},
+	}
+	opts.setClientBuilder(mockClientBuilder)
+
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(context.Background())
+	require.NoError(t, err)
+	t.Cleanup(pool.Close)
+
+	wg := sync.WaitGroup{}
+
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			err := pool.Update(context.Background(), []NodeParam{{1, "peer" + strconv.Itoa(i+1), 1}})
+			require.NoError(t, err)
+		}(i)
+	}
+
+	wg.Wait()
+}
+
+func TestUpdateNodeEqualConfig(t *testing.T) {
+	key1 := newPrivateKey(t)
+	mockClientBuilder := func(addr string) client {
+		return newMockClient(addr, *key1)
+	}
+
+	opts := InitParameters{
+		key: newPrivateKey(t),
+		nodeParams: []NodeParam{{1, "peer0", 1}},
+	}
+	opts.setClientBuilder(mockClientBuilder)
+
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(context.Background())
+	require.NoError(t, err)
+	t.Cleanup(pool.Close)
+
+	cp, err := pool.pool.Load().connection()
+	require.NoError(t, err)
+	st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
+	expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
+	require.True(t, st.AssertAuthKey(&expectedAuthKey))
+
+	_, flag, err := pool.pool.Load().update(context.Background(), []NodeParam{{1, "peer0", 1}})
+	require.NoError(t, err)
+	require.True(t, flag)
+}
+
+func TestUpdateNode(t *testing.T) {
+	key1 := newPrivateKey(t)
+	mockClientBuilder := func(addr string) client {
+		return newMockClient(addr, *key1)
+	}
+
+	opts := InitParameters{
+		key: newPrivateKey(t),
+		nodeParams: []NodeParam{{1, "peer0", 1}},
+	}
+	opts.setClientBuilder(mockClientBuilder)
+
+	pool, err := NewPool(opts)
+	require.NoError(t, err)
+	err = pool.Dial(context.Background())
+	require.NoError(t, err)
+	t.Cleanup(pool.Close)
+
+	cp, err := pool.pool.Load().connection()
+	require.NoError(t, err)
+	st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
+	expectedAuthKey := frostfsecdsa.PublicKey(key1.PublicKey)
+	require.True(t, st.AssertAuthKey(&expectedAuthKey))
+
+	pool.Update(context.Background(), []NodeParam{{1, "peer0", 1}})
+	cp1, err := pool.pool.Load().connection()
+	st1, _ := pool.pool.Load().cache.Get(formCacheKey(cp1.address(), pool.pool.Load().key, false))
+	require.NoError(t, err)
+	require.Equal(t, &st, &st1)
+	require.Equal(t, &cp, &cp1)
+
+	pool.Update(context.Background(), []NodeParam{{1, "peer1", 1}})
+	cp2, err := pool.pool.Load().connection()
+	require.NoError(t, err)
+
+	st2, _ := pool.pool.Load().cache.Get(formCacheKey(cp2.address(), pool.pool.Load().key, false))
+	require.NotEqual(t, cp.address(), cp2.address())
+	require.NotEqual(t, &st, &st2)
+}
+
 func assertAuthKeyForAny(st session.Object, clientKeys []*ecdsa.PrivateKey) bool {
 	for _, key := range clientKeys {
 		expectedAuthKey := frostfsecdsa.PublicKey(key.PublicKey)
@@ -223,9 +369,9 @@ func TestOneOfTwoFailed(t *testing.T) {
 	time.Sleep(2 * time.Second)
 
 	for i := 0; i < 5; i++ {
-		cp, err := pool.connection()
+		cp, err := pool.pool.Load().connection()
 		require.NoError(t, err)
-		st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
+		st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
 		require.True(t, assertAuthKeyForAny(st, clientKeys))
 	}
 }
@@ -259,7 +405,7 @@ func TestTwoFailed(t *testing.T) {
 
 	time.Sleep(2 * time.Second)
 
-	_, err = pool.connection()
+	_, err = pool.pool.Load().connection()
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "no healthy")
 }
@@ -293,9 +439,9 @@ func TestSessionCache(t *testing.T) {
 	t.Cleanup(pool.Close)
 
 	// cache must contain session token
-	cp, err := pool.connection()
+	cp, err := pool.pool.Load().connection()
 	require.NoError(t, err)
-	st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
+	st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
 	require.True(t, st.AssertAuthKey(&expectedAuthKey))
 
 	var prm PrmObjectGet
@@ -306,9 +452,9 @@ func TestSessionCache(t *testing.T) {
 	require.Error(t, err)
 
 	// cache must not contain session token
-	cp, err = pool.connection()
+	cp, err = pool.pool.Load().connection()
 	require.NoError(t, err)
-	_, ok := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
+	_, ok := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
 	require.False(t, ok)
 
 	var prm2 PrmObjectPut
@@ -318,9 +464,9 @@ func TestSessionCache(t *testing.T) {
 	require.NoError(t, err)
 
 	// cache must contain session token
-	cp, err = pool.connection()
+	cp, err = pool.pool.Load().connection()
 	require.NoError(t, err)
-	st, _ = pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
+	st, _ = pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
 	require.True(t, st.AssertAuthKey(&expectedAuthKey))
 }
 
@@ -362,17 +508,17 @@ func TestPriority(t *testing.T) {
 
 	expectedAuthKey1 := frostfsecdsa.PublicKey(clientKeys[0].PublicKey)
 	firstNode := func() bool {
-		cp, err := pool.connection()
+		cp, err := pool.pool.Load().connection()
 		require.NoError(t, err)
-		st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
+		st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
 		return st.AssertAuthKey(&expectedAuthKey1)
 	}
 
 	expectedAuthKey2 := frostfsecdsa.PublicKey(clientKeys[1].PublicKey)
 	secondNode := func() bool {
-		cp, err := pool.connection()
+		cp, err := pool.pool.Load().connection()
 		require.NoError(t, err)
-		st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
+		st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
 		return st.AssertAuthKey(&expectedAuthKey2)
 	}
 	require.Never(t, secondNode, time.Second, 200*time.Millisecond)
@@ -407,9 +553,9 @@ func TestSessionCacheWithKey(t *testing.T) {
 	require.NoError(t, err)
 
 	// cache must contain session token
-	cp, err := pool.connection()
+	cp, err := pool.pool.Load().connection()
 	require.NoError(t, err)
-	st, _ := pool.cache.Get(formCacheKey(cp.address(), pool.key, false))
+	st, _ := pool.pool.Load().cache.Get(formCacheKey(cp.address(), pool.pool.Load().key, false))
 	require.True(t, st.AssertAuthKey(&expectedAuthKey))
 
 	var prm PrmObjectDelete
@@ -419,7 +565,7 @@ func TestSessionCacheWithKey(t *testing.T) {
 
 	err = pool.DeleteObject(ctx, prm)
 	require.NoError(t, err)
-	st, _ = pool.cache.Get(formCacheKey(cp.address(), anonKey, false))
+	st, _ = pool.pool.Load().cache.Get(formCacheKey(cp.address(), anonKey, false))
 	require.True(t, st.AssertAuthKey(&expectedAuthKey))
 }
 
@@ -460,10 +606,10 @@ func TestSessionTokenOwner(t *testing.T) {
 	cc.sessionTarget = func(tok session.Object) {
 		tkn = tok
 	}
-	err = p.initCallContext(&cc, prm, prmCtx)
+	err = p.pool.Load().initCallContext(&cc, prm, prmCtx)
 	require.NoError(t, err)
 
-	err = p.openDefaultSession(ctx, &cc)
+	err = p.pool.Load().openDefaultSession(ctx, &cc)
 	require.NoError(t, err)
 	require.True(t, tkn.VerifySignature())
 	require.True(t, tkn.Issuer().Equals(anonOwner))
@@ -668,6 +814,25 @@ func TestHandleError(t *testing.T) {
 	}
 }
 
+func TestAdjustNodeParams(t *testing.T) {
+	nodes1 := []NodeParam{
+		{1, "peer0", 1},
+		{1, "peer1", 2},
+		{1, "peer2", 3},
+		{2, "peer21", 2},
+	}
+	nodes2 := []NodeParam{
+		{1, "peer0", 1},
+		{1, "peer2", 3},
+		{1, "peer1", 2},
+		{2, "peer21", 2},
+	}
+
+	nodesParam1, _ := adjustNodeParams(nodes1)
+	nodesParam2, _ := adjustNodeParams(nodes2)
+	require.True(t, reflect.DeepEqual(nodesParam1, nodesParam2))
+}
+
 func TestSwitchAfterErrorThreshold(t *testing.T) {
 	nodes := []NodeParam{
 		{1, "peer0", 1},
@@ -708,14 +873,14 @@ func TestSwitchAfterErrorThreshold(t *testing.T) {
 	t.Cleanup(pool.Close)
 
 	for i := 0; i < errorThreshold; i++ {
-		conn, err := pool.connection()
+		conn, err := pool.pool.Load().connection()
 		require.NoError(t, err)
 		require.Equal(t, nodes[0].address, conn.address())
 		_, err = conn.objectGet(ctx, PrmObjectGet{})
 		require.Error(t, err)
 	}
 
-	conn, err := pool.connection()
+	conn, err := pool.pool.Load().connection()
 	require.NoError(t, err)
 	require.Equal(t, nodes[1].address, conn.address())
 	_, err = conn.objectGet(ctx, PrmObjectGet{})
@@ -59,7 +59,7 @@ func TestHealthyReweight(t *testing.T) {
 		sampler: newSampler(weights, rand.NewSource(0)),
 		clients: []client{client1, client2},
 	}
-	p := &Pool{
+	p := &pool{
 		innerPools: []*innerPool{inner},
 		cache: cache,
 		key: newPrivateKey(t),
@@ -108,7 +108,7 @@ func TestHealthyNoReweight(t *testing.T) {
 			newMockClient(names[1], *newPrivateKey(t)),
 		},
 	}
-	p := &Pool{
+	p := &pool{
 		innerPools: []*innerPool{inner},
 		rebalanceParams: rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}},
 	}
If we have this mutex that protects Pool from concurrent Update invocation, do we really need the additional atomic that we use for the same purpose? I understand that this allows pool operations not to be blocked during the update procedure, but it looks too complicated to have two different sync primitives that intersect in protecting the pool.
Fixed. Used atomic.Swap instead of atomic.Store.
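As a self-contained illustration of that reply, the sketch below shows why Swap can fit this design better than Store: the writer-side mutex only serializes updates, readers keep loading the pointer lock-free, and Swap returns the implementation being retired so it can be shut down, which a plain Store would silently drop. The wrapper and inner names and the close behaviour are assumptions made for the example, not the SDK's actual code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// inner stands in for the unexported pool; close releases its dialed clients.
type inner struct {
	nodes []string
}

func (in *inner) close() {
	fmt.Println("closed pool for", in.nodes)
}

// wrapper stands in for the exported Pool.
type wrapper struct {
	mu      sync.Mutex            // serializes Update calls only
	current atomic.Pointer[inner] // loaded lock-free by every operation
}

func (w *wrapper) Update(nodes []string) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Swap installs the new implementation and returns the previous one in a
	// single atomic step, so the retired pool can still be cleaned up.
	// A plain Store would discard the old pointer and leak its clients.
	prev := w.current.Swap(&inner{nodes: nodes})
	if prev != nil {
		prev.close()
	}
}

func main() {
	var w wrapper
	w.Update([]string{"peer0"})
	w.Update([]string{"peer-1"}) // prints: closed pool for [peer0]
}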