forked from TrueCloudLab/frostfs-sdk-go
[#111] pool: Adopt new client.New
signature
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
543291f8c5
commit
b50bff53e7
2 changed files with 44 additions and 17 deletions
41
pool/pool.go
41
pool/pool.go
|
@ -6,6 +6,7 @@ import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -14,6 +15,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
apiclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/accounting"
|
"github.com/nspcc-dev/neofs-sdk-go/accounting"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
|
@ -29,7 +31,30 @@ import (
|
||||||
|
|
||||||
// Client is a wrapper for client.Client to generate mock.
|
// Client is a wrapper for client.Client to generate mock.
|
||||||
type Client interface {
|
type Client interface {
|
||||||
client.Client
|
GetBalance(context.Context, *owner.ID, ...client.CallOption) (*client.BalanceOfRes, error)
|
||||||
|
PutContainer(context.Context, *container.Container, ...client.CallOption) (*client.ContainerPutRes, error)
|
||||||
|
GetContainer(context.Context, *cid.ID, ...client.CallOption) (*client.ContainerGetRes, error)
|
||||||
|
ListContainers(context.Context, *owner.ID, ...client.CallOption) (*client.ContainerListRes, error)
|
||||||
|
DeleteContainer(context.Context, *cid.ID, ...client.CallOption) (*client.ContainerDeleteRes, error)
|
||||||
|
EACL(context.Context, *cid.ID, ...client.CallOption) (*client.EACLRes, error)
|
||||||
|
SetEACL(context.Context, *eacl.Table, ...client.CallOption) (*client.SetEACLRes, error)
|
||||||
|
AnnounceContainerUsedSpace(context.Context, []container.UsedSpaceAnnouncement, ...client.CallOption) (*client.AnnounceSpaceRes, error)
|
||||||
|
EndpointInfo(context.Context, ...client.CallOption) (*client.EndpointInfoRes, error)
|
||||||
|
NetworkInfo(context.Context, ...client.CallOption) (*client.NetworkInfoRes, error)
|
||||||
|
PutObject(context.Context, *client.PutObjectParams, ...client.CallOption) (*client.ObjectPutRes, error)
|
||||||
|
DeleteObject(context.Context, *client.DeleteObjectParams, ...client.CallOption) (*client.ObjectDeleteRes, error)
|
||||||
|
GetObject(context.Context, *client.GetObjectParams, ...client.CallOption) (*client.ObjectGetRes, error)
|
||||||
|
HeadObject(context.Context, *client.ObjectHeaderParams, ...client.CallOption) (*client.ObjectHeadRes, error)
|
||||||
|
ObjectPayloadRangeData(context.Context, *client.RangeDataParams, ...client.CallOption) (*client.ObjectRangeRes, error)
|
||||||
|
HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error)
|
||||||
|
SearchObjects(context.Context, *client.SearchObjectParams, ...client.CallOption) (*client.ObjectSearchRes, error)
|
||||||
|
AnnounceLocalTrust(context.Context, client.AnnounceLocalTrustPrm, ...client.CallOption) (*client.AnnounceLocalTrustRes, error)
|
||||||
|
AnnounceIntermediateTrust(context.Context, client.AnnounceIntermediateTrustPrm, ...client.CallOption) (*client.AnnounceIntermediateTrustRes, error)
|
||||||
|
CreateSession(context.Context, uint64, ...client.CallOption) (*client.CreateSessionRes, error)
|
||||||
|
|
||||||
|
Raw() *apiclient.Client
|
||||||
|
|
||||||
|
Conn() io.Closer
|
||||||
}
|
}
|
||||||
|
|
||||||
// BuilderOptions contains options used to build connection pool.
|
// BuilderOptions contains options used to build connection pool.
|
||||||
|
@ -41,7 +66,7 @@ type BuilderOptions struct {
|
||||||
ClientRebalanceInterval time.Duration
|
ClientRebalanceInterval time.Duration
|
||||||
SessionExpirationEpoch uint64
|
SessionExpirationEpoch uint64
|
||||||
nodesParams []*NodesParam
|
nodesParams []*NodesParam
|
||||||
clientBuilder func(opts ...client.Option) (client.Client, error)
|
clientBuilder func(opts ...client.Option) (Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type NodesParam struct {
|
type NodesParam struct {
|
||||||
|
@ -113,7 +138,9 @@ func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (Pool, er
|
||||||
})
|
})
|
||||||
|
|
||||||
if options.clientBuilder == nil {
|
if options.clientBuilder == nil {
|
||||||
options.clientBuilder = client.New
|
options.clientBuilder = func(opts ...client.Option) (Client, error) {
|
||||||
|
return client.New(opts...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return newPool(ctx, options)
|
return newPool(ctx, options)
|
||||||
|
@ -124,7 +151,7 @@ type Pool interface {
|
||||||
Object
|
Object
|
||||||
Container
|
Container
|
||||||
Accounting
|
Accounting
|
||||||
Connection() (client.Client, *session.Token, error)
|
Connection() (Client, *session.Token, error)
|
||||||
OwnerID() *owner.ID
|
OwnerID() *owner.ID
|
||||||
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
|
WaitForContainerPresence(context.Context, *cid.ID, *ContainerPollingParams) error
|
||||||
Close()
|
Close()
|
||||||
|
@ -156,7 +183,7 @@ type Accounting interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientPack struct {
|
type clientPack struct {
|
||||||
client client.Client
|
client Client
|
||||||
healthy bool
|
healthy bool
|
||||||
address string
|
address string
|
||||||
}
|
}
|
||||||
|
@ -333,7 +360,7 @@ func updateInnerNodesHealth(ctx context.Context, pool *pool, i int, options *Bui
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for j, cPack := range p.clientPacks {
|
for j, cPack := range p.clientPacks {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(j int, client client.Client) {
|
go func(j int, client Client) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
ok := true
|
ok := true
|
||||||
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
|
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
|
||||||
|
@ -396,7 +423,7 @@ func adjustWeights(weights []float64) []float64 {
|
||||||
return adjusted
|
return adjusted
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) Connection() (client.Client, *session.Token, error) {
|
func (p *pool) Connection() (Client, *session.Token, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
|
|
@ -21,7 +21,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestBuildPoolClientFailed(t *testing.T) {
|
func TestBuildPoolClientFailed(t *testing.T) {
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (Client, error) {
|
||||||
return nil, fmt.Errorf("error")
|
return nil, fmt.Errorf("error")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) {
|
||||||
ni := &netmap.NodeInfo{}
|
ni := &netmap.NodeInfo{}
|
||||||
ni.SetAddresses("addr1", "addr2")
|
ni.SetAddresses("addr1", "addr2")
|
||||||
|
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (Client, error) {
|
||||||
mockClient := NewMockClient(ctrl)
|
mockClient := NewMockClient(ctrl)
|
||||||
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error session")).AnyTimes()
|
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error session")).AnyTimes()
|
||||||
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfoRes{}, nil).AnyTimes()
|
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfoRes{}, nil).AnyTimes()
|
||||||
|
@ -79,7 +79,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
|
||||||
|
|
||||||
var expectedToken *session.Token
|
var expectedToken *session.Token
|
||||||
clientCount := -1
|
clientCount := -1
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (Client, error) {
|
||||||
clientCount++
|
clientCount++
|
||||||
mockClient := NewMockClient(ctrl)
|
mockClient := NewMockClient(ctrl)
|
||||||
mockInvokes := 0
|
mockInvokes := 0
|
||||||
|
@ -148,7 +148,7 @@ func TestOneNode(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
tok.SetID(uid)
|
tok.SetID(uid)
|
||||||
|
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (Client, error) {
|
||||||
mockClient := NewMockClient(ctrl)
|
mockClient := NewMockClient(ctrl)
|
||||||
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(tok, nil)
|
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(tok, nil)
|
||||||
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfo{}, nil).AnyTimes()
|
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(&client.EndpointInfo{}, nil).AnyTimes()
|
||||||
|
@ -178,7 +178,7 @@ func TestTwoNodes(t *testing.T) {
|
||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
|
|
||||||
var tokens []*session.Token
|
var tokens []*session.Token
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (Client, error) {
|
||||||
mockClient := NewMockClient(ctrl)
|
mockClient := NewMockClient(ctrl)
|
||||||
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
||||||
tok := session.NewToken()
|
tok := session.NewToken()
|
||||||
|
@ -218,7 +218,7 @@ func TestOneOfTwoFailed(t *testing.T) {
|
||||||
|
|
||||||
var tokens []*session.Token
|
var tokens []*session.Token
|
||||||
clientCount := -1
|
clientCount := -1
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (Client, error) {
|
||||||
clientCount++
|
clientCount++
|
||||||
mockClient := NewMockClient(ctrl)
|
mockClient := NewMockClient(ctrl)
|
||||||
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
||||||
|
@ -272,7 +272,7 @@ func TestTwoFailed(t *testing.T) {
|
||||||
|
|
||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
|
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (Client, error) {
|
||||||
mockClient := NewMockClient(ctrl)
|
mockClient := NewMockClient(ctrl)
|
||||||
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
|
||||||
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes()
|
mockClient.EXPECT().EndpointInfo(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("error")).AnyTimes()
|
||||||
|
@ -306,7 +306,7 @@ func TestSessionCache(t *testing.T) {
|
||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
|
|
||||||
var tokens []*session.Token
|
var tokens []*session.Token
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (Client, error) {
|
||||||
mockClient := NewMockClient(ctrl)
|
mockClient := NewMockClient(ctrl)
|
||||||
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
||||||
tok := session.NewToken()
|
tok := session.NewToken()
|
||||||
|
@ -369,7 +369,7 @@ func TestPriority(t *testing.T) {
|
||||||
|
|
||||||
var tokens []*session.Token
|
var tokens []*session.Token
|
||||||
clientCount := -1
|
clientCount := -1
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (Client, error) {
|
||||||
clientCount++
|
clientCount++
|
||||||
mockClient := NewMockClient(ctrl)
|
mockClient := NewMockClient(ctrl)
|
||||||
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
||||||
|
@ -432,7 +432,7 @@ func TestSessionCacheWithKey(t *testing.T) {
|
||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
|
|
||||||
var tokens []*session.Token
|
var tokens []*session.Token
|
||||||
clientBuilder := func(opts ...client.Option) (client.Client, error) {
|
clientBuilder := func(opts ...client.Option) (Client, error) {
|
||||||
mockClient := NewMockClient(ctrl)
|
mockClient := NewMockClient(ctrl)
|
||||||
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
mockClient.EXPECT().CreateSession(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(_, _ interface{}, _ ...interface{}) (*session.Token, error) {
|
||||||
tok := session.NewToken()
|
tok := session.NewToken()
|
||||||
|
|
Loading…
Reference in a new issue