forked from TrueCloudLab/frostfs-sdk-go
[#48] pool: add Close
method
Fix occasional panic in tests: ``` > for i in (seq 1 100); go test -race -count=1 ./pool/... ; end ... {"level":"warn","ts":1635251466.567485,"caller":"pool/pool.go:122","msg":"failed to create neofs session token for client","address":"peer0","error":"error session"} panic: Fail in goroutine after TestTwoNodes has completed goroutine 6 [running]: testing.(*common).Fail(0xc0002e1380) /usr/lib/go/src/testing/testing.go:710 +0x1b4 testing.(*common).FailNow(0xc0002e1380) /usr/lib/go/src/testing/testing.go:732 +0x2f testing.(*common).Fatalf(0xc000074070, {0xd9d816, 0x2e}, {0xc000094050, 0x5, 0x5}) /usr/lib/go/src/testing/testing.go:830 +0x85 github.com/golang/mock/gomock.(*Controller).Call.func1(0xc0002f4120, {0xd68380, 0xc0002dac30}, {0xd8847f, 0xc}, {0xc000074020, 0x1, 0x1}) /home/dzeta/go/pkg/mod/github.com/golang/mock@v1.6.0/gomock/controller.go:231 +0x44d github.com/golang/mock/gomock.(*Controller).Call(0xc0002f4120, {0xd68380, 0xc0002dac30}, {0xd8847f, 0xc}, {0xc000074020, 0x1, 0x1}) /home/dzeta/go/pkg/mod/github.com/golang/mock@v1.6.0/gomock/controller.go:247 +0xce github.com/nspcc-dev/neofs-sdk-go/pool.(*MockClient).EndpointInfo(0xc0002dac30, {0xe85528, 0xc00008a120}, {0x0, 0x0, 0x0}) /home/dzeta/repo/neofs-sdk-go/pool/mock_test.go:186 +0x298 github.com/nspcc-dev/neofs-sdk-go/pool.updateNodesHealth.func1(0x1, {0xe950d8, 0xc0002dac30}) /home/dzeta/repo/neofs-sdk-go/pool/pool.go:183 +0x188 created by github.com/nspcc-dev/neofs-sdk-go/pool.updateNodesHealth /home/dzeta/repo/neofs-sdk-go/pool/pool.go:174 +0x233 ``` Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
d541aab1ff
commit
cd5e08d725
2 changed files with 21 additions and 17 deletions
13
pool/pool.go
13
pool/pool.go
|
@ -88,6 +88,7 @@ type Pool interface {
|
||||||
Connection() (client.Client, *session.Token, error)
|
Connection() (client.Client, *session.Token, error)
|
||||||
OwnerID() *owner.ID
|
OwnerID() *owner.ID
|
||||||
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
|
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
|
||||||
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientPack struct {
|
type clientPack struct {
|
||||||
|
@ -103,6 +104,8 @@ type pool struct {
|
||||||
sampler *Sampler
|
sampler *Sampler
|
||||||
owner *owner.ID
|
owner *owner.ID
|
||||||
clientPacks []*clientPack
|
clientPacks []*clientPack
|
||||||
|
cancel context.CancelFunc
|
||||||
|
closedCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
|
@ -139,7 +142,8 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
}
|
}
|
||||||
ownerID := owner.NewIDFromNeo3Wallet(wallet)
|
ownerID := owner.NewIDFromNeo3Wallet(wallet)
|
||||||
|
|
||||||
pool := &pool{sampler: sampler, owner: ownerID, clientPacks: clientPacks}
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
pool := &pool{sampler: sampler, owner: ownerID, clientPacks: clientPacks, cancel: cancel, closedCh: make(chan struct{})}
|
||||||
go startRebalance(ctx, pool, options)
|
go startRebalance(ctx, pool, options)
|
||||||
return pool, nil
|
return pool, nil
|
||||||
}
|
}
|
||||||
|
@ -151,6 +155,7 @@ func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
close(p.closedCh)
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
updateNodesHealth(ctx, p, options, buffer)
|
updateNodesHealth(ctx, p, options, buffer)
|
||||||
|
@ -406,3 +411,9 @@ func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollPa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cloce closes the pool and releases all the associated resources.
|
||||||
|
func (p *pool) Close() {
|
||||||
|
p.cancel()
|
||||||
|
<-p.closedCh
|
||||||
|
}
|
||||||
|
|
|
@ -117,6 +117,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
|
||||||
|
|
||||||
clientPool, err := pb.Build(context.TODO(), opts)
|
clientPool, err := pb.Build(context.TODO(), opts)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(clientPool.Close)
|
||||||
|
|
||||||
condition := func() bool {
|
condition := func() bool {
|
||||||
_, st, err := clientPool.Connection()
|
_, st, err := clientPool.Connection()
|
||||||
|
@ -163,11 +164,9 @@ func TestOneNode(t *testing.T) {
|
||||||
clientBuilder: clientBuilder,
|
clientBuilder: clientBuilder,
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
pool, err := pb.Build(context.Background(), opts)
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
pool, err := pb.Build(ctx, opts)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
_, st, err := pool.Connection()
|
_, st, err := pool.Connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -203,11 +202,9 @@ func TestTwoNodes(t *testing.T) {
|
||||||
clientBuilder: clientBuilder,
|
clientBuilder: clientBuilder,
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
pool, err := pb.Build(context.Background(), opts)
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
pool, err := pb.Build(ctx, opts)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
_, st, err := pool.Connection()
|
_, st, err := pool.Connection()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -259,11 +256,9 @@ func TestOneOfTwoFailed(t *testing.T) {
|
||||||
ClientRebalanceInterval: 200 * time.Millisecond,
|
ClientRebalanceInterval: 200 * time.Millisecond,
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
pool, err := pb.Build(context.Background(), opts)
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
pool, err := pb.Build(ctx, opts)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
@ -297,11 +292,9 @@ func TestTwoFailed(t *testing.T) {
|
||||||
ClientRebalanceInterval: 200 * time.Millisecond,
|
ClientRebalanceInterval: 200 * time.Millisecond,
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
pool, err := pb.Build(context.Background(), opts)
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
pool, err := pb.Build(ctx, opts)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(pool.Close)
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue