From 4855154b35391179c3b670445a27751e78fd4930 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 9 Nov 2021 11:07:49 +0300 Subject: [PATCH] [#62] client: move package from neofs-api-go Signed-off-by: Evgenii Stratonikov --- client/accounting.go | 57 ++ client/client.go | 45 ++ client/container.go | 465 ++++++++++++++ client/netmap.go | 124 ++++ client/object.go | 1373 +++++++++++++++++++++++++++++++++++++++++ client/object_test.go | 94 +++ client/opts.go | 189 ++++++ client/raw.go | 17 + client/reputation.go | 171 +++++ client/response.go | 37 ++ client/session.go | 79 +++ 11 files changed, 2651 insertions(+) create mode 100644 client/accounting.go create mode 100644 client/client.go create mode 100644 client/container.go create mode 100644 client/netmap.go create mode 100644 client/object.go create mode 100644 client/object_test.go create mode 100644 client/opts.go create mode 100644 client/raw.go create mode 100644 client/reputation.go create mode 100644 client/response.go create mode 100644 client/session.go diff --git a/client/accounting.go b/client/accounting.go new file mode 100644 index 00000000..b88cbf9f --- /dev/null +++ b/client/accounting.go @@ -0,0 +1,57 @@ +package client + +import ( + "context" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/rpc/client" + v2accounting "github.com/nspcc-dev/neofs-api-go/v2/accounting" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" + v2signature "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/nspcc-dev/neofs-sdk-go/accounting" + "github.com/nspcc-dev/neofs-sdk-go/owner" +) + +// Accounting contains methods related to balance querying. +type Accounting interface { + // GetBalance returns balance of provided account. + GetBalance(context.Context, *owner.ID, ...CallOption) (*accounting.Decimal, error) +} + +func (c *clientImpl) GetBalance(ctx context.Context, owner *owner.ID, opts ...CallOption) (*accounting.Decimal, error) { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + reqBody := new(v2accounting.BalanceRequestBody) + reqBody.SetOwnerID(owner.ToV2()) + + req := new(v2accounting.BalanceRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + err := v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return nil, err + } + + resp, err := rpcapi.Balance(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("transport error: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return nil, err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, fmt.Errorf("can't verify response message: %w", err) + } + + return accounting.NewDecimalFromV2(resp.GetBody().GetBalance()), nil +} diff --git a/client/client.go b/client/client.go new file mode 100644 index 00000000..09484296 --- /dev/null +++ b/client/client.go @@ -0,0 +1,45 @@ +package client + +import ( + "io" + + "github.com/nspcc-dev/neofs-api-go/rpc/client" +) + +// Client represents NeoFS client. +type Client interface { + Accounting + Container + Netmap + Object + Session + Reputation + + // Raw must return underlying raw protobuf client. + Raw() *client.Client + + // Conn must return underlying connection. + // + // Must return a non-nil result after the first RPC call + // completed without a connection error. + Conn() io.Closer +} + +type clientImpl struct { + raw *client.Client + + opts *clientOptions +} + +func New(opts ...Option) (Client, error) { + clientOptions := defaultClientOptions() + + for i := range opts { + opts[i](clientOptions) + } + + return &clientImpl{ + opts: clientOptions, + raw: client.New(clientOptions.rawOpts...), + }, nil +} diff --git a/client/container.go b/client/container.go new file mode 100644 index 00000000..73e5fb23 --- /dev/null +++ b/client/container.go @@ -0,0 +1,465 @@ +package client + +import ( + "context" + "errors" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/rpc/client" + v2container "github.com/nspcc-dev/neofs-api-go/v2/container" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" + v2signature "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/nspcc-dev/neofs-sdk-go/container" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/eacl" + "github.com/nspcc-dev/neofs-sdk-go/owner" + "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/signature" + sigutil "github.com/nspcc-dev/neofs-sdk-go/util/signature" + "github.com/nspcc-dev/neofs-sdk-go/version" +) + +// Container contains methods related to container and ACL. +type Container interface { + // PutContainer creates new container in the NeoFS network. + PutContainer(context.Context, *container.Container, ...CallOption) (*cid.ID, error) + + // GetContainer returns container by ID. + GetContainer(context.Context, *cid.ID, ...CallOption) (*container.Container, error) + + // ListContainers return container list with the provided owner. + ListContainers(context.Context, *owner.ID, ...CallOption) ([]*cid.ID, error) + + // DeleteContainer removes container from NeoFS network. + DeleteContainer(context.Context, *cid.ID, ...CallOption) error + + // GetEACL returns extended ACL for a given container. + GetEACL(context.Context, *cid.ID, ...CallOption) (*EACLWithSignature, error) + + // SetEACL sets extended ACL. + SetEACL(context.Context, *eacl.Table, ...CallOption) error + + // AnnounceContainerUsedSpace announces amount of space which is taken by stored objects. + AnnounceContainerUsedSpace(context.Context, []container.UsedSpaceAnnouncement, ...CallOption) error +} + +type delContainerSignWrapper struct { + body *v2container.DeleteRequestBody +} + +// EACLWithSignature represents eACL table/signature pair. +type EACLWithSignature struct { + table *eacl.Table +} + +func (c delContainerSignWrapper) ReadSignedData(bytes []byte) ([]byte, error) { + return c.body.GetContainerID().GetValue(), nil +} + +func (c delContainerSignWrapper) SignedDataSize() int { + return len(c.body.GetContainerID().GetValue()) +} + +// EACL returns eACL table. +func (e EACLWithSignature) EACL() *eacl.Table { + return e.table +} + +// Signature returns table signature. +// +// Deprecated: use EACL().Signature() instead. +func (e EACLWithSignature) Signature() *signature.Signature { + return e.table.Signature() +} + +func (c *clientImpl) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + // set transport version + cnr.SetVersion(version.Current()) + + // if container owner is not set, then use client key as owner + if cnr.OwnerID() == nil { + w, err := owner.NEO3WalletFromPublicKey(&callOptions.key.PublicKey) + if err != nil { + return nil, err + } + + ownerID := new(owner.ID) + ownerID.SetNeo3Wallet(w) + + cnr.SetOwnerID(ownerID) + } + + reqBody := new(v2container.PutRequestBody) + reqBody.SetContainer(cnr.ToV2()) + + // sign container + signWrapper := v2signature.StableMarshalerWrapper{SM: reqBody.GetContainer()} + + err := sigutil.SignDataWithHandler(callOptions.key, signWrapper, func(key []byte, sig []byte) { + containerSignature := new(refs.Signature) + containerSignature.SetKey(key) + containerSignature.SetSign(sig) + reqBody.SetSignature(containerSignature) + }, sigutil.SignWithRFC6979()) + if err != nil { + return nil, err + } + + req := new(v2container.PutRequest) + req.SetBody(reqBody) + + meta := v2MetaHeaderFromOpts(callOptions) + meta.SetSessionToken(cnr.SessionToken().ToV2()) + + req.SetMetaHeader(meta) + + err = v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return nil, err + } + + resp, err := rpcapi.PutContainer(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, err + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return nil, err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, fmt.Errorf("can't verify response message: %w", err) + } + + return cid.NewFromV2(resp.GetBody().GetContainerID()), nil +} + +// GetContainer receives container structure through NeoFS API call. +// +// Returns error if container structure is received but does not meet NeoFS API specification. +func (c *clientImpl) GetContainer(ctx context.Context, id *cid.ID, opts ...CallOption) (*container.Container, error) { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + reqBody := new(v2container.GetRequestBody) + reqBody.SetContainerID(id.ToV2()) + + req := new(v2container.GetRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + err := v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return nil, err + } + + resp, err := rpcapi.GetContainer(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("transport error: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return nil, err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, fmt.Errorf("can't verify response message: %w", err) + } + + body := resp.GetBody() + + cnr := container.NewContainerFromV2(body.GetContainer()) + cnr.SetSessionToken(session.NewTokenFromV2(body.GetSessionToken())) + cnr.SetSignature(signature.NewFromV2(body.GetSignature())) + + return cnr, nil +} + +// GetVerifiedContainerStructure is a wrapper over Client.GetContainer method +// which checks if the structure of the resulting container matches its identifier. +// +// Returns an error if container does not match the identifier. +func GetVerifiedContainerStructure(ctx context.Context, c Client, id *cid.ID, opts ...CallOption) (*container.Container, error) { + cnr, err := c.GetContainer(ctx, id, opts...) + if err != nil { + return nil, err + } + + if !container.CalculateID(cnr).Equal(id) { + return nil, errors.New("container structure does not match the identifier") + } + + return cnr, nil +} + +func (c *clientImpl) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + if ownerID == nil { + w, err := owner.NEO3WalletFromPublicKey(&callOptions.key.PublicKey) + if err != nil { + return nil, err + } + + ownerID = new(owner.ID) + ownerID.SetNeo3Wallet(w) + } + + reqBody := new(v2container.ListRequestBody) + reqBody.SetOwnerID(ownerID.ToV2()) + + req := new(v2container.ListRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + err := v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return nil, err + } + + resp, err := rpcapi.ListContainers(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("transport error: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return nil, err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, fmt.Errorf("can't verify response message: %w", err) + } + + result := make([]*cid.ID, 0, len(resp.GetBody().GetContainerIDs())) + for _, cidV2 := range resp.GetBody().GetContainerIDs() { + result = append(result, cid.NewFromV2(cidV2)) + } + + return result, nil +} + +func (c *clientImpl) DeleteContainer(ctx context.Context, id *cid.ID, opts ...CallOption) error { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + reqBody := new(v2container.DeleteRequestBody) + reqBody.SetContainerID(id.ToV2()) + + // sign container + err := sigutil.SignDataWithHandler(callOptions.key, + delContainerSignWrapper{ + body: reqBody, + }, + func(key []byte, sig []byte) { + containerSignature := new(refs.Signature) + containerSignature.SetKey(key) + containerSignature.SetSign(sig) + reqBody.SetSignature(containerSignature) + }, + sigutil.SignWithRFC6979()) + if err != nil { + return err + } + + req := new(v2container.DeleteRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + err = v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return err + } + + resp, err := rpcapi.DeleteContainer(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return fmt.Errorf("transport error: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return err + } + + if err := v2signature.VerifyServiceMessage(resp); err != nil { + return fmt.Errorf("can't verify response message: %w", err) + } + + return nil +} + +func (c *clientImpl) GetEACL(ctx context.Context, id *cid.ID, opts ...CallOption) (*EACLWithSignature, error) { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + reqBody := new(v2container.GetExtendedACLRequestBody) + reqBody.SetContainerID(id.ToV2()) + + req := new(v2container.GetExtendedACLRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + err := v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return nil, err + } + + resp, err := rpcapi.GetEACL(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("transport error: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return nil, err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, fmt.Errorf("can't verify response message: %w", err) + } + + body := resp.GetBody() + + table := eacl.NewTableFromV2(body.GetEACL()) + table.SetSessionToken(session.NewTokenFromV2(body.GetSessionToken())) + table.SetSignature(signature.NewFromV2(body.GetSignature())) + + return &EACLWithSignature{ + table: table, + }, nil +} + +func (c *clientImpl) SetEACL(ctx context.Context, eacl *eacl.Table, opts ...CallOption) error { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + reqBody := new(v2container.SetExtendedACLRequestBody) + reqBody.SetEACL(eacl.ToV2()) + reqBody.GetEACL().SetVersion(version.Current().ToV2()) + + signWrapper := v2signature.StableMarshalerWrapper{SM: reqBody.GetEACL()} + + err := sigutil.SignDataWithHandler(callOptions.key, signWrapper, func(key []byte, sig []byte) { + eaclSignature := new(refs.Signature) + eaclSignature.SetKey(key) + eaclSignature.SetSign(sig) + reqBody.SetSignature(eaclSignature) + }, sigutil.SignWithRFC6979()) + if err != nil { + return err + } + + meta := v2MetaHeaderFromOpts(callOptions) + meta.SetSessionToken(eacl.SessionToken().ToV2()) + + req := new(v2container.SetExtendedACLRequest) + req.SetBody(reqBody) + req.SetMetaHeader(meta) + + err = v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return err + } + + resp, err := rpcapi.SetEACL(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return fmt.Errorf("transport error: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return fmt.Errorf("can't verify response message: %w", err) + } + + return nil +} + +// AnnounceContainerUsedSpace used by storage nodes to estimate their container +// sizes during lifetime. Use it only in storage node applications. +func (c *clientImpl) AnnounceContainerUsedSpace( + ctx context.Context, + announce []container.UsedSpaceAnnouncement, + opts ...CallOption) error { + callOptions := c.defaultCallOptions() // apply all available options + + for i := range opts { + opts[i](callOptions) + } + + // convert list of SDK announcement structures into NeoFS-API v2 list + v2announce := make([]*v2container.UsedSpaceAnnouncement, 0, len(announce)) + for i := range announce { + v2announce = append(v2announce, announce[i].ToV2()) + } + + // prepare body of the NeoFS-API v2 request and request itself + reqBody := new(v2container.AnnounceUsedSpaceRequestBody) + reqBody.SetAnnouncements(v2announce) + + req := new(v2container.AnnounceUsedSpaceRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + // sign the request + err := v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return err + } + + resp, err := rpcapi.AnnounceUsedSpace(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return fmt.Errorf("transport error: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return fmt.Errorf("can't verify response message: %w", err) + } + + return nil +} diff --git a/client/netmap.go b/client/netmap.go new file mode 100644 index 00000000..be618930 --- /dev/null +++ b/client/netmap.go @@ -0,0 +1,124 @@ +package client + +import ( + "context" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/rpc/client" + v2netmap "github.com/nspcc-dev/neofs-api-go/v2/netmap" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" + v2signature "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/version" +) + +// Netmap contains methods related to netmap. +type Netmap interface { + // EndpointInfo returns attributes, address and public key of the node, specified + // in client constructor via address or open connection. This can be used as a + // health check to see if node is alive and responses to requests. + EndpointInfo(context.Context, ...CallOption) (*EndpointInfo, error) + + // NetworkInfo returns information about the NeoFS network of which the remote server is a part. + NetworkInfo(context.Context, ...CallOption) (*netmap.NetworkInfo, error) +} + +// EACLWithSignature represents eACL table/signature pair. +type EndpointInfo struct { + version *version.Version + + ni *netmap.NodeInfo +} + +// LatestVersion returns latest NeoFS API version in use. +func (e *EndpointInfo) LatestVersion() *version.Version { + return e.version +} + +// NodeInfo returns returns information about the NeoFS node. +func (e *EndpointInfo) NodeInfo() *netmap.NodeInfo { + return e.ni +} + +// EndpointInfo returns attributes, address and public key of the node, specified +// in client constructor via address or open connection. This can be used as a +// health check to see if node is alive and responses to requests. +func (c *clientImpl) EndpointInfo(ctx context.Context, opts ...CallOption) (*EndpointInfo, error) { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + reqBody := new(v2netmap.LocalNodeInfoRequestBody) + + req := new(v2netmap.LocalNodeInfoRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + err := v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return nil, err + } + + resp, err := rpcapi.LocalNodeInfo(c.Raw(), req) + if err != nil { + return nil, fmt.Errorf("transport error: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return nil, err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, fmt.Errorf("can't verify response message: %w", err) + } + + body := resp.GetBody() + + return &EndpointInfo{ + version: version.NewFromV2(body.GetVersion()), + ni: netmap.NewNodeInfoFromV2(body.GetNodeInfo()), + }, nil +} + +// NetworkInfo returns information about the NeoFS network of which the remote server is a part. +func (c *clientImpl) NetworkInfo(ctx context.Context, opts ...CallOption) (*netmap.NetworkInfo, error) { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + reqBody := new(v2netmap.NetworkInfoRequestBody) + + req := new(v2netmap.NetworkInfoRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + err := v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return nil, err + } + + resp, err := rpcapi.NetworkInfo(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("v2 NetworkInfo RPC failure: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return nil, err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, fmt.Errorf("response message verification failed: %w", err) + } + + return netmap.NewNetworkInfoFromV2(resp.GetBody().GetNetworkInfo()), nil +} diff --git a/client/object.go b/client/object.go new file mode 100644 index 00000000..9b56298a --- /dev/null +++ b/client/object.go @@ -0,0 +1,1373 @@ +package client + +import ( + "bytes" + "context" + "crypto/ecdsa" + "crypto/sha256" + "errors" + "fmt" + "io" + + "github.com/nspcc-dev/neofs-api-go/rpc/client" + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" + v2session "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-api-go/v2/signature" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + signer "github.com/nspcc-dev/neofs-sdk-go/util/signature" +) + +// Object contains methods for working with objects. +type Object interface { + // PutObject puts new object to NeoFS. + PutObject(context.Context, *PutObjectParams, ...CallOption) (*object.ID, error) + + // DeleteObject deletes object to NeoFS. + DeleteObject(context.Context, *DeleteObjectParams, ...CallOption) error + + // GetObject returns object stored in NeoFS. + GetObject(context.Context, *GetObjectParams, ...CallOption) (*object.Object, error) + + // GetObjectHeader returns object header. + GetObjectHeader(context.Context, *ObjectHeaderParams, ...CallOption) (*object.Object, error) + + // ObjectPayloadRangeData returns range of object payload. + ObjectPayloadRangeData(context.Context, *RangeDataParams, ...CallOption) ([]byte, error) + + // ObjectPayloadRangeSHA256 returns sha-256 hashes of object sub-ranges from NeoFS. + ObjectPayloadRangeSHA256(context.Context, *RangeChecksumParams, ...CallOption) ([][sha256.Size]byte, error) + + // ObjectPayloadRangeTZ returns homomorphic hashes of object sub-ranges from NeoFS. + ObjectPayloadRangeTZ(context.Context, *RangeChecksumParams, ...CallOption) ([][TZSize]byte, error) + + // SearchObject searches for objects in NeoFS using provided parameters. + SearchObject(context.Context, *SearchObjectParams, ...CallOption) ([]*object.ID, error) +} + +type PutObjectParams struct { + obj *object.Object + + r io.Reader +} + +// ObjectAddressWriter is an interface of the +// component that writes the object address. +type ObjectAddressWriter interface { + SetAddress(*object.Address) +} + +type objectAddressWriter struct { + addr *object.Address +} + +type DeleteObjectParams struct { + addr *object.Address + + tombTgt ObjectAddressWriter +} + +type GetObjectParams struct { + addr *object.Address + + raw bool + + w io.Writer + + readerHandler ReaderHandler +} + +type ObjectHeaderParams struct { + addr *object.Address + + raw bool + + short bool +} + +type RangeDataParams struct { + addr *object.Address + + raw bool + + r *object.Range + + w io.Writer +} + +type RangeChecksumParams struct { + typ checksumType + + addr *object.Address + + rs []*object.Range + + salt []byte +} + +type SearchObjectParams struct { + cid *cid.ID + + filters object.SearchFilters +} + +type putObjectV2Reader struct { + r io.Reader +} + +type putObjectV2Writer struct { + key *ecdsa.PrivateKey + + chunkPart *v2object.PutObjectPartChunk + + req *v2object.PutRequest + + stream *rpcapi.PutRequestWriter +} + +type checksumType int + +const ( + _ checksumType = iota + checksumSHA256 + checksumTZ +) + +const chunkSize = 3 * (1 << 20) + +const TZSize = 64 + +const searchQueryVersion uint32 = 1 + +var errNilObjectPart = errors.New("received nil object part") + +func (w *objectAddressWriter) SetAddress(addr *object.Address) { + w.addr = addr +} + +func rangesToV2(rs []*object.Range) []*v2object.Range { + r2 := make([]*v2object.Range, len(rs)) + + for i := range rs { + r2[i] = rs[i].ToV2() + } + + return r2 +} + +func (t checksumType) toV2() v2refs.ChecksumType { + switch t { + case checksumSHA256: + return v2refs.SHA256 + case checksumTZ: + return v2refs.TillichZemor + default: + panic(fmt.Sprintf("invalid checksum type %d", t)) + } +} + +func (w *putObjectV2Reader) Read(p []byte) (int, error) { + return w.r.Read(p) +} + +func (w *putObjectV2Writer) Write(p []byte) (int, error) { + w.chunkPart.SetChunk(p) + + w.req.SetVerificationHeader(nil) + + if err := signature.SignServiceMessage(w.key, w.req); err != nil { + return 0, fmt.Errorf("could not sign chunk request message: %w", err) + } + + if err := w.stream.Write(w.req); err != nil { + return 0, fmt.Errorf("could not send chunk request message: %w", err) + } + + return len(p), nil +} + +func (p *PutObjectParams) WithObject(v *object.Object) *PutObjectParams { + if p != nil { + p.obj = v + } + + return p +} + +func (p *PutObjectParams) Object() *object.Object { + if p != nil { + return p.obj + } + + return nil +} + +func (p *PutObjectParams) WithPayloadReader(v io.Reader) *PutObjectParams { + if p != nil { + p.r = v + } + + return p +} + +func (p *PutObjectParams) PayloadReader() io.Reader { + if p != nil { + return p.r + } + + return nil +} + +func (c *clientImpl) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*object.ID, error) { + callOpts := c.defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i](callOpts) + } + } + + // create request + req := new(v2object.PutRequest) + + // initialize request body + body := new(v2object.PutRequestBody) + req.SetBody(body) + + v2Addr := new(v2refs.Address) + v2Addr.SetObjectID(p.obj.ID().ToV2()) + v2Addr.SetContainerID(p.obj.ContainerID().ToV2()) + + // set meta header + meta := v2MetaHeaderFromOpts(callOpts) + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + addr: v2Addr, + verb: v2session.ObjectVerbPut, + }); err != nil { + return nil, fmt.Errorf("could not attach session token: %w", err) + } + + req.SetMetaHeader(meta) + + // initialize init part + initPart := new(v2object.PutObjectPartInit) + body.SetObjectPart(initPart) + + obj := p.obj.ToV2() + + // set init part fields + initPart.SetObjectID(obj.GetObjectID()) + initPart.SetSignature(obj.GetSignature()) + initPart.SetHeader(obj.GetHeader()) + + // sign the request + if err := signature.SignServiceMessage(callOpts.key, req); err != nil { + return nil, fmt.Errorf("signing the request failed: %w", err) + } + + // open stream + resp := new(v2object.PutResponse) + + stream, err := rpcapi.PutObject(c.Raw(), resp, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("stream opening failed: %w", err) + } + + // send init part + err = stream.Write(req) + if err != nil { + return nil, fmt.Errorf("sending the initial message to stream failed: %w", err) + } + + // create payload bytes reader + var rPayload io.Reader = bytes.NewReader(obj.GetPayload()) + if p.r != nil { + rPayload = io.MultiReader(rPayload, p.r) + } + + // create v2 payload stream writer + chunkPart := new(v2object.PutObjectPartChunk) + body.SetObjectPart(chunkPart) + + w := &putObjectV2Writer{ + key: callOpts.key, + chunkPart: chunkPart, + req: req, + stream: stream, + } + + r := &putObjectV2Reader{r: rPayload} + + // copy payload from reader to stream writer + _, err = io.CopyBuffer(w, r, make([]byte, chunkSize)) + if err != nil && !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("payload streaming failed: %w", err) + } + + // close object stream and receive response from remote node + err = stream.Close() + if err != nil { + return nil, fmt.Errorf("closing the stream failed: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOpts, resp); err != nil { + return nil, err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, fmt.Errorf("response verification failed: %w", err) + } + + // convert object identifier + id := object.NewIDFromV2(resp.GetBody().GetObjectID()) + + return id, nil +} + +func (p *DeleteObjectParams) WithAddress(v *object.Address) *DeleteObjectParams { + if p != nil { + p.addr = v + } + + return p +} + +func (p *DeleteObjectParams) Address() *object.Address { + if p != nil { + return p.addr + } + + return nil +} + +// WithTombstoneAddressTarget sets target component to write tombstone address. +func (p *DeleteObjectParams) WithTombstoneAddressTarget(v ObjectAddressWriter) *DeleteObjectParams { + if p != nil { + p.tombTgt = v + } + + return p +} + +// TombstoneAddressTarget returns target component to write tombstone address. +func (p *DeleteObjectParams) TombstoneAddressTarget() ObjectAddressWriter { + if p != nil { + return p.tombTgt + } + + return nil +} + +// DeleteObject is a wrapper over Client.DeleteObject method +// that provides the ability to receive tombstone address +// without setting a target in the parameters. +func DeleteObject(ctx context.Context, c Client, p *DeleteObjectParams, opts ...CallOption) (*object.Address, error) { + w := new(objectAddressWriter) + + err := c.DeleteObject(ctx, p.WithTombstoneAddressTarget(w), opts...) + if err != nil { + return nil, err + } + + return w.addr, nil +} + +// DeleteObject removes object by address. +// +// If target of tombstone address is not set, the address is ignored. +func (c *clientImpl) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) error { + callOpts := c.defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i](callOpts) + } + } + + // create request + req := new(v2object.DeleteRequest) + + // initialize request body + body := new(v2object.DeleteRequestBody) + req.SetBody(body) + + // set meta header + meta := v2MetaHeaderFromOpts(callOpts) + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + addr: p.addr.ToV2(), + verb: v2session.ObjectVerbDelete, + }); err != nil { + return fmt.Errorf("could not attach session token: %w", err) + } + + req.SetMetaHeader(meta) + + // fill body fields + body.SetAddress(p.addr.ToV2()) + + // sign the request + if err := signature.SignServiceMessage(callOpts.key, req); err != nil { + return fmt.Errorf("signing the request failed: %w", err) + } + + // send request + resp, err := rpcapi.DeleteObject(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return fmt.Errorf("sending the request failed: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOpts, resp); err != nil { + return err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return fmt.Errorf("response verification failed: %w", err) + } + + if p.tombTgt != nil { + p.tombTgt.SetAddress(object.NewAddressFromV2(resp.GetBody().GetTombstone())) + } + + return nil +} + +func (p *GetObjectParams) WithAddress(v *object.Address) *GetObjectParams { + if p != nil { + p.addr = v + } + + return p +} + +func (p *GetObjectParams) Address() *object.Address { + if p != nil { + return p.addr + } + + return nil +} + +func (p *GetObjectParams) WithPayloadWriter(w io.Writer) *GetObjectParams { + if p != nil { + p.w = w + } + + return p +} + +func (p *GetObjectParams) PayloadWriter() io.Writer { + if p != nil { + return p.w + } + + return nil +} + +func (p *GetObjectParams) WithRawFlag(v bool) *GetObjectParams { + if p != nil { + p.raw = v + } + + return p +} + +func (p *GetObjectParams) RawFlag() bool { + if p != nil { + return p.raw + } + + return false +} + +// ReaderHandler is a function over io.Reader. +type ReaderHandler func(io.Reader) + +// WithPayloadReaderHandler sets handler of the payload reader. +// +// If provided, payload reader is composed after receiving the header. +// In this case payload writer set via WithPayloadWriter is ignored. +// +// Handler should not be nil. +func (p *GetObjectParams) WithPayloadReaderHandler(f ReaderHandler) *GetObjectParams { + if p != nil { + p.readerHandler = f + } + + return p +} + +// wrapper over the Object Get stream that provides io.Reader. +type objectPayloadReader struct { + stream interface { + Read(*v2object.GetResponse) error + } + + resp v2object.GetResponse + + tail []byte +} + +func (x *objectPayloadReader) Read(p []byte) (read int, err error) { + // read remaining tail + read = copy(p, x.tail) + + x.tail = x.tail[read:] + + if len(p)-read == 0 { + return + } + + // receive message from server stream + err = x.stream.Read(&x.resp) + if err != nil { + if errors.Is(err, io.EOF) { + err = io.EOF + return + } + + err = fmt.Errorf("reading the response failed: %w", err) + return + } + + // get chunk part message + part := x.resp.GetBody().GetObjectPart() + + chunkPart, ok := part.(*v2object.GetObjectPartChunk) + if !ok { + err = errWrongMessageSeq + return + } + + // verify response structure + if err = signature.VerifyServiceMessage(&x.resp); err != nil { + err = fmt.Errorf("response verification failed: %w", err) + return + } + + // read new chunk + chunk := chunkPart.GetChunk() + + tailOffset := copy(p[read:], chunk) + + read += tailOffset + + // save the tail + x.tail = append(x.tail, chunk[tailOffset:]...) + + return +} + +var errWrongMessageSeq = errors.New("incorrect message sequence") + +func (c *clientImpl) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*object.Object, error) { + callOpts := c.defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i](callOpts) + } + } + + // create request + req := new(v2object.GetRequest) + + // initialize request body + body := new(v2object.GetRequestBody) + req.SetBody(body) + + // set meta header + meta := v2MetaHeaderFromOpts(callOpts) + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + addr: p.addr.ToV2(), + verb: v2session.ObjectVerbGet, + }); err != nil { + return nil, fmt.Errorf("could not attach session token: %w", err) + } + + req.SetMetaHeader(meta) + + // fill body fields + body.SetAddress(p.addr.ToV2()) + body.SetRaw(p.raw) + + // sign the request + if err := signature.SignServiceMessage(callOpts.key, req); err != nil { + return nil, fmt.Errorf("signing the request failed: %w", err) + } + + // open stream + stream, err := rpcapi.GetObject(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("stream opening failed: %w", err) + } + + var ( + headWas bool + payload []byte + obj = new(v2object.Object) + resp = new(v2object.GetResponse) + ) + +loop: + for { + // receive message from server stream + err := stream.Read(resp) + if err != nil { + if errors.Is(err, io.EOF) { + if !headWas { + return nil, io.ErrUnexpectedEOF + } + + break + } + + return nil, fmt.Errorf("reading the response failed: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOpts, resp); err != nil { + return nil, err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, fmt.Errorf("response verification failed: %w", err) + } + + switch v := resp.GetBody().GetObjectPart().(type) { + default: + return nil, fmt.Errorf("unexpected object part %T", v) + case *v2object.GetObjectPartInit: + if headWas { + return nil, errWrongMessageSeq + } + + headWas = true + + obj.SetObjectID(v.GetObjectID()) + obj.SetSignature(v.GetSignature()) + + hdr := v.GetHeader() + obj.SetHeader(hdr) + + if p.readerHandler != nil { + p.readerHandler(&objectPayloadReader{ + stream: stream, + }) + + break loop + } + + if p.w == nil { + payload = make([]byte, 0, hdr.GetPayloadLength()) + } + case *v2object.GetObjectPartChunk: + if !headWas { + return nil, errWrongMessageSeq + } + + if p.w != nil { + if _, err := p.w.Write(v.GetChunk()); err != nil { + return nil, fmt.Errorf("could not write payload chunk: %w", err) + } + } else { + payload = append(payload, v.GetChunk()...) + } + case *v2object.SplitInfo: + si := object.NewSplitInfoFromV2(v) + return nil, object.NewSplitInfoError(si) + } + } + + obj.SetPayload(payload) + + // convert the object + return object.NewFromV2(obj), nil +} + +func (p *ObjectHeaderParams) WithAddress(v *object.Address) *ObjectHeaderParams { + if p != nil { + p.addr = v + } + + return p +} + +func (p *ObjectHeaderParams) Address() *object.Address { + if p != nil { + return p.addr + } + + return nil +} + +func (p *ObjectHeaderParams) WithAllFields() *ObjectHeaderParams { + if p != nil { + p.short = false + } + + return p +} + +// AllFields return true if parameter set to return all header fields, returns +// false if parameter set to return only main fields of header. +func (p *ObjectHeaderParams) AllFields() bool { + if p != nil { + return !p.short + } + + return false +} + +func (p *ObjectHeaderParams) WithMainFields() *ObjectHeaderParams { + if p != nil { + p.short = true + } + + return p +} + +func (p *ObjectHeaderParams) WithRawFlag(v bool) *ObjectHeaderParams { + if p != nil { + p.raw = v + } + + return p +} + +func (p *ObjectHeaderParams) RawFlag() bool { + if p != nil { + return p.raw + } + + return false +} + +func (c *clientImpl) GetObjectHeader(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*object.Object, error) { + callOpts := c.defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i](callOpts) + } + } + + // create request + req := new(v2object.HeadRequest) + + // initialize request body + body := new(v2object.HeadRequestBody) + req.SetBody(body) + + // set meta header + meta := v2MetaHeaderFromOpts(callOpts) + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + addr: p.addr.ToV2(), + verb: v2session.ObjectVerbHead, + }); err != nil { + return nil, fmt.Errorf("could not attach session token: %w", err) + } + + req.SetMetaHeader(meta) + + // fill body fields + body.SetAddress(p.addr.ToV2()) + body.SetMainOnly(p.short) + body.SetRaw(p.raw) + + // sign the request + if err := signature.SignServiceMessage(callOpts.key, req); err != nil { + return nil, fmt.Errorf("signing the request failed: %w", err) + } + + // send Head request + resp, err := rpcapi.HeadObject(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("sending the request failed: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOpts, resp); err != nil { + return nil, err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, fmt.Errorf("response verification failed: %w", err) + } + + var ( + hdr *v2object.Header + idSig *v2refs.Signature + ) + + switch v := resp.GetBody().GetHeaderPart().(type) { + case nil: + return nil, fmt.Errorf("unexpected header type %T", v) + case *v2object.ShortHeader: + if !p.short { + return nil, fmt.Errorf("wrong header part type: expected %T, received %T", + (*v2object.ShortHeader)(nil), (*v2object.HeaderWithSignature)(nil), + ) + } + + h := v + + hdr = new(v2object.Header) + hdr.SetPayloadLength(h.GetPayloadLength()) + hdr.SetVersion(h.GetVersion()) + hdr.SetOwnerID(h.GetOwnerID()) + hdr.SetObjectType(h.GetObjectType()) + hdr.SetCreationEpoch(h.GetCreationEpoch()) + hdr.SetPayloadHash(h.GetPayloadHash()) + hdr.SetHomomorphicHash(h.GetHomomorphicHash()) + case *v2object.HeaderWithSignature: + if p.short { + return nil, fmt.Errorf("wrong header part type: expected %T, received %T", + (*v2object.HeaderWithSignature)(nil), (*v2object.ShortHeader)(nil), + ) + } + + hdrWithSig := v + if hdrWithSig == nil { + return nil, errNilObjectPart + } + + hdr = hdrWithSig.GetHeader() + idSig = hdrWithSig.GetSignature() + + if err := signer.VerifyDataWithSource( + signature.StableMarshalerWrapper{ + SM: p.addr.ObjectID().ToV2(), + }, + func() (key, sig []byte) { + return idSig.GetKey(), idSig.GetSign() + }, + ); err != nil { + return nil, fmt.Errorf("incorrect object header signature: %w", err) + } + case *v2object.SplitInfo: + si := object.NewSplitInfoFromV2(v) + + return nil, object.NewSplitInfoError(si) + } + + obj := new(v2object.Object) + obj.SetHeader(hdr) + obj.SetSignature(idSig) + + raw := object.NewRawFromV2(obj) + raw.SetID(p.addr.ObjectID()) + + // convert the object + return raw.Object(), nil +} + +func (p *RangeDataParams) WithAddress(v *object.Address) *RangeDataParams { + if p != nil { + p.addr = v + } + + return p +} + +func (p *RangeDataParams) Address() *object.Address { + if p != nil { + return p.addr + } + + return nil +} + +func (p *RangeDataParams) WithRaw(v bool) *RangeDataParams { + if p != nil { + p.raw = v + } + + return p +} + +func (p *RangeDataParams) Raw() bool { + if p != nil { + return p.raw + } + + return false +} + +func (p *RangeDataParams) WithRange(v *object.Range) *RangeDataParams { + if p != nil { + p.r = v + } + + return p +} + +func (p *RangeDataParams) Range() *object.Range { + if p != nil { + return p.r + } + + return nil +} + +func (p *RangeDataParams) WithDataWriter(v io.Writer) *RangeDataParams { + if p != nil { + p.w = v + } + + return p +} + +func (p *RangeDataParams) DataWriter() io.Writer { + if p != nil { + return p.w + } + + return nil +} + +func (c *clientImpl) ObjectPayloadRangeData(ctx context.Context, p *RangeDataParams, opts ...CallOption) ([]byte, error) { + callOpts := c.defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i](callOpts) + } + } + + // create request + req := new(v2object.GetRangeRequest) + + // initialize request body + body := new(v2object.GetRangeRequestBody) + req.SetBody(body) + + // set meta header + meta := v2MetaHeaderFromOpts(callOpts) + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + addr: p.addr.ToV2(), + verb: v2session.ObjectVerbRange, + }); err != nil { + return nil, fmt.Errorf("could not attach session token: %w", err) + } + + req.SetMetaHeader(meta) + + // fill body fields + body.SetAddress(p.addr.ToV2()) + body.SetRange(p.r.ToV2()) + body.SetRaw(p.raw) + + // sign the request + if err := signature.SignServiceMessage(callOpts.key, req); err != nil { + return nil, fmt.Errorf("signing the request failed: %w", err) + } + + // open stream + stream, err := rpcapi.GetObjectRange(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("could not create Get payload range stream: %w", err) + } + + var payload []byte + if p.w != nil { + payload = make([]byte, 0, p.r.GetLength()) + } + + resp := new(v2object.GetRangeResponse) + + for { + // receive message from server stream + err := stream.Read(resp) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("reading the response failed: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOpts, resp); err != nil { + return nil, err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, fmt.Errorf("could not verify %T: %w", resp, err) + } + + switch v := resp.GetBody().GetRangePart().(type) { + case nil: + return nil, fmt.Errorf("unexpected range type %T", v) + case *v2object.GetRangePartChunk: + if p.w != nil { + if _, err = p.w.Write(v.GetChunk()); err != nil { + return nil, fmt.Errorf("could not write payload chunk: %w", err) + } + } else { + payload = append(payload, v.GetChunk()...) + } + case *v2object.SplitInfo: + si := object.NewSplitInfoFromV2(v) + + return nil, object.NewSplitInfoError(si) + } + } + + return payload, nil +} + +func (p *RangeChecksumParams) WithAddress(v *object.Address) *RangeChecksumParams { + if p != nil { + p.addr = v + } + + return p +} + +func (p *RangeChecksumParams) Address() *object.Address { + if p != nil { + return p.addr + } + + return nil +} + +func (p *RangeChecksumParams) WithRangeList(rs ...*object.Range) *RangeChecksumParams { + if p != nil { + p.rs = rs + } + + return p +} + +func (p *RangeChecksumParams) RangeList() []*object.Range { + if p != nil { + return p.rs + } + + return nil +} + +func (p *RangeChecksumParams) WithSalt(v []byte) *RangeChecksumParams { + if p != nil { + p.salt = v + } + + return p +} + +func (p *RangeChecksumParams) Salt() []byte { + if p != nil { + return p.salt + } + + return nil +} + +func (p *RangeChecksumParams) withChecksumType(t checksumType) *RangeChecksumParams { + if p != nil { + p.typ = t + } + + return p +} + +func (c *clientImpl) ObjectPayloadRangeSHA256(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) ([][sha256.Size]byte, error) { + res, err := c.objectPayloadRangeHash(ctx, p.withChecksumType(checksumSHA256), opts...) + if err != nil { + return nil, err + } + + return res.([][sha256.Size]byte), nil +} + +func (c *clientImpl) ObjectPayloadRangeTZ(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) ([][TZSize]byte, error) { + res, err := c.objectPayloadRangeHash(ctx, p.withChecksumType(checksumTZ), opts...) + if err != nil { + return nil, err + } + + return res.([][TZSize]byte), nil +} + +func (c *clientImpl) objectPayloadRangeHash(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (interface{}, error) { + callOpts := c.defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i](callOpts) + } + } + + // create request + req := new(v2object.GetRangeHashRequest) + + // initialize request body + body := new(v2object.GetRangeHashRequestBody) + req.SetBody(body) + + // set meta header + meta := v2MetaHeaderFromOpts(callOpts) + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + addr: p.addr.ToV2(), + verb: v2session.ObjectVerbRangeHash, + }); err != nil { + return nil, fmt.Errorf("could not attach session token: %w", err) + } + + req.SetMetaHeader(meta) + + // fill body fields + body.SetAddress(p.addr.ToV2()) + body.SetSalt(p.salt) + + typV2 := p.typ.toV2() + body.SetType(typV2) + + rsV2 := rangesToV2(p.rs) + body.SetRanges(rsV2) + + // sign the request + if err := signature.SignServiceMessage(callOpts.key, req); err != nil { + return nil, fmt.Errorf("signing the request failed: %w", err) + } + + // send request + resp, err := rpcapi.HashObjectRange(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("sending the request failed: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOpts, resp); err != nil { + return nil, err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, fmt.Errorf("response verification failed: %w", err) + } + + respBody := resp.GetBody() + respType := respBody.GetType() + respHashes := respBody.GetHashList() + + if t := p.typ.toV2(); respType != t { + return nil, fmt.Errorf("invalid checksum type: expected %v, received %v", t, respType) + } else if reqLn, respLn := len(rsV2), len(respHashes); reqLn != respLn { + return nil, fmt.Errorf("wrong checksum number: expected %d, received %d", reqLn, respLn) + } + + var res interface{} + + switch p.typ { + case checksumSHA256: + r := make([][sha256.Size]byte, 0, len(respHashes)) + + for i := range respHashes { + if ln := len(respHashes[i]); ln != sha256.Size { + return nil, fmt.Errorf("invalid checksum length: expected %d, received %d", sha256.Size, ln) + } + + cs := [sha256.Size]byte{} + copy(cs[:], respHashes[i]) + + r = append(r, cs) + } + + res = r + case checksumTZ: + r := make([][TZSize]byte, 0, len(respHashes)) + + for i := range respHashes { + if ln := len(respHashes[i]); ln != TZSize { + return nil, fmt.Errorf("invalid checksum length: expected %d, received %d", TZSize, ln) + } + + cs := [TZSize]byte{} + copy(cs[:], respHashes[i]) + + r = append(r, cs) + } + + res = r + } + + return res, nil +} + +func (p *SearchObjectParams) WithContainerID(v *cid.ID) *SearchObjectParams { + if p != nil { + p.cid = v + } + + return p +} + +func (p *SearchObjectParams) ContainerID() *cid.ID { + if p != nil { + return p.cid + } + + return nil +} + +func (p *SearchObjectParams) WithSearchFilters(v object.SearchFilters) *SearchObjectParams { + if p != nil { + p.filters = v + } + + return p +} + +func (p *SearchObjectParams) SearchFilters() object.SearchFilters { + if p != nil { + return p.filters + } + + return nil +} + +func (c *clientImpl) SearchObject(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) { + callOpts := c.defaultCallOptions() + + for i := range opts { + if opts[i] != nil { + opts[i](callOpts) + } + } + + // create request + req := new(v2object.SearchRequest) + + // initialize request body + body := new(v2object.SearchRequestBody) + req.SetBody(body) + + v2Addr := new(v2refs.Address) + v2Addr.SetContainerID(p.cid.ToV2()) + + // set meta header + meta := v2MetaHeaderFromOpts(callOpts) + + if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{ + addr: v2Addr, + verb: v2session.ObjectVerbSearch, + }); err != nil { + return nil, fmt.Errorf("could not attach session token: %w", err) + } + + req.SetMetaHeader(meta) + + // fill body fields + body.SetContainerID(v2Addr.GetContainerID()) + body.SetVersion(searchQueryVersion) + body.SetFilters(p.filters.ToV2()) + + // sign the request + if err := signature.SignServiceMessage(callOpts.key, req); err != nil { + return nil, fmt.Errorf("signing the request failed: %w", err) + } + + // create search stream + stream, err := rpcapi.SearchObjects(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("stream opening failed: %w", err) + } + + var ( + searchResult []*object.ID + resp = new(v2object.SearchResponse) + ) + + for { + // receive message from server stream + err := stream.Read(resp) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("reading the response failed: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOpts, resp); err != nil { + return nil, err + } + + // verify response structure + if err := signature.VerifyServiceMessage(resp); err != nil { + return nil, fmt.Errorf("could not verify %T: %w", resp, err) + } + + chunk := resp.GetBody().GetIDList() + for i := range chunk { + searchResult = append(searchResult, object.NewIDFromV2(chunk[i])) + } + } + + return searchResult, nil +} + +func (c *clientImpl) attachV2SessionToken(opts *callOptions, hdr *v2session.RequestMetaHeader, info v2SessionReqInfo) error { + if opts.session == nil { + return nil + } + + // Do not resign already prepared session token + if opts.session.Signature() != nil { + hdr.SetSessionToken(opts.session.ToV2()) + return nil + } + + opCtx := new(v2session.ObjectSessionContext) + opCtx.SetAddress(info.addr) + opCtx.SetVerb(info.verb) + + lt := new(v2session.TokenLifetime) + lt.SetIat(info.iat) + lt.SetNbf(info.nbf) + lt.SetExp(info.exp) + + body := new(v2session.SessionTokenBody) + body.SetID(opts.session.ID()) + body.SetOwnerID(opts.session.OwnerID().ToV2()) + body.SetSessionKey(opts.session.SessionKey()) + body.SetContext(opCtx) + body.SetLifetime(lt) + + token := new(v2session.SessionToken) + token.SetBody(body) + + signWrapper := signature.StableMarshalerWrapper{SM: token.GetBody()} + + err := signer.SignDataWithHandler(opts.key, signWrapper, func(key []byte, sig []byte) { + sessionTokenSignature := new(v2refs.Signature) + sessionTokenSignature.SetKey(key) + sessionTokenSignature.SetSign(sig) + token.SetSignature(sessionTokenSignature) + }) + if err != nil { + return err + } + + hdr.SetSessionToken(token) + + return nil +} diff --git a/client/object_test.go b/client/object_test.go new file mode 100644 index 00000000..5666ea9b --- /dev/null +++ b/client/object_test.go @@ -0,0 +1,94 @@ +package client + +import ( + "io" + "testing" + + "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/nspcc-dev/neofs-crypto/test" + "github.com/stretchr/testify/require" +) + +type singleResponseStream struct { + called bool + resp object.GetResponse +} + +func (x *singleResponseStream) Read(r *object.GetResponse) error { + if x.called { + return io.EOF + } + + x.called = true + + *r = x.resp + + return nil +} + +var key = test.DecodeKey(0) + +func chunkResponse(c []byte) (r object.GetResponse) { + chunkPart := new(object.GetObjectPartChunk) + chunkPart.SetChunk(c) + + body := new(object.GetResponseBody) + body.SetObjectPart(chunkPart) + + r.SetBody(body) + + if err := signature.SignServiceMessage(key, &r); err != nil { + panic(err) + } + + return +} + +func data(sz int) []byte { + data := make([]byte, sz) + + for i := range data { + data[i] = byte(i) % ^byte(0) + } + + return data +} + +func checkFullRead(t *testing.T, r io.Reader, buf, payload []byte) { + var ( + restored []byte + read int + ) + + for { + n, err := r.Read(buf) + + read += n + restored = append(restored, buf[:n]...) + + if err != nil { + require.Equal(t, err, io.EOF) + break + } + } + + require.Equal(t, payload, restored) + require.EqualValues(t, len(payload), read) +} + +func TestObjectPayloadReader_Read(t *testing.T) { + t.Run("read with tail", func(t *testing.T) { + payload := data(10) + + buf := make([]byte, len(payload)-1) + + var r io.Reader = &objectPayloadReader{ + stream: &singleResponseStream{ + resp: chunkResponse(payload), + }, + } + + checkFullRead(t, r, buf, payload) + }) +} diff --git a/client/opts.go b/client/opts.go new file mode 100644 index 00000000..f94c94e3 --- /dev/null +++ b/client/opts.go @@ -0,0 +1,189 @@ +package client + +import ( + "crypto/ecdsa" + "crypto/tls" + "time" + + "github.com/nspcc-dev/neofs-api-go/rpc/client" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + v2session "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/token" + "github.com/nspcc-dev/neofs-sdk-go/version" + "google.golang.org/grpc" +) + +type ( + CallOption func(*callOptions) + + Option func(*clientOptions) + + callOptions struct { + version *version.Version + xHeaders []*session.XHeader + ttl uint32 + epoch uint64 + key *ecdsa.PrivateKey + session *session.Token + bearer *token.BearerToken + } + + clientOptions struct { + key *ecdsa.PrivateKey + + rawOpts []client.Option + + cbRespInfo func(ResponseMetaInfo) error + } + + v2SessionReqInfo struct { + addr *refs.Address + verb v2session.ObjectSessionVerb + + exp, nbf, iat uint64 + } +) + +func (c *clientImpl) defaultCallOptions() *callOptions { + return &callOptions{ + version: version.Current(), + ttl: 2, + key: c.opts.key, + } +} + +func WithXHeader(x *session.XHeader) CallOption { + return func(opts *callOptions) { + opts.xHeaders = append(opts.xHeaders, x) + } +} + +func WithTTL(ttl uint32) CallOption { + return func(opts *callOptions) { + opts.ttl = ttl + } +} + +// WithKey sets client's key for the next request. +func WithKey(key *ecdsa.PrivateKey) CallOption { + return func(opts *callOptions) { + opts.key = key + } +} + +func WithEpoch(epoch uint64) CallOption { + return func(opts *callOptions) { + opts.epoch = epoch + } +} + +func WithSession(token *session.Token) CallOption { + return func(opts *callOptions) { + opts.session = token + } +} + +func WithBearer(token *token.BearerToken) CallOption { + return func(opts *callOptions) { + opts.bearer = token + } +} + +func v2MetaHeaderFromOpts(options *callOptions) *v2session.RequestMetaHeader { + meta := new(v2session.RequestMetaHeader) + meta.SetVersion(options.version.ToV2()) + meta.SetTTL(options.ttl) + meta.SetEpoch(options.epoch) + + xhdrs := make([]*v2session.XHeader, len(options.xHeaders)) + for i := range options.xHeaders { + xhdrs[i] = options.xHeaders[i].ToV2() + } + + meta.SetXHeaders(xhdrs) + + if options.bearer != nil { + meta.SetBearerToken(options.bearer.ToV2()) + } + + meta.SetSessionToken(options.session.ToV2()) + + return meta +} + +func defaultClientOptions() *clientOptions { + return &clientOptions{ + rawOpts: make([]client.Option, 0, 4), + } +} + +// WithAddress returns option to specify +// network address of the remote server. +// +// Ignored if WithGRPCConnection is provided. +func WithAddress(addr string) Option { + return func(opts *clientOptions) { + opts.rawOpts = append(opts.rawOpts, client.WithNetworkAddress(addr)) + } +} + +// WithDialTimeout returns option to set connection timeout to the remote node. +// +// Ignored if WithGRPCConn is provided. +func WithDialTimeout(dur time.Duration) Option { + return func(opts *clientOptions) { + opts.rawOpts = append(opts.rawOpts, client.WithDialTimeout(dur)) + } +} + +// WithTLSConfig returns option to set connection's TLS config to the remote node. +// +// Ignored if WithGRPCConnection is provided. +func WithTLSConfig(cfg *tls.Config) Option { + return func(opts *clientOptions) { + opts.rawOpts = append(opts.rawOpts, client.WithTLSCfg(cfg)) + } +} + +// WithDefaultPrivateKey returns option to set default private key +// used for the work. +func WithDefaultPrivateKey(key *ecdsa.PrivateKey) Option { + return func(opts *clientOptions) { + opts.key = key + } +} + +// WithURIAddress returns option to specify +// network address of a remote server and connection +// scheme for it. +// +// Format of the URI: +// +// [scheme://]host:port +// +// Supported schemes: +// - grpc; +// - grpcs. +// +// tls.Cfg second argument is optional and is taken into +// account only in case of `grpcs` scheme. +// +// Falls back to WithNetworkAddress if address is not a valid URI. +// +// Do not use along with WithAddress and WithTLSConfig. +// +// Ignored if WithGRPCConnection is provided. +func WithURIAddress(addr string, tlsCfg *tls.Config) Option { + return func(opts *clientOptions) { + opts.rawOpts = append(opts.rawOpts, client.WithNetworkURIAddress(addr, tlsCfg)...) + } +} + +// WithGRPCConnection returns option to set GRPC connection to +// the remote node. +func WithGRPCConnection(grpcConn *grpc.ClientConn) Option { + return func(opts *clientOptions) { + opts.rawOpts = append(opts.rawOpts, client.WithGRPCConn(grpcConn)) + } +} diff --git a/client/raw.go b/client/raw.go new file mode 100644 index 00000000..fcb57511 --- /dev/null +++ b/client/raw.go @@ -0,0 +1,17 @@ +package client + +import ( + "io" + + "github.com/nspcc-dev/neofs-api-go/rpc/client" +) + +// Raw returns underlying raw protobuf client. +func (c *clientImpl) Raw() *client.Client { + return c.raw +} + +// implements Client.Conn method. +func (c *clientImpl) Conn() io.Closer { + return c.raw.Conn() +} diff --git a/client/reputation.go b/client/reputation.go new file mode 100644 index 00000000..eccc7749 --- /dev/null +++ b/client/reputation.go @@ -0,0 +1,171 @@ +package client + +import ( + "context" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/rpc/client" + v2reputation "github.com/nspcc-dev/neofs-api-go/v2/reputation" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" + v2signature "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/nspcc-dev/neofs-sdk-go/reputation" +) + +// Reputation contains methods for working with Reputation system values. +type Reputation interface { + // AnnounceLocalTrust announces local trust values of local peer. + AnnounceLocalTrust(context.Context, AnnounceLocalTrustPrm, ...CallOption) (*AnnounceLocalTrustRes, error) + + // AnnounceIntermediateTrust announces the intermediate result of the iterative algorithm for calculating + // the global reputation of the node. + AnnounceIntermediateTrust(context.Context, AnnounceIntermediateTrustPrm, ...CallOption) (*AnnounceIntermediateTrustRes, error) +} + +// AnnounceLocalTrustPrm groups parameters of AnnounceLocalTrust operation. +type AnnounceLocalTrustPrm struct { + epoch uint64 + + trusts []*reputation.Trust +} + +// Epoch returns epoch in which the trust was assessed. +func (x AnnounceLocalTrustPrm) Epoch() uint64 { + return x.epoch +} + +// SetEpoch sets epoch in which the trust was assessed. +func (x *AnnounceLocalTrustPrm) SetEpoch(epoch uint64) { + x.epoch = epoch +} + +// Trusts returns list of local trust values. +func (x AnnounceLocalTrustPrm) Trusts() []*reputation.Trust { + return x.trusts +} + +// SetTrusts sets list of local trust values. +func (x *AnnounceLocalTrustPrm) SetTrusts(trusts []*reputation.Trust) { + x.trusts = trusts +} + +// AnnounceLocalTrustRes groups results of AnnounceLocalTrust operation. +type AnnounceLocalTrustRes struct{} + +func (c *clientImpl) AnnounceLocalTrust(ctx context.Context, prm AnnounceLocalTrustPrm, opts ...CallOption) (*AnnounceLocalTrustRes, error) { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + reqBody := new(v2reputation.AnnounceLocalTrustRequestBody) + reqBody.SetEpoch(prm.Epoch()) + reqBody.SetTrusts(reputation.TrustsToV2(prm.Trusts())) + + req := new(v2reputation.AnnounceLocalTrustRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + err := v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return nil, err + } + + resp, err := rpcapi.AnnounceLocalTrust(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, err + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return nil, err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, fmt.Errorf("can't verify response message: %w", err) + } + + return new(AnnounceLocalTrustRes), nil +} + +// AnnounceIntermediateTrustPrm groups parameters of AnnounceIntermediateTrust operation. +type AnnounceIntermediateTrustPrm struct { + epoch uint64 + + iter uint32 + + trust *reputation.PeerToPeerTrust +} + +func (x *AnnounceIntermediateTrustPrm) Epoch() uint64 { + return x.epoch +} + +func (x *AnnounceIntermediateTrustPrm) SetEpoch(epoch uint64) { + x.epoch = epoch +} + +// Iteration returns sequence number of the iteration. +func (x AnnounceIntermediateTrustPrm) Iteration() uint32 { + return x.iter +} + +// SetIteration sets sequence number of the iteration. +func (x *AnnounceIntermediateTrustPrm) SetIteration(iter uint32) { + x.iter = iter +} + +// Trust returns current global trust value computed at the specified iteration. +func (x AnnounceIntermediateTrustPrm) Trust() *reputation.PeerToPeerTrust { + return x.trust +} + +// SetTrust sets current global trust value computed at the specified iteration. +func (x *AnnounceIntermediateTrustPrm) SetTrust(trust *reputation.PeerToPeerTrust) { + x.trust = trust +} + +// AnnounceIntermediateTrustRes groups results of AnnounceIntermediateTrust operation. +type AnnounceIntermediateTrustRes struct{} + +func (c *clientImpl) AnnounceIntermediateTrust(ctx context.Context, prm AnnounceIntermediateTrustPrm, opts ...CallOption) (*AnnounceIntermediateTrustRes, error) { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + reqBody := new(v2reputation.AnnounceIntermediateResultRequestBody) + reqBody.SetEpoch(prm.Epoch()) + reqBody.SetIteration(prm.Iteration()) + reqBody.SetTrust(prm.Trust().ToV2()) + + req := new(v2reputation.AnnounceIntermediateResultRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + err := v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return nil, err + } + + resp, err := rpcapi.AnnounceIntermediateResult(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, err + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return nil, err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, fmt.Errorf("can't verify response message: %w", err) + } + + return new(AnnounceIntermediateTrustRes), nil +} diff --git a/client/response.go b/client/response.go new file mode 100644 index 00000000..f8bdbd48 --- /dev/null +++ b/client/response.go @@ -0,0 +1,37 @@ +package client + +import "github.com/nspcc-dev/neofs-api-go/v2/session" + +// ResponseMetaInfo groups meta information about any NeoFS API response. +type ResponseMetaInfo struct { + key []byte +} + +type verificationHeaderGetter interface { + GetVerificationHeader() *session.ResponseVerificationHeader +} + +// ResponderKey returns responder's public key in a binary format. +// +// Result must not be mutated. +func (x ResponseMetaInfo) ResponderKey() []byte { + return x.key +} + +// WithResponseInfoHandler allows to specify handler of response meta information for the all Client operations. +// The handler is called right after the response is received. Client returns handler's error immediately. +func WithResponseInfoHandler(f func(ResponseMetaInfo) error) Option { + return func(opts *clientOptions) { + opts.cbRespInfo = f + } +} + +func (c *clientImpl) handleResponseInfoV2(_ *callOptions, resp verificationHeaderGetter) error { + if c.opts.cbRespInfo == nil { + return nil + } + + return c.opts.cbRespInfo(ResponseMetaInfo{ + key: resp.GetVerificationHeader().GetBodySignature().GetKey(), + }) +} diff --git a/client/session.go b/client/session.go new file mode 100644 index 00000000..197ee736 --- /dev/null +++ b/client/session.go @@ -0,0 +1,79 @@ +package client + +import ( + "context" + "errors" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/rpc/client" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" + v2session "github.com/nspcc-dev/neofs-api-go/v2/session" + v2signature "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/nspcc-dev/neofs-sdk-go/owner" + "github.com/nspcc-dev/neofs-sdk-go/session" +) + +// Session contains session-related methods. +type Session interface { + // CreateSession creates session using provided expiration time. + CreateSession(context.Context, uint64, ...CallOption) (*session.Token, error) +} + +var errMalformedResponseBody = errors.New("malformed response body") + +func (c *clientImpl) CreateSession(ctx context.Context, expiration uint64, opts ...CallOption) (*session.Token, error) { + // apply all available options + callOptions := c.defaultCallOptions() + + for i := range opts { + opts[i](callOptions) + } + + w, err := owner.NEO3WalletFromPublicKey(&callOptions.key.PublicKey) + if err != nil { + return nil, err + } + + ownerID := new(owner.ID) + ownerID.SetNeo3Wallet(w) + + reqBody := new(v2session.CreateRequestBody) + reqBody.SetOwnerID(ownerID.ToV2()) + reqBody.SetExpiration(expiration) + + req := new(v2session.CreateRequest) + req.SetBody(reqBody) + req.SetMetaHeader(v2MetaHeaderFromOpts(callOptions)) + + err = v2signature.SignServiceMessage(callOptions.key, req) + if err != nil { + return nil, err + } + + resp, err := rpcapi.CreateSession(c.Raw(), req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("transport error: %w", err) + } + + // handle response meta info + if err := c.handleResponseInfoV2(callOptions, resp); err != nil { + return nil, err + } + + err = v2signature.VerifyServiceMessage(resp) + if err != nil { + return nil, fmt.Errorf("can't verify response message: %w", err) + } + + body := resp.GetBody() + if body == nil { + return nil, errMalformedResponseBody + } + + sessionToken := session.NewToken() + sessionToken.SetID(body.GetID()) + sessionToken.SetSessionKey(body.GetSessionKey()) + sessionToken.SetOwnerID(ownerID) + + return sessionToken, nil +}