From f24daa10ffc3726560c709cd8794b787fa56482f Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 3 Dec 2020 02:45:25 +0300 Subject: [PATCH] [#233] services/object: Implement new Get algorithm Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 1 + cmd/neofs-node/object.go | 22 +- go.sum | 5 + pkg/network/transport/object/grpc/get.go | 28 + pkg/network/transport/object/grpc/service.go | 30 +- pkg/services/object/acl/acl.go | 58 +- pkg/services/object/acl/opts.go | 4 +- pkg/services/object/get/assemble.go | 130 +++ pkg/services/object/get/container.go | 54 ++ pkg/services/object/get/exec.go | 250 ++++++ pkg/services/object/get/get.go | 59 ++ pkg/services/object/get/get_test.go | 847 +++++++++++++++++++ pkg/services/object/get/local.go | 37 + pkg/services/object/get/prm.go | 59 +- pkg/services/object/get/remote.go | 50 ++ pkg/services/object/get/service.go | 115 ++- pkg/services/object/get/streamer.go | 26 - pkg/services/object/get/util.go | 110 +++ pkg/services/object/get/v2/service.go | 31 +- pkg/services/object/get/v2/streamer.go | 63 +- pkg/services/object/get/v2/util.go | 61 +- pkg/services/object/response.go | 60 +- pkg/services/object/server.go | 26 + pkg/services/object/sign.go | 57 +- pkg/services/object/transport_splitter.go | 91 +- pkg/services/object/util/key.go | 7 +- pkg/services/object/util/local.go | 45 - pkg/services/object/util/placement.go | 155 ++++ pkg/services/util/response/server_stream.go | 8 + pkg/services/util/server.go | 10 + pkg/services/util/sign.go | 19 + 31 files changed, 2163 insertions(+), 355 deletions(-) create mode 100644 pkg/network/transport/object/grpc/get.go create mode 100644 pkg/services/object/get/assemble.go create mode 100644 pkg/services/object/get/container.go create mode 100644 pkg/services/object/get/exec.go create mode 100644 pkg/services/object/get/get.go create mode 100644 pkg/services/object/get/get_test.go create mode 100644 pkg/services/object/get/local.go create mode 100644 pkg/services/object/get/remote.go delete mode 100644 pkg/services/object/get/streamer.go create mode 100644 pkg/services/object/get/util.go create mode 100644 pkg/services/object/server.go delete mode 100644 pkg/services/object/util/local.go create mode 100644 pkg/services/object/util/placement.go create mode 100644 pkg/services/util/server.go diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 9bcdf6826..bf4951c2b 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -108,6 +108,7 @@ const ( cfgObjectRangeDialTimeout = "object.range.dial_timeout" cfgObjectRangeHashDialTimeout = "object.rangehash.dial_timeout" cfgObjectSearchDialTimeout = "object.search.dial_timeout" + cfgObjectGetDialTimeout = "object.get.dial_timeout" ) const ( diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 2ce5b101f..c51c48e08 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -155,8 +155,8 @@ func (s *objectSvc) Search(ctx context.Context, req *object.SearchRequest) (obje return s.search.Search(ctx, req) } -func (s *objectSvc) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) { - return s.get.Get(ctx, req) +func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error { + return s.get.Get(req, stream) } func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { @@ -291,6 +291,8 @@ func initObjectService(c *cfg) { } }) + traverseGen := util.NewTraverserGenerator(c.cfgObject.netMapStorage, c.cfgObject.cnrStorage, c) + c.workers = append(c.workers, pol) sPut := putsvc.NewService( @@ -372,12 +374,24 @@ func initObjectService(c *cfg) { rangesvcV2.WithInternalService(sRange), ) - sGet := getsvc.NewService( - getsvc.WithRangeService(sRange), + sGet := getsvc.New( + getsvc.WithLogger(c.log), + getsvc.WithLocalStorageEngine(ls), + getsvc.WithClientCache(clientCache), + getsvc.WithHeadService(sHead), + getsvc.WithClientOptions( + client.WithDialTimeout(c.viper.GetDuration(cfgObjectGetDialTimeout)), + ), + getsvc.WithTraverserGenerator( + traverseGen.WithTraverseOptions( + placement.SuccessAfter(1), + ), + ), ) sGetV2 := getsvcV2.NewService( getsvcV2.WithInternalService(sGet), + getsvcV2.WithKeyStorage(keyStorage), ) sRangeHash := rangehashsvc.NewService( diff --git a/go.sum b/go.sum index 3827d8312..13380742d 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,7 @@ cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbf cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= +cloud.google.com/go/storage v1.0.0 h1:VV2nUM3wwLLGh9lSABFgZMjInyUbJeaRSE64WuAIQ+4= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 h1:/EMHruHCFXR9xClkGV/t0rmHrdhX4+trQUcBqjwc9xE= code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= @@ -36,8 +37,10 @@ github.com/alecthomas/participle v0.6.0 h1:Pvo8XUCQKgIywVjz/+Ci3IsjGg+g/TdKkMcfg github.com/alecthomas/participle v0.6.0/go.mod h1:HfdmEuwvr12HXQN44HPWXR0lHmVolVYe4dyL6lQ3duY= github.com/alecthomas/repr v0.0.0-20181024024818-d37bc2a10ba1/go.mod h1:xTS7Pm1pD1mvyM075QCDSRqH6qRLXylzS24ZTpRiSzQ= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 h1:45bxf7AZMwWcqkLzDAQugVEwedisr5nRJ1r+7LYnv0U= github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= @@ -345,6 +348,7 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= @@ -582,6 +586,7 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/abiosoft/ishell.v2 v2.0.0/go.mod h1:sFp+cGtH6o4s1FtpVPTMcHq2yue+c4DGOVohJCPUzwY= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/network/transport/object/grpc/get.go b/pkg/network/transport/object/grpc/get.go new file mode 100644 index 000000000..77fdea328 --- /dev/null +++ b/pkg/network/transport/object/grpc/get.go @@ -0,0 +1,28 @@ +package object + +import ( + "github.com/nspcc-dev/neofs-api-go/v2/object" + objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" +) + +type getStreamerV2 struct { + objectGRPC.ObjectService_GetServer +} + +func (s *getStreamerV2) Send(resp *object.GetResponse) error { + return s.ObjectService_GetServer.Send( + object.GetResponseToGRPCMessage(resp), + ) +} + +// Get converts gRPC GetRequest message and server-side stream and overtakes its data +// to gRPC stream. +func (s *Server) Get(req *objectGRPC.GetRequest, gStream objectGRPC.ObjectService_GetServer) error { + // TODO: think about how we transport errors through gRPC + return s.srv.Get( + object.GetRequestFromGRPCMessage(req), + &getStreamerV2{ + ObjectService_GetServer: gStream, + }, + ) +} diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index 20bb844f2..ee2e3034f 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -6,47 +6,23 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" "github.com/pkg/errors" ) // Server wraps NeoFS API Object service and // provides gRPC Object service server interface. type Server struct { - srv object.Service + srv objectSvc.ServiceServer } // New creates, initializes and returns Server instance. -func New(c object.Service) *Server { +func New(c objectSvc.ServiceServer) *Server { return &Server{ srv: c, } } -// Get converts gRPC GetRequest message, opens internal Object service Get stream and overtakes its data -// to gRPC stream. -func (s *Server) Get(req *objectGRPC.GetRequest, gStream objectGRPC.ObjectService_GetServer) error { - stream, err := s.srv.Get(gStream.Context(), object.GetRequestFromGRPCMessage(req)) - if err != nil { - // TODO: think about how we transport errors through gRPC - return err - } - - for { - r, err := stream.Recv() - if err != nil { - if errors.Is(errors.Cause(err), io.EOF) { - return nil - } - - return err - } - - if err := gStream.Send(object.GetResponseToGRPCMessage(r)); err != nil { - return err - } - } -} - // Put opens internal Object service Put stream and overtakes data from gRPC stream to it. func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error { stream, err := s.srv.Put(gStream.Context()) diff --git a/pkg/services/object/acl/acl.go b/pkg/services/object/acl/acl.go index 9c024ffd9..919226f9d 100644 --- a/pkg/services/object/acl/acl.go +++ b/pkg/services/object/acl/acl.go @@ -18,6 +18,7 @@ import ( core "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl" eaclV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl/v2" "github.com/pkg/errors" @@ -37,7 +38,8 @@ type ( } getStreamBasicChecker struct { - next object.GetObjectStreamer + objectSvc.GetObjectStream + info requestInfo *eACLCfg @@ -74,7 +76,7 @@ type cfg struct { sender SenderClassifier - next object.Service + next objectSvc.ServiceServer *eACLCfg } @@ -123,12 +125,10 @@ func New(opts ...Option) Service { } } -func (b Service) Get( - ctx context.Context, - request *object.GetRequest) (object.GetObjectStreamer, error) { +func (b Service) Get(request *object.GetRequest, stream objectSvc.GetObjectStream) error { cid, err := getContainerIDFromRequest(request) if err != nil { - return nil, err + return err } req := metaWithToken{ @@ -139,22 +139,20 @@ func (b Service) Get( reqInfo, err := b.findRequestInfo(req, cid, acl.OperationGet) if err != nil { - return nil, err + return err } if !basicACLCheck(reqInfo) { - return nil, basicACLErr(reqInfo) + return basicACLErr(reqInfo) } else if !eACLCheck(request, reqInfo, b.eACLCfg) { - return nil, eACLErr(reqInfo) + return eACLErr(reqInfo) } - stream, err := b.next.Get(ctx, request) - - return getStreamBasicChecker{ - next: stream, - info: reqInfo, - eACLCfg: b.eACLCfg, - }, err + return b.next.Get(request, &getStreamBasicChecker{ + GetObjectStream: stream, + info: reqInfo, + eACLCfg: b.eACLCfg, + }) } func (b Service) Put(ctx context.Context) (object.PutObjectStreamer, error) { @@ -366,32 +364,14 @@ func (p putStreamBasicChecker) CloseAndRecv() (*object.PutResponse, error) { return p.next.CloseAndRecv() } -func (g getStreamBasicChecker) Recv() (*object.GetResponse, error) { - resp, err := g.next.Recv() - if err != nil { - return resp, err - } - - body := resp.GetBody() - if body == nil { - return resp, err - } - - part := body.GetObjectPart() - if _, ok := part.(*object.GetObjectPartInit); ok { - ownerID, err := getObjectOwnerFromMessage(resp) - if err != nil { - return nil, err - } - - if !stickyBitCheck(g.info, ownerID) { - return nil, basicACLErr(g.info) - } else if !eACLCheck(resp, g.info, g.eACLCfg) { - return nil, eACLErr(g.info) +func (g *getStreamBasicChecker) Send(resp *object.GetResponse) error { + if _, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartInit); ok { + if !eACLCheck(resp, g.info, g.eACLCfg) { + return eACLErr(g.info) } } - return resp, err + return g.GetObjectStream.Send(resp) } func (b Service) findRequestInfo( diff --git a/pkg/services/object/acl/opts.go b/pkg/services/object/acl/opts.go index 13952c446..df3b47a4f 100644 --- a/pkg/services/object/acl/opts.go +++ b/pkg/services/object/acl/opts.go @@ -1,10 +1,10 @@ package acl import ( - "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl" ) @@ -23,7 +23,7 @@ func WithSenderClassifier(v SenderClassifier) Option { } // WithNextService returns option to set next object service. -func WithNextService(v object.Service) Option { +func WithNextService(v objectSvc.ServiceServer) Option { return func(c *cfg) { c.next = v } diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go new file mode 100644 index 000000000..b692eb7c5 --- /dev/null +++ b/pkg/services/object/get/assemble.go @@ -0,0 +1,130 @@ +package getsvc + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "go.uber.org/zap" +) + +func (exec *execCtx) assemble() { + if !exec.canAssemble() { + exec.log.Debug("can not assemble the object") + return + } + + exec.log.Debug("trying to assemble the object...") + + splitInfo := exec.splitInfo() + + childID := splitInfo.Link() + if childID == nil { + childID = splitInfo.LastPart() + } + + prev, children := exec.initFromChild(childID) + + if len(children) > 0 { + if ok := exec.writeCollectedHeader(); ok { + exec.overtakePayloadDirectly(children) + } + } else if prev != nil { + if ok := exec.writeCollectedHeader(); ok { + // TODO: choose one-by-one restoring algorithm according to size + // * if size > MAX => go right-to-left with HEAD and back with GET + // * else go right-to-left with GET and compose in single object before writing + + if ok := exec.overtakePayloadInReverse(prev); ok { + // payload of all children except the last are written, write last payload + exec.writeObjectPayload(exec.collectedObject) + } + } + } else { + exec.status = statusUndefined + exec.err = object.ErrNotFound + + exec.log.Debug("could not init parent from child") + } +} + +func (exec *execCtx) initFromChild(id *objectSDK.ID) (prev *objectSDK.ID, children []*objectSDK.ID) { + log := exec.log.With(zap.Stringer("child ID", id)) + + log.Debug("starting assembling from child") + + child, ok := exec.getChild(id) + if !ok { + return + } + + par := child.GetParent() + if par == nil { + exec.status = statusUndefined + + log.Debug("received child with empty parent") + + return + } else if !equalAddresses(par.Address(), exec.address()) { + exec.status = statusUndefined + + log.Debug("parent address in child object differs", + zap.Stringer("expected", exec.address()), + zap.Stringer("received", par.Address()), + ) + + return + } + + exec.collectedObject = par + object.NewRawFromObject(exec.collectedObject).SetPayload(child.Payload()) + + return child.PreviousID(), child.Children() +} + +func (exec *execCtx) overtakePayloadDirectly(children []*objectSDK.ID) { + for i := range children { + child, ok := exec.getChild(children[i]) + if !ok { + return + } + + if ok := exec.writeObjectPayload(child); !ok { + return + } + } + + exec.status = statusOK + exec.err = nil +} + +func (exec *execCtx) overtakePayloadInReverse(prev *objectSDK.ID) bool { + chain := make([]*objectSDK.ID, 0) + + // fill the chain end-to-start + for prev != nil { + head, ok := exec.headChild(prev) + if !ok { + return false + } + + chain = append(chain, head.ID()) + + prev = head.PreviousID() + } + + // reverse chain + for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 { + chain[left], chain[right] = chain[right], chain[left] + } + + exec.overtakePayloadDirectly(chain) + + exec.status = statusOK + exec.err = nil + + return true +} + +func equalAddresses(a, b *objectSDK.Address) bool { + return a.ContainerID().Equal(b.ContainerID()) && + a.ObjectID().Equal(b.ObjectID()) +} diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go new file mode 100644 index 000000000..d964a2690 --- /dev/null +++ b/pkg/services/object/get/container.go @@ -0,0 +1,54 @@ +package getsvc + +import ( + "context" + + "go.uber.org/zap" +) + +func (exec *execCtx) executeOnContainer() { + if exec.isLocal() { + exec.log.Debug("return result directly") + return + } + + exec.log.Debug("trying to execute in container...") + + traverser, ok := exec.generateTraverser(exec.address()) + if !ok { + return + } + + ctx, cancel := context.WithCancel(exec.context()) + defer cancel() + + exec.status = statusUndefined + +loop: + for { + addrs := traverser.Next() + if len(addrs) == 0 { + exec.log.Debug("no more nodes, abort placement iteration") + break + } + + for i := range addrs { + select { + case <-ctx.Done(): + exec.log.Debug("interrupt placement iteration by context", + zap.String("error", ctx.Err().Error()), + ) + break loop + default: + } + + // TODO: consider parallel execution + // TODO: consider optimization: if status == SPLIT we can continue until + // we reach the best result - split info with linking object ID. + if exec.processNode(ctx, addrs[i]) { + exec.log.Debug("completing the operation") + break loop + } + } + } +} diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go new file mode 100644 index 000000000..9abbe49ad --- /dev/null +++ b/pkg/services/object/get/exec.go @@ -0,0 +1,250 @@ +package getsvc + +import ( + "context" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +type statusError struct { + status int + err error +} + +type execCtx struct { + svc *Service + + ctx context.Context + + prm Prm + + statusError + + infoSplit *objectSDK.SplitInfo + + log *logger.Logger + + collectedObject *object.Object +} + +const ( + statusUndefined int = iota + statusOK + statusINHUMED + statusVIRTUAL +) + +func (exec *execCtx) setLogger(l *logger.Logger) { + exec.log = l.With( + zap.String("request", "GET"), + zap.Stringer("address", exec.address()), + zap.Bool("raw", exec.isRaw()), + zap.Bool("local", exec.isLocal()), + zap.Bool("with session", exec.prm.common.SessionToken() != nil), + zap.Bool("with bearer", exec.prm.common.BearerToken() != nil), + ) +} + +func (exec execCtx) context() context.Context { + return exec.ctx +} + +func (exec execCtx) isLocal() bool { + return exec.prm.common.LocalOnly() +} + +func (exec execCtx) isRaw() bool { + return exec.prm.raw +} + +func (exec execCtx) address() *objectSDK.Address { + return exec.prm.addr +} + +func (exec execCtx) key() *ecdsa.PrivateKey { + return exec.prm.key +} + +func (exec execCtx) callOptions() []client.CallOption { + return exec.prm.callOpts +} + +func (exec execCtx) remotePrm() *client.GetObjectParams { + return new(client.GetObjectParams). + WithAddress(exec.prm.addr). + WithRawFlag(exec.prm.raw) +} + +func (exec *execCtx) canAssemble() bool { + return exec.svc.assembly && !exec.isRaw() +} + +func (exec *execCtx) splitInfo() *objectSDK.SplitInfo { + return exec.infoSplit +} + +func (exec *execCtx) containerID() *container.ID { + return exec.address().ContainerID() +} + +func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Traverser, bool) { + t, err := exec.svc.traverserGenerator.GenerateTraverser(addr) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not generate container traverser", + zap.String("error", err.Error()), + ) + + return nil, false + case err == nil: + return t, true + } +} + +func (exec *execCtx) getChild(id *objectSDK.ID) (*object.Object, bool) { + w := newSimpleObjectWriter() + + p := exec.prm + p.common = p.common.WithLocalOnly(false) + p.SetObjectWriter(w) + + addr := objectSDK.NewAddress() + addr.SetContainerID(exec.address().ContainerID()) + addr.SetObjectID(id) + + p.SetAddress(addr) + + exec.statusError = exec.svc.get(exec.context(), p) + + return w.object(), exec.status == statusOK +} + +func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) { + childAddr := objectSDK.NewAddress() + childAddr.SetContainerID(exec.containerID()) + childAddr.SetObjectID(id) + + p := exec.prm + p.common = p.common.WithLocalOnly(false) + p.SetAddress(childAddr) + + header, err := exec.svc.headSvc.head(exec.context(), p) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not get child object header", + zap.Stringer("child ID", id), + zap.String("error", err.Error()), + ) + + return nil, false + case err == nil: + exec.status = statusOK + exec.err = nil + + return header, true + } +} + +func (exec execCtx) remoteClient(node *network.Address) (getClient, bool) { + ipAddr, err := node.IPAddrString() + + log := exec.log.With(zap.Stringer("node", node)) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + log.Debug("could not calculate node IP address") + case err == nil: + c, err := exec.svc.clientCache.get(exec.key(), ipAddr) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + log.Debug("could not construct remote node client") + case err == nil: + return c, true + } + } + + return nil, false +} + +func mergeSplitInfo(dst, src *objectSDK.SplitInfo) { + if last := src.LastPart(); last != nil { + dst.SetLastPart(last) + } + + if link := src.Link(); link != nil { + dst.SetLink(link) + } + + if splitID := src.SplitID(); splitID != nil { + dst.SetSplitID(splitID) + } +} + +func (exec *execCtx) writeCollectedHeader() bool { + err := exec.prm.objWriter.WriteHeader( + object.NewRawFromObject(exec.collectedObject).CutPayload().Object(), + ) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not write header", + zap.String("error", err.Error()), + ) + case err == nil: + exec.status = statusOK + exec.err = nil + } + + return exec.status == statusOK +} + +func (exec *execCtx) writeObjectPayload(obj *object.Object) bool { + err := exec.prm.objWriter.WriteChunk(obj.Payload()) + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not write payload chunk", + zap.String("error", err.Error()), + ) + case err == nil: + exec.status = statusOK + exec.err = nil + } + + return err == nil +} + +func (exec *execCtx) writeCollectedObject() { + if ok := exec.writeCollectedHeader(); ok { + exec.writeObjectPayload(exec.collectedObject) + } +} diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go new file mode 100644 index 000000000..b0e4e70d6 --- /dev/null +++ b/pkg/services/object/get/get.go @@ -0,0 +1,59 @@ +package getsvc + +import ( + "context" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "go.uber.org/zap" +) + +// Get serves a request to get an object by address, and returns Streamer instance. +func (s *Service) Get(ctx context.Context, prm Prm) error { + return s.get(ctx, prm).err +} + +func (s *Service) get(ctx context.Context, prm Prm) statusError { + exec := &execCtx{ + svc: s, + ctx: ctx, + prm: prm, + infoSplit: objectSDK.NewSplitInfo(), + } + + exec.setLogger(s.log) + + exec.execute() + + return exec.statusError +} + +func (exec *execCtx) execute() { + exec.log.Debug("serving request...") + + // perform local operation + exec.executeLocal() + + exec.analyzeStatus(true) +} + +func (exec *execCtx) analyzeStatus(execCnr bool) { + // analyze local result + switch exec.status { + case statusOK: + exec.log.Debug("operation finished successfully") + case statusINHUMED: + exec.log.Debug("requested object was marked as removed") + case statusVIRTUAL: + exec.log.Debug("requested object is virtual") + exec.assemble() + default: + exec.log.Debug("operation finished with error", + zap.String("error", exec.err.Error()), + ) + + if execCnr { + exec.executeOnContainer() + exec.analyzeStatus(false) + } + } +} diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go new file mode 100644 index 000000000..91b9f6181 --- /dev/null +++ b/pkg/services/object/get/get_test.go @@ -0,0 +1,847 @@ +package getsvc + +import ( + "context" + "crypto/ecdsa" + "crypto/rand" + "crypto/sha256" + "fmt" + "strconv" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/util/logger/test" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +type testStorage struct { + inhumed map[string]struct{} + + virtual map[string]*objectSDK.SplitInfo + + phy map[string]*object.Object +} + +type testTraverserGenerator struct { + c *container.Container + b placement.Builder +} + +type testPlacementBuilder struct { + vectors map[string][]netmap.Nodes +} + +type testClientCache struct { + clients map[string]*testClient +} + +type testClient struct { + results map[string]struct { + obj *object.RawObject + err error + } +} + +func newTestStorage() *testStorage { + return &testStorage{ + inhumed: make(map[string]struct{}), + virtual: make(map[string]*objectSDK.SplitInfo), + phy: make(map[string]*object.Object), + } +} + +func (g *testTraverserGenerator) GenerateTraverser(addr *objectSDK.Address) (*placement.Traverser, error) { + return placement.NewTraverser( + placement.ForContainer(g.c), + placement.ForObject(addr.ObjectID()), + placement.UseBuilder(g.b), + placement.SuccessAfter(1), + ) +} + +func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap.PlacementPolicy) ([]netmap.Nodes, error) { + vs, ok := p.vectors[addr.String()] + if !ok { + return nil, errors.New("vectors for address not found") + } + + return vs, nil +} + +func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (getClient, error) { + v, ok := c.clients[addr] + if !ok { + return nil, errors.New("could not construct client") + } + + return v, nil +} + +func newTestClient() *testClient { + return &testClient{ + results: map[string]struct { + obj *object.RawObject + err error + }{}, + } +} + +func (c *testClient) GetObject(_ context.Context, p Prm) (*objectSDK.Object, error) { + v, ok := c.results[p.addr.String()] + if !ok { + return nil, object.ErrNotFound + } + + return v.obj.Object().SDK(), v.err +} + +func (c *testClient) head(_ context.Context, p Prm) (*object.Object, error) { + v, ok := c.results[p.addr.String()] + if !ok { + return nil, object.ErrNotFound + } + + if v.err != nil { + return nil, v.err + } + + return v.obj.CutPayload().Object(), nil +} + +func (c *testClient) addResult(addr *objectSDK.Address, obj *object.RawObject, err error) { + c.results[addr.String()] = struct { + obj *object.RawObject + err error + }{obj: obj, err: err} +} + +func (s *testStorage) Get(addr *objectSDK.Address) (*object.Object, error) { + var ( + ok bool + obj *object.Object + sAddr = addr.String() + ) + + if _, ok = s.inhumed[sAddr]; ok { + return nil, object.ErrAlreadyRemoved + } + + if info, ok := s.virtual[sAddr]; ok { + return nil, objectSDK.NewSplitInfoError(info) + } + + if obj, ok = s.phy[sAddr]; ok { + return obj, nil + } + + return nil, object.ErrNotFound +} + +func (s *testStorage) addPhy(addr *objectSDK.Address, obj *object.RawObject) { + s.phy[addr.String()] = obj.Object() +} + +func (s *testStorage) addVirtual(addr *objectSDK.Address, info *objectSDK.SplitInfo) { + s.virtual[addr.String()] = info +} + +func (s *testStorage) inhume(addr *objectSDK.Address) { + s.inhumed[addr.String()] = struct{}{} +} + +func testSHA256() (cs [sha256.Size]byte) { + rand.Read(cs[:]) + return cs +} + +func generateID() *objectSDK.ID { + id := objectSDK.NewID() + id.SetSHA256(testSHA256()) + + return id +} + +func generateAddress() *objectSDK.Address { + addr := objectSDK.NewAddress() + addr.SetObjectID(generateID()) + + cid := container.NewID() + cid.SetSHA256(testSHA256()) + + addr.SetContainerID(cid) + + return addr +} + +func generateObject(addr *objectSDK.Address, prev *objectSDK.ID, payload []byte, children ...*objectSDK.ID) *object.RawObject { + obj := object.NewRaw() + obj.SetContainerID(addr.ContainerID()) + obj.SetID(addr.ObjectID()) + obj.SetPayload(payload) + obj.SetPreviousID(prev) + obj.SetChildren(children...) + + return obj +} + +func TestGetLocalOnly(t *testing.T) { + ctx := context.Background() + + newSvc := func(storage *testStorage) *Service { + svc := &Service{cfg: new(cfg)} + svc.log = test.NewLogger(false) + svc.localStorage = storage + svc.assembly = true + + return svc + } + + newPrm := func(raw bool, w ObjectWriter) Prm { + return Prm{ + objWriter: w, + raw: raw, + common: new(util.CommonPrm).WithLocalOnly(true), + } + } + + t.Run("OK", func(t *testing.T) { + storage := newTestStorage() + svc := newSvc(storage) + + w := newSimpleObjectWriter() + p := newPrm(false, w) + + addr := generateAddress() + obj := generateObject(addr, nil, nil) + + storage.addPhy(addr, obj) + + p.addr = addr + + storage.addPhy(addr, obj) + + err := svc.Get(ctx, p) + + require.NoError(t, err) + + require.Equal(t, obj.Object(), w.object()) + }) + + t.Run("INHUMED", func(t *testing.T) { + storage := newTestStorage() + svc := newSvc(storage) + + p := newPrm(false, nil) + + addr := generateAddress() + + storage.inhume(addr) + + p.addr = addr + + err := svc.Get(ctx, p) + + require.True(t, errors.Is(err, object.ErrAlreadyRemoved)) + }) + + t.Run("404", func(t *testing.T) { + storage := newTestStorage() + svc := newSvc(storage) + + p := newPrm(false, nil) + + addr := generateAddress() + + p.addr = addr + + err := svc.Get(ctx, p) + + require.True(t, errors.Is(err, object.ErrNotFound)) + }) + + t.Run("VIRTUAL", func(t *testing.T) { + storage := newTestStorage() + svc := newSvc(storage) + + p := newPrm(true, nil) + + addr := generateAddress() + + splitInfo := objectSDK.NewSplitInfo() + splitInfo.SetSplitID(objectSDK.NewSplitID()) + splitInfo.SetLink(generateID()) + splitInfo.SetLastPart(generateID()) + + p.addr = addr + + storage.addVirtual(addr, splitInfo) + + err := svc.Get(ctx, p) + + errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo()) + + require.True(t, errors.As(err, &errSplit)) + + require.Equal(t, splitInfo, errSplit.SplitInfo()) + }) +} + +func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) { + mNodes := make([]netmap.Nodes, len(dim)) + mAddr := make([][]string, len(dim)) + + for i := range dim { + ns := make([]netmap.NodeInfo, dim[i]) + as := make([]string, dim[i]) + + for j := 0; j < dim[i]; j++ { + a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s", + strconv.Itoa(i), + strconv.Itoa(60000+j), + ) + + var err error + na, err := network.AddressFromString(a) + require.NoError(t, err) + + as[j], err = na.IPAddrString() + require.NoError(t, err) + + ni := netmap.NewNodeInfo() + ni.SetAddress(a) + + ns[j] = *ni + } + + mNodes[i] = netmap.NodesFromInfo(ns) + mAddr[i] = as + } + + return mNodes, mAddr +} + +func generateChain(ln int, cid *container.ID) ([]*object.RawObject, []*objectSDK.ID, []byte) { + curID := generateID() + var prevID *objectSDK.ID + + addr := objectSDK.NewAddress() + addr.SetContainerID(cid) + + res := make([]*object.RawObject, 0, ln) + ids := make([]*objectSDK.ID, 0, ln) + payload := make([]byte, 0, ln*10) + + for i := 0; i < ln; i++ { + ids = append(ids, curID) + addr.SetObjectID(curID) + + payloadPart := make([]byte, 10) + rand.Read(payloadPart) + + o := generateObject(addr, prevID, []byte{byte(i)}) + o.SetPayload(payloadPart) + o.SetID(curID) + + payload = append(payload, payloadPart...) + + res = append(res, o) + + prevID = curID + curID = generateID() + } + + return res, ids, payload +} + +func TestGetRemoteSmall(t *testing.T) { + ctx := context.Background() + + cnr := container.New(container.WithPolicy(new(netmap.PlacementPolicy))) + cid := container.CalculateID(cnr) + + newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service { + svc := &Service{cfg: new(cfg)} + svc.log = test.NewLogger(true) + svc.localStorage = newTestStorage() + svc.assembly = true + svc.traverserGenerator = &testTraverserGenerator{ + c: cnr, + b: b, + } + svc.clientCache = c + + return svc + } + + newPrm := func(raw bool, w ObjectWriter) Prm { + return Prm{ + objWriter: w, + raw: raw, + common: new(util.CommonPrm).WithLocalOnly(false), + } + } + + t.Run("OK", func(t *testing.T) { + addr := generateAddress() + addr.SetContainerID(cid) + + ns, as := testNodeMatrix(t, []int{2}) + + builder := &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns, + }, + } + + obj := generateObject(addr, nil, nil) + + c1 := newTestClient() + c1.addResult(addr, obj, nil) + + c2 := newTestClient() + c2.addResult(addr, nil, errors.New("any error")) + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c2, + }, + }) + + w := newSimpleObjectWriter() + + p := newPrm(false, w) + p.addr = addr + + err := svc.Get(ctx, p) + require.NoError(t, err) + require.Equal(t, obj.Object(), w.object()) + + *c1, *c2 = *c2, *c1 + + err = svc.Get(ctx, p) + require.NoError(t, err) + require.Equal(t, obj.Object(), w.object()) + }) + + t.Run("INHUMED", func(t *testing.T) { + addr := generateAddress() + addr.SetContainerID(cid) + + ns, as := testNodeMatrix(t, []int{2}) + + builder := &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns, + }, + } + + c1 := newTestClient() + c1.addResult(addr, nil, errors.New("any error")) + + c2 := newTestClient() + c2.addResult(addr, nil, object.ErrAlreadyRemoved) + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c2, + }, + }) + + p := newPrm(false, nil) + p.addr = addr + + err := svc.Get(ctx, p) + require.True(t, errors.Is(err, object.ErrAlreadyRemoved)) + }) + + t.Run("404", func(t *testing.T) { + addr := generateAddress() + addr.SetContainerID(cid) + + ns, as := testNodeMatrix(t, []int{2}) + + builder := &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns, + }, + } + + c1 := newTestClient() + c1.addResult(addr, nil, errors.New("any error")) + + c2 := newTestClient() + c2.addResult(addr, nil, errors.New("any error")) + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c2, + }, + }) + + p := newPrm(false, nil) + p.addr = addr + + err := svc.Get(ctx, p) + require.True(t, errors.Is(err, object.ErrNotFound)) + }) + + t.Run("VIRTUAL", func(t *testing.T) { + t.Run("linking", func(t *testing.T) { + t.Run("get linking failure", func(t *testing.T) { + addr := generateAddress() + addr.SetContainerID(cid) + addr.SetObjectID(generateID()) + + ns, as := testNodeMatrix(t, []int{2}) + + splitInfo := objectSDK.NewSplitInfo() + splitInfo.SetLink(generateID()) + + splitAddr := objectSDK.NewAddress() + splitAddr.SetContainerID(cid) + splitAddr.SetObjectID(splitInfo.Link()) + + c1 := newTestClient() + c1.addResult(addr, nil, errors.New("any error")) + c1.addResult(splitAddr, nil, object.ErrNotFound) + + c2 := newTestClient() + c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo)) + c2.addResult(splitAddr, nil, object.ErrNotFound) + + builder := &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns, + splitAddr.String(): ns, + }, + } + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c2, + }, + }) + + p := newPrm(false, nil) + p.addr = addr + + err := svc.Get(ctx, p) + require.True(t, errors.Is(err, object.ErrNotFound)) + }) + + t.Run("get chain element failure", func(t *testing.T) { + addr := generateAddress() + addr.SetContainerID(cid) + addr.SetObjectID(generateID()) + + srcObj := generateObject(addr, nil, nil) + + ns, as := testNodeMatrix(t, []int{2}) + + splitInfo := objectSDK.NewSplitInfo() + splitInfo.SetLink(generateID()) + + children, childIDs, _ := generateChain(2, cid) + + linkAddr := objectSDK.NewAddress() + linkAddr.SetContainerID(cid) + linkAddr.SetObjectID(splitInfo.Link()) + + linkingObj := generateObject(linkAddr, nil, nil, childIDs...) + linkingObj.SetParentID(addr.ObjectID()) + linkingObj.SetParent(srcObj.Object().SDK()) + + child1Addr := objectSDK.NewAddress() + child1Addr.SetContainerID(cid) + child1Addr.SetObjectID(childIDs[0]) + + child2Addr := objectSDK.NewAddress() + child2Addr.SetContainerID(cid) + child2Addr.SetObjectID(childIDs[1]) + + c1 := newTestClient() + c1.addResult(addr, nil, errors.New("any error")) + c1.addResult(linkAddr, nil, errors.New("any error")) + c1.addResult(child1Addr, nil, errors.New("any error")) + c1.addResult(child2Addr, nil, errors.New("any error")) + + c2 := newTestClient() + c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo)) + c2.addResult(linkAddr, linkingObj, nil) + c2.addResult(child1Addr, children[0], nil) + c2.addResult(child2Addr, nil, errors.New("any error")) + + builder := &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns, + linkAddr.String(): ns, + child1Addr.String(): ns, + child2Addr.String(): ns, + }, + } + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c2, + }, + }) + + p := newPrm(false, newSimpleObjectWriter()) + p.addr = addr + + err := svc.Get(ctx, p) + require.True(t, errors.Is(err, object.ErrNotFound)) + }) + + t.Run("OK", func(t *testing.T) { + addr := generateAddress() + addr.SetContainerID(cid) + addr.SetObjectID(generateID()) + + srcObj := generateObject(addr, nil, nil) + + ns, as := testNodeMatrix(t, []int{2}) + + splitInfo := objectSDK.NewSplitInfo() + splitInfo.SetLink(generateID()) + + children, childIDs, payload := generateChain(2, cid) + srcObj.SetPayload(payload) + + linkAddr := objectSDK.NewAddress() + linkAddr.SetContainerID(cid) + linkAddr.SetObjectID(splitInfo.Link()) + + linkingObj := generateObject(linkAddr, nil, nil, childIDs...) + linkingObj.SetParentID(addr.ObjectID()) + linkingObj.SetParent(srcObj.Object().SDK()) + + child1Addr := objectSDK.NewAddress() + child1Addr.SetContainerID(cid) + child1Addr.SetObjectID(childIDs[0]) + + child2Addr := objectSDK.NewAddress() + child2Addr.SetContainerID(cid) + child2Addr.SetObjectID(childIDs[1]) + + c1 := newTestClient() + c1.addResult(addr, nil, errors.New("any error")) + c1.addResult(linkAddr, nil, errors.New("any error")) + c1.addResult(child1Addr, nil, errors.New("any error")) + c1.addResult(child2Addr, nil, errors.New("any error")) + + c2 := newTestClient() + c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo)) + c2.addResult(linkAddr, linkingObj, nil) + c2.addResult(child1Addr, children[0], nil) + c2.addResult(child2Addr, children[1], nil) + + builder := &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns, + linkAddr.String(): ns, + child1Addr.String(): ns, + child2Addr.String(): ns, + }, + } + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c2, + }, + }) + + w := newSimpleObjectWriter() + + p := newPrm(false, w) + p.addr = addr + + err := svc.Get(ctx, p) + require.NoError(t, err) + require.Equal(t, srcObj.Object(), w.object()) + }) + }) + + t.Run("right child", func(t *testing.T) { + t.Run("get right child failure", func(t *testing.T) { + addr := generateAddress() + addr.SetContainerID(cid) + addr.SetObjectID(generateID()) + + ns, as := testNodeMatrix(t, []int{2}) + + splitInfo := objectSDK.NewSplitInfo() + splitInfo.SetLastPart(generateID()) + + splitAddr := objectSDK.NewAddress() + splitAddr.SetContainerID(cid) + splitAddr.SetObjectID(splitInfo.LastPart()) + + c1 := newTestClient() + c1.addResult(addr, nil, errors.New("any error")) + c1.addResult(splitAddr, nil, object.ErrNotFound) + + c2 := newTestClient() + c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo)) + c2.addResult(splitAddr, nil, object.ErrNotFound) + + builder := &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns, + splitAddr.String(): ns, + }, + } + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c2, + }, + }) + + p := newPrm(false, nil) + p.addr = addr + + err := svc.Get(ctx, p) + require.True(t, errors.Is(err, object.ErrNotFound)) + }) + + t.Run("get chain element failure", func(t *testing.T) { + addr := generateAddress() + addr.SetContainerID(cid) + addr.SetObjectID(generateID()) + + srcObj := generateObject(addr, nil, nil) + + ns, as := testNodeMatrix(t, []int{2}) + + splitInfo := objectSDK.NewSplitInfo() + splitInfo.SetLastPart(generateID()) + + children, _, _ := generateChain(2, cid) + + rightAddr := objectSDK.NewAddress() + rightAddr.SetContainerID(cid) + rightAddr.SetObjectID(splitInfo.LastPart()) + + rightObj := children[len(children)-1] + + rightObj.SetParentID(addr.ObjectID()) + rightObj.SetParent(srcObj.Object().SDK()) + + preRightAddr := children[len(children)-2].Object().Address() + + c1 := newTestClient() + c1.addResult(addr, nil, errors.New("any error")) + c1.addResult(rightAddr, nil, errors.New("any error")) + + c2 := newTestClient() + c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo)) + c2.addResult(rightAddr, rightObj, nil) + + builder := &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns, + rightAddr.String(): ns, + preRightAddr.String(): ns, + preRightAddr.String(): ns, + }, + } + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c2, + }, + }) + + headSvc := newTestClient() + headSvc.addResult(preRightAddr, nil, object.ErrNotFound) + svc.headSvc = headSvc + + p := newPrm(false, newSimpleObjectWriter()) + p.addr = addr + + err := svc.Get(ctx, p) + require.True(t, errors.Is(err, object.ErrNotFound)) + }) + + t.Run("OK", func(t *testing.T) { + addr := generateAddress() + addr.SetContainerID(cid) + addr.SetObjectID(generateID()) + + srcObj := generateObject(addr, nil, nil) + + ns, as := testNodeMatrix(t, []int{2}) + + splitInfo := objectSDK.NewSplitInfo() + splitInfo.SetLastPart(generateID()) + + children, _, payload := generateChain(2, cid) + srcObj.SetPayload(payload) + + rightObj := children[len(children)-1] + + rightObj.SetID(splitInfo.LastPart()) + rightObj.SetParentID(addr.ObjectID()) + rightObj.SetParent(srcObj.Object().SDK()) + + c1 := newTestClient() + c1.addResult(addr, nil, errors.New("any error")) + + for i := range children { + c1.addResult(children[i].Object().Address(), nil, errors.New("any error")) + } + + c2 := newTestClient() + c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo)) + + for i := range children { + c2.addResult(children[i].Object().Address(), children[i], nil) + } + + builder := &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{}, + } + + builder.vectors[addr.String()] = ns + + for i := range children { + builder.vectors[children[i].Object().Address().String()] = ns + } + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c2, + }, + }) + + svc.headSvc = c2 + + w := newSimpleObjectWriter() + + p := newPrm(false, w) + p.addr = addr + + err := svc.Get(ctx, p) + require.NoError(t, err) + require.Equal(t, srcObj.Object(), w.object()) + }) + }) + }) +} diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go new file mode 100644 index 000000000..1a31e9b7a --- /dev/null +++ b/pkg/services/object/get/local.go @@ -0,0 +1,37 @@ +package getsvc + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +func (exec *execCtx) executeLocal() { + var err error + + exec.collectedObject, err = exec.svc.localStorage.Get(exec.address()) + + var errSplitInfo *objectSDK.SplitInfoError + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("local get failed", + zap.String("error", err.Error()), + ) + case err == nil: + exec.status = statusOK + exec.err = nil + exec.writeCollectedObject() + case errors.Is(err, object.ErrAlreadyRemoved): + exec.status = statusINHUMED + exec.err = object.ErrAlreadyRemoved + case errors.As(err, &errSplitInfo): + exec.status = statusVIRTUAL + mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo()) + exec.err = objectSDK.NewSplitInfoError(exec.infoSplit) + } +} diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index 496d77fbf..71c91e130 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -1,30 +1,59 @@ package getsvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/object" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) +// Prm groups parameters of Get service call. type Prm struct { + objWriter ObjectWriter + + // TODO: replace key and callOpts to CommonPrm + key *ecdsa.PrivateKey + + callOpts []client.CallOption + common *util.CommonPrm - full bool + // TODO: use parameters from NeoFS SDK + addr *objectSDK.Address - addr *object.Address + raw bool } -func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm { - if p != nil { - p.common = v - } - - return p +// ObjectWriter is an interface of target component to write object. +type ObjectWriter interface { + WriteHeader(*object.Object) error + WriteChunk([]byte) error } -func (p *Prm) WithAddress(v *object.Address) *Prm { - if p != nil { - p.addr = v - } - - return p +// SetObjectWriter sets target component to write the object. +func (p *Prm) SetObjectWriter(w ObjectWriter) { + p.objWriter = w +} + +// SetPrivateKey sets private key to use during execution. +func (p *Prm) SetPrivateKey(key *ecdsa.PrivateKey) { + p.key = key +} + +// SetRemoteCallOptions sets call options remote remote client calls. +func (p *Prm) SetRemoteCallOptions(opts ...client.CallOption) { + p.callOpts = opts +} + +// SetAddress sets address of the requested object. +func (p *Prm) SetAddress(addr *objectSDK.Address) { + p.addr = addr +} + +// SetRaw sets raw flag. If flag is set, +// object assembling will not be undertaken. +func (p *Prm) SetRaw(raw bool) { + p.raw = raw } diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go new file mode 100644 index 000000000..ee73db205 --- /dev/null +++ b/pkg/services/object/get/remote.go @@ -0,0 +1,50 @@ +package getsvc + +import ( + "context" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +func (exec *execCtx) processNode(ctx context.Context, addr *network.Address) bool { + log := exec.log.With(zap.Stringer("remote node", addr)) + + log.Debug("processing node...") + + client, ok := exec.remoteClient(addr) + if !ok { + return true + } + + obj, err := client.GetObject(ctx, exec.prm) + + var errSplitInfo *objectSDK.SplitInfoError + + switch { + default: + exec.status = statusUndefined + exec.err = object.ErrNotFound + + log.Debug("remote call failed", + zap.String("error", err.Error()), + ) + case err == nil: + exec.status = statusOK + exec.err = nil + exec.collectedObject = object.NewFromSDK(obj) + exec.writeCollectedObject() + case errors.Is(err, object.ErrAlreadyRemoved): + exec.status = statusINHUMED + exec.err = object.ErrAlreadyRemoved + case errors.As(err, &errSplitInfo): + exec.status = statusVIRTUAL + mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo()) + exec.err = objectSDK.NewSplitInfoError(exec.infoSplit) + } + + return exec.status != statusUndefined +} diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index 030acee00..5b05fc73e 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -2,26 +2,67 @@ package getsvc import ( "context" + "crypto/ecdsa" - rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" - "github.com/pkg/errors" + "github.com/nspcc-dev/neofs-api-go/pkg/client" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + "github.com/nspcc-dev/neofs-node/pkg/network/cache" + headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" ) +// Service utility serving requests of Object.Get service. type Service struct { *cfg } +// Option is a Service's constructor option. type Option func(*cfg) +type getClient interface { + GetObject(context.Context, Prm) (*objectSDK.Object, error) +} + type cfg struct { - rngSvc *rangesvc.Service + assembly bool + + log *logger.Logger + + headSvc interface { + head(context.Context, Prm) (*object.Object, error) + } + + localStorage interface { + Get(*objectSDK.Address) (*object.Object, error) + } + + clientCache interface { + get(*ecdsa.PrivateKey, string) (getClient, error) + } + + traverserGenerator interface { + GenerateTraverser(*objectSDK.Address) (*placement.Traverser, error) + } } func defaultCfg() *cfg { - return new(cfg) + return &cfg{ + assembly: true, + log: zap.L(), + headSvc: new(headSvcWrapper), + localStorage: new(storageEngineWrapper), + clientCache: new(clientCacheWrapper), + } } -func NewService(opts ...Option) *Service { +// New creates, initializes and returns utility serving +// Object.Get service requests. +func New(opts ...Option) *Service { c := defaultCfg() for i := range opts { @@ -33,23 +74,53 @@ func NewService(opts ...Option) *Service { } } -func (s *Service) Get(ctx context.Context, prm *Prm) (*Streamer, error) { - r, err := s.rngSvc.GetRange(ctx, new(rangesvc.Prm). - WithAddress(prm.addr). - FullRange(). - WithCommonPrm(prm.common), - ) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not get range", s) - } - - return &Streamer{ - rngRes: r, - }, nil -} - -func WithRangeService(v *rangesvc.Service) Option { +// WithLogger returns option to specify Get service's logger. +func WithLogger(l *logger.Logger) Option { return func(c *cfg) { - c.rngSvc = v + c.log = l.With(zap.String("component", "Object.Get service")) + } +} + +// WithoutAssembly returns option to disable object assembling. +func WithoutAssembly() Option { + return func(c *cfg) { + c.assembly = false + } +} + +// WithLocalStorageEngine returns option to set local storage +// instance. +func WithLocalStorageEngine(e *engine.StorageEngine) Option { + return func(c *cfg) { + c.localStorage.(*storageEngineWrapper).engine = e + } +} + +// WithClientCache returns option to set cache of remote node clients. +func WithClientCache(v *cache.ClientCache) Option { + return func(c *cfg) { + c.clientCache.(*clientCacheWrapper).cache = v + } +} + +// WithClientOptions returns option to specify options of remote node clients. +func WithClientOptions(opts ...client.Option) Option { + return func(c *cfg) { + c.clientCache.(*clientCacheWrapper).opts = opts + } +} + +// WithTraverserGenerator returns option to set generator of +// placement traverser to get the objects from containers. +func WithTraverserGenerator(t *util.TraverserGenerator) Option { + return func(c *cfg) { + c.traverserGenerator = t + } +} + +// WithHeadService returns option to set the utility serving object.Head. +func WithHeadService(svc *headsvc.Service) Option { + return func(c *cfg) { + c.headSvc.(*headSvcWrapper).svc = svc } } diff --git a/pkg/services/object/get/streamer.go b/pkg/services/object/get/streamer.go deleted file mode 100644 index 28ac9507b..000000000 --- a/pkg/services/object/get/streamer.go +++ /dev/null @@ -1,26 +0,0 @@ -package getsvc - -import ( - rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" - "github.com/pkg/errors" -) - -type Streamer struct { - headSent bool - - rngRes *rangesvc.Result -} - -func (p *Streamer) Recv() (interface{}, error) { - if !p.headSent { - p.headSent = true - return p.rngRes.Head(), nil - } - - rngResp, err := p.rngRes.Stream().Recv() - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive range response", p) - } - - return rngResp.PayloadChunk(), nil -} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go new file mode 100644 index 000000000..bc84ccf15 --- /dev/null +++ b/pkg/services/object/get/util.go @@ -0,0 +1,110 @@ +package getsvc + +import ( + "context" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + "github.com/nspcc-dev/neofs-node/pkg/network/cache" + headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" +) + +type simpleObjectWriter struct { + obj *object.RawObject + + payload []byte +} + +type clientCacheWrapper struct { + cache *cache.ClientCache + + opts []client.Option +} + +type clientWrapper struct { + client *client.Client +} + +type storageEngineWrapper struct { + engine *engine.StorageEngine +} + +type headSvcWrapper struct { + svc *headsvc.Service +} + +func newSimpleObjectWriter() *simpleObjectWriter { + return new(simpleObjectWriter) +} + +func (s *simpleObjectWriter) WriteHeader(obj *object.Object) error { + s.obj = object.NewRawFromObject(obj) + + s.payload = make([]byte, 0, obj.PayloadSize()) + + return nil +} + +func (s *simpleObjectWriter) WriteChunk(p []byte) error { + s.payload = append(s.payload, p...) + return nil +} + +func (s *simpleObjectWriter) Close() error { + return nil +} + +func (s *simpleObjectWriter) object() *object.Object { + if len(s.payload) > 0 { + s.obj.SetPayload(s.payload) + } + + return s.obj.Object() +} + +func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (getClient, error) { + clt, err := c.cache.Get(key, addr, c.opts...) + + return &clientWrapper{ + client: clt, + }, err +} + +func (c *clientWrapper) GetObject(ctx context.Context, p Prm) (*objectSDK.Object, error) { + // we don't specify payload writer because we accumulate + // the object locally (even huge). + return c.client.GetObject(ctx, + new(client.GetObjectParams). + WithAddress(p.addr). + WithRawFlag(true), + p.callOpts..., + ) +} + +func (e *storageEngineWrapper) Get(addr *objectSDK.Address) (*object.Object, error) { + r, err := e.engine.Get(new(engine.GetPrm). + WithAddress(addr), + ) + if err != nil { + return nil, err + } + + return r.Object(), nil +} + +func (s *headSvcWrapper) head(ctx context.Context, p Prm) (*object.Object, error) { + r, err := s.svc.Head(ctx, new(headsvc.Prm). + WithAddress(p.addr). + WithCommonPrm(p.common). + Short(false), + ) + + if err != nil { + return nil, err + } + + return r.Header(), nil +} diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go index d8637b515..4f3b88e13 100644 --- a/pkg/services/object/get/v2/service.go +++ b/pkg/services/object/get/v2/service.go @@ -1,10 +1,11 @@ package getsvc import ( - "context" - + "github.com/nspcc-dev/neofs-api-go/pkg/object" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/pkg/errors" ) @@ -18,6 +19,8 @@ type Option func(*cfg) type cfg struct { svc *getsvc.Service + + keyStorage *objutil.KeyStorage } // NewService constructs Service instance from provided options. @@ -34,13 +37,22 @@ func NewService(opts ...Option) *Service { } // Get calls internal service and returns v2 object stream. -func (s *Service) Get(ctx context.Context, req *objectV2.GetRequest) (objectV2.GetObjectStreamer, error) { - stream, err := s.svc.Get(ctx, toPrm(req)) +func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) error { + p, err := s.toPrm(req, stream) if err != nil { - return nil, errors.Wrapf(err, "(%T) could not get object payload range data", s) + return err } - return fromResponse(stream), nil + err = s.svc.Get(stream.Context(), *p) + + var splitErr *object.SplitInfoError + + switch { + case errors.As(err, &splitErr): + return stream.Send(splitInfoResponse(splitErr.SplitInfo())) + default: + return err + } } func WithInternalService(v *getsvc.Service) Option { @@ -48,3 +60,10 @@ func WithInternalService(v *getsvc.Service) Option { c.svc = v } } + +// WithKeyStorage returns option to set local private key storage. +func WithKeyStorage(ks *objutil.KeyStorage) Option { + return func(c *cfg) { + c.keyStorage = ks + } +} diff --git a/pkg/services/object/get/v2/streamer.go b/pkg/services/object/get/v2/streamer.go index 2438310fe..0e18e2e10 100644 --- a/pkg/services/object/get/v2/streamer.go +++ b/pkg/services/object/get/v2/streamer.go @@ -1,47 +1,40 @@ package getsvc import ( - "fmt" - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" - getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" - "github.com/pkg/errors" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" ) -type streamer struct { - stream *getsvc.Streamer - - body *objectV2.GetResponseBody +type streamObjectWriter struct { + objectSvc.GetObjectStream } -func (s *streamer) Recv() (*objectV2.GetResponse, error) { - r, err := s.stream.Recv() - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive get response", s) - } +func (s *streamObjectWriter) WriteHeader(obj *object.Object) error { + p := new(objectV2.GetObjectPartInit) - switch v := r.(type) { - case *object.Object: - oV2 := v.ToV2() + objV2 := obj.ToV2() + p.SetObjectID(objV2.GetObjectID()) + p.SetHeader(objV2.GetHeader()) + p.SetSignature(objV2.GetSignature()) - partInit := new(objectV2.GetObjectPartInit) - partInit.SetHeader(oV2.GetHeader()) - partInit.SetSignature(oV2.GetSignature()) - partInit.SetObjectID(oV2.GetObjectID()) - - s.body.SetObjectPart(partInit) - case []byte: - partChunk := new(objectV2.GetObjectPartChunk) - partChunk.SetChunk(v) - - s.body.SetObjectPart(partChunk) - default: - panic(fmt.Sprintf("unexpected response type %T from %T", r, s.stream)) - } - - resp := new(objectV2.GetResponse) - resp.SetBody(s.body) - - return resp, nil + return s.GetObjectStream.Send(newResponse(p)) +} + +func (s *streamObjectWriter) WriteChunk(chunk []byte) error { + p := new(objectV2.GetObjectPartChunk) + p.SetChunk(chunk) + + return s.GetObjectStream.Send(newResponse(p)) +} + +func newResponse(p objectV2.GetObjectPart) *objectV2.GetResponse { + r := new(objectV2.GetResponse) + + body := new(objectV2.GetResponseBody) + r.SetBody(body) + + body.SetObjectPart(p) + + return r } diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 763c264a3..d3e1b8a74 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -1,23 +1,62 @@ package getsvc import ( + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/token" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/session" + objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) -func toPrm(req *objectV2.GetRequest) *getsvc.Prm { - return new(getsvc.Prm). - WithAddress( - object.NewAddressFromV2(req.GetBody().GetAddress()), - ). - WithCommonPrm(util.CommonPrmFromV2(req)) +func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) { + meta := req.GetMetaHeader() + + key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken())) + if err != nil { + return nil, err + } + + p := new(getsvc.Prm) + p.SetPrivateKey(key) + + body := req.GetBody() + p.SetAddress(object.NewAddressFromV2(body.GetAddress())) + p.SetRaw(body.GetRaw()) + p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...) + p.SetObjectWriter(&streamObjectWriter{stream}) + + return p, nil } -func fromResponse(res *getsvc.Streamer) objectV2.GetObjectStreamer { - return &streamer{ - stream: res, - body: new(objectV2.GetResponseBody), +// can be shared accross all services +func remoteCallOptionsFromMeta(meta *session.RequestMetaHeader) []client.CallOption { + xHdrs := meta.GetXHeaders() + + opts := make([]client.CallOption, 0, 3+len(xHdrs)) + + opts = append(opts, + client.WithBearer(token.NewBearerTokenFromV2(meta.GetBearerToken())), + client.WithSession(token.NewSessionTokenFromV2(meta.GetSessionToken())), + client.WithTTL(meta.GetTTL()-1), + ) + + for i := range xHdrs { + opts = append(opts, client.WithXHeader(pkg.NewXHeaderFromV2(xHdrs[i]))) } + + return opts +} + +func splitInfoResponse(info *object.SplitInfo) *objectV2.GetResponse { + resp := new(objectV2.GetResponse) + + body := new(objectV2.GetResponseBody) + resp.SetBody(body) + + body.SetObjectPart(info.ToV2()) + + return resp } diff --git a/pkg/services/object/response.go b/pkg/services/object/response.go index 94be331a6..b19a9a929 100644 --- a/pkg/services/object/response.go +++ b/pkg/services/object/response.go @@ -9,10 +9,10 @@ import ( "github.com/pkg/errors" ) -type responseService struct { +type ResponseService struct { respSvc *response.Service - svc object.Service + svc ServiceServer } type searchStreamResponser struct { @@ -20,7 +20,9 @@ type searchStreamResponser struct { } type getStreamResponser struct { - stream *response.ServerMessageStreamer + util.ServerStream + + respWriter util.ResponseMessageWriter } type getRangeStreamResponser struct { @@ -33,42 +35,24 @@ type putStreamResponser struct { // NewResponseService returns object service instance that passes internal service // call to response service. -func NewResponseService(objSvc object.Service, respSvc *response.Service) object.Service { - return &responseService{ +func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *ResponseService { + return &ResponseService{ respSvc: respSvc, svc: objSvc, } } -func (s *getStreamResponser) Recv() (*object.GetResponse, error) { - r, err := s.stream.Recv() - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive response", s) - } - - return r.(*object.GetResponse), nil +func (s *getStreamResponser) Send(resp *object.GetResponse) error { + return s.respWriter(resp) } -func (s *responseService) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) { - stream, err := s.respSvc.HandleServerStreamRequest(ctx, req, - func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) { - stream, err := s.svc.Get(ctx, req.(*object.GetRequest)) - if err != nil { - return nil, err - } - - return func() (util.ResponseMessage, error) { - return stream.Recv() - }, nil - }, - ) - if err != nil { - return nil, err - } - - return &getStreamResponser{ - stream: stream, - }, nil +func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) error { + return s.svc.Get(req, &getStreamResponser{ + ServerStream: stream, + respWriter: s.respSvc.HandleServerStreamRequest_(func(resp util.ResponseMessage) error { + return stream.Send(resp.(*object.GetResponse)) + }), + }) } func (s *putStreamResponser) Send(req *object.PutRequest) error { @@ -84,7 +68,7 @@ func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) { return r.(*object.PutResponse), nil } -func (s *responseService) Put(ctx context.Context) (object.PutObjectStreamer, error) { +func (s *ResponseService) Put(ctx context.Context) (object.PutObjectStreamer, error) { stream, err := s.svc.Put(ctx) if err != nil { return nil, errors.Wrap(err, "could not create Put object streamer") @@ -102,7 +86,7 @@ func (s *responseService) Put(ctx context.Context) (object.PutObjectStreamer, er }, nil } -func (s *responseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { +func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { resp, err := s.respSvc.HandleUnaryRequest(ctx, req, func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Head(ctx, req.(*object.HeadRequest)) @@ -124,7 +108,7 @@ func (s *searchStreamResponser) Recv() (*object.SearchResponse, error) { return r.(*object.SearchResponse), nil } -func (s *responseService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { +func (s *ResponseService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { stream, err := s.respSvc.HandleServerStreamRequest(ctx, req, func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) { stream, err := s.svc.Search(ctx, req.(*object.SearchRequest)) @@ -146,7 +130,7 @@ func (s *responseService) Search(ctx context.Context, req *object.SearchRequest) }, nil } -func (s *responseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { +func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { resp, err := s.respSvc.HandleUnaryRequest(ctx, req, func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Delete(ctx, req.(*object.DeleteRequest)) @@ -168,7 +152,7 @@ func (s *getRangeStreamResponser) Recv() (*object.GetRangeResponse, error) { return r.(*object.GetRangeResponse), nil } -func (s *responseService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { +func (s *ResponseService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { stream, err := s.respSvc.HandleServerStreamRequest(ctx, req, func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) { stream, err := s.svc.GetRange(ctx, req.(*object.GetRangeRequest)) @@ -190,7 +174,7 @@ func (s *responseService) GetRange(ctx context.Context, req *object.GetRangeRequ }, nil } -func (s *responseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { +func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { resp, err := s.respSvc.HandleUnaryRequest(ctx, req, func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest)) diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go new file mode 100644 index 000000000..46ddd7e48 --- /dev/null +++ b/pkg/services/object/server.go @@ -0,0 +1,26 @@ +package object + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-node/pkg/services/util" +) + +// GetObjectStream is an interface of NeoFS API v2 compatible object streamer. +type GetObjectStream interface { + util.ServerStream + Send(*object.GetResponse) error +} + +// ServiceServer is an interface of utility +// serving v2 Object service. +type ServiceServer interface { + Get(*object.GetRequest, GetObjectStream) error + Put(context.Context) (object.PutObjectStreamer, error) + Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) + Search(context.Context, *object.SearchRequest) (object.SearchObjectStreamer, error) + Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) + GetRange(context.Context, *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) + GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) +} diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index 908d3c031..a8e1613ba 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -9,12 +9,12 @@ import ( "github.com/pkg/errors" ) -type signService struct { +type SignService struct { key *ecdsa.PrivateKey sigSvc *util.SignService - svc object.Service + svc ServiceServer } type searchStreamSigner struct { @@ -22,7 +22,9 @@ type searchStreamSigner struct { } type getStreamSigner struct { - stream *util.ResponseMessageStreamer + util.ServerStream + + respWriter util.ResponseMessageWriter } type putStreamSigner struct { @@ -33,43 +35,32 @@ type getRangeStreamSigner struct { stream *util.ResponseMessageStreamer } -func NewSignService(key *ecdsa.PrivateKey, svc object.Service) object.Service { - return &signService{ +func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService { + return &SignService{ key: key, sigSvc: util.NewUnarySignService(key), svc: svc, } } -func (s *getStreamSigner) Recv() (*object.GetResponse, error) { - r, err := s.stream.Recv() - if err != nil { - return nil, errors.Wrap(err, "could not receive response") - } - - return r.(*object.GetResponse), nil +func (s *getStreamSigner) Send(resp *object.GetResponse) error { + return s.respWriter(resp) } -func (s *signService) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) { - stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req, - func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) { - stream, err := s.svc.Get(ctx, req.(*object.GetRequest)) - if err != nil { - return nil, err - } - - return func() (util.ResponseMessage, error) { - return stream.Recv() - }, nil +func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error { + respWriter, err := s.sigSvc.HandleServerStreamRequest_(req, + func(resp util.ResponseMessage) error { + return stream.Send(resp.(*object.GetResponse)) }, ) if err != nil { - return nil, err + return err } - return &getStreamSigner{ - stream: stream, - }, nil + return s.svc.Get(req, &getStreamSigner{ + ServerStream: stream, + respWriter: respWriter, + }) } func (s *putStreamSigner) Send(req *object.PutRequest) error { @@ -85,7 +76,7 @@ func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) { return r.(*object.PutResponse), nil } -func (s *signService) Put(ctx context.Context) (object.PutObjectStreamer, error) { +func (s *SignService) Put(ctx context.Context) (object.PutObjectStreamer, error) { stream, err := s.svc.Put(ctx) if err != nil { return nil, errors.Wrap(err, "could not create Put object streamer") @@ -103,7 +94,7 @@ func (s *signService) Put(ctx context.Context) (object.PutObjectStreamer, error) }, nil } -func (s *signService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { +func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req, func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Head(ctx, req.(*object.HeadRequest)) @@ -125,7 +116,7 @@ func (s *searchStreamSigner) Recv() (*object.SearchResponse, error) { return r.(*object.SearchResponse), nil } -func (s *signService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { +func (s *SignService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req, func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) { stream, err := s.svc.Search(ctx, req.(*object.SearchRequest)) @@ -147,7 +138,7 @@ func (s *signService) Search(ctx context.Context, req *object.SearchRequest) (ob }, nil } -func (s *signService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { +func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req, func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.Delete(ctx, req.(*object.DeleteRequest)) @@ -169,7 +160,7 @@ func (s *getRangeStreamSigner) Recv() (*object.GetRangeResponse, error) { return r.(*object.GetRangeResponse), nil } -func (s *signService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { +func (s *SignService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req, func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) { stream, err := s.svc.GetRange(ctx, req.(*object.GetRangeRequest)) @@ -191,7 +182,7 @@ func (s *signService) GetRange(ctx context.Context, req *object.GetRangeRequest) }, nil } -func (s *signService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { +func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { resp, err := s.sigSvc.HandleUnaryRequest(ctx, req, func(ctx context.Context, req interface{}) (util.ResponseMessage, error) { return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest)) diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go index b3713f5d0..f1c910ee0 100644 --- a/pkg/services/object/transport_splitter.go +++ b/pkg/services/object/transport_splitter.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/refs" + "github.com/nspcc-dev/neofs-node/pkg/services/util" "github.com/pkg/errors" ) @@ -15,16 +16,17 @@ var ( type ( TransportSplitter struct { - next object.Service + next ServiceServer chunkSize uint64 addrAmount uint64 } - getStreamBasicChecker struct { - next object.GetObjectStreamer - buf *bytes.Buffer - resp *object.GetResponse + getStreamMsgSizeCtrl struct { + util.ServerStream + + stream GetObjectStream + chunkSize int } @@ -43,7 +45,37 @@ type ( } ) -func NewTransportSplitter(size, amount uint64, next object.Service) *TransportSplitter { +func (s *getStreamMsgSizeCtrl) Send(resp *object.GetResponse) error { + body := resp.GetBody() + + part := body.GetObjectPart() + + chunkPart, ok := part.(*object.GetObjectPartChunk) + if !ok { + return s.stream.Send(resp) + } + + var newResp *object.GetResponse + + for buf := bytes.NewBuffer(chunkPart.GetChunk()); buf.Len() > 0; { + if newResp == nil { + newResp = new(object.GetResponse) + newResp.SetBody(body) + } + + chunkPart.SetChunk(buf.Next(s.chunkSize)) + newResp.SetMetaHeader(resp.GetMetaHeader()) + newResp.SetVerificationHeader(resp.GetVerificationHeader()) + + if err := s.stream.Send(newResp); err != nil { + return err + } + } + + return nil +} + +func NewTransportSplitter(size, amount uint64, next ServiceServer) *TransportSplitter { return &TransportSplitter{ next: next, chunkSize: size, @@ -51,13 +83,12 @@ func NewTransportSplitter(size, amount uint64, next object.Service) *TransportSp } } -func (c TransportSplitter) Get(ctx context.Context, request *object.GetRequest) (object.GetObjectStreamer, error) { - stream, err := c.next.Get(ctx, request) - - return &getStreamBasicChecker{ - next: stream, - chunkSize: int(c.chunkSize), - }, err +func (c *TransportSplitter) Get(req *object.GetRequest, stream GetObjectStream) error { + return c.next.Get(req, &getStreamMsgSizeCtrl{ + ServerStream: stream, + stream: stream, + chunkSize: int(c.chunkSize), + }) } func (c TransportSplitter) Put(ctx context.Context) (object.PutObjectStreamer, error) { @@ -94,40 +125,6 @@ func (c TransportSplitter) GetRangeHash(ctx context.Context, request *object.Get return c.next.GetRangeHash(ctx, request) } -func (g *getStreamBasicChecker) Recv() (*object.GetResponse, error) { - if g.resp == nil { - resp, err := g.next.Recv() - if err != nil { - return resp, err - } - - if part, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartChunk); !ok { - return resp, err - } else { - g.resp = resp - g.buf = bytes.NewBuffer(part.GetChunk()) - } - } - - chunkBody := new(object.GetObjectPartChunk) - chunkBody.SetChunk(g.buf.Next(g.chunkSize)) - - body := new(object.GetResponseBody) - body.SetObjectPart(chunkBody) - - resp := new(object.GetResponse) - resp.SetVerificationHeader(g.resp.GetVerificationHeader()) - resp.SetMetaHeader(g.resp.GetMetaHeader()) - resp.SetBody(body) - - if g.buf.Len() == 0 { - g.buf = nil - g.resp = nil - } - - return resp, nil -} - func (r *rangeStreamBasicChecker) Recv() (*object.GetRangeResponse, error) { if r.resp == nil { resp, err := r.next.Recv() diff --git a/pkg/services/object/util/key.go b/pkg/services/object/util/key.go index 862923e24..98826827a 100644 --- a/pkg/services/object/util/key.go +++ b/pkg/services/object/util/key.go @@ -5,7 +5,6 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-node/pkg/services/session/storage" - "github.com/pkg/errors" ) // KeyStorage represents private key storage of the local node. @@ -30,11 +29,9 @@ func NewKeyStorage(localKey *ecdsa.PrivateKey, tokenStore *storage.TokenStore) * func (s *KeyStorage) GetKey(token *token.SessionToken) (*ecdsa.PrivateKey, error) { if token != nil { pToken := s.tokenStore.Get(token.OwnerID(), token.ID()) - if pToken == nil { - return nil, errors.Wrapf(storage.ErrNotFound, "(%T) could not get session key", s) + if pToken != nil { + return pToken.SessionKey(), nil } - - return pToken.SessionKey(), nil } return s.key, nil diff --git a/pkg/services/object/util/local.go b/pkg/services/object/util/local.go deleted file mode 100644 index 2f8f981ee..000000000 --- a/pkg/services/object/util/local.go +++ /dev/null @@ -1,45 +0,0 @@ -package util - -import ( - "github.com/nspcc-dev/neofs-api-go/pkg/netmap" - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" - "github.com/pkg/errors" -) - -type localPlacement struct { - builder placement.Builder - - localAddrSrc network.LocalAddressSource -} - -func NewLocalPlacement(b placement.Builder, s network.LocalAddressSource) placement.Builder { - return &localPlacement{ - builder: b, - localAddrSrc: s, - } -} - -func (p *localPlacement) BuildPlacement(addr *object.Address, policy *netmap.PlacementPolicy) ([]netmap.Nodes, error) { - vs, err := p.builder.BuildPlacement(addr, policy) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not build object placement", p) - } - - for i := range vs { - for j := range vs[i] { - addr, err := network.AddressFromString(vs[i][j].Address()) - if err != nil { - // TODO: log error - continue - } - - if network.IsLocalAddress(p.localAddrSrc, addr) { - return []netmap.Nodes{{vs[i][j]}}, nil - } - } - } - - return nil, errors.Errorf("(%T) local node is outside of object placement", p) -} diff --git a/pkg/services/object/util/placement.go b/pkg/services/object/util/placement.go new file mode 100644 index 000000000..33be0872a --- /dev/null +++ b/pkg/services/object/util/placement.go @@ -0,0 +1,155 @@ +package util + +import ( + netmapSDK "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/pkg/errors" +) + +type localPlacement struct { + builder placement.Builder + + localAddrSrc network.LocalAddressSource +} + +type remotePlacement struct { + builder placement.Builder + + localAddrSrc network.LocalAddressSource +} + +// TraverserGenerator represents tool that generates +// container traverser for the particular need. +type TraverserGenerator struct { + netMapSrc netmap.Source + + cnrSrc container.Source + + localAddrSrc network.LocalAddressSource + + customOpts []placement.Option +} + +func NewLocalPlacement(b placement.Builder, s network.LocalAddressSource) placement.Builder { + return &localPlacement{ + builder: b, + localAddrSrc: s, + } +} + +func (p *localPlacement) BuildPlacement(addr *object.Address, policy *netmapSDK.PlacementPolicy) ([]netmapSDK.Nodes, error) { + vs, err := p.builder.BuildPlacement(addr, policy) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not build object placement", p) + } + + for i := range vs { + for j := range vs[i] { + addr, err := network.AddressFromString(vs[i][j].Address()) + if err != nil { + // TODO: log error + continue + } + + if network.IsLocalAddress(p.localAddrSrc, addr) { + return []netmapSDK.Nodes{{vs[i][j]}}, nil + } + } + } + + return nil, errors.Errorf("(%T) local node is outside of object placement", p) +} + +// NewRemotePlacementBuilder creates, initializes and returns placement builder that +// excludes local node from any placement vector. +func NewRemotePlacementBuilder(b placement.Builder, s network.LocalAddressSource) placement.Builder { + return &remotePlacement{ + builder: b, + localAddrSrc: s, + } +} + +func (p *remotePlacement) BuildPlacement(addr *object.Address, policy *netmapSDK.PlacementPolicy) ([]netmapSDK.Nodes, error) { + vs, err := p.builder.BuildPlacement(addr, policy) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not build object placement", p) + } + + for i := range vs { + for j := 0; j < len(vs[i]); j++ { + addr, err := network.AddressFromString(vs[i][j].Address()) + if err != nil { + // TODO: log error + continue + } + + if network.IsLocalAddress(p.localAddrSrc, addr) { + vs[i] = append(vs[i][:j], vs[i][j+1:]...) + j-- + } + } + } + + return vs, nil +} + +// NewTraverserGenerator creates, initializes and returns new TraverserGenerator instance. +func NewTraverserGenerator(nmSrc netmap.Source, cnrSrc container.Source, localAddrSrc network.LocalAddressSource) *TraverserGenerator { + return &TraverserGenerator{ + netMapSrc: nmSrc, + cnrSrc: cnrSrc, + localAddrSrc: localAddrSrc, + } +} + +// WithTraverseOptions returns TraverseGenerator that additionally applies provided options. +func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *TraverserGenerator { + return &TraverserGenerator{ + netMapSrc: g.netMapSrc, + cnrSrc: g.cnrSrc, + localAddrSrc: g.localAddrSrc, + customOpts: opts, + } +} + +// GenerateTraverser generates placement Traverser for provided object address. +func (g *TraverserGenerator) GenerateTraverser(addr *object.Address) (*placement.Traverser, error) { + // get latest network map + nm, err := netmap.GetLatestNetworkMap(g.netMapSrc) + if err != nil { + return nil, errors.Wrapf(err, "could not get latest network map") + } + + // get container related container + cnr, err := g.cnrSrc.Get(addr.ContainerID()) + if err != nil { + return nil, errors.Wrap(err, "could not get container") + } + + // allocate placement traverser options + traverseOpts := make([]placement.Option, 0, 3+len(g.customOpts)) + traverseOpts = append(traverseOpts, g.customOpts...) + + // create builder of the remote nodes from network map + builder := NewRemotePlacementBuilder( + placement.NewNetworkMapBuilder(nm), + g.localAddrSrc, + ) + + traverseOpts = append(traverseOpts, + // set processing container + placement.ForContainer(cnr), + + // set identifier of the processing object + placement.ForObject(addr.ObjectID()), + + // set placement builder + placement.UseBuilder(builder), + ) + + return placement.NewTraverser(traverseOpts...) +} diff --git a/pkg/services/util/response/server_stream.go b/pkg/services/util/response/server_stream.go index fbc1a9615..c903507a1 100644 --- a/pkg/services/util/response/server_stream.go +++ b/pkg/services/util/response/server_stream.go @@ -40,3 +40,11 @@ func (s *Service) HandleServerStreamRequest(ctx context.Context, req interface{} recv: msgRdr, }, nil } + +func (s *Service) HandleServerStreamRequest_(respWriter util.ResponseMessageWriter) util.ResponseMessageWriter { + return func(resp util.ResponseMessage) error { + setMeta(resp, s.cfg) + + return respWriter(resp) + } +} diff --git a/pkg/services/util/server.go b/pkg/services/util/server.go new file mode 100644 index 000000000..83ab323f7 --- /dev/null +++ b/pkg/services/util/server.go @@ -0,0 +1,10 @@ +package util + +import ( + "context" +) + +// ServerStream is an interface of server-side stream v2. +type ServerStream interface { + Context() context.Context +} diff --git a/pkg/services/util/sign.go b/pkg/services/util/sign.go index 83ac05944..2f0550072 100644 --- a/pkg/services/util/sign.go +++ b/pkg/services/util/sign.go @@ -21,6 +21,10 @@ type SignService struct { key *ecdsa.PrivateKey } +type ResponseMessageWriter func(ResponseMessage) error + +type ServerStreamHandler_ func(interface{}, ResponseMessageWriter) (ResponseMessageWriter, error) + type ServerStreamHandler func(context.Context, interface{}) (ResponseMessageReader, error) type ResponseMessageReader func() (ResponseMessage, error) @@ -109,6 +113,21 @@ func (s *SignService) HandleServerStreamRequest(ctx context.Context, req interfa }, nil } +func (s *SignService) HandleServerStreamRequest_(req interface{}, respWriter ResponseMessageWriter) (ResponseMessageWriter, error) { + // verify request signatures + if err := signature.VerifyServiceMessage(req); err != nil { + return nil, errors.Wrap(err, "could not verify request") + } + + return func(resp ResponseMessage) error { + if err := signature.SignServiceMessage(s.key, resp); err != nil { + return errors.Wrap(err, "could not sign response message") + } + + return respWriter(resp) + }, nil +} + func (s *SignService) HandleUnaryRequest(ctx context.Context, req interface{}, handler UnaryHandler) (ResponseMessage, error) { // verify request signatures if err := signature.VerifyServiceMessage(req); err != nil {