From c82615667d069e8e3400de5104ec10ea66b6f48d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 21 Jun 2021 17:56:19 +0300 Subject: [PATCH] [#607] network: Return group-address client from ClientCache Add group-address `Client` implementation. Return instances of this implementation from `ClientCache.Get` method. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 21 +-- pkg/network/cache/client.go | 12 +- pkg/network/cache/multi.go | 332 ++++++++++++++++++++++++++++++++++++ 3 files changed, 338 insertions(+), 27 deletions(-) create mode 100644 pkg/network/cache/multi.go diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index b97e3b15..b7afcfb8 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -11,14 +11,13 @@ import ( cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" - client3 "github.com/nspcc-dev/neofs-api-go/rpc/client" "github.com/nspcc-dev/neofs-api-go/util/signature" "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" apiclientconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/apiclient" policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer" replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator" - client2 "github.com/nspcc-dev/neofs-node/pkg/core/client" + coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" @@ -150,23 +149,13 @@ func (n *innerRingFetcher) InnerRingKeys() ([][]byte, error) { type coreClientConstructor reputationClientConstructor -func (x *coreClientConstructor) Get(addr network.Address) (client2.Client, error) { +func (x *coreClientConstructor) Get(addr network.Address) (coreclient.Client, error) { c, err := (*reputationClientConstructor)(x).Get(addr) if err != nil { return nil, err } - return apiclient{ - Client: c, - }, nil -} - -type apiclient struct { - client.Client -} - -func (x apiclient) RawForAddress(network.Address) *client3.Client { - return x.Client.Raw() + return c.(coreclient.Client), nil } func initObjectService(c *cfg) { @@ -438,7 +427,7 @@ type reputationClientConstructor struct { } type reputationClient struct { - client.Client + coreclient.Client prm truststorage.UpdatePrm @@ -535,7 +524,7 @@ func (c *reputationClientConstructor) Get(addr network.Address) (client.Client, prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey())) return &reputationClient{ - Client: cl, + Client: cl.(coreclient.Client), prm: prm, cons: c, }, nil diff --git a/pkg/network/cache/client.go b/pkg/network/cache/client.go index 225f83bb..020dfb98 100644 --- a/pkg/network/cache/client.go +++ b/pkg/network/cache/client.go @@ -1,7 +1,6 @@ package cache import ( - "crypto/tls" "sync" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -54,16 +53,7 @@ func (c *ClientCache) Get(netAddr network.Address) (client.Client, error) { return cli, nil } - opts := append(c.opts, client.WithAddress(netAddr.HostAddr())) - - if netAddr.TLSEnabled() { - opts = append(opts, client.WithTLSConfig(&tls.Config{})) - } - - cli, err := client.New(opts...) - if err != nil { - return nil, err - } + cli := newMultiClient(network.GroupFromAddress(netAddr), c.opts) c.clients[mAddr] = cli diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go new file mode 100644 index 00000000..5d3fed46 --- /dev/null +++ b/pkg/network/cache/multi.go @@ -0,0 +1,332 @@ +package cache + +import ( + "context" + "crypto/sha256" + "crypto/tls" + "io" + "sync" + + "github.com/nspcc-dev/neofs-api-go/pkg/accounting" + "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-api-go/pkg/session" + rawclient "github.com/nspcc-dev/neofs-api-go/rpc/client" + "github.com/nspcc-dev/neofs-node/pkg/network" +) + +type multiClient struct { + mtx sync.RWMutex + + clients map[string]client.Client + + addr network.AddressGroup + + opts []client.Option +} + +func newMultiClient(addr network.AddressGroup, opts []client.Option) *multiClient { + return &multiClient{ + clients: make(map[string]client.Client), + addr: addr, + opts: opts, + } +} + +// note: must be wrapped into mutex lock. +func (x *multiClient) createForAddress(addr network.Address) client.Client { + opts := append(x.opts, client.WithAddress(addr.HostAddr())) + + if addr.TLSEnabled() { + opts = append(opts, client.WithTLSConfig(&tls.Config{})) + } + + c, err := client.New(opts...) + if err != nil { + // client never returns an error + panic(err) + } + + x.clients[addr.String()] = c + + return c +} + +func (x *multiClient) iterateClients(f func(client.Client) error) error { + var firstErr error + + x.addr.IterateAddresses(func(addr network.Address) bool { + x.mtx.Lock() + + strAddr := addr.String() + + var err error + + c, cached := x.clients[strAddr] + if !cached { + c = x.createForAddress(addr) + } + + x.mtx.Unlock() + + err = f(c) + + success := err == nil + + if success || firstErr == nil { + firstErr = err + } + + return success + }) + + return firstErr +} + +func (x *multiClient) PutObject(ctx context.Context, p *client.PutObjectParams, opts ...client.CallOption) (*object.ID, error) { + var res *object.ID + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.PutObject(ctx, p, opts...) + return + }) + + return res, err +} + +func (x *multiClient) GetBalance(ctx context.Context, id *owner.ID, opts ...client.CallOption) (*accounting.Decimal, error) { + var res *accounting.Decimal + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.GetBalance(ctx, id, opts...) + return + }) + + return res, err +} + +func (x *multiClient) PutContainer(ctx context.Context, cnr *container.Container, opts ...client.CallOption) (*cid.ID, error) { + var res *cid.ID + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.PutContainer(ctx, cnr, opts...) + return + }) + + return res, err +} + +func (x *multiClient) GetContainer(ctx context.Context, id *cid.ID, opts ...client.CallOption) (*container.Container, error) { + var res *container.Container + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.GetContainer(ctx, id, opts...) + return + }) + + return res, err +} + +func (x *multiClient) ListContainers(ctx context.Context, id *owner.ID, opts ...client.CallOption) ([]*cid.ID, error) { + var res []*cid.ID + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.ListContainers(ctx, id, opts...) + return + }) + + return res, err +} + +func (x *multiClient) DeleteContainer(ctx context.Context, id *cid.ID, opts ...client.CallOption) error { + return x.iterateClients(func(c client.Client) error { + return c.DeleteContainer(ctx, id, opts...) + }) +} + +func (x *multiClient) GetEACL(ctx context.Context, id *cid.ID, opts ...client.CallOption) (*client.EACLWithSignature, error) { + var res *client.EACLWithSignature + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.GetEACL(ctx, id, opts...) + return + }) + + return res, err +} + +func (x *multiClient) SetEACL(ctx context.Context, t *eacl.Table, opts ...client.CallOption) error { + return x.iterateClients(func(c client.Client) error { + return c.SetEACL(ctx, t, opts...) + }) +} + +func (x *multiClient) AnnounceContainerUsedSpace(ctx context.Context, as []container.UsedSpaceAnnouncement, opts ...client.CallOption) error { + return x.iterateClients(func(c client.Client) error { + return c.AnnounceContainerUsedSpace(ctx, as, opts...) + }) +} + +func (x *multiClient) EndpointInfo(ctx context.Context, opts ...client.CallOption) (*client.EndpointInfo, error) { + var res *client.EndpointInfo + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.EndpointInfo(ctx, opts...) + return + }) + + return res, err +} + +func (x *multiClient) NetworkInfo(ctx context.Context, opts ...client.CallOption) (*netmap.NetworkInfo, error) { + var res *netmap.NetworkInfo + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.NetworkInfo(ctx, opts...) + return + }) + + return res, err +} + +func (x *multiClient) DeleteObject(ctx context.Context, p *client.DeleteObjectParams, opts ...client.CallOption) error { + return x.iterateClients(func(c client.Client) error { + return c.DeleteObject(ctx, p, opts...) + }) +} + +func (x *multiClient) GetObject(ctx context.Context, p *client.GetObjectParams, opts ...client.CallOption) (*object.Object, error) { + var res *object.Object + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.GetObject(ctx, p, opts...) + return + }) + + return res, err +} + +func (x *multiClient) GetObjectHeader(ctx context.Context, p *client.ObjectHeaderParams, opts ...client.CallOption) (*object.Object, error) { + var res *object.Object + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.GetObjectHeader(ctx, p, opts...) + return + }) + + return res, err +} + +func (x *multiClient) ObjectPayloadRangeData(ctx context.Context, p *client.RangeDataParams, opts ...client.CallOption) ([]byte, error) { + var res []byte + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.ObjectPayloadRangeData(ctx, p, opts...) + return + }) + + return res, err +} + +func (x *multiClient) ObjectPayloadRangeSHA256(ctx context.Context, p *client.RangeChecksumParams, opts ...client.CallOption) ([][sha256.Size]byte, error) { + var res [][sha256.Size]byte + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.ObjectPayloadRangeSHA256(ctx, p, opts...) + return + }) + + return res, err +} + +func (x *multiClient) ObjectPayloadRangeTZ(ctx context.Context, p *client.RangeChecksumParams, opts ...client.CallOption) ([][client.TZSize]byte, error) { + var res [][client.TZSize]byte + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.ObjectPayloadRangeTZ(ctx, p, opts...) + return + }) + + return res, err +} + +func (x *multiClient) SearchObject(ctx context.Context, p *client.SearchObjectParams, opts ...client.CallOption) ([]*object.ID, error) { + var res []*object.ID + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.SearchObject(ctx, p, opts...) + return + }) + + return res, err +} + +func (x *multiClient) CreateSession(ctx context.Context, exp uint64, opts ...client.CallOption) (*session.Token, error) { + var res *session.Token + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.CreateSession(ctx, exp, opts...) + return + }) + + return res, err +} + +func (x *multiClient) AnnounceLocalTrust(ctx context.Context, p client.AnnounceLocalTrustPrm, opts ...client.CallOption) (*client.AnnounceLocalTrustRes, error) { + var res *client.AnnounceLocalTrustRes + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.AnnounceLocalTrust(ctx, p, opts...) + return + }) + + return res, err +} + +func (x *multiClient) AnnounceIntermediateTrust(ctx context.Context, p client.AnnounceIntermediateTrustPrm, opts ...client.CallOption) (*client.AnnounceIntermediateTrustRes, error) { + var res *client.AnnounceIntermediateTrustRes + + err := x.iterateClients(func(c client.Client) (err error) { + res, err = c.AnnounceIntermediateTrust(ctx, p, opts...) + return + }) + + return res, err +} + +func (x *multiClient) Raw() *rawclient.Client { + panic("multiClient.Raw() must not be called") +} + +func (x *multiClient) Conn() io.Closer { + return x +} + +func (x *multiClient) Close() error { + x.mtx.RLock() + + { + for _, c := range x.clients { + _ = c.Conn().Close() + } + } + + x.mtx.RUnlock() + + return nil +} + +func (x *multiClient) RawForAddress(addr network.Address) *rawclient.Client { + x.mtx.Lock() + + c := x.createForAddress(addr).Raw() + + x.mtx.Unlock() + + return c +}