From 7f5fb130c006337de41e5b141d8861bdb581a2d4 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Sat, 6 Nov 2021 14:13:04 +0300 Subject: [PATCH] [#961] *: Support NeoFS API status returns Upgrade NeoFS API Go library to version with status returns. Make all API clients to pull out and return errors from failed statuses. Make signature service to respond with status if client version supports it. Signed-off-by: Leonard Lyubich --- cmd/neofs-cli/internal/client/client.go | 181 +++++++++----- cmd/neofs-node/container.go | 3 +- cmd/neofs-node/object.go | 51 ++-- .../reputation/internal/client/client.go | 17 +- go.mod | 6 +- go.sum | Bin 99050 -> 93004 bytes pkg/core/client/client.go | 2 +- pkg/innerring/internal/client/client.go | 40 +++- pkg/network/cache/multi.go | 226 ++++++++---------- pkg/network/transport/object/grpc/service.go | 10 + pkg/services/accounting/sign.go | 3 + pkg/services/container/sign.go | 21 ++ pkg/services/control/convert.go | 4 +- pkg/services/control/ir/convert.go | 4 +- pkg/services/control/ir/rpc.go | 4 +- pkg/services/control/ir/service.go | 2 +- pkg/services/control/rpc.go | 4 +- pkg/services/control/service.go | 2 +- pkg/services/control/types.go | 2 +- pkg/services/netmap/sign.go | 6 + pkg/services/object/get/v2/util.go | 2 +- pkg/services/object/internal/client/client.go | 45 +++- pkg/services/object/search/v2/util.go | 2 +- pkg/services/object/sign.go | 85 ++++--- pkg/services/reputation/rpc/sign.go | 6 + pkg/services/session/sign.go | 3 + pkg/services/util/sign.go | 205 ++++++++++++---- 27 files changed, 600 insertions(+), 336 deletions(-) diff --git a/cmd/neofs-cli/internal/client/client.go b/cmd/neofs-cli/internal/client/client.go index bd9eed208..e39703e84 100644 --- a/cmd/neofs-cli/internal/client/client.go +++ b/cmd/neofs-cli/internal/client/client.go @@ -2,18 +2,17 @@ package internal import ( "context" - "crypto/sha256" "io" "math" "github.com/nspcc-dev/neofs-sdk-go/accounting" "github.com/nspcc-dev/neofs-sdk-go/client" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" - "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/version" ) @@ -25,12 +24,12 @@ type BalanceOfPrm struct { // BalanceOfRes groups resulting values of BalanceOf operation. type BalanceOfRes struct { - cliRes *accounting.Decimal + cliRes *client.BalanceOfRes } // Balance returns current balance. func (x BalanceOfRes) Balance() *accounting.Decimal { - return x.cliRes + return x.cliRes.Amount() } // BalanceOf requests current balance of NeoFS user. @@ -40,6 +39,10 @@ func BalanceOf(prm BalanceOfPrm) (res BalanceOfRes, err error) { res.cliRes, err = prm.cli.GetBalance(context.Background(), prm.ownerID, client.WithKey(prm.privKey), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -52,12 +55,12 @@ type ListContainersPrm struct { // ListContainersRes groups resulting values of ListContainers operation. type ListContainersRes struct { - cliRes []*cid.ID + cliRes *client.ContainerListRes } // IDList returns list of identifiers of user's containers. func (x ListContainersRes) IDList() []*cid.ID { - return x.cliRes + return x.cliRes.IDList() } // ListContainers requests list of NeoFS user's containers. @@ -67,6 +70,10 @@ func ListContainers(prm ListContainersPrm) (res ListContainersRes, err error) { res.cliRes, err = prm.cli.ListContainers(context.Background(), prm.ownerID, client.WithKey(prm.privKey), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -86,12 +93,12 @@ func (x *PutContainerPrm) SetContainer(cnr *container.Container) { // PutContainerRes groups resulting values of PutContainer operation. type PutContainerRes struct { - cliRes *cid.ID + cliRes *client.ContainerPutRes } // ID returns identifier of the created container. func (x PutContainerRes) ID() *cid.ID { - return x.cliRes + return x.cliRes.ID() } // PutContainer sends request to save container in NeoFS. @@ -107,6 +114,10 @@ func PutContainer(prm PutContainerPrm) (res PutContainerRes, err error) { client.WithKey(prm.privKey), client.WithSession(prm.sessionToken), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -119,12 +130,12 @@ type GetContainerPrm struct { // GetContainerRes groups resulting values of GetContainer operation. type GetContainerRes struct { - cliRes *container.Container + cliRes *client.ContainerGetRes } // Container returns structured of the requested container. func (x GetContainerRes) Container() *container.Container { - return x.cliRes + return x.cliRes.Container() } // GetContainer reads container from NeoFS by ID. @@ -134,6 +145,10 @@ func GetContainer(prm GetContainerPrm) (res GetContainerRes, err error) { res.cliRes, err = prm.cli.GetContainer(context.Background(), prm.cnrID, client.WithKey(prm.privKey), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -157,10 +172,16 @@ type DeleteContainerRes struct{} // // Returns any error prevented the operation from completing correctly in error return. func DeleteContainer(prm DeleteContainerPrm) (res DeleteContainerRes, err error) { - err = prm.cli.DeleteContainer(context.Background(), prm.cnrID, + var cliRes *client.ContainerDeleteRes + + cliRes, err = prm.cli.DeleteContainer(context.Background(), prm.cnrID, client.WithKey(prm.privKey), client.WithSession(prm.sessionToken), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(cliRes.Status()) + } return } @@ -173,21 +194,25 @@ type EACLPrm struct { // EACLRes groups resulting values of EACL operation. type EACLRes struct { - cliRes *client.EACLWithSignature + cliRes *client.EACLRes } // EACL returns requested eACL table. func (x EACLRes) EACL() *eacl.Table { - return x.cliRes.EACL() + return x.cliRes.Table() } // EACL reads eACL table from NeoFS by container ID. // // Returns any error prevented the operation from completing correctly in error return. func EACL(prm EACLPrm) (res EACLRes, err error) { - res.cliRes, err = prm.cli.GetEACL(context.Background(), prm.cnrID, + res.cliRes, err = prm.cli.EACL(context.Background(), prm.cnrID, client.WithKey(prm.privKey), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -217,10 +242,16 @@ type SetEACLRes struct{} // // Returns any error prevented the operation from completing correctly in error return. func SetEACL(prm SetEACLPrm) (res SetEACLRes, err error) { - err = prm.cli.SetEACL(context.Background(), prm.eaclTable, + var cliRes *client.SetEACLRes + + cliRes, err = prm.cli.SetEACL(context.Background(), prm.eaclTable, client.WithKey(prm.privKey), client.WithSession(prm.sessionToken), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(cliRes.Status()) + } return } @@ -232,12 +263,12 @@ type NetworkInfoPrm struct { // NetworkInfoRes groups resulting values of NetworkInfo operation. type NetworkInfoRes struct { - cliRes *netmap.NetworkInfo + cliRes *client.NetworkInfoRes } // NetworkInfo returns structured information about the NeoFS network. func (x NetworkInfoRes) NetworkInfo() *netmap.NetworkInfo { - return x.cliRes + return x.cliRes.Info() } // NetworkInfo reads information about the NeoFS network. @@ -247,6 +278,10 @@ func NetworkInfo(prm NetworkInfoPrm) (res NetworkInfoRes, err error) { res.cliRes, err = prm.cli.NetworkInfo(context.Background(), client.WithKey(prm.privKey), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -258,17 +293,17 @@ type NodeInfoPrm struct { // NodeInfoRes groups resulting values of NodeInfo operation. type NodeInfoRes struct { - cliRes *client.EndpointInfo + cliRes *client.EndpointInfoRes } // NodeInfo returns information about the node from netmap. func (x NodeInfoRes) NodeInfo() *netmap.NodeInfo { - return x.cliRes.NodeInfo() + return x.cliRes.Info().NodeInfo() } // LatestVersion returns latest NeoFS API version in use. func (x NodeInfoRes) LatestVersion() *version.Version { - return x.cliRes.LatestVersion() + return x.cliRes.Info().LatestVersion() } // NodeInfo requests information about the remote server from NeoFS netmap. @@ -278,6 +313,10 @@ func NodeInfo(prm NodeInfoPrm) (res NodeInfoRes, err error) { res.cliRes, err = prm.cli.EndpointInfo(context.Background(), client.WithKey(prm.privKey), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -289,7 +328,7 @@ type CreateSessionPrm struct { // CreateSessionRes groups resulting values of CreateSession operation. type CreateSessionRes struct { - cliRes *session.Token + cliRes *client.CreateSessionRes } // ID returns session identifier. @@ -309,6 +348,10 @@ func CreateSession(prm CreateSessionPrm) (res CreateSessionRes, err error) { res.cliRes, err = prm.cli.CreateSession(context.Background(), math.MaxUint64, client.WithKey(prm.privKey), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -334,12 +377,12 @@ func (x *PutObjectPrm) SetPayloadReader(rdr io.Reader) { // PutObjectRes groups resulting values of PutObject operation. type PutObjectRes struct { - cliRes *object.ID + cliRes *client.ObjectPutRes } // ID returns identifier of the created object. func (x PutObjectRes) ID() *object.ID { - return x.cliRes + return x.cliRes.ID() } // PutObject saves the object in NeoFS network. @@ -356,6 +399,10 @@ func PutObject(prm PutObjectPrm) (res PutObjectRes, err error) { client.WithSession(prm.sessionToken), client.WithBearer(prm.bearerToken), )...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -368,12 +415,12 @@ type DeleteObjectPrm struct { // DeleteObjectRes groups resulting values of DeleteObject operation. type DeleteObjectRes struct { - cliRes *object.Address + cliRes *client.ObjectDeleteRes } // TombstoneAddress returns address of the created object with tombstone. func (x DeleteObjectRes) TombstoneAddress() *object.Address { - return x.cliRes + return x.cliRes.TombstoneAddress() } // DeleteObject marks object to be removed from NeoFS through tombstone placement. @@ -384,11 +431,15 @@ func DeleteObject(prm DeleteObjectPrm) (res DeleteObjectRes, err error) { delPrm.WithAddress(prm.objAddr) - res.cliRes, err = client.DeleteObject(context.Background(), prm.cli, &delPrm, append(prm.opts, + res.cliRes, err = prm.cli.DeleteObject(context.Background(), &delPrm, append(prm.opts, client.WithKey(prm.privKey), client.WithSession(prm.sessionToken), client.WithBearer(prm.bearerToken), )...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -403,12 +454,12 @@ type GetObjectPrm struct { // GetObjectRes groups resulting values of GetObject operation. type GetObjectRes struct { - cliRes *object.Object + cliRes *client.ObjectGetRes } // Object returns header of the request object. func (x GetObjectRes) Header() *object.Object { - return x.cliRes + return x.cliRes.Object() } // GetObject reads the object by address. @@ -429,6 +480,10 @@ func GetObject(prm GetObjectPrm) (res GetObjectRes, err error) { client.WithSession(prm.sessionToken), client.WithBearer(prm.bearerToken), )...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -449,12 +504,12 @@ func (x *HeadObjectPrm) SetMainOnlyFlag(v bool) { // HeadObjectRes groups resulting values of HeadObject operation. type HeadObjectRes struct { - cliRes *object.Object + cliRes *client.ObjectHeadRes } // Header returns requested object header. func (x HeadObjectRes) Header() *object.Object { - return x.cliRes + return x.cliRes.Object() } // HeadObject reads object header by address. @@ -473,11 +528,15 @@ func HeadObject(prm HeadObjectPrm) (res HeadObjectRes, err error) { cliPrm.WithAllFields() } - res.cliRes, err = prm.cli.GetObjectHeader(context.Background(), &cliPrm, append(prm.opts, + res.cliRes, err = prm.cli.HeadObject(context.Background(), &cliPrm, append(prm.opts, client.WithKey(prm.privKey), client.WithSession(prm.sessionToken), client.WithBearer(prm.bearerToken), )...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -497,12 +556,12 @@ func (x *SearchObjectsPrm) SetFilters(filters object.SearchFilters) { // SearchObjectsRes groups resulting values of SearchObjects operation. type SearchObjectsRes struct { - cliRes []*object.ID + cliRes *client.ObjectSearchRes } // IDList returns identifiers of the matched objects. func (x SearchObjectsRes) IDList() []*object.ID { - return x.cliRes + return x.cliRes.IDList() } // SearchObjects selects objects from container which match the filters. @@ -514,11 +573,15 @@ func SearchObjects(prm SearchObjectsPrm) (res SearchObjectsRes, err error) { cliPrm.WithSearchFilters(prm.filters) cliPrm.WithContainerID(prm.cnrID) - res.cliRes, err = prm.cli.SearchObject(context.Background(), &cliPrm, append(prm.opts, + res.cliRes, err = prm.cli.SearchObjects(context.Background(), &cliPrm, append(prm.opts, client.WithKey(prm.privKey), client.WithSession(prm.sessionToken), client.WithBearer(prm.bearerToken), )...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -552,12 +615,12 @@ func (x *HashPayloadRangesPrm) SetSalt(salt []byte) { // HashPayloadRangesRes groups resulting values of HashPayloadRanges operation. type HashPayloadRangesRes struct { - cliRes [][]byte + cliRes *client.ObjectRangeHashRes } // HashList returns list of hashes of the payload ranges keeping order. func (x HashPayloadRangesRes) HashList() [][]byte { - return x.cliRes + return x.cliRes.Hashes() } // HashPayloadRanges requests hashes (by default SHA256) of the object payload ranges. @@ -572,35 +635,17 @@ func HashPayloadRanges(prm HashPayloadRangesPrm) (res HashPayloadRangesRes, err cliPrm.WithRangeList(prm.rngs...) if prm.tz { - var hs [][sha256.Size]byte + cliPrm.TZ() + } - hs, err = prm.cli.ObjectPayloadRangeSHA256(context.Background(), &cliPrm, append(prm.opts, - client.WithKey(prm.privKey), - client.WithSession(prm.sessionToken), - client.WithBearer(prm.bearerToken), - )...) - if err == nil { - res.cliRes = make([][]byte, 0, len(hs)) - - for i := range hs { - res.cliRes = append(res.cliRes, hs[i][:]) - } - } - } else { - var hs [][client.TZSize]byte - - hs, err = prm.cli.ObjectPayloadRangeTZ(context.Background(), &cliPrm, append(prm.opts, - client.WithKey(prm.privKey), - client.WithSession(prm.sessionToken), - client.WithBearer(prm.bearerToken), - )...) - if err == nil { - res.cliRes = make([][]byte, 0, len(hs)) - - for i := range hs { - res.cliRes = append(res.cliRes, hs[i][:]) - } - } + res.cliRes, err = prm.cli.HashObjectPayloadRanges(context.Background(), &cliPrm, append(prm.opts, + client.WithKey(prm.privKey), + client.WithSession(prm.sessionToken), + client.WithBearer(prm.bearerToken), + )...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) } return @@ -638,11 +683,17 @@ func PayloadRange(prm PayloadRangePrm) (res PayloadRangeRes, err error) { cliPrm.WithDataWriter(prm.wrt) cliPrm.WithRange(prm.rng) - _, err = prm.cli.ObjectPayloadRangeData(context.Background(), &cliPrm, append(prm.opts, + var cliRes *client.ObjectRangeRes + + cliRes, err = prm.cli.ObjectPayloadRangeData(context.Background(), &cliPrm, append(prm.opts, client.WithKey(prm.privKey), client.WithSession(prm.sessionToken), client.WithBearer(prm.bearerToken), )...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(cliRes.Status()) + } return } diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 92a70e418..42e9b878e 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -312,7 +312,8 @@ func (r *remoteLoadAnnounceWriter) Put(a containerSDK.UsedSpaceAnnouncement) err } func (r *remoteLoadAnnounceWriter) Close() error { - return r.client.AnnounceContainerUsedSpace(r.ctx, r.buf, apiClient.WithKey(r.key)) + _, err := r.client.AnnounceContainerUsedSpace(r.ctx, r.buf, apiClient.WithKey(r.key)) + return err } type loadPlacementBuilder struct { diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 8c698b0a5..7f82fe5e6 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "crypto/ecdsa" - "crypto/sha256" "fmt" "github.com/nspcc-dev/neofs-api-go/v2/object" @@ -474,68 +473,60 @@ func (c *reputationClient) submitResult(err error) { c.cons.trustStorage.Update(prm) } -func (c *reputationClient) PutObject(ctx context.Context, prm *client.PutObjectParams, opts ...client.CallOption) (*objectSDK.ID, error) { - id, err := c.Client.PutObject(ctx, prm, opts...) +func (c *reputationClient) PutObject(ctx context.Context, prm *client.PutObjectParams, opts ...client.CallOption) (*client.ObjectPutRes, error) { + res, err := c.Client.PutObject(ctx, prm, opts...) c.submitResult(err) - return id, err + return res, err } -func (c *reputationClient) DeleteObject(ctx context.Context, prm *client.DeleteObjectParams, opts ...client.CallOption) error { - err := c.Client.DeleteObject(ctx, prm, opts...) +func (c *reputationClient) DeleteObject(ctx context.Context, prm *client.DeleteObjectParams, opts ...client.CallOption) (*client.ObjectDeleteRes, error) { + res, err := c.Client.DeleteObject(ctx, prm, opts...) c.submitResult(err) - return err + return res, err } -func (c *reputationClient) GetObject(ctx context.Context, prm *client.GetObjectParams, opts ...client.CallOption) (*objectSDK.Object, error) { - obj, err := c.Client.GetObject(ctx, prm, opts...) +func (c *reputationClient) GetObject(ctx context.Context, prm *client.GetObjectParams, opts ...client.CallOption) (*client.ObjectGetRes, error) { + res, err := c.Client.GetObject(ctx, prm, opts...) c.submitResult(err) - return obj, err + return res, err } -func (c *reputationClient) GetObjectHeader(ctx context.Context, prm *client.ObjectHeaderParams, opts ...client.CallOption) (*objectSDK.Object, error) { - obj, err := c.Client.GetObjectHeader(ctx, prm, opts...) +func (c *reputationClient) HeadObject(ctx context.Context, prm *client.ObjectHeaderParams, opts ...client.CallOption) (*client.ObjectHeadRes, error) { + res, err := c.Client.HeadObject(ctx, prm, opts...) c.submitResult(err) - return obj, err + return res, err } -func (c *reputationClient) ObjectPayloadRangeData(ctx context.Context, prm *client.RangeDataParams, opts ...client.CallOption) ([]byte, error) { - rng, err := c.Client.ObjectPayloadRangeData(ctx, prm, opts...) +func (c *reputationClient) ObjectPayloadRangeData(ctx context.Context, prm *client.RangeDataParams, opts ...client.CallOption) (*client.ObjectRangeRes, error) { + res, err := c.Client.ObjectPayloadRangeData(ctx, prm, opts...) c.submitResult(err) - return rng, err + return res, err } -func (c *reputationClient) ObjectPayloadRangeSHA256(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) ([][sha256.Size]byte, error) { - hashes, err := c.Client.ObjectPayloadRangeSHA256(ctx, prm, opts...) +func (c *reputationClient) HashObjectPayloadRanges(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) (*client.ObjectRangeHashRes, error) { + res, err := c.Client.HashObjectPayloadRanges(ctx, prm, opts...) c.submitResult(err) - return hashes, err + return res, err } -func (c *reputationClient) ObjectPayloadRangeTZ(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) ([][client.TZSize]byte, error) { - hashes, err := c.Client.ObjectPayloadRangeTZ(ctx, prm, opts...) +func (c *reputationClient) SearchObjects(ctx context.Context, prm *client.SearchObjectParams, opts ...client.CallOption) (*client.ObjectSearchRes, error) { + res, err := c.Client.SearchObjects(ctx, prm, opts...) c.submitResult(err) - return hashes, err -} - -func (c *reputationClient) SearchObject(ctx context.Context, prm *client.SearchObjectParams, opts ...client.CallOption) ([]*objectSDK.ID, error) { - ids, err := c.Client.SearchObject(ctx, prm, opts...) - - c.submitResult(err) - - return ids, err + return res, err } func (c *reputationClientConstructor) Get(info coreclient.NodeInfo) (client.Client, error) { diff --git a/cmd/neofs-node/reputation/internal/client/client.go b/cmd/neofs-node/reputation/internal/client/client.go index 8a693e68f..13d712ce8 100644 --- a/cmd/neofs-node/reputation/internal/client/client.go +++ b/cmd/neofs-node/reputation/internal/client/client.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "github.com/nspcc-dev/neofs-sdk-go/client" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/reputation" ) @@ -65,7 +66,13 @@ type AnnounceLocalRes struct{} // // Returns any error prevented the operation from completing correctly in error return. func AnnounceLocal(prm AnnounceLocalPrm) (res AnnounceLocalRes, err error) { - _, err = prm.cli.AnnounceLocalTrust(prm.ctx, prm.cliPrm, prm.opts...) + var cliRes *client.AnnounceLocalTrustRes + + cliRes, err = prm.cli.AnnounceLocalTrust(prm.ctx, prm.cliPrm, prm.opts...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(cliRes.Status()) + } return } @@ -102,7 +109,13 @@ type AnnounceIntermediateRes struct{} // // Returns any error prevented the operation from completing correctly in error return. func AnnounceIntermediate(prm AnnounceIntermediatePrm) (res AnnounceIntermediateRes, err error) { - _, err = prm.cli.AnnounceIntermediateTrust(prm.ctx, prm.cliPrm, prm.opts...) + var cliRes *client.AnnounceIntermediateTrustRes + + cliRes, err = prm.cli.AnnounceIntermediateTrust(prm.ctx, prm.cliPrm, prm.opts...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(cliRes.Status()) + } return } diff --git a/go.mod b/go.mod index 64191e868..854676263 100644 --- a/go.mod +++ b/go.mod @@ -11,9 +11,9 @@ require ( github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.4.0 github.com/nspcc-dev/hrw v1.0.9 - github.com/nspcc-dev/neo-go v0.97.3 - github.com/nspcc-dev/neofs-api-go v1.30.0 - github.com/nspcc-dev/neofs-sdk-go v0.0.0-20211110152919-10b78e74afbd + github.com/nspcc-dev/neo-go v0.97.4-pre.0.20211123163659-b25c3775e847 + github.com/nspcc-dev/neofs-api-go/v2 v2.11.0-pre.0.20211118144033-580f6c5554ff + github.com/nspcc-dev/neofs-sdk-go v0.0.0-20211123100340-d9317cbea191 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.4.0 github.com/paulmach/orb v0.2.2 diff --git a/go.sum b/go.sum index 80b124b0e64a83190875bc2a758f82675d2ab6a1..5dd613f6e6604a86f87626d15f948ac7e1a1c493 100644 GIT binary patch delta 1571 zcmb7>%a0Rv0LL>6tULsE!Ce*v*}V{O*v@;VgL-H?({88jwB5d@MIzIgX{Vh&X4+0) zfCi0;i3dY|#;So-KM4439x7K8u)$JPUrnynT_DJy|Qy<-(AT0b^mY&m^pd_JUxFL{4u}PIqA9x zfq87J^C>zF1(b#+D+Xe=db4fJg}GRySa((H>a30(tA}#Dt@#bg7g}fsb#al3q>K_u zNIpIV$LTaDm1;RVAP@u(j^R%@KVzRmAV}50dD;N`-E+?O%qL^c?x#+Uv7`$fF&4E9 zEcaLslA;+tD~Wy&R*Ug&4QbWlZ3JbjDK(6g%4K)G=zRE$Z`$R)HWPn52wT~nw8Oqw1TjLeSjuB6X<6{G#Bc^Q5?fCoWN*;rr7yB zPALS#P@0=$R@Y&bMUPa?)+61F?h|Si9aY#?r$HlFLFRlEW2(_M?(uf5a1*?DZV#Av zbKCL%WIlqC)VWS8hlQlF*~e>Wsuwa6Xp?X0R7Vr7h>^pUOd=SPyE4C)v2=bM%)T}L zEYikj+qetETmtlZuZ9DgP^I1gxHLt38F+-LY5!7s@6LyVXi3)8pGX*JBtRk#t>A zyIJ19KHk##3Fq?LFG1YO9VkkW=)B4j7^CDh8Dp_kf>be8Yt$D#S|(}QrB0I$#tV@` zyVJ=LshHC0<2@smLcRGw0bRf2oqeHyH5+N!H??}oEL5r$iZJS(Q<-#`S(F;FpaV}n7+5rwOZw3#ScjB~btsvqrrPFRTfoO2n^5a4~ zO7x^ff8HBT2JHlA*gi(~Mm_B?Y=Ft-=~XGaA|0vIl{lY}g#(^gzhs-1)S<~VPBx8v zVx>!xFX%ne3RJNg7gvDi<2h%~@RiNZZ`ZGmIb)wqj)T%y&x7pOm%x`dcQ|KnCdR;( z+mAWg?H9+uU-zcKPj}ySZr%HKi?jX5nM26xqGT-~r)@Oe$d{9dCxpiq!*;w7w{@Bm z+8S)~Vcklqo-)xz!P!3#47P88P7IzKhbD%>jnMw>!*3=bbmP#Nh8|81?i_~hubCzq zA{>(3o<<8v6vzd!Oy`t7N(Y!U--nqF+ft=uESh8B3ORUi1UfjlbObs+xbq}*YVi0` zXl8gDh7N96bCMUcU9sDyOM!T$mzHx?gU%JRJSW3K5p#20EgwrXR61Y8B7+J7?H$~A zL4n~d1d^tP=XmIYIoB#=r)8JrBIAQejt)iYb|u4RWxE~1D@C>KizILwEjKO8D>ViW g1Ze7w6VUF#ToFlL)G1X1Il;|;{X5v delta 5283 zcmbVQ+pi<{U6+@1MN&2Fb}!8as@ciYH}{!VX*ITIJmc~BZjWsV#N02Qn?2(h z&ph;iN>o(Ra2eXu{~#3)Bw&e6r79N*B#J5p1VV@hC>6XQL<$cDgeoyUn@}&xa?_FR zqjTo_`QAR?o4@ntANuSPx zx+W@eDal03VkVT0Tj>8oCM!RC%c)-1KLe%6y%afv@dy0jvbwWpx&UY>gxdA5aHr zfU}A>0Kjln)iA(7gECQqG7%F|)Ju<~QEx1v_%H%FlQ);^*gaQWI9Lf#Mi}{c3Hfp; z!7*oI>)9h*`oAeXaabJgZGK9x_M%}H^@qMi-$ zcE{cHmKi>oIZXNMpML6R|Ci7w)=xIG?O-?yV$L=F#aQYOb5)x-9niF9GQ#XQo*FX2 z1aYQWrQwU``YukK%R;kvz7jZv@qoFPq@PcMX}oL&9>*&Z0y)+IwuuGGGh{VyLUpv; z8@{#PV8t?gI17&gJu#F|wu#DpQd`0F4DYnaZhPZvvq_U8g$>RkxEM?bQ(DdlW3VpY zZJZtO#_LyR^aJj|0EZEnKv5X3ri+T}sD`723Qq8C&zVCJhImxf=QLtTt0NNZdYO>= z-F(jCYs`^E&!)20= z>H3^UF3Z0`e){lR$lVW}Zw=jvq09GsDRFGsblnN6=^ZJFqk6`X0wRb~xJ|O7nQP{u zwDIa&$X#t(J*Q}OM9$+Zt=a;7e&kvY#k6XL{}X)qIqJiQm&wJ)YjFAHYVv^DNyDxW07v4zgWr$B-90u~rh+_EMsfLz!P9glb`lXM z+CT~tkVA2g4*0mh#I7LVMrtgQ;Q-rY?4qO3O}Q`wp}ZHaFJ~Tx$E`6wPAURPV{Df3 zbgYl?X^Tq1rafx$!6K=z6>&11v*p|2Q{^v*TY^Q#{# z|LVmj?;tQh!SRg%ctg`joTg!w)J(Nhz9b!=IPhP3=?Yy9fRhk(2woq^X)z)aFXi-< zoAjZf)hc?!KD2-q)iANAE;pXFHSJQ-=fC`6x*FRj2&xQ7%YbEW5jU;PRvSjW*_Ie+ z`Do8?B!rMweIu47od>4~r~6X^>{Q$OC1T&QSPK%JX z%m1{YqT-1SxwXqi%}AD}{2q`(Ukq8VJEc070{3Qm)suiVCGkA#64ZMLA9=9$ZFUZ{ zT6XQ~!77(rw(WJ@I+Z|vOc}v+GRvBKk%?VTFyi+G$y3u&xMz4Q)xrC#tm7NQR=%zv ziDop+*|j&FtAmua@jaUs4-~)pm8UZ;DFB%?BQIH`lhNaQ+JzNM(yFjejZhw z*5?3jPUh^!z-gfshvT;JfnelAA#p$}vr=tGGnj-KI2Gl(y5O96W>5B!>AC(+=7vIq zsL2BcRhGT*Bg@dLbsS5(AH?Ly^+%mGE#RIkwG!WG(`{5YFk+YUC=W=vX)LC!050S6 zuzyFClPrlXYD&ZLTCga0m(Fa`8!2j>jp!~R`oV(TMm)92+D@`y-Ulnc`_dDKKlzm} z*4i+rMk@`|4H&2-WssO+s+V^HK8+C}?;5}+dpA8Fxqb9Qyl6&dAFUF=li|kxr zs~zN6DzAR!Y4HQWPAo+^FY6$~mQ)vfinP$QFEt$)g}E?rge@iOYmTCPx|sB4N6bV0 z<-dE}wjIw7&1#b|v&-uUekdQ-8g)9Ftkd;4ZKh)okL%)4GuisMFNcL2W|7gHchJ?m z|M2*a9AmpRG{`Hk}h)A04iJ{k12{cV4+c9C=jz@t7<=Pg{T)o58BN2^JaC z#7;6Q>9zP-S$p;7;fdGYyh5XA$O**Ckqxfgl_&g50({fU8gb>(n8(PPIOT) z05v;{5Jz2dFwvSpM{oC`ytxH&)ft1# z1c!IO8D2Txhg-e6;f;OeSYR2Dw#}pDfll;t4S~r^4`Cb!a6?5X0x?h>C?t8JRauHU z1oeBUkfVpjeZD*V{I`DdN_qaqCywVY1rsnp zsNguB-+t=Z@(1s-FLh2C35~Zd5IfK-6PR8%>O| zQQjHn+M=IREa)8nZKvdNN{9)J^`JJ1c3^XtRW<$Y-QEWyoj5`Vd$c|3So)Aso1-?q z*>_>rrDxM3abmBBiIZkbkCAkaFUz<7`R+md{!e}6(i0p|I0hpP1+TuTz$sl-Xr(;= z;r!tKu<_XKV_hIHje`cjOdM!>b<1k%S8KJ|{fk=d->%=T!T>0QLaI{6%i(j}=J4S;9GzKbWptPo$u&xuPLL8%EhO9a&pe7g;#8TMo zAc9_-i{aAXM$B#}ExEO;Y!-!w%i+c5M(ytH4~Dw9e$@4YTeY9Kz;4yNr$DuNgK8l| zs)ejNbYNB0Fj`a1i;v%~buRw(R_&AbU%yqm`(qdP?$o%8cW&1n2jtP5(2KHkh=__N z!z($B$K-Bghxv4wip(xDN853y$yG-m(pg`;dAoLpI4z@H9jQY^VbvWH-j1Xrn;cUs z2&pCRw27N6?)~IWt$XYC@yiJmr>mYoYi9LnP!w#?7YtOB@4sSX%7g4MB(GCnMHo(~NAZ=)*ViF3hkeEgvObz#9R<)0@9c;~wSQ%bkye-t8lO6?jLM^;n8#`i`Ag2=nY0|?%?76Mtu4TxV*x@82EO-n#lsflnq;;kKtu UT4Y2NySTon-Mjxix#r&Z523}MX#fBK diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index cc3f77942..1f4c8906a 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -1,7 +1,7 @@ package client import ( - rawclient "github.com/nspcc-dev/neofs-api-go/rpc/client" + rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-sdk-go/client" ) diff --git a/pkg/innerring/internal/client/client.go b/pkg/innerring/internal/client/client.go index 04c41912e..3ddf21b7d 100644 --- a/pkg/innerring/internal/client/client.go +++ b/pkg/innerring/internal/client/client.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup" "github.com/nspcc-dev/neofs-sdk-go/client" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" ) @@ -42,12 +43,12 @@ func (x *SearchSGPrm) SetContainerID(id *cid.ID) { // SearchSGRes groups resulting values of SearchSG operation. type SearchSGRes struct { - cliRes []*object.ID + cliRes *client.ObjectSearchRes } // IDList returns list of IDs of storage groups in container. func (x SearchSGRes) IDList() []*object.ID { - return x.cliRes + return x.cliRes.IDList() } var sgFilter = storagegroup.SearchQuery() @@ -61,9 +62,13 @@ func (x Client) SearchSG(prm SearchSGPrm) (res SearchSGRes, err error) { cliPrm.WithContainerID(prm.cnrID) cliPrm.WithSearchFilters(sgFilter) - res.cliRes, err = x.c.SearchObject(prm.ctx, &cliPrm, + res.cliRes, err = x.c.SearchObjects(prm.ctx, &cliPrm, client.WithKey(x.key), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -75,12 +80,12 @@ type GetObjectPrm struct { // GetObjectRes groups resulting values of GetObject operation. type GetObjectRes struct { - cliRes *object.Object + cliRes *client.ObjectGetRes } // Object returns received object. func (x GetObjectRes) Object() *object.Object { - return x.cliRes + return x.cliRes.Object() } // GetObject reads the object by address. @@ -94,6 +99,10 @@ func (x Client) GetObject(prm GetObjectPrm) (res GetObjectRes, err error) { res.cliRes, err = x.c.GetObject(prm.ctx, &cliPrm, client.WithKey(x.key), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -119,12 +128,12 @@ func (x *HeadObjectPrm) SetTTL(ttl uint32) { // HeadObjectRes groups resulting values of HeadObject operation. type HeadObjectRes struct { - cliRes *object.Object + cliRes *client.ObjectHeadRes } // Header returns received object header. func (x HeadObjectRes) Header() *object.Object { - return x.cliRes + return x.cliRes.Object() } // HeadObject reads short object header by address. @@ -138,10 +147,14 @@ func (x Client) HeadObject(prm HeadObjectPrm) (res HeadObjectRes, err error) { cliPrm.WithRawFlag(prm.raw) cliPrm.WithMainFields() - res.cliRes, err = x.c.GetObjectHeader(prm.ctx, &cliPrm, + res.cliRes, err = x.c.HeadObject(prm.ctx, &cliPrm, client.WithKey(x.key), client.WithTTL(prm.ttl), ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -225,15 +238,22 @@ func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeR cliPrm.WithAddress(prm.objAddr) cliPrm.WithRangeList(prm.rng) - hs, err := x.c.ObjectPayloadRangeTZ(prm.ctx, &cliPrm, + cliRes, err := x.c.HashObjectPayloadRanges(prm.ctx, &cliPrm, client.WithKey(x.key), client.WithTTL(1), ) if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(cliRes.Status()) + if err != nil { + return + } + + hs := cliRes.Hashes() if ln := len(hs); ln != 1 { err = fmt.Errorf("wrong number of hashes %d", ln) } else { - res.h = hs[0][:] + res.h = hs[0] } } diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 16f2125e6..9c0a7b82f 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -2,23 +2,18 @@ package cache import ( "context" - "crypto/sha256" "crypto/tls" "errors" "io" "sync" - rawclient "github.com/nspcc-dev/neofs-api-go/rpc/client" + rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/nspcc-dev/neofs-sdk-go/accounting" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" - "github.com/nspcc-dev/neofs-sdk-go/netmap" - "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/owner" - "github.com/nspcc-dev/neofs-sdk-go/session" ) type multiClient struct { @@ -87,215 +82,184 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(client.Client) return firstErr } -func (x *multiClient) PutObject(ctx context.Context, p *client.PutObjectParams, opts ...client.CallOption) (*object.ID, error) { - var res *object.ID - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) PutObject(ctx context.Context, p *client.PutObjectParams, opts ...client.CallOption) (res *client.ObjectPutRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.PutObject(ctx, p, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) GetBalance(ctx context.Context, id *owner.ID, opts ...client.CallOption) (*accounting.Decimal, error) { - var res *accounting.Decimal - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) GetBalance(ctx context.Context, id *owner.ID, opts ...client.CallOption) (res *client.BalanceOfRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.GetBalance(ctx, id, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) PutContainer(ctx context.Context, cnr *container.Container, opts ...client.CallOption) (*cid.ID, error) { - var res *cid.ID - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) PutContainer(ctx context.Context, cnr *container.Container, opts ...client.CallOption) (res *client.ContainerPutRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.PutContainer(ctx, cnr, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) GetContainer(ctx context.Context, id *cid.ID, opts ...client.CallOption) (*container.Container, error) { - var res *container.Container - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) GetContainer(ctx context.Context, id *cid.ID, opts ...client.CallOption) (res *client.ContainerGetRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.GetContainer(ctx, id, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) ListContainers(ctx context.Context, id *owner.ID, opts ...client.CallOption) ([]*cid.ID, error) { - var res []*cid.ID - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) ListContainers(ctx context.Context, id *owner.ID, opts ...client.CallOption) (res *client.ContainerListRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.ListContainers(ctx, id, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) DeleteContainer(ctx context.Context, id *cid.ID, opts ...client.CallOption) error { - return x.iterateClients(ctx, func(c client.Client) error { - return c.DeleteContainer(ctx, id, opts...) - }) -} - -func (x *multiClient) GetEACL(ctx context.Context, id *cid.ID, opts ...client.CallOption) (*client.EACLWithSignature, error) { - var res *client.EACLWithSignature - - err := x.iterateClients(ctx, func(c client.Client) (err error) { - res, err = c.GetEACL(ctx, id, opts...) - return +func (x *multiClient) DeleteContainer(ctx context.Context, id *cid.ID, opts ...client.CallOption) (res *client.ContainerDeleteRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { + res, err = c.DeleteContainer(ctx, id, opts...) + return err }) - return res, err + return } -func (x *multiClient) SetEACL(ctx context.Context, t *eacl.Table, opts ...client.CallOption) error { - return x.iterateClients(ctx, func(c client.Client) error { - return c.SetEACL(ctx, t, opts...) +func (x *multiClient) EACL(ctx context.Context, id *cid.ID, opts ...client.CallOption) (res *client.EACLRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { + res, err = c.EACL(ctx, id, opts...) + return err }) + + return } -func (x *multiClient) AnnounceContainerUsedSpace(ctx context.Context, as []container.UsedSpaceAnnouncement, opts ...client.CallOption) error { - return x.iterateClients(ctx, func(c client.Client) error { - return c.AnnounceContainerUsedSpace(ctx, as, opts...) +func (x *multiClient) SetEACL(ctx context.Context, t *eacl.Table, opts ...client.CallOption) (res *client.SetEACLRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { + res, err = c.SetEACL(ctx, t, opts...) + return err }) + + return } -func (x *multiClient) EndpointInfo(ctx context.Context, opts ...client.CallOption) (*client.EndpointInfo, error) { - var res *client.EndpointInfo +func (x *multiClient) AnnounceContainerUsedSpace(ctx context.Context, as []container.UsedSpaceAnnouncement, opts ...client.CallOption) (res *client.AnnounceSpaceRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { + res, err = c.AnnounceContainerUsedSpace(ctx, as, opts...) + return err + }) - err := x.iterateClients(ctx, func(c client.Client) (err error) { + return +} + +func (x *multiClient) EndpointInfo(ctx context.Context, opts ...client.CallOption) (res *client.EndpointInfoRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.EndpointInfo(ctx, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) NetworkInfo(ctx context.Context, opts ...client.CallOption) (*netmap.NetworkInfo, error) { - var res *netmap.NetworkInfo - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) NetworkInfo(ctx context.Context, opts ...client.CallOption) (res *client.NetworkInfoRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.NetworkInfo(ctx, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) DeleteObject(ctx context.Context, p *client.DeleteObjectParams, opts ...client.CallOption) error { - return x.iterateClients(ctx, func(c client.Client) error { - return c.DeleteObject(ctx, p, opts...) +func (x *multiClient) DeleteObject(ctx context.Context, p *client.DeleteObjectParams, opts ...client.CallOption) (res *client.ObjectDeleteRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { + res, err = c.DeleteObject(ctx, p, opts...) + return err }) + + return } -func (x *multiClient) GetObject(ctx context.Context, p *client.GetObjectParams, opts ...client.CallOption) (*object.Object, error) { - var res *object.Object - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) GetObject(ctx context.Context, p *client.GetObjectParams, opts ...client.CallOption) (res *client.ObjectGetRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.GetObject(ctx, p, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) GetObjectHeader(ctx context.Context, p *client.ObjectHeaderParams, opts ...client.CallOption) (*object.Object, error) { - var res *object.Object - - err := x.iterateClients(ctx, func(c client.Client) (err error) { - res, err = c.GetObjectHeader(ctx, p, opts...) - return - }) - - return res, err -} - -func (x *multiClient) ObjectPayloadRangeData(ctx context.Context, p *client.RangeDataParams, opts ...client.CallOption) ([]byte, error) { - var res []byte - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) ObjectPayloadRangeData(ctx context.Context, p *client.RangeDataParams, opts ...client.CallOption) (res *client.ObjectRangeRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.ObjectPayloadRangeData(ctx, p, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) ObjectPayloadRangeSHA256(ctx context.Context, p *client.RangeChecksumParams, opts ...client.CallOption) ([][sha256.Size]byte, error) { - var res [][sha256.Size]byte - - err := x.iterateClients(ctx, func(c client.Client) (err error) { - res, err = c.ObjectPayloadRangeSHA256(ctx, p, opts...) - return +func (x *multiClient) HeadObject(ctx context.Context, p *client.ObjectHeaderParams, opts ...client.CallOption) (res *client.ObjectHeadRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { + res, err = c.HeadObject(ctx, p, opts...) + return err }) - return res, err + return } -func (x *multiClient) ObjectPayloadRangeTZ(ctx context.Context, p *client.RangeChecksumParams, opts ...client.CallOption) ([][client.TZSize]byte, error) { - var res [][client.TZSize]byte - - err := x.iterateClients(ctx, func(c client.Client) (err error) { - res, err = c.ObjectPayloadRangeTZ(ctx, p, opts...) - return +func (x *multiClient) HashObjectPayloadRanges(ctx context.Context, p *client.RangeChecksumParams, opts ...client.CallOption) (res *client.ObjectRangeHashRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { + res, err = c.HashObjectPayloadRanges(ctx, p, opts...) + return err }) - return res, err + return } -func (x *multiClient) SearchObject(ctx context.Context, p *client.SearchObjectParams, opts ...client.CallOption) ([]*object.ID, error) { - var res []*object.ID - - err := x.iterateClients(ctx, func(c client.Client) (err error) { - res, err = c.SearchObject(ctx, p, opts...) - return +func (x *multiClient) SearchObjects(ctx context.Context, p *client.SearchObjectParams, opts ...client.CallOption) (res *client.ObjectSearchRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { + res, err = c.SearchObjects(ctx, p, opts...) + return err }) - return res, err + return } -func (x *multiClient) CreateSession(ctx context.Context, exp uint64, opts ...client.CallOption) (*session.Token, error) { - var res *session.Token - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) CreateSession(ctx context.Context, exp uint64, opts ...client.CallOption) (res *client.CreateSessionRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.CreateSession(ctx, exp, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) AnnounceLocalTrust(ctx context.Context, p client.AnnounceLocalTrustPrm, opts ...client.CallOption) (*client.AnnounceLocalTrustRes, error) { - var res *client.AnnounceLocalTrustRes - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) AnnounceLocalTrust(ctx context.Context, p client.AnnounceLocalTrustPrm, opts ...client.CallOption) (res *client.AnnounceLocalTrustRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.AnnounceLocalTrust(ctx, p, opts...) - return + return err }) - return res, err + return } -func (x *multiClient) AnnounceIntermediateTrust(ctx context.Context, p client.AnnounceIntermediateTrustPrm, opts ...client.CallOption) (*client.AnnounceIntermediateTrustRes, error) { - var res *client.AnnounceIntermediateTrustRes - - err := x.iterateClients(ctx, func(c client.Client) (err error) { +func (x *multiClient) AnnounceIntermediateTrust(ctx context.Context, p client.AnnounceIntermediateTrustPrm, opts ...client.CallOption) (res *client.AnnounceIntermediateTrustRes, err error) { + err = x.iterateClients(ctx, func(c client.Client) error { res, err = c.AnnounceIntermediateTrust(ctx, p, opts...) - return + return err }) - return res, err + return } func (x *multiClient) Raw() *rawclient.Client { diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index a57df9db3..4739f8c56 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" + "github.com/nspcc-dev/neofs-node/pkg/services/util" ) // Server wraps NeoFS API Object service and @@ -52,6 +53,15 @@ func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error { } if err := stream.Send(putReq); err != nil { + if errors.Is(err, util.ErrAbortStream) { + resp, err := stream.CloseAndRecv() + if err != nil { + return err + } + + return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PutResponse)) + } + return err } } diff --git a/pkg/services/accounting/sign.go b/pkg/services/accounting/sign.go index 82aceec28..7c0e3c892 100644 --- a/pkg/services/accounting/sign.go +++ b/pkg/services/accounting/sign.go @@ -26,6 +26,9 @@ func (s *signService) Balance(ctx context.Context, req *accounting.BalanceReques func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Balance(ctx, req.(*accounting.BalanceRequest)) }, + func() util.ResponseMessage { + return new(accounting.BalanceResponse) + }, ) if err != nil { return nil, err diff --git a/pkg/services/container/sign.go b/pkg/services/container/sign.go index 7cb729b00..51e7644f5 100644 --- a/pkg/services/container/sign.go +++ b/pkg/services/container/sign.go @@ -26,6 +26,9 @@ func (s *signService) Put(ctx context.Context, req *container.PutRequest) (*cont func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Put(ctx, req.(*container.PutRequest)) }, + func() util.ResponseMessage { + return new(container.PutResponse) + }, ) if err != nil { return nil, err @@ -39,6 +42,9 @@ func (s *signService) Delete(ctx context.Context, req *container.DeleteRequest) func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Delete(ctx, req.(*container.DeleteRequest)) }, + func() util.ResponseMessage { + return new(container.DeleteResponse) + }, ) if err != nil { return nil, err @@ -52,6 +58,9 @@ func (s *signService) Get(ctx context.Context, req *container.GetRequest) (*cont func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Get(ctx, req.(*container.GetRequest)) }, + func() util.ResponseMessage { + return new(container.GetResponse) + }, ) if err != nil { return nil, err @@ -65,6 +74,9 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.List(ctx, req.(*container.ListRequest)) }, + func() util.ResponseMessage { + return new(container.ListResponse) + }, ) if err != nil { return nil, err @@ -78,6 +90,9 @@ func (s *signService) SetExtendedACL(ctx context.Context, req *container.SetExte func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.SetExtendedACL(ctx, req.(*container.SetExtendedACLRequest)) }, + func() util.ResponseMessage { + return new(container.SetExtendedACLResponse) + }, ) if err != nil { return nil, err @@ -91,6 +106,9 @@ func (s *signService) GetExtendedACL(ctx context.Context, req *container.GetExte func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.GetExtendedACL(ctx, req.(*container.GetExtendedACLRequest)) }, + func() util.ResponseMessage { + return new(container.GetExtendedACLResponse) + }, ) if err != nil { return nil, err @@ -104,6 +122,9 @@ func (s *signService) AnnounceUsedSpace(ctx context.Context, req *container.Anno func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.AnnounceUsedSpace(ctx, req.(*container.AnnounceUsedSpaceRequest)) }, + func() util.ResponseMessage { + return new(container.AnnounceUsedSpaceResponse) + }, ) if err != nil { return nil, err diff --git a/pkg/services/control/convert.go b/pkg/services/control/convert.go index dd5d63091..b8d7fc295 100644 --- a/pkg/services/control/convert.go +++ b/pkg/services/control/convert.go @@ -1,8 +1,8 @@ package control import ( - "github.com/nspcc-dev/neofs-api-go/rpc/grpc" - "github.com/nspcc-dev/neofs-api-go/rpc/message" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/message" ) type requestWrapper struct { diff --git a/pkg/services/control/ir/convert.go b/pkg/services/control/ir/convert.go index 3536951f4..24235e520 100644 --- a/pkg/services/control/ir/convert.go +++ b/pkg/services/control/ir/convert.go @@ -1,8 +1,8 @@ package control import ( - "github.com/nspcc-dev/neofs-api-go/rpc/grpc" - "github.com/nspcc-dev/neofs-api-go/rpc/message" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/message" ) type requestWrapper struct { diff --git a/pkg/services/control/ir/rpc.go b/pkg/services/control/ir/rpc.go index 8ed7cf44e..e7b7c2320 100644 --- a/pkg/services/control/ir/rpc.go +++ b/pkg/services/control/ir/rpc.go @@ -1,8 +1,8 @@ package control import ( - "github.com/nspcc-dev/neofs-api-go/rpc/client" - "github.com/nspcc-dev/neofs-api-go/rpc/common" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" ) const serviceName = "ircontrol.ControlService" diff --git a/pkg/services/control/ir/service.go b/pkg/services/control/ir/service.go index 8f7705c98..7e542a090 100644 --- a/pkg/services/control/ir/service.go +++ b/pkg/services/control/ir/service.go @@ -1,7 +1,7 @@ package control import ( - "github.com/nspcc-dev/neofs-api-go/util/proto" + "github.com/nspcc-dev/neofs-api-go/v2/util/proto" ) // StableMarshal reads binary representation of health check request body diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index 15fdf1c43..ef3311c0e 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -1,8 +1,8 @@ package control import ( - "github.com/nspcc-dev/neofs-api-go/rpc/client" - "github.com/nspcc-dev/neofs-api-go/rpc/common" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" ) const serviceName = "control.ControlService" diff --git a/pkg/services/control/service.go b/pkg/services/control/service.go index 66ef33b24..753769832 100644 --- a/pkg/services/control/service.go +++ b/pkg/services/control/service.go @@ -1,7 +1,7 @@ package control import ( - "github.com/nspcc-dev/neofs-api-go/util/proto" + "github.com/nspcc-dev/neofs-api-go/v2/util/proto" ) // StableMarshal reads binary representation of health check request body diff --git a/pkg/services/control/types.go b/pkg/services/control/types.go index ed563337e..ec55c9aff 100644 --- a/pkg/services/control/types.go +++ b/pkg/services/control/types.go @@ -1,7 +1,7 @@ package control import ( - "github.com/nspcc-dev/neofs-api-go/util/proto" + "github.com/nspcc-dev/neofs-api-go/v2/util/proto" "google.golang.org/protobuf/encoding/protojson" ) diff --git a/pkg/services/netmap/sign.go b/pkg/services/netmap/sign.go index adff8548a..13473c441 100644 --- a/pkg/services/netmap/sign.go +++ b/pkg/services/netmap/sign.go @@ -28,6 +28,9 @@ func (s *signService) LocalNodeInfo( func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.LocalNodeInfo(ctx, req.(*netmap.LocalNodeInfoRequest)) }, + func() util.ResponseMessage { + return new(netmap.LocalNodeInfoResponse) + }, ) if err != nil { return nil, err @@ -41,6 +44,9 @@ func (s *signService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRe func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.NetworkInfo(ctx, req.(*netmap.NetworkInfoRequest)) }, + func() util.ResponseMessage { + return new(netmap.NetworkInfoResponse) + }, ) if err != nil { return nil, err diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 1d36a2075..8e92404d7 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -9,10 +9,10 @@ import ( "io" "sync" - rpcclient "github.com/nspcc-dev/neofs-api-go/rpc/client" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-api-go/v2/rpc" + rpcclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-node/pkg/core/client" diff --git a/pkg/services/object/internal/client/client.go b/pkg/services/object/internal/client/client.go index 9a4cacf6a..27b908867 100644 --- a/pkg/services/object/internal/client/client.go +++ b/pkg/services/object/internal/client/client.go @@ -7,6 +7,7 @@ import ( session2 "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-sdk-go/client" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/session" @@ -112,12 +113,12 @@ func (x *GetObjectPrm) SetAddress(addr *object.Address) { // GetObjectRes groups resulting values of GetObject operation. type GetObjectRes struct { - cliRes *object.Object + cliRes *client.ObjectGetRes } // Object returns requested object. func (x GetObjectRes) Object() *object.Object { - return x.cliRes + return x.cliRes.Object() } // GetObject reads the object by address. @@ -130,6 +131,10 @@ func (x GetObjectRes) Object() *object.Object { // object.ErrAlreadyRemoved error if requested object is marked to be removed. func GetObject(prm GetObjectPrm) (res GetObjectRes, err error) { res.cliRes, err = prm.cli.GetObject(prm.ctx, &prm.cliPrm, prm.opts...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } // FIXME: object.ErrAlreadyRemoved never returns @@ -159,12 +164,12 @@ func (x *HeadObjectPrm) SetAddress(addr *object.Address) { // GetObjectRes groups resulting values of GetObject operation. type HeadObjectRes struct { - cliRes *object.Object + cliRes *client.ObjectHeadRes } // Header returns requested object header. func (x HeadObjectRes) Header() *object.Object { - return x.cliRes + return x.cliRes.Object() } // HeadObject reads object header by address. @@ -176,7 +181,11 @@ func (x HeadObjectRes) Header() *object.Object { // error of type *object.SplitInfoError if object if raw flag is set and requested object is virtual; // object.ErrAlreadyRemoved error if requested object is marked to be removed. func HeadObject(prm HeadObjectPrm) (res HeadObjectRes, err error) { - res.cliRes, err = prm.cli.GetObjectHeader(prm.ctx, &prm.cliPrm, prm.opts...) + res.cliRes, err = prm.cli.HeadObject(prm.ctx, &prm.cliPrm, prm.opts...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } // FIXME: object.ErrAlreadyRemoved never returns @@ -213,12 +222,12 @@ func (x *PayloadRangePrm) SetRange(rng *object.Range) { // PayloadRangeRes groups resulting values of GetObject operation. type PayloadRangeRes struct { - cliRes []byte + cliRes *client.ObjectRangeRes } // PayloadRange returns data of the requested payload range. func (x PayloadRangeRes) PayloadRange() []byte { - return x.cliRes + return x.cliRes.Data() } // PayloadRange reads object payload range by address. @@ -231,6 +240,10 @@ func (x PayloadRangeRes) PayloadRange() []byte { // object.ErrAlreadyRemoved error if requested object is marked to be removed. func PayloadRange(prm PayloadRangePrm) (res PayloadRangeRes, err error) { res.cliRes, err = prm.cli.ObjectPayloadRangeData(prm.ctx, &prm.cliPrm, prm.opts...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } // FIXME: object.ErrAlreadyRemoved never returns @@ -253,12 +266,12 @@ func (x *PutObjectPrm) SetObject(obj *object.Object) { // PutObjectRes groups resulting values of PutObject operation. type PutObjectRes struct { - cliRes *object.ID + cliRes *client.ObjectPutRes } // ID returns identifier of the stored object. func (x PutObjectRes) ID() *object.ID { - return x.cliRes + return x.cliRes.ID() } // PutObject saves the object in local storage of the remote node. @@ -270,6 +283,10 @@ func PutObject(prm PutObjectPrm) (res PutObjectRes, err error) { res.cliRes, err = prm.cli.PutObject(prm.ctx, &prm.cliPrm, append(prm.opts, client.WithTTL(1))..., ) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } @@ -295,19 +312,23 @@ func (x *SearchObjectsPrm) SetFilters(fs object.SearchFilters) { // SearchObjectsRes groups resulting values of SearchObjects operation. type SearchObjectsRes struct { - cliRes []*object.ID + cliRes *client.ObjectSearchRes } // IDList returns identifiers of the matched objects. func (x SearchObjectsRes) IDList() []*object.ID { - return x.cliRes + return x.cliRes.IDList() } // SearchObjects selects objects from container which match the filters. // // Returns any error prevented the operation from completing correctly in error return. func SearchObjects(prm SearchObjectsPrm) (res SearchObjectsRes, err error) { - res.cliRes, err = prm.cli.SearchObject(prm.ctx, &prm.cliPrm, prm.opts...) + res.cliRes, err = prm.cli.SearchObjects(prm.ctx, &prm.cliPrm, prm.opts...) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.cliRes.Status()) + } return } diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index 09c8d7fa8..4b58964b1 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -6,9 +6,9 @@ import ( "io" "sync" - rpcclient "github.com/nspcc-dev/neofs-api-go/rpc/client" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/rpc" + rpcclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-node/pkg/core/client" diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index f5b8c4648..9361755bf 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -21,6 +21,8 @@ type searchStreamSigner struct { util.ServerStream respWriter util.ResponseMessageWriter + + nonEmptyResp bool // set on first Send call } type getStreamSigner struct { @@ -52,19 +54,20 @@ func (s *getStreamSigner) Send(resp *object.GetResponse) error { } func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error { - respWriter, err := s.sigSvc.HandleServerStreamRequest(req, + return s.sigSvc.HandleServerStreamRequest(req, func(resp util.ResponseMessage) error { return stream.Send(resp.(*object.GetResponse)) }, + func() util.ResponseMessage { + return new(object.GetResponse) + }, + func(respWriter util.ResponseMessageWriter) error { + return s.svc.Get(req, &getStreamSigner{ + ServerStream: stream, + respWriter: respWriter, + }) + }, ) - if err != nil { - return err - } - - return s.svc.Get(req, &getStreamSigner{ - ServerStream: stream, - respWriter: respWriter, - }) } func (s *putStreamSigner) Send(req *object.PutRequest) error { @@ -94,6 +97,9 @@ func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) { func() (util.ResponseMessage, error) { return stream.CloseAndRecv() }, + func() util.ResponseMessage { + return new(object.PutResponse) + }, ), }, nil } @@ -103,6 +109,9 @@ func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*objec func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Head(ctx, req.(*object.HeadRequest)) }, + func() util.ResponseMessage { + return new(object.HeadResponse) + }, ) if err != nil { return nil, err @@ -112,23 +121,38 @@ func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*objec } func (s *searchStreamSigner) Send(resp *object.SearchResponse) error { + s.nonEmptyResp = true return s.respWriter(resp) } func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) error { - respWriter, err := s.sigSvc.HandleServerStreamRequest(req, + return s.sigSvc.HandleServerStreamRequest(req, func(resp util.ResponseMessage) error { return stream.Send(resp.(*object.SearchResponse)) }, - ) - if err != nil { - return err - } + func() util.ResponseMessage { + return new(object.SearchResponse) + }, + func(respWriter util.ResponseMessageWriter) error { + stream := &searchStreamSigner{ + ServerStream: stream, + respWriter: respWriter, + } - return s.svc.Search(req, &searchStreamSigner{ - ServerStream: stream, - respWriter: respWriter, - }) + err := s.svc.Search(req, stream) + + if err == nil && !stream.nonEmptyResp { + // The higher component does not write any response in the case of an empty result (which is correct). + // With the introduction of status returns at least one answer must be signed and sent to the client. + // This approach is supported by clients who do not know how to work with statuses (one could make + // a switch according to the protocol version from the request, but the costs of sending an empty + // answer can be neglected due to the gradual refusal to use the "old" clients). + return stream.Send(new(object.SearchResponse)) + } + + return nil + }, + ) } func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { @@ -136,6 +160,9 @@ func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*o func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Delete(ctx, req.(*object.DeleteRequest)) }, + func() util.ResponseMessage { + return new(object.DeleteResponse) + }, ) if err != nil { return nil, err @@ -149,19 +176,20 @@ func (s *getRangeStreamSigner) Send(resp *object.GetRangeResponse) error { } func (s *SignService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { - respWriter, err := s.sigSvc.HandleServerStreamRequest(req, + return s.sigSvc.HandleServerStreamRequest(req, func(resp util.ResponseMessage) error { return stream.Send(resp.(*object.GetRangeResponse)) }, + func() util.ResponseMessage { + return new(object.GetRangeResponse) + }, + func(respWriter util.ResponseMessageWriter) error { + return s.svc.GetRange(req, &getRangeStreamSigner{ + ServerStream: stream, + respWriter: respWriter, + }) + }, ) - if err != nil { - return err - } - - return s.svc.GetRange(req, &getRangeStreamSigner{ - ServerStream: stream, - respWriter: respWriter, - }) } func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { @@ -169,6 +197,9 @@ func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHash func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest)) }, + func() util.ResponseMessage { + return new(object.GetRangeHashResponse) + }, ) if err != nil { return nil, err diff --git a/pkg/services/reputation/rpc/sign.go b/pkg/services/reputation/rpc/sign.go index bbbf946c6..73b2ca9cf 100644 --- a/pkg/services/reputation/rpc/sign.go +++ b/pkg/services/reputation/rpc/sign.go @@ -26,6 +26,9 @@ func (s *signService) AnnounceLocalTrust(ctx context.Context, req *reputation.An func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest)) }, + func() util.ResponseMessage { + return new(reputation.AnnounceLocalTrustResponse) + }, ) if err != nil { return nil, err @@ -39,6 +42,9 @@ func (s *signService) AnnounceIntermediateResult(ctx context.Context, req *reput func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest)) }, + func() util.ResponseMessage { + return new(reputation.AnnounceIntermediateResultResponse) + }, ) if err != nil { return nil, err diff --git a/pkg/services/session/sign.go b/pkg/services/session/sign.go index 9519691ef..06e13fc9f 100644 --- a/pkg/services/session/sign.go +++ b/pkg/services/session/sign.go @@ -26,6 +26,9 @@ func (s *signService) Create(ctx context.Context, req *session.CreateRequest) (* func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Create(ctx, req.(*session.CreateRequest)) }, + func() util.ResponseMessage { + return new(session.CreateResponse) + }, ) if err != nil { return nil, err diff --git a/pkg/services/util/sign.go b/pkg/services/util/sign.go index 5c239b2b2..3b3253658 100644 --- a/pkg/services/util/sign.go +++ b/pkg/services/util/sign.go @@ -3,12 +3,18 @@ package util import ( "context" "crypto/ecdsa" + "errors" "fmt" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" ) +type RequestMessage interface { + GetMetaHeader() *session.RequestMetaHeader +} + // ResponseMessage is an interface of NeoFS response message. type ResponseMessage interface { GetMetaHeader() *session.ResponseMetaHeader @@ -27,11 +33,9 @@ type ServerStreamHandler func(context.Context, interface{}) (ResponseMessageRead type ResponseMessageReader func() (ResponseMessage, error) -type ResponseMessageStreamer struct { - key *ecdsa.PrivateKey +var ErrAbortStream = errors.New("abort message stream") - recv ResponseMessageReader -} +type ResponseConstructor func() ResponseMessage type RequestMessageWriter func(interface{}) error @@ -43,6 +47,12 @@ type RequestMessageStreamer struct { send RequestMessageWriter close ClientStreamCloser + + respCons ResponseConstructor + + statusSupported bool + + sendErr error } func NewUnarySignService(key *ecdsa.PrivateKey) *SignService { @@ -52,79 +62,192 @@ func NewUnarySignService(key *ecdsa.PrivateKey) *SignService { } func (s *RequestMessageStreamer) Send(req interface{}) error { + // req argument should be strengthen with type RequestMessage + s.statusSupported = isStatusSupported(req.(RequestMessage)) // panic is OK here for now + + var err error + // verify request signatures - if err := signature.VerifyServiceMessage(req); err != nil { - return fmt.Errorf("could not verify request: %w", err) + if err = signature.VerifyServiceMessage(req); err != nil { + err = fmt.Errorf("could not verify request: %w", err) + } else { + err = s.send(req) } - return s.send(req) + if err != nil { + if !s.statusSupported { + return err + } + + s.sendErr = err + + return ErrAbortStream + } + + return nil } func (s *RequestMessageStreamer) CloseAndRecv() (ResponseMessage, error) { - resp, err := s.close() - if err != nil { - return nil, fmt.Errorf("could not close stream and receive response: %w", err) + var ( + resp ResponseMessage + err error + ) + + if s.sendErr != nil { + err = s.sendErr + } else { + resp, err = s.close() + if err != nil { + err = fmt.Errorf("could not close stream and receive response: %w", err) + } } - if err := signature.SignServiceMessage(s.key, resp); err != nil { - return nil, fmt.Errorf("could not sign response: %w", err) + if err != nil { + if !s.statusSupported { + return nil, err + } + + var st apistatus.ServerInternal // specific API status should be set according to error + + apistatus.WriteInternalServerErr(&st, err) + + resp = s.respCons() + + setStatusV2(resp, st) + } + + if err = signResponse(s.key, resp, s.statusSupported); err != nil { + return nil, err } return resp, nil } -func (s *SignService) CreateRequestStreamer(sender RequestMessageWriter, closer ClientStreamCloser) *RequestMessageStreamer { +func (s *SignService) CreateRequestStreamer(sender RequestMessageWriter, closer ClientStreamCloser, blankResp ResponseConstructor) *RequestMessageStreamer { return &RequestMessageStreamer{ key: s.key, send: sender, close: closer, + + respCons: blankResp, } } -func (s *ResponseMessageStreamer) Recv() (ResponseMessage, error) { - m, err := s.recv() - if err != nil { - return nil, fmt.Errorf("could not receive response message for signing: %w", err) - } +func (s *SignService) HandleServerStreamRequest( + req interface{}, + respWriter ResponseMessageWriter, + blankResp ResponseConstructor, + respWriterCaller func(ResponseMessageWriter) error, +) error { + // handle protocol versions <=2.10 (API statuses was introduced in 2.11 only) - if err := signature.SignServiceMessage(s.key, m); err != nil { - return nil, fmt.Errorf("could not sign response message: %w", err) - } + // req argument should be strengthen with type RequestMessage + statusSupported := isStatusSupported(req.(RequestMessage)) // panic is OK here for now - return m, nil -} + var err error -func (s *SignService) HandleServerStreamRequest(req interface{}, respWriter ResponseMessageWriter) (ResponseMessageWriter, error) { // verify request signatures - if err := signature.VerifyServiceMessage(req); err != nil { - return nil, fmt.Errorf("could not verify request: %w", err) + if err = signature.VerifyServiceMessage(req); err != nil { + err = fmt.Errorf("could not verify request: %w", err) + } else { + err = respWriterCaller(func(resp ResponseMessage) error { + if err := signResponse(s.key, resp, statusSupported); err != nil { + return err + } + + return respWriter(resp) + }) } - return func(resp ResponseMessage) error { - if err := signature.SignServiceMessage(s.key, resp); err != nil { - return fmt.Errorf("could not sign response message: %w", err) + if err != nil { + if !statusSupported { + return err } - return respWriter(resp) - }, nil -} + var st apistatus.ServerInternal // specific API status should be set according to error -func (s *SignService) HandleUnaryRequest(ctx context.Context, req interface{}, handler UnaryHandler) (ResponseMessage, error) { - // verify request signatures - if err := signature.VerifyServiceMessage(req); err != nil { - return nil, fmt.Errorf("could not verify request: %w", err) + apistatus.WriteInternalServerErr(&st, err) + + resp := blankResp() + + setStatusV2(resp, st) + + _ = signResponse(s.key, resp, false) // panics or returns nil with false arg + + return respWriter(resp) + } + + return nil +} + +func (s *SignService) HandleUnaryRequest(ctx context.Context, req interface{}, handler UnaryHandler, blankResp ResponseConstructor) (ResponseMessage, error) { + // handle protocol versions <=2.10 (API statuses was introduced in 2.11 only) + + // req argument should be strengthen with type RequestMessage + statusSupported := isStatusSupported(req.(RequestMessage)) // panic is OK here for now + + var ( + resp ResponseMessage + err error + ) + + // verify request signatures + if err = signature.VerifyServiceMessage(req); err != nil { + err = fmt.Errorf("could not verify request: %w", err) + } else { + // process request + resp, err = handler(ctx, req) } - // process request - resp, err := handler(ctx, req) if err != nil { - return nil, fmt.Errorf("could not handle request: %w", err) + if !statusSupported { + return nil, err + } + + var st apistatus.ServerInternal // specific API status should be set according to error + + apistatus.WriteInternalServerErr(&st, err) + + resp = blankResp() + + setStatusV2(resp, st) } // sign the response - if err := signature.SignServiceMessage(s.key, resp); err != nil { - return nil, fmt.Errorf("could not sign response: %w", err) + if err = signResponse(s.key, resp, statusSupported); err != nil { + return nil, err } return resp, nil } + +func isStatusSupported(req RequestMessage) bool { + version := req.GetMetaHeader().GetVersion() + + mjr := version.GetMajor() + + return mjr > 2 || mjr == 2 && version.GetMinor() >= 11 +} + +func setStatusV2(resp ResponseMessage, st apistatus.Status) { + session.SetStatus(resp, apistatus.ToStatusV2(st)) +} + +// signs response with private key via signature.SignServiceMessage. +// The signature error affects the result depending on the protocol version: +// * if status return is supported, panics since we cannot return the failed status, because it will not be signed; +// * otherwise, returns error in order to transport it directly. +func signResponse(key *ecdsa.PrivateKey, resp interface{}, statusSupported bool) error { + err := signature.SignServiceMessage(key, resp) + if err != nil { + err = fmt.Errorf("could not sign response: %w", err) + + if statusSupported { + // We can't pass this error as status code since response will be unsigned. + // Isn't expected in practice, so panic is ok here. + panic(err) + } + } + + return err +}