From c8199099060b7599fe79821aa9742b4c7d626e47 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 12 Mar 2021 16:07:52 +0300 Subject: [PATCH] [#263] pkg/client: Refactor the client to use raw protobuf client Make `Client` to be the wrapper over raw protobuf client. Provide public method to get the underlying raw client. Change implementations of all methods with the new approach of the RPC execution. Additional changes: * key replaced from `New` argument to `WithDefaultPrivateKey` option; * `GetSelfBalance` is removed as non-viable; * `GetEACLWithSignature` is removed, `GetEACL` returns `EACLWithSignature`; * `AttachSessionToken` / `AttachBearerToken` are removed as non-viable; * redundant options are removed. Signed-off-by: Leonard Lyubich --- pkg/client/accounting.go | 90 +------- pkg/client/client.go | 63 ++---- pkg/client/client_test.go | 67 ------ pkg/client/container.go | 446 ++++++++++---------------------------- pkg/client/netmap.go | 166 ++++---------- pkg/client/object.go | 345 ++++++++++------------------- pkg/client/opts.go | 150 ++++--------- pkg/client/raw.go | 14 ++ pkg/client/session.go | 107 ++------- 9 files changed, 380 insertions(+), 1068 deletions(-) delete mode 100644 pkg/client/client_test.go create mode 100644 pkg/client/raw.go diff --git a/pkg/client/accounting.go b/pkg/client/accounting.go index e9db7f1..a6028e4 100644 --- a/pkg/client/accounting.go +++ b/pkg/client/accounting.go @@ -5,54 +5,29 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/accounting" "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-api-go/rpc/client" v2accounting "github.com/nspcc-dev/neofs-api-go/v2/accounting" - "github.com/nspcc-dev/neofs-api-go/v2/client" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" v2signature "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/pkg/errors" ) // Accounting contains methods related to balance querying. type Accounting interface { - // GetSelfBalance returns balance of the account deduced from client's key. - GetSelfBalance(context.Context, ...CallOption) (*accounting.Decimal, error) // GetBalance returns balance of provided account. GetBalance(context.Context, *owner.ID, ...CallOption) (*accounting.Decimal, error) } -func (c clientImpl) GetSelfBalance(ctx context.Context, opts ...CallOption) (*accounting.Decimal, error) { - return c.GetBalance(ctx, nil, opts...) -} - -func (c clientImpl) GetBalance(ctx context.Context, owner *owner.ID, opts ...CallOption) (*accounting.Decimal, error) { - // check remote node version - switch c.remoteNode.Version.Major() { - case 2: - return c.getBalanceV2(ctx, owner, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -func (c clientImpl) getBalanceV2(ctx context.Context, ownerID *owner.ID, opts ...CallOption) (*accounting.Decimal, error) { +func (c *clientImpl) GetBalance(ctx context.Context, owner *owner.ID, opts ...CallOption) (*accounting.Decimal, error) { // apply all available options callOptions := c.defaultCallOptions() for i := range opts { - opts[i].apply(&callOptions) - } - - if ownerID == nil { - w, err := owner.NEO3WalletFromPublicKey(&callOptions.key.PublicKey) - if err != nil { - return nil, err - } - - ownerID = new(owner.ID) - ownerID.SetNeo3Wallet(w) + opts[i](callOptions) } reqBody := new(v2accounting.BalanceRequestBody) - reqBody.SetOwnerID(ownerID.ToV2()) + reqBody.SetOwnerID(owner.ToV2()) req := new(v2accounting.BalanceRequest) req.SetBody(reqBody) @@ -63,56 +38,15 @@ func (c clientImpl) getBalanceV2(ctx context.Context, ownerID *owner.ID, opts .. return nil, err } - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2AccountingClientFromOptions(c.opts) - if err != nil { - return nil, errors.Wrap(err, "can't create grpc client") - } - - resp, err := cli.Balance(ctx, req) - if err != nil { - return nil, errors.Wrap(err, "transport error") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return nil, errors.Wrap(err, "can't verify response message") - } - - return accounting.NewDecimalFromV2(resp.GetBody().GetBalance()), nil - default: - return nil, errUnsupportedProtocol - } -} - -func v2AccountingClientFromOptions(opts *clientOptions) (cli *v2accounting.Client, err error) { - switch { - case opts.grpcOpts.v2AccountingClient != nil: - // return value from client cache - return opts.grpcOpts.v2AccountingClient, nil - - case opts.grpcOpts.conn != nil: - cli, err = v2accounting.NewClient(v2accounting.WithGlobalOpts( - client.WithGRPCConn(opts.grpcOpts.conn)), - ) - - case opts.addr != "": - cli, err = v2accounting.NewClient(v2accounting.WithGlobalOpts( - client.WithNetworkAddress(opts.addr), - client.WithDialTimeout(opts.dialTimeout), - )) - - default: - return nil, errOptionsLack("Accounting") - } - - // check if client correct and save in cache + resp, err := rpcapi.Balance(c.Raw(), req, client.WithContext(ctx)) if err != nil { - return nil, err + return nil, errors.Wrap(err, "transport error") } - opts.grpcOpts.v2AccountingClient = cli + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, errors.Wrap(err, "can't verify response message") + } - return cli, nil + return accounting.NewDecimalFromV2(resp.GetBody().GetBalance()), nil } diff --git a/pkg/client/client.go b/pkg/client/client.go index 42bd454..99d1e59 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -1,64 +1,37 @@ package client import ( - "crypto/ecdsa" - "errors" + "sync" - "github.com/nspcc-dev/neofs-api-go/pkg" - "github.com/nspcc-dev/neofs-api-go/pkg/token" + "github.com/nspcc-dev/neofs-api-go/rpc/client" ) -type ( - // Client represents NeoFS client. - Client interface { - Accounting - Container - Netmap - Object - Session - } +// Client represents NeoFS client. +type Client interface { + Accounting + Container + Netmap + Object + Session +} - clientImpl struct { - key *ecdsa.PrivateKey - remoteNode TransportInfo +type clientImpl struct { + onceInit sync.Once - opts *clientOptions + raw *client.Client - sessionToken *token.SessionToken + opts *clientOptions +} - bearerToken *token.BearerToken - } - - TransportProtocol uint32 - - TransportInfo struct { - Version *pkg.Version - Protocol TransportProtocol - } -) - -const ( - Unknown TransportProtocol = iota - GRPC -) - -var errUnsupportedProtocol = errors.New("unsupported transport protocol") - -// New returns new client which uses key as default signing key. -func New(key *ecdsa.PrivateKey, opts ...Option) (Client, error) { +func New(opts ...Option) (Client, error) { clientOptions := defaultClientOptions() for i := range opts { - opts[i].apply(clientOptions) + opts[i](clientOptions) } - // todo: make handshake to check latest version return &clientImpl{ - key: key, - remoteNode: TransportInfo{ - Version: pkg.SDKVersion(), - Protocol: GRPC, - }, + raw: client.New(), opts: clientOptions, }, nil } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go deleted file mode 100644 index cb36606..0000000 --- a/pkg/client/client_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package client_test - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/container" - "github.com/nspcc-dev/neofs-api-go/pkg/netmap" - "github.com/nspcc-dev/neofs-crypto/test" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" -) - -func TestExample(t *testing.T) { - t.Skip() - target := "s01.localtest.nspcc.ru:50501" - key := test.DecodeKey(-1) - - // create client from address - cli, err := client.New(key, client.WithAddress(target)) - require.NoError(t, err) - - // ask for balance - resp, err := cli.GetSelfBalance(context.Background()) - require.NoError(t, err) - - fmt.Println(resp.Value(), resp.Precision()) - - // create client from grpc connection - conn, err := grpc.DialContext(context.Background(), target, grpc.WithBlock(), grpc.WithInsecure()) - require.NoError(t, err) - - cli, err = client.New(key, client.WithGRPCConnection(conn)) - require.NoError(t, err) - - replica := new(netmap.Replica) - replica.SetCount(2) - replica.SetSelector("*") - - policy := new(netmap.PlacementPolicy) - policy.SetContainerBackupFactor(2) - policy.SetReplicas(replica) - - // this container has random nonce and it does not set owner id - cnr := container.New( - container.WithAttribute("CreatedAt", time.Now().String()), - container.WithPolicy(policy), - container.WithReadOnlyBasicACL(), - ) - require.NoError(t, err) - - // here container will have owner id from client key, and it will be signed - containerID, err := cli.PutContainer(context.Background(), cnr, client.WithTTL(10)) - require.NoError(t, err) - - fmt.Println(containerID) - - list, err := cli.ListSelfContainers(context.Background()) - require.NoError(t, err) - - for i := range list { - fmt.Println("found container:", list[i]) - } -} diff --git a/pkg/client/container.go b/pkg/client/container.go index 1328028..757fb46 100644 --- a/pkg/client/container.go +++ b/pkg/client/container.go @@ -7,10 +7,11 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-api-go/rpc/client" "github.com/nspcc-dev/neofs-api-go/util/signature" - "github.com/nspcc-dev/neofs-api-go/v2/client" v2container "github.com/nspcc-dev/neofs-api-go/v2/container" "github.com/nspcc-dev/neofs-api-go/v2/refs" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" v2signature "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/pkg/errors" ) @@ -23,14 +24,10 @@ type Container interface { GetContainer(context.Context, *container.ID, ...CallOption) (*container.Container, error) // ListContainers return container list with the provided owner. ListContainers(context.Context, *owner.ID, ...CallOption) ([]*container.ID, error) - // ListSelfContainers is similar to ListContainers but uses client's key to deduce owner ID. - ListSelfContainers(context.Context, ...CallOption) ([]*container.ID, error) // DeleteContainer removes container from NeoFS network. DeleteContainer(context.Context, *container.ID, ...CallOption) error // GetEACL returns extended ACL for a given container. - GetEACL(context.Context, *container.ID, ...CallOption) (*eacl.Table, error) - // GetEACLWithSignature is similar to GetEACL but returns signed ACL. - GetEACLWithSignature(context.Context, *container.ID, ...CallOption) (*EACLWithSignature, error) + GetEACL(context.Context, *container.ID, ...CallOption) (*EACLWithSignature, error) // SetEACL sets extended ACL. SetEACL(context.Context, *eacl.Table, ...CallOption) error // AnnounceContainerUsedSpace announces amount of space which is taken by stored objects. @@ -48,8 +45,6 @@ type EACLWithSignature struct { sig *pkg.Signature } -var errNilReponseBody = errors.New("response body is nil") - func (c delContainerSignWrapper) ReadSignedData(bytes []byte) ([]byte, error) { return c.body.GetContainerID().GetValue(), nil } @@ -68,129 +63,16 @@ func (e EACLWithSignature) Signature() *pkg.Signature { return e.sig } -func (c clientImpl) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*container.ID, error) { - switch c.remoteNode.Version.Major() { - case 2: - return c.putContainerV2(ctx, cnr, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -// GetContainer receives container structure through NeoFS API call. -// -// Returns error if container structure is received but does not meet NeoFS API specification. -func (c clientImpl) GetContainer(ctx context.Context, id *container.ID, opts ...CallOption) (*container.Container, error) { - switch c.remoteNode.Version.Major() { - case 2: - return c.getContainerV2(ctx, id, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -// GetVerifiedContainerStructure is a wrapper over Client.GetContainer method -// which checks if the structure of the resulting container matches its identifier. -// -// Returns container.ErrIDMismatch if container does not match the identifier. -func GetVerifiedContainerStructure(ctx context.Context, c Client, id *container.ID, opts ...CallOption) (*container.Container, error) { - cnr, err := c.GetContainer(ctx, id, opts...) - if err != nil { - return nil, err - } - - if !container.CalculateID(cnr).Equal(id) { - return nil, container.ErrIDMismatch - } - - return cnr, nil -} - -func (c clientImpl) ListContainers(ctx context.Context, owner *owner.ID, opts ...CallOption) ([]*container.ID, error) { - switch c.remoteNode.Version.Major() { - case 2: - return c.listContainerV2(ctx, owner, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -func (c clientImpl) ListSelfContainers(ctx context.Context, opts ...CallOption) ([]*container.ID, error) { - return c.ListContainers(ctx, nil, opts...) -} - -func (c clientImpl) DeleteContainer(ctx context.Context, id *container.ID, opts ...CallOption) error { - switch c.remoteNode.Version.Major() { - case 2: - return c.delContainerV2(ctx, id, opts...) - default: - return errUnsupportedProtocol - } -} - -func (c clientImpl) GetEACL(ctx context.Context, id *container.ID, opts ...CallOption) (*eacl.Table, error) { - v, err := c.getEACL(ctx, id, true, opts...) - if err != nil { - return nil, err - } - - return v.table, nil -} - -func (c clientImpl) GetEACLWithSignature(ctx context.Context, id *container.ID, opts ...CallOption) (*EACLWithSignature, error) { - return c.getEACL(ctx, id, false, opts...) -} - -func (c clientImpl) getEACL(ctx context.Context, id *container.ID, verify bool, opts ...CallOption) (*EACLWithSignature, error) { - switch c.remoteNode.Version.Major() { - case 2: - resp, err := c.getEACLV2(ctx, id, verify, opts...) - if err != nil { - return nil, err - } - - return &EACLWithSignature{ - table: eacl.NewTableFromV2(resp.GetEACL()), - sig: pkg.NewSignatureFromV2(resp.GetSignature()), - }, nil - default: - return nil, errUnsupportedProtocol - } -} - -func (c clientImpl) SetEACL(ctx context.Context, eacl *eacl.Table, opts ...CallOption) error { - switch c.remoteNode.Version.Major() { - case 2: - return c.setEACLV2(ctx, eacl, opts...) - default: - return errUnsupportedProtocol - } -} - -// AnnounceContainerUsedSpace used by storage nodes to estimate their container -// sizes during lifetime. Use it only in storage node applications. -func (c clientImpl) AnnounceContainerUsedSpace( - ctx context.Context, - announce []container.UsedSpaceAnnouncement, - opts ...CallOption) error { - switch c.remoteNode.Version.Major() { - case 2: - return c.announceContainerUsedSpaceV2(ctx, announce, opts...) - default: - return errUnsupportedProtocol - } -} - -func (c clientImpl) putContainerV2(ctx context.Context, cnr *container.Container, opts ...CallOption) (*container.ID, error) { +func (c *clientImpl) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*container.ID, error) { // apply all available options callOptions := c.defaultCallOptions() for i := range opts { - opts[i].apply(&callOptions) + opts[i](callOptions) } // set transport version - cnr.SetVersion(c.remoteNode.Version) + cnr.SetVersion(pkg.SDKVersion()) // if container owner is not set, then use client key as owner if cnr.OwnerID() == nil { @@ -230,35 +112,28 @@ func (c clientImpl) putContainerV2(ctx context.Context, cnr *container.Container return nil, err } - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2ContainerClientFromOptions(c.opts) - if err != nil { - return nil, errors.Wrap(err, "can't create grpc client") - } - - resp, err := cli.Put(ctx, req) - if err != nil { - return nil, errors.Wrap(err, "transport error") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return nil, errors.Wrap(err, "can't verify response message") - } - - return container.NewIDFromV2(resp.GetBody().GetContainerID()), nil - default: - return nil, errUnsupportedProtocol + resp, err := rpcapi.PutContainer(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, err } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, errors.Wrap(err, "can't verify response message") + } + + return container.NewIDFromV2(resp.GetBody().GetContainerID()), nil } -func (c clientImpl) getContainerV2(ctx context.Context, id *container.ID, opts ...CallOption) (*container.Container, error) { +// GetContainer receives container structure through NeoFS API call. +// +// Returns error if container structure is received but does not meet NeoFS API specification. +func (c *clientImpl) GetContainer(ctx context.Context, id *container.ID, opts ...CallOption) (*container.Container, error) { // apply all available options callOptions := c.defaultCallOptions() for i := range opts { - opts[i].apply(&callOptions) + opts[i](callOptions) } reqBody := new(v2container.GetRequestBody) @@ -273,35 +148,42 @@ func (c clientImpl) getContainerV2(ctx context.Context, id *container.ID, opts . return nil, err } - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2ContainerClientFromOptions(c.opts) - if err != nil { - return nil, errors.Wrap(err, "can't create grpc client") - } - - resp, err := cli.Get(ctx, req) - if err != nil { - return nil, errors.Wrap(err, "transport error") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return nil, errors.Wrap(err, "can't verify response message") - } - - return container.NewVerifiedFromV2(resp.GetBody().GetContainer()) - default: - return nil, errUnsupportedProtocol + resp, err := rpcapi.GetContainer(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, errors.Wrap(err, "transport error") } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, errors.Wrap(err, "can't verify response message") + } + + return container.NewVerifiedFromV2(resp.GetBody().GetContainer()) } -func (c clientImpl) listContainerV2(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*container.ID, error) { +// GetVerifiedContainerStructure is a wrapper over Client.GetContainer method +// which checks if the structure of the resulting container matches its identifier. +// +// Returns container.ErrIDMismatch if container does not match the identifier. +func GetVerifiedContainerStructure(ctx context.Context, c Client, id *container.ID, opts ...CallOption) (*container.Container, error) { + cnr, err := c.GetContainer(ctx, id, opts...) + if err != nil { + return nil, err + } + + if !container.CalculateID(cnr).Equal(id) { + return nil, container.ErrIDMismatch + } + + return cnr, nil +} + +func (c *clientImpl) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*container.ID, error) { // apply all available options callOptions := c.defaultCallOptions() for i := range opts { - opts[i].apply(&callOptions) + opts[i](callOptions) } if ownerID == nil { @@ -326,40 +208,31 @@ func (c clientImpl) listContainerV2(ctx context.Context, ownerID *owner.ID, opts return nil, err } - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2ContainerClientFromOptions(c.opts) - if err != nil { - return nil, errors.Wrap(err, "can't create grpc client") - } - - resp, err := cli.List(ctx, req) - if err != nil { - return nil, errors.Wrap(err, "transport error") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return nil, errors.Wrap(err, "can't verify response message") - } - - result := make([]*container.ID, 0, len(resp.GetBody().GetContainerIDs())) - for _, cidV2 := range resp.GetBody().GetContainerIDs() { - result = append(result, container.NewIDFromV2(cidV2)) - } - - return result, nil - default: - return nil, errUnsupportedProtocol + resp, err := rpcapi.ListContainers(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, errors.Wrap(err, "transport error") } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, errors.Wrap(err, "can't verify response message") + } + + result := make([]*container.ID, 0, len(resp.GetBody().GetContainerIDs())) + for _, cidV2 := range resp.GetBody().GetContainerIDs() { + result = append(result, container.NewIDFromV2(cidV2)) + } + + return result, nil + } -func (c clientImpl) delContainerV2(ctx context.Context, id *container.ID, opts ...CallOption) error { +func (c *clientImpl) DeleteContainer(ctx context.Context, id *container.ID, opts ...CallOption) error { // apply all available options callOptions := c.defaultCallOptions() for i := range opts { - opts[i].apply(&callOptions) + opts[i](callOptions) } reqBody := new(v2container.DeleteRequestBody) @@ -390,35 +263,20 @@ func (c clientImpl) delContainerV2(ctx context.Context, id *container.ID, opts . return err } - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2ContainerClientFromOptions(c.opts) - if err != nil { - return errors.Wrap(err, "can't create grpc client") - } - - resp, err := cli.Delete(ctx, req) - if err != nil { - return errors.Wrap(err, "transport error") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return errors.Wrap(err, "can't verify response message") - } - - return nil - default: - return errUnsupportedProtocol + resp, err := rpcapi.DeleteContainer(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return errors.Wrap(err, "transport error") } + + return errors.Wrap(v2signature.VerifyServiceMessage(resp), "can't verify response message") } -func (c clientImpl) getEACLV2(ctx context.Context, id *container.ID, verify bool, opts ...CallOption) (*v2container.GetExtendedACLResponseBody, error) { +func (c *clientImpl) GetEACL(ctx context.Context, id *container.ID, opts ...CallOption) (*EACLWithSignature, error) { // apply all available options callOptions := c.defaultCallOptions() for i := range opts { - opts[i].apply(&callOptions) + opts[i](callOptions) } reqBody := new(v2container.GetExtendedACLRequestBody) @@ -433,61 +291,35 @@ func (c clientImpl) getEACLV2(ctx context.Context, id *container.ID, verify bool return nil, err } - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2ContainerClientFromOptions(c.opts) - if err != nil { - return nil, errors.Wrap(err, "can't create grpc client") - } - - resp, err := cli.GetExtendedACL(ctx, req) - if err != nil { - return nil, errors.Wrap(err, "transport error") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return nil, errors.Wrap(err, "can't verify response message") - } - - body := resp.GetBody() - if body == nil { - return nil, errNilReponseBody - } - - if verify { - if err := signature.VerifyDataWithSource( - v2signature.StableMarshalerWrapper{ - SM: body.GetEACL(), - }, - func() (key, sig []byte) { - s := body.GetSignature() - - return s.GetKey(), s.GetSign() - }, - signature.SignWithRFC6979(), - ); err != nil { - return nil, errors.Wrap(err, "incorrect signature") - } - } - - return body, nil - default: - return nil, errUnsupportedProtocol + resp, err := rpcapi.GetEACL(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, errors.Wrap(err, "transport error") } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, errors.Wrap(err, "can't verify response message") + } + + body := resp.GetBody() + + return &EACLWithSignature{ + table: eacl.NewTableFromV2(body.GetEACL()), + sig: pkg.NewSignatureFromV2(body.GetSignature()), + }, nil } -func (c clientImpl) setEACLV2(ctx context.Context, eacl *eacl.Table, opts ...CallOption) error { +func (c *clientImpl) SetEACL(ctx context.Context, eacl *eacl.Table, opts ...CallOption) error { // apply all available options callOptions := c.defaultCallOptions() for i := range opts { - opts[i].apply(&callOptions) + opts[i](callOptions) } reqBody := new(v2container.SetExtendedACLRequestBody) reqBody.SetEACL(eacl.ToV2()) - reqBody.GetEACL().SetVersion(c.remoteNode.Version.ToV2()) + reqBody.GetEACL().SetVersion(pkg.SDKVersion().ToV2()) signWrapper := v2signature.StableMarshalerWrapper{SM: reqBody.GetEACL()} @@ -510,37 +342,29 @@ func (c clientImpl) setEACLV2(ctx context.Context, eacl *eacl.Table, opts ...Cal return err } - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2ContainerClientFromOptions(c.opts) - if err != nil { - return errors.Wrap(err, "can't create grpc client") - } - - resp, err := cli.SetExtendedACL(ctx, req) - if err != nil { - return errors.Wrap(err, "transport error") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return errors.Wrap(err, "can't verify response message") - } - - return nil - default: - return errUnsupportedProtocol + resp, err := rpcapi.SetEACL(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return errors.Wrap(err, "transport error") } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return errors.Wrap(err, "can't verify response message") + } + + return nil } -func (c clientImpl) announceContainerUsedSpaceV2( +// AnnounceContainerUsedSpace used by storage nodes to estimate their container +// sizes during lifetime. Use it only in storage node applications. +func (c *clientImpl) AnnounceContainerUsedSpace( ctx context.Context, announce []container.UsedSpaceAnnouncement, opts ...CallOption) error { callOptions := c.defaultCallOptions() // apply all available options for i := range opts { - opts[i].apply(&callOptions) + opts[i](callOptions) } // convert list of SDK announcement structures into NeoFS-API v2 list @@ -563,57 +387,15 @@ func (c clientImpl) announceContainerUsedSpaceV2( return err } - // choose underline transport protocol and send message over it - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2ContainerClientFromOptions(c.opts) - if err != nil { - return errors.Wrap(err, "can't create grpc client") - } - - resp, err := cli.AnnounceUsedSpace(ctx, req) - if err != nil { - return errors.Wrap(err, "transport error") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return errors.Wrap(err, "can't verify response message") - } - - return nil - default: - return errUnsupportedProtocol - } -} - -func v2ContainerClientFromOptions(opts *clientOptions) (cli *v2container.Client, err error) { - switch { - case opts.grpcOpts.v2ContainerClient != nil: - // return value from client cache - return opts.grpcOpts.v2ContainerClient, nil - - case opts.grpcOpts.conn != nil: - cli, err = v2container.NewClient(v2container.WithGlobalOpts( - client.WithGRPCConn(opts.grpcOpts.conn)), - ) - - case opts.addr != "": - cli, err = v2container.NewClient(v2container.WithGlobalOpts( - client.WithNetworkAddress(opts.addr), - client.WithDialTimeout(opts.dialTimeout), - )) - - default: - return nil, errOptionsLack("Container") - } - - // check if client correct and save in cache + resp, err := rpcapi.AnnounceUsedSpace(c.Raw(), req, client.WithContext(ctx)) if err != nil { - return nil, err + return errors.Wrap(err, "transport error") } - opts.grpcOpts.v2ContainerClient = cli + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return errors.Wrap(err, "can't verify response message") + } - return cli, nil + return nil } diff --git a/pkg/client/netmap.go b/pkg/client/netmap.go index b3f8966..057b0c3 100644 --- a/pkg/client/netmap.go +++ b/pkg/client/netmap.go @@ -3,9 +3,11 @@ package client import ( "context" + "github.com/nspcc-dev/neofs-api-go/pkg" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" - "github.com/nspcc-dev/neofs-api-go/v2/client" + "github.com/nspcc-dev/neofs-api-go/rpc/client" v2netmap "github.com/nspcc-dev/neofs-api-go/v2/netmap" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" v2signature "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/pkg/errors" ) @@ -15,51 +17,37 @@ type Netmap interface { // EndpointInfo returns attributes, address and public key of the node, specified // in client constructor via address or open connection. This can be used as a // health check to see if node is alive and responses to requests. - EndpointInfo(context.Context, ...CallOption) (*netmap.NodeInfo, error) - // Epoch returns the epoch number from the local state of the remote host. - Epoch(context.Context, ...CallOption) (uint64, error) + EndpointInfo(context.Context, ...CallOption) (*EndpointInfo, error) // NetworkInfo returns information about the NeoFS network of which the remote server is a part. NetworkInfo(context.Context, ...CallOption) (*netmap.NetworkInfo, error) } +// EACLWithSignature represents eACL table/signature pair. +type EndpointInfo struct { + version *pkg.Version + + ni *netmap.NodeInfo +} + +// LatestVersion returns latest NeoFS API version in use. +func (e *EndpointInfo) LatestVersion() *pkg.Version { + return e.version +} + +// NodeInfo returns returns information about the NeoFS node. +func (e *EndpointInfo) NodeInfo() *netmap.NodeInfo { + return e.ni +} + // EndpointInfo returns attributes, address and public key of the node, specified // in client constructor via address or open connection. This can be used as a // health check to see if node is alive and responses to requests. -func (c clientImpl) EndpointInfo(ctx context.Context, opts ...CallOption) (*netmap.NodeInfo, error) { - switch c.remoteNode.Version.Major() { - case 2: - resp, err := c.endpointInfoV2(ctx, opts...) - if err != nil { - return nil, err - } - - return netmap.NewNodeInfoFromV2(resp.GetBody().GetNodeInfo()), nil - default: - return nil, errUnsupportedProtocol - } -} - -// Epoch returns the epoch number from the local state of the remote host. -func (c clientImpl) Epoch(ctx context.Context, opts ...CallOption) (uint64, error) { - switch c.remoteNode.Version.Major() { - case 2: - resp, err := c.endpointInfoV2(ctx, opts...) - if err != nil { - return 0, err - } - - return resp.GetMetaHeader().GetEpoch(), nil - default: - return 0, errUnsupportedProtocol - } -} - -func (c clientImpl) endpointInfoV2(ctx context.Context, opts ...CallOption) (*v2netmap.LocalNodeInfoResponse, error) { +func (c *clientImpl) EndpointInfo(ctx context.Context, opts ...CallOption) (*EndpointInfo, error) { // apply all available options callOptions := c.defaultCallOptions() for i := range opts { - opts[i].apply(&callOptions) + opts[i](callOptions) } reqBody := new(v2netmap.LocalNodeInfoRequestBody) @@ -73,81 +61,31 @@ func (c clientImpl) endpointInfoV2(ctx context.Context, opts ...CallOption) (*v2 return nil, err } - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2NetmapClientFromOptions(c.opts) - if err != nil { - return nil, errors.Wrap(err, "can't create grpc client") - } - - resp, err := cli.LocalNodeInfo(ctx, req) - if err != nil { - return nil, errors.Wrap(err, "transport error") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return nil, errors.Wrap(err, "can't verify response message") - } - - return resp, nil - default: - return nil, errUnsupportedProtocol - } -} - -func v2NetmapClientFromOptions(opts *clientOptions) (cli *v2netmap.Client, err error) { - switch { - case opts.grpcOpts.v2NetmapClient != nil: - // return value from client cache - return opts.grpcOpts.v2NetmapClient, nil - - case opts.grpcOpts.conn != nil: - cli, err = v2netmap.NewClient(v2netmap.WithGlobalOpts( - client.WithGRPCConn(opts.grpcOpts.conn)), - ) - - case opts.addr != "": - cli, err = v2netmap.NewClient(v2netmap.WithGlobalOpts( - client.WithNetworkAddress(opts.addr), - client.WithDialTimeout(opts.dialTimeout), - )) - - default: - return nil, errOptionsLack("Netmap") - } - - // check if client correct and save in cache + resp, err := rpcapi.LocalNodeInfo(c.Raw(), req) if err != nil { - return nil, err + return nil, errors.Wrap(err, "transport error") } - opts.grpcOpts.v2NetmapClient = cli + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, errors.Wrap(err, "can't verify response message") + } - return cli, nil + body := resp.GetBody() + + return &EndpointInfo{ + version: pkg.NewVersionFromV2(body.GetVersion()), + ni: netmap.NewNodeInfoFromV2(body.GetNodeInfo()), + }, nil } // NetworkInfo returns information about the NeoFS network of which the remote server is a part. -func (c clientImpl) NetworkInfo(ctx context.Context, opts ...CallOption) (*netmap.NetworkInfo, error) { - switch c.remoteNode.Version.Major() { - case 2: - resp, err := c.networkInfoV2(ctx, opts...) - if err != nil { - return nil, err - } - - return netmap.NewNetworkInfoFromV2(resp.GetBody().GetNetworkInfo()), nil - default: - return nil, errUnsupportedProtocol - } -} - -func (c clientImpl) networkInfoV2(ctx context.Context, opts ...CallOption) (*v2netmap.NetworkInfoResponse, error) { +func (c *clientImpl) NetworkInfo(ctx context.Context, opts ...CallOption) (*netmap.NetworkInfo, error) { // apply all available options callOptions := c.defaultCallOptions() for i := range opts { - opts[i].apply(&callOptions) + opts[i](callOptions) } reqBody := new(v2netmap.NetworkInfoRequestBody) @@ -161,25 +99,15 @@ func (c clientImpl) networkInfoV2(ctx context.Context, opts ...CallOption) (*v2n return nil, err } - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2NetmapClientFromOptions(c.opts) - if err != nil { - return nil, errors.Wrap(err, "could not create grpc client") - } - - resp, err := cli.NetworkInfo(ctx, req) - if err != nil { - return nil, errors.Wrap(err, "v2 NetworkInfo RPC failure") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return nil, errors.Wrap(err, "response message verification failed") - } - - return resp, nil - default: - return nil, errUnsupportedProtocol + resp, err := rpcapi.NetworkInfo(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, errors.Wrap(err, "v2 NetworkInfo RPC failure") } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, errors.Wrap(err, "response message verification failed") + } + + return netmap.NewNetworkInfoFromV2(resp.GetBody().GetNetworkInfo()), nil } diff --git a/pkg/client/object.go b/pkg/client/object.go index 62f99cf..b242422 100644 --- a/pkg/client/object.go +++ b/pkg/client/object.go @@ -10,10 +10,11 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/rpc/client" signer "github.com/nspcc-dev/neofs-api-go/util/signature" - "github.com/nspcc-dev/neofs-api-go/v2/client" v2object "github.com/nspcc-dev/neofs-api-go/v2/object" v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/pkg/errors" @@ -114,7 +115,7 @@ type putObjectV2Writer struct { req *v2object.PutRequest - stream v2object.PutObjectStreamer + stream *rpcapi.PutRequestWriter } type checksumType int @@ -171,7 +172,7 @@ func (w *putObjectV2Writer) Write(p []byte) (int, error) { return 0, errors.Wrap(err, "could not sign chunk request message") } - if err := w.stream.Send(w.req); err != nil { + if err := w.stream.Write(w.req); err != nil { return 0, errors.Wrap(err, "could not send chunk request message") } @@ -211,32 +212,11 @@ func (p *PutObjectParams) PayloadReader() io.Reader { } func (c *clientImpl) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*object.ID, error) { - // check remote node version - switch c.remoteNode.Version.Major() { - case 2: - return c.putObjectV2(ctx, p, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -func (c *clientImpl) putObjectV2(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*object.ID, error) { - // create V2 Object client - cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) - if err != nil { - return nil, errors.Wrap(err, "could not create Object V2 client") - } - - stream, err := cli.Put(ctx) - if err != nil { - return nil, errors.Wrap(err, "could not open Put object stream") - } - callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { - opts[i].apply(&callOpts) + opts[i](callOpts) } } @@ -253,11 +233,12 @@ func (c *clientImpl) putObjectV2(ctx context.Context, p *PutObjectParams, opts . // set meta header meta := v2MetaHeaderFromOpts(callOpts) - if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: v2Addr, verb: v2session.ObjectVerbPut, }); err != nil { - return nil, errors.Wrap(err, "could not sign session token") + return nil, errors.Wrap(err, "could not attach session token") } req.SetMetaHeader(meta) @@ -275,12 +256,21 @@ func (c *clientImpl) putObjectV2(ctx context.Context, p *PutObjectParams, opts . // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { - return nil, errors.Wrapf(err, "could not sign %T", req) + return nil, errors.Wrapf(err, "signing the request failed") + } + + // open stream + resp := new(v2object.PutResponse) + + stream, err := rpcapi.PutObject(c.Raw(), resp, client.WithContext(ctx)) + if err != nil { + return nil, errors.Wrap(err, "stream opening failed") } // send init part - if err := stream.Send(req); err != nil { - return nil, errors.Wrapf(err, "could not send %T", req) + err = stream.Write(req) + if err != nil { + return nil, errors.Wrap(err, "sending the initial message to stream failed") } // create payload bytes reader @@ -305,18 +295,18 @@ func (c *clientImpl) putObjectV2(ctx context.Context, p *PutObjectParams, opts . // copy payload from reader to stream writer _, err = io.CopyBuffer(w, r, make([]byte, chunkSize)) if err != nil && !errors.Is(errors.Cause(err), io.EOF) { - return nil, errors.Wrap(err, "could not send payload bytes to Put object stream") + return nil, errors.Wrap(err, "payload streaming failed") } // close object stream and receive response from remote node - resp, err := stream.CloseAndRecv() + err = stream.Close() if err != nil { - return nil, errors.Wrapf(err, "could not close %T", stream) + return nil, errors.Wrap(err, "closing the stream failed") } // verify response structure if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, errors.Wrapf(err, "could not verify %T", resp) + return nil, errors.Wrap(err, "response verification failed") } // convert object identifier @@ -377,39 +367,11 @@ func DeleteObject(ctx context.Context, c Client, p *DeleteObjectParams, opts ... // // If target of tombstone address is not set, the address is ignored. func (c *clientImpl) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) error { - // check remote node version - switch c.remoteNode.Version.Major() { - case 2: - if p.tombTgt == nil { - p.tombTgt = new(objectAddressWriter) - } - - resp, err := c.deleteObjectV2(ctx, p, opts...) - if err != nil { - return err - } - - addrV2 := resp.GetBody().GetTombstone() - p.tombTgt.SetAddress(object.NewAddressFromV2(addrV2)) - - return nil - default: - return errUnsupportedProtocol - } -} - -func (c *clientImpl) deleteObjectV2(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) (*v2object.DeleteResponse, error) { - // create V2 Object client - cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) - if err != nil { - return nil, errors.Wrap(err, "could not create Object V2 client") - } - callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { - opts[i].apply(&callOpts) + opts[i](callOpts) } } @@ -422,11 +384,12 @@ func (c *clientImpl) deleteObjectV2(ctx context.Context, p *DeleteObjectParams, // set meta header meta := v2MetaHeaderFromOpts(callOpts) - if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: p.addr.ToV2(), verb: v2session.ObjectVerbDelete, }); err != nil { - return nil, errors.Wrap(err, "could not sign session token") + return errors.Wrap(err, "could not attach session token") } req.SetMetaHeader(meta) @@ -436,21 +399,25 @@ func (c *clientImpl) deleteObjectV2(ctx context.Context, p *DeleteObjectParams, // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { - return nil, errors.Wrapf(err, "could not sign %T", req) + return errors.Wrap(err, "signing the request failed") } // send request - resp, err := cli.Delete(ctx, req) + resp, err := rpcapi.DeleteObject(c.Raw(), req, client.WithContext(ctx)) if err != nil { - return nil, errors.Wrapf(err, "could not send %T", req) + return errors.Wrap(err, "sending the request failed") } // verify response structure if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, errors.Wrapf(err, "could not verify %T", resp) + return errors.Wrap(err, "response verification failed") } - return resp, nil + if p.tombTgt != nil { + p.tombTgt.SetAddress(object.NewAddressFromV2(resp.GetBody().GetTombstone())) + } + + return nil } func (p *GetObjectParams) WithAddress(v *object.Address) *GetObjectParams { @@ -501,28 +468,14 @@ func (p *GetObjectParams) RawFlag() bool { return false } +var errWrongMessageSeq = errors.New("incorrect message sequence") + func (c *clientImpl) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*object.Object, error) { - // check remote node version - switch c.remoteNode.Version.Major() { - case 2: - return c.getObjectV2(ctx, p, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -func (c *clientImpl) getObjectV2(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*object.Object, error) { - // create V2 Object client - cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) - if err != nil { - return nil, errors.Wrap(err, "could not create Object V2 client") - } - callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { - opts[i].apply(&callOpts) + opts[i](callOpts) } } @@ -535,11 +488,12 @@ func (c *clientImpl) getObjectV2(ctx context.Context, p *GetObjectParams, opts . // set meta header meta := v2MetaHeaderFromOpts(callOpts) - if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: p.addr.ToV2(), verb: v2session.ObjectVerbGet, }); err != nil { - return nil, errors.Wrap(err, "could not sign session token") + return nil, errors.Wrap(err, "could not attach session token") } req.SetMetaHeader(meta) @@ -550,40 +504,52 @@ func (c *clientImpl) getObjectV2(ctx context.Context, p *GetObjectParams, opts . // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { - return nil, errors.Wrapf(err, "could not sign %T", req) + return nil, errors.Wrap(err, "signing the request failed") } - // create Get object stream - stream, err := cli.Get(ctx, req) + // open stream + stream, err := rpcapi.GetObject(c.Raw(), req, client.WithContext(ctx)) if err != nil { - return nil, errors.Wrap(err, "could not create Get object stream") + return nil, errors.Wrap(err, "stream opening failed") } var ( + headWas bool payload []byte obj = new(v2object.Object) + resp = new(v2object.GetResponse) ) for { // receive message from server stream - resp, err := stream.Recv() + err := stream.Read(resp) if err != nil { if errors.Is(errors.Cause(err), io.EOF) { + if !headWas { + return nil, io.ErrUnexpectedEOF + } + break } - return nil, errors.Wrap(err, "could not receive Get response") + return nil, errors.Wrap(err, "reading the response failed") } // verify response structure if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, errors.Wrapf(err, "could not verify %T", resp) + return nil, errors.Wrap(err, "response verification failed") } switch v := resp.GetBody().GetObjectPart().(type) { - case nil: - return nil, errNilObjectPart + default: + return nil, errors.Errorf("unexpected object part %T", v) case *v2object.GetObjectPartInit: + if headWas { + return nil, errWrongMessageSeq + } + + headWas = true + obj.SetObjectID(v.GetObjectID()) obj.SetSignature(v.GetSignature()) @@ -594,6 +560,10 @@ func (c *clientImpl) getObjectV2(ctx context.Context, p *GetObjectParams, opts . payload = make([]byte, 0, hdr.GetPayloadLength()) } case *v2object.GetObjectPartChunk: + if !headWas { + return nil, errWrongMessageSeq + } + if p.w != nil { if _, err := p.w.Write(v.GetChunk()); err != nil { return nil, errors.Wrap(err, "could not write payload chunk") @@ -604,8 +574,6 @@ func (c *clientImpl) getObjectV2(ctx context.Context, p *GetObjectParams, opts . case *v2object.SplitInfo: si := object.NewSplitInfoFromV2(v) return nil, object.NewSplitInfoError(si) - default: - panic(fmt.Sprintf("unexpected Get object part type %T", v)) } } @@ -674,27 +642,11 @@ func (p *ObjectHeaderParams) RawFlag() bool { } func (c *clientImpl) GetObjectHeader(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*object.Object, error) { - // check remote node version - switch c.remoteNode.Version.Major() { - case 2: - return c.getObjectHeaderV2(ctx, p, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -func (c *clientImpl) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*object.Object, error) { - // create V2 Object client - cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) - if err != nil { - return nil, errors.Wrap(err, "could not create Object V2 client") - } - callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { - opts[i].apply(&callOpts) + opts[i](callOpts) } } @@ -707,11 +659,12 @@ func (c *clientImpl) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParam // set meta header meta := v2MetaHeaderFromOpts(callOpts) - if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: p.addr.ToV2(), verb: v2session.ObjectVerbHead, }); err != nil { - return nil, errors.Wrap(err, "could not sign session token") + return nil, errors.Wrap(err, "could not attach session token") } req.SetMetaHeader(meta) @@ -723,18 +676,18 @@ func (c *clientImpl) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParam // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { - return nil, errors.Wrapf(err, "could not sign %T", req) + return nil, errors.Wrap(err, "signing the request failed") } // send Head request - resp, err := cli.Head(ctx, req) + resp, err := rpcapi.HeadObject(c.Raw(), req, client.WithContext(ctx)) if err != nil { - return nil, errors.Wrapf(err, "could not send %T", req) + return nil, errors.Wrap(err, "sending the request failed") } // verify response structure if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, errors.Wrapf(err, "could not verify %T", resp) + return nil, errors.Wrap(err, "response verification failed") } var ( @@ -744,7 +697,7 @@ func (c *clientImpl) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParam switch v := resp.GetBody().GetHeaderPart().(type) { case nil: - return nil, errNilObjectPart + return nil, errors.Errorf("unexpected header type %T", v) case *v2object.ShortHeader: if !p.short { return nil, errors.Errorf("wrong header part type: expected %T, received %T", @@ -791,8 +744,6 @@ func (c *clientImpl) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParam si := object.NewSplitInfoFromV2(v) return nil, object.NewSplitInfoError(si) - default: - panic(fmt.Sprintf("unexpected Head object type %T", v)) } obj := new(v2object.Object) @@ -871,27 +822,11 @@ func (p *RangeDataParams) DataWriter() io.Writer { } func (c *clientImpl) ObjectPayloadRangeData(ctx context.Context, p *RangeDataParams, opts ...CallOption) ([]byte, error) { - // check remote node version - switch c.remoteNode.Version.Major() { - case 2: - return c.objectPayloadRangeV2(ctx, p, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParams, opts ...CallOption) ([]byte, error) { - // create V2 Object client - cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) - if err != nil { - return nil, errors.Wrap(err, "could not create Object V2 client") - } - callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { - opts[i].apply(&callOpts) + opts[i](callOpts) } } @@ -904,11 +839,12 @@ func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParam // set meta header meta := v2MetaHeaderFromOpts(callOpts) - if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: p.addr.ToV2(), verb: v2session.ObjectVerbRange, }); err != nil { - return nil, errors.Wrap(err, "could not sign session token") + return nil, errors.Wrap(err, "could not attach session token") } req.SetMetaHeader(meta) @@ -920,11 +856,11 @@ func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParam // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { - return nil, errors.Wrapf(err, "could not sign %T", req) + return nil, errors.Wrapf(err, "signing the request failed") } - // create Get payload range stream - stream, err := cli.GetRange(ctx, req) + // open stream + stream, err := rpcapi.GetObjectRange(c.Raw(), req, client.WithContext(ctx)) if err != nil { return nil, errors.Wrap(err, "could not create Get payload range stream") } @@ -934,15 +870,17 @@ func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParam payload = make([]byte, p.r.GetLength()) } + resp := new(v2object.GetRangeResponse) + for { // receive message from server stream - resp, err := stream.Recv() + err := stream.Read(resp) if err != nil { if errors.Is(errors.Cause(err), io.EOF) { break } - return nil, errors.Wrap(err, "could not receive Get payload range response") + return nil, errors.Wrap(err, "reading the response failed") } // verify response structure @@ -952,7 +890,7 @@ func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParam switch v := resp.GetBody().GetRangePart().(type) { case nil: - return nil, errNilObjectPart + return nil, errors.Errorf("unexpected range type %T", v) case *v2object.GetRangePartChunk: if p.w != nil { if _, err = p.w.Write(v.GetChunk()); err != nil { @@ -965,8 +903,6 @@ func (c *clientImpl) objectPayloadRangeV2(ctx context.Context, p *RangeDataParam si := object.NewSplitInfoFromV2(v) return nil, object.NewSplitInfoError(si) - default: - panic(fmt.Sprintf("unexpected GetRange object type %T", v)) } } @@ -1048,27 +984,11 @@ func (c *clientImpl) ObjectPayloadRangeTZ(ctx context.Context, p *RangeChecksumP } func (c *clientImpl) objectPayloadRangeHash(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (interface{}, error) { - // check remote node version - switch c.remoteNode.Version.Major() { - case 2: - return c.objectPayloadRangeHashV2(ctx, p, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -func (c *clientImpl) objectPayloadRangeHashV2(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (interface{}, error) { - // create V2 Object client - cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) - if err != nil { - return nil, errors.Wrap(err, "could not create Object V2 client") - } - callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { - opts[i].apply(&callOpts) + opts[i](callOpts) } } @@ -1081,11 +1001,12 @@ func (c *clientImpl) objectPayloadRangeHashV2(ctx context.Context, p *RangeCheck // set meta header meta := v2MetaHeaderFromOpts(callOpts) - if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: p.addr.ToV2(), verb: v2session.ObjectVerbRangeHash, }); err != nil { - return nil, errors.Wrap(err, "could not sign session token") + return nil, errors.Wrap(err, "could not attach session token") } req.SetMetaHeader(meta) @@ -1102,18 +1023,18 @@ func (c *clientImpl) objectPayloadRangeHashV2(ctx context.Context, p *RangeCheck // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { - return nil, errors.Wrapf(err, "could not sign %T", req) + return nil, errors.Wrapf(err, "signing the request failed") } // send request - resp, err := cli.GetRangeHash(ctx, req) + resp, err := rpcapi.HashObjectRange(c.Raw(), req, client.WithContext(ctx)) if err != nil { - return nil, errors.Wrapf(err, "could not send %T", req) + return nil, errors.Wrap(err, "sending the request failed") } // verify response structure if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, errors.Wrapf(err, "could not verify %T", resp) + return nil, errors.Wrap(err, "response verification failed") } respBody := resp.GetBody() @@ -1197,27 +1118,11 @@ func (p *SearchObjectParams) SearchFilters() object.SearchFilters { } func (c *clientImpl) SearchObject(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) { - // check remote node version - switch c.remoteNode.Version.Major() { - case 2: - return c.searchObjectV2(ctx, p, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -func (c *clientImpl) searchObjectV2(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) { - // create V2 Object client - cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts) - if err != nil { - return nil, errors.Wrap(err, "could not create Object V2 client") - } - callOpts := c.defaultCallOptions() for i := range opts { if opts[i] != nil { - opts[i].apply(&callOpts) + opts[i](callOpts) } } @@ -1233,11 +1138,12 @@ func (c *clientImpl) searchObjectV2(ctx context.Context, p *SearchObjectParams, // set meta header meta := v2MetaHeaderFromOpts(callOpts) - if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ addr: v2Addr, verb: v2session.ObjectVerbSearch, }); err != nil { - return nil, errors.Wrap(err, "could not sign session token") + return nil, errors.Wrap(err, "could not attach session token") } req.SetMetaHeader(meta) @@ -1249,26 +1155,29 @@ func (c *clientImpl) searchObjectV2(ctx context.Context, p *SearchObjectParams, // sign the request if err := signature.SignServiceMessage(callOpts.key, req); err != nil { - return nil, errors.Wrapf(err, "could not sign %T", req) + return nil, errors.Wrapf(err, "signing the request failed") } // create search stream - stream, err := cli.Search(ctx, req) + stream, err := rpcapi.SearchObjects(c.Raw(), req, client.WithContext(ctx)) if err != nil { - return nil, errors.Wrap(err, "could not create search stream") + return nil, errors.Wrap(err, "stream opening failed") } - var searchResult []*object.ID + var ( + searchResult []*object.ID + resp = new(v2object.SearchResponse) + ) for { // receive message from server stream - resp, err := stream.Recv() + err := stream.Read(resp) if err != nil { if errors.Is(errors.Cause(err), io.EOF) { break } - return nil, errors.Wrap(err, "could not receive search response") + return nil, errors.Wrap(err, "reading the response failed") } // verify response structure @@ -1285,39 +1194,7 @@ func (c *clientImpl) searchObjectV2(ctx context.Context, p *SearchObjectParams, return searchResult, nil } -func v2ObjectClient(proto TransportProtocol, opts *clientOptions) (*v2object.Client, error) { - switch proto { - case GRPC: - var err error - - if opts.grpcOpts.objectClientV2 == nil { - var optsV2 []v2object.Option - - if opts.grpcOpts.conn != nil { - optsV2 = []v2object.Option{ - v2object.WithGlobalOpts( - client.WithGRPCConn(opts.grpcOpts.conn), - ), - } - } else { - optsV2 = []v2object.Option{ - v2object.WithGlobalOpts( - client.WithNetworkAddress(opts.addr), - client.WithDialTimeout(opts.dialTimeout), - ), - } - } - - opts.grpcOpts.objectClientV2, err = v2object.NewClient(optsV2...) - } - - return opts.grpcOpts.objectClientV2, err - default: - return nil, errUnsupportedProtocol - } -} - -func (c clientImpl) attachV2SessionToken(opts callOptions, hdr *v2session.RequestMetaHeader, info v2SessionReqInfo) error { +func (c *clientImpl) attachV2SessionToken(opts *callOptions, hdr *v2session.RequestMetaHeader, info v2SessionReqInfo) error { if opts.session == nil { return nil } diff --git a/pkg/client/opts.go b/pkg/client/opts.go index 9f16888..9825d49 100644 --- a/pkg/client/opts.go +++ b/pkg/client/opts.go @@ -2,29 +2,20 @@ package client import ( "crypto/ecdsa" - "fmt" "time" "github.com/nspcc-dev/neofs-api-go/pkg" - "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/pkg/token" - v2accounting "github.com/nspcc-dev/neofs-api-go/v2/accounting" - v2container "github.com/nspcc-dev/neofs-api-go/v2/container" - v2netmap "github.com/nspcc-dev/neofs-api-go/v2/netmap" - v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/rpc/client" "github.com/nspcc-dev/neofs-api-go/v2/refs" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" "google.golang.org/grpc" ) type ( - CallOption interface { - apply(*callOptions) - } + CallOption func(*callOptions) - Option interface { - apply(*clientOptions) - } + Option func(*clientOptions) callOptions struct { version *pkg.Version @@ -37,21 +28,9 @@ type ( } clientOptions struct { - addr string + key *ecdsa.PrivateKey - dialTimeout time.Duration - - grpcOpts *grpcOptions - } - - grpcOptions struct { - conn *grpc.ClientConn - v2ContainerClient *v2container.Client - v2AccountingClient *v2accounting.Client - v2SessionClient *v2session.Client - v2NetmapClient *v2netmap.Client - - objectClientV2 *v2object.Client + rawOpts []client.Option } v2SessionReqInfo struct { @@ -62,74 +41,52 @@ type ( } ) -type errOptionsLack string - -func (e errOptionsLack) Error() string { - return fmt.Sprintf("lack of sdk client options to create %s client", string(e)) -} - -func (c clientImpl) defaultCallOptions() callOptions { - return callOptions{ - ttl: 2, +func (c *clientImpl) defaultCallOptions() *callOptions { + return &callOptions{ version: pkg.SDKVersion(), - key: c.key, - session: c.sessionToken, - bearer: c.bearerToken, - } -} - -type funcCallOption struct { - f func(*callOptions) -} - -func (fco *funcCallOption) apply(co *callOptions) { - fco.f(co) -} - -func newFuncCallOption(f func(option *callOptions)) *funcCallOption { - return &funcCallOption{ - f: f, + ttl: 2, + key: c.opts.key, } } func WithXHeader(x *pkg.XHeader) CallOption { - return newFuncCallOption(func(option *callOptions) { - option.xHeaders = append(option.xHeaders, x) - }) + return func(opts *callOptions) { + opts.xHeaders = append(opts.xHeaders, x) + } } func WithTTL(ttl uint32) CallOption { - return newFuncCallOption(func(option *callOptions) { - option.ttl = ttl - }) + return func(opts *callOptions) { + opts.ttl = ttl + } } // WithKey sets client's key for the next request. func WithKey(key *ecdsa.PrivateKey) CallOption { - return newFuncCallOption(func(option *callOptions) { - option.key = key - }) + return func(opts *callOptions) { + opts.key = key + } } func WithEpoch(epoch uint64) CallOption { - return newFuncCallOption(func(option *callOptions) { - option.epoch = epoch - }) + return func(opts *callOptions) { + opts.epoch = epoch + } } func WithSession(token *token.SessionToken) CallOption { - return newFuncCallOption(func(option *callOptions) { - option.session = token - }) + return func(opts *callOptions) { + opts.session = token + } } func WithBearer(token *token.BearerToken) CallOption { - return newFuncCallOption(func(option *callOptions) { - option.bearer = token - }) + return func(opts *callOptions) { + opts.bearer = token + } } -func v2MetaHeaderFromOpts(options callOptions) *v2session.RequestMetaHeader { +func v2MetaHeaderFromOpts(options *callOptions) *v2session.RequestMetaHeader { meta := new(v2session.RequestMetaHeader) meta.SetVersion(options.version.ToV2()) meta.SetTTL(options.ttl) @@ -153,50 +110,33 @@ func v2MetaHeaderFromOpts(options callOptions) *v2session.RequestMetaHeader { func defaultClientOptions() *clientOptions { return &clientOptions{ - grpcOpts: new(grpcOptions), - } -} - -type funcClientOption struct { - f func(*clientOptions) -} - -func (fco *funcClientOption) apply(co *clientOptions) { - fco.f(co) -} - -func newFuncClientOption(f func(option *clientOptions)) *funcClientOption { - return &funcClientOption{ - f: f, + rawOpts: make([]client.Option, 0, 3), } } func WithAddress(addr string) Option { - return newFuncClientOption(func(option *clientOptions) { - option.addr = addr - }) + return func(opts *clientOptions) { + opts.rawOpts = append(opts.rawOpts, client.WithNetworkAddress(addr)) + } } func WithGRPCConnection(grpcConn *grpc.ClientConn) Option { - return newFuncClientOption(func(option *clientOptions) { - option.grpcOpts.conn = grpcConn - }) + return func(opts *clientOptions) { + opts.rawOpts = append(opts.rawOpts, client.WithGRPCConn(grpcConn)) + } } // WithDialTimeout returns option to set connection timeout to the remote node. func WithDialTimeout(dur time.Duration) Option { - return newFuncClientOption(func(option *clientOptions) { - option.dialTimeout = dur - }) -} - -func newOwnerIDFromKey(key *ecdsa.PublicKey) (*owner.ID, error) { - w, err := owner.NEO3WalletFromPublicKey(key) - if err != nil { - return nil, err + return func(opts *clientOptions) { + opts.rawOpts = append(opts.rawOpts, client.WithDialTimeout(dur)) + } +} + +// WithDefaultPrivateKey returns option to set default private key +// used for the work. +func WithDefaultPrivateKey(key *ecdsa.PrivateKey) Option { + return func(opts *clientOptions) { + opts.key = key } - - ownerID := new(owner.ID) - ownerID.SetNeo3Wallet(w) - return ownerID, nil } diff --git a/pkg/client/raw.go b/pkg/client/raw.go new file mode 100644 index 0000000..a8fce33 --- /dev/null +++ b/pkg/client/raw.go @@ -0,0 +1,14 @@ +package client + +import ( + "github.com/nspcc-dev/neofs-api-go/rpc/client" +) + +// Raw returns underlying raw protobuf client. +func (c *clientImpl) Raw() *client.Client { + c.onceInit.Do(func() { + c.raw = client.New(c.opts.rawOpts...) + }) + + return c.raw +} diff --git a/pkg/client/session.go b/pkg/client/session.go index 4df7263..b7afa03 100644 --- a/pkg/client/session.go +++ b/pkg/client/session.go @@ -5,7 +5,8 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/pkg/token" - "github.com/nspcc-dev/neofs-api-go/v2/client" + "github.com/nspcc-dev/neofs-api-go/rpc/client" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" v2signature "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/pkg/errors" @@ -15,29 +16,16 @@ import ( type Session interface { // CreateSession creates session using provided expiration time. CreateSession(context.Context, uint64, ...CallOption) (*token.SessionToken, error) - // AttachSessionToken attaches session token to be used by default for following requests. - AttachSessionToken(*token.SessionToken) - // AttachBearerToken attaches bearer token to be used by default for following requests. - AttachBearerToken(*token.BearerToken) } var errMalformedResponseBody = errors.New("malformed response body") -func (c clientImpl) CreateSession(ctx context.Context, expiration uint64, opts ...CallOption) (*token.SessionToken, error) { - switch c.remoteNode.Version.Major() { - case 2: - return c.createSessionV2(ctx, expiration, opts...) - default: - return nil, errUnsupportedProtocol - } -} - -func (c clientImpl) createSessionV2(ctx context.Context, expiration uint64, opts ...CallOption) (*token.SessionToken, error) { +func (c *clientImpl) CreateSession(ctx context.Context, expiration uint64, opts ...CallOption) (*token.SessionToken, error) { // apply all available options callOptions := c.defaultCallOptions() for i := range opts { - opts[i].apply(&callOptions) + opts[i](callOptions) } w, err := owner.NEO3WalletFromPublicKey(&callOptions.key.PublicKey) @@ -61,82 +49,25 @@ func (c clientImpl) createSessionV2(ctx context.Context, expiration uint64, opts return nil, err } - switch c.remoteNode.Protocol { - case GRPC: - cli, err := v2SessionClientFromOptions(c.opts) - if err != nil { - return nil, errors.Wrap(err, "can't create grpc client") - } - - resp, err := cli.Create(ctx, req) - if err != nil { - return nil, errors.Wrap(err, "transport error") - } - - err = v2signature.VerifyServiceMessage(resp) - if err != nil { - return nil, errors.Wrap(err, "can't verify response message") - } - - body := resp.GetBody() - if body == nil { - return nil, errMalformedResponseBody - } - - sessionToken := token.NewSessionToken() - sessionToken.SetID(body.GetID()) - sessionToken.SetSessionKey(body.GetSessionKey()) - sessionToken.SetOwnerID(ownerID) - - return sessionToken, nil - default: - return nil, errUnsupportedProtocol - } -} - -func v2SessionClientFromOptions(opts *clientOptions) (cli *v2session.Client, err error) { - switch { - case opts.grpcOpts.v2SessionClient != nil: - // return value from client cache - return opts.grpcOpts.v2SessionClient, nil - - case opts.grpcOpts.conn != nil: - cli, err = v2session.NewClient(v2session.WithGlobalOpts( - client.WithGRPCConn(opts.grpcOpts.conn)), - ) - - case opts.addr != "": - cli, err = v2session.NewClient(v2session.WithGlobalOpts( - client.WithNetworkAddress(opts.addr), - client.WithDialTimeout(opts.dialTimeout), - )) - - default: - return nil, errOptionsLack("Session") - } - - // check if client correct and save in cache + resp, err := rpcapi.CreateSession(c.Raw(), req, client.WithContext(ctx)) if err != nil { - return nil, err + return nil, errors.Wrap(err, "transport error") } - opts.grpcOpts.v2SessionClient = cli + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, errors.Wrap(err, "can't verify response message") + } - return cli, nil -} + body := resp.GetBody() + if body == nil { + return nil, errMalformedResponseBody + } -// AttachSessionToken attaches session token to client. -// -// Provided token is attached to all requests without WithSession option. -// Use WithSession(nil) option in order to send request without session token. -func (c *clientImpl) AttachSessionToken(token *token.SessionToken) { - c.sessionToken = token -} + sessionToken := token.NewSessionToken() + sessionToken.SetID(body.GetID()) + sessionToken.SetSessionKey(body.GetSessionKey()) + sessionToken.SetOwnerID(ownerID) -// AttachBearerToken attaches bearer token to client. -// -// Provided bearer is attached to all requests without WithBearer option. -// Use WithBearer(nil) option in order to send request without bearer token. -func (c *clientImpl) AttachBearerToken(token *token.BearerToken) { - c.bearerToken = token + return sessionToken, nil }