From e0dce1043aa8c8f9338089c1a9356a78e70f74ed Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 25 Feb 2022 12:20:49 +0300 Subject: [PATCH] [#1195] Adopt recent changes in NeoFS SDK Signed-off-by: Leonard Lyubich --- cmd/neofs-cli/internal/client/client.go | 477 +++++++++++++----- cmd/neofs-cli/internal/client/prm.go | 10 +- cmd/neofs-cli/modules/container.go | 12 +- cmd/neofs-node/container.go | 6 +- cmd/neofs-node/object.go | 75 ++- .../reputation/intermediate/remote.go | 1 - .../reputation/internal/client/client.go | 18 +- cmd/neofs-node/reputation/local/remote.go | 1 - go.mod | 6 +- go.sum | Bin 93820 -> 93774 bytes pkg/core/client/client.go | 20 +- pkg/innerring/internal/client/client.go | 193 +++++-- .../processors/neofs/process_bind.go | 15 +- .../processors/settlement/basic/context.go | 21 +- pkg/innerring/rpc.go | 4 - pkg/network/cache/multi.go | 36 +- pkg/services/audit/auditor/pop.go | 4 +- pkg/services/container/executor.go | 4 +- pkg/services/container/morph/executor_test.go | 18 +- pkg/services/object/acl/acl.go | 28 +- pkg/services/object/acl/acl_test.go | 2 +- pkg/services/object/acl/classifier.go | 4 +- pkg/services/object/internal/client/client.go | 331 +++++++++--- .../object_manager/storagegroup/collect.go | 3 +- .../object_manager/transformer/transformer.go | 8 +- 25 files changed, 879 insertions(+), 418 deletions(-) diff --git a/cmd/neofs-cli/internal/client/client.go b/cmd/neofs-cli/internal/client/client.go index fb23fec80..706c84bcf 100644 --- a/cmd/neofs-cli/internal/client/client.go +++ b/cmd/neofs-cli/internal/client/client.go @@ -1,7 +1,10 @@ package internal import ( + "bytes" "context" + "errors" + "fmt" "io" "github.com/nspcc-dev/neofs-sdk-go/accounting" @@ -19,12 +22,12 @@ import ( // BalanceOfPrm groups parameters of BalanceOf operation. type BalanceOfPrm struct { commonPrm - client.GetBalancePrm + client.PrmBalanceGet } // BalanceOfRes groups resulting values of BalanceOf operation. type BalanceOfRes struct { - cliRes *client.GetBalanceRes + cliRes *client.ResBalanceGet } // Balance returns current balance. @@ -36,7 +39,7 @@ func (x BalanceOfRes) Balance() *accounting.Decimal { // // Returns any error prevented the operation from completing correctly in error return. func BalanceOf(prm BalanceOfPrm) (res BalanceOfRes, err error) { - res.cliRes, err = prm.cli.GetBalance(context.Background(), prm.GetBalancePrm) + res.cliRes, err = prm.cli.BalanceGet(context.Background(), prm.PrmBalanceGet) return } @@ -44,12 +47,12 @@ func BalanceOf(prm BalanceOfPrm) (res BalanceOfRes, err error) { // ListContainersPrm groups parameters of ListContainers operation. type ListContainersPrm struct { commonPrm - client.ContainerListPrm + client.PrmContainerList } // ListContainersRes groups resulting values of ListContainers operation. type ListContainersRes struct { - cliRes *client.ContainerListRes + cliRes *client.ResContainerList } // IDList returns list of identifiers of user's containers. @@ -61,7 +64,7 @@ func (x ListContainersRes) IDList() []*cid.ID { // // Returns any error prevented the operation from completing correctly in error return. func ListContainers(prm ListContainersPrm) (res ListContainersRes, err error) { - res.cliRes, err = prm.cli.ListContainers(context.Background(), prm.ContainerListPrm) + res.cliRes, err = prm.cli.ContainerList(context.Background(), prm.PrmContainerList) return } @@ -69,12 +72,12 @@ func ListContainers(prm ListContainersPrm) (res ListContainersRes, err error) { // PutContainerPrm groups parameters of PutContainer operation. type PutContainerPrm struct { commonPrm - client.ContainerPutPrm + client.PrmContainerPut } // PutContainerRes groups resulting values of PutContainer operation. type PutContainerRes struct { - cliRes *client.ContainerPutRes + cliRes *client.ResContainerPut } // ID returns identifier of the created container. @@ -91,7 +94,7 @@ func (x PutContainerRes) ID() *cid.ID { // // Returns any error prevented the operation from completing correctly in error return. func PutContainer(prm PutContainerPrm) (res PutContainerRes, err error) { - res.cliRes, err = prm.cli.PutContainer(context.Background(), prm.ContainerPutPrm) + res.cliRes, err = prm.cli.ContainerPut(context.Background(), prm.PrmContainerPut) return } @@ -99,7 +102,7 @@ func PutContainer(prm PutContainerPrm) (res PutContainerRes, err error) { // GetContainerPrm groups parameters of GetContainer operation. type GetContainerPrm struct { commonPrm - cliPrm client.ContainerGetPrm + cliPrm client.PrmContainerGet } // SetContainer sets identifier of the container to be read. @@ -109,7 +112,7 @@ func (x *GetContainerPrm) SetContainer(id cid.ID) { // GetContainerRes groups resulting values of GetContainer operation. type GetContainerRes struct { - cliRes *client.ContainerGetRes + cliRes *client.ResContainerGet } // Container returns structured of the requested container. @@ -121,7 +124,7 @@ func (x GetContainerRes) Container() *container.Container { // // Returns any error prevented the operation from completing correctly in error return. func GetContainer(prm GetContainerPrm) (res GetContainerRes, err error) { - res.cliRes, err = prm.cli.GetContainer(context.Background(), prm.cliPrm) + res.cliRes, err = prm.cli.ContainerGet(context.Background(), prm.cliPrm) return } @@ -129,7 +132,7 @@ func GetContainer(prm GetContainerPrm) (res GetContainerRes, err error) { // DeleteContainerPrm groups parameters of DeleteContainerPrm operation. type DeleteContainerPrm struct { commonPrm - client.ContainerDeletePrm + client.PrmContainerDelete } // DeleteContainerRes groups resulting values of DeleteContainer operation. @@ -144,7 +147,7 @@ type DeleteContainerRes struct{} // // Returns any error prevented the operation from completing correctly in error return. func DeleteContainer(prm DeleteContainerPrm) (res DeleteContainerRes, err error) { - _, err = prm.cli.DeleteContainer(context.Background(), prm.ContainerDeletePrm) + _, err = prm.cli.ContainerDelete(context.Background(), prm.PrmContainerDelete) return } @@ -152,12 +155,12 @@ func DeleteContainer(prm DeleteContainerPrm) (res DeleteContainerRes, err error) // EACLPrm groups parameters of EACL operation. type EACLPrm struct { commonPrm - client.EACLPrm + client.PrmContainerEACL } // EACLRes groups resulting values of EACL operation. type EACLRes struct { - cliRes *client.EACLRes + cliRes *client.ResContainerEACL } // EACL returns requested eACL table. @@ -169,7 +172,7 @@ func (x EACLRes) EACL() *eacl.Table { // // Returns any error prevented the operation from completing correctly in error return. func EACL(prm EACLPrm) (res EACLRes, err error) { - res.cliRes, err = prm.cli.EACL(context.Background(), prm.EACLPrm) + res.cliRes, err = prm.cli.ContainerEACL(context.Background(), prm.PrmContainerEACL) return } @@ -177,7 +180,7 @@ func EACL(prm EACLPrm) (res EACLRes, err error) { // SetEACLPrm groups parameters of SetEACL operation. type SetEACLPrm struct { commonPrm - client.SetEACLPrm + client.PrmContainerSetEACL } // SetEACLRes groups resulting values of SetEACL operation. @@ -192,7 +195,7 @@ type SetEACLRes struct{} // // Returns any error prevented the operation from completing correctly in error return. func SetEACL(prm SetEACLPrm) (res SetEACLRes, err error) { - _, err = prm.cli.SetEACL(context.Background(), prm.SetEACLPrm) + _, err = prm.cli.ContainerSetEACL(context.Background(), prm.PrmContainerSetEACL) return } @@ -200,12 +203,12 @@ func SetEACL(prm SetEACLPrm) (res SetEACLRes, err error) { // NetworkInfoPrm groups parameters of NetworkInfo operation. type NetworkInfoPrm struct { commonPrm - client.NetworkInfoPrm + client.PrmNetworkInfo } // NetworkInfoRes groups resulting values of NetworkInfo operation. type NetworkInfoRes struct { - cliRes *client.NetworkInfoRes + cliRes *client.ResNetworkInfo } // NetworkInfo returns structured information about the NeoFS network. @@ -217,7 +220,7 @@ func (x NetworkInfoRes) NetworkInfo() *netmap.NetworkInfo { // // Returns any error prevented the operation from completing correctly in error return. func NetworkInfo(prm NetworkInfoPrm) (res NetworkInfoRes, err error) { - res.cliRes, err = prm.cli.NetworkInfo(context.Background(), prm.NetworkInfoPrm) + res.cliRes, err = prm.cli.NetworkInfo(context.Background(), prm.PrmNetworkInfo) return } @@ -225,12 +228,12 @@ func NetworkInfo(prm NetworkInfoPrm) (res NetworkInfoRes, err error) { // NodeInfoPrm groups parameters of NodeInfo operation. type NodeInfoPrm struct { commonPrm - client.EndpointInfoPrm + client.PrmEndpointInfo } // NodeInfoRes groups resulting values of NodeInfo operation. type NodeInfoRes struct { - cliRes *client.EndpointInfoRes + cliRes *client.ResEndpointInfo } // NodeInfo returns information about the node from netmap. @@ -247,7 +250,7 @@ func (x NodeInfoRes) LatestVersion() *version.Version { // // Returns any error prevented the operation from completing correctly in error return. func NodeInfo(prm NodeInfoPrm) (res NodeInfoRes, err error) { - res.cliRes, err = prm.cli.EndpointInfo(context.Background(), prm.EndpointInfoPrm) + res.cliRes, err = prm.cli.EndpointInfo(context.Background(), prm.PrmEndpointInfo) return } @@ -255,12 +258,12 @@ func NodeInfo(prm NodeInfoPrm) (res NodeInfoRes, err error) { // CreateSessionPrm groups parameters of CreateSession operation. type CreateSessionPrm struct { commonPrm - client.CreateSessionPrm + client.PrmSessionCreate } // CreateSessionRes groups resulting values of CreateSession operation. type CreateSessionRes struct { - cliRes *client.CreateSessionRes + cliRes *client.ResSessionCreate } // ID returns session identifier. @@ -277,7 +280,7 @@ func (x CreateSessionRes) SessionKey() []byte { // // Returns any error prevented the operation from completing correctly in error return. func CreateSession(prm CreateSessionPrm) (res CreateSessionRes, err error) { - res.cliRes, err = prm.cli.CreateSession(context.Background(), prm.CreateSessionPrm) + res.cliRes, err = prm.cli.SessionCreate(context.Background(), prm.PrmSessionCreate) return } @@ -303,29 +306,94 @@ func (x *PutObjectPrm) SetPayloadReader(rdr io.Reader) { // PutObjectRes groups resulting values of PutObject operation. type PutObjectRes struct { - cliRes *client.ObjectPutRes + id *oidSDK.ID } // ID returns identifier of the created object. func (x PutObjectRes) ID() *oidSDK.ID { - return x.cliRes.ID() + return x.id } // PutObject saves the object in NeoFS network. // // Returns any error prevented the operation from completing correctly in error return. -func PutObject(prm PutObjectPrm) (res PutObjectRes, err error) { - var putPrm client.PutObjectParams +func PutObject(prm PutObjectPrm) (*PutObjectRes, error) { + var putPrm client.PrmObjectPutInit - putPrm.WithObject(prm.hdr) - putPrm.WithPayloadReader(prm.rdr) + wrt, err := prm.cli.ObjectPutInit(context.Background(), putPrm) + if err != nil { + return nil, fmt.Errorf("init object writing: %w", err) + } - res.cliRes, err = prm.cli.PutObject(context.Background(), &putPrm, append(prm.opts, - client.WithSession(prm.sessionToken), - client.WithBearer(prm.bearerToken), - )...) + if prm.sessionToken != nil { + wrt.WithinSession(*prm.sessionToken) + } - return + if prm.bearerToken != nil { + wrt.WithBearerToken(*prm.bearerToken) + } + + if prm.local { + wrt.MarkLocal() + } + + if wrt.WriteHeader(*prm.hdr) { + sz := prm.hdr.PayloadSize() + + if data := prm.hdr.Payload(); len(data) > 0 { + if prm.rdr != nil { + prm.rdr = io.MultiReader(bytes.NewReader(data), prm.rdr) + } else { + prm.rdr = bytes.NewReader(data) + sz = uint64(len(data)) + } + } + + if prm.rdr != nil { + // TODO: (neofs-node#1198) explore better values or configure it + const defaultBufferSizePut = 4096 + + if sz == 0 || sz > defaultBufferSizePut { + sz = defaultBufferSizePut + } + + buf := make([]byte, sz) + + var n int + + for { + n, err = prm.rdr.Read(buf) + if n > 0 { + if !wrt.WritePayloadChunk(buf[:n]) { + break + } + + continue + } + + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("read payload: %w", err) + } + } + } + + res, err := wrt.Close() + if err != nil { // here err already carries both status and client errors + return nil, fmt.Errorf("client failure: %w", err) + } + + var id oidSDK.ID + + if !res.ReadStoredObjectID(&id) { + return nil, errors.New("missing ID of the stored object") + } + + return &PutObjectRes{ + id: &id, + }, nil } // DeleteObjectPrm groups parameters of DeleteObject operation. @@ -336,28 +404,54 @@ type DeleteObjectPrm struct { // DeleteObjectRes groups resulting values of DeleteObject operation. type DeleteObjectRes struct { - cliRes *client.ObjectDeleteRes + addrTombstone *addressSDK.Address } // TombstoneAddress returns address of the created object with tombstone. func (x DeleteObjectRes) TombstoneAddress() *addressSDK.Address { - return x.cliRes.TombstoneAddress() + return x.addrTombstone } // DeleteObject marks object to be removed from NeoFS through tombstone placement. // // Returns any error prevented the operation from completing correctly in error return. -func DeleteObject(prm DeleteObjectPrm) (res DeleteObjectRes, err error) { - var delPrm client.DeleteObjectParams +func DeleteObject(prm DeleteObjectPrm) (*DeleteObjectRes, error) { + var delPrm client.PrmObjectDelete - delPrm.WithAddress(prm.objAddr) + if id := prm.objAddr.ContainerID(); id != nil { + delPrm.FromContainer(*id) + } - res.cliRes, err = prm.cli.DeleteObject(context.Background(), &delPrm, append(prm.opts, - client.WithSession(prm.sessionToken), - client.WithBearer(prm.bearerToken), - )...) + if id := prm.objAddr.ObjectID(); id != nil { + delPrm.ByID(*id) + } - return + if prm.sessionToken != nil { + delPrm.WithinSession(*prm.sessionToken) + } + + if prm.bearerToken != nil { + delPrm.WithBearerToken(*prm.bearerToken) + } + + cliRes, err := prm.cli.ObjectDelete(context.Background(), delPrm) + if err != nil { + return nil, fmt.Errorf("remove object via client: %w", err) + } + + var id oidSDK.ID + + if !cliRes.ReadTombstoneID(&id) { + return nil, errors.New("object removed but tombstone ID is missing") + } + + var addr addressSDK.Address + addr.SetObjectID(&id) + addr.SetContainerID(prm.objAddr.ContainerID()) + + return &DeleteObjectRes{ + addrTombstone: &addr, + }, nil } // GetObjectPrm groups parameters of GetObject operation. @@ -370,33 +464,79 @@ type GetObjectPrm struct { // GetObjectRes groups resulting values of GetObject operation. type GetObjectRes struct { - cliRes *client.ObjectGetRes + hdr *object.Object } // Header returns header of the request object. func (x GetObjectRes) Header() *object.Object { - return x.cliRes.Object() + return x.hdr } +// maximum size of the buffer use for io.Copy*. +// Chosen small due to the expected low volume of NeoFS CLI process resources. +// TODO: (neofs-node#1198) explore better values or configure it +const maxPayloadBufferSize = 1024 + // GetObject reads the object by address. // // Interrupts on any writer error. If successful, payload is written to writer. // // Returns any error prevented the operation from completing correctly in error return. // For raw reading, returns *object.SplitInfoError error if object is virtual. -func GetObject(prm GetObjectPrm) (res GetObjectRes, err error) { - var getPrm client.GetObjectParams +func GetObject(prm GetObjectPrm) (*GetObjectRes, error) { + var getPrm client.PrmObjectGet - getPrm.WithAddress(prm.objAddr) - getPrm.WithPayloadWriter(prm.wrt) - getPrm.WithRawFlag(prm.raw) + if id := prm.objAddr.ContainerID(); id != nil { + getPrm.FromContainer(*id) + } - res.cliRes, err = prm.cli.GetObject(context.Background(), &getPrm, append(prm.opts, - client.WithSession(prm.sessionToken), - client.WithBearer(prm.bearerToken), - )...) + if id := prm.objAddr.ObjectID(); id != nil { + getPrm.ByID(*id) + } - return + if prm.sessionToken != nil { + getPrm.WithinSession(*prm.sessionToken) + } + + if prm.bearerToken != nil { + getPrm.WithBearerToken(*prm.bearerToken) + } + + if prm.raw { + getPrm.MarkRaw() + } + + if prm.local { + getPrm.MarkLocal() + } + + rdr, err := prm.cli.ObjectGetInit(context.Background(), getPrm) + if err != nil { + return nil, fmt.Errorf("init object reading on client: %w", err) + } + + var hdr object.Object + + if !rdr.ReadHeader(&hdr) { + _, err = rdr.Close() + return nil, fmt.Errorf("read object header: %w", err) + } + + sz := hdr.PayloadSize() + if sz > maxPayloadBufferSize { + sz = maxPayloadBufferSize + } + + buf := make([]byte, sz) + + _, err = io.CopyBuffer(prm.wrt, rdr, buf) + if err != nil { + return nil, fmt.Errorf("copy payload: %w", err) + } + + return &GetObjectRes{ + hdr: &hdr, + }, nil } // HeadObjectPrm groups parameters of HeadObject operation. @@ -415,36 +555,51 @@ func (x *HeadObjectPrm) SetMainOnlyFlag(v bool) { // HeadObjectRes groups resulting values of HeadObject operation. type HeadObjectRes struct { - cliRes *client.ObjectHeadRes + hdr *object.Object } // Header returns requested object header. func (x HeadObjectRes) Header() *object.Object { - return x.cliRes.Object() + return x.hdr } // HeadObject reads object header by address. // // Returns any error prevented the operation from completing correctly in error return. // For raw reading, returns *object.SplitInfoError error if object is virtual. -func HeadObject(prm HeadObjectPrm) (res HeadObjectRes, err error) { - var cliPrm client.ObjectHeaderParams +func HeadObject(prm HeadObjectPrm) (*HeadObjectRes, error) { + var cliPrm client.PrmObjectHead - cliPrm.WithAddress(prm.objAddr) - cliPrm.WithRawFlag(prm.raw) - - if prm.mainOnly { - cliPrm.WithMainFields() - } else { - cliPrm.WithAllFields() + if id := prm.objAddr.ContainerID(); id != nil { + cliPrm.FromContainer(*id) } - res.cliRes, err = prm.cli.HeadObject(context.Background(), &cliPrm, append(prm.opts, - client.WithSession(prm.sessionToken), - client.WithBearer(prm.bearerToken), - )...) + if id := prm.objAddr.ObjectID(); id != nil { + cliPrm.ByID(*id) + } - return + if prm.raw { + cliPrm.MarkRaw() + } + + if prm.local { + cliPrm.MarkLocal() + } + + res, err := prm.cli.ObjectHead(context.Background(), cliPrm) + if err != nil { + return nil, fmt.Errorf("read object header via client: %w", err) + } + + var hdr object.Object + + if !res.ReadHeader(&hdr) { + return nil, fmt.Errorf("missing header in response") + } + + return &HeadObjectRes{ + hdr: &hdr, + }, nil } // SearchObjectsPrm groups parameters of SearchObjects operation. @@ -462,29 +617,68 @@ func (x *SearchObjectsPrm) SetFilters(filters object.SearchFilters) { // SearchObjectsRes groups resulting values of SearchObjects operation. type SearchObjectsRes struct { - cliRes *client.ObjectSearchRes + ids []*oidSDK.ID } // IDList returns identifiers of the matched objects. func (x SearchObjectsRes) IDList() []*oidSDK.ID { - return x.cliRes.IDList() + return x.ids } // SearchObjects selects objects from container which match the filters. // // Returns any error prevented the operation from completing correctly in error return. -func SearchObjects(prm SearchObjectsPrm) (res SearchObjectsRes, err error) { - var cliPrm client.SearchObjectParams +func SearchObjects(prm SearchObjectsPrm) (*SearchObjectsRes, error) { + var cliPrm client.PrmObjectSearch - cliPrm.WithSearchFilters(prm.filters) - cliPrm.WithContainerID(prm.cnrID) + if prm.cnrID != nil { + cliPrm.InContainer(*prm.cnrID) + } - res.cliRes, err = prm.cli.SearchObjects(context.Background(), &cliPrm, append(prm.opts, - client.WithSession(prm.sessionToken), - client.WithBearer(prm.bearerToken), - )...) + cliPrm.SetFilters(prm.filters) - return + if prm.sessionToken != nil { + cliPrm.WithinSession(*prm.sessionToken) + } + + if prm.bearerToken != nil { + cliPrm.WithBearerToken(*prm.bearerToken) + } + + if prm.local { + cliPrm.MarkLocal() + } + + rdr, err := prm.cli.ObjectSearchInit(context.Background(), cliPrm) + if err != nil { + return nil, fmt.Errorf("init object search: %w", err) + } + + buf := make([]oidSDK.ID, 10) + var list []*oidSDK.ID + var n int + var ok bool + + for { + n, ok = rdr.Read(buf) + if !ok { + break + } + + for i := 0; i < n; i++ { + v := buf[i] + list = append(list, &v) + } + } + + _, err = rdr.Close() + if err != nil { + return nil, fmt.Errorf("read object list: %w", err) + } + + return &SearchObjectsRes{ + ids: list, + }, nil } // HashPayloadRangesPrm groups parameters of HashPayloadRanges operation. @@ -516,35 +710,64 @@ func (x *HashPayloadRangesPrm) SetSalt(salt []byte) { // HashPayloadRangesRes groups resulting values of HashPayloadRanges operation. type HashPayloadRangesRes struct { - cliRes *client.ObjectRangeHashRes + cliRes *client.ResObjectHash } // HashList returns list of hashes of the payload ranges keeping order. func (x HashPayloadRangesRes) HashList() [][]byte { - return x.cliRes.Hashes() + return x.cliRes.Checksums() } // HashPayloadRanges requests hashes (by default SHA256) of the object payload ranges. // // Returns any error prevented the operation from completing correctly in error return. // Returns an error if number of received hashes differs with the number of requested ranges. -func HashPayloadRanges(prm HashPayloadRangesPrm) (res HashPayloadRangesRes, err error) { - var cliPrm client.RangeChecksumParams +func HashPayloadRanges(prm HashPayloadRangesPrm) (*HashPayloadRangesRes, error) { + var cliPrm client.PrmObjectHash - cliPrm.WithAddress(prm.objAddr) - cliPrm.WithSalt(prm.salt) - cliPrm.WithRangeList(prm.rngs...) - - if prm.tz { - cliPrm.TZ() + if id := prm.objAddr.ContainerID(); id != nil { + cliPrm.FromContainer(*id) } - res.cliRes, err = prm.cli.HashObjectPayloadRanges(context.Background(), &cliPrm, append(prm.opts, - client.WithSession(prm.sessionToken), - client.WithBearer(prm.bearerToken), - )...) + if id := prm.objAddr.ObjectID(); id != nil { + cliPrm.ByID(*id) + } - return + if prm.local { + cliPrm.MarkLocal() + } + + cliPrm.UseSalt(prm.salt) + + rngs := make([]uint64, 2*len(prm.rngs)) + + for i := range prm.rngs { + rngs[2*i] = prm.rngs[i].GetOffset() + rngs[2*i+1] = prm.rngs[i].GetLength() + } + + cliPrm.SetRangeList(rngs...) + + if prm.tz { + cliPrm.TillichZemorAlgo() + } + + if prm.sessionToken != nil { + cliPrm.WithinSession(*prm.sessionToken) + } + + if prm.bearerToken != nil { + cliPrm.WithBearerToken(*prm.bearerToken) + } + + res, err := prm.cli.ObjectHash(context.Background(), cliPrm) + if err != nil { + return nil, fmt.Errorf("read payload hashes via client: %w", err) + } + + return &HashPayloadRangesRes{ + cliRes: res, + }, nil } // PayloadRangePrm groups parameters of PayloadRange operation. @@ -571,18 +794,44 @@ type PayloadRangeRes struct{} // // Returns any error prevented the operation from completing correctly in error return. // For raw reading, returns *object.SplitInfoError error if object is virtual. -func PayloadRange(prm PayloadRangePrm) (res PayloadRangeRes, err error) { - var cliPrm client.RangeDataParams +func PayloadRange(prm PayloadRangePrm) (*PayloadRangeRes, error) { + var cliPrm client.PrmObjectRange - cliPrm.WithRaw(prm.raw) - cliPrm.WithAddress(prm.objAddr) - cliPrm.WithDataWriter(prm.wrt) - cliPrm.WithRange(prm.rng) + if id := prm.objAddr.ContainerID(); id != nil { + cliPrm.FromContainer(*id) + } - _, err = prm.cli.ObjectPayloadRangeData(context.Background(), &cliPrm, append(prm.opts, - client.WithSession(prm.sessionToken), - client.WithBearer(prm.bearerToken), - )...) + if id := prm.objAddr.ObjectID(); id != nil { + cliPrm.ByID(*id) + } - return + if prm.raw { + cliPrm.MarkRaw() + } + + if prm.local { + cliPrm.MarkLocal() + } + + cliPrm.SetOffset(prm.rng.GetOffset()) + cliPrm.SetLength(prm.rng.GetLength()) + + rdr, err := prm.cli.ObjectRangeInit(context.Background(), cliPrm) + if err != nil { + return nil, fmt.Errorf("init payload reading: %w", err) + } + + sz := prm.rng.GetLength() + if sz > maxPayloadBufferSize { + sz = maxPayloadBufferSize + } + + buf := make([]byte, sz) + + _, err = io.CopyBuffer(prm.wrt, rdr, buf) + if err != nil { + return nil, fmt.Errorf("copy payload: %w", err) + } + + return new(PayloadRangeRes), nil } diff --git a/cmd/neofs-cli/internal/client/prm.go b/cmd/neofs-cli/internal/client/prm.go index 2db149857..186a60669 100644 --- a/cmd/neofs-cli/internal/client/prm.go +++ b/cmd/neofs-cli/internal/client/prm.go @@ -79,17 +79,15 @@ type commonObjectPrm struct { sessionTokenPrm bearerTokenPrm - opts []client.CallOption + local bool } // SetTTL sets request TTL value. func (x *commonObjectPrm) SetTTL(ttl uint32) { - x.opts = append(x.opts, client.WithTTL(ttl)) + x.local = ttl < 2 } // SetXHeaders sets request X-Headers. -func (x *commonObjectPrm) SetXHeaders(xhdrs []*session.XHeader) { - for _, xhdr := range xhdrs { - x.opts = append(x.opts, client.WithXHeader(xhdr)) - } +func (x *commonObjectPrm) SetXHeaders(_ []*session.XHeader) { + // FIXME: (neofs-node#1194) not supported by client } diff --git a/cmd/neofs-cli/modules/container.go b/cmd/neofs-cli/modules/container.go index a6a41f5fd..650e7758d 100644 --- a/cmd/neofs-cli/modules/container.go +++ b/cmd/neofs-cli/modules/container.go @@ -187,10 +187,6 @@ It will be stored in sidechain when inner ring will accepts it.`, prepareAPIClientWithKey(cmd, key, &putPrm, &getPrm) putPrm.SetContainer(*cnr) - if tok != nil { - putPrm.SetSessionToken(*tok) - } - res, err := internalclient.PutContainer(putPrm) exitOnErr(cmd, errf("rpc error: %w", err)) @@ -423,10 +419,6 @@ Container ID in EACL table will be substituted with ID from the CLI.`, prepareAPIClient(cmd, &setEACLPrm, &getEACLPrm) setEACLPrm.SetTable(*eaclTable) - if tok != nil { - setEACLPrm.SetSessionToken(*tok) - } - _, err = internalclient.SetEACL(setEACLPrm) exitOnErr(cmd, errf("rpc error: %w", err)) @@ -754,8 +746,8 @@ func prettyPrintContainer(cmd *cobra.Command, cnr *container.Container, jsonEnco id := container.CalculateID(cnr) cmd.Println("container ID:", id) - version := cnr.Version() - cmd.Printf("version: %d.%d\n", version.Major(), version.Minor()) + v := cnr.Version() + cmd.Printf("version: %d.%d\n", v.Major(), v.Minor()) cmd.Println("owner ID:", cnr.OwnerID()) diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index e83cadd93..4273b5691 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -308,11 +308,11 @@ func (r *remoteLoadAnnounceWriter) Put(a containerSDK.UsedSpaceAnnouncement) err } func (r *remoteLoadAnnounceWriter) Close() error { - var cliPrm apiClient.AnnounceSpacePrm + var cliPrm apiClient.PrmAnnounceSpace cliPrm.SetValues(r.buf) - _, err := r.client.AnnounceContainerUsedSpace(r.ctx, cliPrm) + _, err := r.client.ContainerAnnounceUsedSpace(r.ctx, cliPrm) return err } @@ -515,7 +515,7 @@ func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cid *cid.ID, return false, nil } -func (c *usedSpaceService) processLoadValue(ctx context.Context, a containerSDK.UsedSpaceAnnouncement, +func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.UsedSpaceAnnouncement, route []loadroute.ServerInfo, w loadcontroller.Writer) error { fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.ContainerID(), route[0].PublicKey()) if err != nil { diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 196feb8db..5464a0fee 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -36,6 +36,7 @@ import ( truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-sdk-go/client" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" @@ -449,57 +450,55 @@ func (c *reputationClient) submitResult(err error) { c.cons.trustStorage.Update(prm) } -func (c *reputationClient) PutObject(ctx context.Context, prm *client.PutObjectParams, opts ...client.CallOption) (*client.ObjectPutRes, error) { - res, err := c.MultiAddressClient.PutObject(ctx, prm, opts...) +func (c *reputationClient) ObjectPutInit(ctx context.Context, prm client.PrmObjectPutInit) (*client.ObjectWriter, error) { + res, err := c.MultiAddressClient.ObjectPutInit(ctx, prm) + + // FIXME: (neofs-node#1193) here we submit only initialization errors, writing errors are not processed + c.submitResult(err) + + return res, err +} + +func (c *reputationClient) ObjectDelete(ctx context.Context, prm client.PrmObjectDelete) (*client.ResObjectDelete, error) { + res, err := c.MultiAddressClient.ObjectDelete(ctx, prm) + if err != nil { + c.submitResult(err) + } else { + c.submitResult(apistatus.ErrFromStatus(res.Status())) + } + + return res, err +} + +func (c *reputationClient) GetObjectInit(ctx context.Context, prm client.PrmObjectGet) (*client.ObjectReader, error) { + res, err := c.MultiAddressClient.ObjectGetInit(ctx, prm) + + // FIXME: (neofs-node#1193) here we submit only initialization errors, reading errors are not processed + c.submitResult(err) + + return res, err +} + +func (c *reputationClient) ObjectHead(ctx context.Context, prm client.PrmObjectHead) (*client.ResObjectHead, error) { + res, err := c.MultiAddressClient.ObjectHead(ctx, prm) c.submitResult(err) return res, err } -func (c *reputationClient) DeleteObject(ctx context.Context, prm *client.DeleteObjectParams, opts ...client.CallOption) (*client.ObjectDeleteRes, error) { - res, err := c.MultiAddressClient.DeleteObject(ctx, prm, opts...) +func (c *reputationClient) ObjectHash(ctx context.Context, prm client.PrmObjectHash) (*client.ResObjectHash, error) { + res, err := c.MultiAddressClient.ObjectHash(ctx, prm) c.submitResult(err) return res, err } -func (c *reputationClient) GetObject(ctx context.Context, prm *client.GetObjectParams, opts ...client.CallOption) (*client.ObjectGetRes, error) { - res, err := c.MultiAddressClient.GetObject(ctx, prm, opts...) - - c.submitResult(err) - - return res, err -} - -func (c *reputationClient) HeadObject(ctx context.Context, prm *client.ObjectHeaderParams, opts ...client.CallOption) (*client.ObjectHeadRes, error) { - res, err := c.MultiAddressClient.HeadObject(ctx, prm, opts...) - - c.submitResult(err) - - return res, err -} - -func (c *reputationClient) ObjectPayloadRangeData(ctx context.Context, prm *client.RangeDataParams, opts ...client.CallOption) (*client.ObjectRangeRes, error) { - res, err := c.MultiAddressClient.ObjectPayloadRangeData(ctx, prm, opts...) - - c.submitResult(err) - - return res, err -} - -func (c *reputationClient) HashObjectPayloadRanges(ctx context.Context, prm *client.RangeChecksumParams, opts ...client.CallOption) (*client.ObjectRangeHashRes, error) { - res, err := c.MultiAddressClient.HashObjectPayloadRanges(ctx, prm, opts...) - - c.submitResult(err) - - return res, err -} - -func (c *reputationClient) SearchObjects(ctx context.Context, prm *client.SearchObjectParams, opts ...client.CallOption) (*client.ObjectSearchRes, error) { - res, err := c.MultiAddressClient.SearchObjects(ctx, prm, opts...) +func (c *reputationClient) ObjectSearchInit(ctx context.Context, prm client.PrmObjectSearch) (*client.ObjectListReader, error) { + res, err := c.MultiAddressClient.ObjectSearchInit(ctx, prm) + // FIXME: (neofs-node#1193) here we submit only initialization errors, reading errors are not processed c.submitResult(err) return res, err diff --git a/cmd/neofs-node/reputation/intermediate/remote.go b/cmd/neofs-node/reputation/intermediate/remote.go index 8cc1e3c5b..e19e1e4c2 100644 --- a/cmd/neofs-node/reputation/intermediate/remote.go +++ b/cmd/neofs-node/reputation/intermediate/remote.go @@ -95,7 +95,6 @@ func (rtp *RemoteTrustWriter) Write(t reputation.Trust) error { p.SetContext(rtp.eiCtx) p.SetClient(rtp.client) - p.SetPrivateKey(rtp.key) p.SetEpoch(rtp.eiCtx.Epoch()) p.SetIteration(rtp.eiCtx.I()) p.SetTrust(*apiPeerToPeerTrust) diff --git a/cmd/neofs-node/reputation/internal/client/client.go b/cmd/neofs-node/reputation/internal/client/client.go index e6f40e579..1fcaf5b6e 100644 --- a/cmd/neofs-node/reputation/internal/client/client.go +++ b/cmd/neofs-node/reputation/internal/client/client.go @@ -2,7 +2,6 @@ package internal import ( "context" - "crypto/ecdsa" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-sdk-go/client" @@ -14,8 +13,6 @@ type commonPrm struct { cli coreclient.Client ctx context.Context - - opts []client.CallOption } // SetClient sets base client for NeoFS API communication. @@ -32,18 +29,11 @@ func (x *commonPrm) SetContext(ctx context.Context) { x.ctx = ctx } -// SetPrivateKey sets private key to sign the request(s). -// -// Required parameter. -func (x *commonPrm) SetPrivateKey(key *ecdsa.PrivateKey) { - x.opts = append(x.opts, client.WithKey(key)) -} - // AnnounceLocalPrm groups parameters of AnnounceLocal operation. type AnnounceLocalPrm struct { commonPrm - cliPrm client.AnnounceLocalTrustPrm + cliPrm client.PrmAnnounceLocalTrust } // SetEpoch sets epoch in which the trust was assessed. @@ -65,7 +55,7 @@ type AnnounceLocalRes struct{} // // Returns any error prevented the operation from completing correctly in error return. func AnnounceLocal(prm AnnounceLocalPrm) (res AnnounceLocalRes, err error) { - var cliRes *client.AnnounceLocalTrustRes + var cliRes *client.ResAnnounceLocalTrust cliRes, err = prm.cli.AnnounceLocalTrust(prm.ctx, prm.cliPrm) if err == nil { @@ -80,7 +70,7 @@ func AnnounceLocal(prm AnnounceLocalPrm) (res AnnounceLocalRes, err error) { type AnnounceIntermediatePrm struct { commonPrm - cliPrm client.AnnounceIntermediateTrustPrm + cliPrm client.PrmAnnounceIntermediateTrust } // SetEpoch sets number of the epoch when the trust calculation's iteration was executed. @@ -108,7 +98,7 @@ type AnnounceIntermediateRes struct{} // // Returns any error prevented the operation from completing correctly in error return. func AnnounceIntermediate(prm AnnounceIntermediatePrm) (res AnnounceIntermediateRes, err error) { - var cliRes *client.AnnounceIntermediateTrustRes + var cliRes *client.ResAnnounceIntermediateTrust cliRes, err = prm.cli.AnnounceIntermediateTrust(prm.ctx, prm.cliPrm) if err == nil { diff --git a/cmd/neofs-node/reputation/local/remote.go b/cmd/neofs-node/reputation/local/remote.go index b8254452c..9cea7fbb2 100644 --- a/cmd/neofs-node/reputation/local/remote.go +++ b/cmd/neofs-node/reputation/local/remote.go @@ -89,7 +89,6 @@ func (rtp *RemoteTrustWriter) Close() error { prm.SetContext(rtp.ctx) prm.SetClient(rtp.client) - prm.SetPrivateKey(rtp.key) prm.SetEpoch(rtp.ctx.Epoch()) prm.SetTrusts(rtp.buf) diff --git a/go.mod b/go.mod index 8b3a9236a..d1659caa1 100644 --- a/go.mod +++ b/go.mod @@ -14,10 +14,10 @@ require ( github.com/multiformats/go-multiaddr v0.4.0 github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.98.0 - github.com/nspcc-dev/neofs-api-go/v2 v2.11.2-0.20220127135316-32dd0bb3f9c5 + github.com/nspcc-dev/neofs-api-go/v2 v2.12.0 github.com/nspcc-dev/neofs-contract v0.14.2 - github.com/nspcc-dev/neofs-sdk-go v0.0.0-20220201141054-6a7ba33b59ef - github.com/nspcc-dev/tzhash v1.5.1 + github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.1.0.20220225083115-4fba1af6aa08 + github.com/nspcc-dev/tzhash v1.5.2 github.com/panjf2000/ants/v2 v2.4.0 github.com/paulmach/orb v0.2.2 github.com/prometheus/client_golang v1.11.0 diff --git a/go.sum b/go.sum index 5b253f9a4ad86b9d4ad2c72a188ecd175e5c1c0b..60588c956eb9301faa4717e96602932adf0f30b2 100644 GIT binary patch delta 352 zcmex!hxObY)(xz^Mn-xD3K@o072$;uRry6O*_PpMz9A*y0p`i&F>{ zA%>}@RfXAw<`t2X8@t3Ozw2e2{Jn>d8K`~p!d`yna6>%z(m6^8j4n>dGMm?s8V=<6G1m<9yp z_!YSY=M_}CmcuQRoyZYOtR4F4`FgqeDIlx#!^4aX9Rodz%_4n5vV7BhvwWkk$dyJiQJ3^Mj*}ZrJjKS{*gu%1v#csu7(9k rp-ENRfhpb*Vg4bNRY^$|!C_?vc}{7GmKI=lewf6=Xteq7r25+cr_6KW delta 384 zcmaivNlwCG0EJ@=ZZI(>Cc4xEwExsnaaRGMg>=A{!5!uAz|a&ZwXiboj0rK&6LT**$=^ZM&yAy;1)b2%z7PrGeho-HD)*Mhz?1VN_S;a$F!TE}Oeu N!Rl}itv^=}U%$5*c+~&^ diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index b833fa3cf..9843bfb37 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -12,18 +12,18 @@ import ( // Client is an interface of NeoFS storage // node's client. type Client interface { - AnnounceContainerUsedSpace(context.Context, client.AnnounceSpacePrm) (*client.AnnounceSpaceRes, error) + ContainerAnnounceUsedSpace(context.Context, client.PrmAnnounceSpace) (*client.ResAnnounceSpace, error) - PutObject(context.Context, *client.PutObjectParams, ...client.CallOption) (*client.ObjectPutRes, error) - DeleteObject(context.Context, *client.DeleteObjectParams, ...client.CallOption) (*client.ObjectDeleteRes, error) - GetObject(context.Context, *client.GetObjectParams, ...client.CallOption) (*client.ObjectGetRes, error) - HeadObject(context.Context, *client.ObjectHeaderParams, ...client.CallOption) (*client.ObjectHeadRes, error) - SearchObjects(context.Context, *client.SearchObjectParams, ...client.CallOption) (*client.ObjectSearchRes, error) - ObjectPayloadRangeData(context.Context, *client.RangeDataParams, ...client.CallOption) (*client.ObjectRangeRes, error) - HashObjectPayloadRanges(context.Context, *client.RangeChecksumParams, ...client.CallOption) (*client.ObjectRangeHashRes, error) + ObjectPutInit(context.Context, client.PrmObjectPutInit) (*client.ObjectWriter, error) + ObjectDelete(context.Context, client.PrmObjectDelete) (*client.ResObjectDelete, error) + ObjectGetInit(context.Context, client.PrmObjectGet) (*client.ObjectReader, error) + ObjectHead(context.Context, client.PrmObjectHead) (*client.ResObjectHead, error) + ObjectSearchInit(context.Context, client.PrmObjectSearch) (*client.ObjectListReader, error) + ObjectRangeInit(context.Context, client.PrmObjectRange) (*client.ObjectRangeReader, error) + ObjectHash(context.Context, client.PrmObjectHash) (*client.ResObjectHash, error) - AnnounceLocalTrust(context.Context, client.AnnounceLocalTrustPrm) (*client.AnnounceLocalTrustRes, error) - AnnounceIntermediateTrust(context.Context, client.AnnounceIntermediateTrustPrm) (*client.AnnounceIntermediateTrustRes, error) + AnnounceLocalTrust(context.Context, client.PrmAnnounceLocalTrust) (*client.ResAnnounceLocalTrust, error) + AnnounceIntermediateTrust(context.Context, client.PrmAnnounceIntermediateTrust) (*client.ResAnnounceIntermediateTrust, error) Raw() *rawclient.Client diff --git a/pkg/innerring/internal/client/client.go b/pkg/innerring/internal/client/client.go index 25f15afa6..2d3803245 100644 --- a/pkg/innerring/internal/client/client.go +++ b/pkg/innerring/internal/client/client.go @@ -3,7 +3,9 @@ package neofsapiclient import ( "context" "crypto/ecdsa" + "errors" "fmt" + "io" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup" @@ -46,12 +48,12 @@ func (x *SearchSGPrm) SetContainerID(id *cid.ID) { // SearchSGRes groups resulting values of SearchSG operation. type SearchSGRes struct { - cliRes *client.ObjectSearchRes + cliRes []*oid.ID } // IDList returns list of IDs of storage groups in container. func (x SearchSGRes) IDList() []*oid.ID { - return x.cliRes.IDList() + return x.cliRes } var sgFilter = storagegroup.SearchQuery() @@ -59,21 +61,49 @@ var sgFilter = storagegroup.SearchQuery() // SearchSG lists objects of storage group type in the container. // // Returns any error prevented the operation from completing correctly in error return. -func (x Client) SearchSG(prm SearchSGPrm) (res SearchSGRes, err error) { - var cliPrm client.SearchObjectParams +func (x Client) SearchSG(prm SearchSGPrm) (*SearchSGRes, error) { + var cliPrm client.PrmObjectSearch - cliPrm.WithContainerID(prm.cnrID) - cliPrm.WithSearchFilters(sgFilter) + cliPrm.InContainer(*prm.cnrID) + cliPrm.SetFilters(sgFilter) - res.cliRes, err = x.c.SearchObjects(prm.ctx, &cliPrm, - client.WithKey(x.key), - ) - if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(res.cliRes.Status()) + rdr, err := x.c.ObjectSearchInit(prm.ctx, cliPrm) + if err != nil { + return nil, fmt.Errorf("init object search: %w", err) } - return + rdr.UseKey(*x.key) + + buf := make([]oid.ID, 10) + var list []*oid.ID + var n int + var ok bool + + for { + n, ok = rdr.Read(buf) + if !ok { + break + } + + for i := 0; i < n; i++ { + v := buf[i] + list = append(list, &v) + } + } + + res, err := rdr.Close() + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.Status()) + } + + if err != nil { + return nil, fmt.Errorf("read object list: %w", err) + } + + return &SearchSGRes{ + cliRes: list, + }, nil } // GetObjectPrm groups parameters of GetObject operation. @@ -83,31 +113,59 @@ type GetObjectPrm struct { // GetObjectRes groups resulting values of GetObject operation. type GetObjectRes struct { - cliRes *client.ObjectGetRes + obj *object.Object } // Object returns received object. func (x GetObjectRes) Object() *object.Object { - return x.cliRes.Object() + return x.obj } // GetObject reads the object by address. // // Returns any error prevented the operation from completing correctly in error return. -func (x Client) GetObject(prm GetObjectPrm) (res GetObjectRes, err error) { - var cliPrm client.GetObjectParams +func (x Client) GetObject(prm GetObjectPrm) (*GetObjectRes, error) { + var cliPrm client.PrmObjectGet - cliPrm.WithAddress(prm.objAddr) - - res.cliRes, err = x.c.GetObject(prm.ctx, &cliPrm, - client.WithKey(x.key), - ) - if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(res.cliRes.Status()) + if id := prm.objAddr.ContainerID(); id != nil { + cliPrm.FromContainer(*id) } - return + if id := prm.objAddr.ObjectID(); id != nil { + cliPrm.ByID(*id) + } + + rdr, err := x.c.ObjectGetInit(prm.ctx, cliPrm) + if err == nil { + return nil, fmt.Errorf("init object search: %w", err) + } + + rdr.UseKey(*x.key) + + var obj object.Object + + if !rdr.ReadHeader(&obj) { + res, err := rdr.Close() + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.Status()) + } + + return nil, fmt.Errorf("read object header: %w", err) + } + + buf := make([]byte, obj.PayloadSize()) + + _, err = rdr.Read(buf) + if err != nil && !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("read payload: %w", err) + } + + object.NewRawFrom(&obj).SetPayload(buf) + + return &GetObjectRes{ + obj: &obj, + }, nil } // HeadObjectPrm groups parameters of HeadObject operation. @@ -116,7 +174,7 @@ type HeadObjectPrm struct { raw bool - ttl uint32 + local bool } // SetRawFlag sets flag of raw request. @@ -126,40 +184,61 @@ func (x *HeadObjectPrm) SetRawFlag() { // SetTTL sets request TTL value. func (x *HeadObjectPrm) SetTTL(ttl uint32) { - x.ttl = ttl + x.local = ttl < 2 } // HeadObjectRes groups resulting values of HeadObject operation. type HeadObjectRes struct { - cliRes *client.ObjectHeadRes + hdr *object.Object } // Header returns received object header. func (x HeadObjectRes) Header() *object.Object { - return x.cliRes.Object() + return x.hdr } // HeadObject reads short object header by address. // // Returns any error prevented the operation from completing correctly in error return. // For raw requests, returns *object.SplitInfoError error if requested object is virtual. -func (x Client) HeadObject(prm HeadObjectPrm) (res HeadObjectRes, err error) { - var cliPrm client.ObjectHeaderParams +func (x Client) HeadObject(prm HeadObjectPrm) (*HeadObjectRes, error) { + var cliPrm client.PrmObjectHead - cliPrm.WithAddress(prm.objAddr) - cliPrm.WithRawFlag(prm.raw) - cliPrm.WithMainFields() - - res.cliRes, err = x.c.HeadObject(prm.ctx, &cliPrm, - client.WithKey(x.key), - client.WithTTL(prm.ttl), - ) - if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(res.cliRes.Status()) + if prm.raw { + cliPrm.MarkRaw() } - return + if prm.local { + cliPrm.MarkLocal() + } + + if id := prm.objAddr.ContainerID(); id != nil { + cliPrm.FromContainer(*id) + } + + if id := prm.objAddr.ObjectID(); id != nil { + cliPrm.ByID(*id) + } + + cliRes, err := x.c.ObjectHead(prm.ctx, cliPrm) + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(cliRes.Status()) + } + + if err != nil { + return nil, fmt.Errorf("read object header from NeoFS: %w", err) + } + + var hdr object.Object + + if !cliRes.ReadHeader(&hdr) { + return nil, errors.New("missing object header in the response") + } + + return &HeadObjectRes{ + hdr: &hdr, + }, nil } // GetObjectPayload reads object by address from NeoFS via Client and returns its payload. @@ -231,21 +310,25 @@ func (x HashPayloadRangeRes) Hash() []byte { return x.h } -// HashObjectRange requests to calculate Tillich-Zemor hash of the payload range of the object +// HashPayloadRange requests to calculate Tillich-Zemor hash of the payload range of the object // from the remote server's local storage. // // Returns any error prevented the operation from completing correctly in error return. func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeRes, err error) { - var cliPrm client.RangeChecksumParams + var cliPrm client.PrmObjectHash - cliPrm.WithAddress(prm.objAddr) - cliPrm.WithRangeList(prm.rng) - cliPrm.TZ() + if id := prm.objAddr.ContainerID(); id != nil { + cliPrm.FromContainer(*id) + } - cliRes, err := x.c.HashObjectPayloadRanges(prm.ctx, &cliPrm, - client.WithKey(x.key), - client.WithTTL(1), - ) + if id := prm.objAddr.ObjectID(); id != nil { + cliPrm.ByID(*id) + } + + cliPrm.SetRangeList(prm.rng.GetOffset(), prm.rng.GetLength()) + cliPrm.TillichZemorAlgo() + + cliRes, err := x.c.ObjectHash(prm.ctx, cliPrm) if err == nil { // pull out an error from status err = apistatus.ErrFromStatus(cliRes.Status()) @@ -253,9 +336,9 @@ func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeR return } - hs := cliRes.Hashes() + hs := cliRes.Checksums() if ln := len(hs); ln != 1 { - err = fmt.Errorf("wrong number of hashes %d", ln) + err = fmt.Errorf("wrong number of checksums %d", ln) } else { res.h = hs[0] } diff --git a/pkg/innerring/processors/neofs/process_bind.go b/pkg/innerring/processors/neofs/process_bind.go index e46dbd40d..20a6034af 100644 --- a/pkg/innerring/processors/neofs/process_bind.go +++ b/pkg/innerring/processors/neofs/process_bind.go @@ -4,12 +4,11 @@ import ( "crypto/elliptic" "fmt" - "github.com/mr-tron/base58" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/morph/client/neofsid" "github.com/nspcc-dev/neofs-node/pkg/morph/event/neofs" + "github.com/nspcc-dev/neofs-sdk-go/owner" "go.uber.org/zap" ) @@ -74,7 +73,6 @@ func (np *Processor) checkBindCommon(e *bindCommonContext) error { func (np *Processor) approveBindCommon(e *bindCommonContext) { // calculate wallet address - // TODO: nspcc-dev/neofs-sdk-go#134 implement some utilities in API Go lib to do it scriptHash := e.User() u160, err := util.Uint160DecodeBytesBE(scriptHash) @@ -86,17 +84,8 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) { return } - wallet, err := base58.Decode(address.Uint160ToString(u160)) - if err != nil { - np.log.Error("could not decode wallet address", - zap.String("error", err.Error()), - ) - - return - } - prm := neofsid.CommonBindPrm{} - prm.SetOwnerID(wallet) + prm.SetOwnerID(owner.ScriptHashToIDBytes(u160)) prm.SetKeys(e.Keys()) prm.SetHash(e.bindCommon.TxHash()) diff --git a/pkg/innerring/processors/settlement/basic/context.go b/pkg/innerring/processors/settlement/basic/context.go index 075300009..e73eeddb1 100644 --- a/pkg/innerring/processors/settlement/basic/context.go +++ b/pkg/innerring/processors/settlement/basic/context.go @@ -4,7 +4,6 @@ import ( "math/big" "sync" - "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/common" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" @@ -60,10 +59,8 @@ type ( ) func NewIncomeSettlementContext(p *IncomeSettlementContextPrms) (*IncomeSettlementContext, error) { - bankingAccount, err := bankOwnerID() - if err != nil { - return nil, err // should never happen - } + bankingAccount := owner.NewID() + bankingAccount.SetScriptHash(util.Uint160{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}) return &IncomeSettlementContext{ log: p.Log, @@ -79,17 +76,3 @@ func NewIncomeSettlementContext(p *IncomeSettlementContextPrms) (*IncomeSettleme distributeTable: NewNodeSizeTable(), }, nil } - -func bankOwnerID() (*owner.ID, error) { - u := util.Uint160{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, - 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} - - o := owner.NewID() - // TODO: nspcc-dev/neofs-sdk-go#134 use `SetScriptHash` method. - err := o.Parse(address.Uint160ToString(u)) - if err != nil { - return nil, err - } - - return o, nil -} diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index 63e73e6dc..c4332ddb7 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -13,7 +13,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/audit" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" - "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" @@ -75,9 +74,6 @@ func (c *ClientCache) getSG(ctx context.Context, addr *addressSDK.Address, nm *n return nil, fmt.Errorf("can't build object placement: %w", err) } - getParams := new(client.GetObjectParams) - getParams.WithAddress(addr) - var info clientcore.NodeInfo for _, node := range placement.FlattenNodes(nodes) { diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index d4fb65c40..294434b01 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -79,79 +79,79 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie return firstErr } -func (x *multiClient) PutObject(ctx context.Context, p *client.PutObjectParams, opts ...client.CallOption) (res *client.ObjectPutRes, err error) { +func (x *multiClient) ObjectPutInit(ctx context.Context, p client.PrmObjectPutInit) (res *client.ObjectWriter, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { - res, err = c.PutObject(ctx, p, opts...) + res, err = c.ObjectPutInit(ctx, p) return err }) return } -func (x *multiClient) AnnounceContainerUsedSpace(ctx context.Context, prm client.AnnounceSpacePrm) (res *client.AnnounceSpaceRes, err error) { +func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, prm client.PrmAnnounceSpace) (res *client.ResAnnounceSpace, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { - res, err = c.AnnounceContainerUsedSpace(ctx, prm) + res, err = c.ContainerAnnounceUsedSpace(ctx, prm) return err }) return } -func (x *multiClient) DeleteObject(ctx context.Context, p *client.DeleteObjectParams, opts ...client.CallOption) (res *client.ObjectDeleteRes, err error) { +func (x *multiClient) ObjectDelete(ctx context.Context, p client.PrmObjectDelete) (res *client.ResObjectDelete, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { - res, err = c.DeleteObject(ctx, p, opts...) + res, err = c.ObjectDelete(ctx, p) return err }) return } -func (x *multiClient) GetObject(ctx context.Context, p *client.GetObjectParams, opts ...client.CallOption) (res *client.ObjectGetRes, err error) { +func (x *multiClient) ObjectGetInit(ctx context.Context, p client.PrmObjectGet) (res *client.ObjectReader, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { - res, err = c.GetObject(ctx, p, opts...) + res, err = c.ObjectGetInit(ctx, p) return err }) return } -func (x *multiClient) ObjectPayloadRangeData(ctx context.Context, p *client.RangeDataParams, opts ...client.CallOption) (res *client.ObjectRangeRes, err error) { +func (x *multiClient) ObjectRangeInit(ctx context.Context, p client.PrmObjectRange) (res *client.ObjectRangeReader, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { - res, err = c.ObjectPayloadRangeData(ctx, p, opts...) + res, err = c.ObjectRangeInit(ctx, p) return err }) return } -func (x *multiClient) HeadObject(ctx context.Context, p *client.ObjectHeaderParams, opts ...client.CallOption) (res *client.ObjectHeadRes, err error) { +func (x *multiClient) ObjectHead(ctx context.Context, p client.PrmObjectHead) (res *client.ResObjectHead, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { - res, err = c.HeadObject(ctx, p, opts...) + res, err = c.ObjectHead(ctx, p) return err }) return } -func (x *multiClient) HashObjectPayloadRanges(ctx context.Context, p *client.RangeChecksumParams, opts ...client.CallOption) (res *client.ObjectRangeHashRes, err error) { +func (x *multiClient) ObjectHash(ctx context.Context, p client.PrmObjectHash) (res *client.ResObjectHash, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { - res, err = c.HashObjectPayloadRanges(ctx, p, opts...) + res, err = c.ObjectHash(ctx, p) return err }) return } -func (x *multiClient) SearchObjects(ctx context.Context, p *client.SearchObjectParams, opts ...client.CallOption) (res *client.ObjectSearchRes, err error) { +func (x *multiClient) ObjectSearchInit(ctx context.Context, p client.PrmObjectSearch) (res *client.ObjectListReader, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { - res, err = c.SearchObjects(ctx, p, opts...) + res, err = c.ObjectSearchInit(ctx, p) return err }) return } -func (x *multiClient) AnnounceLocalTrust(ctx context.Context, prm client.AnnounceLocalTrustPrm) (res *client.AnnounceLocalTrustRes, err error) { +func (x *multiClient) AnnounceLocalTrust(ctx context.Context, prm client.PrmAnnounceLocalTrust) (res *client.ResAnnounceLocalTrust, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { res, err = c.AnnounceLocalTrust(ctx, prm) return err @@ -160,7 +160,7 @@ func (x *multiClient) AnnounceLocalTrust(ctx context.Context, prm client.Announc return } -func (x *multiClient) AnnounceIntermediateTrust(ctx context.Context, prm client.AnnounceIntermediateTrustPrm) (res *client.AnnounceIntermediateTrustRes, err error) { +func (x *multiClient) AnnounceIntermediateTrust(ctx context.Context, prm client.PrmAnnounceIntermediateTrust) (res *client.ResAnnounceIntermediateTrust, err error) { err = x.iterateClients(ctx, func(c clientcore.Client) error { res, err = c.AnnounceIntermediateTrust(ctx, prm) return err diff --git a/pkg/services/audit/auditor/pop.go b/pkg/services/audit/auditor/pop.go index 8ada03b5c..fec35d761 100644 --- a/pkg/services/audit/auditor/pop.go +++ b/pkg/services/audit/auditor/pop.go @@ -1,15 +1,15 @@ package auditor import ( - "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/netmap" oidSDK "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/tzhash/tz" "go.uber.org/zap" ) const ( hashRangeNumber = 4 - minGamePayloadSize = hashRangeNumber * client.TZSize + minGamePayloadSize = hashRangeNumber * tz.Size ) func (c *Context) executePoP() { diff --git a/pkg/services/container/executor.go b/pkg/services/container/executor.go index 011723b75..276232bbd 100644 --- a/pkg/services/container/executor.go +++ b/pkg/services/container/executor.go @@ -12,7 +12,7 @@ import ( type ContextWithToken struct { context.Context - SessionToken *session.SessionToken + SessionToken *session.Token } type ServiceExecutor interface { @@ -40,7 +40,7 @@ func NewExecutionService(exec ServiceExecutor) Server { func contextWithTokenFromRequest(ctx context.Context, req interface { GetMetaHeader() *session.RequestMetaHeader }) ContextWithToken { - var tok *session.SessionToken + var tok *session.Token for meta := req.GetMetaHeader(); meta != nil; meta = meta.GetOrigin() { tok = meta.GetSessionToken() diff --git a/pkg/services/container/morph/executor_test.go b/pkg/services/container/morph/executor_test.go index 857fd1e9f..28b6eccd7 100644 --- a/pkg/services/container/morph/executor_test.go +++ b/pkg/services/container/morph/executor_test.go @@ -18,27 +18,27 @@ import ( type mock struct{} -func (m mock) Put(c *containerSDK.Container) (*cid.ID, error) { +func (m mock) Put(_ *containerSDK.Container) (*cid.ID, error) { return new(cid.ID), nil } -func (m mock) Delete(witness containerCore.RemovalWitness) error { +func (m mock) Delete(_ containerCore.RemovalWitness) error { return nil } -func (m mock) PutEACL(table *eacl.Table) error { +func (m mock) PutEACL(_ *eacl.Table) error { return nil } -func (m mock) Get(id *cid.ID) (*containerSDK.Container, error) { +func (m mock) Get(_ *cid.ID) (*containerSDK.Container, error) { panic("implement me") } -func (m mock) GetEACL(id *cid.ID) (*eacl.Table, error) { +func (m mock) GetEACL(_ *cid.ID) (*eacl.Table, error) { panic("implement me") } -func (m mock) List(id *owner.ID) ([]*cid.ID, error) { +func (m mock) List(_ *owner.ID) ([]*cid.ID, error) { panic("implement me") } @@ -90,11 +90,11 @@ func TestInvalidToken(t *testing.T) { } } -func generateToken(ctx session.SessionTokenContext) *session.SessionToken { - body := new(session.SessionTokenBody) +func generateToken(ctx session.TokenContext) *session.Token { + body := new(session.TokenBody) body.SetContext(ctx) - tok := new(session.SessionToken) + tok := new(session.Token) tok.SetBody(body) return tok diff --git a/pkg/services/object/acl/acl.go b/pkg/services/object/acl/acl.go index de6ee4ccd..72aed16de 100644 --- a/pkg/services/object/acl/acl.go +++ b/pkg/services/object/acl/acl.go @@ -140,7 +140,7 @@ func New(opts ...Option) Service { } func (b Service) Get(request *object.GetRequest, stream objectSvc.GetObjectStream) error { - cid, err := getContainerIDFromRequest(request) + idCnr, err := getContainerIDFromRequest(request) if err != nil { return err } @@ -154,7 +154,7 @@ func (b Service) Get(request *object.GetRequest, stream objectSvc.GetObjectStrea src: request, } - reqInfo, err := b.findRequestInfo(req, cid, eaclSDK.OperationGet) + reqInfo, err := b.findRequestInfo(req, idCnr, eaclSDK.OperationGet) if err != nil { return err } @@ -188,7 +188,7 @@ func (b Service) Put(ctx context.Context) (objectSvc.PutObjectStream, error) { func (b Service) Head( ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { - cid, err := getContainerIDFromRequest(request) + idCnr, err := getContainerIDFromRequest(request) if err != nil { return nil, err } @@ -202,7 +202,7 @@ func (b Service) Head( src: request, } - reqInfo, err := b.findRequestInfo(req, cid, eaclSDK.OperationHead) + reqInfo, err := b.findRequestInfo(req, idCnr, eaclSDK.OperationHead) if err != nil { return nil, err } @@ -264,7 +264,7 @@ func (b Service) Search(request *object.SearchRequest, stream objectSvc.SearchSt func (b Service) Delete( ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) { - cid, err := getContainerIDFromRequest(request) + idCnr, err := getContainerIDFromRequest(request) if err != nil { return nil, err } @@ -278,7 +278,7 @@ func (b Service) Delete( src: request, } - reqInfo, err := b.findRequestInfo(req, cid, eaclSDK.OperationDelete) + reqInfo, err := b.findRequestInfo(req, idCnr, eaclSDK.OperationDelete) if err != nil { return nil, err } @@ -296,7 +296,7 @@ func (b Service) Delete( } func (b Service) GetRange(request *object.GetRangeRequest, stream objectSvc.GetObjectRangeStream) error { - cid, err := getContainerIDFromRequest(request) + idCnr, err := getContainerIDFromRequest(request) if err != nil { return err } @@ -310,7 +310,7 @@ func (b Service) GetRange(request *object.GetRangeRequest, stream objectSvc.GetO src: request, } - reqInfo, err := b.findRequestInfo(req, cid, eaclSDK.OperationRange) + reqInfo, err := b.findRequestInfo(req, idCnr, eaclSDK.OperationRange) if err != nil { return err } @@ -334,7 +334,7 @@ func (b Service) GetRange(request *object.GetRangeRequest, stream objectSvc.GetO func (b Service) GetRangeHash( ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { - cid, err := getContainerIDFromRequest(request) + idCnr, err := getContainerIDFromRequest(request) if err != nil { return nil, err } @@ -348,7 +348,7 @@ func (b Service) GetRangeHash( src: request, } - reqInfo, err := b.findRequestInfo(req, cid, eaclSDK.OperationRangeHash) + reqInfo, err := b.findRequestInfo(req, idCnr, eaclSDK.OperationRangeHash) if err != nil { return nil, err } @@ -373,7 +373,7 @@ func (p putStreamBasicChecker) Send(request *object.PutRequest) error { part := body.GetObjectPart() if part, ok := part.(*object.PutObjectPartInit); ok { - cid, err := getContainerIDFromRequest(request) + idCnr, err := getContainerIDFromRequest(request) if err != nil { return err } @@ -392,7 +392,7 @@ func (p putStreamBasicChecker) Send(request *object.PutRequest) error { src: request, } - reqInfo, err := p.source.findRequestInfo(req, cid, eaclSDK.OperationPut) + reqInfo, err := p.source.findRequestInfo(req, idCnr, eaclSDK.OperationPut) if err != nil { return err } @@ -507,7 +507,7 @@ func getContainerIDFromRequest(req interface{}) (id *cid.ID, err error) { } } -func useObjectIDFromSession(req *requestInfo, token *session.SessionToken) { +func useObjectIDFromSession(req *requestInfo, token *session.Token) { if token == nil { return } @@ -795,7 +795,7 @@ func originalBearerToken(header *session.RequestMetaHeader) *bearer.BearerToken // originalSessionToken goes down to original request meta header and fetches // session token from there. -func originalSessionToken(header *session.RequestMetaHeader) *session.SessionToken { +func originalSessionToken(header *session.RequestMetaHeader) *session.Token { for header.GetOrigin() != nil { header = header.GetOrigin() } diff --git a/pkg/services/object/acl/acl_test.go b/pkg/services/object/acl/acl_test.go index a0dad6969..5ee3c528f 100644 --- a/pkg/services/object/acl/acl_test.go +++ b/pkg/services/object/acl/acl_test.go @@ -24,7 +24,7 @@ func TestOriginalTokens(t *testing.T) { } } -func testGenerateMetaHeader(depth uint32, b *acl.BearerToken, s *session.SessionToken) *session.RequestMetaHeader { +func testGenerateMetaHeader(depth uint32, b *acl.BearerToken, s *session.Token) *session.RequestMetaHeader { metaHeader := new(session.RequestMetaHeader) metaHeader.SetBearerToken(b) metaHeader.SetSessionToken(s) diff --git a/pkg/services/object/acl/classifier.go b/pkg/services/object/acl/classifier.go index db25f215e..c1acf0bc0 100644 --- a/pkg/services/object/acl/classifier.go +++ b/pkg/services/object/acl/classifier.go @@ -28,7 +28,7 @@ type ( metaWithToken struct { vheader *session.RequestVerificationHeader - token *session.SessionToken + token *session.Token bearer *bearer.BearerToken src interface{} } @@ -188,7 +188,7 @@ func lookupKeyInContainer( return false, nil } -func ownerFromToken(token *session.SessionToken) (*owner.ID, *keys.PublicKey, error) { +func ownerFromToken(token *session.Token) (*owner.ID, *keys.PublicKey, error) { // 1. First check signature of session token. signWrapper := v2signature.StableMarshalerWrapper{SM: token.GetBody()} if err := sigutil.VerifyDataWithSource(signWrapper, func() (key, sig []byte) { diff --git a/pkg/services/object/internal/client/client.go b/pkg/services/object/internal/client/client.go index 2688b8f07..672c46937 100644 --- a/pkg/services/object/internal/client/client.go +++ b/pkg/services/object/internal/client/client.go @@ -3,9 +3,10 @@ package internal import ( "context" "crypto/ecdsa" - "strconv" + "errors" + "fmt" + "io" - session2 "github.com/nspcc-dev/neofs-api-go/v2/session" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" @@ -22,7 +23,13 @@ type commonPrm struct { ctx context.Context - opts []client.CallOption + key *ecdsa.PrivateKey + + tokenSession *session.Token + + tokenBearer *token.BearerToken + + local bool } // SetClient sets base client for NeoFS API communication. @@ -43,35 +50,33 @@ func (x *commonPrm) SetContext(ctx context.Context) { // // Required parameter. func (x *commonPrm) SetPrivateKey(key *ecdsa.PrivateKey) { - x.opts = append(x.opts, client.WithKey(key)) + x.key = key } // SetSessionToken sets token of the session within which request should be sent. // // By default the request will be sent outside the session. func (x *commonPrm) SetSessionToken(tok *session.Token) { - x.opts = append(x.opts, client.WithSession(tok)) + x.tokenSession = tok } // SetBearerToken sets bearer token to be attached to the request. // // By default token is not attached to the request. func (x *commonPrm) SetBearerToken(tok *token.BearerToken) { - x.opts = append(x.opts, client.WithBearer(tok)) + x.tokenBearer = tok } // SetTTL sets time-to-live call option. func (x *commonPrm) SetTTL(ttl uint32) { - x.opts = append(x.opts, client.WithTTL(ttl)) + x.local = ttl < 2 } // SetXHeaders sets request X-Headers. // // By default X-Headers will not be attached to the request. -func (x *commonPrm) SetXHeaders(xhdrs []*session.XHeader) { - for _, xhdr := range xhdrs { - x.opts = append(x.opts, client.WithXHeader(xhdr)) - } +func (x *commonPrm) SetXHeaders(_ []*session.XHeader) { + // FIXME: (neofs-node#1194) not supported by client } type readPrmCommon struct { @@ -81,43 +86,45 @@ type readPrmCommon struct { // SetNetmapEpoch sets the epoch number to be used to locate the object. // // By default current epoch on the server will be used. -func (x *readPrmCommon) SetNetmapEpoch(epoch uint64) { - xNetmapEpoch := session.NewXHeader() - xNetmapEpoch.SetKey(session2.XHeaderNetmapEpoch) - xNetmapEpoch.SetValue(strconv.FormatUint(epoch, 10)) - - x.opts = append(x.opts, client.WithXHeader(xNetmapEpoch)) +func (x *readPrmCommon) SetNetmapEpoch(_ uint64) { + // FIXME: (neofs-node#1194) not supported by client } // GetObjectPrm groups parameters of GetObject operation. type GetObjectPrm struct { readPrmCommon - cliPrm client.GetObjectParams + cliPrm client.PrmObjectGet } // SetRawFlag sets raw flag of the request. // // By default request will not be raw. func (x *GetObjectPrm) SetRawFlag() { - x.cliPrm.WithRawFlag(true) + x.cliPrm.MarkRaw() } // SetAddress sets object address. // // Required parameter. func (x *GetObjectPrm) SetAddress(addr *addressSDK.Address) { - x.cliPrm.WithAddress(addr) + if id := addr.ContainerID(); id != nil { + x.cliPrm.FromContainer(*id) + } + + if id := addr.ObjectID(); id != nil { + x.cliPrm.ByID(*id) + } } // GetObjectRes groups resulting values of GetObject operation. type GetObjectRes struct { - cliRes *client.ObjectGetRes + obj *object.Object } // Object returns requested object. func (x GetObjectRes) Object() *object.Object { - return x.cliRes.Object() + return x.obj } // GetObject reads the object by address. @@ -128,47 +135,91 @@ func (x GetObjectRes) Object() *object.Object { // Returns: // error of type *object.SplitInfoError if object if raw flag is set and requested object is virtual; // object.ErrAlreadyRemoved error if requested object is marked to be removed. -func GetObject(prm GetObjectPrm) (res GetObjectRes, err error) { - res.cliRes, err = prm.cli.GetObject(prm.ctx, &prm.cliPrm, prm.opts...) +func GetObject(prm GetObjectPrm) (*GetObjectRes, error) { + if prm.tokenSession != nil { + prm.cliPrm.WithinSession(*prm.tokenSession) + } + + if prm.tokenBearer != nil { + prm.cliPrm.WithBearerToken(*prm.tokenBearer) + } + + if prm.local { + prm.cliPrm.MarkLocal() + } + + rdr, err := prm.cli.ObjectGetInit(prm.ctx, prm.cliPrm) if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(res.cliRes.Status()) + return nil, fmt.Errorf("init object reading: %w", err) + } + + if prm.key != nil { + rdr.UseKey(*prm.key) + } + + var obj object.Object + + if !rdr.ReadHeader(&obj) { + res, err := rdr.Close() + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.Status()) + } + + return nil, fmt.Errorf("read object header: %w", err) + } + + buf := make([]byte, obj.PayloadSize()) + + _, err = rdr.Read(buf) + if err != nil && !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("read payload: %w", err) } // FIXME: #1158 object.ErrAlreadyRemoved never returns - return + object.NewRawFrom(&obj).SetPayload(buf) + + return &GetObjectRes{ + obj: &obj, + }, nil } // HeadObjectPrm groups parameters of HeadObject operation. type HeadObjectPrm struct { readPrmCommon - cliPrm client.ObjectHeaderParams + cliPrm client.PrmObjectHead } // SetRawFlag sets raw flag of the request. // // By default request will not be raw. func (x *HeadObjectPrm) SetRawFlag() { - x.cliPrm.WithRawFlag(true) + x.cliPrm.MarkRaw() } // SetAddress sets object address. // // Required parameter. func (x *HeadObjectPrm) SetAddress(addr *addressSDK.Address) { - x.cliPrm.WithAddress(addr) + if id := addr.ContainerID(); id != nil { + x.cliPrm.FromContainer(*id) + } + + if id := addr.ObjectID(); id != nil { + x.cliPrm.ByID(*id) + } } -// GetObjectRes groups resulting values of GetObject operation. +// HeadObjectRes groups resulting values of GetObject operation. type HeadObjectRes struct { - cliRes *client.ObjectHeadRes + hdr *object.Object } // Header returns requested object header. func (x HeadObjectRes) Header() *object.Object { - return x.cliRes.Object() + return x.hdr } // HeadObject reads object header by address. @@ -179,54 +230,87 @@ func (x HeadObjectRes) Header() *object.Object { // Returns: // error of type *object.SplitInfoError if object if raw flag is set and requested object is virtual; // object.ErrAlreadyRemoved error if requested object is marked to be removed. -func HeadObject(prm HeadObjectPrm) (res HeadObjectRes, err error) { - res.cliRes, err = prm.cli.HeadObject(prm.ctx, &prm.cliPrm, prm.opts...) +func HeadObject(prm HeadObjectPrm) (*HeadObjectRes, error) { + if prm.local { + prm.cliPrm.MarkLocal() + } + + if prm.tokenSession != nil { + prm.cliPrm.WithinSession(*prm.tokenSession) + } + + if prm.tokenBearer != nil { + prm.cliPrm.WithBearerToken(*prm.tokenBearer) + } + + cliRes, err := prm.cli.ObjectHead(prm.ctx, prm.cliPrm) if err == nil { // pull out an error from status - err = apistatus.ErrFromStatus(res.cliRes.Status()) + err = apistatus.ErrFromStatus(cliRes.Status()) + } + + if err != nil { + return nil, fmt.Errorf("read object header from NeoFS: %w", err) } // FIXME: #1158 object.ErrAlreadyRemoved never returns - return + var hdr object.Object + + if !cliRes.ReadHeader(&hdr) { + return nil, errors.New("missing object header in the response") + } + + return &HeadObjectRes{ + hdr: &hdr, + }, nil } // PayloadRangePrm groups parameters of PayloadRange operation. type PayloadRangePrm struct { readPrmCommon - cliPrm client.RangeDataParams + ln uint64 + + cliPrm client.PrmObjectRange } // SetRawFlag sets raw flag of the request. // // By default request will not be raw. func (x *PayloadRangePrm) SetRawFlag() { - x.cliPrm.WithRaw(true) + x.cliPrm.MarkRaw() } // SetAddress sets object address. // // Required parameter. func (x *PayloadRangePrm) SetAddress(addr *addressSDK.Address) { - x.cliPrm.WithAddress(addr) + if id := addr.ContainerID(); id != nil { + x.cliPrm.FromContainer(*id) + } + + if id := addr.ObjectID(); id != nil { + x.cliPrm.ByID(*id) + } } // SetRange range of the object payload to be read. // // Required parameter. func (x *PayloadRangePrm) SetRange(rng *object.Range) { - x.cliPrm.WithRange(rng) + x.cliPrm.SetOffset(rng.GetOffset()) + x.ln = rng.GetLength() } // PayloadRangeRes groups resulting values of GetObject operation. type PayloadRangeRes struct { - cliRes *client.ObjectRangeRes + data []byte } // PayloadRange returns data of the requested payload range. func (x PayloadRangeRes) PayloadRange() []byte { - return x.cliRes.Data() + return x.data } // PayloadRange reads object payload range by address. @@ -237,40 +321,62 @@ func (x PayloadRangeRes) PayloadRange() []byte { // Returns: // error of type *object.SplitInfoError if object if raw flag is set and requested object is virtual; // object.ErrAlreadyRemoved error if requested object is marked to be removed. -func PayloadRange(prm PayloadRangePrm) (res PayloadRangeRes, err error) { - res.cliRes, err = prm.cli.ObjectPayloadRangeData(prm.ctx, &prm.cliPrm, prm.opts...) - if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(res.cliRes.Status()) +func PayloadRange(prm PayloadRangePrm) (*PayloadRangeRes, error) { + if prm.local { + prm.cliPrm.MarkLocal() + } + + if prm.tokenSession != nil { + prm.cliPrm.WithinSession(*prm.tokenSession) + } + + if prm.tokenBearer != nil { + prm.cliPrm.WithBearerToken(*prm.tokenBearer) + } + + prm.cliPrm.SetLength(prm.ln) + + rdr, err := prm.cli.ObjectRangeInit(prm.ctx, prm.cliPrm) + if err != nil { + return nil, fmt.Errorf("init payload reading: %w", err) + } + + data := make([]byte, prm.ln) + + _, err = io.ReadFull(rdr, data) + if err != nil { + return nil, fmt.Errorf("read payload: %w", err) } // FIXME: #1158 object.ErrAlreadyRemoved never returns - return + return &PayloadRangeRes{ + data: data, + }, nil } // PutObjectPrm groups parameters of PutObject operation. type PutObjectPrm struct { commonPrm - cliPrm client.PutObjectParams + obj *object.Object } // SetObject sets object to be stored. // // Required parameter. func (x *PutObjectPrm) SetObject(obj *object.Object) { - x.cliPrm.WithObject(obj) + x.obj = obj } // PutObjectRes groups resulting values of PutObject operation. type PutObjectRes struct { - cliRes *client.ObjectPutRes + id *oidSDK.ID } // ID returns identifier of the stored object. func (x PutObjectRes) ID() *oidSDK.ID { - return x.cliRes.ID() + return x.id } // PutObject saves the object in local storage of the remote node. @@ -278,56 +384,137 @@ func (x PutObjectRes) ID() *oidSDK.ID { // Client, context and key must be set. // // Returns any error prevented the operation from completing correctly in error return. -func PutObject(prm PutObjectPrm) (res PutObjectRes, err error) { - res.cliRes, err = prm.cli.PutObject(prm.ctx, &prm.cliPrm, - append(prm.opts, client.WithTTL(1))..., - ) - if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(res.cliRes.Status()) +func PutObject(prm PutObjectPrm) (*PutObjectRes, error) { + var prmCli client.PrmObjectPutInit + + w, err := prm.cli.ObjectPutInit(prm.ctx, prmCli) + if err != nil { + return nil, fmt.Errorf("init object writing on client: %w", err) } - return + w.MarkLocal() + + if prm.key != nil { + w.UseKey(*prm.key) + } + + if prm.tokenSession != nil { + w.WithinSession(*prm.tokenSession) + } + + if prm.tokenBearer != nil { + w.WithBearerToken(*prm.tokenBearer) + } + + if w.WriteHeader(*prm.obj) { + w.WritePayloadChunk(prm.obj.Payload()) + } + + res, err := w.Close() + if err == nil { + err = apistatus.ErrFromStatus(res.Status()) + } + + if err != nil { + return nil, fmt.Errorf("write object via client: %w", err) + } + + var id oidSDK.ID + if !res.ReadStoredObjectID(&id) { + return nil, errors.New("missing identifier in the response") + } + + return &PutObjectRes{ + id: &id, + }, nil } // SearchObjectsPrm groups parameters of SearchObjects operation. type SearchObjectsPrm struct { readPrmCommon - cliPrm client.SearchObjectParams + cliPrm client.PrmObjectSearch } // SetContainerID sets identifier of the container to search the objects. // // Required parameter. func (x *SearchObjectsPrm) SetContainerID(id *cid.ID) { - x.cliPrm.WithContainerID(id) + if id != nil { + x.cliPrm.InContainer(*id) + } } // SetFilters sets search filters. func (x *SearchObjectsPrm) SetFilters(fs object.SearchFilters) { - x.cliPrm.WithSearchFilters(fs) + x.cliPrm.SetFilters(fs) } // SearchObjectsRes groups resulting values of SearchObjects operation. type SearchObjectsRes struct { - cliRes *client.ObjectSearchRes + ids []*oidSDK.ID } // IDList returns identifiers of the matched objects. func (x SearchObjectsRes) IDList() []*oidSDK.ID { - return x.cliRes.IDList() + return x.ids } // SearchObjects selects objects from container which match the filters. // // Returns any error prevented the operation from completing correctly in error return. -func SearchObjects(prm SearchObjectsPrm) (res SearchObjectsRes, err error) { - res.cliRes, err = prm.cli.SearchObjects(prm.ctx, &prm.cliPrm, prm.opts...) - if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(res.cliRes.Status()) +func SearchObjects(prm SearchObjectsPrm) (*SearchObjectsRes, error) { + if prm.local { + prm.cliPrm.MarkLocal() } - return + if prm.tokenSession != nil { + prm.cliPrm.WithinSession(*prm.tokenSession) + } + + if prm.tokenBearer != nil { + prm.cliPrm.WithBearerToken(*prm.tokenBearer) + } + + rdr, err := prm.cli.ObjectSearchInit(prm.ctx, prm.cliPrm) + if err != nil { + return nil, fmt.Errorf("init object searching in client: %w", err) + } + + if prm.key != nil { + rdr.UseKey(*prm.key) + } + + buf := make([]oidSDK.ID, 10) + var ids []*oidSDK.ID + var n int + var ok bool + + for { + n, ok = rdr.Read(buf) + if n > 0 { + for i := range buf[:n] { + v := buf[i] + ids = append(ids, &v) + } + } + + if !ok { + break + } + } + + res, err := rdr.Close() + if err == nil { + // pull out an error from status + err = apistatus.ErrFromStatus(res.Status()) + } + + if err != nil { + return nil, fmt.Errorf("read object list: %w", err) + } + + return &SearchObjectsRes{ + ids: ids, + }, nil } diff --git a/pkg/services/object_manager/storagegroup/collect.go b/pkg/services/object_manager/storagegroup/collect.go index 987798eeb..c2ad0bdb6 100644 --- a/pkg/services/object_manager/storagegroup/collect.go +++ b/pkg/services/object_manager/storagegroup/collect.go @@ -4,7 +4,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-sdk-go/checksum" - "github.com/nspcc-dev/neofs-sdk-go/client" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" oidSDK "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -45,7 +44,7 @@ func CollectMembers(r objutil.HeadReceiver, cid *cid.ID, members []*oidSDK.ID) ( } cs := checksum.New() - tzHash := [client.TZSize]byte{} + tzHash := [64]byte{} copy(tzHash[:], sumHash) cs.SetTillichZemor(tzHash) diff --git a/pkg/services/object_manager/transformer/transformer.go b/pkg/services/object_manager/transformer/transformer.go index 315a42f73..00ce03961 100644 --- a/pkg/services/object_manager/transformer/transformer.go +++ b/pkg/services/object_manager/transformer/transformer.go @@ -39,8 +39,6 @@ type payloadChecksumHasher struct { checksumWriter func([]byte) } -const tzChecksumSize = 64 - // NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length // of the writing object and writes generated objects to targets from initializer. // @@ -148,11 +146,11 @@ func payloadHashersForObject(obj *object.RawObject) []*payloadChecksumHasher { { hasher: tz.New(), checksumWriter: func(cs []byte) { - if ln := len(cs); ln != tzChecksumSize { - panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", ln, tzChecksumSize)) + if ln := len(cs); ln != tz.Size { + panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", tz.Size, ln)) } - csTZ := [tzChecksumSize]byte{} + csTZ := [tz.Size]byte{} copy(csTZ[:], cs) checksum := checksum.New()