From 1d23483828f33a0de3704efa19d30a97408f9c82 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 7 Dec 2020 20:49:47 +0300 Subject: [PATCH] [#235] services/object: Implement new GetRange algorithm Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 30 +-- go.mod | 2 +- go.sum | Bin 60710 -> 60710 bytes pkg/local_object_storage/engine/head.go | 3 +- pkg/local_object_storage/engine/range.go | 6 +- pkg/network/transport/object/grpc/range.go | 28 +++ pkg/network/transport/object/grpc/service.go | 25 -- pkg/services/object/acl/acl.go | 36 ++- pkg/services/object/acl/eacl/v2/headers.go | 17 +- pkg/services/object/get/assemble.go | 137 +++++++++-- pkg/services/object/get/exec.go | 41 +++- pkg/services/object/get/get.go | 11 +- pkg/services/object/get/get_test.go | 237 +++++++++++++++--- pkg/services/object/get/local.go | 5 +- pkg/services/object/get/prm.go | 41 +++- pkg/services/object/get/service.go | 4 +- pkg/services/object/get/util.go | 92 +++++-- pkg/services/object/get/v2/service.go | 19 ++ pkg/services/object/get/v2/streamer.go | 22 ++ pkg/services/object/get/v2/util.go | 36 ++- pkg/services/object/head/v2/util.go | 10 +- pkg/services/object/range/chain.go | 114 --------- pkg/services/object/range/local.go | 28 --- pkg/services/object/range/prm.go | 48 ---- pkg/services/object/range/remote.go | 67 ------ pkg/services/object/range/res.go | 27 --- pkg/services/object/range/service.go | 190 --------------- pkg/services/object/range/streamer.go | 241 ------------------- pkg/services/object/range/v2/service.go | 50 ---- pkg/services/object/range/v2/streamer.go | 27 --- pkg/services/object/range/v2/util.go | 26 -- pkg/services/object/rangehash/service.go | 29 +-- pkg/services/object/rangehash/util.go | 14 +- pkg/services/object/response.go | 40 +-- pkg/services/object/server.go | 8 +- pkg/services/object/sign.go | 37 ++- pkg/services/object/transport_splitter.go | 76 +++--- 37 files changed, 701 insertions(+), 1123 deletions(-) create mode 100644 pkg/network/transport/object/grpc/range.go delete mode 100644 pkg/services/object/range/chain.go delete mode 100644 pkg/services/object/range/local.go delete mode 100644 pkg/services/object/range/prm.go delete mode 100644 pkg/services/object/range/remote.go delete mode 100644 pkg/services/object/range/res.go delete mode 100644 pkg/services/object/range/service.go delete mode 100644 pkg/services/object/range/streamer.go delete mode 100644 pkg/services/object/range/v2/service.go delete mode 100644 pkg/services/object/range/v2/streamer.go delete mode 100644 pkg/services/object/range/v2/util.go diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index bad8f619..b6e01321 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -28,8 +28,6 @@ import ( headsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/head/v2" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2" - rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" - rangesvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/range/v2" rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash" rangehashsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash/v2" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" @@ -52,8 +50,6 @@ type objectSvc struct { get *getsvcV2.Service - rng *rangesvcV2.Service - rngHash *rangehashsvcV2.Service delete *deletesvcV2.Service @@ -163,8 +159,8 @@ func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*obj return s.delete.Delete(ctx, req) } -func (s *objectSvc) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { - return s.rng.GetRange(ctx, req) +func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.GetObjectRangeStream) error { + return s.get.GetRange(req, stream) } func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { @@ -354,25 +350,6 @@ func initObjectService(c *cfg) { headsvcV2.WithInternalService(sHead), ) - sRange := rangesvc.NewService( - rangesvc.WithKeyStorage(keyStorage), - rangesvc.WithClientCache(clientCache), - rangesvc.WithLocalStorage(ls), - rangesvc.WithContainerSource(c.cfgObject.cnrStorage), - rangesvc.WithNetworkMapSource(c.cfgObject.netMapStorage), - rangesvc.WithLocalAddressSource(c), - rangesvc.WithWorkerPool(c.cfgObject.pool.rng), - rangesvc.WithHeadService(sHead), - rangesvc.WithLogger(c.log), - rangesvc.WithClientOptions( - client.WithDialTimeout(c.viper.GetDuration(cfgObjectRangeDialTimeout)), - ), - ) - - sRangeV2 := rangesvcV2.NewService( - rangesvcV2.WithInternalService(sRange), - ) - sGet := getsvc.New( getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), @@ -401,7 +378,7 @@ func initObjectService(c *cfg) { rangehashsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), rangehashsvc.WithLocalAddressSource(c), rangehashsvc.WithHeadService(sHead), - rangehashsvc.WithRangeService(sRange), + rangehashsvc.WithRangeService(sGet), rangehashsvc.WithWorkerPool(c.cfgObject.pool.rngHash), rangehashsvc.WithLogger(c.log), rangehashsvc.WithClientOptions( @@ -451,7 +428,6 @@ func initObjectService(c *cfg) { put: sPutV2, search: sSearchV2, head: sHeadV2, - rng: sRangeV2, get: sGetV2, rngHash: sRangeHashV2, delete: sDeleteV2, diff --git a/go.mod b/go.mod index 430d0aae..7d8a985a 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.91.1-pre.0.20201030072836-71216865717b - github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201203150742-6db6b569e098 + github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201208072327-139660c6ff59 github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index 13380742d2ab1c318c1d3ae4a29f408595d925d3..27c97f4ddc9f6b38944b103a01e57b4017449895 100644 GIT binary patch delta 107 zcmZ2>i+R~C<_%ACoGlE@jf{=Vbq$Rz&CCpv&C=3LEfq2ht^CtcJ^dn5vVDtk(ozbG t!@Nyhog*{N%hF3MLkj%$gN(g=1C#Rt^HL%_CvV6x6~SiC=EB^q^#F-vBOL$$ delta 106 zcmZ2>i+R~C<_%ACoQw@k4a`l9bj?zd%#uvaEK?0EEfg{gt#S$t&3z0C{SuSIO_Ec* sOo9uO^Fne{GmX;q%hR=u&GNGIldAGO%_AKrug@_P!Dh_n!rZO(0A0u;x&QzG diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 78020f7f..0b566466 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -53,7 +53,8 @@ func (r *HeadRes) Header() *object.Object { // Returns any error encountered that // did not allow to completely read the object header. // -// Returns ErrNotFound if requested object is missing in local storage. +// Returns object.ErrNotFound if requested object is missing in local storage. +// Returns object.ErrAlreadyRemoved if requested object was inhumed. func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) { var ( head *object.Object diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index d5484c2d..412be9f1 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -36,9 +36,9 @@ func (p *RngPrm) WithAddress(addr *objectSDK.Address) *RngPrm { // // Missing an option or calling with zero length is equivalent // to getting the full payload range. -func (p *RngPrm) WithPayloadRange(off, ln uint64) *RngPrm { +func (p *RngPrm) WithPayloadRange(rng *objectSDK.Range) *RngPrm { if p != nil { - p.off, p.ln = off, ln + p.off, p.ln = rng.GetOffset(), rng.GetLength() } return p @@ -111,7 +111,7 @@ func (e *StorageEngine) GetRange(prm *RngPrm) (*RngRes, error) { func GetRange(storage *StorageEngine, addr *objectSDK.Address, rng *objectSDK.Range) ([]byte, error) { res, err := storage.GetRange(new(RngPrm). WithAddress(addr). - WithPayloadRange(rng.GetOffset(), rng.GetLength()), + WithPayloadRange(rng), ) if err != nil { return nil, err diff --git a/pkg/network/transport/object/grpc/range.go b/pkg/network/transport/object/grpc/range.go new file mode 100644 index 00000000..fd700824 --- /dev/null +++ b/pkg/network/transport/object/grpc/range.go @@ -0,0 +1,28 @@ +package object + +import ( + "github.com/nspcc-dev/neofs-api-go/v2/object" + objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" +) + +type getRangeStreamerV2 struct { + objectGRPC.ObjectService_GetRangeServer +} + +func (s *getRangeStreamerV2) Send(resp *object.GetRangeResponse) error { + return s.ObjectService_GetRangeServer.Send( + object.GetRangeResponseToGRPCMessage(resp), + ) +} + +// GetRange converts gRPC GetRangeRequest message and server-side stream and overtakes its data +// to gRPC stream. +func (s *Server) GetRange(req *objectGRPC.GetRangeRequest, gStream objectGRPC.ObjectService_GetRangeServer) error { + // TODO: think about how we transport errors through gRPC + return s.srv.GetRange( + object.GetRangeRequestFromGRPCMessage(req), + &getRangeStreamerV2{ + ObjectService_GetRangeServer: gStream, + }, + ) +} diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index ee2e3034..a0429a91 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -99,31 +99,6 @@ func (s *Server) Search(req *objectGRPC.SearchRequest, gStream objectGRPC.Object } } -// GetRange converts gRPC GetRangeRequest message, opens internal Object service Search stream and overtakes its data -// to gRPC stream. -func (s *Server) GetRange(req *objectGRPC.GetRangeRequest, gStream objectGRPC.ObjectService_GetRangeServer) error { - stream, err := s.srv.GetRange(gStream.Context(), object.GetRangeRequestFromGRPCMessage(req)) - if err != nil { - // TODO: think about how we transport errors through gRPC - return err - } - - for { - r, err := stream.Recv() - if err != nil { - if errors.Is(errors.Cause(err), io.EOF) { - return nil - } - - return err - } - - if err := gStream.Send(object.GetRangeResponseToGRPCMessage(r)); err != nil { - return err - } - } -} - // GetRangeHash converts gRPC GetRangeHashRequest message and passes it to internal Object service. func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) { resp, err := s.srv.GetRangeHash(ctx, object.GetRangeHashRequestFromGRPCMessage(req)) diff --git a/pkg/services/object/acl/acl.go b/pkg/services/object/acl/acl.go index 919226f9..40514ba1 100644 --- a/pkg/services/object/acl/acl.go +++ b/pkg/services/object/acl/acl.go @@ -45,6 +45,14 @@ type ( *eACLCfg } + rangeStreamBasicChecker struct { + objectSvc.GetObjectRangeStream + + info requestInfo + + *eACLCfg + } + searchStreamBasicChecker struct { object.SearchObjectStreamer } @@ -262,13 +270,10 @@ func (b Service) Delete( return b.next.Delete(ctx, request) } -func (b Service) GetRange( - ctx context.Context, - request *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { - +func (b Service) GetRange(request *object.GetRangeRequest, stream objectSvc.GetObjectRangeStream) error { cid, err := getContainerIDFromRequest(request) if err != nil { - return nil, err + return err } req := metaWithToken{ @@ -279,17 +284,20 @@ func (b Service) GetRange( reqInfo, err := b.findRequestInfo(req, cid, acl.OperationRange) if err != nil { - return nil, err + return err } if !basicACLCheck(reqInfo) { - return nil, basicACLErr(reqInfo) + return basicACLErr(reqInfo) } else if !eACLCheck(request, reqInfo, b.eACLCfg) { - return nil, eACLErr(reqInfo) + return eACLErr(reqInfo) } - stream, err := b.next.GetRange(ctx, request) - return getRangeStreamBasicChecker{stream}, err + return b.next.GetRange(request, &rangeStreamBasicChecker{ + GetObjectRangeStream: stream, + info: reqInfo, + eACLCfg: b.eACLCfg, + }) } func (b Service) GetRangeHash( @@ -374,6 +382,14 @@ func (g *getStreamBasicChecker) Send(resp *object.GetResponse) error { return g.GetObjectStream.Send(resp) } +func (g *rangeStreamBasicChecker) Send(resp *object.GetRangeResponse) error { + if !eACLCheck(resp, g.info, g.eACLCfg) { + return eACLErr(g.info) + } + + return g.GetObjectRangeStream.Send(resp) +} + func (b Service) findRequestInfo( req metaWithToken, cid *container.ID, diff --git a/pkg/services/object/acl/eacl/v2/headers.go b/pkg/services/object/acl/eacl/v2/headers.go index 3076a1b0..de81ec2e 100644 --- a/pkg/services/object/acl/eacl/v2/headers.go +++ b/pkg/services/object/acl/eacl/v2/headers.go @@ -119,17 +119,16 @@ func (h *headerSource) objectHeaders() ([]eacl.Header, bool) { var hdr *objectV2.Header switch v := resp.GetBody().GetHeaderPart().(type) { - case *objectV2.GetHeaderPartShort: + case *objectV2.ShortHeader: hdr = new(objectV2.Header) - h := v.GetShortHeader() - hdr.SetVersion(h.GetVersion()) - hdr.SetCreationEpoch(h.GetCreationEpoch()) - hdr.SetOwnerID(h.GetOwnerID()) - hdr.SetObjectType(h.GetObjectType()) - hdr.SetPayloadLength(h.GetPayloadLength()) - case *objectV2.GetHeaderPartFull: - hdr = v.GetHeaderWithSignature().GetHeader() + hdr.SetVersion(v.GetVersion()) + hdr.SetCreationEpoch(v.GetCreationEpoch()) + hdr.SetOwnerID(v.GetOwnerID()) + hdr.SetObjectType(v.GetObjectType()) + hdr.SetPayloadLength(v.GetPayloadLength()) + case *objectV2.HeaderWithSignature: + hdr = v.GetHeader() } oV2.SetHeader(hdr) diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index b692eb7c..d440d8dc 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -24,8 +24,19 @@ func (exec *execCtx) assemble() { prev, children := exec.initFromChild(childID) if len(children) > 0 { - if ok := exec.writeCollectedHeader(); ok { - exec.overtakePayloadDirectly(children) + if exec.ctxRange() == nil { + if ok := exec.writeCollectedHeader(); ok { + exec.overtakePayloadDirectly(children, nil) + } + } else { + // TODO: choose one-by-one restoring algorithm according to size + // * if size > MAX => go right-to-left with HEAD and back with GET + // * else go right-to-left with GET and compose in single object before writing + + if ok := exec.overtakePayloadInReverse(children[len(children)-1]); ok { + // payload of all children except the last are written, write last payload + exec.writeObjectPayload(exec.collectedObject) + } } } else if prev != nil { if ok := exec.writeCollectedHeader(); ok { @@ -39,9 +50,6 @@ func (exec *execCtx) assemble() { } } } else { - exec.status = statusUndefined - exec.err = object.ErrNotFound - exec.log.Debug("could not init parent from child") } } @@ -51,8 +59,9 @@ func (exec *execCtx) initFromChild(id *objectSDK.ID) (prev *objectSDK.ID, childr log.Debug("starting assembling from child") - child, ok := exec.getChild(id) + child, ok := exec.getChild(id, nil) if !ok { + return } @@ -75,14 +84,48 @@ func (exec *execCtx) initFromChild(id *objectSDK.ID) (prev *objectSDK.ID, childr } exec.collectedObject = par - object.NewRawFromObject(exec.collectedObject).SetPayload(child.Payload()) + + var payload []byte + + if rng := exec.ctxRange(); rng != nil { + seekLen := rng.GetLength() + seekOff := rng.GetOffset() + parSize := par.PayloadSize() + + if seekOff+seekLen > parSize { + exec.status = statusOutOfRange + exec.err = object.ErrRangeOutOfBounds + return + } + + childSize := child.PayloadSize() + + if to := seekOff + seekLen; childSize > 0 && to > parSize-childSize { + pref := to + childSize - parSize + payload = child.Payload()[:pref] + rng.SetLength(rng.GetLength() - pref) + } + + exec.curOff = parSize - childSize + } else { + payload = child.Payload() + } + + object.NewRawFromObject(exec.collectedObject).SetPayload(payload) return child.PreviousID(), child.Children() } -func (exec *execCtx) overtakePayloadDirectly(children []*objectSDK.ID) { +func (exec *execCtx) overtakePayloadDirectly(children []*objectSDK.ID, rngs []*objectSDK.Range) { + withRng := len(rngs) > 0 + for i := range children { - child, ok := exec.getChild(children[i]) + var r *objectSDK.Range + if withRng { + r = rngs[i] + } + + child, ok := exec.getChild(children[i], r) if !ok { return } @@ -97,26 +140,23 @@ func (exec *execCtx) overtakePayloadDirectly(children []*objectSDK.ID) { } func (exec *execCtx) overtakePayloadInReverse(prev *objectSDK.ID) bool { - chain := make([]*objectSDK.ID, 0) - - // fill the chain end-to-start - for prev != nil { - head, ok := exec.headChild(prev) - if !ok { - return false - } - - chain = append(chain, head.ID()) - - prev = head.PreviousID() + chain, rngs := exec.buildChainInReverse(prev) + if len(chain) == 0 { + return false } + reverseRngs := len(rngs) > 0 + // reverse chain for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 { chain[left], chain[right] = chain[right], chain[left] + + if reverseRngs { + rngs[left], rngs[right] = rngs[right], rngs[left] + } } - exec.overtakePayloadDirectly(chain) + exec.overtakePayloadDirectly(chain, rngs) exec.status = statusOK exec.err = nil @@ -124,6 +164,59 @@ func (exec *execCtx) overtakePayloadInReverse(prev *objectSDK.ID) bool { return true } +func (exec *execCtx) buildChainInReverse(prev *objectSDK.ID) ([]*objectSDK.ID, []*objectSDK.Range) { + var ( + chain = make([]*objectSDK.ID, 0) + rngs = make([]*objectSDK.Range, 0) + seekRng = exec.ctxRange() + from = seekRng.GetOffset() + to = from + seekRng.GetLength() + ) + + // fill the chain end-to-start + for prev != nil { + if exec.curOff < from { + break + } + + head, ok := exec.headChild(prev) + if !ok { + return nil, nil + } + + if seekRng != nil { + sz := head.PayloadSize() + + exec.curOff -= sz + + if exec.curOff < from+to { + off := uint64(0) + if from > exec.curOff { + off = from - exec.curOff + sz -= from - exec.curOff + } + + if to < exec.curOff+off+sz { + sz = to - off - exec.curOff + } + + r := objectSDK.NewRange() + r.SetOffset(off) + r.SetLength(sz) + + rngs = append(rngs, r) + chain = append(chain, head.ID()) + } + } else { + chain = append(chain, head.ID()) + } + + prev = head.PreviousID() + } + + return chain, rngs +} + func equalAddresses(a, b *objectSDK.Address) bool { return a.ContainerID().Equal(b.ContainerID()) && a.ObjectID().Equal(b.ObjectID()) diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 9abbe49a..439d382b 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -24,7 +24,7 @@ type execCtx struct { ctx context.Context - prm Prm + prm RangePrm statusError @@ -33,6 +33,8 @@ type execCtx struct { log *logger.Logger collectedObject *object.Object + + curOff uint64 } const ( @@ -40,11 +42,17 @@ const ( statusOK statusINHUMED statusVIRTUAL + statusOutOfRange ) func (exec *execCtx) setLogger(l *logger.Logger) { + req := "GET" + if exec.ctxRange() != nil { + req = "GET_RANGE" + } + exec.log = l.With( - zap.String("request", "GET"), + zap.String("request", req), zap.Stringer("address", exec.address()), zap.Bool("raw", exec.isRaw()), zap.Bool("local", exec.isLocal()), @@ -62,11 +70,11 @@ func (exec execCtx) isLocal() bool { } func (exec execCtx) isRaw() bool { - return exec.prm.raw + return exec.prm.RawFlag() } func (exec execCtx) address() *objectSDK.Address { - return exec.prm.addr + return exec.prm.Address() } func (exec execCtx) key() *ecdsa.PrivateKey { @@ -79,8 +87,8 @@ func (exec execCtx) callOptions() []client.CallOption { func (exec execCtx) remotePrm() *client.GetObjectParams { return new(client.GetObjectParams). - WithAddress(exec.prm.addr). - WithRawFlag(exec.prm.raw) + WithAddress(exec.prm.Address()). + WithRawFlag(exec.prm.RawFlag()) } func (exec *execCtx) canAssemble() bool { @@ -95,6 +103,10 @@ func (exec *execCtx) containerID() *container.ID { return exec.address().ContainerID() } +func (exec *execCtx) ctxRange() *objectSDK.Range { + return exec.prm.rng +} + func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Traverser, bool) { t, err := exec.svc.traverserGenerator.GenerateTraverser(addr) @@ -113,18 +125,19 @@ func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Trav } } -func (exec *execCtx) getChild(id *objectSDK.ID) (*object.Object, bool) { +func (exec *execCtx) getChild(id *objectSDK.ID, rng *objectSDK.Range) (*object.Object, bool) { w := newSimpleObjectWriter() p := exec.prm p.common = p.common.WithLocalOnly(false) - p.SetObjectWriter(w) + p.objWriter = w + p.SetRange(rng) addr := objectSDK.NewAddress() addr.SetContainerID(exec.address().ContainerID()) addr.SetObjectID(id) - p.SetAddress(addr) + p.WithAddress(addr) exec.statusError = exec.svc.get(exec.context(), p) @@ -138,9 +151,11 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) { p := exec.prm p.common = p.common.WithLocalOnly(false) - p.SetAddress(childAddr) + p.WithAddress(childAddr) - header, err := exec.svc.headSvc.head(exec.context(), p) + header, err := exec.svc.headSvc.head(exec.context(), Prm{ + commonPrm: p.commonPrm, + }) switch { default: @@ -204,6 +219,10 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) { } func (exec *execCtx) writeCollectedHeader() bool { + if exec.ctxRange() != nil { + return true + } + err := exec.prm.objWriter.WriteHeader( object.NewRawFromObject(exec.collectedObject).CutPayload().Object(), ) diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index b0e4e70d..47ce1e29 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -9,10 +9,17 @@ import ( // Get serves a request to get an object by address, and returns Streamer instance. func (s *Service) Get(ctx context.Context, prm Prm) error { + return s.get(ctx, RangePrm{ + commonPrm: prm.commonPrm, + }).err +} + +// GetRange serves a request to get an object by address, and returns Streamer instance. +func (s *Service) GetRange(ctx context.Context, prm RangePrm) error { return s.get(ctx, prm).err } -func (s *Service) get(ctx context.Context, prm Prm) statusError { +func (s *Service) get(ctx context.Context, prm RangePrm) statusError { exec := &execCtx{ svc: s, ctx: ctx, @@ -46,6 +53,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) { case statusVIRTUAL: exec.log.Debug("requested object is virtual") exec.assemble() + case statusOutOfRange: + exec.log.Debug("requested range is out of object bounds") default: exec.log.Debug("operation finished with error", zap.String("error", exec.err.Error()), diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 91b9f618..c3ef3e18 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -93,17 +93,21 @@ func newTestClient() *testClient { } } -func (c *testClient) GetObject(_ context.Context, p Prm) (*objectSDK.Object, error) { - v, ok := c.results[p.addr.String()] +func (c *testClient) GetObject(_ context.Context, p RangePrm) (*objectSDK.Object, error) { + v, ok := c.results[p.Address().String()] if !ok { return nil, object.ErrNotFound } - return v.obj.Object().SDK(), v.err + if v.err != nil { + return nil, v.err + } + + return cutToRange(v.obj.Object(), p.rng).SDK(), nil } func (c *testClient) head(_ context.Context, p Prm) (*object.Object, error) { - v, ok := c.results[p.addr.String()] + v, ok := c.results[p.Address().String()] if !ok { return nil, object.ErrNotFound } @@ -122,11 +126,11 @@ func (c *testClient) addResult(addr *objectSDK.Address, obj *object.RawObject, e }{obj: obj, err: err} } -func (s *testStorage) Get(addr *objectSDK.Address) (*object.Object, error) { +func (s *testStorage) Get(p RangePrm) (*object.Object, error) { var ( ok bool obj *object.Object - sAddr = addr.String() + sAddr = p.Address().String() ) if _, ok = s.inhumed[sAddr]; ok { @@ -138,12 +142,30 @@ func (s *testStorage) Get(addr *objectSDK.Address) (*object.Object, error) { } if obj, ok = s.phy[sAddr]; ok { - return obj, nil + return cutToRange(obj, p.rng), nil } return nil, object.ErrNotFound } +func cutToRange(o *object.Object, rng *objectSDK.Range) *object.Object { + obj := object.NewRawFromObject(o) + + if rng == nil { + return obj.Object() + } + + from := rng.GetOffset() + to := from + rng.GetLength() + + payload := obj.Payload() + + obj = obj.CutPayload() + obj.SetPayload(payload[from:to]) + + return obj.Object() +} + func (s *testStorage) addPhy(addr *objectSDK.Address, obj *object.RawObject) { s.phy[addr.String()] = obj.Object() } @@ -204,11 +226,27 @@ func TestGetLocalOnly(t *testing.T) { } newPrm := func(raw bool, w ObjectWriter) Prm { - return Prm{ - objWriter: w, - raw: raw, - common: new(util.CommonPrm).WithLocalOnly(true), - } + p := Prm{} + p.SetObjectWriter(w) + p.WithRawFlag(raw) + p.common = new(util.CommonPrm).WithLocalOnly(true) + + return p + } + + newRngPrm := func(raw bool, w ChunkWriter, off, ln uint64) RangePrm { + p := RangePrm{} + p.SetChunkWriter(w) + p.WithRawFlag(raw) + p.common = new(util.CommonPrm).WithLocalOnly(true) + + r := objectSDK.NewRange() + r.SetOffset(off) + r.SetLength(ln) + + p.SetRange(r) + + return p } t.Run("OK", func(t *testing.T) { @@ -218,12 +256,16 @@ func TestGetLocalOnly(t *testing.T) { w := newSimpleObjectWriter() p := newPrm(false, w) + payloadSz := uint64(10) + payload := make([]byte, payloadSz) + rand.Read(payload) + addr := generateAddress() - obj := generateObject(addr, nil, nil) + obj := generateObject(addr, nil, payload) storage.addPhy(addr, obj) - p.addr = addr + p.WithAddress(addr) storage.addPhy(addr, obj) @@ -232,6 +274,15 @@ func TestGetLocalOnly(t *testing.T) { require.NoError(t, err) require.Equal(t, obj.Object(), w.object()) + + w = newSimpleObjectWriter() + + rngPrm := newRngPrm(false, w, payloadSz/3, payloadSz/3) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.NoError(t, err) + require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.object().Payload()) }) t.Run("INHUMED", func(t *testing.T) { @@ -244,11 +295,17 @@ func TestGetLocalOnly(t *testing.T) { storage.inhume(addr) - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.True(t, errors.Is(err, object.ErrAlreadyRemoved)) + + rngPrm := newRngPrm(false, nil, 0, 0) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.True(t, errors.Is(err, object.ErrAlreadyRemoved)) }) t.Run("404", func(t *testing.T) { @@ -259,11 +316,18 @@ func TestGetLocalOnly(t *testing.T) { addr := generateAddress() - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.True(t, errors.Is(err, object.ErrNotFound)) + + rngPrm := newRngPrm(false, nil, 0, 0) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + + require.True(t, errors.Is(err, object.ErrNotFound)) }) t.Run("VIRTUAL", func(t *testing.T) { @@ -279,7 +343,7 @@ func TestGetLocalOnly(t *testing.T) { splitInfo.SetLink(generateID()) splitInfo.SetLastPart(generateID()) - p.addr = addr + p.WithAddress(addr) storage.addVirtual(addr, splitInfo) @@ -290,6 +354,13 @@ func TestGetLocalOnly(t *testing.T) { require.True(t, errors.As(err, &errSplit)) require.Equal(t, splitInfo, errSplit.SplitInfo()) + + rngPrm := newRngPrm(true, nil, 0, 0) + rngPrm.WithAddress(addr) + + err = svc.Get(ctx, p) + + require.True(t, errors.As(err, &errSplit)) }) } @@ -347,6 +418,7 @@ func generateChain(ln int, cid *container.ID) ([]*object.RawObject, []*objectSDK o := generateObject(addr, prevID, []byte{byte(i)}) o.SetPayload(payloadPart) + o.SetPayloadSize(uint64(len(payloadPart))) o.SetID(curID) payload = append(payload, payloadPart...) @@ -381,11 +453,27 @@ func TestGetRemoteSmall(t *testing.T) { } newPrm := func(raw bool, w ObjectWriter) Prm { - return Prm{ - objWriter: w, - raw: raw, - common: new(util.CommonPrm).WithLocalOnly(false), - } + p := Prm{} + p.SetObjectWriter(w) + p.WithRawFlag(raw) + p.common = new(util.CommonPrm).WithLocalOnly(false) + + return p + } + + newRngPrm := func(raw bool, w ChunkWriter, off, ln uint64) RangePrm { + p := RangePrm{} + p.SetChunkWriter(w) + p.WithRawFlag(raw) + p.common = new(util.CommonPrm).WithLocalOnly(false) + + r := objectSDK.NewRange() + r.SetOffset(off) + r.SetLength(ln) + + p.SetRange(r) + + return p } t.Run("OK", func(t *testing.T) { @@ -400,7 +488,11 @@ func TestGetRemoteSmall(t *testing.T) { }, } - obj := generateObject(addr, nil, nil) + payloadSz := uint64(10) + payload := make([]byte, payloadSz) + rand.Read(payload) + + obj := generateObject(addr, nil, payload) c1 := newTestClient() c1.addResult(addr, obj, nil) @@ -418,7 +510,7 @@ func TestGetRemoteSmall(t *testing.T) { w := newSimpleObjectWriter() p := newPrm(false, w) - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.NoError(t, err) @@ -429,6 +521,14 @@ func TestGetRemoteSmall(t *testing.T) { err = svc.Get(ctx, p) require.NoError(t, err) require.Equal(t, obj.Object(), w.object()) + + w = newSimpleObjectWriter() + rngPrm := newRngPrm(false, w, payloadSz/3, payloadSz/3) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.NoError(t, err) + require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.object().Payload()) }) t.Run("INHUMED", func(t *testing.T) { @@ -457,10 +557,16 @@ func TestGetRemoteSmall(t *testing.T) { }) p := newPrm(false, nil) - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.True(t, errors.Is(err, object.ErrAlreadyRemoved)) + + rngPrm := newRngPrm(false, nil, 0, 0) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.True(t, errors.Is(err, object.ErrAlreadyRemoved)) }) t.Run("404", func(t *testing.T) { @@ -489,10 +595,16 @@ func TestGetRemoteSmall(t *testing.T) { }) p := newPrm(false, nil) - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.True(t, errors.Is(err, object.ErrNotFound)) + + rngPrm := newRngPrm(false, nil, 0, 0) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.True(t, errors.Is(err, object.ErrNotFound)) }) t.Run("VIRTUAL", func(t *testing.T) { @@ -534,10 +646,16 @@ func TestGetRemoteSmall(t *testing.T) { }) p := newPrm(false, nil) - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.True(t, errors.Is(err, object.ErrNotFound)) + + rngPrm := newRngPrm(false, nil, 0, 0) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.True(t, errors.Is(err, object.ErrNotFound)) }) t.Run("get chain element failure", func(t *testing.T) { @@ -546,6 +664,7 @@ func TestGetRemoteSmall(t *testing.T) { addr.SetObjectID(generateID()) srcObj := generateObject(addr, nil, nil) + srcObj.SetPayloadSize(10) ns, as := testNodeMatrix(t, []int{2}) @@ -580,7 +699,7 @@ func TestGetRemoteSmall(t *testing.T) { c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo)) c2.addResult(linkAddr, linkingObj, nil) c2.addResult(child1Addr, children[0], nil) - c2.addResult(child2Addr, nil, errors.New("any error")) + c2.addResult(child2Addr, nil, object.ErrNotFound) builder := &testPlacementBuilder{ vectors: map[string][]netmap.Nodes{ @@ -599,10 +718,18 @@ func TestGetRemoteSmall(t *testing.T) { }) p := newPrm(false, newSimpleObjectWriter()) - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.True(t, errors.Is(err, object.ErrNotFound)) + + svc.headSvc = c2 + + rngPrm := newRngPrm(false, newSimpleObjectWriter(), 0, 1) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.True(t, errors.Is(err, object.ErrNotFound)) }) t.Run("OK", func(t *testing.T) { @@ -619,6 +746,7 @@ func TestGetRemoteSmall(t *testing.T) { children, childIDs, payload := generateChain(2, cid) srcObj.SetPayload(payload) + srcObj.SetPayloadSize(uint64(len(payload))) linkAddr := objectSDK.NewAddress() linkAddr.SetContainerID(cid) @@ -667,11 +795,26 @@ func TestGetRemoteSmall(t *testing.T) { w := newSimpleObjectWriter() p := newPrm(false, w) - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.NoError(t, err) require.Equal(t, srcObj.Object(), w.object()) + + svc.headSvc = c2 + + w = newSimpleObjectWriter() + payloadSz := srcObj.PayloadSize() + + off := payloadSz / 3 + ln := payloadSz / 3 + + rngPrm := newRngPrm(false, w, off, ln) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.NoError(t, err) + require.Equal(t, payload[off:off+ln], w.object().Payload()) }) }) @@ -713,10 +856,16 @@ func TestGetRemoteSmall(t *testing.T) { }) p := newPrm(false, nil) - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.True(t, errors.Is(err, object.ErrNotFound)) + + rngPrm := newRngPrm(false, nil, 0, 0) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.True(t, errors.Is(err, object.ErrNotFound)) }) t.Run("get chain element failure", func(t *testing.T) { @@ -725,6 +874,7 @@ func TestGetRemoteSmall(t *testing.T) { addr.SetObjectID(generateID()) srcObj := generateObject(addr, nil, nil) + srcObj.SetPayloadSize(10) ns, as := testNodeMatrix(t, []int{2}) @@ -757,7 +907,6 @@ func TestGetRemoteSmall(t *testing.T) { addr.String(): ns, rightAddr.String(): ns, preRightAddr.String(): ns, - preRightAddr.String(): ns, }, } @@ -773,10 +922,16 @@ func TestGetRemoteSmall(t *testing.T) { svc.headSvc = headSvc p := newPrm(false, newSimpleObjectWriter()) - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.True(t, errors.Is(err, object.ErrNotFound)) + + rngPrm := newRngPrm(false, nil, 0, 1) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.True(t, errors.Is(err, object.ErrNotFound)) }) t.Run("OK", func(t *testing.T) { @@ -792,6 +947,7 @@ func TestGetRemoteSmall(t *testing.T) { splitInfo.SetLastPart(generateID()) children, _, payload := generateChain(2, cid) + srcObj.SetPayloadSize(uint64(len(payload))) srcObj.SetPayload(payload) rightObj := children[len(children)-1] @@ -836,11 +992,24 @@ func TestGetRemoteSmall(t *testing.T) { w := newSimpleObjectWriter() p := newPrm(false, w) - p.addr = addr + p.WithAddress(addr) err := svc.Get(ctx, p) require.NoError(t, err) require.Equal(t, srcObj.Object(), w.object()) + + w = newSimpleObjectWriter() + payloadSz := srcObj.PayloadSize() + + off := payloadSz / 3 + ln := payloadSz / 3 + + rngPrm := newRngPrm(false, w, off, ln) + rngPrm.WithAddress(addr) + + err = svc.GetRange(ctx, rngPrm) + require.NoError(t, err) + require.Equal(t, payload[off:off+ln], w.object().Payload()) }) }) }) diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index 1a31e9b7..7e59f483 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -10,7 +10,7 @@ import ( func (exec *execCtx) executeLocal() { var err error - exec.collectedObject, err = exec.svc.localStorage.Get(exec.address()) + exec.collectedObject, err = exec.svc.localStorage.Get(exec.prm) var errSplitInfo *objectSDK.SplitInfoError @@ -33,5 +33,8 @@ func (exec *execCtx) executeLocal() { exec.status = statusVIRTUAL mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo()) exec.err = objectSDK.NewSplitInfoError(exec.infoSplit) + case errors.Is(err, object.ErrRangeOutOfBounds): + exec.status = statusOutOfRange + exec.err = object.ErrRangeOutOfBounds } } diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index 71c91e13..f3cb41b9 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -11,6 +11,17 @@ import ( // Prm groups parameters of Get service call. type Prm struct { + commonPrm +} + +// RangePrm groups parameters of GetRange service call. +type RangePrm struct { + commonPrm + + rng *objectSDK.Range +} + +type commonPrm struct { objWriter ObjectWriter // TODO: replace key and callOpts to CommonPrm @@ -20,16 +31,19 @@ type Prm struct { common *util.CommonPrm - // TODO: use parameters from NeoFS SDK - addr *objectSDK.Address + client.GetObjectParams +} - raw bool +// ChunkWriter is an interface of target component +// to write payload chunk. +type ChunkWriter interface { + WriteChunk([]byte) error } // ObjectWriter is an interface of target component to write object. type ObjectWriter interface { WriteHeader(*object.Object) error - WriteChunk([]byte) error + ChunkWriter } // SetObjectWriter sets target component to write the object. @@ -38,22 +52,23 @@ func (p *Prm) SetObjectWriter(w ObjectWriter) { } // SetPrivateKey sets private key to use during execution. -func (p *Prm) SetPrivateKey(key *ecdsa.PrivateKey) { +func (p *commonPrm) SetPrivateKey(key *ecdsa.PrivateKey) { p.key = key } // SetRemoteCallOptions sets call options remote remote client calls. -func (p *Prm) SetRemoteCallOptions(opts ...client.CallOption) { +func (p *commonPrm) SetRemoteCallOptions(opts ...client.CallOption) { p.callOpts = opts } -// SetAddress sets address of the requested object. -func (p *Prm) SetAddress(addr *objectSDK.Address) { - p.addr = addr +// SetObjectWriter sets target component to write the object payload range. +func (p *RangePrm) SetChunkWriter(w ChunkWriter) { + p.objWriter = &rangeWriter{ + chunkWriter: w, + } } -// SetRaw sets raw flag. If flag is set, -// object assembling will not be undertaken. -func (p *Prm) SetRaw(raw bool) { - p.raw = raw +// SetRange sets range of the requested payload data. +func (p *RangePrm) SetRange(rng *objectSDK.Range) { + p.rng = rng } diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index 5b05fc73..c89e918c 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -25,7 +25,7 @@ type Service struct { type Option func(*cfg) type getClient interface { - GetObject(context.Context, Prm) (*objectSDK.Object, error) + GetObject(context.Context, RangePrm) (*objectSDK.Object, error) } type cfg struct { @@ -38,7 +38,7 @@ type cfg struct { } localStorage interface { - Get(*objectSDK.Address) (*object.Object, error) + Get(RangePrm) (*object.Object, error) } clientCache interface { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index bc84ccf1..a437a566 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -15,7 +15,7 @@ import ( type simpleObjectWriter struct { obj *object.RawObject - payload []byte + pld []byte } type clientCacheWrapper struct { @@ -36,20 +36,28 @@ type headSvcWrapper struct { svc *headsvc.Service } +type rangeWriter struct { + ObjectWriter + + chunkWriter ChunkWriter +} + func newSimpleObjectWriter() *simpleObjectWriter { - return new(simpleObjectWriter) + return &simpleObjectWriter{ + obj: object.NewRaw(), + } } func (s *simpleObjectWriter) WriteHeader(obj *object.Object) error { s.obj = object.NewRawFromObject(obj) - s.payload = make([]byte, 0, obj.PayloadSize()) + s.pld = make([]byte, 0, obj.PayloadSize()) return nil } func (s *simpleObjectWriter) WriteChunk(p []byte) error { - s.payload = append(s.payload, p...) + s.pld = append(s.pld, p...) return nil } @@ -58,8 +66,8 @@ func (s *simpleObjectWriter) Close() error { } func (s *simpleObjectWriter) object() *object.Object { - if len(s.payload) > 0 { - s.obj.SetPayload(s.payload) + if len(s.pld) > 0 { + s.obj.SetPayload(s.pld) } return s.obj.Object() @@ -73,31 +81,60 @@ func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (getClient, }, err } -func (c *clientWrapper) GetObject(ctx context.Context, p Prm) (*objectSDK.Object, error) { +func (c *clientWrapper) GetObject(ctx context.Context, p RangePrm) (*objectSDK.Object, error) { // we don't specify payload writer because we accumulate // the object locally (even huge). - return c.client.GetObject(ctx, - new(client.GetObjectParams). - WithAddress(p.addr). - WithRawFlag(true), - p.callOpts..., - ) + if p.rng != nil { + data, err := c.client.ObjectPayloadRangeData(ctx, + new(client.RangeDataParams). + WithAddress(p.Address()). + WithRange(p.rng). + WithRaw(p.RawFlag()), + p.callOpts..., + ) + if err != nil { + return nil, err + } + + return payloadOnlyObject(data), nil + } else { + // we don't specify payload writer because we accumulate + // the object locally (even huge). + return c.client.GetObject(ctx, + new(client.GetObjectParams). + WithAddress(p.Address()). + WithRawFlag(p.RawFlag()), + p.callOpts..., + ) + } } -func (e *storageEngineWrapper) Get(addr *objectSDK.Address) (*object.Object, error) { - r, err := e.engine.Get(new(engine.GetPrm). - WithAddress(addr), - ) - if err != nil { - return nil, err - } +func (e *storageEngineWrapper) Get(p RangePrm) (*object.Object, error) { + if p.rng != nil { + r, err := e.engine.GetRange(new(engine.RngPrm). + WithAddress(p.Address()). + WithPayloadRange(p.rng), + ) + if err != nil { + return nil, err + } - return r.Object(), nil + return r.Object(), nil + } else { + r, err := e.engine.Get(new(engine.GetPrm). + WithAddress(p.Address()), + ) + if err != nil { + return nil, err + } + + return r.Object(), nil + } } func (s *headSvcWrapper) head(ctx context.Context, p Prm) (*object.Object, error) { r, err := s.svc.Head(ctx, new(headsvc.Prm). - WithAddress(p.addr). + WithAddress(p.Address()). WithCommonPrm(p.common). Short(false), ) @@ -108,3 +145,14 @@ func (s *headSvcWrapper) head(ctx context.Context, p Prm) (*object.Object, error return r.Header(), nil } + +func (w *rangeWriter) WriteChunk(p []byte) error { + return w.chunkWriter.WriteChunk(p) +} + +func payloadOnlyObject(payload []byte) *objectSDK.Object { + rawObj := object.NewRaw() + rawObj.SetPayload(payload) + + return rawObj.Object().SDK() +} diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go index 4f3b88e1..caf176fc 100644 --- a/pkg/services/object/get/v2/service.go +++ b/pkg/services/object/get/v2/service.go @@ -55,6 +55,25 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream } } +// GetRange calls internal service and returns v2 payload range stream. +func (s *Service) GetRange(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) error { + p, err := s.toRangePrm(req, stream) + if err != nil { + return err + } + + err = s.svc.GetRange(stream.Context(), *p) + + var splitErr *object.SplitInfoError + + switch { + case errors.As(err, &splitErr): + return stream.Send(splitInfoRangeResponse(splitErr.SplitInfo())) + default: + return err + } +} + func WithInternalService(v *getsvc.Service) Option { return func(c *cfg) { c.svc = v diff --git a/pkg/services/object/get/v2/streamer.go b/pkg/services/object/get/v2/streamer.go index 0e18e2e1..21567907 100644 --- a/pkg/services/object/get/v2/streamer.go +++ b/pkg/services/object/get/v2/streamer.go @@ -10,6 +10,10 @@ type streamObjectWriter struct { objectSvc.GetObjectStream } +type streamObjectRangeWriter struct { + objectSvc.GetObjectRangeStream +} + func (s *streamObjectWriter) WriteHeader(obj *object.Object) error { p := new(objectV2.GetObjectPartInit) @@ -38,3 +42,21 @@ func newResponse(p objectV2.GetObjectPart) *objectV2.GetResponse { return r } + +func (s *streamObjectRangeWriter) WriteChunk(chunk []byte) error { + return s.GetObjectRangeStream.Send(newRangeResponse(chunk)) +} + +func newRangeResponse(p []byte) *objectV2.GetRangeResponse { + r := new(objectV2.GetRangeResponse) + + body := new(objectV2.GetRangeResponseBody) + r.SetBody(body) + + part := new(objectV2.GetRangePartChunk) + part.SetChunk(p) + + body.SetRangePart(part) + + return r +} diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index d3e1b8a7..3233f4d6 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -23,14 +23,35 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre p.SetPrivateKey(key) body := req.GetBody() - p.SetAddress(object.NewAddressFromV2(body.GetAddress())) - p.SetRaw(body.GetRaw()) + p.WithAddress(object.NewAddressFromV2(body.GetAddress())) + p.WithRawFlag(body.GetRaw()) p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...) p.SetObjectWriter(&streamObjectWriter{stream}) return p, nil } +func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) { + meta := req.GetMetaHeader() + + key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken())) + if err != nil { + return nil, err + } + + p := new(getsvc.RangePrm) + p.SetPrivateKey(key) + + body := req.GetBody() + p.WithAddress(object.NewAddressFromV2(body.GetAddress())) + p.WithRawFlag(body.GetRaw()) + p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...) + p.SetChunkWriter(&streamObjectRangeWriter{stream}) + p.SetRange(object.NewRangeFromV2(body.GetRange())) + + return p, nil +} + // can be shared accross all services func remoteCallOptionsFromMeta(meta *session.RequestMetaHeader) []client.CallOption { xHdrs := meta.GetXHeaders() @@ -60,3 +81,14 @@ func splitInfoResponse(info *object.SplitInfo) *objectV2.GetResponse { return resp } + +func splitInfoRangeResponse(info *object.SplitInfo) *objectV2.GetRangeResponse { + resp := new(objectV2.GetRangeResponse) + + body := new(objectV2.GetRangeResponseBody) + resp.SetBody(body) + + body.SetRangePart(info.ToV2()) + + return resp +} diff --git a/pkg/services/object/head/v2/util.go b/pkg/services/object/head/v2/util.go index bf862ac3..4066be22 100644 --- a/pkg/services/object/head/v2/util.go +++ b/pkg/services/object/head/v2/util.go @@ -40,10 +40,7 @@ func fullPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart { hs.SetHeader(obj.GetHeader()) hs.SetSignature(obj.GetSignature()) - p := new(objectV2.GetHeaderPartFull) - p.SetHeaderWithSignature(hs) - - return p + return hs } func shortPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart { @@ -56,8 +53,5 @@ func shortPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart { sh.SetVersion(hdr.GetVersion()) sh.SetObjectType(hdr.GetObjectType()) - p := new(objectV2.GetHeaderPartShort) - p.SetShortHeader(sh) - - return p + return sh } diff --git a/pkg/services/object/range/chain.go b/pkg/services/object/range/chain.go deleted file mode 100644 index 000198e7..00000000 --- a/pkg/services/object/range/chain.go +++ /dev/null @@ -1,114 +0,0 @@ -package rangesvc - -import ( - "fmt" - - objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" - - "github.com/nspcc-dev/neofs-node/pkg/core/object" -) - -type rangeTraverser struct { - chain *rangeChain - - seekBounds *rangeBounds -} - -type rangeBounds struct { - left, right uint64 -} - -type objectRange struct { - rng *objectSDK.Range - - id *objectSDK.ID -} - -type rangeChain struct { - next, prev *rangeChain - - bounds *rangeBounds - - id *objectSDK.ID -} - -func newRangeTraverser(originSize uint64, rightElement *object.Object, rngSeek *objectSDK.Range) *rangeTraverser { - right := &rangeChain{ - bounds: &rangeBounds{ - left: originSize - rightElement.PayloadSize(), - right: originSize, - }, - id: rightElement.ID(), - } - - left := &rangeChain{ - id: rightElement.PreviousID(), - } - - left.next, right.prev = right, left - - return &rangeTraverser{ - chain: right, - seekBounds: &rangeBounds{ - left: rngSeek.GetOffset(), - right: rngSeek.GetOffset() + rngSeek.GetLength(), - }, - } -} - -func (c *rangeTraverser) next() *objectRange { - left := c.chain.bounds.left - seekLeft := c.seekBounds.left - - res := new(objectRange) - - if left > seekLeft { - res.id = c.chain.prev.id - } else { - res.id = c.chain.id - res.rng = objectSDK.NewRange() - res.rng.SetOffset(seekLeft - left) - res.rng.SetLength(min(c.chain.bounds.right, c.seekBounds.right) - seekLeft) - } - - return res -} - -func min(a, b uint64) uint64 { - if a < b { - return a - } - - return b -} - -func (c *rangeTraverser) pushHeader(obj *object.Object) { - id := obj.ID() - if !id.Equal(c.chain.prev.id) { - panic(fmt.Sprintf("(%T) unexpected identifier in header", c)) - } - - sz := obj.PayloadSize() - - c.chain.prev.bounds = &rangeBounds{ - left: c.chain.bounds.left - sz, - right: c.chain.bounds.left, - } - - c.chain = c.chain.prev - - if prev := obj.PreviousID(); prev != nil { - c.chain.prev = &rangeChain{ - next: c.chain, - id: prev, - } - } -} - -func (c *rangeTraverser) pushSuccessSize(sz uint64) { - c.seekBounds.left += sz - - if c.seekBounds.left >= c.chain.bounds.right && c.chain.next != nil { - c.chain = c.chain.next - } -} diff --git a/pkg/services/object/range/local.go b/pkg/services/object/range/local.go deleted file mode 100644 index 994b13d8..00000000 --- a/pkg/services/object/range/local.go +++ /dev/null @@ -1,28 +0,0 @@ -package rangesvc - -import ( - "io" - - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/pkg/errors" -) - -type localRangeWriter struct { - addr *object.Address - - rng *object.Range - - storage *engine.StorageEngine -} - -func (l *localRangeWriter) WriteTo(w io.Writer) (int64, error) { - rngData, err := engine.GetRange(l.storage, l.addr, l.rng) - if err != nil { - return 0, errors.Wrapf(err, "(%T) could not get object from local storage", l) - } - - n, err := w.Write(rngData) - - return int64(n), err -} diff --git a/pkg/services/object/range/prm.go b/pkg/services/object/range/prm.go deleted file mode 100644 index ddc4d03f..00000000 --- a/pkg/services/object/range/prm.go +++ /dev/null @@ -1,48 +0,0 @@ -package rangesvc - -import ( - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" -) - -type Prm struct { - common *util.CommonPrm - - full bool - - addr *object.Address - - rng *object.Range -} - -func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm { - if p != nil { - p.common = v - } - - return p -} - -func (p *Prm) WithAddress(v *object.Address) *Prm { - if p != nil { - p.addr = v - } - - return p -} - -func (p *Prm) WithRange(v *object.Range) *Prm { - if p != nil { - p.rng = v - } - - return p -} - -func (p *Prm) FullRange() *Prm { - if p != nil { - p.full = true - } - - return p -} diff --git a/pkg/services/object/range/remote.go b/pkg/services/object/range/remote.go deleted file mode 100644 index 3af136e2..00000000 --- a/pkg/services/object/range/remote.go +++ /dev/null @@ -1,67 +0,0 @@ -package rangesvc - -import ( - "context" - "io" - - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-api-go/pkg/token" - "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/nspcc-dev/neofs-node/pkg/network/cache" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/pkg/errors" -) - -type remoteRangeWriter struct { - ctx context.Context - - keyStorage *util.KeyStorage - - node *network.Address - - token *token.SessionToken - - bearer *token.BearerToken - - addr *object.Address - - rng *object.Range - - clientCache *cache.ClientCache - - clientOpts []client.Option -} - -func (r *remoteRangeWriter) WriteTo(w io.Writer) (int64, error) { - key, err := r.keyStorage.GetKey(r.token) - if err != nil { - return 0, errors.Wrapf(err, "(%T) could not receive private key", r) - } - - addr, err := r.node.IPAddrString() - if err != nil { - return 0, err - } - - c, err := r.clientCache.Get(key, addr, r.clientOpts...) - if err != nil { - return 0, errors.Wrapf(err, "(%T) could not create SDK client %s", r, addr) - } - - // TODO: change ObjectPayloadRangeData to implement WriterTo - chunk, err := c.ObjectPayloadRangeData(r.ctx, new(client.RangeDataParams). - WithRange(r.rng). - WithAddress(r.addr), - client.WithTTL(1), // FIXME: use constant - client.WithSession(r.token), - client.WithBearer(r.bearer), - ) - if err != nil { - return 0, errors.Wrapf(err, "(%T) could not read object payload range from %s", r, addr) - } - - n, err := w.Write(chunk) - - return int64(n), err -} diff --git a/pkg/services/object/range/res.go b/pkg/services/object/range/res.go deleted file mode 100644 index 90c3ccc4..00000000 --- a/pkg/services/object/range/res.go +++ /dev/null @@ -1,27 +0,0 @@ -package rangesvc - -import ( - "github.com/nspcc-dev/neofs-node/pkg/core/object" -) - -type Result struct { - head *object.Object - - stream Streamer -} - -type Response struct { - chunk []byte -} - -func (r *Response) PayloadChunk() []byte { - return r.chunk -} - -func (r *Result) Head() *object.Object { - return r.head -} - -func (r *Result) Stream() Streamer { - return r.stream -} diff --git a/pkg/services/object/range/service.go b/pkg/services/object/range/service.go deleted file mode 100644 index d6b99155..00000000 --- a/pkg/services/object/range/service.go +++ /dev/null @@ -1,190 +0,0 @@ -package rangesvc - -import ( - "context" - "sync" - - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/core/container" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/nspcc-dev/neofs-node/pkg/network/cache" - headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" - objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/util" - "github.com/nspcc-dev/neofs-node/pkg/util/logger" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -type Service struct { - *cfg -} - -type Option func(*cfg) - -type cfg struct { - keyStorage *objutil.KeyStorage - - localStore *engine.StorageEngine - - cnrSrc container.Source - - netMapSrc netmap.Source - - workerPool util.WorkerPool - - localAddrSrc network.LocalAddressSource - - headSvc *headsvc.Service - - clientCache *cache.ClientCache - - log *logger.Logger - - clientOpts []client.Option -} - -func defaultCfg() *cfg { - return &cfg{ - workerPool: new(util.SyncWorkerPool), - log: zap.L(), - } -} - -func NewService(opts ...Option) *Service { - c := defaultCfg() - - for i := range opts { - opts[i](c) - } - - return &Service{ - cfg: c, - } -} - -func (s *Service) GetRange(ctx context.Context, prm *Prm) (*Result, error) { - headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm). - WithAddress(prm.addr). - WithCommonPrm(prm.common), - ) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive Head result", s) - } - - origin := headResult.Header() - - originSize := origin.PayloadSize() - - if prm.full { - prm.rng = new(object.Range) - prm.rng.SetLength(originSize) - } - - if originSize < prm.rng.GetOffset()+prm.rng.GetLength() { - return nil, errors.Errorf("(%T) requested payload range is out-of-bounds", s) - } - - rngTraverser := objutil.NewRangeTraverser(originSize, origin, prm.rng) - if err := s.fillTraverser(ctx, prm, rngTraverser); err != nil { - return nil, errors.Wrapf(err, "(%T) could not fill range traverser", s) - } - - return &Result{ - head: origin, - stream: &streamer{ - cfg: s.cfg, - once: new(sync.Once), - ctx: ctx, - prm: prm, - rangeTraverser: rngTraverser, - }, - }, nil -} - -func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *objutil.RangeTraverser) error { - addr := object.NewAddress() - addr.SetContainerID(prm.addr.ContainerID()) - - for { - nextID, nextRng := traverser.Next() - if nextRng != nil { - return nil - } - - addr.SetObjectID(nextID) - - head, err := s.headSvc.Head(ctx, new(headsvc.Prm). - WithAddress(addr). - WithCommonPrm(prm.common), - ) - if err != nil { - return errors.Wrapf(err, "(%T) could not receive object header", s) - } - - traverser.PushHeader(head.Header()) - } -} - -func WithKeyStorage(v *objutil.KeyStorage) Option { - return func(c *cfg) { - c.keyStorage = v - } -} - -func WithLocalStorage(v *engine.StorageEngine) Option { - return func(c *cfg) { - c.localStore = v - } -} - -func WithContainerSource(v container.Source) Option { - return func(c *cfg) { - c.cnrSrc = v - } -} - -func WithNetworkMapSource(v netmap.Source) Option { - return func(c *cfg) { - c.netMapSrc = v - } -} - -func WithWorkerPool(v util.WorkerPool) Option { - return func(c *cfg) { - c.workerPool = v - } -} - -func WithLocalAddressSource(v network.LocalAddressSource) Option { - return func(c *cfg) { - c.localAddrSrc = v - } -} - -func WithHeadService(v *headsvc.Service) Option { - return func(c *cfg) { - c.headSvc = v - } -} - -func WithClientCache(v *cache.ClientCache) Option { - return func(c *cfg) { - c.clientCache = v - } -} - -func WithLogger(l *logger.Logger) Option { - return func(c *cfg) { - c.log = l - } -} - -func WithClientOptions(opts ...client.Option) Option { - return func(c *cfg) { - c.clientOpts = opts - } -} diff --git a/pkg/services/object/range/streamer.go b/pkg/services/object/range/streamer.go deleted file mode 100644 index 24a85aa1..00000000 --- a/pkg/services/object/range/streamer.go +++ /dev/null @@ -1,241 +0,0 @@ -package rangesvc - -import ( - "context" - "io" - "sync" - - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - "github.com/nspcc-dev/neofs-node/pkg/network" - svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" - "github.com/pkg/errors" -) - -type Streamer interface { - Recv() (*Response, error) -} - -type streamer struct { - *cfg - - once *sync.Once - - ctx context.Context - - prm *Prm - - traverser *placement.Traverser - - rangeTraverser *svcutil.RangeTraverser - - ch chan []byte -} - -type chunkWriter struct { - ctx context.Context - - ch chan<- []byte - - written uint64 -} - -func (p *streamer) Recv() (*Response, error) { - var err error - - p.once.Do(func() { - p.ch = make(chan []byte) - err = p.workerPool.Submit(p.start) - }) - - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not start streaming", p) - } - - select { - case <-p.ctx.Done(): - return nil, errors.Wrapf(p.ctx.Err(), "(%T) stream is stopped by context", p) - case v, ok := <-p.ch: - if !ok { - if _, rng := p.rangeTraverser.Next(); rng.GetLength() != 0 { - return nil, errors.Errorf("(%T) incomplete get payload range", p) - } - - return nil, io.EOF - } - - return &Response{ - chunk: v, - }, nil - } -} - -func (p *streamer) switchToObject(id *object.ID) error { - var err error - - // get latest network map - nm, err := netmap.GetLatestNetworkMap(p.netMapSrc) - if err != nil { - return errors.Wrapf(err, "(%T) could not get latest network map", p) - } - - // get container to read payload range - cnr, err := p.cnrSrc.Get(p.prm.addr.ContainerID()) - if err != nil { - return errors.Wrapf(err, "(%T) could not get container by ID", p) - } - - // allocate placement traverser options - traverseOpts := make([]placement.Option, 0, 4) - - // add common options - traverseOpts = append(traverseOpts, - // set processing container - placement.ForContainer(cnr), - - // set success count (1st incoming full range) - placement.SuccessAfter(1), - - // set identifier of the processing object - placement.ForObject(id), - ) - - // create placement builder from network map - builder := placement.NewNetworkMapBuilder(nm) - - if p.prm.common.LocalOnly() { - // use local-only placement builder - builder = svcutil.NewLocalPlacement(builder, p.localAddrSrc) - } - - // set placement builder - traverseOpts = append(traverseOpts, placement.UseBuilder(builder)) - - // build placement traverser - if p.traverser, err = placement.NewTraverser(traverseOpts...); err != nil { - return errors.Wrapf(err, "(%T) could not build placement traverser", p) - } - - return nil -} - -func (p *streamer) start() { - defer close(p.ch) - - objAddr := object.NewAddress() - objAddr.SetContainerID(p.prm.addr.ContainerID()) - -loop: - for { - select { - case <-p.ctx.Done(): - // TODO: log this - break loop - default: - } - - nextID, nextRange := p.rangeTraverser.Next() - if nextRange.GetLength() == 0 { - break - } else if err := p.switchToObject(nextID); err != nil { - // TODO: log error - break - } - - objAddr.SetObjectID(nextID) - - subloop: - for { - select { - case <-p.ctx.Done(): - // TODO: log this - break loop - default: - } - - addrs := p.traverser.Next() - if len(addrs) == 0 { - break - } - - for i := range addrs { - wg := new(sync.WaitGroup) - wg.Add(1) - - addr := addrs[i] - - if err := p.workerPool.Submit(func() { - defer wg.Done() - - var rngWriter io.WriterTo - - if network.IsLocalAddress(p.localAddrSrc, addr) { - rngWriter = &localRangeWriter{ - addr: objAddr, - rng: nextRange, - storage: p.localStore, - } - } else { - rngWriter = &remoteRangeWriter{ - ctx: p.ctx, - keyStorage: p.keyStorage, - node: addr, - token: p.prm.common.SessionToken(), - bearer: p.prm.common.BearerToken(), - addr: objAddr, - rng: nextRange, - clientCache: p.clientCache, - clientOpts: p.clientOpts, - } - } - - written, err := rngWriter.WriteTo(&chunkWriter{ - ctx: p.ctx, - ch: p.ch, - }) - if err != nil { - svcutil.LogServiceError(p.log, "RANGE", addr, err) - } - - ln := nextRange.GetLength() - uw := uint64(written) - - p.rangeTraverser.PushSuccessSize(uw) - nextRange.SetLength(ln - uw) - nextRange.SetOffset(nextRange.GetOffset() + uw) - }); err != nil { - wg.Done() - - svcutil.LogWorkerPoolError(p.log, "RANGE", err) - - break loop - } - - wg.Wait() - - if nextRange.GetLength() == 0 { - p.traverser.SubmitSuccess() - break subloop - } - } - } - - if !p.traverser.Success() { - // TODO: log error - break loop - } - } -} - -func (w *chunkWriter) Write(p []byte) (int, error) { - select { - case <-w.ctx.Done(): - return 0, w.ctx.Err() - case w.ch <- p: - } - - w.written += uint64(len(p)) - - return len(p), nil -} diff --git a/pkg/services/object/range/v2/service.go b/pkg/services/object/range/v2/service.go deleted file mode 100644 index 9c039e89..00000000 --- a/pkg/services/object/range/v2/service.go +++ /dev/null @@ -1,50 +0,0 @@ -package rangesvc - -import ( - "context" - - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" - "github.com/pkg/errors" -) - -// Service implements GetRange operation of Object service v2. -type Service struct { - *cfg -} - -// Option represents Service constructor option. -type Option func(*cfg) - -type cfg struct { - svc *rangesvc.Service -} - -// NewService constructs Service instance from provided options. -func NewService(opts ...Option) *Service { - c := new(cfg) - - for i := range opts { - opts[i](c) - } - - return &Service{ - cfg: c, - } -} - -// GetRange calls internal service and returns v2 object payload range stream. -func (s *Service) GetRange(ctx context.Context, req *objectV2.GetRangeRequest) (objectV2.GetRangeObjectStreamer, error) { - r, err := s.svc.GetRange(ctx, toPrm(req)) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not get object payload range data", s) - } - - return fromResponse(r.Stream()), nil -} - -func WithInternalService(v *rangesvc.Service) Option { - return func(c *cfg) { - c.svc = v - } -} diff --git a/pkg/services/object/range/v2/streamer.go b/pkg/services/object/range/v2/streamer.go deleted file mode 100644 index c3c82525..00000000 --- a/pkg/services/object/range/v2/streamer.go +++ /dev/null @@ -1,27 +0,0 @@ -package rangesvc - -import ( - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" - "github.com/pkg/errors" -) - -type streamer struct { - stream rangesvc.Streamer - - body *objectV2.GetRangeResponseBody -} - -func (s *streamer) Recv() (*objectV2.GetRangeResponse, error) { - r, err := s.stream.Recv() - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not read response from stream", s) - } - - s.body.SetChunk(r.PayloadChunk()) - - resp := new(objectV2.GetRangeResponse) - resp.SetBody(s.body) - - return resp, nil -} diff --git a/pkg/services/object/range/v2/util.go b/pkg/services/object/range/v2/util.go deleted file mode 100644 index e0cc8d22..00000000 --- a/pkg/services/object/range/v2/util.go +++ /dev/null @@ -1,26 +0,0 @@ -package rangesvc - -import ( - "github.com/nspcc-dev/neofs-api-go/pkg/object" - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" -) - -func toPrm(req *objectV2.GetRangeRequest) *rangesvc.Prm { - body := req.GetBody() - - return new(rangesvc.Prm). - WithAddress( - object.NewAddressFromV2(body.GetAddress()), - ). - WithRange(object.NewRangeFromV2(body.GetRange())). - WithCommonPrm(util.CommonPrmFromV2(req)) -} - -func fromResponse(stream rangesvc.Streamer) objectV2.GetRangeObjectStreamer { - return &streamer{ - stream: stream, - body: new(objectV2.GetRangeResponseBody), - } -} diff --git a/pkg/services/object/rangehash/service.go b/pkg/services/object/rangehash/service.go index d600802e..69defe93 100644 --- a/pkg/services/object/rangehash/service.go +++ b/pkg/services/object/rangehash/service.go @@ -4,7 +4,6 @@ import ( "context" "crypto/sha256" "fmt" - "io" "github.com/nspcc-dev/neofs-api-go/pkg" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -14,8 +13,8 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" - rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" @@ -44,7 +43,7 @@ type cfg struct { headSvc *headsvc.Service - rangeSvc *rangesvc.Service + rangeSvc *getsvc.Service clientCache *cache.ClientCache @@ -171,23 +170,15 @@ func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.Ra if prm.typ == pkg.ChecksumSHA256 && nextRng.GetLength() != rng.GetLength() { // here we cannot receive SHA256 checksum through GetRangeHash service // since SHA256 is not homomorphic - res, err := s.rangeSvc.GetRange(ctx, new(rangesvc.Prm). - WithAddress(addr). - WithRange(nextRng). - WithCommonPrm(prm.common), - ) + rngPrm := getsvc.RangePrm{} + rngPrm.SetRange(nextRng) + rngPrm.WithAddress(addr) + rngPrm.SetChunkWriter(hasher) + + err := s.rangeSvc.GetRange(ctx, rngPrm) if err != nil { return nil, errors.Wrapf(err, "(%T) could not receive payload range for %v checksum", s, prm.typ) } - - for stream := res.Stream(); ; { - resp, err := stream.Recv() - if errors.Is(errors.Cause(err), io.EOF) { - break - } - - hasher.add(resp.PayloadChunk()) - } } else { resp, err := (&distributedHasher{ cfg: s.cfg, @@ -206,7 +197,7 @@ func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.Ra return nil, errors.Errorf("(%T) unexpected %v hashes amount %d", s, prm.typ, ln) } - hasher.add(hs[0]) + _ = hasher.WriteChunk(hs[0]) } traverser.PushSuccessSize(nextRng.GetLength()) @@ -265,7 +256,7 @@ func WithHeadService(v *headsvc.Service) Option { } } -func WithRangeService(v *rangesvc.Service) Option { +func WithRangeService(v *getsvc.Service) Option { return func(c *cfg) { c.rangeSvc = v } diff --git a/pkg/services/object/rangehash/util.go b/pkg/services/object/rangehash/util.go index 7f62096d..d41b5095 100644 --- a/pkg/services/object/rangehash/util.go +++ b/pkg/services/object/rangehash/util.go @@ -20,7 +20,7 @@ type onceHashWriter struct { } type hasher interface { - add([]byte) + WriteChunk([]byte) error sum() ([]byte, error) } @@ -36,8 +36,10 @@ type singleHasher struct { hash []byte } -func (h *singleHasher) add(p []byte) { +func (h *singleHasher) WriteChunk(p []byte) error { h.hash = p + + return nil } func (h *singleHasher) sum() ([]byte, error) { @@ -52,18 +54,20 @@ func (w *onceHashWriter) write(hs [][]byte) { }) } -func (h *tzHasher) add(p []byte) { +func (h *tzHasher) WriteChunk(p []byte) error { h.hashes = append(h.hashes, p) - return + return nil } func (h *tzHasher) sum() ([]byte, error) { return tz.Concat(h.hashes) } -func (h *commonHasher) add(p []byte) { +func (h *commonHasher) WriteChunk(p []byte) error { h.h.Write(p) + + return nil } func (h *commonHasher) sum() ([]byte, error) { diff --git a/pkg/services/object/response.go b/pkg/services/object/response.go index b19a9a92..f7b43beb 100644 --- a/pkg/services/object/response.go +++ b/pkg/services/object/response.go @@ -26,7 +26,9 @@ type getStreamResponser struct { } type getRangeStreamResponser struct { - stream *response.ServerMessageStreamer + util.ServerStream + + respWriter util.ResponseMessageWriter } type putStreamResponser struct { @@ -143,35 +145,17 @@ func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) return resp.(*object.DeleteResponse), nil } -func (s *getRangeStreamResponser) Recv() (*object.GetRangeResponse, error) { - r, err := s.stream.Recv() - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive response", s) - } - - return r.(*object.GetRangeResponse), nil +func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error { + return s.respWriter(resp) } -func (s *ResponseService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { - stream, err := s.respSvc.HandleServerStreamRequest(ctx, req, - func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) { - stream, err := s.svc.GetRange(ctx, req.(*object.GetRangeRequest)) - if err != nil { - return nil, err - } - - return func() (util.ResponseMessage, error) { - return stream.Recv() - }, nil - }, - ) - if err != nil { - return nil, err - } - - return &getRangeStreamResponser{ - stream: stream, - }, nil +func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { + return s.svc.GetRange(req, &getRangeStreamResponser{ + ServerStream: stream, + respWriter: s.respSvc.HandleServerStreamRequest_(func(resp util.ResponseMessage) error { + return stream.Send(resp.(*object.GetRangeResponse)) + }), + }) } func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 46ddd7e4..50b30656 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -13,6 +13,12 @@ type GetObjectStream interface { Send(*object.GetResponse) error } +// GetObjectRangeStream is an interface of NeoFS API v2 compatible payload range streamer. +type GetObjectRangeStream interface { + util.ServerStream + Send(*object.GetRangeResponse) error +} + // ServiceServer is an interface of utility // serving v2 Object service. type ServiceServer interface { @@ -21,6 +27,6 @@ type ServiceServer interface { Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) Search(context.Context, *object.SearchRequest) (object.SearchObjectStreamer, error) Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) - GetRange(context.Context, *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) + GetRange(*object.GetRangeRequest, GetObjectRangeStream) error GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) } diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index a8e1613b..c9f39570 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -32,7 +32,9 @@ type putStreamSigner struct { } type getRangeStreamSigner struct { - stream *util.ResponseMessageStreamer + util.ServerStream + + respWriter util.ResponseMessageWriter } func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService { @@ -151,35 +153,24 @@ func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*o return resp.(*object.DeleteResponse), nil } -func (s *getRangeStreamSigner) Recv() (*object.GetRangeResponse, error) { - r, err := s.stream.Recv() - if err != nil { - return nil, errors.Wrap(err, "could not receive response") - } - - return r.(*object.GetRangeResponse), nil +func (s *getRangeStreamSigner) Send(resp *object.GetRangeResponse) error { + return s.respWriter(resp) } -func (s *SignService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { - stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req, - func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) { - stream, err := s.svc.GetRange(ctx, req.(*object.GetRangeRequest)) - if err != nil { - return nil, err - } - - return func() (util.ResponseMessage, error) { - return stream.Recv() - }, nil +func (s *SignService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { + respWriter, err := s.sigSvc.HandleServerStreamRequest_(req, + func(resp util.ResponseMessage) error { + return stream.Send(resp.(*object.GetRangeResponse)) }, ) if err != nil { - return nil, err + return err } - return &getRangeStreamSigner{ - stream: stream, - }, nil + return s.svc.GetRange(req, &getRangeStreamSigner{ + ServerStream: stream, + respWriter: respWriter, + }) } func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go index f1c910ee..f1f33945 100644 --- a/pkg/services/object/transport_splitter.go +++ b/pkg/services/object/transport_splitter.go @@ -37,10 +37,11 @@ type ( addrAmount uint64 } - rangeStreamBasicChecker struct { - next object.GetRangeObjectStreamer - buf *bytes.Buffer - resp *object.GetRangeResponse + rangeStreamMsgSizeCtrl struct { + util.ServerStream + + stream GetObjectRangeStream + chunkSize int } ) @@ -112,46 +113,47 @@ func (c TransportSplitter) Delete(ctx context.Context, request *object.DeleteReq return c.next.Delete(ctx, request) } -func (c TransportSplitter) GetRange(ctx context.Context, request *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { - stream, err := c.next.GetRange(ctx, request) +func (s *rangeStreamMsgSizeCtrl) Send(resp *object.GetRangeResponse) error { + body := resp.GetBody() - return &rangeStreamBasicChecker{ - next: stream, - chunkSize: int(c.chunkSize), - }, err + chunkPart, ok := body.GetRangePart().(*object.GetRangePartChunk) + if !ok { + return s.stream.Send(resp) + } + + var newResp *object.GetRangeResponse + + for buf := bytes.NewBuffer(chunkPart.GetChunk()); buf.Len() > 0; { + if newResp == nil { + newResp = new(object.GetRangeResponse) + newResp.SetBody(body) + } + + chunkPart.SetChunk(buf.Next(s.chunkSize)) + body.SetRangePart(chunkPart) + newResp.SetMetaHeader(resp.GetMetaHeader()) + newResp.SetVerificationHeader(resp.GetVerificationHeader()) + + if err := s.stream.Send(newResp); err != nil { + return err + } + } + + return nil +} + +func (c TransportSplitter) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { + return c.next.GetRange(req, &rangeStreamMsgSizeCtrl{ + ServerStream: stream, + stream: stream, + chunkSize: int(c.chunkSize), + }) } func (c TransportSplitter) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { return c.next.GetRangeHash(ctx, request) } -func (r *rangeStreamBasicChecker) Recv() (*object.GetRangeResponse, error) { - if r.resp == nil { - resp, err := r.next.Recv() - if err != nil { - return resp, err - } - - r.resp = resp - r.buf = bytes.NewBuffer(resp.GetBody().GetChunk()) - } - - body := new(object.GetRangeResponseBody) - body.SetChunk(r.buf.Next(r.chunkSize)) - - resp := new(object.GetRangeResponse) - resp.SetVerificationHeader(r.resp.GetVerificationHeader()) - resp.SetMetaHeader(r.resp.GetMetaHeader()) - resp.SetBody(body) - - if r.buf.Len() == 0 { - r.buf = nil - r.resp = nil - } - - return resp, nil -} - func (s *searchStreamBasicChecker) Recv() (*object.SearchResponse, error) { if s.resp == nil { resp, err := s.next.Recv()