diff --git a/pool/pool.go b/pool/pool.go index ba71fa1..fc61c64 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -88,6 +88,7 @@ type Pool interface { Connection() (client.Client, *session.Token, error) OwnerID() *owner.ID WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error + Close() } type clientPack struct { @@ -103,6 +104,8 @@ type pool struct { sampler *Sampler owner *owner.ID clientPacks []*clientPack + cancel context.CancelFunc + closedCh chan struct{} } func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { @@ -139,7 +142,8 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { } ownerID := owner.NewIDFromNeo3Wallet(wallet) - pool := &pool{sampler: sampler, owner: ownerID, clientPacks: clientPacks} + ctx, cancel := context.WithCancel(ctx) + pool := &pool{sampler: sampler, owner: ownerID, clientPacks: clientPacks, cancel: cancel, closedCh: make(chan struct{})} go startRebalance(ctx, pool, options) return pool, nil } @@ -151,6 +155,7 @@ func startRebalance(ctx context.Context, p *pool, options *BuilderOptions) { for { select { case <-ctx.Done(): + close(p.closedCh) return case <-ticker.C: updateNodesHealth(ctx, p, options, buffer) @@ -406,3 +411,9 @@ func (p *pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollPa } } } + +// Cloce closes the pool and releases all the associated resources. +func (p *pool) Close() { + p.cancel() + <-p.closedCh +} diff --git a/pool/pool_test.go b/pool/pool_test.go index ceb8a6c..5b7644e 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -117,6 +117,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) { clientPool, err := pb.Build(context.TODO(), opts) require.NoError(t, err) + t.Cleanup(clientPool.Close) condition := func() bool { _, st, err := clientPool.Connection() @@ -163,11 +164,9 @@ func TestOneNode(t *testing.T) { clientBuilder: clientBuilder, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - pool, err := pb.Build(ctx, opts) + pool, err := pb.Build(context.Background(), opts) require.NoError(t, err) + t.Cleanup(pool.Close) _, st, err := pool.Connection() require.NoError(t, err) @@ -203,11 +202,9 @@ func TestTwoNodes(t *testing.T) { clientBuilder: clientBuilder, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - pool, err := pb.Build(ctx, opts) + pool, err := pb.Build(context.Background(), opts) require.NoError(t, err) + t.Cleanup(pool.Close) _, st, err := pool.Connection() require.NoError(t, err) @@ -259,11 +256,9 @@ func TestOneOfTwoFailed(t *testing.T) { ClientRebalanceInterval: 200 * time.Millisecond, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - pool, err := pb.Build(ctx, opts) + pool, err := pb.Build(context.Background(), opts) require.NoError(t, err) + t.Cleanup(pool.Close) time.Sleep(2 * time.Second) @@ -297,11 +292,9 @@ func TestTwoFailed(t *testing.T) { ClientRebalanceInterval: 200 * time.Millisecond, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - pool, err := pb.Build(ctx, opts) + pool, err := pb.Build(context.Background(), opts) require.NoError(t, err) + t.Cleanup(pool.Close) time.Sleep(2 * time.Second)